blob: 45c3b500d427e833f715a47479f1217c3b0aa7b6 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20 "errors"
21 "fmt"
22 "sync"
23
24 "k8s.io/apimachinery/pkg/util/sets"
25
26 "github.com/golang/glog"
27)
28
29// NewDeltaFIFO returns a Store which can be used process changes to items.
30//
31// keyFunc is used to figure out what key an object should have. (It's
32// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
33//
34// 'keyLister' is expected to return a list of keys that the consumer of
35// this queue "knows about". It is used to decide which items are missing
36// when Replace() is called; 'Deleted' deltas are produced for these items.
37// It may be nil if you don't need to detect all deletions.
38// TODO: consider merging keyLister with this object, tracking a list of
39// "known" keys when Pop() is called. Have to think about how that
40// affects error retrying.
41// NOTE: It is possible to misuse this and cause a race when using an
42// external known object source.
43// Whether there is a potential race depends on how the comsumer
44// modifies knownObjects. In Pop(), process function is called under
45// lock, so it is safe to update data structures in it that need to be
46// in sync with the queue (e.g. knownObjects).
47//
48// Example:
49// In case of sharedIndexInformer being a consumer
50// (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
51// src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
52// there is no race as knownObjects (s.indexer) is modified safely
53// under DeltaFIFO's lock. The only exceptions are GetStore() and
54// GetIndexer() methods, which expose ways to modify the underlying
55// storage. Currently these two methods are used for creating Lister
56// and internal tests.
57//
58// Also see the comment on DeltaFIFO.
59func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
60 f := &DeltaFIFO{
61 items: map[string]Deltas{},
62 queue: []string{},
63 keyFunc: keyFunc,
64 knownObjects: knownObjects,
65 }
66 f.cond.L = &f.lock
67 return f
68}
69
70// DeltaFIFO is like FIFO, but allows you to process deletes.
71//
72// DeltaFIFO is a producer-consumer queue, where a Reflector is
73// intended to be the producer, and the consumer is whatever calls
74// the Pop() method.
75//
76// DeltaFIFO solves this use case:
77// * You want to process every object change (delta) at most once.
78// * When you process an object, you want to see everything
79// that's happened to it since you last processed it.
80// * You want to process the deletion of objects.
81// * You might want to periodically reprocess objects.
82//
83// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
84// interface{} to satisfy the Store/Queue interfaces, but it
85// will always return an object of type Deltas.
86//
87// A note on threading: If you call Pop() in parallel from multiple
88// threads, you could end up with multiple threads processing slightly
89// different versions of the same object.
90//
91// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
92// to list keys that are "known", for the purpose of figuring out which
93// items have been deleted when Replace() or Delete() are called. The deleted
94// object will be included in the DeleteFinalStateUnknown markers. These objects
95// could be stale.
96type DeltaFIFO struct {
97 // lock/cond protects access to 'items' and 'queue'.
98 lock sync.RWMutex
99 cond sync.Cond
100
101 // We depend on the property that items in the set are in
102 // the queue and vice versa, and that all Deltas in this
103 // map have at least one Delta.
104 items map[string]Deltas
105 queue []string
106
107 // populated is true if the first batch of items inserted by Replace() has been populated
108 // or Delete/Add/Update was called first.
109 populated bool
110 // initialPopulationCount is the number of items inserted by the first call of Replace()
111 initialPopulationCount int
112
113 // keyFunc is used to make the key used for queued item
114 // insertion and retrieval, and should be deterministic.
115 keyFunc KeyFunc
116
117 // knownObjects list keys that are "known", for the
118 // purpose of figuring out which items have been deleted
119 // when Replace() or Delete() is called.
120 knownObjects KeyListerGetter
121
122 // Indication the queue is closed.
123 // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
124 // Currently, not used to gate any of CRED operations.
125 closed bool
126 closedLock sync.Mutex
127}
128
129var (
130 _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
131)
132
133var (
134 // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
135 // object with zero length is encountered (should be impossible,
136 // but included for completeness).
137 ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
138)
139
140// Close the queue.
141func (f *DeltaFIFO) Close() {
142 f.closedLock.Lock()
143 defer f.closedLock.Unlock()
144 f.closed = true
145 f.cond.Broadcast()
146}
147
148// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
149// DeletedFinalStateUnknown objects.
150func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
151 if d, ok := obj.(Deltas); ok {
152 if len(d) == 0 {
153 return "", KeyError{obj, ErrZeroLengthDeltasObject}
154 }
155 obj = d.Newest().Object
156 }
157 if d, ok := obj.(DeletedFinalStateUnknown); ok {
158 return d.Key, nil
159 }
160 return f.keyFunc(obj)
161}
162
163// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
164// or an Update called first but the first batch of items inserted by Replace() has been popped
165func (f *DeltaFIFO) HasSynced() bool {
166 f.lock.Lock()
167 defer f.lock.Unlock()
168 return f.populated && f.initialPopulationCount == 0
169}
170
171// Add inserts an item, and puts it in the queue. The item is only enqueued
172// if it doesn't already exist in the set.
173func (f *DeltaFIFO) Add(obj interface{}) error {
174 f.lock.Lock()
175 defer f.lock.Unlock()
176 f.populated = true
177 return f.queueActionLocked(Added, obj)
178}
179
180// Update is just like Add, but makes an Updated Delta.
181func (f *DeltaFIFO) Update(obj interface{}) error {
182 f.lock.Lock()
183 defer f.lock.Unlock()
184 f.populated = true
185 return f.queueActionLocked(Updated, obj)
186}
187
188// Delete is just like Add, but makes an Deleted Delta. If the item does not
189// already exist, it will be ignored. (It may have already been deleted by a
190// Replace (re-list), for example.
191func (f *DeltaFIFO) Delete(obj interface{}) error {
192 id, err := f.KeyOf(obj)
193 if err != nil {
194 return KeyError{obj, err}
195 }
196 f.lock.Lock()
197 defer f.lock.Unlock()
198 f.populated = true
199 if f.knownObjects == nil {
200 if _, exists := f.items[id]; !exists {
201 // Presumably, this was deleted when a relist happened.
202 // Don't provide a second report of the same deletion.
203 return nil
204 }
205 } else {
206 // We only want to skip the "deletion" action if the object doesn't
207 // exist in knownObjects and it doesn't have corresponding item in items.
208 // Note that even if there is a "deletion" action in items, we can ignore it,
209 // because it will be deduped automatically in "queueActionLocked"
210 _, exists, err := f.knownObjects.GetByKey(id)
211 _, itemsExist := f.items[id]
212 if err == nil && !exists && !itemsExist {
213 // Presumably, this was deleted when a relist happened.
214 // Don't provide a second report of the same deletion.
215 return nil
216 }
217 }
218
219 return f.queueActionLocked(Deleted, obj)
220}
221
222// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
223// present in the set, it is neither enqueued nor added to the set.
224//
225// This is useful in a single producer/consumer scenario so that the consumer can
226// safely retry items without contending with the producer and potentially enqueueing
227// stale items.
228//
229// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
230// different from the Add/Update/Delete functions.
231func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
232 deltas, ok := obj.(Deltas)
233 if !ok {
234 return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
235 }
236 id, err := f.KeyOf(deltas.Newest().Object)
237 if err != nil {
238 return KeyError{obj, err}
239 }
240 f.lock.Lock()
241 defer f.lock.Unlock()
242 f.addIfNotPresent(id, deltas)
243 return nil
244}
245
246// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
247// already holds the fifo lock.
248func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
249 f.populated = true
250 if _, exists := f.items[id]; exists {
251 return
252 }
253
254 f.queue = append(f.queue, id)
255 f.items[id] = deltas
256 f.cond.Broadcast()
257}
258
259// re-listing and watching can deliver the same update multiple times in any
260// order. This will combine the most recent two deltas if they are the same.
261func dedupDeltas(deltas Deltas) Deltas {
262 n := len(deltas)
263 if n < 2 {
264 return deltas
265 }
266 a := &deltas[n-1]
267 b := &deltas[n-2]
268 if out := isDup(a, b); out != nil {
269 d := append(Deltas{}, deltas[:n-2]...)
270 return append(d, *out)
271 }
272 return deltas
273}
274
275// If a & b represent the same event, returns the delta that ought to be kept.
276// Otherwise, returns nil.
277// TODO: is there anything other than deletions that need deduping?
278func isDup(a, b *Delta) *Delta {
279 if out := isDeletionDup(a, b); out != nil {
280 return out
281 }
282 // TODO: Detect other duplicate situations? Are there any?
283 return nil
284}
285
286// keep the one with the most information if both are deletions.
287func isDeletionDup(a, b *Delta) *Delta {
288 if b.Type != Deleted || a.Type != Deleted {
289 return nil
290 }
291 // Do more sophisticated checks, or is this sufficient?
292 if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
293 return a
294 }
295 return b
296}
297
298// willObjectBeDeletedLocked returns true only if the last delta for the
299// given object is Delete. Caller must lock first.
300func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
301 deltas := f.items[id]
302 return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
303}
304
305// queueActionLocked appends to the delta list for the object.
306// Caller must lock first.
307func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
308 id, err := f.KeyOf(obj)
309 if err != nil {
310 return KeyError{obj, err}
311 }
312
313 // If object is supposed to be deleted (last event is Deleted),
314 // then we should ignore Sync events, because it would result in
315 // recreation of this object.
316 if actionType == Sync && f.willObjectBeDeletedLocked(id) {
317 return nil
318 }
319
320 newDeltas := append(f.items[id], Delta{actionType, obj})
321 newDeltas = dedupDeltas(newDeltas)
322
323 _, exists := f.items[id]
324 if len(newDeltas) > 0 {
325 if !exists {
326 f.queue = append(f.queue, id)
327 }
328 f.items[id] = newDeltas
329 f.cond.Broadcast()
330 } else if exists {
331 // We need to remove this from our map (extra items
332 // in the queue are ignored if they are not in the
333 // map).
334 delete(f.items, id)
335 }
336 return nil
337}
338
339// List returns a list of all the items; it returns the object
340// from the most recent Delta.
341// You should treat the items returned inside the deltas as immutable.
342func (f *DeltaFIFO) List() []interface{} {
343 f.lock.RLock()
344 defer f.lock.RUnlock()
345 return f.listLocked()
346}
347
348func (f *DeltaFIFO) listLocked() []interface{} {
349 list := make([]interface{}, 0, len(f.items))
350 for _, item := range f.items {
351 // Copy item's slice so operations on this slice
352 // won't interfere with the object we return.
353 item = copyDeltas(item)
354 list = append(list, item.Newest().Object)
355 }
356 return list
357}
358
359// ListKeys returns a list of all the keys of the objects currently
360// in the FIFO.
361func (f *DeltaFIFO) ListKeys() []string {
362 f.lock.RLock()
363 defer f.lock.RUnlock()
364 list := make([]string, 0, len(f.items))
365 for key := range f.items {
366 list = append(list, key)
367 }
368 return list
369}
370
371// Get returns the complete list of deltas for the requested item,
372// or sets exists=false.
373// You should treat the items returned inside the deltas as immutable.
374func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
375 key, err := f.KeyOf(obj)
376 if err != nil {
377 return nil, false, KeyError{obj, err}
378 }
379 return f.GetByKey(key)
380}
381
382// GetByKey returns the complete list of deltas for the requested item,
383// setting exists=false if that list is empty.
384// You should treat the items returned inside the deltas as immutable.
385func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
386 f.lock.RLock()
387 defer f.lock.RUnlock()
388 d, exists := f.items[key]
389 if exists {
390 // Copy item's slice so operations on this slice
391 // won't interfere with the object we return.
392 d = copyDeltas(d)
393 }
394 return d, exists, nil
395}
396
397// Checks if the queue is closed
398func (f *DeltaFIFO) IsClosed() bool {
399 f.closedLock.Lock()
400 defer f.closedLock.Unlock()
401 if f.closed {
402 return true
403 }
404 return false
405}
406
407// Pop blocks until an item is added to the queue, and then returns it. If
408// multiple items are ready, they are returned in the order in which they were
409// added/updated. The item is removed from the queue (and the store) before it
410// is returned, so if you don't successfully process it, you need to add it back
411// with AddIfNotPresent().
412// process function is called under lock, so it is safe update data structures
413// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
414// may return an instance of ErrRequeue with a nested error to indicate the current
415// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
416//
417// Pop returns a 'Deltas', which has a complete list of all the things
418// that happened to the object (deltas) while it was sitting in the queue.
419func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
420 f.lock.Lock()
421 defer f.lock.Unlock()
422 for {
423 for len(f.queue) == 0 {
424 // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
425 // When Close() is called, the f.closed is set and the condition is broadcasted.
426 // Which causes this loop to continue and return from the Pop().
427 if f.IsClosed() {
428 return nil, FIFOClosedError
429 }
430
431 f.cond.Wait()
432 }
433 id := f.queue[0]
434 f.queue = f.queue[1:]
435 item, ok := f.items[id]
436 if f.initialPopulationCount > 0 {
437 f.initialPopulationCount--
438 }
439 if !ok {
440 // Item may have been deleted subsequently.
441 continue
442 }
443 delete(f.items, id)
444 err := process(item)
445 if e, ok := err.(ErrRequeue); ok {
446 f.addIfNotPresent(id, item)
447 err = e.Err
448 }
449 // Don't need to copyDeltas here, because we're transferring
450 // ownership to the caller.
451 return item, err
452 }
453}
454
455// Replace will delete the contents of 'f', using instead the given map.
456// 'f' takes ownership of the map, you should not reference the map again
457// after calling this function. f's queue is reset, too; upon return, it
458// will contain the items in the map, in no particular order.
459func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
460 f.lock.Lock()
461 defer f.lock.Unlock()
462 keys := make(sets.String, len(list))
463
464 for _, item := range list {
465 key, err := f.KeyOf(item)
466 if err != nil {
467 return KeyError{item, err}
468 }
469 keys.Insert(key)
470 if err := f.queueActionLocked(Sync, item); err != nil {
471 return fmt.Errorf("couldn't enqueue object: %v", err)
472 }
473 }
474
475 if f.knownObjects == nil {
476 // Do deletion detection against our own list.
477 for k, oldItem := range f.items {
478 if keys.Has(k) {
479 continue
480 }
481 var deletedObj interface{}
482 if n := oldItem.Newest(); n != nil {
483 deletedObj = n.Object
484 }
485 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
486 return err
487 }
488 }
489
490 if !f.populated {
491 f.populated = true
492 f.initialPopulationCount = len(list)
493 }
494
495 return nil
496 }
497
498 // Detect deletions not already in the queue.
499 knownKeys := f.knownObjects.ListKeys()
500 queuedDeletions := 0
501 for _, k := range knownKeys {
502 if keys.Has(k) {
503 continue
504 }
505
506 deletedObj, exists, err := f.knownObjects.GetByKey(k)
507 if err != nil {
508 deletedObj = nil
509 glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
510 } else if !exists {
511 deletedObj = nil
512 glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
513 }
514 queuedDeletions++
515 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
516 return err
517 }
518 }
519
520 if !f.populated {
521 f.populated = true
522 f.initialPopulationCount = len(list) + queuedDeletions
523 }
524
525 return nil
526}
527
528// Resync will send a sync event for each item
529func (f *DeltaFIFO) Resync() error {
530 f.lock.Lock()
531 defer f.lock.Unlock()
532
533 if f.knownObjects == nil {
534 return nil
535 }
536
537 keys := f.knownObjects.ListKeys()
538 for _, k := range keys {
539 if err := f.syncKeyLocked(k); err != nil {
540 return err
541 }
542 }
543 return nil
544}
545
546func (f *DeltaFIFO) syncKey(key string) error {
547 f.lock.Lock()
548 defer f.lock.Unlock()
549
550 return f.syncKeyLocked(key)
551}
552
553func (f *DeltaFIFO) syncKeyLocked(key string) error {
554 obj, exists, err := f.knownObjects.GetByKey(key)
555 if err != nil {
556 glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
557 return nil
558 } else if !exists {
559 glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
560 return nil
561 }
562
563 // If we are doing Resync() and there is already an event queued for that object,
564 // we ignore the Resync for it. This is to avoid the race, in which the resync
565 // comes with the previous value of object (since queueing an event for the object
566 // doesn't trigger changing the underlying store <knownObjects>.
567 id, err := f.KeyOf(obj)
568 if err != nil {
569 return KeyError{obj, err}
570 }
571 if len(f.items[id]) > 0 {
572 return nil
573 }
574
575 if err := f.queueActionLocked(Sync, obj); err != nil {
576 return fmt.Errorf("couldn't queue object: %v", err)
577 }
578 return nil
579}
580
581// A KeyListerGetter is anything that knows how to list its keys and look up by key.
582type KeyListerGetter interface {
583 KeyLister
584 KeyGetter
585}
586
587// A KeyLister is anything that knows how to list its keys.
588type KeyLister interface {
589 ListKeys() []string
590}
591
592// A KeyGetter is anything that knows how to get the value stored under a given key.
593type KeyGetter interface {
594 GetByKey(key string) (interface{}, bool, error)
595}
596
597// DeltaType is the type of a change (addition, deletion, etc)
598type DeltaType string
599
600const (
601 Added DeltaType = "Added"
602 Updated DeltaType = "Updated"
603 Deleted DeltaType = "Deleted"
604 // The other types are obvious. You'll get Sync deltas when:
605 // * A watch expires/errors out and a new list/watch cycle is started.
606 // * You've turned on periodic syncs.
607 // (Anything that trigger's DeltaFIFO's Replace() method.)
608 Sync DeltaType = "Sync"
609)
610
611// Delta is the type stored by a DeltaFIFO. It tells you what change
612// happened, and the object's state after* that change.
613//
614// [*] Unless the change is a deletion, and then you'll get the final
615// state of the object before it was deleted.
616type Delta struct {
617 Type DeltaType
618 Object interface{}
619}
620
621// Deltas is a list of one or more 'Delta's to an individual object.
622// The oldest delta is at index 0, the newest delta is the last one.
623type Deltas []Delta
624
625// Oldest is a convenience function that returns the oldest delta, or
626// nil if there are no deltas.
627func (d Deltas) Oldest() *Delta {
628 if len(d) > 0 {
629 return &d[0]
630 }
631 return nil
632}
633
634// Newest is a convenience function that returns the newest delta, or
635// nil if there are no deltas.
636func (d Deltas) Newest() *Delta {
637 if n := len(d); n > 0 {
638 return &d[n-1]
639 }
640 return nil
641}
642
643// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
644// the objects in the slice. This allows Get/List to return an object that we
645// know won't be clobbered by a subsequent modifications.
646func copyDeltas(d Deltas) Deltas {
647 d2 := make(Deltas, len(d))
648 copy(d2, d)
649 return d2
650}
651
652// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
653// an object was deleted but the watch deletion event was missed. In this
654// case we don't know the final "resting" state of the object, so there's
655// a chance the included `Obj` is stale.
656type DeletedFinalStateUnknown struct {
657 Key string
658 Obj interface{}
659}