Matthias Andreas Benkard | 832a54e | 2019-01-29 09:27:38 +0100 | [diff] [blame^] | 1 | /* |
| 2 | Copyright 2014 The Kubernetes Authors. |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package cache |
| 18 | |
| 19 | import ( |
| 20 | "errors" |
| 21 | "sync" |
| 22 | |
| 23 | "k8s.io/apimachinery/pkg/util/sets" |
| 24 | ) |
| 25 | |
| 26 | // PopProcessFunc is passed to Pop() method of Queue interface. |
| 27 | // It is supposed to process the element popped from the queue. |
| 28 | type PopProcessFunc func(interface{}) error |
| 29 | |
| 30 | // ErrRequeue may be returned by a PopProcessFunc to safely requeue |
| 31 | // the current item. The value of Err will be returned from Pop. |
| 32 | type ErrRequeue struct { |
| 33 | // Err is returned by the Pop function |
| 34 | Err error |
| 35 | } |
| 36 | |
| 37 | var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue") |
| 38 | |
| 39 | func (e ErrRequeue) Error() string { |
| 40 | if e.Err == nil { |
| 41 | return "the popped item should be requeued without returning an error" |
| 42 | } |
| 43 | return e.Err.Error() |
| 44 | } |
| 45 | |
| 46 | // Queue is exactly like a Store, but has a Pop() method too. |
| 47 | type Queue interface { |
| 48 | Store |
| 49 | |
| 50 | // Pop blocks until it has something to process. |
| 51 | // It returns the object that was process and the result of processing. |
| 52 | // The PopProcessFunc may return an ErrRequeue{...} to indicate the item |
| 53 | // should be requeued before releasing the lock on the queue. |
| 54 | Pop(PopProcessFunc) (interface{}, error) |
| 55 | |
| 56 | // AddIfNotPresent adds a value previously |
| 57 | // returned by Pop back into the queue as long |
| 58 | // as nothing else (presumably more recent) |
| 59 | // has since been added. |
| 60 | AddIfNotPresent(interface{}) error |
| 61 | |
| 62 | // HasSynced returns true if the first batch of items has been popped |
| 63 | HasSynced() bool |
| 64 | |
| 65 | // Close queue |
| 66 | Close() |
| 67 | } |
| 68 | |
| 69 | // Helper function for popping from Queue. |
| 70 | // WARNING: Do NOT use this function in non-test code to avoid races |
| 71 | // unless you really really really really know what you are doing. |
| 72 | func Pop(queue Queue) interface{} { |
| 73 | var result interface{} |
| 74 | queue.Pop(func(obj interface{}) error { |
| 75 | result = obj |
| 76 | return nil |
| 77 | }) |
| 78 | return result |
| 79 | } |
| 80 | |
| 81 | // FIFO receives adds and updates from a Reflector, and puts them in a queue for |
| 82 | // FIFO order processing. If multiple adds/updates of a single item happen while |
| 83 | // an item is in the queue before it has been processed, it will only be |
| 84 | // processed once, and when it is processed, the most recent version will be |
| 85 | // processed. This can't be done with a channel. |
| 86 | // |
| 87 | // FIFO solves this use case: |
| 88 | // * You want to process every object (exactly) once. |
| 89 | // * You want to process the most recent version of the object when you process it. |
| 90 | // * You do not want to process deleted objects, they should be removed from the queue. |
| 91 | // * You do not want to periodically reprocess objects. |
| 92 | // Compare with DeltaFIFO for other use cases. |
| 93 | type FIFO struct { |
| 94 | lock sync.RWMutex |
| 95 | cond sync.Cond |
| 96 | // We depend on the property that items in the set are in the queue and vice versa. |
| 97 | items map[string]interface{} |
| 98 | queue []string |
| 99 | |
| 100 | // populated is true if the first batch of items inserted by Replace() has been populated |
| 101 | // or Delete/Add/Update was called first. |
| 102 | populated bool |
| 103 | // initialPopulationCount is the number of items inserted by the first call of Replace() |
| 104 | initialPopulationCount int |
| 105 | |
| 106 | // keyFunc is used to make the key used for queued item insertion and retrieval, and |
| 107 | // should be deterministic. |
| 108 | keyFunc KeyFunc |
| 109 | |
| 110 | // Indication the queue is closed. |
| 111 | // Used to indicate a queue is closed so a control loop can exit when a queue is empty. |
| 112 | // Currently, not used to gate any of CRED operations. |
| 113 | closed bool |
| 114 | closedLock sync.Mutex |
| 115 | } |
| 116 | |
| 117 | var ( |
| 118 | _ = Queue(&FIFO{}) // FIFO is a Queue |
| 119 | ) |
| 120 | |
| 121 | // Close the queue. |
| 122 | func (f *FIFO) Close() { |
| 123 | f.closedLock.Lock() |
| 124 | defer f.closedLock.Unlock() |
| 125 | f.closed = true |
| 126 | f.cond.Broadcast() |
| 127 | } |
| 128 | |
| 129 | // Return true if an Add/Update/Delete/AddIfNotPresent are called first, |
| 130 | // or an Update called first but the first batch of items inserted by Replace() has been popped |
| 131 | func (f *FIFO) HasSynced() bool { |
| 132 | f.lock.Lock() |
| 133 | defer f.lock.Unlock() |
| 134 | return f.populated && f.initialPopulationCount == 0 |
| 135 | } |
| 136 | |
| 137 | // Add inserts an item, and puts it in the queue. The item is only enqueued |
| 138 | // if it doesn't already exist in the set. |
| 139 | func (f *FIFO) Add(obj interface{}) error { |
| 140 | id, err := f.keyFunc(obj) |
| 141 | if err != nil { |
| 142 | return KeyError{obj, err} |
| 143 | } |
| 144 | f.lock.Lock() |
| 145 | defer f.lock.Unlock() |
| 146 | f.populated = true |
| 147 | if _, exists := f.items[id]; !exists { |
| 148 | f.queue = append(f.queue, id) |
| 149 | } |
| 150 | f.items[id] = obj |
| 151 | f.cond.Broadcast() |
| 152 | return nil |
| 153 | } |
| 154 | |
| 155 | // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already |
| 156 | // present in the set, it is neither enqueued nor added to the set. |
| 157 | // |
| 158 | // This is useful in a single producer/consumer scenario so that the consumer can |
| 159 | // safely retry items without contending with the producer and potentially enqueueing |
| 160 | // stale items. |
| 161 | func (f *FIFO) AddIfNotPresent(obj interface{}) error { |
| 162 | id, err := f.keyFunc(obj) |
| 163 | if err != nil { |
| 164 | return KeyError{obj, err} |
| 165 | } |
| 166 | f.lock.Lock() |
| 167 | defer f.lock.Unlock() |
| 168 | f.addIfNotPresent(id, obj) |
| 169 | return nil |
| 170 | } |
| 171 | |
| 172 | // addIfNotPresent assumes the fifo lock is already held and adds the provided |
| 173 | // item to the queue under id if it does not already exist. |
| 174 | func (f *FIFO) addIfNotPresent(id string, obj interface{}) { |
| 175 | f.populated = true |
| 176 | if _, exists := f.items[id]; exists { |
| 177 | return |
| 178 | } |
| 179 | |
| 180 | f.queue = append(f.queue, id) |
| 181 | f.items[id] = obj |
| 182 | f.cond.Broadcast() |
| 183 | } |
| 184 | |
| 185 | // Update is the same as Add in this implementation. |
| 186 | func (f *FIFO) Update(obj interface{}) error { |
| 187 | return f.Add(obj) |
| 188 | } |
| 189 | |
| 190 | // Delete removes an item. It doesn't add it to the queue, because |
| 191 | // this implementation assumes the consumer only cares about the objects, |
| 192 | // not the order in which they were created/added. |
| 193 | func (f *FIFO) Delete(obj interface{}) error { |
| 194 | id, err := f.keyFunc(obj) |
| 195 | if err != nil { |
| 196 | return KeyError{obj, err} |
| 197 | } |
| 198 | f.lock.Lock() |
| 199 | defer f.lock.Unlock() |
| 200 | f.populated = true |
| 201 | delete(f.items, id) |
| 202 | return err |
| 203 | } |
| 204 | |
| 205 | // List returns a list of all the items. |
| 206 | func (f *FIFO) List() []interface{} { |
| 207 | f.lock.RLock() |
| 208 | defer f.lock.RUnlock() |
| 209 | list := make([]interface{}, 0, len(f.items)) |
| 210 | for _, item := range f.items { |
| 211 | list = append(list, item) |
| 212 | } |
| 213 | return list |
| 214 | } |
| 215 | |
| 216 | // ListKeys returns a list of all the keys of the objects currently |
| 217 | // in the FIFO. |
| 218 | func (f *FIFO) ListKeys() []string { |
| 219 | f.lock.RLock() |
| 220 | defer f.lock.RUnlock() |
| 221 | list := make([]string, 0, len(f.items)) |
| 222 | for key := range f.items { |
| 223 | list = append(list, key) |
| 224 | } |
| 225 | return list |
| 226 | } |
| 227 | |
| 228 | // Get returns the requested item, or sets exists=false. |
| 229 | func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { |
| 230 | key, err := f.keyFunc(obj) |
| 231 | if err != nil { |
| 232 | return nil, false, KeyError{obj, err} |
| 233 | } |
| 234 | return f.GetByKey(key) |
| 235 | } |
| 236 | |
| 237 | // GetByKey returns the requested item, or sets exists=false. |
| 238 | func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { |
| 239 | f.lock.RLock() |
| 240 | defer f.lock.RUnlock() |
| 241 | item, exists = f.items[key] |
| 242 | return item, exists, nil |
| 243 | } |
| 244 | |
| 245 | // Checks if the queue is closed |
| 246 | func (f *FIFO) IsClosed() bool { |
| 247 | f.closedLock.Lock() |
| 248 | defer f.closedLock.Unlock() |
| 249 | if f.closed { |
| 250 | return true |
| 251 | } |
| 252 | return false |
| 253 | } |
| 254 | |
| 255 | // Pop waits until an item is ready and processes it. If multiple items are |
| 256 | // ready, they are returned in the order in which they were added/updated. |
| 257 | // The item is removed from the queue (and the store) before it is processed, |
| 258 | // so if you don't successfully process it, it should be added back with |
| 259 | // AddIfNotPresent(). process function is called under lock, so it is safe |
| 260 | // update data structures in it that need to be in sync with the queue. |
| 261 | func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { |
| 262 | f.lock.Lock() |
| 263 | defer f.lock.Unlock() |
| 264 | for { |
| 265 | for len(f.queue) == 0 { |
| 266 | // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. |
| 267 | // When Close() is called, the f.closed is set and the condition is broadcasted. |
| 268 | // Which causes this loop to continue and return from the Pop(). |
| 269 | if f.IsClosed() { |
| 270 | return nil, FIFOClosedError |
| 271 | } |
| 272 | |
| 273 | f.cond.Wait() |
| 274 | } |
| 275 | id := f.queue[0] |
| 276 | f.queue = f.queue[1:] |
| 277 | if f.initialPopulationCount > 0 { |
| 278 | f.initialPopulationCount-- |
| 279 | } |
| 280 | item, ok := f.items[id] |
| 281 | if !ok { |
| 282 | // Item may have been deleted subsequently. |
| 283 | continue |
| 284 | } |
| 285 | delete(f.items, id) |
| 286 | err := process(item) |
| 287 | if e, ok := err.(ErrRequeue); ok { |
| 288 | f.addIfNotPresent(id, item) |
| 289 | err = e.Err |
| 290 | } |
| 291 | return item, err |
| 292 | } |
| 293 | } |
| 294 | |
| 295 | // Replace will delete the contents of 'f', using instead the given map. |
| 296 | // 'f' takes ownership of the map, you should not reference the map again |
| 297 | // after calling this function. f's queue is reset, too; upon return, it |
| 298 | // will contain the items in the map, in no particular order. |
| 299 | func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { |
| 300 | items := map[string]interface{}{} |
| 301 | for _, item := range list { |
| 302 | key, err := f.keyFunc(item) |
| 303 | if err != nil { |
| 304 | return KeyError{item, err} |
| 305 | } |
| 306 | items[key] = item |
| 307 | } |
| 308 | |
| 309 | f.lock.Lock() |
| 310 | defer f.lock.Unlock() |
| 311 | |
| 312 | if !f.populated { |
| 313 | f.populated = true |
| 314 | f.initialPopulationCount = len(items) |
| 315 | } |
| 316 | |
| 317 | f.items = items |
| 318 | f.queue = f.queue[:0] |
| 319 | for id := range items { |
| 320 | f.queue = append(f.queue, id) |
| 321 | } |
| 322 | if len(f.queue) > 0 { |
| 323 | f.cond.Broadcast() |
| 324 | } |
| 325 | return nil |
| 326 | } |
| 327 | |
| 328 | // Resync will touch all objects to put them into the processing queue |
| 329 | func (f *FIFO) Resync() error { |
| 330 | f.lock.Lock() |
| 331 | defer f.lock.Unlock() |
| 332 | |
| 333 | inQueue := sets.NewString() |
| 334 | for _, id := range f.queue { |
| 335 | inQueue.Insert(id) |
| 336 | } |
| 337 | for id := range f.items { |
| 338 | if !inQueue.Has(id) { |
| 339 | f.queue = append(f.queue, id) |
| 340 | } |
| 341 | } |
| 342 | if len(f.queue) > 0 { |
| 343 | f.cond.Broadcast() |
| 344 | } |
| 345 | return nil |
| 346 | } |
| 347 | |
| 348 | // NewFIFO returns a Store which can be used to queue up items to |
| 349 | // process. |
| 350 | func NewFIFO(keyFunc KeyFunc) *FIFO { |
| 351 | f := &FIFO{ |
| 352 | items: map[string]interface{}{}, |
| 353 | queue: []string{}, |
| 354 | keyFunc: keyFunc, |
| 355 | } |
| 356 | f.cond.L = &f.lock |
| 357 | return f |
| 358 | } |