/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package storage
18
19import (
20 "fmt"
21 "sort"
22 "strconv"
23 "sync"
24 "time"
25
26 "k8s.io/apimachinery/pkg/api/errors"
27 "k8s.io/apimachinery/pkg/api/meta"
28 "k8s.io/apimachinery/pkg/fields"
29 "k8s.io/apimachinery/pkg/labels"
30 "k8s.io/apimachinery/pkg/runtime"
31 "k8s.io/apimachinery/pkg/util/clock"
32 "k8s.io/apimachinery/pkg/watch"
33 utiltrace "k8s.io/apiserver/pkg/util/trace"
34 "k8s.io/client-go/tools/cache"
35)
36
// blockTimeout bounds how long a request may be blocked while waiting for
// a given resource version to be propagated to the cache. Once it elapses
// the request is terminated with a Timeout error that carries a
// retry-after suggestion.
const blockTimeout = 3 * time.Second
44
45// watchCacheEvent is a single "watch event" that is send to users of
46// watchCache. Additionally to a typical "watch.Event" it contains
47// the previous value of the object to enable proper filtering in the
48// upper layers.
49type watchCacheEvent struct {
50 Type watch.EventType
51 Object runtime.Object
52 ObjLabels labels.Set
53 ObjFields fields.Set
54 ObjUninitialized bool
55 PrevObject runtime.Object
56 PrevObjLabels labels.Set
57 PrevObjFields fields.Set
58 PrevObjUninitialized bool
59 Key string
60 ResourceVersion uint64
61}
62
63// Computing a key of an object is generally non-trivial (it performs
64// e.g. validation underneath). Similarly computing object fields and
65// labels. To avoid computing them multiple times (to serve the event
66// in different List/Watch requests), in the underlying store we are
67// keeping structs (key, object, labels, fields, uninitialized).
68type storeElement struct {
69 Key string
70 Object runtime.Object
71 Labels labels.Set
72 Fields fields.Set
73 Uninitialized bool
74}
75
76func storeElementKey(obj interface{}) (string, error) {
77 elem, ok := obj.(*storeElement)
78 if !ok {
79 return "", fmt.Errorf("not a storeElement: %v", obj)
80 }
81 return elem.Key, nil
82}
83
84// watchCacheElement is a single "watch event" stored in a cache.
85// It contains the resource version of the object and the object
86// itself.
87type watchCacheElement struct {
88 resourceVersion uint64
89 watchCacheEvent *watchCacheEvent
90}
91
92// watchCache implements a Store interface.
93// However, it depends on the elements implementing runtime.Object interface.
94//
95// watchCache is a "sliding window" (with a limited capacity) of objects
96// observed from a watch.
97type watchCache struct {
98 sync.RWMutex
99
100 // Condition on which lists are waiting for the fresh enough
101 // resource version.
102 cond *sync.Cond
103
104 // Maximum size of history window.
105 capacity int
106
107 // keyFunc is used to get a key in the underlying storage for a given object.
108 keyFunc func(runtime.Object) (string, error)
109
110 // getAttrsFunc is used to get labels and fields of an object.
111 getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)
112
113 // cache is used a cyclic buffer - its first element (with the smallest
114 // resourceVersion) is defined by startIndex, its last element is defined
115 // by endIndex (if cache is full it will be startIndex + capacity).
116 // Both startIndex and endIndex can be greater than buffer capacity -
117 // you should always apply modulo capacity to get an index in cache array.
118 cache []watchCacheElement
119 startIndex int
120 endIndex int
121
122 // store will effectively support LIST operation from the "end of cache
123 // history" i.e. from the moment just after the newest cached watched event.
124 // It is necessary to effectively allow clients to start watching at now.
125 // NOTE: We assume that <store> is thread-safe.
126 store cache.Store
127
128 // ResourceVersion up to which the watchCache is propagated.
129 resourceVersion uint64
130
131 // This handler is run at the end of every successful Replace() method.
132 onReplace func()
133
134 // This handler is run at the end of every Add/Update/Delete method
135 // and additionally gets the previous value of the object.
136 onEvent func(*watchCacheEvent)
137
138 // for testing timeouts.
139 clock clock.Clock
140}
141
142func newWatchCache(
143 capacity int,
144 keyFunc func(runtime.Object) (string, error),
145 getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)) *watchCache {
146 wc := &watchCache{
147 capacity: capacity,
148 keyFunc: keyFunc,
149 getAttrsFunc: getAttrsFunc,
150 cache: make([]watchCacheElement, capacity),
151 startIndex: 0,
152 endIndex: 0,
153 store: cache.NewStore(storeElementKey),
154 resourceVersion: 0,
155 clock: clock.RealClock{},
156 }
157 wc.cond = sync.NewCond(wc.RLocker())
158 return wc
159}
160
161// Add takes runtime.Object as an argument.
162func (w *watchCache) Add(obj interface{}) error {
163 object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
164 if err != nil {
165 return err
166 }
167 event := watch.Event{Type: watch.Added, Object: object}
168
169 f := func(elem *storeElement) error { return w.store.Add(elem) }
170 return w.processEvent(event, resourceVersion, f)
171}
172
173// Update takes runtime.Object as an argument.
174func (w *watchCache) Update(obj interface{}) error {
175 object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
176 if err != nil {
177 return err
178 }
179 event := watch.Event{Type: watch.Modified, Object: object}
180
181 f := func(elem *storeElement) error { return w.store.Update(elem) }
182 return w.processEvent(event, resourceVersion, f)
183}
184
185// Delete takes runtime.Object as an argument.
186func (w *watchCache) Delete(obj interface{}) error {
187 object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
188 if err != nil {
189 return err
190 }
191 event := watch.Event{Type: watch.Deleted, Object: object}
192
193 f := func(elem *storeElement) error { return w.store.Delete(elem) }
194 return w.processEvent(event, resourceVersion, f)
195}
196
197func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
198 object, ok := obj.(runtime.Object)
199 if !ok {
200 return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
201 }
202 meta, err := meta.Accessor(object)
203 if err != nil {
204 return nil, 0, err
205 }
206 resourceVersion, err := parseResourceVersion(meta.GetResourceVersion())
207 if err != nil {
208 return nil, 0, err
209 }
210 return object, resourceVersion, nil
211}
212
// parseResourceVersion converts a resource version string into its
// numeric form.
//
// The empty string is treated as "no version" and yields 0 with a nil
// error. Any other value must be a base-10 unsigned integer that fits in
// 64 bits; otherwise the error from strconv.ParseUint is returned.
func parseResourceVersion(resourceVersion string) (uint64, error) {
	if resourceVersion == "" {
		return 0, nil
	}
	// Parse with an explicit 64-bit size to match the uint64 result. A
	// bitSize of 0 would limit values to the platform's uint width and
	// reject versions above 2^32-1 on 32-bit builds.
	return strconv.ParseUint(resourceVersion, 10, 64)
}
220
221func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
222 key, err := w.keyFunc(event.Object)
223 if err != nil {
224 return fmt.Errorf("couldn't compute key: %v", err)
225 }
226 elem := &storeElement{Key: key, Object: event.Object}
227 elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object)
228 if err != nil {
229 return err
230 }
231
232 watchCacheEvent := &watchCacheEvent{
233 Type: event.Type,
234 Object: elem.Object,
235 ObjLabels: elem.Labels,
236 ObjFields: elem.Fields,
237 ObjUninitialized: elem.Uninitialized,
238 Key: key,
239 ResourceVersion: resourceVersion,
240 }
241
242 // TODO: We should consider moving this lock below after the watchCacheEvent
243 // is created. In such situation, the only problematic scenario is Replace(
244 // happening after getting object from store and before acquiring a lock.
245 // Maybe introduce another lock for this purpose.
246 w.Lock()
247 defer w.Unlock()
248 previous, exists, err := w.store.Get(elem)
249 if err != nil {
250 return err
251 }
252 if exists {
253 previousElem := previous.(*storeElement)
254 watchCacheEvent.PrevObject = previousElem.Object
255 watchCacheEvent.PrevObjLabels = previousElem.Labels
256 watchCacheEvent.PrevObjFields = previousElem.Fields
257 watchCacheEvent.PrevObjUninitialized = previousElem.Uninitialized
258 }
259
260 if w.onEvent != nil {
261 w.onEvent(watchCacheEvent)
262 }
263 w.updateCache(resourceVersion, watchCacheEvent)
264 w.resourceVersion = resourceVersion
265 w.cond.Broadcast()
266 return updateFunc(elem)
267}
268
269// Assumes that lock is already held for write.
270func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
271 if w.endIndex == w.startIndex+w.capacity {
272 // Cache is full - remove the oldest element.
273 w.startIndex++
274 }
275 w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
276 w.endIndex++
277}
278
279// List returns list of pointers to <storeElement> objects.
280func (w *watchCache) List() []interface{} {
281 return w.store.List()
282}
283
284// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
285// NOTE: This function acquired lock and doesn't release it.
286// You HAVE TO explicitly call w.RUnlock() after this function.
287func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error {
288 startTime := w.clock.Now()
289 go func() {
290 // Wake us up when the time limit has expired. The docs
291 // promise that time.After (well, NewTimer, which it calls)
292 // will wait *at least* the duration given. Since this go
293 // routine starts sometime after we record the start time, and
294 // it will wake up the loop below sometime after the broadcast,
295 // we don't need to worry about waking it up before the time
296 // has expired accidentally.
297 <-w.clock.After(blockTimeout)
298 w.cond.Broadcast()
299 }()
300
301 w.RLock()
302 if trace != nil {
303 trace.Step("watchCache locked acquired")
304 }
305 for w.resourceVersion < resourceVersion {
306 if w.clock.Since(startTime) >= blockTimeout {
307 // Timeout with retry after 1 second.
308 return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
309 }
310 w.cond.Wait()
311 }
312 if trace != nil {
313 trace.Step("watchCache fresh enough")
314 }
315 return nil
316}
317
318// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
319func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
320 err := w.waitUntilFreshAndBlock(resourceVersion, trace)
321 defer w.RUnlock()
322 if err != nil {
323 return nil, 0, err
324 }
325 return w.store.List(), w.resourceVersion, nil
326}
327
328// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
329func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
330 err := w.waitUntilFreshAndBlock(resourceVersion, trace)
331 defer w.RUnlock()
332 if err != nil {
333 return nil, false, 0, err
334 }
335 value, exists, err := w.store.GetByKey(key)
336 return value, exists, w.resourceVersion, err
337}
338
339func (w *watchCache) ListKeys() []string {
340 return w.store.ListKeys()
341}
342
343// Get takes runtime.Object as a parameter. However, it returns
344// pointer to <storeElement>.
345func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
346 object, ok := obj.(runtime.Object)
347 if !ok {
348 return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
349 }
350 key, err := w.keyFunc(object)
351 if err != nil {
352 return nil, false, fmt.Errorf("couldn't compute key: %v", err)
353 }
354
355 return w.store.Get(&storeElement{Key: key, Object: object})
356}
357
358// GetByKey returns pointer to <storeElement>.
359func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
360 return w.store.GetByKey(key)
361}
362
363// Replace takes slice of runtime.Object as a parameter.
364func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
365 version, err := parseResourceVersion(resourceVersion)
366 if err != nil {
367 return err
368 }
369
370 toReplace := make([]interface{}, 0, len(objs))
371 for _, obj := range objs {
372 object, ok := obj.(runtime.Object)
373 if !ok {
374 return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
375 }
376 key, err := w.keyFunc(object)
377 if err != nil {
378 return fmt.Errorf("couldn't compute key: %v", err)
379 }
380 objLabels, objFields, objUninitialized, err := w.getAttrsFunc(object)
381 if err != nil {
382 return err
383 }
384 toReplace = append(toReplace, &storeElement{
385 Key: key,
386 Object: object,
387 Labels: objLabels,
388 Fields: objFields,
389 Uninitialized: objUninitialized,
390 })
391 }
392
393 w.Lock()
394 defer w.Unlock()
395
396 w.startIndex = 0
397 w.endIndex = 0
398 if err := w.store.Replace(toReplace, resourceVersion); err != nil {
399 return err
400 }
401 w.resourceVersion = version
402 if w.onReplace != nil {
403 w.onReplace()
404 }
405 w.cond.Broadcast()
406 return nil
407}
408
409func (w *watchCache) SetOnReplace(onReplace func()) {
410 w.Lock()
411 defer w.Unlock()
412 w.onReplace = onReplace
413}
414
415func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
416 w.Lock()
417 defer w.Unlock()
418 w.onEvent = onEvent
419}
420
421func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
422 size := w.endIndex - w.startIndex
423 // if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher
424 // is the *next* event we'll receive, which will be at least one greater than our current resourceVersion
425 oldest := w.resourceVersion + 1
426 if size > 0 {
427 oldest = w.cache[w.startIndex%w.capacity].resourceVersion
428 }
429 if resourceVersion == 0 {
430 // resourceVersion = 0 means that we don't require any specific starting point
431 // and we would like to start watching from ~now.
432 // However, to keep backward compatibility, we additionally need to return the
433 // current state and only then start watching from that point.
434 //
435 // TODO: In v2 api, we should stop returning the current state - #13969.
436 allItems := w.store.List()
437 result := make([]*watchCacheEvent, len(allItems))
438 for i, item := range allItems {
439 elem, ok := item.(*storeElement)
440 if !ok {
441 return nil, fmt.Errorf("not a storeElement: %v", elem)
442 }
443 objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object)
444 if err != nil {
445 return nil, err
446 }
447 result[i] = &watchCacheEvent{
448 Type: watch.Added,
449 Object: elem.Object,
450 ObjLabels: objLabels,
451 ObjFields: objFields,
452 ObjUninitialized: objUninitialized,
453 Key: elem.Key,
454 ResourceVersion: w.resourceVersion,
455 }
456 }
457 return result, nil
458 }
459 if resourceVersion < oldest-1 {
460 return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
461 }
462
463 // Binary search the smallest index at which resourceVersion is greater than the given one.
464 f := func(i int) bool {
465 return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
466 }
467 first := sort.Search(size, f)
468 result := make([]*watchCacheEvent, size-first)
469 for i := 0; i < size-first; i++ {
470 result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
471 }
472 return result, nil
473}
474
475func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
476 w.RLock()
477 defer w.RUnlock()
478 return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
479}
480
481func (w *watchCache) Resync() error {
482 // Nothing to do
483 return nil
484}