blob: 8bf41f517efd2d2ebae8ab18830c5697bba3ac2a [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20 "context"
21 "time"
22
23 "k8s.io/apimachinery/pkg/api/meta"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/fields"
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/apimachinery/pkg/util/wait"
28 "k8s.io/apimachinery/pkg/watch"
29 restclient "k8s.io/client-go/rest"
30 "k8s.io/client-go/tools/pager"
31)
32
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
// In this file, *ListWatch provides both methods and ListWatchUntil consumes the interface.
type ListerWatcher interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
	// Watch should begin a watch at the specified version, i.e. the
	// ResourceVersion carried in options.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}
41
// ListFunc knows how to list resources. It has the same signature as
// ListerWatcher.List and is the function ListWatch.List delegates to.
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
44
// WatchFunc knows how to watch resources. It has the same signature as
// ListerWatcher.Watch and is the function ListWatch.Watch delegates to.
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
47
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience type for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil: List and Watch call them unconditionally.
type ListWatch struct {
	// ListFunc performs the list request; it is invoked by List, either
	// directly or through a pager depending on DisableChunking.
	ListFunc ListFunc
	// WatchFunc performs the watch request; it is invoked by Watch.
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	// When false (the zero value), List routes ListFunc through pager.New.
	DisableChunking bool
}
57
// Getter interface knows how to access Get method from RESTClient.
// It is the only capability the ListWatch constructors in this file need
// from a client: a way to obtain a request to build on.
type Getter interface {
	// Get returns a request that the caller can further scope (namespace,
	// resource, parameters) before executing it.
	Get() *restclient.Request
}
62
63// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
64func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
65 optionsModifier := func(options *metav1.ListOptions) {
66 options.FieldSelector = fieldSelector.String()
67 }
68 return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
69}
70
71// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
72// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
73// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
74func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
75 listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
76 optionsModifier(&options)
77 return c.Get().
78 Namespace(namespace).
79 Resource(resource).
80 VersionedParams(&options, metav1.ParameterCodec).
81 Do().
82 Get()
83 }
84 watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
85 options.Watch = true
86 optionsModifier(&options)
87 return c.Get().
88 Namespace(namespace).
89 Resource(resource).
90 VersionedParams(&options, metav1.ParameterCodec).
91 Watch()
92 }
93 return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
94}
95
96func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
97 if options.TimeoutSeconds != nil {
98 return time.Duration(*options.TimeoutSeconds) * time.Second
99 }
100 return 0
101}
102
103// List a set of apiserver resources
104func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
105 if !lw.DisableChunking {
106 return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
107 }
108 return lw.ListFunc(options)
109}
110
// Watch a set of apiserver resources.
// It delegates to WatchFunc with options passed through unchanged and returns
// WatchFunc's results as-is. WatchFunc must not be nil.
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}
115
116// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
117// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
118// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
119func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
120 if len(conditions) == 0 {
121 return nil, nil
122 }
123
124 list, err := lw.List(metav1.ListOptions{})
125 if err != nil {
126 return nil, err
127 }
128 initialItems, err := meta.ExtractList(list)
129 if err != nil {
130 return nil, err
131 }
132
133 // use the initial items as simulated "adds"
134 var lastEvent *watch.Event
135 currIndex := 0
136 passedConditions := 0
137 for _, condition := range conditions {
138 // check the next condition against the previous event and short circuit waiting for the next watch
139 if lastEvent != nil {
140 done, err := condition(*lastEvent)
141 if err != nil {
142 return lastEvent, err
143 }
144 if done {
145 passedConditions = passedConditions + 1
146 continue
147 }
148 }
149
150 ConditionSucceeded:
151 for currIndex < len(initialItems) {
152 lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
153 currIndex++
154
155 done, err := condition(*lastEvent)
156 if err != nil {
157 return lastEvent, err
158 }
159 if done {
160 passedConditions = passedConditions + 1
161 break ConditionSucceeded
162 }
163 }
164 }
165 if passedConditions == len(conditions) {
166 return lastEvent, nil
167 }
168 remainingConditions := conditions[passedConditions:]
169
170 metaObj, err := meta.ListAccessor(list)
171 if err != nil {
172 return nil, err
173 }
174 currResourceVersion := metaObj.GetResourceVersion()
175
176 watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
177 if err != nil {
178 return nil, err
179 }
180
181 evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
182 if err == watch.ErrWatchClosed {
183 // present a consistent error interface to callers
184 err = wait.ErrWaitTimeout
185 }
186 return evt, err
187}