| /* |
| Copyright 2014 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package cache |
| |
| import ( |
| "errors" |
| "sync" |
| |
| "k8s.io/apimachinery/pkg/util/sets" |
| ) |
| |
| // PopProcessFunc is passed to Pop() method of Queue interface. |
| // It is supposed to process the element popped from the queue. |
| type PopProcessFunc func(interface{}) error |
| |
| // ErrRequeue may be returned by a PopProcessFunc to safely requeue |
| // the current item. The value of Err will be returned from Pop. |
| type ErrRequeue struct { |
| // Err is returned by the Pop function |
| Err error |
| } |
| |
| var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue") |
| |
| func (e ErrRequeue) Error() string { |
| if e.Err == nil { |
| return "the popped item should be requeued without returning an error" |
| } |
| return e.Err.Error() |
| } |
| |
| // Queue is exactly like a Store, but has a Pop() method too. |
| type Queue interface { |
| Store |
| |
| // Pop blocks until it has something to process. |
| // It returns the object that was process and the result of processing. |
| // The PopProcessFunc may return an ErrRequeue{...} to indicate the item |
| // should be requeued before releasing the lock on the queue. |
| Pop(PopProcessFunc) (interface{}, error) |
| |
| // AddIfNotPresent adds a value previously |
| // returned by Pop back into the queue as long |
| // as nothing else (presumably more recent) |
| // has since been added. |
| AddIfNotPresent(interface{}) error |
| |
| // HasSynced returns true if the first batch of items has been popped |
| HasSynced() bool |
| |
| // Close queue |
| Close() |
| } |
| |
| // Helper function for popping from Queue. |
| // WARNING: Do NOT use this function in non-test code to avoid races |
| // unless you really really really really know what you are doing. |
| func Pop(queue Queue) interface{} { |
| var result interface{} |
| queue.Pop(func(obj interface{}) error { |
| result = obj |
| return nil |
| }) |
| return result |
| } |
| |
| // FIFO receives adds and updates from a Reflector, and puts them in a queue for |
| // FIFO order processing. If multiple adds/updates of a single item happen while |
| // an item is in the queue before it has been processed, it will only be |
| // processed once, and when it is processed, the most recent version will be |
| // processed. This can't be done with a channel. |
| // |
| // FIFO solves this use case: |
| // * You want to process every object (exactly) once. |
| // * You want to process the most recent version of the object when you process it. |
| // * You do not want to process deleted objects, they should be removed from the queue. |
| // * You do not want to periodically reprocess objects. |
| // Compare with DeltaFIFO for other use cases. |
| type FIFO struct { |
| lock sync.RWMutex |
| cond sync.Cond |
| // We depend on the property that items in the set are in the queue and vice versa. |
| items map[string]interface{} |
| queue []string |
| |
| // populated is true if the first batch of items inserted by Replace() has been populated |
| // or Delete/Add/Update was called first. |
| populated bool |
| // initialPopulationCount is the number of items inserted by the first call of Replace() |
| initialPopulationCount int |
| |
| // keyFunc is used to make the key used for queued item insertion and retrieval, and |
| // should be deterministic. |
| keyFunc KeyFunc |
| |
| // Indication the queue is closed. |
| // Used to indicate a queue is closed so a control loop can exit when a queue is empty. |
| // Currently, not used to gate any of CRED operations. |
| closed bool |
| closedLock sync.Mutex |
| } |
| |
| var ( |
| _ = Queue(&FIFO{}) // FIFO is a Queue |
| ) |
| |
| // Close the queue. |
| func (f *FIFO) Close() { |
| f.closedLock.Lock() |
| defer f.closedLock.Unlock() |
| f.closed = true |
| f.cond.Broadcast() |
| } |
| |
| // Return true if an Add/Update/Delete/AddIfNotPresent are called first, |
| // or an Update called first but the first batch of items inserted by Replace() has been popped |
| func (f *FIFO) HasSynced() bool { |
| f.lock.Lock() |
| defer f.lock.Unlock() |
| return f.populated && f.initialPopulationCount == 0 |
| } |
| |
| // Add inserts an item, and puts it in the queue. The item is only enqueued |
| // if it doesn't already exist in the set. |
| func (f *FIFO) Add(obj interface{}) error { |
| id, err := f.keyFunc(obj) |
| if err != nil { |
| return KeyError{obj, err} |
| } |
| f.lock.Lock() |
| defer f.lock.Unlock() |
| f.populated = true |
| if _, exists := f.items[id]; !exists { |
| f.queue = append(f.queue, id) |
| } |
| f.items[id] = obj |
| f.cond.Broadcast() |
| return nil |
| } |
| |
| // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already |
| // present in the set, it is neither enqueued nor added to the set. |
| // |
| // This is useful in a single producer/consumer scenario so that the consumer can |
| // safely retry items without contending with the producer and potentially enqueueing |
| // stale items. |
| func (f *FIFO) AddIfNotPresent(obj interface{}) error { |
| id, err := f.keyFunc(obj) |
| if err != nil { |
| return KeyError{obj, err} |
| } |
| f.lock.Lock() |
| defer f.lock.Unlock() |
| f.addIfNotPresent(id, obj) |
| return nil |
| } |
| |
| // addIfNotPresent assumes the fifo lock is already held and adds the provided |
| // item to the queue under id if it does not already exist. |
| func (f *FIFO) addIfNotPresent(id string, obj interface{}) { |
| f.populated = true |
| if _, exists := f.items[id]; exists { |
| return |
| } |
| |
| f.queue = append(f.queue, id) |
| f.items[id] = obj |
| f.cond.Broadcast() |
| } |
| |
| // Update is the same as Add in this implementation. |
| func (f *FIFO) Update(obj interface{}) error { |
| return f.Add(obj) |
| } |
| |
| // Delete removes an item. It doesn't add it to the queue, because |
| // this implementation assumes the consumer only cares about the objects, |
| // not the order in which they were created/added. |
| func (f *FIFO) Delete(obj interface{}) error { |
| id, err := f.keyFunc(obj) |
| if err != nil { |
| return KeyError{obj, err} |
| } |
| f.lock.Lock() |
| defer f.lock.Unlock() |
| f.populated = true |
| delete(f.items, id) |
| return err |
| } |
| |
| // List returns a list of all the items. |
| func (f *FIFO) List() []interface{} { |
| f.lock.RLock() |
| defer f.lock.RUnlock() |
| list := make([]interface{}, 0, len(f.items)) |
| for _, item := range f.items { |
| list = append(list, item) |
| } |
| return list |
| } |
| |
| // ListKeys returns a list of all the keys of the objects currently |
| // in the FIFO. |
| func (f *FIFO) ListKeys() []string { |
| f.lock.RLock() |
| defer f.lock.RUnlock() |
| list := make([]string, 0, len(f.items)) |
| for key := range f.items { |
| list = append(list, key) |
| } |
| return list |
| } |
| |
| // Get returns the requested item, or sets exists=false. |
| func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { |
| key, err := f.keyFunc(obj) |
| if err != nil { |
| return nil, false, KeyError{obj, err} |
| } |
| return f.GetByKey(key) |
| } |
| |
| // GetByKey returns the requested item, or sets exists=false. |
| func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { |
| f.lock.RLock() |
| defer f.lock.RUnlock() |
| item, exists = f.items[key] |
| return item, exists, nil |
| } |
| |
| // Checks if the queue is closed |
| func (f *FIFO) IsClosed() bool { |
| f.closedLock.Lock() |
| defer f.closedLock.Unlock() |
| if f.closed { |
| return true |
| } |
| return false |
| } |
| |
| // Pop waits until an item is ready and processes it. If multiple items are |
| // ready, they are returned in the order in which they were added/updated. |
| // The item is removed from the queue (and the store) before it is processed, |
| // so if you don't successfully process it, it should be added back with |
| // AddIfNotPresent(). process function is called under lock, so it is safe |
| // update data structures in it that need to be in sync with the queue. |
| func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { |
| f.lock.Lock() |
| defer f.lock.Unlock() |
| for { |
| for len(f.queue) == 0 { |
| // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. |
| // When Close() is called, the f.closed is set and the condition is broadcasted. |
| // Which causes this loop to continue and return from the Pop(). |
| if f.IsClosed() { |
| return nil, FIFOClosedError |
| } |
| |
| f.cond.Wait() |
| } |
| id := f.queue[0] |
| f.queue = f.queue[1:] |
| if f.initialPopulationCount > 0 { |
| f.initialPopulationCount-- |
| } |
| item, ok := f.items[id] |
| if !ok { |
| // Item may have been deleted subsequently. |
| continue |
| } |
| delete(f.items, id) |
| err := process(item) |
| if e, ok := err.(ErrRequeue); ok { |
| f.addIfNotPresent(id, item) |
| err = e.Err |
| } |
| return item, err |
| } |
| } |
| |
| // Replace will delete the contents of 'f', using instead the given map. |
| // 'f' takes ownership of the map, you should not reference the map again |
| // after calling this function. f's queue is reset, too; upon return, it |
| // will contain the items in the map, in no particular order. |
| func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { |
| items := map[string]interface{}{} |
| for _, item := range list { |
| key, err := f.keyFunc(item) |
| if err != nil { |
| return KeyError{item, err} |
| } |
| items[key] = item |
| } |
| |
| f.lock.Lock() |
| defer f.lock.Unlock() |
| |
| if !f.populated { |
| f.populated = true |
| f.initialPopulationCount = len(items) |
| } |
| |
| f.items = items |
| f.queue = f.queue[:0] |
| for id := range items { |
| f.queue = append(f.queue, id) |
| } |
| if len(f.queue) > 0 { |
| f.cond.Broadcast() |
| } |
| return nil |
| } |
| |
| // Resync will touch all objects to put them into the processing queue |
| func (f *FIFO) Resync() error { |
| f.lock.Lock() |
| defer f.lock.Unlock() |
| |
| inQueue := sets.NewString() |
| for _, id := range f.queue { |
| inQueue.Insert(id) |
| } |
| for id := range f.items { |
| if !inQueue.Has(id) { |
| f.queue = append(f.queue, id) |
| } |
| } |
| if len(f.queue) > 0 { |
| f.cond.Broadcast() |
| } |
| return nil |
| } |
| |
| // NewFIFO returns a Store which can be used to queue up items to |
| // process. |
| func NewFIFO(keyFunc KeyFunc) *FIFO { |
| f := &FIFO{ |
| items: map[string]interface{}{}, |
| queue: []string{}, |
| keyFunc: keyFunc, |
| } |
| f.cond.L = &f.lock |
| return f |
| } |