git subrepo clone (merge) https://github.com/kubernetes-incubator/metrics-server.git metrics-server
subrepo:
subdir: "metrics-server"
merged: "92d8412"
upstream:
origin: "https://github.com/kubernetes-incubator/metrics-server.git"
branch: "master"
commit: "92d8412"
git-subrepo:
version: "0.4.0"
origin: "???"
commit: "???"
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/cacher.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/cacher.go
new file mode 100644
index 0000000..ab4ab04
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/cacher.go
@@ -0,0 +1,989 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "reflect"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/conversion"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/apiserver/pkg/features"
+ utilfeature "k8s.io/apiserver/pkg/util/feature"
+ utiltrace "k8s.io/apiserver/pkg/util/trace"
+ "k8s.io/client-go/tools/cache"
+)
+
+// CacherConfig contains the configuration for a given Cacher.
+type CacherConfig struct {
+ // Maximum size of the history cached in memory.
+ CacheCapacity int
+
+ // An underlying storage.Interface.
+ Storage Interface
+
+ // An underlying storage.Versioner.
+ Versioner Versioner
+
+ // The Cacher will cache objects of a given Type and assumes that they
+ // are all stored under the ResourcePrefix directory in the underlying database.
+ Type interface{}
+ ResourcePrefix string
+
+ // KeyFunc is used to get a key in the underlying storage for a given object.
+ KeyFunc func(runtime.Object) (string, error)
+
+ // GetAttrsFunc is used to get object labels, fields, and the uninitialized bool
+ GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error)
+
+ // TriggerPublisherFunc is used to optimize the number of watchers that
+ // need to process an incoming event.
+ TriggerPublisherFunc TriggerPublisherFunc
+
+ // NewListFunc is a function that creates a new, empty object for storing a
+ // list of objects of type Type.
+ NewListFunc func() runtime.Object
+
+ Codec runtime.Codec
+}
+
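+// watchersMap maps a watcher's registration index to the watcher itself.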
+type watchersMap map[int]*cacheWatcher
+
+func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
+ wm[number] = w
+}
+
+func (wm watchersMap) deleteWatcher(number int) {
+ delete(wm, number)
+}
+
+func (wm watchersMap) terminateAll() {
+ for key, watcher := range wm {
+ delete(wm, key)
+ watcher.stop()
+ }
+}
+
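+// indexedWatchers groups watchers by the trigger value they are interested
+// in (valueWatchers); watchers without a usable trigger value live in
+// allWatchers and receive every event.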
+type indexedWatchers struct {
+ allWatchers watchersMap
+ valueWatchers map[string]watchersMap
+}
+
+func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
+ if supported {
+ if _, ok := i.valueWatchers[value]; !ok {
+ i.valueWatchers[value] = watchersMap{}
+ }
+ i.valueWatchers[value].addWatcher(w, number)
+ } else {
+ i.allWatchers.addWatcher(w, number)
+ }
+}
+
+func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
+ if supported {
+ i.valueWatchers[value].deleteWatcher(number)
+ if len(i.valueWatchers[value]) == 0 {
+ delete(i.valueWatchers, value)
+ }
+ } else {
+ i.allWatchers.deleteWatcher(number)
+ }
+}
+
+func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
+ if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
+ glog.Warningf("Terminating all watchers from cacher %v", objectType)
+ }
+ i.allWatchers.terminateAll()
+ for index, watchers := range i.valueWatchers {
+ watchers.terminateAll()
+ delete(i.valueWatchers, index)
+ }
+}
+
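+// filterWithAttrsFunc reports whether an object, identified by its storage
+// key, labels, fields, and uninitialized flag, matches a watcher's selection
+// criteria.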
+type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool
+
+// Cacher is responsible for serving WATCH and LIST requests for a given
+// resource from its internal cache and updating its cache in the background
+// based on the underlying storage contents.
+// Cacher implements storage.Interface (although most of the calls are just
+// delegated to the underlying storage).
+type Cacher struct {
+ // HighWaterMarks for performance debugging.
+ // Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
+ // See: https://golang.org/pkg/sync/atomic/ for more information
+ incomingHWM HighWaterMark
+ // Incoming events that should be dispatched to watchers.
+ incoming chan watchCacheEvent
+
+ sync.RWMutex
+
+ // Before accessing the cacher's cache, wait for the ready to be ok.
+ // This is necessary to prevent users from accessing structures that are
+ // uninitialized or are being repopulated right now.
+ // ready needs to be set to false when the cacher is paused or stopped.
+ // ready needs to be set to true when the cacher is ready to use after
+ // initialization.
+ ready *ready
+
+ // Underlying storage.Interface.
+ storage Interface
+
+ // Expected type of objects in the underlying cache.
+ objectType reflect.Type
+
+ // "sliding window" of recent changes of objects and the current state.
+ watchCache *watchCache
+ reflector *cache.Reflector
+
+ // Versioner is used to handle resource versions.
+ versioner Versioner
+
+ // triggerFunc is used to optimize the number of watchers that need to
+ // process an incoming event.
+ triggerFunc TriggerPublisherFunc
+ // watchers maps the value of the trigger function that a watcher is
+ // interested in to the watchers themselves.
+ watcherIdx int
+ watchers indexedWatchers
+
+ // Defines a time budget that can be spent on waiting for not-ready watchers
+ // while dispatching event before shutting them down.
+ dispatchTimeoutBudget *timeBudget
+
+ // Handling graceful termination.
+ stopLock sync.RWMutex
+ stopped bool
+ stopCh chan struct{}
+ stopWg sync.WaitGroup
+}
+
+// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and
+// LIST requests from its internal cache and updating its cache in the
+// background based on the given configuration.
+func NewCacherFromConfig(config CacherConfig) *Cacher {
+ watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
+ listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
+ reflectorName := "storage/cacher.go:" + config.ResourcePrefix
+
+ // Give this error at construction time rather than when the first watch
+ // item arrives, because it's much easier to track down that way.
+ if obj, ok := config.Type.(runtime.Object); ok {
+ if err := runtime.CheckCodec(config.Codec, obj); err != nil {
+ panic("storage codec doesn't seem to match given type: " + err.Error())
+ }
+ }
+
+ stopCh := make(chan struct{})
+ cacher := &Cacher{
+ ready: newReady(),
+ storage: config.Storage,
+ objectType: reflect.TypeOf(config.Type),
+ watchCache: watchCache,
+ reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
+ versioner: config.Versioner,
+ triggerFunc: config.TriggerPublisherFunc,
+ watcherIdx: 0,
+ watchers: indexedWatchers{
+ allWatchers: make(map[int]*cacheWatcher),
+ valueWatchers: make(map[string]watchersMap),
+ },
+ // TODO: Figure out the correct value for the buffer size.
+ incoming: make(chan watchCacheEvent, 100),
+ dispatchTimeoutBudget: newTimeBudget(stopCh),
+ // We need to (potentially) stop both:
+ // - wait.Until go-routine
+ // - reflector.ListAndWatch
+ // and there are no guarantees on the order that they will stop.
+ // So we simply close the channel and synchronize on the WaitGroup.
+ stopCh: stopCh,
+ }
+ watchCache.SetOnEvent(cacher.processEvent)
+ go cacher.dispatchEvents()
+
+ cacher.stopWg.Add(1)
+ go func() {
+ defer cacher.stopWg.Done()
+ wait.Until(
+ func() {
+ if !cacher.isStopped() {
+ cacher.startCaching(stopCh)
+ }
+ }, time.Second, stopCh,
+ )
+ }()
+ return cacher
+}
+
+func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
+ // The 'ready' gate is open (set to true) whenever it is safe to use the cache.
+ // It is safe to use the cache after a successful list until a disconnection.
+ // We start with ready set to false. The OnReplace function below sets it to
+ // true after a successful list. The defer below then resets it to false when
+ // this function exits (always due to disconnection), but only if we actually
+ // got a successful list. This cycle repeats as needed.
+ successfulList := false
+ c.watchCache.SetOnReplace(func() {
+ successfulList = true
+ c.ready.set(true)
+ })
+ defer func() {
+ if successfulList {
+ c.ready.set(false)
+ }
+ }()
+
+ c.terminateAllWatchers()
+ // Note that since onReplace may not be called due to errors, we explicitly
+ // need to retry it on errors under lock.
+ // Also note that startCaching is called in a loop, so there's no need
+ // to have another loop here.
+ if err := c.reflector.ListAndWatch(stopChannel); err != nil {
+ glog.Errorf("unexpected ListAndWatch error: %v", err)
+ }
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Versioner() Versioner {
+ return c.storage.Versioner()
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
+ return c.storage.Create(ctx, key, obj, out, ttl)
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
+ return c.storage.Delete(ctx, key, out, preconditions)
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
+ watchRV, err := c.versioner.ParseWatchResourceVersion(resourceVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ c.ready.wait()
+
+ // We explicitly use the thread-unsafe version and do the locking ourselves to
+ // ensure that no new events are processed in the meantime. The watchCache is
+ // unlocked on return from this function.
+ // Note that we cannot do this under the Cacher lock: the underlying watchCache
+ // calls processEvent under its own lock, so that would risk a deadlock.
+ c.watchCache.RLock()
+ defer c.watchCache.RUnlock()
+ initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
+ if err != nil {
+ // To match the uncached watch implementation, once we have passed authn/authz/admission,
+ // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
+ // rather than a directly returned error.
+ return newErrWatcher(err), nil
+ }
+
+ triggerValue, triggerSupported := "", false
+ // TODO: Currently we assume that in a given Cacher object, any <predicate> that is
+ // passed here is aware of exactly the same trigger (at most one).
+ // Thus, either 0 or 1 values will be returned.
+ if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
+ triggerValue, triggerSupported = matchValues[0].Value, true
+ }
+
+ // If a triggerFunc is defined but triggerSupported is false,
+ // we can't significantly narrow the set of events at this point.
+ //
+ // That said, triggerFunc is currently defined only for Pods and Nodes,
+ // and there is only a constant number of watchers for which triggerSupported
+ // is false (excluding those issued explicitly by users).
+ // Thus, to reduce the risk of those watchers blocking all watchers of a
+ // given resource in the system, we increase the sizes of their buffers.
+ chanSize := 10
+ if c.triggerFunc != nil && !triggerSupported {
+ // TODO: We should tune this value and ideally make it dependent on the
+ // number of objects of a given type and/or their churn.
+ chanSize = 1000
+ }
+
+ c.Lock()
+ defer c.Unlock()
+ forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
+ watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)
+
+ c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
+ c.watcherIdx++
+ return watcher, nil
+}
+
+// Implements storage.Interface.
+func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
+ return c.Watch(ctx, key, resourceVersion, pred)
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
+ if resourceVersion == "" {
+ // If resourceVersion is not specified, serve it from underlying
+ // storage (for backward compatibility).
+ return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
+ }
+
+ // If resourceVersion is specified, serve it from cache.
+ // It's guaranteed that the returned value is at least as
+ // fresh as the given resourceVersion.
+ getRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
+ if err != nil {
+ return err
+ }
+
+ if getRV == 0 && !c.ready.check() {
+ // If Cacher is not yet initialized and we don't require any specific
+ // minimal resource version, simply forward the request to storage.
+ return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
+ }
+
+ // Do not create a trace - it's not for free and there are tons
+ // of Get requests. We can add it if it will be really needed.
+ c.ready.wait()
+
+ objVal, err := conversion.EnforcePtr(objPtr)
+ if err != nil {
+ return err
+ }
+
+ obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
+ if err != nil {
+ return err
+ }
+
+ if exists {
+ elem, ok := obj.(*storeElement)
+ if !ok {
+ return fmt.Errorf("non *storeElement returned from storage: %v", obj)
+ }
+ objVal.Set(reflect.ValueOf(elem.Object).Elem())
+ } else {
+ objVal.Set(reflect.Zero(objVal.Type()))
+ if !ignoreNotFound {
+ return NewKeyNotFoundError(key, int64(readResourceVersion))
+ }
+ }
+ return nil
+}
+
+// Implements storage.Interface.
+func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
+ pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
+ if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
+ // If resourceVersion is not specified, serve it from underlying
+ // storage (for backward compatibility). If a continuation or limit is
+ // requested, serve it from the underlying storage as well.
+ return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
+ }
+
+ // If resourceVersion is specified, serve it from cache.
+ // It's guaranteed that the returned value is at least as
+ // fresh as the given resourceVersion.
+ listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
+ if err != nil {
+ return err
+ }
+
+ if listRV == 0 && !c.ready.check() {
+ // If Cacher is not yet initialized and we don't require any specific
+ // minimal resource version, simply forward the request to storage.
+ return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
+ }
+
+ trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
+ defer trace.LogIfLong(500 * time.Millisecond)
+
+ c.ready.wait()
+ trace.Step("Ready")
+
+ // List elements with at least 'listRV' from cache.
+ listPtr, err := meta.GetItemsPtr(listObj)
+ if err != nil {
+ return err
+ }
+ listVal, err := conversion.EnforcePtr(listPtr)
+ if err != nil || listVal.Kind() != reflect.Slice {
+ return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
+ }
+ filter := filterWithAttrsFunction(key, pred)
+
+ obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
+ if err != nil {
+ return err
+ }
+ trace.Step("Got from cache")
+
+ if exists {
+ elem, ok := obj.(*storeElement)
+ if !ok {
+ return fmt.Errorf("non *storeElement returned from storage: %v", obj)
+ }
+ if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
+ listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
+ }
+ }
+ if c.versioner != nil {
+ if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Implements storage.Interface.
+func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
+ pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
+ hasContinuation := pagingEnabled && len(pred.Continue) > 0
+ hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
+ if resourceVersion == "" || hasContinuation || hasLimit {
+ // If resourceVersion is not specified, serve it from underlying
+ // storage (for backward compatibility). If a continuation is
+ // requested, serve it from the underlying storage as well.
+ // Limits are only sent to storage when resourceVersion is non-zero
+ // since the watch cache isn't able to perform continuations, and
+ // limits are ignored when resource version is zero.
+ return c.storage.List(ctx, key, resourceVersion, pred, listObj)
+ }
+
+ // If resourceVersion is specified, serve it from cache.
+ // It's guaranteed that the returned value is at least as
+ // fresh as the given resourceVersion.
+ listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
+ if err != nil {
+ return err
+ }
+
+ if listRV == 0 && !c.ready.check() {
+ // If Cacher is not yet initialized and we don't require any specific
+ // minimal resource version, simply forward the request to storage.
+ return c.storage.List(ctx, key, resourceVersion, pred, listObj)
+ }
+
+ trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
+ defer trace.LogIfLong(500 * time.Millisecond)
+
+ c.ready.wait()
+ trace.Step("Ready")
+
+ // List elements with at least 'listRV' from cache.
+ listPtr, err := meta.GetItemsPtr(listObj)
+ if err != nil {
+ return err
+ }
+ listVal, err := conversion.EnforcePtr(listPtr)
+ if err != nil || listVal.Kind() != reflect.Slice {
+ return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
+ }
+ filter := filterWithAttrsFunction(key, pred)
+
+ objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
+ if err != nil {
+ return err
+ }
+ trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
+ if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
+ // Resize the slice appropriately, since we already know that none
+ // of the elements will be filtered out.
+ listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
+ trace.Step("Resized result")
+ }
+ for _, obj := range objs {
+ elem, ok := obj.(*storeElement)
+ if !ok {
+ return fmt.Errorf("non *storeElement returned from storage: %v", obj)
+ }
+ if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
+ listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
+ }
+ }
+ trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
+ if c.versioner != nil {
+ if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Implements storage.Interface.
+func (c *Cacher) GuaranteedUpdate(
+ ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
+ preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
+ // Ignore the suggestion and try to pass down the current version of the object
+ // read from cache.
+ if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
+ glog.Errorf("GetByKey returned error: %v", err)
+ } else if exists {
+ currObj := elem.(*storeElement).Object.DeepCopyObject()
+ return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
+ }
+ // If we couldn't get the object, fallback to no-suggestion.
+ return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
+}
+
+func (c *Cacher) Count(pathPrefix string) (int64, error) {
+ return c.storage.Count(pathPrefix)
+}
+
+func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
+ // TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
+ // is aware of exactly the same trigger (at most one). Thus calling:
+ // c.triggerFunc(<some object>)
+ // can return only 0 or 1 values.
+ // That means that triggerValues itself may return up to 2 different values.
+ if c.triggerFunc == nil {
+ return nil, false
+ }
+ result := make([]string, 0, 2)
+ matchValues := c.triggerFunc(event.Object)
+ if len(matchValues) > 0 {
+ result = append(result, matchValues[0].Value)
+ }
+ if event.PrevObject == nil {
+ return result, len(result) > 0
+ }
+ prevMatchValues := c.triggerFunc(event.PrevObject)
+ if len(prevMatchValues) > 0 {
+ if len(result) == 0 || result[0] != prevMatchValues[0].Value {
+ result = append(result, prevMatchValues[0].Value)
+ }
+ }
+ return result, len(result) > 0
+}
+
+func (c *Cacher) processEvent(event *watchCacheEvent) {
+ if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
+ // Monitor if this gets backed up, and how much.
+ glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
+ }
+ c.incoming <- *event
+}
+
+func (c *Cacher) dispatchEvents() {
+ for {
+ select {
+ case event, ok := <-c.incoming:
+ if !ok {
+ return
+ }
+ c.dispatchEvent(&event)
+ case <-c.stopCh:
+ return
+ }
+ }
+}
+
+func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
+ triggerValues, supported := c.triggerValues(event)
+
+ c.Lock()
+ defer c.Unlock()
+ // Iterate over "allWatchers" no matter what the trigger function is.
+ for _, watcher := range c.watchers.allWatchers {
+ watcher.add(event, c.dispatchTimeoutBudget)
+ }
+ if supported {
+ // Iterate over watchers interested in the given values of the trigger.
+ for _, triggerValue := range triggerValues {
+ for _, watcher := range c.watchers.valueWatchers[triggerValue] {
+ watcher.add(event, c.dispatchTimeoutBudget)
+ }
+ }
+ } else {
+ // supported == false generally means that the trigger function is not
+ // defined (or is not aware of any indexes). In that case the watchers'
+ // filters should generally not produce any trigger values either, but a
+ // misconfiguration could cause problems, so we keep this branch as a
+ // safety net.
+
+ // Iterate over watchers interested in exact values for all values.
+ for _, watchers := range c.watchers.valueWatchers {
+ for _, watcher := range watchers {
+ watcher.add(event, c.dispatchTimeoutBudget)
+ }
+ }
+ }
+}
+
+func (c *Cacher) terminateAllWatchers() {
+ c.Lock()
+ defer c.Unlock()
+ c.watchers.terminateAll(c.objectType)
+}
+
+func (c *Cacher) isStopped() bool {
+ c.stopLock.RLock()
+ defer c.stopLock.RUnlock()
+ return c.stopped
+}
+
+func (c *Cacher) Stop() {
+ // avoid stopping twice (note: cachers are shared with subresources)
+ if c.isStopped() {
+ return
+ }
+ c.stopLock.Lock()
+ if c.stopped {
+ c.stopLock.Unlock()
+ return
+ }
+ c.stopped = true
+ c.stopLock.Unlock()
+ close(c.stopCh)
+ c.stopWg.Wait()
+}
+
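+// forgetWatcher returns a cleanup function that removes the watcher
+// registered under the given index (and trigger value, if supported) from
+// the cacher's set of watchers.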
+func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
+ return func(lock bool) {
+ if lock {
+ c.Lock()
+ defer c.Unlock()
+ } else {
+ // false is currently passed only if we are forcing the watcher to close
+ // because it is unresponsive and blocking other watchers.
+ // TODO: Get this information in a cleaner way.
+ glog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
+ }
+ // It's possible that the watcher is already not in the structure (e.g. in case of
+ // simultaneous Stop() and terminateAllWatchers()), but it doesn't break anything.
+ c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
+ }
+}
+
+func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc {
+ filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool {
+ if !hasPathPrefix(objKey, key) {
+ return false
+ }
+ return p.MatchesObjectAttributes(label, field, uninitialized)
+ }
+ return filterFunc
+}
+
+// LastSyncResourceVersion returns the resource version to which the underlying cache is synced.
+func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
+ c.ready.wait()
+
+ resourceVersion := c.reflector.LastSyncResourceVersion()
+ return c.versioner.ParseListResourceVersion(resourceVersion)
+}
+
+// cacherListerWatcher wraps storage.Interface so that it can be used as a cache.ListerWatcher.
+type cacherListerWatcher struct {
+ storage Interface
+ resourcePrefix string
+ newListFunc func() runtime.Object
+}
+
+func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
+ return &cacherListerWatcher{
+ storage: storage,
+ resourcePrefix: resourcePrefix,
+ newListFunc: newListFunc,
+ }
+}
+
+// Implements cache.ListerWatcher interface.
+func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
+ list := lw.newListFunc()
+ if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
+ return nil, err
+ }
+ return list, nil
+}
+
+// Implements cache.ListerWatcher interface.
+func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
+ return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
+}
+
+// errWatcher implements watch.Interface to return a single error
+type errWatcher struct {
+ result chan watch.Event
+}
+
+func newErrWatcher(err error) *errWatcher {
+ // Create an error event
+ errEvent := watch.Event{Type: watch.Error}
+ switch err := err.(type) {
+ case runtime.Object:
+ errEvent.Object = err
+ case *errors.StatusError:
+ errEvent.Object = &err.ErrStatus
+ default:
+ errEvent.Object = &metav1.Status{
+ Status: metav1.StatusFailure,
+ Message: err.Error(),
+ Reason: metav1.StatusReasonInternalError,
+ Code: http.StatusInternalServerError,
+ }
+ }
+
+ // Create a watcher with room for a single event, populate it, and close the channel
+ watcher := &errWatcher{result: make(chan watch.Event, 1)}
+ watcher.result <- errEvent
+ close(watcher.result)
+
+ return watcher
+}
+
+// Implements watch.Interface.
+func (c *errWatcher) ResultChan() <-chan watch.Event {
+ return c.result
+}
+
+// Implements watch.Interface.
+func (c *errWatcher) Stop() {
+ // no-op
+}
+
+// cacheWatcher implements watch.Interface
+type cacheWatcher struct {
+ sync.Mutex
+ input chan *watchCacheEvent
+ result chan watch.Event
+ done chan struct{}
+ filter filterWithAttrsFunc
+ stopped bool
+ forget func(bool)
+ versioner Versioner
+}
+
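+// newCacheWatcher creates a cacheWatcher and starts a goroutine that first
+// replays initEvents and then forwards events newer than resourceVersion to
+// the result channel.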
+func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
+ watcher := &cacheWatcher{
+ input: make(chan *watchCacheEvent, chanSize),
+ result: make(chan watch.Event, chanSize),
+ done: make(chan struct{}),
+ filter: filter,
+ stopped: false,
+ forget: forget,
+ versioner: versioner,
+ }
+ go watcher.process(initEvents, resourceVersion)
+ return watcher
+}
+
+// Implements watch.Interface.
+func (c *cacheWatcher) ResultChan() <-chan watch.Event {
+ return c.result
+}
+
+// Implements watch.Interface.
+func (c *cacheWatcher) Stop() {
+ c.forget(true)
+ c.stop()
+}
+
+func (c *cacheWatcher) stop() {
+ c.Lock()
+ defer c.Unlock()
+ if !c.stopped {
+ c.stopped = true
+ close(c.done)
+ close(c.input)
+ }
+}
+
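+// timerPool reuses timers across cacheWatcher.add calls so that a blocked
+// send does not allocate a new timer each time.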
+var timerPool sync.Pool
+
+func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
+ // Try to send the event immediately, without blocking.
+ select {
+ case c.input <- event:
+ return
+ default:
+ }
+
+ // OK, block sending, but only for up to <timeout>.
+ // cacheWatcher.add is called very often, so arrange
+ // to reuse timers instead of constantly allocating.
+ startTime := time.Now()
+ timeout := budget.takeAvailable()
+
+ t, ok := timerPool.Get().(*time.Timer)
+ if ok {
+ t.Reset(timeout)
+ } else {
+ t = time.NewTimer(timeout)
+ }
+ defer timerPool.Put(t)
+
+ select {
+ case c.input <- event:
+ stopped := t.Stop()
+ if !stopped {
+ // Consume triggered (but not yet received) timer event
+ // so that future reuse does not get a spurious timeout.
+ <-t.C
+ }
+ case <-t.C:
+ // This means that we couldn't send the event to that watcher.
+ // Since we don't want to block on it indefinitely,
+ // we simply terminate it.
+ c.forget(false)
+ c.stop()
+ }
+
+ budget.returnUnused(timeout - time.Since(startTime))
+}
+
+// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
+func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
+ curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
+ oldObjPasses := false
+ if event.PrevObject != nil {
+ oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)
+ }
+ if !curObjPasses && !oldObjPasses {
+ // Watcher is not interested in that object.
+ return
+ }
+
+ var watchEvent watch.Event
+ switch {
+ case curObjPasses && !oldObjPasses:
+ watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
+ case curObjPasses && oldObjPasses:
+ watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
+ case !curObjPasses && oldObjPasses:
+ // return a delete event with the previous object content, but with the event's resource version
+ oldObj := event.PrevObject.DeepCopyObject()
+ if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
+ utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
+ }
+ watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
+ }
+
+ // We need to ensure that if we put event X into c.result, all
+ // previous events have already been put into it, no matter whether
+ // c.done is closed or not.
+ // Thus we cannot simply select on c.done and c.result, as select
+ // between them is non-deterministic.
+ // At the same time, we don't want to block indefinitely on sending
+ // to c.result when c.done is already closed.
+
+ // This check ensures that, with c.done already closed, we enter the
+ // select below at most once more. With that, no matter which case
+ // is chosen there, we deliver only consecutive events.
+ select {
+ case <-c.done:
+ return
+ default:
+ }
+
+ select {
+ case c.result <- watchEvent:
+ case <-c.done:
+ }
+}
+
+func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
+ defer utilruntime.HandleCrash()
+
+ // Check how long processing initEvents takes.
+ // As long as these are not processed, we are not processing
+ // any incoming events, so if it takes long, we may actually
+ // block all watchers for some time.
+ // TODO: From the logs it seems that processing times of up to 1s
+ // occur, which is very long. However, this doesn't depend that much
+ // on the number of initEvents. E.g. from a 2000-node Kubemark run we
+ // have logs like:
+ //   ... processing 13862 initEvents took 66.808689ms
+ //   ... processing 14040 initEvents took 993.532539ms
+ // We should understand what is blocking us in those cases (e.g.
+ // lack of CPU, network, or something else) and potentially
+ // consider increasing the size of the result buffer in those cases.
+ const initProcessThreshold = 500 * time.Millisecond
+ startTime := time.Now()
+ for _, event := range initEvents {
+ c.sendWatchCacheEvent(event)
+ }
+ processingTime := time.Since(startTime)
+ if processingTime > initProcessThreshold {
+ objType := "<null>"
+ if len(initEvents) > 0 {
+ objType = reflect.TypeOf(initEvents[0].Object).String()
+ }
+ glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
+ }
+
+ defer close(c.result)
+ defer c.Stop()
+ for {
+ event, ok := <-c.input
+ if !ok {
+ return
+ }
+ // only send events newer than resourceVersion
+ if event.ResourceVersion > resourceVersion {
+ c.sendWatchCacheEvent(event)
+ }
+ }
+}
+
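+// ready is a gate built on a condition variable: wait blocks until set(true)
+// has been called, check is a non-blocking probe of the current state, and
+// set updates the state and wakes all waiters.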
+type ready struct {
+ ok bool
+ c *sync.Cond
+}
+
+func newReady() *ready {
+ return &ready{c: sync.NewCond(&sync.Mutex{})}
+}
+
+func (r *ready) wait() {
+ r.c.L.Lock()
+ for !r.ok {
+ r.c.Wait()
+ }
+ r.c.L.Unlock()
+}
+
+// TODO: Make check() function more sophisticated, in particular
+// allow it to behave as "waitWithTimeout".
+func (r *ready) check() bool {
+ r.c.L.Lock()
+ defer r.c.L.Unlock()
+ return r.ok
+}
+
+func (r *ready) set(ok bool) {
+ r.c.L.Lock()
+ defer r.c.L.Unlock()
+ r.ok = ok
+ r.c.Broadcast()
+}