Matthias Andreas Benkard | 832a54e | 2019-01-29 09:27:38 +0100 | [diff] [blame^] | 1 | /* |
| 2 | Copyright 2015 The Kubernetes Authors. |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package cache |
| 18 | |
| 19 | import ( |
| 20 | "sync" |
| 21 | "time" |
| 22 | |
| 23 | "k8s.io/apimachinery/pkg/runtime" |
| 24 | "k8s.io/apimachinery/pkg/util/clock" |
| 25 | utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| 26 | "k8s.io/apimachinery/pkg/util/wait" |
| 27 | ) |
| 28 | |
| 29 | // Config contains all the settings for a Controller. |
| 30 | type Config struct { |
| 31 | // The queue for your objects; either a FIFO or |
| 32 | // a DeltaFIFO. Your Process() function should accept |
| 33 | // the output of this Queue's Pop() method. |
| 34 | Queue |
| 35 | |
| 36 | // Something that can list and watch your objects. |
| 37 | ListerWatcher |
| 38 | |
| 39 | // Something that can process your objects. |
| 40 | Process ProcessFunc |
| 41 | |
| 42 | // The type of your objects. |
| 43 | ObjectType runtime.Object |
| 44 | |
| 45 | // Reprocess everything at least this often. |
| 46 | // Note that if it takes longer for you to clear the queue than this |
| 47 | // period, you will end up processing items in the order determined |
| 48 | // by FIFO.Replace(). Currently, this is random. If this is a |
| 49 | // problem, we can change that replacement policy to append new |
| 50 | // things to the end of the queue instead of replacing the entire |
| 51 | // queue. |
| 52 | FullResyncPeriod time.Duration |
| 53 | |
| 54 | // ShouldResync, if specified, is invoked when the controller's reflector determines the next |
| 55 | // periodic sync should occur. If this returns true, it means the reflector should proceed with |
| 56 | // the resync. |
| 57 | ShouldResync ShouldResyncFunc |
| 58 | |
| 59 | // If true, when Process() returns an error, re-enqueue the object. |
| 60 | // TODO: add interface to let you inject a delay/backoff or drop |
| 61 | // the object completely if desired. Pass the object in |
| 62 | // question to this interface as a parameter. |
| 63 | RetryOnError bool |
| 64 | } |
| 65 | |
type (
	// ShouldResyncFunc is the signature of a callback that tells a
	// reflector whether it should perform a resync or not. A shared
	// informer can use it to support multiple event handlers with
	// custom resync periods.
	ShouldResyncFunc func() bool

	// ProcessFunc is the signature of a callback that processes a
	// single object and reports failure through its error result.
	ProcessFunc func(obj interface{}) error
)
| 73 | |
| 74 | // Controller is a generic controller framework. |
| 75 | type controller struct { |
| 76 | config Config |
| 77 | reflector *Reflector |
| 78 | reflectorMutex sync.RWMutex |
| 79 | clock clock.Clock |
| 80 | } |
| 81 | |
// Controller is the interface through which a controller built by New
// is driven and inspected.
type Controller interface {
	// Run starts the controller; stopCh is used to tell it to stop.
	Run(stopCh <-chan struct{})

	// HasSynced reports whether the controller has synced.
	HasSynced() bool

	// LastSyncResourceVersion returns a resource version string, which
	// may be empty.
	LastSyncResourceVersion() string
}
| 87 | |
| 88 | // New makes a new Controller from the given Config. |
| 89 | func New(c *Config) Controller { |
| 90 | ctlr := &controller{ |
| 91 | config: *c, |
| 92 | clock: &clock.RealClock{}, |
| 93 | } |
| 94 | return ctlr |
| 95 | } |
| 96 | |
| 97 | // Run begins processing items, and will continue until a value is sent down stopCh. |
| 98 | // It's an error to call Run more than once. |
| 99 | // Run blocks; call via go. |
| 100 | func (c *controller) Run(stopCh <-chan struct{}) { |
| 101 | defer utilruntime.HandleCrash() |
| 102 | go func() { |
| 103 | <-stopCh |
| 104 | c.config.Queue.Close() |
| 105 | }() |
| 106 | r := NewReflector( |
| 107 | c.config.ListerWatcher, |
| 108 | c.config.ObjectType, |
| 109 | c.config.Queue, |
| 110 | c.config.FullResyncPeriod, |
| 111 | ) |
| 112 | r.ShouldResync = c.config.ShouldResync |
| 113 | r.clock = c.clock |
| 114 | |
| 115 | c.reflectorMutex.Lock() |
| 116 | c.reflector = r |
| 117 | c.reflectorMutex.Unlock() |
| 118 | |
| 119 | var wg wait.Group |
| 120 | defer wg.Wait() |
| 121 | |
| 122 | wg.StartWithChannel(stopCh, r.Run) |
| 123 | |
| 124 | wait.Until(c.processLoop, time.Second, stopCh) |
| 125 | } |
| 126 | |
| 127 | // Returns true once this controller has completed an initial resource listing |
| 128 | func (c *controller) HasSynced() bool { |
| 129 | return c.config.Queue.HasSynced() |
| 130 | } |
| 131 | |
| 132 | func (c *controller) LastSyncResourceVersion() string { |
| 133 | if c.reflector == nil { |
| 134 | return "" |
| 135 | } |
| 136 | return c.reflector.LastSyncResourceVersion() |
| 137 | } |
| 138 | |
| 139 | // processLoop drains the work queue. |
| 140 | // TODO: Consider doing the processing in parallel. This will require a little thought |
| 141 | // to make sure that we don't end up processing the same object multiple times |
| 142 | // concurrently. |
| 143 | // |
| 144 | // TODO: Plumb through the stopCh here (and down to the queue) so that this can |
| 145 | // actually exit when the controller is stopped. Or just give up on this stuff |
| 146 | // ever being stoppable. Converting this whole package to use Context would |
| 147 | // also be helpful. |
| 148 | func (c *controller) processLoop() { |
| 149 | for { |
| 150 | obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) |
| 151 | if err != nil { |
| 152 | if err == FIFOClosedError { |
| 153 | return |
| 154 | } |
| 155 | if c.config.RetryOnError { |
| 156 | // This is the safe way to re-enqueue. |
| 157 | c.config.Queue.AddIfNotPresent(obj) |
| 158 | } |
| 159 | } |
| 160 | } |
| 161 | } |
| 162 | |
// ResourceEventHandler receives notifications about events that happen to
// a resource. The events are informational only, which is why none of the
// methods can return an error.
type ResourceEventHandler interface {
	// OnAdd is called when an object is added.
	OnAdd(obj interface{})

	// OnUpdate is called when an object is modified. Note that oldObj is
	// the last known state of the object -- it is possible that several
	// changes were combined together, so you can't use this to see every
	// single change. OnUpdate is also called when a re-list happens, and
	// it will get called even if nothing changed. This is useful for
	// periodically evaluating or syncing something.
	OnUpdate(oldObj, newObj interface{})

	// OnDelete will get the final state of the item if it is known,
	// otherwise it will get an object of type DeletedFinalStateUnknown.
	// This can happen if the watch is closed and misses the delete event
	// and we don't notice the deletion until the subsequent re-list.
	OnDelete(obj interface{})
}
| 182 | |
// ResourceEventHandlerFuncs is an adaptor that lets you supply as many or
// as few of the notification functions as you want while still
// implementing ResourceEventHandler. Callbacks left nil are skipped.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd forwards to AddFunc, provided one was set.
func (f ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if f.AddFunc == nil {
		return
	}
	f.AddFunc(obj)
}

// OnUpdate forwards to UpdateFunc, provided one was set.
func (f ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if f.UpdateFunc == nil {
		return
	}
	f.UpdateFunc(oldObj, newObj)
}

// OnDelete forwards to DeleteFunc, provided one was set.
func (f ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if f.DeleteFunc == nil {
		return
	}
	f.DeleteFunc(obj)
}
| 212 | |
| 213 | // FilteringResourceEventHandler applies the provided filter to all events coming |
| 214 | // in, ensuring the appropriate nested handler method is invoked. An object |
| 215 | // that starts passing the filter after an update is considered an add, and an |
| 216 | // object that stops passing the filter after an update is considered a delete. |
| 217 | type FilteringResourceEventHandler struct { |
| 218 | FilterFunc func(obj interface{}) bool |
| 219 | Handler ResourceEventHandler |
| 220 | } |
| 221 | |
| 222 | // OnAdd calls the nested handler only if the filter succeeds |
| 223 | func (r FilteringResourceEventHandler) OnAdd(obj interface{}) { |
| 224 | if !r.FilterFunc(obj) { |
| 225 | return |
| 226 | } |
| 227 | r.Handler.OnAdd(obj) |
| 228 | } |
| 229 | |
| 230 | // OnUpdate ensures the proper handler is called depending on whether the filter matches |
| 231 | func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) { |
| 232 | newer := r.FilterFunc(newObj) |
| 233 | older := r.FilterFunc(oldObj) |
| 234 | switch { |
| 235 | case newer && older: |
| 236 | r.Handler.OnUpdate(oldObj, newObj) |
| 237 | case newer && !older: |
| 238 | r.Handler.OnAdd(newObj) |
| 239 | case !newer && older: |
| 240 | r.Handler.OnDelete(oldObj) |
| 241 | default: |
| 242 | // do nothing |
| 243 | } |
| 244 | } |
| 245 | |
| 246 | // OnDelete calls the nested handler only if the filter succeeds |
| 247 | func (r FilteringResourceEventHandler) OnDelete(obj interface{}) { |
| 248 | if !r.FilterFunc(obj) { |
| 249 | return |
| 250 | } |
| 251 | r.Handler.OnDelete(obj) |
| 252 | } |
| 253 | |
| 254 | // DeletionHandlingMetaNamespaceKeyFunc checks for |
| 255 | // DeletedFinalStateUnknown objects before calling |
| 256 | // MetaNamespaceKeyFunc. |
| 257 | func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { |
| 258 | if d, ok := obj.(DeletedFinalStateUnknown); ok { |
| 259 | return d.Key, nil |
| 260 | } |
| 261 | return MetaNamespaceKeyFunc(obj) |
| 262 | } |
| 263 | |
| 264 | // NewInformer returns a Store and a controller for populating the store |
| 265 | // while also providing event notifications. You should only used the returned |
| 266 | // Store for Get/List operations; Add/Modify/Deletes will cause the event |
| 267 | // notifications to be faulty. |
| 268 | // |
| 269 | // Parameters: |
| 270 | // * lw is list and watch functions for the source of the resource you want to |
| 271 | // be informed of. |
| 272 | // * objType is an object of the type that you expect to receive. |
| 273 | // * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate |
| 274 | // calls, even if nothing changed). Otherwise, re-list will be delayed as |
| 275 | // long as possible (until the upstream source closes the watch or times out, |
| 276 | // or you stop the controller). |
| 277 | // * h is the object you want notifications sent to. |
| 278 | // |
| 279 | func NewInformer( |
| 280 | lw ListerWatcher, |
| 281 | objType runtime.Object, |
| 282 | resyncPeriod time.Duration, |
| 283 | h ResourceEventHandler, |
| 284 | ) (Store, Controller) { |
| 285 | // This will hold the client state, as we know it. |
| 286 | clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) |
| 287 | |
| 288 | // This will hold incoming changes. Note how we pass clientState in as a |
| 289 | // KeyLister, that way resync operations will result in the correct set |
| 290 | // of update/delete deltas. |
| 291 | fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) |
| 292 | |
| 293 | cfg := &Config{ |
| 294 | Queue: fifo, |
| 295 | ListerWatcher: lw, |
| 296 | ObjectType: objType, |
| 297 | FullResyncPeriod: resyncPeriod, |
| 298 | RetryOnError: false, |
| 299 | |
| 300 | Process: func(obj interface{}) error { |
| 301 | // from oldest to newest |
| 302 | for _, d := range obj.(Deltas) { |
| 303 | switch d.Type { |
| 304 | case Sync, Added, Updated: |
| 305 | if old, exists, err := clientState.Get(d.Object); err == nil && exists { |
| 306 | if err := clientState.Update(d.Object); err != nil { |
| 307 | return err |
| 308 | } |
| 309 | h.OnUpdate(old, d.Object) |
| 310 | } else { |
| 311 | if err := clientState.Add(d.Object); err != nil { |
| 312 | return err |
| 313 | } |
| 314 | h.OnAdd(d.Object) |
| 315 | } |
| 316 | case Deleted: |
| 317 | if err := clientState.Delete(d.Object); err != nil { |
| 318 | return err |
| 319 | } |
| 320 | h.OnDelete(d.Object) |
| 321 | } |
| 322 | } |
| 323 | return nil |
| 324 | }, |
| 325 | } |
| 326 | return clientState, New(cfg) |
| 327 | } |
| 328 | |
| 329 | // NewIndexerInformer returns a Indexer and a controller for populating the index |
| 330 | // while also providing event notifications. You should only used the returned |
| 331 | // Index for Get/List operations; Add/Modify/Deletes will cause the event |
| 332 | // notifications to be faulty. |
| 333 | // |
| 334 | // Parameters: |
| 335 | // * lw is list and watch functions for the source of the resource you want to |
| 336 | // be informed of. |
| 337 | // * objType is an object of the type that you expect to receive. |
| 338 | // * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate |
| 339 | // calls, even if nothing changed). Otherwise, re-list will be delayed as |
| 340 | // long as possible (until the upstream source closes the watch or times out, |
| 341 | // or you stop the controller). |
| 342 | // * h is the object you want notifications sent to. |
| 343 | // * indexers is the indexer for the received object type. |
| 344 | // |
| 345 | func NewIndexerInformer( |
| 346 | lw ListerWatcher, |
| 347 | objType runtime.Object, |
| 348 | resyncPeriod time.Duration, |
| 349 | h ResourceEventHandler, |
| 350 | indexers Indexers, |
| 351 | ) (Indexer, Controller) { |
| 352 | // This will hold the client state, as we know it. |
| 353 | clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) |
| 354 | |
| 355 | // This will hold incoming changes. Note how we pass clientState in as a |
| 356 | // KeyLister, that way resync operations will result in the correct set |
| 357 | // of update/delete deltas. |
| 358 | fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) |
| 359 | |
| 360 | cfg := &Config{ |
| 361 | Queue: fifo, |
| 362 | ListerWatcher: lw, |
| 363 | ObjectType: objType, |
| 364 | FullResyncPeriod: resyncPeriod, |
| 365 | RetryOnError: false, |
| 366 | |
| 367 | Process: func(obj interface{}) error { |
| 368 | // from oldest to newest |
| 369 | for _, d := range obj.(Deltas) { |
| 370 | switch d.Type { |
| 371 | case Sync, Added, Updated: |
| 372 | if old, exists, err := clientState.Get(d.Object); err == nil && exists { |
| 373 | if err := clientState.Update(d.Object); err != nil { |
| 374 | return err |
| 375 | } |
| 376 | h.OnUpdate(old, d.Object) |
| 377 | } else { |
| 378 | if err := clientState.Add(d.Object); err != nil { |
| 379 | return err |
| 380 | } |
| 381 | h.OnAdd(d.Object) |
| 382 | } |
| 383 | case Deleted: |
| 384 | if err := clientState.Delete(d.Object); err != nil { |
| 385 | return err |
| 386 | } |
| 387 | h.OnDelete(d.Object) |
| 388 | } |
| 389 | } |
| 390 | return nil |
| 391 | }, |
| 392 | } |
| 393 | return clientState, New(cfg) |
| 394 | } |