blob: e05c01ee2960d27a4a5763f99574cc700ab58691 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20 "errors"
21 "sync"
22
23 "k8s.io/apimachinery/pkg/util/sets"
24)
25
// PopProcessFunc is the callback handed to the Pop() method of a Queue.
// It receives the element that was just popped and processes it; the
// returned error is the result of that processing.
type PopProcessFunc func(obj interface{}) error
29
// ErrRequeue is an error a PopProcessFunc may return to ask for the item
// currently being processed to be safely put back into the queue.
// Pop returns the wrapped Err value to its caller.
type ErrRequeue struct {
	// Err is the error that Pop hands back to its caller; it may be nil.
	Err error
}

// Error implements the error interface. It reports the wrapped error's
// message, or a fixed explanatory message when no error is wrapped.
func (e ErrRequeue) Error() string {
	if e.Err != nil {
		return e.Err.Error()
	}
	return "the popped item should be requeued without returning an error"
}

// FIFOClosedError is the sentinel error returned by Pop when the queue is
// empty and has been closed.
var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue")
45
46// Queue is exactly like a Store, but has a Pop() method too.
47type Queue interface {
48 Store
49
50 // Pop blocks until it has something to process.
51 // It returns the object that was process and the result of processing.
52 // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
53 // should be requeued before releasing the lock on the queue.
54 Pop(PopProcessFunc) (interface{}, error)
55
56 // AddIfNotPresent adds a value previously
57 // returned by Pop back into the queue as long
58 // as nothing else (presumably more recent)
59 // has since been added.
60 AddIfNotPresent(interface{}) error
61
62 // HasSynced returns true if the first batch of items has been popped
63 HasSynced() bool
64
65 // Close queue
66 Close()
67}
68
69// Helper function for popping from Queue.
70// WARNING: Do NOT use this function in non-test code to avoid races
71// unless you really really really really know what you are doing.
72func Pop(queue Queue) interface{} {
73 var result interface{}
74 queue.Pop(func(obj interface{}) error {
75 result = obj
76 return nil
77 })
78 return result
79}
80
81// FIFO receives adds and updates from a Reflector, and puts them in a queue for
82// FIFO order processing. If multiple adds/updates of a single item happen while
83// an item is in the queue before it has been processed, it will only be
84// processed once, and when it is processed, the most recent version will be
85// processed. This can't be done with a channel.
86//
87// FIFO solves this use case:
88// * You want to process every object (exactly) once.
89// * You want to process the most recent version of the object when you process it.
90// * You do not want to process deleted objects, they should be removed from the queue.
91// * You do not want to periodically reprocess objects.
92// Compare with DeltaFIFO for other use cases.
93type FIFO struct {
94 lock sync.RWMutex
95 cond sync.Cond
96 // We depend on the property that items in the set are in the queue and vice versa.
97 items map[string]interface{}
98 queue []string
99
100 // populated is true if the first batch of items inserted by Replace() has been populated
101 // or Delete/Add/Update was called first.
102 populated bool
103 // initialPopulationCount is the number of items inserted by the first call of Replace()
104 initialPopulationCount int
105
106 // keyFunc is used to make the key used for queued item insertion and retrieval, and
107 // should be deterministic.
108 keyFunc KeyFunc
109
110 // Indication the queue is closed.
111 // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
112 // Currently, not used to gate any of CRED operations.
113 closed bool
114 closedLock sync.Mutex
115}
116
117var (
118 _ = Queue(&FIFO{}) // FIFO is a Queue
119)
120
121// Close the queue.
122func (f *FIFO) Close() {
123 f.closedLock.Lock()
124 defer f.closedLock.Unlock()
125 f.closed = true
126 f.cond.Broadcast()
127}
128
129// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
130// or an Update called first but the first batch of items inserted by Replace() has been popped
131func (f *FIFO) HasSynced() bool {
132 f.lock.Lock()
133 defer f.lock.Unlock()
134 return f.populated && f.initialPopulationCount == 0
135}
136
137// Add inserts an item, and puts it in the queue. The item is only enqueued
138// if it doesn't already exist in the set.
139func (f *FIFO) Add(obj interface{}) error {
140 id, err := f.keyFunc(obj)
141 if err != nil {
142 return KeyError{obj, err}
143 }
144 f.lock.Lock()
145 defer f.lock.Unlock()
146 f.populated = true
147 if _, exists := f.items[id]; !exists {
148 f.queue = append(f.queue, id)
149 }
150 f.items[id] = obj
151 f.cond.Broadcast()
152 return nil
153}
154
155// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
156// present in the set, it is neither enqueued nor added to the set.
157//
158// This is useful in a single producer/consumer scenario so that the consumer can
159// safely retry items without contending with the producer and potentially enqueueing
160// stale items.
161func (f *FIFO) AddIfNotPresent(obj interface{}) error {
162 id, err := f.keyFunc(obj)
163 if err != nil {
164 return KeyError{obj, err}
165 }
166 f.lock.Lock()
167 defer f.lock.Unlock()
168 f.addIfNotPresent(id, obj)
169 return nil
170}
171
172// addIfNotPresent assumes the fifo lock is already held and adds the provided
173// item to the queue under id if it does not already exist.
174func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
175 f.populated = true
176 if _, exists := f.items[id]; exists {
177 return
178 }
179
180 f.queue = append(f.queue, id)
181 f.items[id] = obj
182 f.cond.Broadcast()
183}
184
185// Update is the same as Add in this implementation.
186func (f *FIFO) Update(obj interface{}) error {
187 return f.Add(obj)
188}
189
190// Delete removes an item. It doesn't add it to the queue, because
191// this implementation assumes the consumer only cares about the objects,
192// not the order in which they were created/added.
193func (f *FIFO) Delete(obj interface{}) error {
194 id, err := f.keyFunc(obj)
195 if err != nil {
196 return KeyError{obj, err}
197 }
198 f.lock.Lock()
199 defer f.lock.Unlock()
200 f.populated = true
201 delete(f.items, id)
202 return err
203}
204
205// List returns a list of all the items.
206func (f *FIFO) List() []interface{} {
207 f.lock.RLock()
208 defer f.lock.RUnlock()
209 list := make([]interface{}, 0, len(f.items))
210 for _, item := range f.items {
211 list = append(list, item)
212 }
213 return list
214}
215
216// ListKeys returns a list of all the keys of the objects currently
217// in the FIFO.
218func (f *FIFO) ListKeys() []string {
219 f.lock.RLock()
220 defer f.lock.RUnlock()
221 list := make([]string, 0, len(f.items))
222 for key := range f.items {
223 list = append(list, key)
224 }
225 return list
226}
227
228// Get returns the requested item, or sets exists=false.
229func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
230 key, err := f.keyFunc(obj)
231 if err != nil {
232 return nil, false, KeyError{obj, err}
233 }
234 return f.GetByKey(key)
235}
236
237// GetByKey returns the requested item, or sets exists=false.
238func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
239 f.lock.RLock()
240 defer f.lock.RUnlock()
241 item, exists = f.items[key]
242 return item, exists, nil
243}
244
245// Checks if the queue is closed
246func (f *FIFO) IsClosed() bool {
247 f.closedLock.Lock()
248 defer f.closedLock.Unlock()
249 if f.closed {
250 return true
251 }
252 return false
253}
254
255// Pop waits until an item is ready and processes it. If multiple items are
256// ready, they are returned in the order in which they were added/updated.
257// The item is removed from the queue (and the store) before it is processed,
258// so if you don't successfully process it, it should be added back with
259// AddIfNotPresent(). process function is called under lock, so it is safe
260// update data structures in it that need to be in sync with the queue.
261func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
262 f.lock.Lock()
263 defer f.lock.Unlock()
264 for {
265 for len(f.queue) == 0 {
266 // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
267 // When Close() is called, the f.closed is set and the condition is broadcasted.
268 // Which causes this loop to continue and return from the Pop().
269 if f.IsClosed() {
270 return nil, FIFOClosedError
271 }
272
273 f.cond.Wait()
274 }
275 id := f.queue[0]
276 f.queue = f.queue[1:]
277 if f.initialPopulationCount > 0 {
278 f.initialPopulationCount--
279 }
280 item, ok := f.items[id]
281 if !ok {
282 // Item may have been deleted subsequently.
283 continue
284 }
285 delete(f.items, id)
286 err := process(item)
287 if e, ok := err.(ErrRequeue); ok {
288 f.addIfNotPresent(id, item)
289 err = e.Err
290 }
291 return item, err
292 }
293}
294
295// Replace will delete the contents of 'f', using instead the given map.
296// 'f' takes ownership of the map, you should not reference the map again
297// after calling this function. f's queue is reset, too; upon return, it
298// will contain the items in the map, in no particular order.
299func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
300 items := map[string]interface{}{}
301 for _, item := range list {
302 key, err := f.keyFunc(item)
303 if err != nil {
304 return KeyError{item, err}
305 }
306 items[key] = item
307 }
308
309 f.lock.Lock()
310 defer f.lock.Unlock()
311
312 if !f.populated {
313 f.populated = true
314 f.initialPopulationCount = len(items)
315 }
316
317 f.items = items
318 f.queue = f.queue[:0]
319 for id := range items {
320 f.queue = append(f.queue, id)
321 }
322 if len(f.queue) > 0 {
323 f.cond.Broadcast()
324 }
325 return nil
326}
327
328// Resync will touch all objects to put them into the processing queue
329func (f *FIFO) Resync() error {
330 f.lock.Lock()
331 defer f.lock.Unlock()
332
333 inQueue := sets.NewString()
334 for _, id := range f.queue {
335 inQueue.Insert(id)
336 }
337 for id := range f.items {
338 if !inQueue.Has(id) {
339 f.queue = append(f.queue, id)
340 }
341 }
342 if len(f.queue) > 0 {
343 f.cond.Broadcast()
344 }
345 return nil
346}
347
348// NewFIFO returns a Store which can be used to queue up items to
349// process.
350func NewFIFO(keyFunc KeyFunc) *FIFO {
351 f := &FIFO{
352 items: map[string]interface{}{},
353 queue: []string{},
354 keyFunc: keyFunc,
355 }
356 f.cond.L = &f.lock
357 return f
358}