| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package cache |
| |
| import ( |
| "sync" |
| "time" |
| |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/util/clock" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| ) |
| |
// Config contains all the settings for a Controller.
type Config struct {
	// The queue for your objects; either a FIFO or
	// a DeltaFIFO. Your Process() function should accept
	// the output of this Queue's Pop() method.
	// The field is embedded. The controller's Run closes this queue once
	// its stop channel is signalled.
	Queue

	// Something that can list and watch your objects.
	// The field is embedded; Run hands it to the reflector it creates.
	ListerWatcher

	// Something that can process your objects.
	// The controller passes it to Queue.Pop on every iteration of its
	// processing loop.
	Process ProcessFunc

	// The type of your objects.
	ObjectType runtime.Object

	// Reprocess everything at least this often.
	// Note that if it takes longer for you to clear the queue than this
	// period, you will end up processing items in the order determined
	// by FIFO.Replace(). Currently, this is random. If this is a
	// problem, we can change that replacement policy to append new
	// things to the end of the queue instead of replacing the entire
	// queue.
	FullResyncPeriod time.Duration

	// ShouldResync, if specified, is invoked when the controller's reflector determines the next
	// periodic sync should occur. If this returns true, it means the reflector should proceed with
	// the resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object.
	// Re-enqueueing is done with Queue.AddIfNotPresent.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.
	RetryOnError bool
}
| |
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods.
// It takes no arguments; a true result means the resync should proceed.
type ShouldResyncFunc func() bool
| |
// ProcessFunc processes a single object. The controller supplies it to the
// Queue's Pop method, so obj is whatever that queue yields. A non-nil error
// reports that processing failed; see Config.RetryOnError for how the
// controller reacts.
type ProcessFunc func(obj interface{}) error
| |
// controller is the generic controller framework returned by New; it is the
// package's implementation of the Controller interface.
type controller struct {
	// config is the copy of the Config taken by New.
	config Config
	// reflector is created and assigned by Run; it is nil before that.
	reflector *Reflector
	// reflectorMutex is held by Run while it assigns reflector.
	reflectorMutex sync.RWMutex
	// clock is handed to the reflector in Run. New sets it to a real clock.
	clock clock.Clock
}
| |
// Controller is the interface exposed by the controllers that New,
// NewInformer and NewIndexerInformer return.
type Controller interface {
	// Run begins processing items and blocks until stopCh is signalled;
	// call it via go.
	Run(stopCh <-chan struct{})
	// HasSynced returns true once the controller has completed an initial
	// resource listing.
	HasSynced() bool
	// LastSyncResourceVersion returns the resource version reported by the
	// controller's reflector, or "" when no reflector exists yet.
	LastSyncResourceVersion() string
}
| |
| // New makes a new Controller from the given Config. |
| func New(c *Config) Controller { |
| ctlr := &controller{ |
| config: *c, |
| clock: &clock.RealClock{}, |
| } |
| return ctlr |
| } |
| |
| // Run begins processing items, and will continue until a value is sent down stopCh. |
| // It's an error to call Run more than once. |
| // Run blocks; call via go. |
| func (c *controller) Run(stopCh <-chan struct{}) { |
| defer utilruntime.HandleCrash() |
| go func() { |
| <-stopCh |
| c.config.Queue.Close() |
| }() |
| r := NewReflector( |
| c.config.ListerWatcher, |
| c.config.ObjectType, |
| c.config.Queue, |
| c.config.FullResyncPeriod, |
| ) |
| r.ShouldResync = c.config.ShouldResync |
| r.clock = c.clock |
| |
| c.reflectorMutex.Lock() |
| c.reflector = r |
| c.reflectorMutex.Unlock() |
| |
| var wg wait.Group |
| defer wg.Wait() |
| |
| wg.StartWithChannel(stopCh, r.Run) |
| |
| wait.Until(c.processLoop, time.Second, stopCh) |
| } |
| |
| // Returns true once this controller has completed an initial resource listing |
| func (c *controller) HasSynced() bool { |
| return c.config.Queue.HasSynced() |
| } |
| |
| func (c *controller) LastSyncResourceVersion() string { |
| if c.reflector == nil { |
| return "" |
| } |
| return c.reflector.LastSyncResourceVersion() |
| } |
| |
| // processLoop drains the work queue. |
| // TODO: Consider doing the processing in parallel. This will require a little thought |
| // to make sure that we don't end up processing the same object multiple times |
| // concurrently. |
| // |
| // TODO: Plumb through the stopCh here (and down to the queue) so that this can |
| // actually exit when the controller is stopped. Or just give up on this stuff |
| // ever being stoppable. Converting this whole package to use Context would |
| // also be helpful. |
| func (c *controller) processLoop() { |
| for { |
| obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) |
| if err != nil { |
| if err == FIFOClosedError { |
| return |
| } |
| if c.config.RetryOnError { |
| // This is the safe way to re-enqueue. |
| c.config.Queue.AddIfNotPresent(obj) |
| } |
| } |
| } |
| } |
| |
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
type ResourceEventHandler interface {
	// OnAdd is called when an object is added.
	OnAdd(obj interface{})
	// OnUpdate is called when an object is modified. Note that oldObj is the
	// last known state of the object-- it is possible that several changes
	// were combined together, so you can't use this to see every single
	// change. OnUpdate is also called when a re-list happens, and it will
	// get called even if nothing changed. This is useful for periodically
	// evaluating or syncing something.
	OnUpdate(oldObj, newObj interface{})
	// OnDelete will get the final state of the item if it is known, otherwise
	// it will get an object of type DeletedFinalStateUnknown. This can
	// happen if the watch is closed and misses the delete event and we don't
	// notice the deletion until the subsequent re-list.
	OnDelete(obj interface{})
}
| |
// ResourceEventHandlerFuncs is an adaptor that lets you supply only the
// notification callbacks you care about while still satisfying
// ResourceEventHandler. Any callback left nil is simply skipped.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd forwards obj to AddFunc; it is a no-op when AddFunc is nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	add := r.AddFunc
	if add == nil {
		return
	}
	add(obj)
}

// OnUpdate forwards the old and new objects to UpdateFunc; it is a no-op when
// UpdateFunc is nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	update := r.UpdateFunc
	if update == nil {
		return
	}
	update(oldObj, newObj)
}

// OnDelete forwards obj to DeleteFunc; it is a no-op when DeleteFunc is nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	del := r.DeleteFunc
	if del == nil {
		return
	}
	del(obj)
}
| |
| // FilteringResourceEventHandler applies the provided filter to all events coming |
| // in, ensuring the appropriate nested handler method is invoked. An object |
| // that starts passing the filter after an update is considered an add, and an |
| // object that stops passing the filter after an update is considered a delete. |
| type FilteringResourceEventHandler struct { |
| FilterFunc func(obj interface{}) bool |
| Handler ResourceEventHandler |
| } |
| |
| // OnAdd calls the nested handler only if the filter succeeds |
| func (r FilteringResourceEventHandler) OnAdd(obj interface{}) { |
| if !r.FilterFunc(obj) { |
| return |
| } |
| r.Handler.OnAdd(obj) |
| } |
| |
| // OnUpdate ensures the proper handler is called depending on whether the filter matches |
| func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) { |
| newer := r.FilterFunc(newObj) |
| older := r.FilterFunc(oldObj) |
| switch { |
| case newer && older: |
| r.Handler.OnUpdate(oldObj, newObj) |
| case newer && !older: |
| r.Handler.OnAdd(newObj) |
| case !newer && older: |
| r.Handler.OnDelete(oldObj) |
| default: |
| // do nothing |
| } |
| } |
| |
| // OnDelete calls the nested handler only if the filter succeeds |
| func (r FilteringResourceEventHandler) OnDelete(obj interface{}) { |
| if !r.FilterFunc(obj) { |
| return |
| } |
| r.Handler.OnDelete(obj) |
| } |
| |
| // DeletionHandlingMetaNamespaceKeyFunc checks for |
| // DeletedFinalStateUnknown objects before calling |
| // MetaNamespaceKeyFunc. |
| func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { |
| if d, ok := obj.(DeletedFinalStateUnknown); ok { |
| return d.Key, nil |
| } |
| return MetaNamespaceKeyFunc(obj) |
| } |
| |
| // NewInformer returns a Store and a controller for populating the store |
| // while also providing event notifications. You should only used the returned |
| // Store for Get/List operations; Add/Modify/Deletes will cause the event |
| // notifications to be faulty. |
| // |
| // Parameters: |
| // * lw is list and watch functions for the source of the resource you want to |
| // be informed of. |
| // * objType is an object of the type that you expect to receive. |
| // * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate |
| // calls, even if nothing changed). Otherwise, re-list will be delayed as |
| // long as possible (until the upstream source closes the watch or times out, |
| // or you stop the controller). |
| // * h is the object you want notifications sent to. |
| // |
| func NewInformer( |
| lw ListerWatcher, |
| objType runtime.Object, |
| resyncPeriod time.Duration, |
| h ResourceEventHandler, |
| ) (Store, Controller) { |
| // This will hold the client state, as we know it. |
| clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) |
| |
| // This will hold incoming changes. Note how we pass clientState in as a |
| // KeyLister, that way resync operations will result in the correct set |
| // of update/delete deltas. |
| fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) |
| |
| cfg := &Config{ |
| Queue: fifo, |
| ListerWatcher: lw, |
| ObjectType: objType, |
| FullResyncPeriod: resyncPeriod, |
| RetryOnError: false, |
| |
| Process: func(obj interface{}) error { |
| // from oldest to newest |
| for _, d := range obj.(Deltas) { |
| switch d.Type { |
| case Sync, Added, Updated: |
| if old, exists, err := clientState.Get(d.Object); err == nil && exists { |
| if err := clientState.Update(d.Object); err != nil { |
| return err |
| } |
| h.OnUpdate(old, d.Object) |
| } else { |
| if err := clientState.Add(d.Object); err != nil { |
| return err |
| } |
| h.OnAdd(d.Object) |
| } |
| case Deleted: |
| if err := clientState.Delete(d.Object); err != nil { |
| return err |
| } |
| h.OnDelete(d.Object) |
| } |
| } |
| return nil |
| }, |
| } |
| return clientState, New(cfg) |
| } |
| |
| // NewIndexerInformer returns a Indexer and a controller for populating the index |
| // while also providing event notifications. You should only used the returned |
| // Index for Get/List operations; Add/Modify/Deletes will cause the event |
| // notifications to be faulty. |
| // |
| // Parameters: |
| // * lw is list and watch functions for the source of the resource you want to |
| // be informed of. |
| // * objType is an object of the type that you expect to receive. |
| // * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate |
| // calls, even if nothing changed). Otherwise, re-list will be delayed as |
| // long as possible (until the upstream source closes the watch or times out, |
| // or you stop the controller). |
| // * h is the object you want notifications sent to. |
| // * indexers is the indexer for the received object type. |
| // |
| func NewIndexerInformer( |
| lw ListerWatcher, |
| objType runtime.Object, |
| resyncPeriod time.Duration, |
| h ResourceEventHandler, |
| indexers Indexers, |
| ) (Indexer, Controller) { |
| // This will hold the client state, as we know it. |
| clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) |
| |
| // This will hold incoming changes. Note how we pass clientState in as a |
| // KeyLister, that way resync operations will result in the correct set |
| // of update/delete deltas. |
| fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) |
| |
| cfg := &Config{ |
| Queue: fifo, |
| ListerWatcher: lw, |
| ObjectType: objType, |
| FullResyncPeriod: resyncPeriod, |
| RetryOnError: false, |
| |
| Process: func(obj interface{}) error { |
| // from oldest to newest |
| for _, d := range obj.(Deltas) { |
| switch d.Type { |
| case Sync, Added, Updated: |
| if old, exists, err := clientState.Get(d.Object); err == nil && exists { |
| if err := clientState.Update(d.Object); err != nil { |
| return err |
| } |
| h.OnUpdate(old, d.Object) |
| } else { |
| if err := clientState.Add(d.Object); err != nil { |
| return err |
| } |
| h.OnAdd(d.Object) |
| } |
| case Deleted: |
| if err := clientState.Delete(d.Object); err != nil { |
| return err |
| } |
| h.OnDelete(d.Object) |
| } |
| } |
| return nil |
| }, |
| } |
| return clientState, New(cfg) |
| } |