/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
	"context"
	"fmt"
	"net/http"
	"reflect"
	"sync"
	"time"

	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/conversion"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/features"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	utiltrace "k8s.io/apiserver/pkg/util/trace"
	"k8s.io/client-go/tools/cache"
)
// CacherConfig contains the configuration for a given Cacher.
type CacherConfig struct {
	// Maximum size of the history cached in memory.
	CacheCapacity int

	// An underlying storage.Interface.
	Storage Interface

	// An underlying storage.Versioner.
	Versioner Versioner

	// The Cacher will cache objects of a given Type and assumes that they
	// are all stored under the ResourcePrefix directory in the underlying database.
	Type           interface{}
	ResourcePrefix string

	// KeyFunc is used to get a key in the underlying storage for a given object.
	KeyFunc func(runtime.Object) (string, error)

	// GetAttrsFunc is used to get object labels, fields, and the uninitialized bool.
	GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error)

	// TriggerPublisherFunc is used to reduce the number of watchers that
	// need to process an incoming event.
	TriggerPublisherFunc TriggerPublisherFunc

	// NewListFunc is a function that creates a new, empty object for storing
	// a list of objects of type Type.
	NewListFunc func() runtime.Object

	Codec runtime.Codec
}
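
// exampleCacherConfig is an illustrative sketch, not part of the original
// file: it shows how the CacherConfig fields fit together for a hypothetical
// resource. The storage, versioner, codec, and list constructor are
// placeholders supplied by the caller; GetAttrsFunc and TriggerPublisherFunc
// are omitted for brevity.
func exampleCacherConfig(s Interface, v Versioner, codec runtime.Codec, obj runtime.Object, newList func() runtime.Object) CacherConfig {
	return CacherConfig{
		CacheCapacity:  1000, // hypothetical history window size
		Storage:        s,
		Versioner:      v,
		Type:           obj,
		ResourcePrefix: "/registry/example",
		// KeyFunc maps an object to its key under ResourcePrefix.
		KeyFunc: func(o runtime.Object) (string, error) {
			accessor, err := meta.Accessor(o)
			if err != nil {
				return "", err
			}
			return "/registry/example/" + accessor.GetName(), nil
		},
		NewListFunc: newList,
		Codec:       codec,
	}
}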

type watchersMap map[int]*cacheWatcher

func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
	wm[number] = w
}

func (wm watchersMap) deleteWatcher(number int) {
	delete(wm, number)
}

func (wm watchersMap) terminateAll() {
	for key, watcher := range wm {
		delete(wm, key)
		watcher.stop()
	}
}

type indexedWatchers struct {
	allWatchers   watchersMap
	valueWatchers map[string]watchersMap
}

func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
	if supported {
		if _, ok := i.valueWatchers[value]; !ok {
			i.valueWatchers[value] = watchersMap{}
		}
		i.valueWatchers[value].addWatcher(w, number)
	} else {
		i.allWatchers.addWatcher(w, number)
	}
}

func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
	if supported {
		i.valueWatchers[value].deleteWatcher(number)
		if len(i.valueWatchers[value]) == 0 {
			delete(i.valueWatchers, value)
		}
	} else {
		i.allWatchers.deleteWatcher(number)
	}
}

func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
	if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
		glog.Warningf("Terminating all watchers from cacher %v", objectType)
	}
	i.allWatchers.terminateAll()
	for index, watchers := range i.valueWatchers {
		watchers.terminateAll()
		delete(i.valueWatchers, index)
	}
}
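
// exampleIndexedWatcherRouting is an illustrative sketch, not part of the
// original file: a watcher whose predicate pins the trigger to a single
// value is indexed under that value and is only offered matching events,
// while a watcher without a supported trigger value lands in allWatchers and
// must be offered every event. The value "node-1" is a placeholder.
func exampleIndexedWatcherRouting(w1, w2 *cacheWatcher) {
	index := indexedWatchers{
		allWatchers:   watchersMap{},
		valueWatchers: map[string]watchersMap{},
	}
	index.addWatcher(w1, 0, "node-1", true) // indexed by trigger value
	index.addWatcher(w2, 1, "", false)      // unindexed; sees everything
	index.deleteWatcher(0, "node-1", true)
	index.deleteWatcher(1, "", false)
}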

type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool

// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
// Cacher implements storage.Interface (although most of the calls are just
// delegated to the underlying storage).
type Cacher struct {
	// HighWaterMarks for performance debugging.
	// Important: Since HighWaterMark is using sync/atomic, it has to be at the
	// top of the struct due to a bug on 32-bit platforms.
	// See https://golang.org/pkg/sync/atomic/ for more information.
	incomingHWM HighWaterMark
	// Incoming events that should be dispatched to watchers.
	incoming chan watchCacheEvent

	sync.RWMutex

	// Before accessing the cacher's cache, wait for ready to be ok.
	// This is necessary to prevent users from accessing structures that are
	// uninitialized or are being repopulated right now.
	// ready needs to be set to false when the cacher is paused or stopped.
	// ready needs to be set to true when the cacher is ready to use after
	// initialization.
	ready *ready

	// Underlying storage.Interface.
	storage Interface

	// Expected type of objects in the underlying cache.
	objectType reflect.Type

	// "sliding window" of recent changes of objects and the current state.
	watchCache *watchCache
	reflector  *cache.Reflector

	// versioner is used to handle resource versions.
	versioner Versioner

	// triggerFunc is used to reduce the number of watchers that need to
	// process an incoming event.
	triggerFunc TriggerPublisherFunc
	// watchers maps from the trigger-function value a watcher is interested
	// in to the watcher itself; watchers without a usable trigger value are
	// kept in a separate "all watchers" map.
	watcherIdx int
	watchers   indexedWatchers

	// Defines a time budget that can be spent on waiting for not-ready watchers
	// while dispatching an event before shutting them down.
	dispatchTimeoutBudget *timeBudget

	// Handling graceful termination.
	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup
}

// Create a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

	// Give this error when it is constructed rather than when you get the
	// first watch item, because it's much easier to track down that way.
	if obj, ok := config.Type.(runtime.Object); ok {
		if err := runtime.CheckCodec(config.Codec, obj); err != nil {
			panic("storage codec doesn't seem to match given type: " + err.Error())
		}
	}

	stopCh := make(chan struct{})
	cacher := &Cacher{
		ready:       newReady(),
		storage:     config.Storage,
		objectType:  reflect.TypeOf(config.Type),
		watchCache:  watchCache,
		reflector:   cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
		versioner:   config.Versioner,
		triggerFunc: config.TriggerPublisherFunc,
		watcherIdx:  0,
		watchers: indexedWatchers{
			allWatchers:   make(map[int]*cacheWatcher),
			valueWatchers: make(map[string]watchersMap),
		},
		// TODO: Figure out the correct value for the buffer size.
		incoming:              make(chan watchCacheEvent, 100),
		dispatchTimeoutBudget: newTimeBudget(stopCh),
		// We need to (potentially) stop both:
		// - wait.Until go-routine
		// - reflector.ListAndWatch
		// and there are no guarantees on the order that they will stop.
		// So we will be simply closing the channel, and synchronizing on the WaitGroup.
		stopCh: stopCh,
	}
	watchCache.SetOnEvent(cacher.processEvent)
	go cacher.dispatchEvents()

	cacher.stopWg.Add(1)
	go func() {
		defer cacher.stopWg.Done()
		wait.Until(
			func() {
				if !cacher.isStopped() {
					cacher.startCaching(stopCh)
				}
			}, time.Second, stopCh,
		)
	}()
	return cacher
}
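
// exampleCacherLifecycle is an illustrative sketch, not part of the original
// file: a Cacher is constructed from its config, serves traffic through the
// storage.Interface methods, and must be stopped so that its background
// reflector and event-dispatching goroutines terminate.
func exampleCacherLifecycle(config CacherConfig) {
	cacher := NewCacherFromConfig(config)
	defer cacher.Stop()
	// ... serve WATCH/LIST requests via cacher.Watch, cacher.List, etc. ...
}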

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	// The 'ready' gate is open (i.e. the cache is safe to use) from a
	// successful list until a disconnection. We start with ready unset. The
	// OnReplace function below sets it after a successful list, and the defer
	// below clears it again when this function exits (always due to
	// disconnection), but only if we actually got a successful list. This
	// cycle repeats as needed.
	successfulList := false
	c.watchCache.SetOnReplace(func() {
		successfulList = true
		c.ready.set(true)
	})
	defer func() {
		if successfulList {
			c.ready.set(false)
		}
	}()

	c.terminateAllWatchers()
	// Note that since onReplace may not be called due to errors, we explicitly
	// need to retry it on errors under lock.
	// Also note that startCaching is called in a loop, so there's no need
	// to have another loop here.
	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
		glog.Errorf("unexpected ListAndWatch error: %v", err)
	}
}

// Implements storage.Interface.
func (c *Cacher) Versioner() Versioner {
	return c.storage.Versioner()
}

// Implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	return c.storage.Create(ctx, key, obj, out, ttl)
}

// Implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
	return c.storage.Delete(ctx, key, out, preconditions)
}

// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
	watchRV, err := c.versioner.ParseWatchResourceVersion(resourceVersion)
	if err != nil {
		return nil, err
	}

	c.ready.wait()

	// We explicitly use the thread-unsafe version and do the locking ourselves
	// to ensure that no new events are processed in the meantime. The
	// watchCache is unlocked on return from this function.
	// Note that we cannot do it under the Cacher lock, to avoid a deadlock,
	// since the underlying watchCache calls processEvent under its own lock.
	c.watchCache.RLock()
	defer c.watchCache.RUnlock()
	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
	if err != nil {
		// To match the uncached watch implementation, once we have passed authn/authz/admission,
		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
		// rather than a directly returned error.
		return newErrWatcher(err), nil
	}

	triggerValue, triggerSupported := "", false
	// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
	// passed here is aware of exactly the same trigger (at most one).
	// Thus, either 0 or 1 values will be returned.
	if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
		triggerValue, triggerSupported = matchValues[0].Value, true
	}

	// If there is a triggerFunc defined but triggerSupported is false,
	// we can't narrow the set of events significantly at this point.
	//
	// That said, currently triggerFunc is defined only for Pods and Nodes,
	// and there is only a constant number of watchers for which triggerSupported
	// is false (excluding those issued explicitly by users).
	// Thus, to reduce the risk of those watchers blocking all watchers of a
	// given resource in the system, we increase the sizes of their buffers.
	chanSize := 10
	if c.triggerFunc != nil && !triggerSupported {
		// TODO: We should tune this value and ideally make it dependent on the
		// number of objects of a given type and/or their churn.
		chanSize = 1000
	}

	c.Lock()
	defer c.Unlock()
	forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
	watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)

	c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
	c.watcherIdx++
	return watcher, nil
}
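
// exampleConsumeWatch is an illustrative sketch, not part of the original
// file: it shows how a caller might consume the watch.Interface returned by
// Cacher.Watch. The key and resource version are placeholders; Everything is
// the package's match-all predicate.
func exampleConsumeWatch(ctx context.Context, c *Cacher) error {
	w, err := c.Watch(ctx, "/registry/example", "1234", Everything)
	if err != nil {
		return err
	}
	defer w.Stop()
	for event := range w.ResultChan() {
		// Events arrive as ADDED/MODIFIED/DELETED (or ERROR) notifications.
		fmt.Printf("got event: %v\n", event.Type)
	}
	return nil
}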

// Implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
	return c.Watch(ctx, key, resourceVersion, pred)
}

// Implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
	if resourceVersion == "" {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility).
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

	// If resourceVersion is specified, serve it from cache.
	// It's guaranteed that the returned value is at least as fresh as the
	// given resourceVersion.
	getRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	if getRV == 0 && !c.ready.check() {
		// If Cacher is not yet initialized and we don't require any specific
		// minimal resource version, simply forward the request to storage.
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

	// Do not create a trace - it's not for free and there are tons
	// of Get requests. We can add it if it will be really needed.
	c.ready.wait()

	objVal, err := conversion.EnforcePtr(objPtr)
	if err != nil {
		return err
	}

	obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
	if err != nil {
		return err
	}

	if exists {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		objVal.Set(reflect.ValueOf(elem.Object).Elem())
	} else {
		objVal.Set(reflect.Zero(objVal.Type()))
		if !ignoreNotFound {
			return NewKeyNotFoundError(key, int64(readResourceVersion))
		}
	}
	return nil
}
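
// exampleGetSemantics is an illustrative sketch, not part of the original
// file, of Get's resourceVersion contract: "" bypasses the cache for
// backward compatibility, while "0" accepts whatever the cache currently
// holds. The key and output object are placeholders.
func exampleGetSemantics(ctx context.Context, c *Cacher, out runtime.Object) error {
	// Backward-compatible read: always served by the underlying storage.
	if err := c.Get(ctx, "/registry/example/foo", "", out, false); err != nil {
		return err
	}
	// Cached read: served once the watch cache is at least this fresh.
	return c.Get(ctx, "/registry/example/foo", "0", out, true)
}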

// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility). If a continuation or limit is
		// requested, serve it from the underlying storage as well.
		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
	}

	// If resourceVersion is specified, serve it from cache.
	// It's guaranteed that the returned value is at least as fresh as the
	// given resourceVersion.
	listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	if listRV == 0 && !c.ready.check() {
		// If Cacher is not yet initialized and we don't require any specific
		// minimal resource version, simply forward the request to storage.
		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
	}

	trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
	defer trace.LogIfLong(500 * time.Millisecond)

	c.ready.wait()
	trace.Step("Ready")

	// List elements with at least 'listRV' from cache.
	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil || listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunction(key, pred)

	obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
	if err != nil {
		return err
	}
	trace.Step("Got from cache")

	if exists {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
		}
	}
	if c.versioner != nil {
		if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
			return err
		}
	}
	return nil
}

// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	hasContinuation := pagingEnabled && len(pred.Continue) > 0
	hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
	if resourceVersion == "" || hasContinuation || hasLimit {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility). If a continuation is
		// requested, serve it from the underlying storage as well.
		// Limits are only sent to storage when resourceVersion is non-zero,
		// since the watch cache isn't able to perform continuations, and
		// limits are ignored when resource version is zero.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	// If resourceVersion is specified, serve it from cache.
	// It's guaranteed that the returned value is at least as fresh as the
	// given resourceVersion.
	listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	if listRV == 0 && !c.ready.check() {
		// If Cacher is not yet initialized and we don't require any specific
		// minimal resource version, simply forward the request to storage.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
	defer trace.LogIfLong(500 * time.Millisecond)

	c.ready.wait()
	trace.Step("Ready")

	// List elements with at least 'listRV' from cache.
	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil || listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunction(key, pred)

	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
	if err != nil {
		return err
	}
	trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
	if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
		// Resize the slice appropriately, since we already know that none
		// of the elements will be filtered out.
		listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
		trace.Step("Resized result")
	}
	for _, obj := range objs {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
		}
	}
	trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
	if c.versioner != nil {
		if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
			return err
		}
	}
	return nil
}
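
// exampleCachedList is an illustrative sketch, not part of the original
// file: with resourceVersion "0" and no limit or continue token, the list is
// served from the watch cache, whereas an empty resourceVersion (or a
// pagination request) falls through to the underlying storage. The key is a
// placeholder and listObj must be the resource's list type.
func exampleCachedList(ctx context.Context, c *Cacher, listObj runtime.Object) error {
	return c.List(ctx, "/registry/example", "0", Everything, listObj)
}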

// Implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(
	ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
	preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
	// Ignore the suggestion and try to pass down the current version of the object
	// read from cache.
	if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
		glog.Errorf("GetByKey returned error: %v", err)
	} else if exists {
		currObj := elem.(*storeElement).Object.DeepCopyObject()
		return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
	}
	// If we couldn't get the object, fall back to no suggestion.
	return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}
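
// exampleGuaranteedUpdate is an illustrative sketch, not part of the
// original file: the Cacher passes its cached object down as a suggestion,
// and the underlying storage retries tryUpdate on conflict. The no-op
// mutation shown here is a placeholder shaped after the package's
// UpdateFunc type.
func exampleGuaranteedUpdate(ctx context.Context, c *Cacher, key string, ptrToType runtime.Object) error {
	return c.GuaranteedUpdate(ctx, key, ptrToType, false, nil,
		func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
			// Mutate a deep copy of input here; returning it persists the change.
			return input.DeepCopyObject(), nil, nil
		})
}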

func (c *Cacher) Count(pathPrefix string) (int64, error) {
	return c.storage.Count(pathPrefix)
}

func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
	// TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
	// is aware of exactly the same trigger (at most one). Thus calling:
	//   c.triggerFunc(<some object>)
	// can return only 0 or 1 values.
	// That means that triggerValues itself may return up to 2 different values.
	if c.triggerFunc == nil {
		return nil, false
	}
	result := make([]string, 0, 2)
	matchValues := c.triggerFunc(event.Object)
	if len(matchValues) > 0 {
		result = append(result, matchValues[0].Value)
	}
	if event.PrevObject == nil {
		return result, len(result) > 0
	}
	prevMatchValues := c.triggerFunc(event.PrevObject)
	if len(prevMatchValues) > 0 {
		if len(result) == 0 || result[0] != prevMatchValues[0].Value {
			result = append(result, prevMatchValues[0].Value)
		}
	}
	return result, len(result) > 0
}
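
// For illustration (an assumed trigger, not from the original file): with a
// trigger function that publishes a pod's spec.nodeName, an update that
// moves a pod from "node-1" to "node-2" returns both values here, so that
// watchers indexed under either node are offered the event.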

func (c *Cacher) processEvent(event *watchCacheEvent) {
	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
		// Monitor if this gets backed up, and how much.
		glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
	}
	c.incoming <- *event
}

func (c *Cacher) dispatchEvents() {
	for {
		select {
		case event, ok := <-c.incoming:
			if !ok {
				return
			}
			c.dispatchEvent(&event)
		case <-c.stopCh:
			return
		}
	}
}

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	triggerValues, supported := c.triggerValues(event)

	c.Lock()
	defer c.Unlock()
	// Iterate over "allWatchers" no matter what the trigger function is.
	for _, watcher := range c.watchers.allWatchers {
		watcher.add(event, c.dispatchTimeoutBudget)
	}
	if supported {
		// Iterate over watchers interested in the given values of the trigger.
		for _, triggerValue := range triggerValues {
			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
				watcher.add(event, c.dispatchTimeoutBudget)
			}
		}
	} else {
		// supported being false generally means that the trigger function
		// is not defined (or not aware of any indexes). In this case,
		// watcher filters should generally also not generate any trigger
		// values, but misconfiguration could break that assumption.
		// Thus, to be safe, we keep this branch.

		// Iterate over watchers interested in exact values for all values.
		for _, watchers := range c.watchers.valueWatchers {
			for _, watcher := range watchers {
				watcher.add(event, c.dispatchTimeoutBudget)
			}
		}
	}
}

func (c *Cacher) terminateAllWatchers() {
	c.Lock()
	defer c.Unlock()
	c.watchers.terminateAll(c.objectType)
}

func (c *Cacher) isStopped() bool {
	c.stopLock.RLock()
	defer c.stopLock.RUnlock()
	return c.stopped
}

func (c *Cacher) Stop() {
	// avoid stopping twice (note: cachers are shared with subresources)
	if c.isStopped() {
		return
	}
	c.stopLock.Lock()
	if c.stopped {
		c.stopLock.Unlock()
		return
	}
	c.stopped = true
	c.stopLock.Unlock()
	close(c.stopCh)
	c.stopWg.Wait()
}

func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
	return func(lock bool) {
		if lock {
			c.Lock()
			defer c.Unlock()
		} else {
			// false is currently passed only if we are forcing a watcher to close
			// due to its unresponsiveness blocking other watchers.
			// TODO: Get this information in a cleaner way.
			glog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
		}
		// It's possible that the watcher is already not in the structure (e.g. in
		// case of simultaneous Stop() and terminateAllWatchers()), but that
		// doesn't break anything.
		c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
	}
}

func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc {
	filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool {
		if !hasPathPrefix(objKey, key) {
			return false
		}
		return p.MatchesObjectAttributes(label, field, uninitialized)
	}
	return filterFunc
}

// LastSyncResourceVersion returns the resource version to which the
// underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
	c.ready.wait()

	resourceVersion := c.reflector.LastSyncResourceVersion()
	return c.versioner.ParseListResourceVersion(resourceVersion)
}

// cacherListerWatcher wraps storage.Interface to expose it as a
// cache.ListerWatcher.
type cacherListerWatcher struct {
	storage        Interface
	resourcePrefix string
	newListFunc    func() runtime.Object
}

func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
	return &cacherListerWatcher{
		storage:        storage,
		resourcePrefix: resourcePrefix,
		newListFunc:    newListFunc,
	}
}

// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
	list := lw.newListFunc()
	if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
		return nil, err
	}
	return list, nil
}

// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
}

// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
	result chan watch.Event
}

func newErrWatcher(err error) *errWatcher {
	// Create an error event
	errEvent := watch.Event{Type: watch.Error}
	switch err := err.(type) {
	case runtime.Object:
		errEvent.Object = err
	case *errors.StatusError:
		errEvent.Object = &err.ErrStatus
	default:
		errEvent.Object = &metav1.Status{
			Status:  metav1.StatusFailure,
			Message: err.Error(),
			Reason:  metav1.StatusReasonInternalError,
			Code:    http.StatusInternalServerError,
		}
	}

	// Create a watcher with room for a single event, populate it, and close the channel
	watcher := &errWatcher{result: make(chan watch.Event, 1)}
	watcher.result <- errEvent
	close(watcher.result)

	return watcher
}

// Implements watch.Interface.
func (c *errWatcher) ResultChan() <-chan watch.Event {
	return c.result
}

// Implements watch.Interface.
func (c *errWatcher) Stop() {
	// no-op
}

// cacheWatcher implements watch.Interface
type cacheWatcher struct {
	sync.Mutex
	input     chan *watchCacheEvent
	result    chan watch.Event
	done      chan struct{}
	filter    filterWithAttrsFunc
	stopped   bool
	forget    func(bool)
	versioner Versioner
}

func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
	watcher := &cacheWatcher{
		input:     make(chan *watchCacheEvent, chanSize),
		result:    make(chan watch.Event, chanSize),
		done:      make(chan struct{}),
		filter:    filter,
		stopped:   false,
		forget:    forget,
		versioner: versioner,
	}
	go watcher.process(initEvents, resourceVersion)
	return watcher
}

// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
	return c.result
}

// Implements watch.Interface.
func (c *cacheWatcher) Stop() {
	c.forget(true)
	c.stop()
}

func (c *cacheWatcher) stop() {
	c.Lock()
	defer c.Unlock()
	if !c.stopped {
		c.stopped = true
		close(c.done)
		close(c.input)
	}
}

var timerPool sync.Pool

func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
	// Try to send the event immediately, without blocking.
	select {
	case c.input <- event:
		return
	default:
	}

	// OK, block sending, but only for up to <timeout>.
	// cacheWatcher.add is called very often, so arrange
	// to reuse timers instead of constantly allocating.
	startTime := time.Now()
	timeout := budget.takeAvailable()

	t, ok := timerPool.Get().(*time.Timer)
	if ok {
		t.Reset(timeout)
	} else {
		t = time.NewTimer(timeout)
	}
	defer timerPool.Put(t)

	select {
	case c.input <- event:
		stopped := t.Stop()
		if !stopped {
			// Consume triggered (but not yet received) timer event
			// so that future reuse does not get a spurious timeout.
			<-t.C
		}
	case <-t.C:
		// This means that we couldn't send event to that watcher.
		// Since we don't want to block on it infinitely,
		// we simply terminate it.
		c.forget(false)
		c.stop()
	}

	budget.returnUnused(timeout - time.Since(startTime))
}
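
// exampleBudgetedSend is an illustrative sketch, not part of the original
// file, of the pattern add uses above: take whatever is left of a shared
// time budget, block for at most that long, and return the unused remainder
// so that one slow consumer cannot repeatedly exhaust the budget for all
// the others.
func exampleBudgetedSend(ch chan<- int, v int, budget *timeBudget) bool {
	timeout := budget.takeAvailable()
	startTime := time.Now()
	t := time.NewTimer(timeout)
	defer t.Stop()
	sent := false
	select {
	case ch <- v:
		sent = true
	case <-t.C:
		// Budget exhausted before the consumer accepted the value.
	}
	budget.returnUnused(timeout - time.Since(startTime))
	return sent
}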

// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
	curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
	oldObjPasses := false
	if event.PrevObject != nil {
		oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)
	}
	if !curObjPasses && !oldObjPasses {
		// Watcher is not interested in that object.
		return
	}

	var watchEvent watch.Event
	switch {
	case curObjPasses && !oldObjPasses:
		watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
	case curObjPasses && oldObjPasses:
		watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
	case !curObjPasses && oldObjPasses:
		// return a delete event with the previous object content, but with the event's resource version
		oldObj := event.PrevObject.DeepCopyObject()
		if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
			utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
		}
		watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
	}

	// We need to ensure that if we put event X into c.result, all previous
	// events have already been put into it, no matter whether c.done is
	// closed or not.
	// Thus we cannot simply select on c.done and c.result here, as that
	// would be non-deterministic.
	// At the same time, we don't want to block infinitely on putting
	// to c.result, when c.done is already closed.

	// This ensures that with c.done already closed, we at most once go
	// into the next select after this. With that, no matter which
	// statement we choose there, we will deliver only consecutive
	// events.
	select {
	case <-c.done:
		return
	default:
	}

	select {
	case c.result <- watchEvent:
	case <-c.done:
	}
}

func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
	defer utilruntime.HandleCrash()

	// Check how long we take to process initEvents.
	// As long as these are not processed, we are not processing any incoming
	// events, so if this takes long, we may effectively block all watchers
	// for some time.
	// TODO: From the logs it seems that processing times of up to 1s occur,
	// which is very long. However, this doesn't depend that much on the
	// number of initEvents. E.g. from the 2000-node Kubemark run we have
	// logs like these:
	//   ... processing 13862 initEvents took 66.808689ms
	//   ... processing 14040 initEvents took 993.532539ms
	// We should understand what is blocking us in those cases (e.g. is it
	// lack of CPU, network, or something else) and potentially consider
	// increasing the size of the result buffer in those cases.
	const initProcessThreshold = 500 * time.Millisecond
	startTime := time.Now()
	for _, event := range initEvents {
		c.sendWatchCacheEvent(event)
	}
	processingTime := time.Since(startTime)
	if processingTime > initProcessThreshold {
		objType := "<null>"
		if len(initEvents) > 0 {
			objType = reflect.TypeOf(initEvents[0].Object).String()
		}
		glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
	}

	defer close(c.result)
	defer c.Stop()
	for {
		event, ok := <-c.input
		if !ok {
			return
		}
		// only send events newer than resourceVersion
		if event.ResourceVersion > resourceVersion {
			c.sendWatchCacheEvent(event)
		}
	}
}

type ready struct {
	ok bool
	c  *sync.Cond
}

func newReady() *ready {
	return &ready{c: sync.NewCond(&sync.Mutex{})}
}

func (r *ready) wait() {
	r.c.L.Lock()
	for !r.ok {
		r.c.Wait()
	}
	r.c.L.Unlock()
}

// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
func (r *ready) check() bool {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	return r.ok
}
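
// exampleReadyGate is an illustrative sketch, not part of the original file:
// an initializer opens the gate with set(true), which wakes every goroutine
// blocked in wait(), while check() offers a non-blocking probe of the same
// state.
func exampleReadyGate() {
	r := newReady()
	go func() {
		// ... initialization work, e.g. the initial LIST completing ...
		r.set(true)
	}()
	r.wait()      // blocks until set(true) is broadcast
	_ = r.check() // non-blocking; true once initialized
}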

func (r *ready) set(ok bool) {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	r.ok = ok
	r.c.Broadcast()
}