blob: 028c75e8e19e80588a31970373c9301b1c6b7d68 [file] [log] [blame]
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package cache
18
19import (
20 "sync"
21 "time"
22
23 "k8s.io/apimachinery/pkg/runtime"
24 "k8s.io/apimachinery/pkg/util/clock"
25 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26 "k8s.io/apimachinery/pkg/util/wait"
27)
28
// Config contains all the settings for a Controller. New stores a copy of the
// Config it is handed, so later changes to the caller's value are not seen by
// the controller.
type Config struct {
	// The queue for your objects; either a FIFO or
	// a DeltaFIFO. Your Process() function should accept
	// the output of this Queue's Pop() method.
	// The controller closes this queue once its stop channel fires.
	Queue

	// Something that can list and watch your objects.
	// It is handed to the reflector that Run creates.
	ListerWatcher

	// Something that can process your objects.
	// It is passed to Queue.Pop by the controller's processing loop.
	Process ProcessFunc

	// The type of your objects.
	ObjectType runtime.Object

	// Reprocess everything at least this often.
	// Note that if it takes longer for you to clear the queue than this
	// period, you will end up processing items in the order determined
	// by FIFO.Replace(). Currently, this is random. If this is a
	// problem, we can change that replacement policy to append new
	// things to the end of the queue instead of replacing the entire
	// queue.
	FullResyncPeriod time.Duration

	// ShouldResync, if specified, is invoked when the controller's reflector determines the next
	// periodic sync should occur. If this returns true, it means the reflector should proceed with
	// the resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object
	// (the processing loop does this with Queue.AddIfNotPresent).
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.
	RetryOnError bool
}
65
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not: a true result means the resync should proceed. It can be used by a shared
// informer to support multiple event handlers with custom resync periods.
type ShouldResyncFunc func() bool
70
// ProcessFunc processes a single object and reports failure through its error
// result. The controller's processing loop hands it to Queue.Pop.
type ProcessFunc func(obj interface{}) error
73
// controller is the generic Controller implementation returned by New.
type controller struct {
	// config is the copy of the Config given to New.
	config Config
	// reflector is created and assigned by Run; it is nil before Run is called.
	reflector *Reflector
	// reflectorMutex is held by Run while it assigns reflector.
	reflectorMutex sync.RWMutex
	// clock is passed on to the reflector in Run; New sets it to a real clock.
	clock clock.Clock
}
81
// Controller is the interface satisfied by the value New returns.
type Controller interface {
	// Run starts processing and blocks; stopCh is the signal to stop.
	Run(stopCh <-chan struct{})
	// HasSynced returns true once the controller has completed an initial
	// resource listing.
	HasSynced() bool
	// LastSyncResourceVersion returns whatever the underlying reflector
	// reports as its last sync resource version, or "" while no reflector
	// exists yet.
	LastSyncResourceVersion() string
}
87
88// New makes a new Controller from the given Config.
89func New(c *Config) Controller {
90 ctlr := &controller{
91 config: *c,
92 clock: &clock.RealClock{},
93 }
94 return ctlr
95}
96
97// Run begins processing items, and will continue until a value is sent down stopCh.
98// It's an error to call Run more than once.
99// Run blocks; call via go.
100func (c *controller) Run(stopCh <-chan struct{}) {
101 defer utilruntime.HandleCrash()
102 go func() {
103 <-stopCh
104 c.config.Queue.Close()
105 }()
106 r := NewReflector(
107 c.config.ListerWatcher,
108 c.config.ObjectType,
109 c.config.Queue,
110 c.config.FullResyncPeriod,
111 )
112 r.ShouldResync = c.config.ShouldResync
113 r.clock = c.clock
114
115 c.reflectorMutex.Lock()
116 c.reflector = r
117 c.reflectorMutex.Unlock()
118
119 var wg wait.Group
120 defer wg.Wait()
121
122 wg.StartWithChannel(stopCh, r.Run)
123
124 wait.Until(c.processLoop, time.Second, stopCh)
125}
126
127// Returns true once this controller has completed an initial resource listing
128func (c *controller) HasSynced() bool {
129 return c.config.Queue.HasSynced()
130}
131
132func (c *controller) LastSyncResourceVersion() string {
133 if c.reflector == nil {
134 return ""
135 }
136 return c.reflector.LastSyncResourceVersion()
137}
138
139// processLoop drains the work queue.
140// TODO: Consider doing the processing in parallel. This will require a little thought
141// to make sure that we don't end up processing the same object multiple times
142// concurrently.
143//
144// TODO: Plumb through the stopCh here (and down to the queue) so that this can
145// actually exit when the controller is stopped. Or just give up on this stuff
146// ever being stoppable. Converting this whole package to use Context would
147// also be helpful.
148func (c *controller) processLoop() {
149 for {
150 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
151 if err != nil {
152 if err == FIFOClosedError {
153 return
154 }
155 if c.config.RetryOnError {
156 // This is the safe way to re-enqueue.
157 c.config.Queue.AddIfNotPresent(obj)
158 }
159 }
160 }
161}
162
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
//  * OnAdd is called when an object is added.
//  * OnUpdate is called when an object is modified. Note that oldObj is the
//      last known state of the object-- it is possible that several changes
//      were combined together, so you can't use this to see every single
//      change. OnUpdate is also called when a re-list happens, and it will
//      get called even if nothing changed. This is useful for periodically
//      evaluating or syncing something.
//  * OnDelete will get the final state of the item if it is known, otherwise
//      it will get an object of type DeletedFinalStateUnknown. This can
//      happen if the watch is closed and misses the delete event and we don't
//      notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
	// OnAdd receives the object that was added.
	OnAdd(obj interface{})
	// OnUpdate receives the last known state and the new state of the object.
	OnUpdate(oldObj, newObj interface{})
	// OnDelete receives the deleted object, or a DeletedFinalStateUnknown
	// when the final state is not known.
	OnDelete(obj interface{})
}
182
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler. A nil function field makes the matching method a no-op.
type ResourceEventHandlerFuncs struct {
	// AddFunc, if non-nil, is called by OnAdd.
	AddFunc func(obj interface{})
	// UpdateFunc, if non-nil, is called by OnUpdate.
	UpdateFunc func(oldObj, newObj interface{})
	// DeleteFunc, if non-nil, is called by OnDelete.
	DeleteFunc func(obj interface{})
}
191
192// OnAdd calls AddFunc if it's not nil.
193func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
194 if r.AddFunc != nil {
195 r.AddFunc(obj)
196 }
197}
198
199// OnUpdate calls UpdateFunc if it's not nil.
200func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
201 if r.UpdateFunc != nil {
202 r.UpdateFunc(oldObj, newObj)
203 }
204}
205
206// OnDelete calls DeleteFunc if it's not nil.
207func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
208 if r.DeleteFunc != nil {
209 r.DeleteFunc(obj)
210 }
211}
212
// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
// object that stops passing the filter after an update is considered a delete.
type FilteringResourceEventHandler struct {
	// FilterFunc decides whether an object is of interest; it is called
	// unconditionally by every handler method, so it must not be nil.
	FilterFunc func(obj interface{}) bool
	// Handler receives the events whose objects pass FilterFunc.
	Handler ResourceEventHandler
}
221
222// OnAdd calls the nested handler only if the filter succeeds
223func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
224 if !r.FilterFunc(obj) {
225 return
226 }
227 r.Handler.OnAdd(obj)
228}
229
230// OnUpdate ensures the proper handler is called depending on whether the filter matches
231func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
232 newer := r.FilterFunc(newObj)
233 older := r.FilterFunc(oldObj)
234 switch {
235 case newer && older:
236 r.Handler.OnUpdate(oldObj, newObj)
237 case newer && !older:
238 r.Handler.OnAdd(newObj)
239 case !newer && older:
240 r.Handler.OnDelete(oldObj)
241 default:
242 // do nothing
243 }
244}
245
246// OnDelete calls the nested handler only if the filter succeeds
247func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
248 if !r.FilterFunc(obj) {
249 return
250 }
251 r.Handler.OnDelete(obj)
252}
253
254// DeletionHandlingMetaNamespaceKeyFunc checks for
255// DeletedFinalStateUnknown objects before calling
256// MetaNamespaceKeyFunc.
257func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
258 if d, ok := obj.(DeletedFinalStateUnknown); ok {
259 return d.Key, nil
260 }
261 return MetaNamespaceKeyFunc(obj)
262}
263
264// NewInformer returns a Store and a controller for populating the store
265// while also providing event notifications. You should only used the returned
266// Store for Get/List operations; Add/Modify/Deletes will cause the event
267// notifications to be faulty.
268//
269// Parameters:
270// * lw is list and watch functions for the source of the resource you want to
271// be informed of.
272// * objType is an object of the type that you expect to receive.
273// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
274// calls, even if nothing changed). Otherwise, re-list will be delayed as
275// long as possible (until the upstream source closes the watch or times out,
276// or you stop the controller).
277// * h is the object you want notifications sent to.
278//
279func NewInformer(
280 lw ListerWatcher,
281 objType runtime.Object,
282 resyncPeriod time.Duration,
283 h ResourceEventHandler,
284) (Store, Controller) {
285 // This will hold the client state, as we know it.
286 clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
287
288 // This will hold incoming changes. Note how we pass clientState in as a
289 // KeyLister, that way resync operations will result in the correct set
290 // of update/delete deltas.
291 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
292
293 cfg := &Config{
294 Queue: fifo,
295 ListerWatcher: lw,
296 ObjectType: objType,
297 FullResyncPeriod: resyncPeriod,
298 RetryOnError: false,
299
300 Process: func(obj interface{}) error {
301 // from oldest to newest
302 for _, d := range obj.(Deltas) {
303 switch d.Type {
304 case Sync, Added, Updated:
305 if old, exists, err := clientState.Get(d.Object); err == nil && exists {
306 if err := clientState.Update(d.Object); err != nil {
307 return err
308 }
309 h.OnUpdate(old, d.Object)
310 } else {
311 if err := clientState.Add(d.Object); err != nil {
312 return err
313 }
314 h.OnAdd(d.Object)
315 }
316 case Deleted:
317 if err := clientState.Delete(d.Object); err != nil {
318 return err
319 }
320 h.OnDelete(d.Object)
321 }
322 }
323 return nil
324 },
325 }
326 return clientState, New(cfg)
327}
328
329// NewIndexerInformer returns a Indexer and a controller for populating the index
330// while also providing event notifications. You should only used the returned
331// Index for Get/List operations; Add/Modify/Deletes will cause the event
332// notifications to be faulty.
333//
334// Parameters:
335// * lw is list and watch functions for the source of the resource you want to
336// be informed of.
337// * objType is an object of the type that you expect to receive.
338// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
339// calls, even if nothing changed). Otherwise, re-list will be delayed as
340// long as possible (until the upstream source closes the watch or times out,
341// or you stop the controller).
342// * h is the object you want notifications sent to.
343// * indexers is the indexer for the received object type.
344//
345func NewIndexerInformer(
346 lw ListerWatcher,
347 objType runtime.Object,
348 resyncPeriod time.Duration,
349 h ResourceEventHandler,
350 indexers Indexers,
351) (Indexer, Controller) {
352 // This will hold the client state, as we know it.
353 clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
354
355 // This will hold incoming changes. Note how we pass clientState in as a
356 // KeyLister, that way resync operations will result in the correct set
357 // of update/delete deltas.
358 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
359
360 cfg := &Config{
361 Queue: fifo,
362 ListerWatcher: lw,
363 ObjectType: objType,
364 FullResyncPeriod: resyncPeriod,
365 RetryOnError: false,
366
367 Process: func(obj interface{}) error {
368 // from oldest to newest
369 for _, d := range obj.(Deltas) {
370 switch d.Type {
371 case Sync, Added, Updated:
372 if old, exists, err := clientState.Get(d.Object); err == nil && exists {
373 if err := clientState.Update(d.Object); err != nil {
374 return err
375 }
376 h.OnUpdate(old, d.Object)
377 } else {
378 if err := clientState.Add(d.Object); err != nil {
379 return err
380 }
381 h.OnAdd(d.Object)
382 }
383 case Deleted:
384 if err := clientState.Delete(d.Object); err != nil {
385 return err
386 }
387 h.OnDelete(d.Object)
388 }
389 }
390 return nil
391 },
392 }
393 return clientState, New(cfg)
394}