/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
| 16 | |
| 17 | package cache |
| 18 | |
| 19 | import ( |
| 20 | "errors" |
| 21 | "fmt" |
| 22 | "sync" |
| 23 | |
| 24 | "k8s.io/apimachinery/pkg/util/sets" |
| 25 | |
| 26 | "github.com/golang/glog" |
| 27 | ) |
| 28 | |
| 29 | // NewDeltaFIFO returns a Store which can be used process changes to items. |
| 30 | // |
| 31 | // keyFunc is used to figure out what key an object should have. (It's |
| 32 | // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) |
| 33 | // |
| 34 | // 'keyLister' is expected to return a list of keys that the consumer of |
| 35 | // this queue "knows about". It is used to decide which items are missing |
| 36 | // when Replace() is called; 'Deleted' deltas are produced for these items. |
| 37 | // It may be nil if you don't need to detect all deletions. |
| 38 | // TODO: consider merging keyLister with this object, tracking a list of |
| 39 | // "known" keys when Pop() is called. Have to think about how that |
| 40 | // affects error retrying. |
| 41 | // NOTE: It is possible to misuse this and cause a race when using an |
| 42 | // external known object source. |
| 43 | // Whether there is a potential race depends on how the comsumer |
| 44 | // modifies knownObjects. In Pop(), process function is called under |
| 45 | // lock, so it is safe to update data structures in it that need to be |
| 46 | // in sync with the queue (e.g. knownObjects). |
| 47 | // |
| 48 | // Example: |
| 49 | // In case of sharedIndexInformer being a consumer |
| 50 | // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/ |
| 51 | // src/k8s.io/client-go/tools/cache/shared_informer.go#L192), |
| 52 | // there is no race as knownObjects (s.indexer) is modified safely |
| 53 | // under DeltaFIFO's lock. The only exceptions are GetStore() and |
| 54 | // GetIndexer() methods, which expose ways to modify the underlying |
| 55 | // storage. Currently these two methods are used for creating Lister |
| 56 | // and internal tests. |
| 57 | // |
| 58 | // Also see the comment on DeltaFIFO. |
| 59 | func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { |
| 60 | f := &DeltaFIFO{ |
| 61 | items: map[string]Deltas{}, |
| 62 | queue: []string{}, |
| 63 | keyFunc: keyFunc, |
| 64 | knownObjects: knownObjects, |
| 65 | } |
| 66 | f.cond.L = &f.lock |
| 67 | return f |
| 68 | } |
| 69 | |
| 70 | // DeltaFIFO is like FIFO, but allows you to process deletes. |
| 71 | // |
| 72 | // DeltaFIFO is a producer-consumer queue, where a Reflector is |
| 73 | // intended to be the producer, and the consumer is whatever calls |
| 74 | // the Pop() method. |
| 75 | // |
| 76 | // DeltaFIFO solves this use case: |
| 77 | // * You want to process every object change (delta) at most once. |
| 78 | // * When you process an object, you want to see everything |
| 79 | // that's happened to it since you last processed it. |
| 80 | // * You want to process the deletion of objects. |
| 81 | // * You might want to periodically reprocess objects. |
| 82 | // |
| 83 | // DeltaFIFO's Pop(), Get(), and GetByKey() methods return |
| 84 | // interface{} to satisfy the Store/Queue interfaces, but it |
| 85 | // will always return an object of type Deltas. |
| 86 | // |
| 87 | // A note on threading: If you call Pop() in parallel from multiple |
| 88 | // threads, you could end up with multiple threads processing slightly |
| 89 | // different versions of the same object. |
| 90 | // |
| 91 | // A note on the KeyLister used by the DeltaFIFO: It's main purpose is |
| 92 | // to list keys that are "known", for the purpose of figuring out which |
| 93 | // items have been deleted when Replace() or Delete() are called. The deleted |
| 94 | // object will be included in the DeleteFinalStateUnknown markers. These objects |
| 95 | // could be stale. |
| 96 | type DeltaFIFO struct { |
| 97 | // lock/cond protects access to 'items' and 'queue'. |
| 98 | lock sync.RWMutex |
| 99 | cond sync.Cond |
| 100 | |
| 101 | // We depend on the property that items in the set are in |
| 102 | // the queue and vice versa, and that all Deltas in this |
| 103 | // map have at least one Delta. |
| 104 | items map[string]Deltas |
| 105 | queue []string |
| 106 | |
| 107 | // populated is true if the first batch of items inserted by Replace() has been populated |
| 108 | // or Delete/Add/Update was called first. |
| 109 | populated bool |
| 110 | // initialPopulationCount is the number of items inserted by the first call of Replace() |
| 111 | initialPopulationCount int |
| 112 | |
| 113 | // keyFunc is used to make the key used for queued item |
| 114 | // insertion and retrieval, and should be deterministic. |
| 115 | keyFunc KeyFunc |
| 116 | |
| 117 | // knownObjects list keys that are "known", for the |
| 118 | // purpose of figuring out which items have been deleted |
| 119 | // when Replace() or Delete() is called. |
| 120 | knownObjects KeyListerGetter |
| 121 | |
| 122 | // Indication the queue is closed. |
| 123 | // Used to indicate a queue is closed so a control loop can exit when a queue is empty. |
| 124 | // Currently, not used to gate any of CRED operations. |
| 125 | closed bool |
| 126 | closedLock sync.Mutex |
| 127 | } |
| 128 | |
| 129 | var ( |
| 130 | _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue |
| 131 | ) |
| 132 | |
// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
// object with zero length is encountered (should be impossible,
// but included for completeness).
var ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
| 139 | |
| 140 | // Close the queue. |
| 141 | func (f *DeltaFIFO) Close() { |
| 142 | f.closedLock.Lock() |
| 143 | defer f.closedLock.Unlock() |
| 144 | f.closed = true |
| 145 | f.cond.Broadcast() |
| 146 | } |
| 147 | |
| 148 | // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or |
| 149 | // DeletedFinalStateUnknown objects. |
| 150 | func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { |
| 151 | if d, ok := obj.(Deltas); ok { |
| 152 | if len(d) == 0 { |
| 153 | return "", KeyError{obj, ErrZeroLengthDeltasObject} |
| 154 | } |
| 155 | obj = d.Newest().Object |
| 156 | } |
| 157 | if d, ok := obj.(DeletedFinalStateUnknown); ok { |
| 158 | return d.Key, nil |
| 159 | } |
| 160 | return f.keyFunc(obj) |
| 161 | } |
| 162 | |
| 163 | // Return true if an Add/Update/Delete/AddIfNotPresent are called first, |
| 164 | // or an Update called first but the first batch of items inserted by Replace() has been popped |
| 165 | func (f *DeltaFIFO) HasSynced() bool { |
| 166 | f.lock.Lock() |
| 167 | defer f.lock.Unlock() |
| 168 | return f.populated && f.initialPopulationCount == 0 |
| 169 | } |
| 170 | |
| 171 | // Add inserts an item, and puts it in the queue. The item is only enqueued |
| 172 | // if it doesn't already exist in the set. |
| 173 | func (f *DeltaFIFO) Add(obj interface{}) error { |
| 174 | f.lock.Lock() |
| 175 | defer f.lock.Unlock() |
| 176 | f.populated = true |
| 177 | return f.queueActionLocked(Added, obj) |
| 178 | } |
| 179 | |
| 180 | // Update is just like Add, but makes an Updated Delta. |
| 181 | func (f *DeltaFIFO) Update(obj interface{}) error { |
| 182 | f.lock.Lock() |
| 183 | defer f.lock.Unlock() |
| 184 | f.populated = true |
| 185 | return f.queueActionLocked(Updated, obj) |
| 186 | } |
| 187 | |
| 188 | // Delete is just like Add, but makes an Deleted Delta. If the item does not |
| 189 | // already exist, it will be ignored. (It may have already been deleted by a |
| 190 | // Replace (re-list), for example. |
| 191 | func (f *DeltaFIFO) Delete(obj interface{}) error { |
| 192 | id, err := f.KeyOf(obj) |
| 193 | if err != nil { |
| 194 | return KeyError{obj, err} |
| 195 | } |
| 196 | f.lock.Lock() |
| 197 | defer f.lock.Unlock() |
| 198 | f.populated = true |
| 199 | if f.knownObjects == nil { |
| 200 | if _, exists := f.items[id]; !exists { |
| 201 | // Presumably, this was deleted when a relist happened. |
| 202 | // Don't provide a second report of the same deletion. |
| 203 | return nil |
| 204 | } |
| 205 | } else { |
| 206 | // We only want to skip the "deletion" action if the object doesn't |
| 207 | // exist in knownObjects and it doesn't have corresponding item in items. |
| 208 | // Note that even if there is a "deletion" action in items, we can ignore it, |
| 209 | // because it will be deduped automatically in "queueActionLocked" |
| 210 | _, exists, err := f.knownObjects.GetByKey(id) |
| 211 | _, itemsExist := f.items[id] |
| 212 | if err == nil && !exists && !itemsExist { |
| 213 | // Presumably, this was deleted when a relist happened. |
| 214 | // Don't provide a second report of the same deletion. |
| 215 | return nil |
| 216 | } |
| 217 | } |
| 218 | |
| 219 | return f.queueActionLocked(Deleted, obj) |
| 220 | } |
| 221 | |
| 222 | // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already |
| 223 | // present in the set, it is neither enqueued nor added to the set. |
| 224 | // |
| 225 | // This is useful in a single producer/consumer scenario so that the consumer can |
| 226 | // safely retry items without contending with the producer and potentially enqueueing |
| 227 | // stale items. |
| 228 | // |
| 229 | // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is |
| 230 | // different from the Add/Update/Delete functions. |
| 231 | func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { |
| 232 | deltas, ok := obj.(Deltas) |
| 233 | if !ok { |
| 234 | return fmt.Errorf("object must be of type deltas, but got: %#v", obj) |
| 235 | } |
| 236 | id, err := f.KeyOf(deltas.Newest().Object) |
| 237 | if err != nil { |
| 238 | return KeyError{obj, err} |
| 239 | } |
| 240 | f.lock.Lock() |
| 241 | defer f.lock.Unlock() |
| 242 | f.addIfNotPresent(id, deltas) |
| 243 | return nil |
| 244 | } |
| 245 | |
| 246 | // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller |
| 247 | // already holds the fifo lock. |
| 248 | func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { |
| 249 | f.populated = true |
| 250 | if _, exists := f.items[id]; exists { |
| 251 | return |
| 252 | } |
| 253 | |
| 254 | f.queue = append(f.queue, id) |
| 255 | f.items[id] = deltas |
| 256 | f.cond.Broadcast() |
| 257 | } |
| 258 | |
| 259 | // re-listing and watching can deliver the same update multiple times in any |
| 260 | // order. This will combine the most recent two deltas if they are the same. |
| 261 | func dedupDeltas(deltas Deltas) Deltas { |
| 262 | n := len(deltas) |
| 263 | if n < 2 { |
| 264 | return deltas |
| 265 | } |
| 266 | a := &deltas[n-1] |
| 267 | b := &deltas[n-2] |
| 268 | if out := isDup(a, b); out != nil { |
| 269 | d := append(Deltas{}, deltas[:n-2]...) |
| 270 | return append(d, *out) |
| 271 | } |
| 272 | return deltas |
| 273 | } |
| 274 | |
| 275 | // If a & b represent the same event, returns the delta that ought to be kept. |
| 276 | // Otherwise, returns nil. |
| 277 | // TODO: is there anything other than deletions that need deduping? |
| 278 | func isDup(a, b *Delta) *Delta { |
| 279 | if out := isDeletionDup(a, b); out != nil { |
| 280 | return out |
| 281 | } |
| 282 | // TODO: Detect other duplicate situations? Are there any? |
| 283 | return nil |
| 284 | } |
| 285 | |
| 286 | // keep the one with the most information if both are deletions. |
| 287 | func isDeletionDup(a, b *Delta) *Delta { |
| 288 | if b.Type != Deleted || a.Type != Deleted { |
| 289 | return nil |
| 290 | } |
| 291 | // Do more sophisticated checks, or is this sufficient? |
| 292 | if _, ok := b.Object.(DeletedFinalStateUnknown); ok { |
| 293 | return a |
| 294 | } |
| 295 | return b |
| 296 | } |
| 297 | |
| 298 | // willObjectBeDeletedLocked returns true only if the last delta for the |
| 299 | // given object is Delete. Caller must lock first. |
| 300 | func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool { |
| 301 | deltas := f.items[id] |
| 302 | return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted |
| 303 | } |
| 304 | |
| 305 | // queueActionLocked appends to the delta list for the object. |
| 306 | // Caller must lock first. |
| 307 | func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { |
| 308 | id, err := f.KeyOf(obj) |
| 309 | if err != nil { |
| 310 | return KeyError{obj, err} |
| 311 | } |
| 312 | |
| 313 | // If object is supposed to be deleted (last event is Deleted), |
| 314 | // then we should ignore Sync events, because it would result in |
| 315 | // recreation of this object. |
| 316 | if actionType == Sync && f.willObjectBeDeletedLocked(id) { |
| 317 | return nil |
| 318 | } |
| 319 | |
| 320 | newDeltas := append(f.items[id], Delta{actionType, obj}) |
| 321 | newDeltas = dedupDeltas(newDeltas) |
| 322 | |
| 323 | _, exists := f.items[id] |
| 324 | if len(newDeltas) > 0 { |
| 325 | if !exists { |
| 326 | f.queue = append(f.queue, id) |
| 327 | } |
| 328 | f.items[id] = newDeltas |
| 329 | f.cond.Broadcast() |
| 330 | } else if exists { |
| 331 | // We need to remove this from our map (extra items |
| 332 | // in the queue are ignored if they are not in the |
| 333 | // map). |
| 334 | delete(f.items, id) |
| 335 | } |
| 336 | return nil |
| 337 | } |
| 338 | |
| 339 | // List returns a list of all the items; it returns the object |
| 340 | // from the most recent Delta. |
| 341 | // You should treat the items returned inside the deltas as immutable. |
| 342 | func (f *DeltaFIFO) List() []interface{} { |
| 343 | f.lock.RLock() |
| 344 | defer f.lock.RUnlock() |
| 345 | return f.listLocked() |
| 346 | } |
| 347 | |
| 348 | func (f *DeltaFIFO) listLocked() []interface{} { |
| 349 | list := make([]interface{}, 0, len(f.items)) |
| 350 | for _, item := range f.items { |
| 351 | // Copy item's slice so operations on this slice |
| 352 | // won't interfere with the object we return. |
| 353 | item = copyDeltas(item) |
| 354 | list = append(list, item.Newest().Object) |
| 355 | } |
| 356 | return list |
| 357 | } |
| 358 | |
| 359 | // ListKeys returns a list of all the keys of the objects currently |
| 360 | // in the FIFO. |
| 361 | func (f *DeltaFIFO) ListKeys() []string { |
| 362 | f.lock.RLock() |
| 363 | defer f.lock.RUnlock() |
| 364 | list := make([]string, 0, len(f.items)) |
| 365 | for key := range f.items { |
| 366 | list = append(list, key) |
| 367 | } |
| 368 | return list |
| 369 | } |
| 370 | |
| 371 | // Get returns the complete list of deltas for the requested item, |
| 372 | // or sets exists=false. |
| 373 | // You should treat the items returned inside the deltas as immutable. |
| 374 | func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { |
| 375 | key, err := f.KeyOf(obj) |
| 376 | if err != nil { |
| 377 | return nil, false, KeyError{obj, err} |
| 378 | } |
| 379 | return f.GetByKey(key) |
| 380 | } |
| 381 | |
| 382 | // GetByKey returns the complete list of deltas for the requested item, |
| 383 | // setting exists=false if that list is empty. |
| 384 | // You should treat the items returned inside the deltas as immutable. |
| 385 | func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { |
| 386 | f.lock.RLock() |
| 387 | defer f.lock.RUnlock() |
| 388 | d, exists := f.items[key] |
| 389 | if exists { |
| 390 | // Copy item's slice so operations on this slice |
| 391 | // won't interfere with the object we return. |
| 392 | d = copyDeltas(d) |
| 393 | } |
| 394 | return d, exists, nil |
| 395 | } |
| 396 | |
| 397 | // Checks if the queue is closed |
| 398 | func (f *DeltaFIFO) IsClosed() bool { |
| 399 | f.closedLock.Lock() |
| 400 | defer f.closedLock.Unlock() |
| 401 | if f.closed { |
| 402 | return true |
| 403 | } |
| 404 | return false |
| 405 | } |
| 406 | |
| 407 | // Pop blocks until an item is added to the queue, and then returns it. If |
| 408 | // multiple items are ready, they are returned in the order in which they were |
| 409 | // added/updated. The item is removed from the queue (and the store) before it |
| 410 | // is returned, so if you don't successfully process it, you need to add it back |
| 411 | // with AddIfNotPresent(). |
| 412 | // process function is called under lock, so it is safe update data structures |
| 413 | // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc |
| 414 | // may return an instance of ErrRequeue with a nested error to indicate the current |
| 415 | // item should be requeued (equivalent to calling AddIfNotPresent under the lock). |
| 416 | // |
| 417 | // Pop returns a 'Deltas', which has a complete list of all the things |
| 418 | // that happened to the object (deltas) while it was sitting in the queue. |
| 419 | func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { |
| 420 | f.lock.Lock() |
| 421 | defer f.lock.Unlock() |
| 422 | for { |
| 423 | for len(f.queue) == 0 { |
| 424 | // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. |
| 425 | // When Close() is called, the f.closed is set and the condition is broadcasted. |
| 426 | // Which causes this loop to continue and return from the Pop(). |
| 427 | if f.IsClosed() { |
| 428 | return nil, FIFOClosedError |
| 429 | } |
| 430 | |
| 431 | f.cond.Wait() |
| 432 | } |
| 433 | id := f.queue[0] |
| 434 | f.queue = f.queue[1:] |
| 435 | item, ok := f.items[id] |
| 436 | if f.initialPopulationCount > 0 { |
| 437 | f.initialPopulationCount-- |
| 438 | } |
| 439 | if !ok { |
| 440 | // Item may have been deleted subsequently. |
| 441 | continue |
| 442 | } |
| 443 | delete(f.items, id) |
| 444 | err := process(item) |
| 445 | if e, ok := err.(ErrRequeue); ok { |
| 446 | f.addIfNotPresent(id, item) |
| 447 | err = e.Err |
| 448 | } |
| 449 | // Don't need to copyDeltas here, because we're transferring |
| 450 | // ownership to the caller. |
| 451 | return item, err |
| 452 | } |
| 453 | } |
| 454 | |
| 455 | // Replace will delete the contents of 'f', using instead the given map. |
| 456 | // 'f' takes ownership of the map, you should not reference the map again |
| 457 | // after calling this function. f's queue is reset, too; upon return, it |
| 458 | // will contain the items in the map, in no particular order. |
| 459 | func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { |
| 460 | f.lock.Lock() |
| 461 | defer f.lock.Unlock() |
| 462 | keys := make(sets.String, len(list)) |
| 463 | |
| 464 | for _, item := range list { |
| 465 | key, err := f.KeyOf(item) |
| 466 | if err != nil { |
| 467 | return KeyError{item, err} |
| 468 | } |
| 469 | keys.Insert(key) |
| 470 | if err := f.queueActionLocked(Sync, item); err != nil { |
| 471 | return fmt.Errorf("couldn't enqueue object: %v", err) |
| 472 | } |
| 473 | } |
| 474 | |
| 475 | if f.knownObjects == nil { |
| 476 | // Do deletion detection against our own list. |
| 477 | for k, oldItem := range f.items { |
| 478 | if keys.Has(k) { |
| 479 | continue |
| 480 | } |
| 481 | var deletedObj interface{} |
| 482 | if n := oldItem.Newest(); n != nil { |
| 483 | deletedObj = n.Object |
| 484 | } |
| 485 | if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { |
| 486 | return err |
| 487 | } |
| 488 | } |
| 489 | |
| 490 | if !f.populated { |
| 491 | f.populated = true |
| 492 | f.initialPopulationCount = len(list) |
| 493 | } |
| 494 | |
| 495 | return nil |
| 496 | } |
| 497 | |
| 498 | // Detect deletions not already in the queue. |
| 499 | knownKeys := f.knownObjects.ListKeys() |
| 500 | queuedDeletions := 0 |
| 501 | for _, k := range knownKeys { |
| 502 | if keys.Has(k) { |
| 503 | continue |
| 504 | } |
| 505 | |
| 506 | deletedObj, exists, err := f.knownObjects.GetByKey(k) |
| 507 | if err != nil { |
| 508 | deletedObj = nil |
| 509 | glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) |
| 510 | } else if !exists { |
| 511 | deletedObj = nil |
| 512 | glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) |
| 513 | } |
| 514 | queuedDeletions++ |
| 515 | if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { |
| 516 | return err |
| 517 | } |
| 518 | } |
| 519 | |
| 520 | if !f.populated { |
| 521 | f.populated = true |
| 522 | f.initialPopulationCount = len(list) + queuedDeletions |
| 523 | } |
| 524 | |
| 525 | return nil |
| 526 | } |
| 527 | |
| 528 | // Resync will send a sync event for each item |
| 529 | func (f *DeltaFIFO) Resync() error { |
| 530 | f.lock.Lock() |
| 531 | defer f.lock.Unlock() |
| 532 | |
| 533 | if f.knownObjects == nil { |
| 534 | return nil |
| 535 | } |
| 536 | |
| 537 | keys := f.knownObjects.ListKeys() |
| 538 | for _, k := range keys { |
| 539 | if err := f.syncKeyLocked(k); err != nil { |
| 540 | return err |
| 541 | } |
| 542 | } |
| 543 | return nil |
| 544 | } |
| 545 | |
| 546 | func (f *DeltaFIFO) syncKey(key string) error { |
| 547 | f.lock.Lock() |
| 548 | defer f.lock.Unlock() |
| 549 | |
| 550 | return f.syncKeyLocked(key) |
| 551 | } |
| 552 | |
| 553 | func (f *DeltaFIFO) syncKeyLocked(key string) error { |
| 554 | obj, exists, err := f.knownObjects.GetByKey(key) |
| 555 | if err != nil { |
| 556 | glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) |
| 557 | return nil |
| 558 | } else if !exists { |
| 559 | glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) |
| 560 | return nil |
| 561 | } |
| 562 | |
| 563 | // If we are doing Resync() and there is already an event queued for that object, |
| 564 | // we ignore the Resync for it. This is to avoid the race, in which the resync |
| 565 | // comes with the previous value of object (since queueing an event for the object |
| 566 | // doesn't trigger changing the underlying store <knownObjects>. |
| 567 | id, err := f.KeyOf(obj) |
| 568 | if err != nil { |
| 569 | return KeyError{obj, err} |
| 570 | } |
| 571 | if len(f.items[id]) > 0 { |
| 572 | return nil |
| 573 | } |
| 574 | |
| 575 | if err := f.queueActionLocked(Sync, obj); err != nil { |
| 576 | return fmt.Errorf("couldn't queue object: %v", err) |
| 577 | } |
| 578 | return nil |
| 579 | } |
| 580 | |
| 581 | // A KeyListerGetter is anything that knows how to list its keys and look up by key. |
| 582 | type KeyListerGetter interface { |
| 583 | KeyLister |
| 584 | KeyGetter |
| 585 | } |
| 586 | |
// KeyLister is implemented by anything that can enumerate its keys.
type KeyLister interface {
	// ListKeys returns the keys held by the implementation.
	ListKeys() []string
}
| 591 | |
// KeyGetter is implemented by anything that can look up the value stored
// under a given key.
type KeyGetter interface {
	// GetByKey returns the value stored under key, whether such a value
	// exists, and any error encountered during the lookup.
	GetByKey(key string) (interface{}, bool, error)
}
| 596 | |
// DeltaType is the type of a change (addition, deletion, etc).
type DeltaType string

const (
	// Added, Updated and Deleted are the obvious change types.
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Sync deltas are produced when:
	//  * A watch expires/errors out and a new list/watch cycle is started.
	//  * You've turned on periodic syncs.
	// (Anything that triggers DeltaFIFO's Replace() or Resync() method.)
	Sync DeltaType = "Sync"
)
| 610 | |
| 611 | // Delta is the type stored by a DeltaFIFO. It tells you what change |
| 612 | // happened, and the object's state after* that change. |
| 613 | // |
| 614 | // [*] Unless the change is a deletion, and then you'll get the final |
| 615 | // state of the object before it was deleted. |
| 616 | type Delta struct { |
| 617 | Type DeltaType |
| 618 | Object interface{} |
| 619 | } |
| 620 | |
| 621 | // Deltas is a list of one or more 'Delta's to an individual object. |
| 622 | // The oldest delta is at index 0, the newest delta is the last one. |
| 623 | type Deltas []Delta |
| 624 | |
| 625 | // Oldest is a convenience function that returns the oldest delta, or |
| 626 | // nil if there are no deltas. |
| 627 | func (d Deltas) Oldest() *Delta { |
| 628 | if len(d) > 0 { |
| 629 | return &d[0] |
| 630 | } |
| 631 | return nil |
| 632 | } |
| 633 | |
| 634 | // Newest is a convenience function that returns the newest delta, or |
| 635 | // nil if there are no deltas. |
| 636 | func (d Deltas) Newest() *Delta { |
| 637 | if n := len(d); n > 0 { |
| 638 | return &d[n-1] |
| 639 | } |
| 640 | return nil |
| 641 | } |
| 642 | |
| 643 | // copyDeltas returns a shallow copy of d; that is, it copies the slice but not |
| 644 | // the objects in the slice. This allows Get/List to return an object that we |
| 645 | // know won't be clobbered by a subsequent modifications. |
| 646 | func copyDeltas(d Deltas) Deltas { |
| 647 | d2 := make(Deltas, len(d)) |
| 648 | copy(d2, d) |
| 649 | return d2 |
| 650 | } |
| 651 | |
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
	// Key is the key of the deleted object.
	Key string
	// Obj is the last known state of the deleted object; it may be nil
	// or stale.
	Obj interface{}
}