git subrepo clone (merge) https://github.com/kubernetes-incubator/metrics-server.git metrics-server
subrepo:
subdir: "metrics-server"
merged: "92d8412"
upstream:
origin: "https://github.com/kubernetes-incubator/metrics-server.git"
branch: "master"
commit: "92d8412"
git-subrepo:
version: "0.4.0"
origin: "???"
commit: "???"
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/OWNERS b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/OWNERS
new file mode 100644
index 0000000..b7bff4f
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/OWNERS
@@ -0,0 +1,30 @@
+approvers:
+- lavalamp
+- liggitt
+- timothysc
+- wojtek-t
+- xiang90
+reviewers:
+- lavalamp
+- smarterclayton
+- wojtek-t
+- deads2k
+- caesarxuchao
+- mikedanese
+- liggitt
+- ncdc
+- tallclair
+- timothysc
+- hongchaodeng
+- krousey
+- fgrzadkowski
+- xiang90
+- mml
+- ingvagabund
+- resouer
+- mbohlool
+- lixiaobing10051267
+- mqliang
+- feihujiang
+- rrati
+- enj
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/cacher.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/cacher.go
new file mode 100644
index 0000000..ab4ab04
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/cacher.go
@@ -0,0 +1,989 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "reflect"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/conversion"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/apiserver/pkg/features"
+ utilfeature "k8s.io/apiserver/pkg/util/feature"
+ utiltrace "k8s.io/apiserver/pkg/util/trace"
+ "k8s.io/client-go/tools/cache"
+)
+
+// CacherConfig contains the configuration for a given Cache.
+type CacherConfig struct {
+ // Maximum size of the history cached in memory.
+ CacheCapacity int
+
+ // An underlying storage.Interface.
+ Storage Interface
+
+ // An underlying storage.Versioner.
+ Versioner Versioner
+
+ // The Cache will be caching objects of a given Type and assumes that they
+ // are all stored under ResourcePrefix directory in the underlying database.
+ Type interface{}
+ ResourcePrefix string
+
+ // KeyFunc is used to get a key in the underlying storage for a given object.
+ KeyFunc func(runtime.Object) (string, error)
+
+ // GetAttrsFunc is used to get object labels, fields, and the uninitialized bool
+ GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error)
+
+ // TriggerPublisherFunc is used for optimizing amount of watchers that
+ // needs to process an incoming event.
+ TriggerPublisherFunc TriggerPublisherFunc
+
+ // NewList is a function that creates new empty object storing a list of
+ // objects of type Type.
+ NewListFunc func() runtime.Object
+
+ Codec runtime.Codec
+}
+
+type watchersMap map[int]*cacheWatcher
+
+func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
+ wm[number] = w
+}
+
+func (wm watchersMap) deleteWatcher(number int) {
+ delete(wm, number)
+}
+
+func (wm watchersMap) terminateAll() {
+ for key, watcher := range wm {
+ delete(wm, key)
+ watcher.stop()
+ }
+}
+
+type indexedWatchers struct {
+ allWatchers watchersMap
+ valueWatchers map[string]watchersMap
+}
+
+func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
+ if supported {
+ if _, ok := i.valueWatchers[value]; !ok {
+ i.valueWatchers[value] = watchersMap{}
+ }
+ i.valueWatchers[value].addWatcher(w, number)
+ } else {
+ i.allWatchers.addWatcher(w, number)
+ }
+}
+
+func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
+ if supported {
+ i.valueWatchers[value].deleteWatcher(number)
+ if len(i.valueWatchers[value]) == 0 {
+ delete(i.valueWatchers, value)
+ }
+ } else {
+ i.allWatchers.deleteWatcher(number)
+ }
+}
+
+func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
+ if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
+ glog.Warningf("Terminating all watchers from cacher %v", objectType)
+ }
+ i.allWatchers.terminateAll()
+ for index, watchers := range i.valueWatchers {
+ watchers.terminateAll()
+ delete(i.valueWatchers, index)
+ }
+}
+
+type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool
+
+// Cacher is responsible for serving WATCH and LIST requests for a given
+// resource from its internal cache and updating its cache in the background
+// based on the underlying storage contents.
+// Cacher implements storage.Interface (although most of the calls are just
+// delegated to the underlying storage).
+type Cacher struct {
+ // HighWaterMarks for performance debugging.
+ // Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
+ // See: https://golang.org/pkg/sync/atomic/ for more information
+ incomingHWM HighWaterMark
+ // Incoming events that should be dispatched to watchers.
+ incoming chan watchCacheEvent
+
+ sync.RWMutex
+
+ // Before accessing the cacher's cache, wait for the ready to be ok.
+ // This is necessary to prevent users from accessing structures that are
+ // uninitialized or are being repopulated right now.
+ // ready needs to be set to false when the cacher is paused or stopped.
+ // ready needs to be set to true when the cacher is ready to use after
+ // initialization.
+ ready *ready
+
+ // Underlying storage.Interface.
+ storage Interface
+
+ // Expected type of objects in the underlying cache.
+ objectType reflect.Type
+
+ // "sliding window" of recent changes of objects and the current state.
+ watchCache *watchCache
+ reflector *cache.Reflector
+
+ // Versioner is used to handle resource versions.
+ versioner Versioner
+
+ // triggerFunc is used for optimizing amount of watchers that needs to process
+ // an incoming event.
+ triggerFunc TriggerPublisherFunc
+ // watchers is mapping from the value of trigger function that a
+ // watcher is interested into the watchers
+ watcherIdx int
+ watchers indexedWatchers
+
+ // Defines a time budget that can be spend on waiting for not-ready watchers
+ // while dispatching event before shutting them down.
+ dispatchTimeoutBudget *timeBudget
+
+ // Handling graceful termination.
+ stopLock sync.RWMutex
+ stopped bool
+ stopCh chan struct{}
+ stopWg sync.WaitGroup
+}
+
+// Create a new Cacher responsible for servicing WATCH and LIST requests from
+// its internal cache and updating its cache in the background based on the
+// given configuration.
+func NewCacherFromConfig(config CacherConfig) *Cacher {
+ watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
+ listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
+ reflectorName := "storage/cacher.go:" + config.ResourcePrefix
+
+ // Give this error when it is constructed rather than when you get the
+ // first watch item, because it's much easier to track down that way.
+ if obj, ok := config.Type.(runtime.Object); ok {
+ if err := runtime.CheckCodec(config.Codec, obj); err != nil {
+ panic("storage codec doesn't seem to match given type: " + err.Error())
+ }
+ }
+
+ stopCh := make(chan struct{})
+ cacher := &Cacher{
+ ready: newReady(),
+ storage: config.Storage,
+ objectType: reflect.TypeOf(config.Type),
+ watchCache: watchCache,
+ reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
+ versioner: config.Versioner,
+ triggerFunc: config.TriggerPublisherFunc,
+ watcherIdx: 0,
+ watchers: indexedWatchers{
+ allWatchers: make(map[int]*cacheWatcher),
+ valueWatchers: make(map[string]watchersMap),
+ },
+ // TODO: Figure out the correct value for the buffer size.
+ incoming: make(chan watchCacheEvent, 100),
+ dispatchTimeoutBudget: newTimeBudget(stopCh),
+ // We need to (potentially) stop both:
+ // - wait.Until go-routine
+ // - reflector.ListAndWatch
+ // and there are no guarantees on the order that they will stop.
+ // So we will be simply closing the channel, and synchronizing on the WaitGroup.
+ stopCh: stopCh,
+ }
+ watchCache.SetOnEvent(cacher.processEvent)
+ go cacher.dispatchEvents()
+
+ cacher.stopWg.Add(1)
+ go func() {
+ defer cacher.stopWg.Done()
+ wait.Until(
+ func() {
+ if !cacher.isStopped() {
+ cacher.startCaching(stopCh)
+ }
+ }, time.Second, stopCh,
+ )
+ }()
+ return cacher
+}
+
+func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
+ // The 'usable' lock is always 'RLock'able when it is safe to use the cache.
+ // It is safe to use the cache after a successful list until a disconnection.
+ // We start with usable (write) locked. The below OnReplace function will
+ // unlock it after a successful list. The below defer will then re-lock
+ // it when this function exits (always due to disconnection), only if
+ // we actually got a successful list. This cycle will repeat as needed.
+ successfulList := false
+ c.watchCache.SetOnReplace(func() {
+ successfulList = true
+ c.ready.set(true)
+ })
+ defer func() {
+ if successfulList {
+ c.ready.set(false)
+ }
+ }()
+
+ c.terminateAllWatchers()
+ // Note that since onReplace may be not called due to errors, we explicitly
+ // need to retry it on errors under lock.
+ // Also note that startCaching is called in a loop, so there's no need
+ // to have another loop here.
+ if err := c.reflector.ListAndWatch(stopChannel); err != nil {
+ glog.Errorf("unexpected ListAndWatch error: %v", err)
+ }
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Versioner() Versioner {
+ return c.storage.Versioner()
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
+ return c.storage.Create(ctx, key, obj, out, ttl)
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
+ return c.storage.Delete(ctx, key, out, preconditions)
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
+ watchRV, err := c.versioner.ParseWatchResourceVersion(resourceVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ c.ready.wait()
+
+ // We explicitly use thread unsafe version and do locking ourself to ensure that
+ // no new events will be processed in the meantime. The watchCache will be unlocked
+ // on return from this function.
+ // Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
+ // underlying watchCache is calling processEvent under its lock.
+ c.watchCache.RLock()
+ defer c.watchCache.RUnlock()
+ initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
+ if err != nil {
+ // To match the uncached watch implementation, once we have passed authn/authz/admission,
+ // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
+ // rather than a directly returned error.
+ return newErrWatcher(err), nil
+ }
+
+ triggerValue, triggerSupported := "", false
+ // TODO: Currently we assume that in a given Cacher object, any <predicate> that is
+ // passed here is aware of exactly the same trigger (at most one).
+ // Thus, either 0 or 1 values will be returned.
+ if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
+ triggerValue, triggerSupported = matchValues[0].Value, true
+ }
+
+ // If there is triggerFunc defined, but triggerSupported is false,
+ // we can't narrow the amount of events significantly at this point.
+ //
+ // That said, currently triggerFunc is defined only for Pods and Nodes,
+ // and there is only constant number of watchers for which triggerSupported
+ // is false (excluding those issues explicitly by users).
+ // Thus, to reduce the risk of those watchers blocking all watchers of a
+ // given resource in the system, we increase the sizes of buffers for them.
+ chanSize := 10
+ if c.triggerFunc != nil && !triggerSupported {
+ // TODO: We should tune this value and ideally make it dependent on the
+ // number of objects of a given type and/or their churn.
+ chanSize = 1000
+ }
+
+ c.Lock()
+ defer c.Unlock()
+ forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
+ watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)
+
+ c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
+ c.watcherIdx++
+ return watcher, nil
+}
+
+// Implements storage.Interface.
+func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
+ return c.Watch(ctx, key, resourceVersion, pred)
+}
+
+// Implements storage.Interface.
+func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
+ if resourceVersion == "" {
+ // If resourceVersion is not specified, serve it from underlying
+ // storage (for backward compatibility).
+ return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
+ }
+
+ // If resourceVersion is specified, serve it from cache.
+ // It's guaranteed that the returned value is at least that
+ // fresh as the given resourceVersion.
+ getRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
+ if err != nil {
+ return err
+ }
+
+ if getRV == 0 && !c.ready.check() {
+ // If Cacher is not yet initialized and we don't require any specific
+ // minimal resource version, simply forward the request to storage.
+ return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
+ }
+
+ // Do not create a trace - it's not for free and there are tons
+ // of Get requests. We can add it if it will be really needed.
+ c.ready.wait()
+
+ objVal, err := conversion.EnforcePtr(objPtr)
+ if err != nil {
+ return err
+ }
+
+ obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
+ if err != nil {
+ return err
+ }
+
+ if exists {
+ elem, ok := obj.(*storeElement)
+ if !ok {
+ return fmt.Errorf("non *storeElement returned from storage: %v", obj)
+ }
+ objVal.Set(reflect.ValueOf(elem.Object).Elem())
+ } else {
+ objVal.Set(reflect.Zero(objVal.Type()))
+ if !ignoreNotFound {
+ return NewKeyNotFoundError(key, int64(readResourceVersion))
+ }
+ }
+ return nil
+}
+
+// Implements storage.Interface.
+func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
+ pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
+ if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
+ // If resourceVersion is not specified, serve it from underlying
+ // storage (for backward compatibility). If a continuation or limit is
+ // requested, serve it from the underlying storage as well.
+ return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
+ }
+
+ // If resourceVersion is specified, serve it from cache.
+ // It's guaranteed that the returned value is at least that
+ // fresh as the given resourceVersion.
+ listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
+ if err != nil {
+ return err
+ }
+
+ if listRV == 0 && !c.ready.check() {
+ // If Cacher is not yet initialized and we don't require any specific
+ // minimal resource version, simply forward the request to storage.
+ return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
+ }
+
+ trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
+ defer trace.LogIfLong(500 * time.Millisecond)
+
+ c.ready.wait()
+ trace.Step("Ready")
+
+ // List elements with at least 'listRV' from cache.
+ listPtr, err := meta.GetItemsPtr(listObj)
+ if err != nil {
+ return err
+ }
+ listVal, err := conversion.EnforcePtr(listPtr)
+ if err != nil || listVal.Kind() != reflect.Slice {
+ return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
+ }
+ filter := filterWithAttrsFunction(key, pred)
+
+ obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
+ if err != nil {
+ return err
+ }
+ trace.Step("Got from cache")
+
+ if exists {
+ elem, ok := obj.(*storeElement)
+ if !ok {
+ return fmt.Errorf("non *storeElement returned from storage: %v", obj)
+ }
+ if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
+ listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
+ }
+ }
+ if c.versioner != nil {
+ if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Implements storage.Interface.
+func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
+ pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
+ hasContinuation := pagingEnabled && len(pred.Continue) > 0
+ hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
+ if resourceVersion == "" || hasContinuation || hasLimit {
+ // If resourceVersion is not specified, serve it from underlying
+ // storage (for backward compatibility). If a continuation is
+ // requested, serve it from the underlying storage as well.
+ // Limits are only sent to storage when resourceVersion is non-zero
+ // since the watch cache isn't able to perform continuations, and
+ // limits are ignored when resource version is zero.
+ return c.storage.List(ctx, key, resourceVersion, pred, listObj)
+ }
+
+ // If resourceVersion is specified, serve it from cache.
+ // It's guaranteed that the returned value is at least that
+ // fresh as the given resourceVersion.
+ listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
+ if err != nil {
+ return err
+ }
+
+ if listRV == 0 && !c.ready.check() {
+ // If Cacher is not yet initialized and we don't require any specific
+ // minimal resource version, simply forward the request to storage.
+ return c.storage.List(ctx, key, resourceVersion, pred, listObj)
+ }
+
+ trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
+ defer trace.LogIfLong(500 * time.Millisecond)
+
+ c.ready.wait()
+ trace.Step("Ready")
+
+ // List elements with at least 'listRV' from cache.
+ listPtr, err := meta.GetItemsPtr(listObj)
+ if err != nil {
+ return err
+ }
+ listVal, err := conversion.EnforcePtr(listPtr)
+ if err != nil || listVal.Kind() != reflect.Slice {
+ return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
+ }
+ filter := filterWithAttrsFunction(key, pred)
+
+ objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
+ if err != nil {
+ return err
+ }
+ trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
+ if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
+ // Resize the slice appropriately, since we already know that none
+ // of the elements will be filtered out.
+ listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
+ trace.Step("Resized result")
+ }
+ for _, obj := range objs {
+ elem, ok := obj.(*storeElement)
+ if !ok {
+ return fmt.Errorf("non *storeElement returned from storage: %v", obj)
+ }
+ if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
+ listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
+ }
+ }
+ trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
+ if c.versioner != nil {
+ if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Implements storage.Interface.
+func (c *Cacher) GuaranteedUpdate(
+ ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
+ preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
+ // Ignore the suggestion and try to pass down the current version of the object
+ // read from cache.
+ if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
+ glog.Errorf("GetByKey returned error: %v", err)
+ } else if exists {
+ currObj := elem.(*storeElement).Object.DeepCopyObject()
+ return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
+ }
+ // If we couldn't get the object, fallback to no-suggestion.
+ return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
+}
+
+func (c *Cacher) Count(pathPrefix string) (int64, error) {
+ return c.storage.Count(pathPrefix)
+}
+
+func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
+ // TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
+ // is aware of exactly the same trigger (at most one). Thus calling:
+ // c.triggerFunc(<some object>)
+ // can return only 0 or 1 values.
+ // That means, that triggerValues itself may return up to 2 different values.
+ if c.triggerFunc == nil {
+ return nil, false
+ }
+ result := make([]string, 0, 2)
+ matchValues := c.triggerFunc(event.Object)
+ if len(matchValues) > 0 {
+ result = append(result, matchValues[0].Value)
+ }
+ if event.PrevObject == nil {
+ return result, len(result) > 0
+ }
+ prevMatchValues := c.triggerFunc(event.PrevObject)
+ if len(prevMatchValues) > 0 {
+ if len(result) == 0 || result[0] != prevMatchValues[0].Value {
+ result = append(result, prevMatchValues[0].Value)
+ }
+ }
+ return result, len(result) > 0
+}
+
+func (c *Cacher) processEvent(event *watchCacheEvent) {
+ if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
+ // Monitor if this gets backed up, and how much.
+ glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
+ }
+ c.incoming <- *event
+}
+
+func (c *Cacher) dispatchEvents() {
+ for {
+ select {
+ case event, ok := <-c.incoming:
+ if !ok {
+ return
+ }
+ c.dispatchEvent(&event)
+ case <-c.stopCh:
+ return
+ }
+ }
+}
+
+func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
+ triggerValues, supported := c.triggerValues(event)
+
+ c.Lock()
+ defer c.Unlock()
+ // Iterate over "allWatchers" no matter what the trigger function is.
+ for _, watcher := range c.watchers.allWatchers {
+ watcher.add(event, c.dispatchTimeoutBudget)
+ }
+ if supported {
+ // Iterate over watchers interested in the given values of the trigger.
+ for _, triggerValue := range triggerValues {
+ for _, watcher := range c.watchers.valueWatchers[triggerValue] {
+ watcher.add(event, c.dispatchTimeoutBudget)
+ }
+ }
+ } else {
+ // supported equal to false generally means that trigger function
+ // is not defined (or not aware of any indexes). In this case,
+ // watchers filters should generally also don't generate any
+ // trigger values, but can cause problems in case of some
+ // misconfiguration. Thus we paranoidly leave this branch.
+
+ // Iterate over watchers interested in exact values for all values.
+ for _, watchers := range c.watchers.valueWatchers {
+ for _, watcher := range watchers {
+ watcher.add(event, c.dispatchTimeoutBudget)
+ }
+ }
+ }
+}
+
+func (c *Cacher) terminateAllWatchers() {
+ c.Lock()
+ defer c.Unlock()
+ c.watchers.terminateAll(c.objectType)
+}
+
+func (c *Cacher) isStopped() bool {
+ c.stopLock.RLock()
+ defer c.stopLock.RUnlock()
+ return c.stopped
+}
+
+func (c *Cacher) Stop() {
+ // avoid stopping twice (note: cachers are shared with subresources)
+ if c.isStopped() {
+ return
+ }
+ c.stopLock.Lock()
+ if c.stopped {
+ c.stopLock.Unlock()
+ return
+ }
+ c.stopped = true
+ c.stopLock.Unlock()
+ close(c.stopCh)
+ c.stopWg.Wait()
+}
+
+func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
+ return func(lock bool) {
+ if lock {
+ c.Lock()
+ defer c.Unlock()
+ } else {
+ // false is currently passed only if we are forcing watcher to close due
+ // to its unresponsiveness and blocking other watchers.
+ // TODO: Get this information in cleaner way.
+ glog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
+ }
+ // It's possible that the watcher is already not in the structure (e.g. in case of
+ // simultaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
+ c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
+ }
+}
+
+func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc {
+ filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool {
+ if !hasPathPrefix(objKey, key) {
+ return false
+ }
+ return p.MatchesObjectAttributes(label, field, uninitialized)
+ }
+ return filterFunc
+}
+
+// Returns resource version to which the underlying cache is synced.
+func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
+ c.ready.wait()
+
+ resourceVersion := c.reflector.LastSyncResourceVersion()
+ return c.versioner.ParseListResourceVersion(resourceVersion)
+}
+
+// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
+type cacherListerWatcher struct {
+ storage Interface
+ resourcePrefix string
+ newListFunc func() runtime.Object
+}
+
+func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
+ return &cacherListerWatcher{
+ storage: storage,
+ resourcePrefix: resourcePrefix,
+ newListFunc: newListFunc,
+ }
+}
+
+// Implements cache.ListerWatcher interface.
+func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
+ list := lw.newListFunc()
+ if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
+ return nil, err
+ }
+ return list, nil
+}
+
+// Implements cache.ListerWatcher interface.
+func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
+ return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
+}
+
+// errWatcher implements watch.Interface to return a single error
+type errWatcher struct {
+ result chan watch.Event
+}
+
+func newErrWatcher(err error) *errWatcher {
+ // Create an error event
+ errEvent := watch.Event{Type: watch.Error}
+ switch err := err.(type) {
+ case runtime.Object:
+ errEvent.Object = err
+ case *errors.StatusError:
+ errEvent.Object = &err.ErrStatus
+ default:
+ errEvent.Object = &metav1.Status{
+ Status: metav1.StatusFailure,
+ Message: err.Error(),
+ Reason: metav1.StatusReasonInternalError,
+ Code: http.StatusInternalServerError,
+ }
+ }
+
+ // Create a watcher with room for a single event, populate it, and close the channel
+ watcher := &errWatcher{result: make(chan watch.Event, 1)}
+ watcher.result <- errEvent
+ close(watcher.result)
+
+ return watcher
+}
+
+// Implements watch.Interface.
+func (c *errWatcher) ResultChan() <-chan watch.Event {
+ return c.result
+}
+
+// Implements watch.Interface.
+func (c *errWatcher) Stop() {
+ // no-op
+}
+
+// cachWatcher implements watch.Interface
+type cacheWatcher struct {
+ sync.Mutex
+ input chan *watchCacheEvent
+ result chan watch.Event
+ done chan struct{}
+ filter filterWithAttrsFunc
+ stopped bool
+ forget func(bool)
+ versioner Versioner
+}
+
+func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
+ watcher := &cacheWatcher{
+ input: make(chan *watchCacheEvent, chanSize),
+ result: make(chan watch.Event, chanSize),
+ done: make(chan struct{}),
+ filter: filter,
+ stopped: false,
+ forget: forget,
+ versioner: versioner,
+ }
+ go watcher.process(initEvents, resourceVersion)
+ return watcher
+}
+
+// Implements watch.Interface.
+func (c *cacheWatcher) ResultChan() <-chan watch.Event {
+ return c.result
+}
+
+// Implements watch.Interface.
+func (c *cacheWatcher) Stop() {
+ c.forget(true)
+ c.stop()
+}
+
+func (c *cacheWatcher) stop() {
+ c.Lock()
+ defer c.Unlock()
+ if !c.stopped {
+ c.stopped = true
+ close(c.done)
+ close(c.input)
+ }
+}
+
+var timerPool sync.Pool
+
+func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
+ // Try to send the event immediately, without blocking.
+ select {
+ case c.input <- event:
+ return
+ default:
+ }
+
+ // OK, block sending, but only for up to <timeout>.
+ // cacheWatcher.add is called very often, so arrange
+ // to reuse timers instead of constantly allocating.
+ startTime := time.Now()
+ timeout := budget.takeAvailable()
+
+ t, ok := timerPool.Get().(*time.Timer)
+ if ok {
+ t.Reset(timeout)
+ } else {
+ t = time.NewTimer(timeout)
+ }
+ defer timerPool.Put(t)
+
+ select {
+ case c.input <- event:
+ stopped := t.Stop()
+ if !stopped {
+ // Consume triggered (but not yet received) timer event
+ // so that future reuse does not get a spurious timeout.
+ <-t.C
+ }
+ case <-t.C:
+ // This means that we couldn't send event to that watcher.
+ // Since we don't want to block on it infinitely,
+ // we simply terminate it.
+ c.forget(false)
+ c.stop()
+ }
+
+ budget.returnUnused(timeout - time.Since(startTime))
+}
+
+// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
+func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
+ curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
+ oldObjPasses := false
+ if event.PrevObject != nil {
+ oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)
+ }
+ if !curObjPasses && !oldObjPasses {
+ // Watcher is not interested in that object.
+ return
+ }
+
+ var watchEvent watch.Event
+ switch {
+ case curObjPasses && !oldObjPasses:
+ watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
+ case curObjPasses && oldObjPasses:
+ watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
+ case !curObjPasses && oldObjPasses:
+ // return a delete event with the previous object content, but with the event's resource version
+ oldObj := event.PrevObject.DeepCopyObject()
+ if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
+ utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
+ }
+ watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
+ }
+
+ // We need to ensure that if we put event X to the c.result, all
+ // previous events were already put into it before, no matter whether
+ // c.done is close or not.
+ // Thus we cannot simply select from c.done and c.result and this
+ // would give us non-determinism.
+ // At the same time, we don't want to block infinitely on putting
+ // to c.result, when c.done is already closed.
+
+ // This ensures that with c.done already close, we at most once go
+ // into the next select after this. With that, no matter which
+ // statement we choose there, we will deliver only consecutive
+ // events.
+ select {
+ case <-c.done:
+ return
+ default:
+ }
+
+ select {
+ case c.result <- watchEvent:
+ case <-c.done:
+ }
+}
+
+func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
+ defer utilruntime.HandleCrash()
+
+ // Check how long we are processing initEvents.
+ // As long as these are not processed, we are not processing
+ // any incoming events, so if it takes long, we may actually
+ // block all watchers for some time.
+ // TODO: From the logs it seems that there happens processing
+ // times even up to 1s which is very long. However, this doesn't
+ // depend that much on the number of initEvents. E.g. from the
+ // 2000-node Kubemark run we have logs like this, e.g.:
+ // ... processing 13862 initEvents took 66.808689ms
+ // ... processing 14040 initEvents took 993.532539ms
+ // We should understand what is blocking us in those cases (e.g.
+ // is it lack of CPU, network, or sth else) and potentially
+ // consider increase size of result buffer in those cases.
+ const initProcessThreshold = 500 * time.Millisecond
+ startTime := time.Now()
+ for _, event := range initEvents {
+ c.sendWatchCacheEvent(event)
+ }
+ processingTime := time.Since(startTime)
+ if processingTime > initProcessThreshold {
+ objType := "<null>"
+ if len(initEvents) > 0 {
+ objType = reflect.TypeOf(initEvents[0].Object).String()
+ }
+ glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
+ }
+
+ defer close(c.result)
+ defer c.Stop()
+ for {
+ event, ok := <-c.input
+ if !ok {
+ return
+ }
+ // only send events newer than resourceVersion
+ if event.ResourceVersion > resourceVersion {
+ c.sendWatchCacheEvent(event)
+ }
+ }
+}
+
+type ready struct {
+ ok bool
+ c *sync.Cond
+}
+
+func newReady() *ready {
+ return &ready{c: sync.NewCond(&sync.Mutex{})}
+}
+
+func (r *ready) wait() {
+ r.c.L.Lock()
+ for !r.ok {
+ r.c.Wait()
+ }
+ r.c.L.Unlock()
+}
+
+// TODO: Make check() function more sophisticated, in particular
+// allow it to behave as "waitWithTimeout".
+func (r *ready) check() bool {
+ r.c.L.Lock()
+ defer r.c.L.Unlock()
+ return r.ok
+}
+
+func (r *ready) set(ok bool) {
+ r.c.L.Lock()
+ defer r.c.L.Unlock()
+ r.ok = ok
+ r.c.Broadcast()
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/doc.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/doc.go
new file mode 100644
index 0000000..fbdd944
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Interfaces for database-related operations.
+package storage // import "k8s.io/apiserver/pkg/storage"
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors.go
new file mode 100644
index 0000000..a4d134a
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors.go
@@ -0,0 +1,170 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "fmt"
+
+ "k8s.io/apimachinery/pkg/util/validation/field"
+)
+
+const (
+ ErrCodeKeyNotFound int = iota + 1
+ ErrCodeKeyExists
+ ErrCodeResourceVersionConflicts
+ ErrCodeInvalidObj
+ ErrCodeUnreachable
+)
+
+var errCodeToMessage = map[int]string{
+ ErrCodeKeyNotFound: "key not found",
+ ErrCodeKeyExists: "key exists",
+ ErrCodeResourceVersionConflicts: "resource version conflicts",
+ ErrCodeInvalidObj: "invalid object",
+ ErrCodeUnreachable: "server unreachable",
+}
+
+func NewKeyNotFoundError(key string, rv int64) *StorageError {
+ return &StorageError{
+ Code: ErrCodeKeyNotFound,
+ Key: key,
+ ResourceVersion: rv,
+ }
+}
+
+func NewKeyExistsError(key string, rv int64) *StorageError {
+ return &StorageError{
+ Code: ErrCodeKeyExists,
+ Key: key,
+ ResourceVersion: rv,
+ }
+}
+
+func NewResourceVersionConflictsError(key string, rv int64) *StorageError {
+ return &StorageError{
+ Code: ErrCodeResourceVersionConflicts,
+ Key: key,
+ ResourceVersion: rv,
+ }
+}
+
+func NewUnreachableError(key string, rv int64) *StorageError {
+ return &StorageError{
+ Code: ErrCodeUnreachable,
+ Key: key,
+ ResourceVersion: rv,
+ }
+}
+
+func NewInvalidObjError(key, msg string) *StorageError {
+ return &StorageError{
+ Code: ErrCodeInvalidObj,
+ Key: key,
+ AdditionalErrorMsg: msg,
+ }
+}
+
+type StorageError struct {
+ Code int
+ Key string
+ ResourceVersion int64
+ AdditionalErrorMsg string
+}
+
+func (e *StorageError) Error() string {
+ return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s",
+ errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg)
+}
+
+// IsNotFound returns true if and only if err is "key" not found error.
+func IsNotFound(err error) bool {
+ return isErrCode(err, ErrCodeKeyNotFound)
+}
+
+// IsNodeExist returns true if and only if err is an node already exist error.
+func IsNodeExist(err error) bool {
+ return isErrCode(err, ErrCodeKeyExists)
+}
+
+// IsUnreachable returns true if and only if err indicates the server could not be reached.
+func IsUnreachable(err error) bool {
+ return isErrCode(err, ErrCodeUnreachable)
+}
+
+// IsConflict returns true if and only if err is a write conflict.
+func IsConflict(err error) bool {
+ return isErrCode(err, ErrCodeResourceVersionConflicts)
+}
+
+// IsInvalidObj returns true if and only if err is invalid error
+func IsInvalidObj(err error) bool {
+ return isErrCode(err, ErrCodeInvalidObj)
+}
+
+func isErrCode(err error, code int) bool {
+ if err == nil {
+ return false
+ }
+ if e, ok := err.(*StorageError); ok {
+ return e.Code == code
+ }
+ return false
+}
+
+// InvalidError is generated when an error caused by invalid API object occurs
+// in the storage package.
+type InvalidError struct {
+ Errs field.ErrorList
+}
+
+func (e InvalidError) Error() string {
+ return e.Errs.ToAggregate().Error()
+}
+
+// IsInvalidError returns true if and only if err is an InvalidError.
+func IsInvalidError(err error) bool {
+ _, ok := err.(InvalidError)
+ return ok
+}
+
+func NewInvalidError(errors field.ErrorList) InvalidError {
+ return InvalidError{errors}
+}
+
+// InternalError is generated when an error occurs in the storage package, i.e.,
+// not from the underlying storage backend (e.g., etcd).
+type InternalError struct {
+ Reason string
+}
+
+func (e InternalError) Error() string {
+ return e.Reason
+}
+
+// IsInternalError returns true if and only if err is an InternalError.
+func IsInternalError(err error) bool {
+ _, ok := err.(InternalError)
+ return ok
+}
+
+func NewInternalError(reason string) InternalError {
+ return InternalError{reason}
+}
+
+func NewInternalErrorf(format string, a ...interface{}) InternalError {
+ return InternalError{fmt.Sprintf(format, a)}
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors/doc.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors/doc.go
new file mode 100644
index 0000000..3d3150c
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package etcd provides conversion of etcd errors to API errors.
+package storage // import "k8s.io/apiserver/pkg/storage/errors"
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors/storage.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors/storage.go
new file mode 100644
index 0000000..fd3b35e
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/errors/storage.go
@@ -0,0 +1,116 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apiserver/pkg/storage"
+)
+
+// InterpretListError converts a generic error on a retrieval
+// operation into the appropriate API error.
+func InterpretListError(err error, qualifiedResource schema.GroupResource) error {
+ switch {
+ case storage.IsNotFound(err):
+ return errors.NewNotFound(qualifiedResource, "")
+ case storage.IsUnreachable(err):
+ return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
+ case storage.IsInternalError(err):
+ return errors.NewInternalError(err)
+ default:
+ return err
+ }
+}
+
+// InterpretGetError converts a generic error on a retrieval
+// operation into the appropriate API error.
+func InterpretGetError(err error, qualifiedResource schema.GroupResource, name string) error {
+ switch {
+ case storage.IsNotFound(err):
+ return errors.NewNotFound(qualifiedResource, name)
+ case storage.IsUnreachable(err):
+ return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level
+ case storage.IsInternalError(err):
+ return errors.NewInternalError(err)
+ default:
+ return err
+ }
+}
+
+// InterpretCreateError converts a generic error on a create
+// operation into the appropriate API error.
+func InterpretCreateError(err error, qualifiedResource schema.GroupResource, name string) error {
+ switch {
+ case storage.IsNodeExist(err):
+ return errors.NewAlreadyExists(qualifiedResource, name)
+ case storage.IsUnreachable(err):
+ return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level
+ case storage.IsInternalError(err):
+ return errors.NewInternalError(err)
+ default:
+ return err
+ }
+}
+
+// InterpretUpdateError converts a generic error on an update
+// operation into the appropriate API error.
+func InterpretUpdateError(err error, qualifiedResource schema.GroupResource, name string) error {
+ switch {
+ case storage.IsConflict(err), storage.IsNodeExist(err), storage.IsInvalidObj(err):
+ return errors.NewConflict(qualifiedResource, name, err)
+ case storage.IsUnreachable(err):
+ return errors.NewServerTimeout(qualifiedResource, "update", 2) // TODO: make configurable or handled at a higher level
+ case storage.IsNotFound(err):
+ return errors.NewNotFound(qualifiedResource, name)
+ case storage.IsInternalError(err):
+ return errors.NewInternalError(err)
+ default:
+ return err
+ }
+}
+
+// InterpretDeleteError converts a generic error on a delete
+// operation into the appropriate API error.
+func InterpretDeleteError(err error, qualifiedResource schema.GroupResource, name string) error {
+ switch {
+ case storage.IsNotFound(err):
+ return errors.NewNotFound(qualifiedResource, name)
+ case storage.IsUnreachable(err):
+ return errors.NewServerTimeout(qualifiedResource, "delete", 2) // TODO: make configurable or handled at a higher level
+ case storage.IsConflict(err), storage.IsNodeExist(err), storage.IsInvalidObj(err):
+ return errors.NewConflict(qualifiedResource, name, err)
+ case storage.IsInternalError(err):
+ return errors.NewInternalError(err)
+ default:
+ return err
+ }
+}
+
+// InterpretWatchError converts a generic error on a watch
+// operation into the appropriate API error.
+func InterpretWatchError(err error, resource schema.GroupResource, name string) error {
+ switch {
+ case storage.IsInvalidError(err):
+ invalidError, _ := err.(storage.InvalidError)
+ return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs)
+ case storage.IsInternalError(err):
+ return errors.NewInternalError(err)
+ default:
+ return err
+ }
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/OWNERS b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/OWNERS
new file mode 100755
index 0000000..a065498
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/OWNERS
@@ -0,0 +1,25 @@
+reviewers:
+- lavalamp
+- smarterclayton
+- wojtek-t
+- deads2k
+- derekwaynecarr
+- caesarxuchao
+- mikedanese
+- liggitt
+- davidopp
+- pmorie
+- luxas
+- janetkuo
+- roberthbailey
+- tallclair
+- timothysc
+- dims
+- hongchaodeng
+- krousey
+- fgrzadkowski
+- resouer
+- pweil-
+- mqliang
+- feihujiang
+- enj
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go
new file mode 100644
index 0000000..5534f9f
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go
@@ -0,0 +1,148 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd
+
+import (
+ "strconv"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/validation/field"
+ "k8s.io/apiserver/pkg/storage"
+)
+
+// APIObjectVersioner implements versioning and extracting etcd node information
+// for objects that have an embedded ObjectMeta or ListMeta field.
+type APIObjectVersioner struct{}
+
+// UpdateObject implements Versioner
+func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
+ accessor, err := meta.Accessor(obj)
+ if err != nil {
+ return err
+ }
+ versionString := ""
+ if resourceVersion != 0 {
+ versionString = strconv.FormatUint(resourceVersion, 10)
+ }
+ accessor.SetResourceVersion(versionString)
+ return nil
+}
+
+// UpdateList implements Versioner
+func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string) error {
+ listAccessor, err := meta.ListAccessor(obj)
+ if err != nil || listAccessor == nil {
+ return err
+ }
+ versionString := ""
+ if resourceVersion != 0 {
+ versionString = strconv.FormatUint(resourceVersion, 10)
+ }
+ listAccessor.SetResourceVersion(versionString)
+ listAccessor.SetContinue(nextKey)
+ return nil
+}
+
+// PrepareObjectForStorage clears resource version and self link prior to writing to etcd.
+func (a APIObjectVersioner) PrepareObjectForStorage(obj runtime.Object) error {
+ accessor, err := meta.Accessor(obj)
+ if err != nil {
+ return err
+ }
+ accessor.SetResourceVersion("")
+ accessor.SetSelfLink("")
+ return nil
+}
+
+// ObjectResourceVersion implements Versioner
+func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
+ accessor, err := meta.Accessor(obj)
+ if err != nil {
+ return 0, err
+ }
+ version := accessor.GetResourceVersion()
+ if len(version) == 0 {
+ return 0, nil
+ }
+ return strconv.ParseUint(version, 10, 64)
+}
+
+// ParseWatchResourceVersion takes a resource version argument and converts it to
+// the etcd version we should pass to helper.Watch(). Because resourceVersion is
+// an opaque value, the default watch behavior for non-zero watch is to watch
+// the next value (if you pass "1", you will see updates from "2" onwards).
+func (a APIObjectVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
+ if resourceVersion == "" || resourceVersion == "0" {
+ return 0, nil
+ }
+ version, err := strconv.ParseUint(resourceVersion, 10, 64)
+ if err != nil {
+ return 0, storage.NewInvalidError(field.ErrorList{
+ // Validation errors are supposed to return version-specific field
+ // paths, but this is probably close enough.
+ field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
+ })
+ }
+ return version, nil
+}
+
+// ParseListResourceVersion takes a resource version argument and converts it to
+// the etcd version.
+// TODO: reevaluate whether it is really clearer to have both this and the
+// Watch version of this function, since they perform the same logic.
+func (a APIObjectVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) {
+ if resourceVersion == "" {
+ return 0, nil
+ }
+ version, err := strconv.ParseUint(resourceVersion, 10, 64)
+ if err != nil {
+ return 0, storage.NewInvalidError(field.ErrorList{
+ // Validation errors are supposed to return version-specific field
+ // paths, but this is probably close enough.
+ field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
+ })
+ }
+ return version, nil
+}
+
+// APIObjectVersioner implements Versioner
+var Versioner storage.Versioner = APIObjectVersioner{}
+
+// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
+// but etcd resource versions are special, they're actually ints, so we can easily compare them.
+func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
+ lhsVersion, err := Versioner.ObjectResourceVersion(lhs)
+ if err != nil {
+ // coder error
+ panic(err)
+ }
+ rhsVersion, err := Versioner.ObjectResourceVersion(rhs)
+ if err != nil {
+ // coder error
+ panic(err)
+ }
+
+ if lhsVersion == rhsVersion {
+ return 0
+ }
+ if lhsVersion < rhsVersion {
+ return -1
+ }
+
+ return 1
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/doc.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/doc.go
new file mode 100644
index 0000000..566f466
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/doc.go
@@ -0,0 +1,17 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd // import "k8s.io/apiserver/pkg/storage/etcd"
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go
new file mode 100644
index 0000000..2fe2bba
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go
@@ -0,0 +1,652 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "path"
+ "reflect"
+ "time"
+
+ etcd "github.com/coreos/etcd/client"
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/conversion"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilcache "k8s.io/apimachinery/pkg/util/cache"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/apiserver/pkg/storage"
+ "k8s.io/apiserver/pkg/storage/etcd/metrics"
+ etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
+ utiltrace "k8s.io/apiserver/pkg/util/trace"
+)
+
+// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
+// must be able to undo the transformation caused by the other.
+type ValueTransformer interface {
+ // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
+ // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
+ // have not changed.
+ TransformStringFromStorage(string) (value string, stale bool, err error)
+ // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
+ TransformStringToStorage(string) (value string, err error)
+}
+
+type identityTransformer struct{}
+
+func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) {
+ return s, false, nil
+}
+func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
+
+// IdentityTransformer performs no transformation on the provided values.
+var IdentityTransformer ValueTransformer = identityTransformer{}
+
+// Creates a new storage interface from the client
+// TODO: deprecate in favor of storage.Config abstraction over time
+func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, transformer ValueTransformer) storage.Interface {
+ return &etcdHelper{
+ etcdMembersAPI: etcd.NewMembersAPI(client),
+ etcdKeysAPI: etcd.NewKeysAPI(client),
+ codec: codec,
+ versioner: APIObjectVersioner{},
+ transformer: transformer,
+ pathPrefix: path.Join("/", prefix),
+ quorum: quorum,
+ cache: utilcache.NewCache(cacheSize),
+ }
+}
+
+// etcdHelper is the reference implementation of storage.Interface.
+type etcdHelper struct {
+ etcdMembersAPI etcd.MembersAPI
+ etcdKeysAPI etcd.KeysAPI
+ codec runtime.Codec
+ transformer ValueTransformer
+ // Note that versioner is required for etcdHelper to work correctly.
+ // The public constructors (NewStorage & NewEtcdStorage) are setting it
+ // correctly, so be careful when manipulating with it manually.
+ // optional, has to be set to perform any atomic operations
+ versioner storage.Versioner
+ // prefix for all etcd keys
+ pathPrefix string
+ // if true, perform quorum read
+ quorum bool
+
+ // We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
+ // to resourceVersion.
+ // This depends on etcd's indexes being globally unique across all objects/types. This will
+ // have to revisited if we decide to do things like multiple etcd clusters, or etcd will
+ // support multi-object transaction that will result in many objects with the same index.
+ // Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
+ // TODO: Measure how much this cache helps after the conversion code is optimized.
+ cache utilcache.Cache
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) Versioner() storage.Versioner {
+ return h.versioner
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
+ trace := utiltrace.New("etcdHelper::Create " + getTypeName(obj))
+ defer trace.LogIfLong(250 * time.Millisecond)
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ key = path.Join(h.pathPrefix, key)
+ data, err := runtime.Encode(h.codec, obj)
+ trace.Step("Object encoded")
+ if err != nil {
+ return err
+ }
+ if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
+ return errors.New("resourceVersion may not be set on objects to be created")
+ }
+ if err := h.versioner.PrepareObjectForStorage(obj); err != nil {
+ return fmt.Errorf("PrepareObjectForStorage returned an error: %v", err)
+ }
+ trace.Step("Version checked")
+
+ startTime := time.Now()
+ opts := etcd.SetOptions{
+ TTL: time.Duration(ttl) * time.Second,
+ PrevExist: etcd.PrevNoExist,
+ }
+
+ newBody, err := h.transformer.TransformStringToStorage(string(data))
+ if err != nil {
+ return storage.NewInternalError(err.Error())
+ }
+
+ response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts)
+ trace.Step("Object created")
+ metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
+ if err != nil {
+ return toStorageErr(err, key, 0)
+ }
+ if out != nil {
+ if _, err := conversion.EnforcePtr(out); err != nil {
+ panic("unable to convert output object to pointer")
+ }
+ _, _, _, err = h.extractObj(response, err, out, false, false)
+ }
+ return err
+}
+
+func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
+ if preconditions == nil {
+ return nil
+ }
+ objMeta, err := meta.Accessor(out)
+ if err != nil {
+ return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
+ }
+ if preconditions.UID != nil && *preconditions.UID != objMeta.GetUID() {
+ errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.GetUID())
+ return storage.NewInvalidObjError(key, errMsg)
+ }
+ return nil
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ key = path.Join(h.pathPrefix, key)
+ v, err := conversion.EnforcePtr(out)
+ if err != nil {
+ panic("unable to convert output object to pointer")
+ }
+
+ if preconditions == nil {
+ startTime := time.Now()
+ response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
+ metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
+ if !etcdutil.IsEtcdNotFound(err) {
+ // if the object that existed prior to the delete is returned by etcd, update the out object.
+ if err != nil || response.PrevNode != nil {
+ _, _, _, err = h.extractObj(response, err, out, false, true)
+ }
+ }
+ return toStorageErr(err, key, 0)
+ }
+
+ // Check the preconditions match.
+ obj := reflect.New(v.Type()).Interface().(runtime.Object)
+ for {
+ _, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false)
+ if err != nil {
+ return toStorageErr(err, key, 0)
+ }
+ if err := checkPreconditions(key, preconditions, obj); err != nil {
+ return toStorageErr(err, key, 0)
+ }
+ index := uint64(0)
+ if node != nil {
+ index = node.ModifiedIndex
+ } else if res != nil {
+ index = res.Index
+ }
+ opt := etcd.DeleteOptions{PrevIndex: index}
+ startTime := time.Now()
+ response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
+ metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
+ if !etcdutil.IsEtcdTestFailed(err) {
+ if !etcdutil.IsEtcdNotFound(err) {
+ // if the object that existed prior to the delete is returned by etcd, update the out object.
+ if err != nil || response.PrevNode != nil {
+ _, _, _, err = h.extractObj(response, err, out, false, true)
+ }
+ }
+ return toStorageErr(err, key, 0)
+ }
+
+ glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
+ }
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
+ if err != nil {
+ return nil, err
+ }
+ key = path.Join(h.pathPrefix, key)
+ w := newEtcdWatcher(false, h.quorum, nil, pred, h.codec, h.versioner, nil, h.transformer, h)
+ go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
+ return w, nil
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
+ if err != nil {
+ return nil, err
+ }
+ key = path.Join(h.pathPrefix, key)
+ w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred, h.codec, h.versioner, nil, h.transformer, h)
+ go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
+ return w, nil
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ key = path.Join(h.pathPrefix, key)
+ _, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
+ return err
+}
+
+// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
+// about the response, like the current etcd index and the ttl.
+func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) {
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ startTime := time.Now()
+
+ opts := &etcd.GetOptions{
+ Quorum: h.quorum,
+ }
+
+ response, err := h.etcdKeysAPI.Get(ctx, key, opts)
+ metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
+ if err != nil && !etcdutil.IsEtcdNotFound(err) {
+ return "", nil, nil, false, toStorageErr(err, key, 0)
+ }
+ body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
+ return body, node, response, stale, toStorageErr(err, key, 0)
+}
+
+func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) {
+ if response != nil {
+ if prevNode {
+ node = response.PrevNode
+ } else {
+ node = response.Node
+ }
+ }
+ if inErr != nil || node == nil || len(node.Value) == 0 {
+ if ignoreNotFound {
+ v, err := conversion.EnforcePtr(objPtr)
+ if err != nil {
+ return "", nil, false, err
+ }
+ v.Set(reflect.Zero(v.Type()))
+ return "", nil, false, nil
+ } else if inErr != nil {
+ return "", nil, false, inErr
+ }
+ return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response)
+ }
+
+ body, stale, err = h.transformer.TransformStringFromStorage(node.Value)
+ if err != nil {
+ return body, nil, stale, storage.NewInternalError(err.Error())
+ }
+ out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
+ if err != nil {
+ return body, nil, stale, err
+ }
+ if out != objPtr {
+ return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
+ }
+ // being unable to set the version does not prevent the object from being extracted
+ _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
+ return body, node, stale, err
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ trace := utiltrace.New("GetToList " + getTypeName(listObj))
+ listPtr, err := meta.GetItemsPtr(listObj)
+ if err != nil {
+ return err
+ }
+ key = path.Join(h.pathPrefix, key)
+ startTime := time.Now()
+ trace.Step("About to read etcd node")
+
+ opts := &etcd.GetOptions{
+ Quorum: h.quorum,
+ }
+ response, err := h.etcdKeysAPI.Get(ctx, key, opts)
+ trace.Step("Etcd node read")
+ metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
+ if err != nil {
+ if etcdutil.IsEtcdNotFound(err) {
+ if etcdErr, ok := err.(etcd.Error); ok {
+ return h.versioner.UpdateList(listObj, etcdErr.Index, "")
+ }
+ return fmt.Errorf("unexpected error from storage: %#v", err)
+ }
+ return toStorageErr(err, key, 0)
+ }
+
+ nodes := make([]*etcd.Node, 0)
+ nodes = append(nodes, response.Node)
+
+ if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
+ return err
+ }
+ trace.Step("Object decoded")
+ if err := h.versioner.UpdateList(listObj, response.Index, ""); err != nil {
+ return err
+ }
+ return nil
+}
+
+// decodeNodeList walks the tree of each node in the list and decodes into the specified object
+func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, pred storage.SelectionPredicate, slicePtr interface{}) error {
+ trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr))
+ defer trace.LogIfLong(400 * time.Millisecond)
+ v, err := conversion.EnforcePtr(slicePtr)
+ if err != nil || v.Kind() != reflect.Slice {
+ // This should not happen at runtime.
+ panic("need ptr to slice")
+ }
+ for _, node := range nodes {
+ if node.Dir {
+ // IMPORTANT: do not log each key as a discrete step in the trace log
+ // as it produces an immense amount of log spam when there is a large
+ // amount of content in the list.
+ if err := h.decodeNodeList(node.Nodes, pred, slicePtr); err != nil {
+ return err
+ }
+ continue
+ }
+ if obj, found := h.getFromCache(node.ModifiedIndex, pred); found {
+ // obj != nil iff it matches the pred function.
+ if obj != nil {
+ v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
+ }
+ } else {
+ body, _, err := h.transformer.TransformStringFromStorage(node.Value)
+ if err != nil {
+ // omit items from lists and watches that cannot be transformed, but log the error
+ utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err))
+ continue
+ }
+
+ obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
+ if err != nil {
+ return err
+ }
+ // being unable to set the version does not prevent the object from being extracted
+ _ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
+ if matched, err := pred.Matches(obj); err == nil && matched {
+ v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
+ }
+ if node.ModifiedIndex != 0 {
+ h.addToCache(node.ModifiedIndex, obj)
+ }
+ }
+ }
+ trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
+ return nil
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ trace := utiltrace.New("List " + getTypeName(listObj))
+ defer trace.LogIfLong(400 * time.Millisecond)
+ listPtr, err := meta.GetItemsPtr(listObj)
+ if err != nil {
+ return err
+ }
+ key = path.Join(h.pathPrefix, key)
+ startTime := time.Now()
+ trace.Step("About to list etcd node")
+ nodes, index, err := h.listEtcdNode(ctx, key)
+ trace.Step("Etcd node listed")
+ metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
+ if err != nil {
+ return err
+ }
+ if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
+ return err
+ }
+ trace.Step("Node list decoded")
+ if err := h.versioner.UpdateList(listObj, index, ""); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ opts := etcd.GetOptions{
+ Recursive: true,
+ Sort: true,
+ Quorum: h.quorum,
+ }
+ result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
+ if err != nil {
+ var index uint64
+ if etcdError, ok := err.(etcd.Error); ok {
+ index = etcdError.Index
+ }
+ nodes := make([]*etcd.Node, 0)
+ if etcdutil.IsEtcdNotFound(err) {
+ return nodes, index, nil
+ } else {
+ return nodes, index, toStorageErr(err, key, 0)
+ }
+ }
+ return result.Node.Nodes, result.Index, nil
+}
+
+// Implements storage.Interface.
+func (h *etcdHelper) GuaranteedUpdate(
+ ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
+ preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
+ // Ignore the suggestion about current object.
+ if ctx == nil {
+ glog.Errorf("Context is nil")
+ }
+ v, err := conversion.EnforcePtr(ptrToType)
+ if err != nil {
+ // Panic is appropriate, because this is a programming error.
+ panic("need ptr to type")
+ }
+ key = path.Join(h.pathPrefix, key)
+ for {
+ obj := reflect.New(v.Type()).Interface().(runtime.Object)
+ origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
+ if err != nil {
+ return toStorageErr(err, key, 0)
+ }
+ if err := checkPreconditions(key, preconditions, obj); err != nil {
+ return toStorageErr(err, key, 0)
+ }
+ meta := storage.ResponseMeta{}
+ if node != nil {
+ meta.TTL = node.TTL
+ meta.ResourceVersion = node.ModifiedIndex
+ }
+ // Get the object to be written by calling tryUpdate.
+ ret, newTTL, err := tryUpdate(obj, meta)
+ if err != nil {
+ return toStorageErr(err, key, 0)
+ }
+
+ index := uint64(0)
+ ttl := uint64(0)
+ if node != nil {
+ index = node.ModifiedIndex
+ if node.TTL != 0 {
+ ttl = uint64(node.TTL)
+ }
+ if node.Expiration != nil && ttl == 0 {
+ ttl = 1
+ }
+ } else if res != nil {
+ index = res.Index
+ }
+
+ if newTTL != nil {
+ if ttl != 0 && *newTTL == 0 {
+ // TODO: remove this after we have verified this is no longer an issue
+ glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key)
+ }
+ ttl = *newTTL
+ }
+
+ // Since update object may have a resourceVersion set, we need to clear it here.
+ if err := h.versioner.PrepareObjectForStorage(ret); err != nil {
+ return errors.New("resourceVersion cannot be set on objects store in etcd")
+ }
+
+ newBodyData, err := runtime.Encode(h.codec, ret)
+ if err != nil {
+ return err
+ }
+ newBody := string(newBodyData)
+ data, err := h.transformer.TransformStringToStorage(newBody)
+ if err != nil {
+ return storage.NewInternalError(err.Error())
+ }
+
+ // First time this key has been used, try creating new value.
+ if index == 0 {
+ startTime := time.Now()
+ opts := etcd.SetOptions{
+ TTL: time.Duration(ttl) * time.Second,
+ PrevExist: etcd.PrevNoExist,
+ }
+ response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
+ metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
+ if etcdutil.IsEtcdNodeExist(err) {
+ continue
+ }
+ _, _, _, err = h.extractObj(response, err, ptrToType, false, false)
+ return toStorageErr(err, key, 0)
+ }
+
+ // If we don't send an update, we simply return the currently existing
+ // version of the object. However, the value transformer may indicate that
+ // the on disk representation has changed and that we must commit an update.
+ if newBody == origBody && !stale {
+ _, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
+ return err
+ }
+
+ startTime := time.Now()
+ // Swap origBody with data, if origBody is the latest etcd data.
+ opts := etcd.SetOptions{
+ PrevIndex: index,
+ TTL: time.Duration(ttl) * time.Second,
+ }
+ response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
+ metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
+ if etcdutil.IsEtcdTestFailed(err) {
+ // Try again.
+ continue
+ }
+ _, _, _, err = h.extractObj(response, err, ptrToType, false, false)
+ return toStorageErr(err, key, int64(index))
+ }
+}
+
+func (*etcdHelper) Count(pathPerfix string) (int64, error) {
+ return 0, fmt.Errorf("Count is unimplemented for etcd2!")
+}
+
+// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
+// their Node.ModifiedIndex, which is unique across all types.
+// All implementations must be thread-safe.
+type etcdCache interface {
+ getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool)
+ addToCache(index uint64, obj runtime.Object)
+}
+
+func getTypeName(obj interface{}) string {
+ return reflect.TypeOf(obj).String()
+}
+
+func (h *etcdHelper) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
+ startTime := time.Now()
+ defer func() {
+ metrics.ObserveGetCache(startTime)
+ }()
+ obj, found := h.cache.Get(index)
+ if found {
+ if matched, err := pred.Matches(obj.(runtime.Object)); err != nil || !matched {
+ return nil, true
+ }
+ // We should not return the object itself to avoid polluting the cache if someone
+ // modifies returned values.
+ objCopy := obj.(runtime.Object).DeepCopyObject()
+ metrics.ObserveCacheHit()
+ return objCopy.(runtime.Object), true
+ }
+ metrics.ObserveCacheMiss()
+ return nil, false
+}
+
+func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
+ startTime := time.Now()
+ defer func() {
+ metrics.ObserveAddCache(startTime)
+ }()
+ objCopy := obj.DeepCopyObject()
+ isOverwrite := h.cache.Add(index, objCopy)
+ if !isOverwrite {
+ metrics.ObserveNewEntry()
+ }
+}
+
+func toStorageErr(err error, key string, rv int64) error {
+ if err == nil {
+ return nil
+ }
+ switch {
+ case etcdutil.IsEtcdNotFound(err):
+ return storage.NewKeyNotFoundError(key, rv)
+ case etcdutil.IsEtcdNodeExist(err):
+ return storage.NewKeyExistsError(key, rv)
+ case etcdutil.IsEtcdTestFailed(err):
+ return storage.NewResourceVersionConflictsError(key, rv)
+ case etcdutil.IsEtcdUnreachable(err):
+ return storage.NewUnreachableError(key, rv)
+ default:
+ return err
+ }
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go
new file mode 100644
index 0000000..21ffc42
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go
@@ -0,0 +1,500 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "reflect"
+ "sync"
+ "time"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/apiserver/pkg/storage"
+ etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
+
+ etcd "github.com/coreos/etcd/client"
+ "github.com/golang/glog"
+)
+
+// Etcd watch event actions
+const (
+ EtcdCreate = "create"
+ EtcdGet = "get"
+ EtcdSet = "set"
+ EtcdCAS = "compareAndSwap"
+ EtcdDelete = "delete"
+ EtcdCAD = "compareAndDelete"
+ EtcdExpire = "expire"
+)
+
+// TransformFunc attempts to convert an object to another object for use with a watcher.
+type TransformFunc func(runtime.Object) (runtime.Object, error)
+
+// includeFunc returns true if the given key should be considered part of a watch
+type includeFunc func(key string) bool
+
+// exceptKey is an includeFunc that returns false when the provided key matches the watched key
+func exceptKey(except string) includeFunc {
+ return func(key string) bool {
+ return key != except
+ }
+}
+
+// etcdWatcher converts a native etcd watch to a watch.Interface.
+type etcdWatcher struct {
+ // HighWaterMarks for performance debugging.
+ // Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
+ // See: https://golang.org/pkg/sync/atomic/ for more information
+ incomingHWM storage.HighWaterMark
+ outgoingHWM storage.HighWaterMark
+
+ encoding runtime.Codec
+ // Note that versioner is required for etcdWatcher to work correctly.
+ // There is no public constructor of it, so be careful when manipulating
+ // with it manually.
+ versioner storage.Versioner
+ transform TransformFunc
+ valueTransformer ValueTransformer
+
+ list bool // If we're doing a recursive watch, should be true.
+ quorum bool // If we enable quorum, should be true
+ include includeFunc
+ pred storage.SelectionPredicate
+
+ etcdIncoming chan *etcd.Response
+ etcdError chan error
+ ctx context.Context
+ cancel context.CancelFunc
+ etcdCallEnded chan struct{}
+
+ outgoing chan watch.Event
+ userStop chan struct{}
+ stopped bool
+ stopLock sync.Mutex
+ // wg is used to avoid calls to etcd after Stop(), and to make sure
+ // that the translate goroutine is not leaked.
+ wg sync.WaitGroup
+
+ // Injectable for testing. Send the event down the outgoing channel.
+ emit func(watch.Event)
+
+ cache etcdCache
+}
+
+// watchWaitDuration is the amount of time to wait for an error from watch.
+const watchWaitDuration = 100 * time.Millisecond
+
+// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
+// The versioner must be able to handle the objects that transform creates.
+func newEtcdWatcher(list bool, quorum bool, include includeFunc, pred storage.SelectionPredicate,
+ encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
+ valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher {
+ w := &etcdWatcher{
+ encoding: encoding,
+ versioner: versioner,
+ transform: transform,
+ valueTransformer: valueTransformer,
+
+ list: list,
+ quorum: quorum,
+ include: include,
+ pred: pred,
+ // Buffer this channel, so that the etcd client is not forced
+ // to context switch with every object it gets, and so that a
+ // long time spent decoding an object won't block the *next*
+ // object. Basically, we see a lot of "401 window exceeded"
+ // errors from etcd, and that's due to the client not streaming
+ // results but rather getting them one at a time. So we really
+ // want to never block the etcd client, if possible. The 100 is
+ // mostly arbitrary--we know it goes as high as 50, though.
+ // There's a V(2) log message that prints the length so we can
+ // monitor how much of this buffer is actually used.
+ etcdIncoming: make(chan *etcd.Response, 100),
+ etcdError: make(chan error, 1),
+ // Similarly to etcdIncomming, we don't want to force context
+ // switch on every new incoming object.
+ outgoing: make(chan watch.Event, 100),
+ userStop: make(chan struct{}),
+ stopped: false,
+ wg: sync.WaitGroup{},
+ cache: cache,
+ ctx: nil,
+ cancel: nil,
+ }
+ w.emit = func(e watch.Event) {
+ if curLen := int64(len(w.outgoing)); w.outgoingHWM.Update(curLen) {
+ // Monitor if this gets backed up, and how much.
+ glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), curLen)
+ }
+ // Give up on user stop, without this we leak a lot of goroutines in tests.
+ select {
+ case w.outgoing <- e:
+ case <-w.userStop:
+ }
+ }
+ // translate will call done. We need to Add() here because otherwise,
+ // if Stop() gets called before translate gets started, there'd be a
+ // problem.
+ w.wg.Add(1)
+ go w.translate()
+ return w
+}
+
+// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
+// as a goroutine.
+func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
+ defer utilruntime.HandleCrash()
+ defer close(w.etcdError)
+ defer close(w.etcdIncoming)
+
+ // All calls to etcd are coming from this function - once it is finished
+ // no other call to etcd should be generated by this watcher.
+ done := func() {}
+
+ // We need to be prepared, that Stop() can be called at any time.
+ // It can potentially also be called, even before this function is called.
+ // If that is the case, we simply skip all the code here.
+ // See #18928 for more details.
+ var watcher etcd.Watcher
+ returned := func() bool {
+ w.stopLock.Lock()
+ defer w.stopLock.Unlock()
+ if w.stopped {
+ // Watcher has already been stopped - don't event initiate it here.
+ return true
+ }
+ w.wg.Add(1)
+ done = w.wg.Done
+ // Perform initialization of watcher under lock - we want to avoid situation when
+ // Stop() is called in the meantime (which in tests can cause etcd termination and
+ // strange behavior here).
+ if resourceVersion == 0 {
+ latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
+ if err != nil {
+ w.etcdError <- err
+ return true
+ }
+ resourceVersion = latest
+ }
+
+ opts := etcd.WatcherOptions{
+ Recursive: w.list,
+ AfterIndex: resourceVersion,
+ }
+ watcher = client.Watcher(key, &opts)
+ w.ctx, w.cancel = context.WithCancel(ctx)
+ return false
+ }()
+ defer done()
+ if returned {
+ return
+ }
+
+ for {
+ resp, err := watcher.Next(w.ctx)
+ if err != nil {
+ w.etcdError <- err
+ return
+ }
+ w.etcdIncoming <- resp
+ }
+}
+
+// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
+func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
+ opts := etcd.GetOptions{
+ Recursive: recursive,
+ Sort: false,
+ Quorum: quorum,
+ }
+ resp, err := client.Get(ctx, key, &opts)
+ if err != nil {
+ if !etcdutil.IsEtcdNotFound(err) {
+ utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
+ return resourceVersion, toStorageErr(err, key, 0)
+ }
+ if etcdError, ok := err.(etcd.Error); ok {
+ resourceVersion = etcdError.Index
+ }
+ return resourceVersion, nil
+ }
+ resourceVersion = resp.Index
+ convertRecursiveResponse(resp.Node, resp, incoming)
+ return
+}
+
+// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
+// by copying the original response. This emulates the behavior of a recursive watch.
+func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
+ if node.Dir {
+ for i := range node.Nodes {
+ convertRecursiveResponse(node.Nodes[i], response, incoming)
+ }
+ return
+ }
+ copied := *response
+ copied.Action = "get"
+ copied.Node = node
+ incoming <- &copied
+}
+
+// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
+// called as a goroutine.
+func (w *etcdWatcher) translate() {
+ defer w.wg.Done()
+ defer close(w.outgoing)
+ defer utilruntime.HandleCrash()
+
+ for {
+ select {
+ case err := <-w.etcdError:
+ if err != nil {
+ var status *metav1.Status
+ switch {
+ case etcdutil.IsEtcdWatchExpired(err):
+ status = &metav1.Status{
+ Status: metav1.StatusFailure,
+ Message: err.Error(),
+ Code: http.StatusGone, // Gone
+ Reason: metav1.StatusReasonExpired,
+ }
+ // TODO: need to generate errors using api/errors which has a circular dependency on this package
+ // no other way to inject errors
+ // case etcdutil.IsEtcdUnreachable(err):
+ // status = errors.NewServerTimeout(...)
+ default:
+ status = &metav1.Status{
+ Status: metav1.StatusFailure,
+ Message: err.Error(),
+ Code: http.StatusInternalServerError,
+ Reason: metav1.StatusReasonInternalError,
+ }
+ }
+ w.emit(watch.Event{
+ Type: watch.Error,
+ Object: status,
+ })
+ }
+ return
+ case <-w.userStop:
+ return
+ case res, ok := <-w.etcdIncoming:
+ if ok {
+ if curLen := int64(len(w.etcdIncoming)); w.incomingHWM.Update(curLen) {
+ // Monitor if this gets backed up, and how much.
+ glog.V(1).Infof("watch: %v objects queued in incoming channel.", curLen)
+ }
+ w.sendResult(res)
+ }
+ // If !ok, don't return here-- must wait for etcdError channel
+ // to give an error or be closed.
+ }
+ }
+}
+
+// decodeObject extracts an object from the provided etcd node or returns an error.
+func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
+ if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found {
+ return obj, nil
+ }
+
+ body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value)
+ if err != nil {
+ return nil, err
+ }
+
+ obj, err := runtime.Decode(w.encoding, []byte(body))
+ if err != nil {
+ return nil, err
+ }
+
+ // ensure resource version is set on the object we load from etcd
+ if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil {
+ utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
+ }
+
+ // perform any necessary transformation
+ if w.transform != nil {
+ obj, err = w.transform(obj)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
+ return nil, err
+ }
+ }
+
+ if node.ModifiedIndex != 0 {
+ w.cache.addToCache(node.ModifiedIndex, obj)
+ }
+ return obj, nil
+}
+
+func (w *etcdWatcher) sendAdd(res *etcd.Response) {
+ if res.Node == nil {
+ utilruntime.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
+ return
+ }
+ if w.include != nil && !w.include(res.Node.Key) {
+ return
+ }
+ obj, err := w.decodeObject(res.Node)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
+ // TODO: expose an error through watch.Interface?
+ // Ignore this value. If we stop the watch on a bad value, a client that uses
+ // the resourceVersion to resume will never be able to get past a bad value.
+ return
+ }
+ if matched, err := w.pred.Matches(obj); err != nil || !matched {
+ return
+ }
+ action := watch.Added
+ w.emit(watch.Event{
+ Type: action,
+ Object: obj,
+ })
+}
+
+func (w *etcdWatcher) sendModify(res *etcd.Response) {
+ if res.Node == nil {
+ glog.Errorf("unexpected nil node: %#v", res)
+ return
+ }
+ if w.include != nil && !w.include(res.Node.Key) {
+ return
+ }
+ curObj, err := w.decodeObject(res.Node)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
+ // TODO: expose an error through watch.Interface?
+ // Ignore this value. If we stop the watch on a bad value, a client that uses
+ // the resourceVersion to resume will never be able to get past a bad value.
+ return
+ }
+ curObjPasses := false
+ if matched, err := w.pred.Matches(curObj); err == nil && matched {
+ curObjPasses = true
+ }
+ oldObjPasses := false
+ var oldObj runtime.Object
+ if res.PrevNode != nil && res.PrevNode.Value != "" {
+ // Ignore problems reading the old object.
+ if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
+ if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
+ utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
+ }
+ if matched, err := w.pred.Matches(oldObj); err == nil && matched {
+ oldObjPasses = true
+ }
+ }
+ }
+ // Some changes to an object may cause it to start or stop matching a pred.
+ // We need to report those as adds/deletes. So we have to check both the previous
+ // and current value of the object.
+ switch {
+ case curObjPasses && oldObjPasses:
+ w.emit(watch.Event{
+ Type: watch.Modified,
+ Object: curObj,
+ })
+ case curObjPasses && !oldObjPasses:
+ w.emit(watch.Event{
+ Type: watch.Added,
+ Object: curObj,
+ })
+ case !curObjPasses && oldObjPasses:
+ w.emit(watch.Event{
+ Type: watch.Deleted,
+ Object: oldObj,
+ })
+ }
+ // Do nothing if neither new nor old object passed the pred.
+}
+
+func (w *etcdWatcher) sendDelete(res *etcd.Response) {
+ if res.PrevNode == nil {
+ utilruntime.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
+ return
+ }
+ if w.include != nil && !w.include(res.PrevNode.Key) {
+ return
+ }
+ node := *res.PrevNode
+ if res.Node != nil {
+ // Note that this sends the *old* object with the etcd index for the time at
+ // which it gets deleted. This will allow users to restart the watch at the right
+ // index.
+ node.ModifiedIndex = res.Node.ModifiedIndex
+ }
+ obj, err := w.decodeObject(&node)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
+ // TODO: expose an error through watch.Interface?
+ // Ignore this value. If we stop the watch on a bad value, a client that uses
+ // the resourceVersion to resume will never be able to get past a bad value.
+ return
+ }
+ if matched, err := w.pred.Matches(obj); err != nil || !matched {
+ return
+ }
+ w.emit(watch.Event{
+ Type: watch.Deleted,
+ Object: obj,
+ })
+}
+
+func (w *etcdWatcher) sendResult(res *etcd.Response) {
+ switch res.Action {
+ case EtcdCreate, EtcdGet:
+ // "Get" will only happen in watch 0 case, where we explicitly want ADDED event
+ // for initial state.
+ w.sendAdd(res)
+ case EtcdSet, EtcdCAS:
+ w.sendModify(res)
+ case EtcdDelete, EtcdExpire, EtcdCAD:
+ w.sendDelete(res)
+ default:
+ utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action))
+ }
+}
+
+// ResultChan implements watch.Interface.
+func (w *etcdWatcher) ResultChan() <-chan watch.Event {
+ return w.outgoing
+}
+
+// Stop implements watch.Interface.
+func (w *etcdWatcher) Stop() {
+ w.stopLock.Lock()
+ if w.cancel != nil {
+ w.cancel()
+ w.cancel = nil
+ }
+ if !w.stopped {
+ w.stopped = true
+ close(w.userStop)
+ }
+ w.stopLock.Unlock()
+
+ // Wait until all calls to etcd are finished and no other
+ // will be issued.
+ w.wg.Wait()
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/metrics/metrics.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/metrics/metrics.go
new file mode 100644
index 0000000..96385f6
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/metrics/metrics.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+ "sync"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+ cacheHitCounterOpts = prometheus.CounterOpts{
+ Name: "etcd_helper_cache_hit_count",
+ Help: "Counter of etcd helper cache hits.",
+ }
+ cacheHitCounter = prometheus.NewCounter(cacheHitCounterOpts)
+ cacheMissCounterOpts = prometheus.CounterOpts{
+ Name: "etcd_helper_cache_miss_count",
+ Help: "Counter of etcd helper cache miss.",
+ }
+ cacheMissCounter = prometheus.NewCounter(cacheMissCounterOpts)
+ cacheEntryCounterOpts = prometheus.CounterOpts{
+ Name: "etcd_helper_cache_entry_count",
+ Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " +
+ "because two concurrent threads can miss the cache and generate the same entry twice.",
+ }
+ cacheEntryCounter = prometheus.NewCounter(cacheEntryCounterOpts)
+ cacheGetLatency = prometheus.NewSummary(
+ prometheus.SummaryOpts{
+ Name: "etcd_request_cache_get_latencies_summary",
+ Help: "Latency in microseconds of getting an object from etcd cache",
+ },
+ )
+ cacheAddLatency = prometheus.NewSummary(
+ prometheus.SummaryOpts{
+ Name: "etcd_request_cache_add_latencies_summary",
+ Help: "Latency in microseconds of adding an object to etcd cache",
+ },
+ )
+ etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
+ prometheus.SummaryOpts{
+ Name: "etcd_request_latencies_summary",
+ Help: "Etcd request latency summary in microseconds for each operation and object type.",
+ },
+ []string{"operation", "type"},
+ )
+ objectCounts = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "etcd_object_counts",
+ Help: "Number of stored objects at the time of last check split by kind.",
+ },
+ []string{"resource"},
+ )
+)
+
+var registerMetrics sync.Once
+
+// Register all metrics.
+func Register() {
+ // Register the metrics.
+ registerMetrics.Do(func() {
+ prometheus.MustRegister(cacheHitCounter)
+ prometheus.MustRegister(cacheMissCounter)
+ prometheus.MustRegister(cacheEntryCounter)
+ prometheus.MustRegister(cacheAddLatency)
+ prometheus.MustRegister(cacheGetLatency)
+ prometheus.MustRegister(etcdRequestLatenciesSummary)
+ prometheus.MustRegister(objectCounts)
+ })
+}
+
+func UpdateObjectCount(resourcePrefix string, count int64) {
+ objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
+}
+
+func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
+ etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
+}
+
+func ObserveGetCache(startTime time.Time) {
+ cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
+}
+
+func ObserveAddCache(startTime time.Time) {
+ cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
+}
+
+func ObserveCacheHit() {
+ cacheHitCounter.Inc()
+}
+
+func ObserveCacheMiss() {
+ cacheMissCounter.Inc()
+}
+
+func ObserveNewEntry() {
+ cacheEntryCounter.Inc()
+}
+
+func Reset() {
+ cacheHitCounter = prometheus.NewCounter(cacheHitCounterOpts)
+ cacheMissCounter = prometheus.NewCounter(cacheMissCounterOpts)
+ cacheEntryCounter = prometheus.NewCounter(cacheEntryCounterOpts)
+ // TODO: Reset cacheAddLatency.
+ // TODO: Reset cacheGetLatency.
+ etcdRequestLatenciesSummary.Reset()
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/util/doc.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/util/doc.go
new file mode 100644
index 0000000..97241a4
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/util/doc.go
@@ -0,0 +1,19 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package util holds generic etcd-related utility functions that any user of ectd might want to
+// use, without pulling in kubernetes-specific code.
+package util // import "k8s.io/apiserver/pkg/storage/etcd/util"
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/util/etcd_util.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/util/etcd_util.go
new file mode 100644
index 0000000..7c71fe2
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd/util/etcd_util.go
@@ -0,0 +1,99 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ etcd "github.com/coreos/etcd/client"
+)
+
+// IsEtcdNotFound returns true if and only if err is an etcd not found error.
+func IsEtcdNotFound(err error) bool {
+ return isEtcdErrorNum(err, etcd.ErrorCodeKeyNotFound)
+}
+
+// IsEtcdNodeExist returns true if and only if err is an etcd node already exist error.
+func IsEtcdNodeExist(err error) bool {
+ return isEtcdErrorNum(err, etcd.ErrorCodeNodeExist)
+}
+
+// IsEtcdTestFailed returns true if and only if err is an etcd write conflict.
+func IsEtcdTestFailed(err error) bool {
+ return isEtcdErrorNum(err, etcd.ErrorCodeTestFailed)
+}
+
+// IsEtcdWatchExpired returns true if and only if err indicates the watch has expired.
+func IsEtcdWatchExpired(err error) bool {
+ // NOTE: This seems weird why it wouldn't be etcd.ErrorCodeWatcherCleared
+ // I'm using the previous matching value
+ return isEtcdErrorNum(err, etcd.ErrorCodeEventIndexCleared)
+}
+
+// IsEtcdUnreachable returns true if and only if err indicates the server could not be reached.
+func IsEtcdUnreachable(err error) bool {
+ // NOTE: The logic has changed previous error code no longer applies
+ return err == etcd.ErrClusterUnavailable
+}
+
+// isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode
+func isEtcdErrorNum(err error, errorCode int) bool {
+ if err != nil {
+ if etcdError, ok := err.(etcd.Error); ok {
+ return etcdError.Code == errorCode
+ }
+ // NOTE: There are other error types returned
+ }
+ return false
+}
+
+// GetEtcdVersion performs a version check against the provided Etcd server,
+// returning the string response, and error (if any).
+func GetEtcdVersion(host string) (string, error) {
+ response, err := http.Get(host + "/version")
+ if err != nil {
+ return "", err
+ }
+ defer response.Body.Close()
+ if response.StatusCode != http.StatusOK {
+ return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
+ }
+ versionBytes, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ return "", err
+ }
+ return string(versionBytes), nil
+}
+
+type etcdHealth struct {
+ // Note this has to be public so the json library can modify it.
+ Health string `json:"health"`
+}
+
+func EtcdHealthCheck(data []byte) error {
+ obj := etcdHealth{}
+ if err := json.Unmarshal(data, &obj); err != nil {
+ return err
+ }
+ if obj.Health != "true" {
+ return fmt.Errorf("Unhealthy status: %s", obj.Health)
+ }
+ return nil
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/OWNERS b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/OWNERS
new file mode 100755
index 0000000..84c24e7
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/OWNERS
@@ -0,0 +1,5 @@
+reviewers:
+- wojtek-t
+- timothysc
+- madhusudancs
+- hongchaodeng
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go
new file mode 100644
index 0000000..bdcd5bc
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go
@@ -0,0 +1,162 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd3
+
+import (
+ "context"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/golang/glog"
+)
+
+const (
+ compactRevKey = "compact_rev_key"
+)
+
+var (
+ endpointsMapMu sync.Mutex
+ endpointsMap map[string]struct{}
+)
+
+func init() {
+ endpointsMap = make(map[string]struct{})
+}
+
+// StartCompactor starts a compactor in the background to compact old version of keys that's not needed.
+// By default, we save the most recent 10 minutes data and compact versions > 10minutes ago.
+// It should be enough for slow watchers and to tolerate burst.
+// TODO: We might keep a longer history (12h) in the future once storage API can take advantage of past version of keys.
+func StartCompactor(ctx context.Context, client *clientv3.Client, compactInterval time.Duration) {
+ endpointsMapMu.Lock()
+ defer endpointsMapMu.Unlock()
+
+ // In one process, we can have only one compactor for one cluster.
+ // Currently we rely on endpoints to differentiate clusters.
+ for _, ep := range client.Endpoints() {
+ if _, ok := endpointsMap[ep]; ok {
+ glog.V(4).Infof("compactor already exists for endpoints %v", client.Endpoints())
+ return
+ }
+ }
+ for _, ep := range client.Endpoints() {
+ endpointsMap[ep] = struct{}{}
+ }
+
+ if compactInterval != 0 {
+ go compactor(ctx, client, compactInterval)
+ }
+}
+
+// compactor periodically compacts historical versions of keys in etcd.
+// It will compact keys with versions older than given interval.
+// In other words, after compaction, it will only contain keys set during last interval.
+// Any API call for the older versions of keys will return error.
+// Interval is the time interval between each compaction. The first compaction happens after "interval".
+func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
+ // Technical definitions:
+ // We have a special key in etcd defined as *compactRevKey*.
+ // compactRevKey's value will be set to the string of last compacted revision.
+ // compactRevKey's version will be used as logical time for comparison. THe version is referred as compact time.
+ // Initially, because the key doesn't exist, the compact time (version) is 0.
+ //
+ // Algorithm:
+ // - Compare to see if (local compact_time) = (remote compact_time).
+ // - If yes, increment both local and remote compact_time, and do a compaction.
+ // - If not, set local to remote compact_time.
+ //
+ // Technical details/insights:
+ //
+ // The protocol here is lease based. If one compactor CAS successfully, the others would know it when they fail in
+ // CAS later and would try again in 10 minutes. If an APIServer crashed, another one would "take over" the lease.
+ //
+ // For example, in the following diagram, we have a compactor C1 doing compaction in t1, t2. Another compactor C2
+ // at t1' (t1 < t1' < t2) would CAS fail, set its known oldRev to rev at t1', and try again in t2' (t2' > t2).
+ // If C1 crashed and wouldn't compact at t2, C2 would CAS successfully at t2'.
+ //
+ // oldRev(t2) curRev(t2)
+ // +
+ // oldRev curRev |
+ // + + |
+ // | | |
+ // | | t1' | t2'
+ // +---v-------------v----^---------v------^---->
+ // t0 t1 t2
+ //
+ // We have the guarantees:
+ // - in normal cases, the interval is 10 minutes.
+ // - in failover, the interval is >10m and <20m
+ //
+ // FAQ:
+ // - What if time is not accurate? We don't care as long as someone did the compaction. Atomicity is ensured using
+ // etcd API.
+ // - What happened under heavy load scenarios? Initially, each apiserver will do only one compaction
+ // every 10 minutes. This is very unlikely affecting or affected w.r.t. server load.
+
+ var compactTime int64
+ var rev int64
+ var err error
+ for {
+ select {
+ case <-time.After(interval):
+ case <-ctx.Done():
+ return
+ }
+
+ compactTime, rev, err = compact(ctx, client, compactTime, rev)
+ if err != nil {
+ glog.Errorf("etcd: endpoint (%v) compact failed: %v", client.Endpoints(), err)
+ continue
+ }
+ }
+}
+
+// compact compacts etcd store and returns current rev.
+// It will return the current compact time and global revision if no error occurred.
+// Note that CAS fail will not incur any error.
+func compact(ctx context.Context, client *clientv3.Client, t, rev int64) (int64, int64, error) {
+ resp, err := client.KV.Txn(ctx).If(
+ clientv3.Compare(clientv3.Version(compactRevKey), "=", t),
+ ).Then(
+ clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10)), // Expect side effect: increment Version
+ ).Else(
+ clientv3.OpGet(compactRevKey),
+ ).Commit()
+ if err != nil {
+ return t, rev, err
+ }
+
+ curRev := resp.Header.Revision
+
+ if !resp.Succeeded {
+ curTime := resp.Responses[0].GetResponseRange().Kvs[0].Version
+ return curTime, curRev, nil
+ }
+ curTime := t + 1
+
+ if rev == 0 {
+ // We don't compact on bootstrap.
+ return curTime, curRev, nil
+ }
+ if _, err = client.Compact(ctx, rev); err != nil {
+ return curTime, curRev, err
+ }
+ glog.V(4).Infof("etcd: compacted rev (%d), endpoints (%v)", rev, client.Endpoints())
+ return curTime, curRev, nil
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/errors.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/errors.go
new file mode 100644
index 0000000..5aac30a
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/errors.go
@@ -0,0 +1,42 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd3
+
+import (
+ "k8s.io/apimachinery/pkg/api/errors"
+
+ etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+)
+
+func interpretWatchError(err error) error {
+ switch {
+ case err == etcdrpc.ErrCompacted:
+ return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
+ }
+ return err
+}
+
+func interpretListError(err error, paging bool) error {
+ switch {
+ case err == etcdrpc.ErrCompacted:
+ if paging {
+ return errors.NewResourceExpired("The provided from parameter is too old to display a consistent list result. You must start a new list without the from.")
+ }
+ return errors.NewResourceExpired("The resourceVersion for the provided list is too old.")
+ }
+ return err
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go
new file mode 100644
index 0000000..7dc9175
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go
@@ -0,0 +1,57 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd3
+
+import (
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+type event struct {
+ key string
+ value []byte
+ prevValue []byte
+ rev int64
+ isDeleted bool
+ isCreated bool
+}
+
+// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
+func parseKV(kv *mvccpb.KeyValue) *event {
+ return &event{
+ key: string(kv.Key),
+ value: kv.Value,
+ prevValue: nil,
+ rev: kv.ModRevision,
+ isDeleted: false,
+ isCreated: true,
+ }
+}
+
+func parseEvent(e *clientv3.Event) *event {
+ ret := &event{
+ key: string(e.Kv.Key),
+ value: e.Kv.Value,
+ rev: e.Kv.ModRevision,
+ isDeleted: e.Type == clientv3.EventTypeDelete,
+ isCreated: e.IsCreate(),
+ }
+ if e.PrevKv != nil {
+ ret.prevValue = e.PrevKv.Value
+ }
+ return ret
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go
new file mode 100644
index 0000000..dc06ac5
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd3
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/coreos/etcd/clientv3"
+)
+
+// leaseManager is used to manage leases requested from etcd. If a new write
+// needs a lease that has similar expiration time to the previous one, the old
+// lease will be reused to reduce the overhead of etcd, since lease operations
+// are expensive. In the implementation, we only store one previous lease,
+// since all the events have the same ttl.
+type leaseManager struct {
+ client *clientv3.Client // etcd client used to grant leases
+ leaseMu sync.Mutex
+ prevLeaseID clientv3.LeaseID
+ prevLeaseExpirationTime time.Time
+ // The period of time in seconds and percent of TTL that each lease is
+ // reused. The minimum of them is used to avoid unreasonably large
+ // numbers. We use var instead of const for testing purposes.
+ leaseReuseDurationSeconds int64
+ leaseReuseDurationPercent float64
+}
+
+// newDefaultLeaseManager creates a new lease manager using default setting.
+func newDefaultLeaseManager(client *clientv3.Client) *leaseManager {
+ return newLeaseManager(client, 60, 0.05)
+}
+
+// newLeaseManager creates a new lease manager with the number of buffered
+// leases, lease reuse duration in seconds and percentage. The percentage
+// value x means x*100%.
+func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64) *leaseManager {
+ return &leaseManager{
+ client: client,
+ leaseReuseDurationSeconds: leaseReuseDurationSeconds,
+ leaseReuseDurationPercent: leaseReuseDurationPercent,
+ }
+}
+
+// setLeaseReuseDurationSeconds is used for testing purpose. It is used to
+// reduce the extra lease duration to avoid unnecessary timeout in testing.
+func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) {
+ l.leaseMu.Lock()
+ defer l.leaseMu.Unlock()
+ l.leaseReuseDurationSeconds = duration
+}
+
+// GetLease returns a lease based on requested ttl: if the cached previous
+// lease can be reused, reuse it; otherwise request a new one from etcd.
+func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
+ now := time.Now()
+ l.leaseMu.Lock()
+ defer l.leaseMu.Unlock()
+ // check if previous lease can be reused
+ reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl)
+ valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime)
+ sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime)
+ if valid && sufficient {
+ return l.prevLeaseID, nil
+ }
+ // request a lease with a little extra ttl from etcd
+ ttl += reuseDurationSeconds
+ lcr, err := l.client.Lease.Grant(ctx, ttl)
+ if err != nil {
+ return clientv3.LeaseID(0), err
+ }
+ // cache the new lease id
+ l.prevLeaseID = lcr.ID
+ l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second)
+ return lcr.ID, nil
+}
+
+// getReuseDurationSecondsLocked returns the reusable duration in seconds
+// based on the configuration. Lock has to be acquired before calling this
+// function.
+func (l *leaseManager) getReuseDurationSecondsLocked(ttl int64) int64 {
+ reuseDurationSeconds := int64(l.leaseReuseDurationPercent * float64(ttl))
+ if reuseDurationSeconds > l.leaseReuseDurationSeconds {
+ reuseDurationSeconds = l.leaseReuseDurationSeconds
+ }
+ return reuseDurationSeconds
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight/checks.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight/checks.go
new file mode 100644
index 0000000..9c12c20
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight/checks.go
@@ -0,0 +1,70 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package preflight
+
+import (
+ "fmt"
+ "math/rand"
+ "net"
+ "net/url"
+ "time"
+)
+
+const connectionTimeout = 1 * time.Second
+
+// EtcdConnection holds the Etcd server list
+type EtcdConnection struct {
+ ServerList []string
+}
+
+func (EtcdConnection) serverReachable(connURL *url.URL) bool {
+ scheme := connURL.Scheme
+ if scheme == "http" || scheme == "https" || scheme == "tcp" {
+ scheme = "tcp"
+ }
+ if conn, err := net.DialTimeout(scheme, connURL.Host, connectionTimeout); err == nil {
+ defer conn.Close()
+ return true
+ }
+ return false
+}
+
+func parseServerURI(serverURI string) (*url.URL, error) {
+ connURL, err := url.Parse(serverURI)
+ if err != nil {
+ return &url.URL{}, fmt.Errorf("unable to parse etcd url: %v", err)
+ }
+ return connURL, nil
+}
+
+// CheckEtcdServers will attempt to reach all etcd servers once. If any
+// can be reached, return true.
+func (con EtcdConnection) CheckEtcdServers() (done bool, err error) {
+ // Attempt to reach every Etcd server randomly.
+ serverNumber := len(con.ServerList)
+ serverPerms := rand.Perm(serverNumber)
+ for _, index := range serverPerms {
+ host, err := parseServerURI(con.ServerList[index])
+ if err != nil {
+ return false, err
+ }
+ if con.serverReachable(host) {
+ return true, nil
+ }
+ }
+ return false, nil
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
new file mode 100644
index 0000000..1513af8
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
@@ -0,0 +1,816 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd3
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "path"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/golang/glog"
+
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/conversion"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/apiserver/pkg/storage"
+ "k8s.io/apiserver/pkg/storage/etcd"
+ "k8s.io/apiserver/pkg/storage/value"
+ utiltrace "k8s.io/apiserver/pkg/util/trace"
+)
+
+// authenticatedDataString satisfies the value.Context interface. It uses the key to
+// authenticate the stored data. This does not defend against reuse of previously
+// encrypted values under the same key, but will prevent an attacker from using an
+// encrypted value from a different key. A stronger authenticated data segment would
+// include the etcd3 Version field (which is incremented on each write to a key and
+// reset when the key is deleted), but an attacker with write access to etcd can
+// force deletion and recreation of keys to weaken that angle.
+type authenticatedDataString string
+
+// AuthenticatedData implements the value.Context interface.
+func (d authenticatedDataString) AuthenticatedData() []byte {
+ return []byte(string(d))
+}
+
+var _ value.Context = authenticatedDataString("")
+
+type store struct {
+ client *clientv3.Client
+ // getOpts contains additional options that should be passed
+ // to all Get() calls.
+ getOps []clientv3.OpOption
+ codec runtime.Codec
+ versioner storage.Versioner
+ transformer value.Transformer
+ pathPrefix string
+ watcher *watcher
+ pagingEnabled bool
+ leaseManager *leaseManager
+}
+
+type elemForDecode struct {
+ data []byte
+ rev uint64
+}
+
+type objState struct {
+ obj runtime.Object
+ meta *storage.ResponseMeta
+ rev int64
+ data []byte
+ stale bool
+}
+
+// New returns an etcd3 implementation of storage.Interface.
+func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
+ return newStore(c, true, pagingEnabled, codec, prefix, transformer)
+}
+
+// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
+// where Get operations don't require quorum read.
+func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
+ return newStore(c, false, pagingEnabled, codec, prefix, transformer)
+}
+
+func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
+ versioner := etcd.APIObjectVersioner{}
+ result := &store{
+ client: c,
+ codec: codec,
+ versioner: versioner,
+ transformer: transformer,
+ pagingEnabled: pagingEnabled,
+ // for compatibility with etcd2 impl.
+ // no-op for default prefix of '/registry'.
+ // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
+ pathPrefix: path.Join("/", prefix),
+ watcher: newWatcher(c, codec, versioner, transformer),
+ leaseManager: newDefaultLeaseManager(c),
+ }
+ if !quorumRead {
+ // In case of non-quorum reads, we can set WithSerializable()
+ // options for all Get operations.
+ result.getOps = append(result.getOps, clientv3.WithSerializable())
+ }
+ return result
+}
+
+// Versioner implements storage.Interface.Versioner.
+func (s *store) Versioner() storage.Versioner {
+ return s.versioner
+}
+
+// Get implements storage.Interface.Get.
+func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
+ key = path.Join(s.pathPrefix, key)
+ getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
+ if err != nil {
+ return err
+ }
+
+ if len(getResp.Kvs) == 0 {
+ if ignoreNotFound {
+ return runtime.SetZeroValue(out)
+ }
+ return storage.NewKeyNotFoundError(key, 0)
+ }
+ kv := getResp.Kvs[0]
+
+ data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
+ if err != nil {
+ return storage.NewInternalError(err.Error())
+ }
+
+ return decode(s.codec, s.versioner, data, out, kv.ModRevision)
+}
+
+// Create implements storage.Interface.Create.
+func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
+ if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
+ return errors.New("resourceVersion should not be set on objects to be created")
+ }
+ if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
+ return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
+ }
+ data, err := runtime.Encode(s.codec, obj)
+ if err != nil {
+ return err
+ }
+ key = path.Join(s.pathPrefix, key)
+
+ opts, err := s.ttlOpts(ctx, int64(ttl))
+ if err != nil {
+ return err
+ }
+
+ newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
+ if err != nil {
+ return storage.NewInternalError(err.Error())
+ }
+
+ txnResp, err := s.client.KV.Txn(ctx).If(
+ notFound(key),
+ ).Then(
+ clientv3.OpPut(key, string(newData), opts...),
+ ).Commit()
+ if err != nil {
+ return err
+ }
+ if !txnResp.Succeeded {
+ return storage.NewKeyExistsError(key, 0)
+ }
+
+ if out != nil {
+ putResp := txnResp.Responses[0].GetResponsePut()
+ return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
+ }
+ return nil
+}
+
+// Delete implements storage.Interface.Delete.
+func (s *store) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
+ v, err := conversion.EnforcePtr(out)
+ if err != nil {
+ panic("unable to convert output object to pointer")
+ }
+ key = path.Join(s.pathPrefix, key)
+ if preconditions == nil {
+ return s.unconditionalDelete(ctx, key, out)
+ }
+ return s.conditionalDelete(ctx, key, out, v, preconditions)
+}
+
+func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
+ // We need to do get and delete in single transaction in order to
+ // know the value and revision before deleting it.
+ txnResp, err := s.client.KV.Txn(ctx).If().Then(
+ clientv3.OpGet(key),
+ clientv3.OpDelete(key),
+ ).Commit()
+ if err != nil {
+ return err
+ }
+ getResp := txnResp.Responses[0].GetResponseRange()
+ if len(getResp.Kvs) == 0 {
+ return storage.NewKeyNotFoundError(key, 0)
+ }
+
+ kv := getResp.Kvs[0]
+ data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
+ if err != nil {
+ return storage.NewInternalError(err.Error())
+ }
+ return decode(s.codec, s.versioner, data, out, kv.ModRevision)
+}
+
+func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error {
+ getResp, err := s.client.KV.Get(ctx, key)
+ if err != nil {
+ return err
+ }
+ for {
+ origState, err := s.getState(getResp, key, v, false)
+ if err != nil {
+ return err
+ }
+ if err := checkPreconditions(key, preconditions, origState.obj); err != nil {
+ return err
+ }
+ txnResp, err := s.client.KV.Txn(ctx).If(
+ clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
+ ).Then(
+ clientv3.OpDelete(key),
+ ).Else(
+ clientv3.OpGet(key),
+ ).Commit()
+ if err != nil {
+ return err
+ }
+ if !txnResp.Succeeded {
+ getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
+ glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
+ continue
+ }
+ return decode(s.codec, s.versioner, origState.data, out, origState.rev)
+ }
+}
+
+// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
+func (s *store) GuaranteedUpdate(
+ ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
+ preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
+ trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String()))
+ defer trace.LogIfLong(500 * time.Millisecond)
+
+ v, err := conversion.EnforcePtr(out)
+ if err != nil {
+ panic("unable to convert output object to pointer")
+ }
+ key = path.Join(s.pathPrefix, key)
+
+ getCurrentState := func() (*objState, error) {
+ getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
+ if err != nil {
+ return nil, err
+ }
+ return s.getState(getResp, key, v, ignoreNotFound)
+ }
+
+ var origState *objState
+ var mustCheckData bool
+ if len(suggestion) == 1 && suggestion[0] != nil {
+ origState, err = s.getStateFromObject(suggestion[0])
+ if err != nil {
+ return err
+ }
+ mustCheckData = true
+ } else {
+ origState, err = getCurrentState()
+ if err != nil {
+ return err
+ }
+ }
+ trace.Step("initial value restored")
+
+ transformContext := authenticatedDataString(key)
+ for {
+ if err := checkPreconditions(key, preconditions, origState.obj); err != nil {
+ return err
+ }
+
+ ret, ttl, err := s.updateState(origState, tryUpdate)
+ if err != nil {
+ // It's possible we were working with stale data
+ if mustCheckData && apierrors.IsConflict(err) {
+ // Actually fetch
+ origState, err = getCurrentState()
+ if err != nil {
+ return err
+ }
+ mustCheckData = false
+ // Retry
+ continue
+ }
+
+ return err
+ }
+
+ data, err := runtime.Encode(s.codec, ret)
+ if err != nil {
+ return err
+ }
+ if !origState.stale && bytes.Equal(data, origState.data) {
+ // if we skipped the original Get in this loop, we must refresh from
+ // etcd in order to be sure the data in the store is equivalent to
+ // our desired serialization
+ if mustCheckData {
+ origState, err = getCurrentState()
+ if err != nil {
+ return err
+ }
+ mustCheckData = false
+ if !bytes.Equal(data, origState.data) {
+ // original data changed, restart loop
+ continue
+ }
+ }
+ // recheck that the data from etcd is not stale before short-circuiting a write
+ if !origState.stale {
+ return decode(s.codec, s.versioner, origState.data, out, origState.rev)
+ }
+ }
+
+ newData, err := s.transformer.TransformToStorage(data, transformContext)
+ if err != nil {
+ return storage.NewInternalError(err.Error())
+ }
+
+ opts, err := s.ttlOpts(ctx, int64(ttl))
+ if err != nil {
+ return err
+ }
+ trace.Step("Transaction prepared")
+
+ txnResp, err := s.client.KV.Txn(ctx).If(
+ clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
+ ).Then(
+ clientv3.OpPut(key, string(newData), opts...),
+ ).Else(
+ clientv3.OpGet(key),
+ ).Commit()
+ if err != nil {
+ return err
+ }
+ trace.Step("Transaction committed")
+ if !txnResp.Succeeded {
+ getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
+ glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
+ origState, err = s.getState(getResp, key, v, ignoreNotFound)
+ if err != nil {
+ return err
+ }
+ trace.Step("Retry value restored")
+ mustCheckData = false
+ continue
+ }
+ putResp := txnResp.Responses[0].GetResponsePut()
+
+ return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
+ }
+}
+
+// GetToList implements storage.Interface.GetToList.
+func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
+ listPtr, err := meta.GetItemsPtr(listObj)
+ if err != nil {
+ return err
+ }
+ v, err := conversion.EnforcePtr(listPtr)
+ if err != nil || v.Kind() != reflect.Slice {
+ panic("need ptr to slice")
+ }
+
+ key = path.Join(s.pathPrefix, key)
+ getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
+ if err != nil {
+ return err
+ }
+
+ if len(getResp.Kvs) > 0 {
+ data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
+ if err != nil {
+ return storage.NewInternalError(err.Error())
+ }
+ if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil {
+ return err
+ }
+ }
+ // update version with cluster level revision
+ return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "")
+}
+
+func (s *store) Count(key string) (int64, error) {
+ key = path.Join(s.pathPrefix, key)
+ getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
+ if err != nil {
+ return 0, err
+ }
+ return getResp.Count, nil
+}
+
+// continueToken is a simple structured object for encoding the state of a continue token.
+// TODO: if we change the version of the encoded from, we can't start encoding the new version
+// until all other servers are upgraded (i.e. we need to support rolling schema)
+// This is a public API struct and cannot change.
+type continueToken struct {
+ APIVersion string `json:"v"`
+ ResourceVersion int64 `json:"rv"`
+ StartKey string `json:"start"`
+}
+
+// parseFrom transforms an encoded predicate from into a versioned struct.
+// TODO: return a typed error that instructs clients that they must relist
+func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
+ data, err := base64.RawURLEncoding.DecodeString(continueValue)
+ if err != nil {
+ return "", 0, fmt.Errorf("continue key is not valid: %v", err)
+ }
+ var c continueToken
+ if err := json.Unmarshal(data, &c); err != nil {
+ return "", 0, fmt.Errorf("continue key is not valid: %v", err)
+ }
+ switch c.APIVersion {
+ case "meta.k8s.io/v1":
+ if c.ResourceVersion == 0 {
+ return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
+ }
+ if len(c.StartKey) == 0 {
+ return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
+ }
+ // defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
+ // be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
+ // continue start key that is fully qualified and cannot range over anything less specific than
+ // keyPrefix.
+ key := c.StartKey
+ if !strings.HasPrefix(key, "/") {
+ key = "/" + key
+ }
+ cleaned := path.Clean(key)
+ if cleaned != key {
+ return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
+ }
+ return keyPrefix + cleaned[1:], c.ResourceVersion, nil
+ default:
+ return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
+ }
+}
+
+// encodeContinue returns a string representing the encoded continuation of the current query.
+func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
+ nextKey := strings.TrimPrefix(key, keyPrefix)
+ if nextKey == key {
+ return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
+ }
+ out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
+ if err != nil {
+ return "", err
+ }
+ return base64.RawURLEncoding.EncodeToString(out), nil
+}
+
+// List implements storage.Interface.List.
+func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
+ listPtr, err := meta.GetItemsPtr(listObj)
+ if err != nil {
+ return err
+ }
+ v, err := conversion.EnforcePtr(listPtr)
+ if err != nil || v.Kind() != reflect.Slice {
+ panic("need ptr to slice")
+ }
+
+ if s.pathPrefix != "" {
+ key = path.Join(s.pathPrefix, key)
+ }
+ // We need to make sure the key ended with "/" so that we only get children "directories".
+ // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
+ // while with prefix "/a/" will return only "/a/b" which is the correct answer.
+ if !strings.HasSuffix(key, "/") {
+ key += "/"
+ }
+ keyPrefix := key
+
+ // set the appropriate clientv3 options to filter the returned data set
+ var paging bool
+ options := make([]clientv3.OpOption, 0, 4)
+ if s.pagingEnabled && pred.Limit > 0 {
+ paging = true
+ options = append(options, clientv3.WithLimit(pred.Limit))
+ }
+
+ var returnedRV int64
+ switch {
+ case s.pagingEnabled && len(pred.Continue) > 0:
+ continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix)
+ if err != nil {
+ return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
+ }
+
+ if len(resourceVersion) > 0 && resourceVersion != "0" {
+ return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
+ }
+
+ rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
+ options = append(options, clientv3.WithRange(rangeEnd))
+ key = continueKey
+
+ options = append(options, clientv3.WithRev(continueRV))
+ returnedRV = continueRV
+
+ case s.pagingEnabled && pred.Limit > 0:
+ if len(resourceVersion) > 0 {
+ fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion)
+ if err != nil {
+ return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
+ }
+ if fromRV > 0 {
+ options = append(options, clientv3.WithRev(int64(fromRV)))
+ }
+ returnedRV = int64(fromRV)
+ }
+
+ rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
+ options = append(options, clientv3.WithRange(rangeEnd))
+
+ default:
+ if len(resourceVersion) > 0 {
+ fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion)
+ if err != nil {
+ return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
+ }
+ if fromRV > 0 {
+ options = append(options, clientv3.WithRev(int64(fromRV)))
+ }
+ returnedRV = int64(fromRV)
+ }
+
+ options = append(options, clientv3.WithPrefix())
+ }
+
+ // loop until we have filled the requested limit from etcd or there are no more results
+ var lastKey []byte
+ var hasMore bool
+ for {
+ getResp, err := s.client.KV.Get(ctx, key, options...)
+ if err != nil {
+ return interpretListError(err, len(pred.Continue) > 0)
+ }
+ hasMore = getResp.More
+
+ if len(getResp.Kvs) == 0 && getResp.More {
+ return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
+ }
+
+ // avoid small allocations for the result slice, since this can be called in many
+ // different contexts and we don't know how significantly the result will be filtered
+ if pred.Empty() {
+ growSlice(v, len(getResp.Kvs))
+ } else {
+ growSlice(v, 2048, len(getResp.Kvs))
+ }
+
+ // take items from the response until the bucket is full, filtering as we go
+ for _, kv := range getResp.Kvs {
+ if paging && int64(v.Len()) >= pred.Limit {
+ hasMore = true
+ break
+ }
+ lastKey = kv.Key
+
+ data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err))
+ continue
+ }
+
+ if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil {
+ return err
+ }
+ }
+
+ // indicate to the client which resource version was returned
+ if returnedRV == 0 {
+ returnedRV = getResp.Header.Revision
+ }
+
+ // no more results remain or we didn't request paging
+ if !hasMore || !paging {
+ break
+ }
+ // we're paging but we have filled our bucket
+ if int64(v.Len()) >= pred.Limit {
+ break
+ }
+ key = string(lastKey) + "\x00"
+ }
+
+ // instruct the client to begin querying from immediately after the last key we returned
+ // we never return a key that the client wouldn't be allowed to see
+ if hasMore {
+ // we want to start immediately after the last key
+ next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
+ if err != nil {
+ return err
+ }
+ return s.versioner.UpdateList(listObj, uint64(returnedRV), next)
+ }
+
+ // no continuation
+ return s.versioner.UpdateList(listObj, uint64(returnedRV), "")
+}
+
+// growSlice takes a slice value and grows its capacity up
+// to the maximum of the passed sizes or maxCapacity, whichever
+// is smaller. Above maxCapacity decisions about allocation are left
+// to the Go runtime on append. This allows a caller to make an
+// educated guess about the potential size of the total list while
+// still avoiding overly aggressive initial allocation. If sizes
+// is empty maxCapacity will be used as the size to grow.
+func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
+ cap := v.Cap()
+ max := cap
+ for _, size := range sizes {
+ if size > max {
+ max = size
+ }
+ }
+ if len(sizes) == 0 || max > maxCapacity {
+ max = maxCapacity
+ }
+ if max <= cap {
+ return
+ }
+ if v.Len() > 0 {
+ extra := reflect.MakeSlice(v.Type(), 0, max)
+ reflect.Copy(extra, v)
+ v.Set(extra)
+ } else {
+ extra := reflect.MakeSlice(v.Type(), 0, max)
+ v.Set(extra)
+ }
+}
+
+// Watch implements storage.Interface.Watch.
+func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
+ return s.watch(ctx, key, resourceVersion, pred, false)
+}
+
+// WatchList implements storage.Interface.WatchList.
+func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
+ return s.watch(ctx, key, resourceVersion, pred, true)
+}
+
+func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
+ rev, err := s.versioner.ParseWatchResourceVersion(rv)
+ if err != nil {
+ return nil, err
+ }
+ key = path.Join(s.pathPrefix, key)
+ return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
+}
+
+func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
+ state := &objState{
+ obj: reflect.New(v.Type()).Interface().(runtime.Object),
+ meta: &storage.ResponseMeta{},
+ }
+ if len(getResp.Kvs) == 0 {
+ if !ignoreNotFound {
+ return nil, storage.NewKeyNotFoundError(key, 0)
+ }
+ if err := runtime.SetZeroValue(state.obj); err != nil {
+ return nil, err
+ }
+ } else {
+ data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
+ if err != nil {
+ return nil, storage.NewInternalError(err.Error())
+ }
+ state.rev = getResp.Kvs[0].ModRevision
+ state.meta.ResourceVersion = uint64(state.rev)
+ state.data = data
+ state.stale = stale
+ if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
+ return nil, err
+ }
+ }
+ return state, nil
+}
+
+func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
+ state := &objState{
+ obj: obj,
+ meta: &storage.ResponseMeta{},
+ }
+
+ rv, err := s.versioner.ObjectResourceVersion(obj)
+ if err != nil {
+ return nil, fmt.Errorf("couldn't get resource version: %v", err)
+ }
+ state.rev = int64(rv)
+ state.meta.ResourceVersion = uint64(state.rev)
+
+ // Compute the serialized form - for that we need to temporarily clean
+ // its resource version field (those are not stored in etcd).
+ if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
+ return nil, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
+ }
+ state.data, err = runtime.Encode(s.codec, obj)
+ if err != nil {
+ return nil, err
+ }
+ s.versioner.UpdateObject(state.obj, uint64(rv))
+ return state, nil
+}
+
+func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
+ ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ if err := s.versioner.PrepareObjectForStorage(ret); err != nil {
+ return nil, 0, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
+ }
+ var ttl uint64
+ if ttlPtr != nil {
+ ttl = *ttlPtr
+ }
+ return ret, ttl, nil
+}
+
+// ttlOpts returns client options based on given ttl.
+// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
+func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
+ if ttl == 0 {
+ return nil, nil
+ }
+ id, err := s.leaseManager.GetLease(ctx, ttl)
+ if err != nil {
+ return nil, err
+ }
+ return []clientv3.OpOption{clientv3.WithLease(id)}, nil
+}
+
+// decode decodes value of bytes into object. It will also set the object resource version to rev.
+// On success, objPtr would be set to the object.
+func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
+ if _, err := conversion.EnforcePtr(objPtr); err != nil {
+ panic("unable to convert output object to pointer")
+ }
+ _, _, err := codec.Decode(value, nil, objPtr)
+ if err != nil {
+ return err
+ }
+ // being unable to set the version does not prevent the object from being extracted
+ versioner.UpdateObject(objPtr, uint64(rev))
+ return nil
+}
+
+// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
+func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error {
+ obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
+ if err != nil {
+ return err
+ }
+ // being unable to set the version does not prevent the object from being extracted
+ versioner.UpdateObject(obj, rev)
+ if matched, err := pred.Matches(obj); err == nil && matched {
+ v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
+ }
+ return nil
+}
+
+func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
+ if preconditions == nil {
+ return nil
+ }
+ objMeta, err := meta.Accessor(out)
+ if err != nil {
+ return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
+ }
+ if preconditions.UID != nil && *preconditions.UID != objMeta.GetUID() {
+ errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", *preconditions.UID, objMeta.GetUID())
+ return storage.NewInvalidObjError(key, errMsg)
+ }
+ return nil
+}
+
+func notFound(key string) clientv3.Cmp {
+ return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
new file mode 100644
index 0000000..f09a000
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
@@ -0,0 +1,402 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd3
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+
+ apierrs "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/apiserver/pkg/storage"
+ "k8s.io/apiserver/pkg/storage/value"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/golang/glog"
+)
+
+const (
+ // We have set a buffer in order to reduce times of context switches.
+ incomingBufSize = 100
+ outgoingBufSize = 100
+)
+
+// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
+var fatalOnDecodeError = false
+
+// errTestingDecode is the only error that testingDeferOnDecodeError catches during a panic
+var errTestingDecode = errors.New("sentinel error only used during testing to indicate watch decoding error")
+
+// testingDeferOnDecodeError is used during testing to recover from a panic caused by errTestingDecode, all other values continue to panic
+func testingDeferOnDecodeError() {
+ if r := recover(); r != nil && r != errTestingDecode {
+ panic(r)
+ }
+}
+
+func init() {
+ // check to see if we are running in a test environment
+ fatalOnDecodeError, _ = strconv.ParseBool(os.Getenv("KUBE_PANIC_WATCH_DECODE_ERROR"))
+}
+
+type watcher struct {
+ client *clientv3.Client
+ codec runtime.Codec
+ versioner storage.Versioner
+ transformer value.Transformer
+}
+
+// watchChan implements watch.Interface.
+type watchChan struct {
+ watcher *watcher
+ key string
+ initialRev int64
+ recursive bool
+ internalPred storage.SelectionPredicate
+ ctx context.Context
+ cancel context.CancelFunc
+ incomingEventChan chan *event
+ resultChan chan watch.Event
+ errChan chan error
+}
+
+func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
+ return &watcher{
+ client: client,
+ codec: codec,
+ versioner: versioner,
+ transformer: transformer,
+ }
+}
+
+// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
+// If rev is zero, it will return the existing object(s) and then start watching from
+// the maximum revision+1 from returned objects.
+// If rev is non-zero, it will watch events happened after given revision.
+// If recursive is false, it watches on given key.
+// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
+// pred must be non-nil. Only if pred matches the change, it will be returned.
+func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
+ if recursive && !strings.HasSuffix(key, "/") {
+ key += "/"
+ }
+ wc := w.createWatchChan(ctx, key, rev, recursive, pred)
+ go wc.run()
+ return wc, nil
+}
+
+func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
+ wc := &watchChan{
+ watcher: w,
+ key: key,
+ initialRev: rev,
+ recursive: recursive,
+ internalPred: pred,
+ incomingEventChan: make(chan *event, incomingBufSize),
+ resultChan: make(chan watch.Event, outgoingBufSize),
+ errChan: make(chan error, 1),
+ }
+ if pred.Empty() {
+ // The filter doesn't filter out any object.
+ wc.internalPred = storage.Everything
+ }
+ wc.ctx, wc.cancel = context.WithCancel(ctx)
+ return wc
+}
+
+func (wc *watchChan) run() {
+ watchClosedCh := make(chan struct{})
+ go wc.startWatching(watchClosedCh)
+
+ var resultChanWG sync.WaitGroup
+ resultChanWG.Add(1)
+ go wc.processEvent(&resultChanWG)
+
+ select {
+ case err := <-wc.errChan:
+ if err == context.Canceled {
+ break
+ }
+ errResult := transformErrorToEvent(err)
+ if errResult != nil {
+ // error result is guaranteed to be received by user before closing ResultChan.
+ select {
+ case wc.resultChan <- *errResult:
+ case <-wc.ctx.Done(): // user has given up all results
+ }
+ }
+ case <-watchClosedCh:
+ case <-wc.ctx.Done(): // user cancel
+ }
+
+ // We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
+ // It's fine to double cancel.
+ wc.cancel()
+
+ // we need to wait until resultChan wouldn't be used anymore
+ resultChanWG.Wait()
+ close(wc.resultChan)
+}
+
+func (wc *watchChan) Stop() {
+ wc.cancel()
+}
+
+func (wc *watchChan) ResultChan() <-chan watch.Event {
+ return wc.resultChan
+}
+
+// sync tries to retrieve existing data and send them to process.
+// The revision to watch will be set to the revision in response.
+// All events sent will have isCreated=true
+func (wc *watchChan) sync() error {
+ opts := []clientv3.OpOption{}
+ if wc.recursive {
+ opts = append(opts, clientv3.WithPrefix())
+ }
+ getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
+ if err != nil {
+ return err
+ }
+ wc.initialRev = getResp.Header.Revision
+ for _, kv := range getResp.Kvs {
+ wc.sendEvent(parseKV(kv))
+ }
+ return nil
+}
+
+// startWatching does:
+// - get current objects if initialRev=0; set initialRev to current rev
+// - watch on given key and send events to process.
+func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
+ if wc.initialRev == 0 {
+ if err := wc.sync(); err != nil {
+ glog.Errorf("failed to sync with latest state: %v", err)
+ wc.sendError(err)
+ return
+ }
+ }
+ opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
+ if wc.recursive {
+ opts = append(opts, clientv3.WithPrefix())
+ }
+ wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
+ for wres := range wch {
+ if wres.Err() != nil {
+ err := wres.Err()
+ // If there is an error on server (e.g. compaction), the channel will return it before closed.
+ glog.Errorf("watch chan error: %v", err)
+ wc.sendError(err)
+ return
+ }
+ for _, e := range wres.Events {
+ wc.sendEvent(parseEvent(e))
+ }
+ }
+ // When we come to this point, it's only possible that client side ends the watch.
+ // e.g. cancel the context, close the client.
+ // If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
+ // We should notify the main thread that this goroutine has exited.
+ close(watchClosedCh)
+}
+
+// processEvent processes events from etcd watcher and sends results to resultChan.
+func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
+ defer wg.Done()
+
+ for {
+ select {
+ case e := <-wc.incomingEventChan:
+ res := wc.transform(e)
+ if res == nil {
+ continue
+ }
+ if len(wc.resultChan) == outgoingBufSize {
+ glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+
+ "Probably caused by slow dispatching events to watchers", outgoingBufSize)
+ }
+ // If user couldn't receive results fast enough, we also block incoming events from watcher.
+ // Because storing events in local will cause more memory usage.
+ // The worst case would be closing the fast watcher.
+ select {
+ case wc.resultChan <- *res:
+ case <-wc.ctx.Done():
+ return
+ }
+ case <-wc.ctx.Done():
+ return
+ }
+ }
+}
+
+func (wc *watchChan) filter(obj runtime.Object) bool {
+ if wc.internalPred.Empty() {
+ return true
+ }
+ matched, err := wc.internalPred.Matches(obj)
+ return err == nil && matched
+}
+
+func (wc *watchChan) acceptAll() bool {
+ return wc.internalPred.Empty()
+}
+
+// transform transforms an event into a result for user if not filtered.
+func (wc *watchChan) transform(e *event) (res *watch.Event) {
+ curObj, oldObj, err := wc.prepareObjs(e)
+ if err != nil {
+ glog.Errorf("failed to prepare current and previous objects: %v", err)
+ wc.sendError(err)
+ return nil
+ }
+
+ switch {
+ case e.isDeleted:
+ if !wc.filter(oldObj) {
+ return nil
+ }
+ res = &watch.Event{
+ Type: watch.Deleted,
+ Object: oldObj,
+ }
+ case e.isCreated:
+ if !wc.filter(curObj) {
+ return nil
+ }
+ res = &watch.Event{
+ Type: watch.Added,
+ Object: curObj,
+ }
+ default:
+ if wc.acceptAll() {
+ res = &watch.Event{
+ Type: watch.Modified,
+ Object: curObj,
+ }
+ return res
+ }
+ curObjPasses := wc.filter(curObj)
+ oldObjPasses := wc.filter(oldObj)
+ switch {
+ case curObjPasses && oldObjPasses:
+ res = &watch.Event{
+ Type: watch.Modified,
+ Object: curObj,
+ }
+ case curObjPasses && !oldObjPasses:
+ res = &watch.Event{
+ Type: watch.Added,
+ Object: curObj,
+ }
+ case !curObjPasses && oldObjPasses:
+ res = &watch.Event{
+ Type: watch.Deleted,
+ Object: oldObj,
+ }
+ }
+ }
+ return res
+}
+
+func transformErrorToEvent(err error) *watch.Event {
+ err = interpretWatchError(err)
+ if _, ok := err.(apierrs.APIStatus); !ok {
+ err = apierrs.NewInternalError(err)
+ }
+ status := err.(apierrs.APIStatus).Status()
+ return &watch.Event{
+ Type: watch.Error,
+ Object: &status,
+ }
+}
+
+func (wc *watchChan) sendError(err error) {
+ select {
+ case wc.errChan <- err:
+ case <-wc.ctx.Done():
+ }
+}
+
+func (wc *watchChan) sendEvent(e *event) {
+ if len(wc.incomingEventChan) == incomingBufSize {
+ glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+
+ "Probably caused by slow decoding, user not receiving fast, or other processing logic",
+ incomingBufSize)
+ }
+ select {
+ case wc.incomingEventChan <- e:
+ case <-wc.ctx.Done():
+ }
+}
+
+func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
+ if !e.isDeleted {
+ data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
+ if err != nil {
+ return nil, nil, err
+ }
+ curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+ // We need to decode prevValue, only if this is deletion event or
+ // the underlying filter doesn't accept all objects (otherwise we
+ // know that the filter for previous object will return true and
+ // we need the object only to compute whether it was filtered out
+ // before).
+ if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
+ data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue, authenticatedDataString(e.key))
+ if err != nil {
+ return nil, nil, err
+ }
+ // Note that this sends the *old* object with the etcd revision for the time at
+ // which it gets deleted.
+ oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+ return curObj, oldObj, nil
+}
+
+func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {
+ obj, err := runtime.Decode(codec, []byte(data))
+ if err != nil {
+ if fatalOnDecodeError {
+ // catch watch decode error iff we caused it on
+ // purpose during a unit test
+ defer testingDeferOnDecodeError()
+ // we are running in a test environment and thus an
+ // error here is due to a coder mistake if the defer
+ // does not catch it
+ panic(err)
+ }
+ return nil, err
+ }
+ // ensure resource version is set on the object we load from etcd
+ if err := versioner.UpdateObject(obj, uint64(rev)); err != nil {
+ return nil, fmt.Errorf("failure to version api object (%d) %#v: %v", rev, obj, err)
+ }
+ return obj, nil
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/interfaces.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/interfaces.go
new file mode 100644
index 0000000..227ab2b
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/interfaces.go
@@ -0,0 +1,202 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "context"
+
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/watch"
+)
+
+// Versioner abstracts setting and retrieving metadata fields from database response
+// onto the object ot list. It is required to maintain storage invariants - updating an
+// object twice with the same data except for the ResourceVersion and SelfLink must be
+// a no-op. A resourceVersion of type uint64 is a 'raw' resourceVersion,
+// intended to be sent directly to or from the backend. A resourceVersion of
+// type string is a 'safe' resourceVersion, intended for consumption by users.
+type Versioner interface {
+ // UpdateObject sets storage metadata into an API object. Returns an error if the object
+ // cannot be updated correctly. May return nil if the requested object does not need metadata
+ // from database.
+ UpdateObject(obj runtime.Object, resourceVersion uint64) error
+ // UpdateList sets the resource version into an API list object. Returns an error if the object
+ // cannot be updated correctly. May return nil if the requested object does not need metadata
+ // from database. continueValue is optional and indicates that more results are available if
+ // the client passes that value to the server in a subsequent call.
+ UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error
+ // PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should
+ // return an error if the specified object cannot be updated.
+ PrepareObjectForStorage(obj runtime.Object) error
+ // ObjectResourceVersion returns the resource version (for persistence) of the specified object.
+ // Should return an error if the specified object does not have a persistable version.
+ ObjectResourceVersion(obj runtime.Object) (uint64, error)
+
+ // ParseWatchResourceVersion takes a resource version argument and
+ // converts it to the storage backend we should pass to helper.Watch().
+ // Because resourceVersion is an opaque value, the default watch
+ // behavior for non-zero watch is to watch the next value (if you pass
+ // "1", you will see updates from "2" onwards).
+ ParseWatchResourceVersion(resourceVersion string) (uint64, error)
+ // ParseListResourceVersion takes a resource version argument and
+ // converts it to the storage backend version. Appropriate for
+ // everything that's not intended as an argument for watch.
+ ParseListResourceVersion(resourceVersion string) (uint64, error)
+}
+
+// ResponseMeta contains information about the database metadata that is associated with
+// an object. It abstracts the actual underlying objects to prevent coupling with concrete
+// database and to improve testability.
+type ResponseMeta struct {
+ // TTL is the time to live of the node that contained the returned object. It may be
+ // zero or negative in some cases (objects may be expired after the requested
+ // expiration time due to server lag).
+ TTL int64
+ // The resource version of the node that contained the returned object.
+ ResourceVersion uint64
+}
+
+// MatchValue defines a pair (<index name>, <value for that index>).
+type MatchValue struct {
+ IndexName string
+ Value string
+}
+
+// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs
+// (<index name>, <index value for the given object>) for all indexes known
+// to that function.
+type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
+
+// Everything accepts all objects.
+var Everything = SelectionPredicate{
+ Label: labels.Everything(),
+ Field: fields.Everything(),
+ // TODO: split this into a new top level constant?
+ IncludeUninitialized: true,
+}
+
+// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
+// that is guaranteed to succeed.
+// See the comment for GuaranteedUpdate for more details.
+type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
+
+// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
+type Preconditions struct {
+ // Specifies the target UID.
+ // +optional
+ UID *types.UID `json:"uid,omitempty"`
+}
+
+// NewUIDPreconditions returns a Preconditions with UID set.
+func NewUIDPreconditions(uid string) *Preconditions {
+ u := types.UID(uid)
+ return &Preconditions{UID: &u}
+}
+
+// Interface offers a common interface for object marshaling/unmarshaling operations and
+// hides all the storage-related operations behind it.
+type Interface interface {
+ // Returns Versioner associated with this interface.
+ Versioner() Versioner
+
+ // Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
+ // in seconds (0 means forever). If no error is returned and out is not nil, out will be
+ // set to the read value from database.
+ Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
+
+ // Delete removes the specified key and returns the value that existed at that spot.
+ // If key didn't exist, it will return NotFound storage error.
+ Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error
+
+ // Watch begins watching the specified key. Events are decoded into API objects,
+ // and any items selected by 'p' are sent down to returned watch.Interface.
+ // resourceVersion may be used to specify what version to begin watching,
+ // which should be the current resourceVersion, and no longer rv+1
+ // (e.g. reconnecting without missing any updates).
+ // If resource version is "0", this interface will get current object at given key
+ // and send it in an "ADDED" event, before watch starts.
+ Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
+
+ // WatchList begins watching the specified key's items. Items are decoded into API
+ // objects and any item selected by 'p' are sent down to returned watch.Interface.
+ // resourceVersion may be used to specify what version to begin watching,
+ // which should be the current resourceVersion, and no longer rv+1
+ // (e.g. reconnecting without missing any updates).
+ // If resource version is "0", this interface will list current objects directory defined by key
+ // and send them in "ADDED" events, before watch starts.
+ WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
+
+ // Get unmarshals json found at key into objPtr. On a not found error, will either
+ // return a zero object of the requested type, or an error, depending on ignoreNotFound.
+ // Treats empty responses and nil response nodes exactly like a not found error.
+ // The returned contents may be delayed, but it is guaranteed that they will
+ // be have at least 'resourceVersion'.
+ Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
+
+ // GetToList unmarshals json found at key and opaque it into *List api object
+ // (an object that satisfies the runtime.IsList definition).
+ // The returned contents may be delayed, but it is guaranteed that they will
+ // be have at least 'resourceVersion'.
+ GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
+
+ // List unmarshalls jsons found at directory defined by key and opaque them
+ // into *List api object (an object that satisfies runtime.IsList definition).
+ // The returned contents may be delayed, but it is guaranteed that they will
+ // be have at least 'resourceVersion'.
+ List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
+
+ // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
+ // retrying the update until success if there is index conflict.
+ // Note that object passed to tryUpdate may change across invocations of tryUpdate() if
+ // other writers are simultaneously updating it, so tryUpdate() needs to take into account
+ // the current contents of the object when deciding how the update object should look.
+ // If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
+ // or zero value in 'ptrToType' parameter otherwise.
+ // If the object to update has the same value as previous, it won't do any update
+ // but will return the object in 'ptrToType' parameter.
+ // If 'suggestion' can contain zero or one element - in such case this can be used as
+ // a suggestion about the current version of the object to avoid read operation from
+ // storage to get it.
+ //
+ // Example:
+ //
+ // s := /* implementation of Interface */
+ // err := s.GuaranteedUpdate(
+ // "myKey", &MyType{}, true,
+ // func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
+ // // Before each incovation of the user defined function, "input" is reset to
+ // // current contents for "myKey" in database.
+ // curr := input.(*MyType) // Guaranteed to succeed.
+ //
+ // // Make the modification
+ // curr.Counter++
+ //
+ // // Return the modified object - return an error to stop iterating. Return
+ // // a uint64 to alter the TTL on the object, or nil to keep it the same value.
+ // return cur, nil, nil
+ // }
+ // })
+ GuaranteedUpdate(
+ ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
+ precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
+
+ // Count returns number of different entries under the key (generally being path prefix).
+ Count(key string) (int64, error)
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/names/generate.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/names/generate.go
new file mode 100644
index 0000000..aad9a07
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/names/generate.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package names
+
+import (
+ "fmt"
+
+ utilrand "k8s.io/apimachinery/pkg/util/rand"
+)
+
+// NameGenerator generates names for objects. Some backends may have more information
+// available to guide selection of new names and this interface hides those details.
+type NameGenerator interface {
+ // GenerateName generates a valid name from the base name, adding a random suffix to the
+ // the base. If base is valid, the returned name must also be valid. The generator is
+ // responsible for knowing the maximum valid name length.
+ GenerateName(base string) string
+}
+
+// simpleNameGenerator generates random names.
+type simpleNameGenerator struct{}
+
+// SimpleNameGenerator is a generator that returns the name plus a random suffix of five alphanumerics
+// when a name is requested. The string is guaranteed to not exceed the length of a standard Kubernetes
+// name (63 characters)
+var SimpleNameGenerator NameGenerator = simpleNameGenerator{}
+
+const (
+ // TODO: make this flexible for non-core resources with alternate naming rules.
+ maxNameLength = 63
+ randomLength = 5
+ maxGeneratedNameLength = maxNameLength - randomLength
+)
+
+func (simpleNameGenerator) GenerateName(base string) string {
+ if len(base) > maxGeneratedNameLength {
+ base = base[:maxGeneratedNameLength]
+ }
+ return fmt.Sprintf("%s%s", base, utilrand.String(randomLength))
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/selection_predicate.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/selection_predicate.go
new file mode 100644
index 0000000..9c8c3d5
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/selection_predicate.go
@@ -0,0 +1,150 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// AttrFunc returns label and field sets and the uninitialized flag for List or Watch to match.
+// In any failure to parse given object, it returns error.
+type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, bool, error)
+
+// FieldMutationFunc allows the mutation of the field selection fields. It is mutating to
+// avoid the extra allocation on this common path
+type FieldMutationFunc func(obj runtime.Object, fieldSet fields.Set) error
+
+func DefaultClusterScopedAttr(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
+ metadata, err := meta.Accessor(obj)
+ if err != nil {
+ return nil, nil, false, err
+ }
+ fieldSet := fields.Set{
+ "metadata.name": metadata.GetName(),
+ }
+
+ return labels.Set(metadata.GetLabels()), fieldSet, metadata.GetInitializers() != nil, nil
+}
+
+func DefaultNamespaceScopedAttr(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
+ metadata, err := meta.Accessor(obj)
+ if err != nil {
+ return nil, nil, false, err
+ }
+ fieldSet := fields.Set{
+ "metadata.name": metadata.GetName(),
+ "metadata.namespace": metadata.GetNamespace(),
+ }
+
+ return labels.Set(metadata.GetLabels()), fieldSet, metadata.GetInitializers() != nil, nil
+}
+
+func (f AttrFunc) WithFieldMutation(fieldMutator FieldMutationFunc) AttrFunc {
+ return func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
+ labelSet, fieldSet, initialized, err := f(obj)
+ if err != nil {
+ return nil, nil, false, err
+ }
+ if err := fieldMutator(obj, fieldSet); err != nil {
+ return nil, nil, false, err
+ }
+ return labelSet, fieldSet, initialized, nil
+ }
+}
+
+// SelectionPredicate is used to represent the way to select objects from api storage.
+type SelectionPredicate struct {
+ Label labels.Selector
+ Field fields.Selector
+ IncludeUninitialized bool
+ GetAttrs AttrFunc
+ IndexFields []string
+ Limit int64
+ Continue string
+}
+
+// Matches returns true if the given object's labels and fields (as
+// returned by s.GetAttrs) match s.Label and s.Field. An error is
+// returned if s.GetAttrs fails.
+func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
+ if s.Empty() {
+ return true, nil
+ }
+ labels, fields, uninitialized, err := s.GetAttrs(obj)
+ if err != nil {
+ return false, err
+ }
+ if !s.IncludeUninitialized && uninitialized {
+ return false, nil
+ }
+ matched := s.Label.Matches(labels)
+ if matched && s.Field != nil {
+ matched = matched && s.Field.Matches(fields)
+ }
+ return matched, nil
+}
+
+// MatchesObjectAttributes returns true if the given labels and fields
+// match s.Label and s.Field.
+func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set, uninitialized bool) bool {
+ if !s.IncludeUninitialized && uninitialized {
+ return false
+ }
+ if s.Label.Empty() && s.Field.Empty() {
+ return true
+ }
+ matched := s.Label.Matches(l)
+ if matched && s.Field != nil {
+ matched = (matched && s.Field.Matches(f))
+ }
+ return matched
+}
+
+// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
+// name.
+func (s *SelectionPredicate) MatchesSingle() (string, bool) {
+ if len(s.Continue) > 0 {
+ return "", false
+ }
+ // TODO: should be namespace.name
+ if name, ok := s.Field.RequiresExactMatch("metadata.name"); ok {
+ return name, true
+ }
+ return "", false
+}
+
+// For any index defined by IndexFields, if a matcher can match only (a subset)
+// of objects that return <value> for a given index, a pair (<index name>, <value>)
+// wil be returned.
+// TODO: Consider supporting also labels.
+func (s *SelectionPredicate) MatcherIndex() []MatchValue {
+ var result []MatchValue
+ for _, field := range s.IndexFields {
+ if value, ok := s.Field.RequiresExactMatch(field); ok {
+ result = append(result, MatchValue{IndexName: field, Value: value})
+ }
+ }
+ return result
+}
+
+// Empty returns true if the predicate performs no filtering.
+func (s *SelectionPredicate) Empty() bool {
+ return s.Label.Empty() && s.Field.Empty() && s.IncludeUninitialized
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/OWNERS b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/OWNERS
new file mode 100755
index 0000000..d14ed51
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/OWNERS
@@ -0,0 +1,6 @@
+reviewers:
+- lavalamp
+- smarterclayton
+- wojtek-t
+- timothysc
+- hongchaodeng
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/config.go
new file mode 100644
index 0000000..8d7ecf3
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/config.go
@@ -0,0 +1,80 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storagebackend
+
+import (
+ "time"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apiserver/pkg/storage/value"
+)
+
+const (
+ StorageTypeUnset = ""
+ StorageTypeETCD2 = "etcd2"
+ StorageTypeETCD3 = "etcd3"
+
+ DefaultCompactInterval = 5 * time.Minute
+)
+
+// Config is configuration for creating a storage backend.
+type Config struct {
+ // Type defines the type of storage backend, e.g. "etcd2", etcd3". Default ("") is "etcd3".
+ Type string
+ // Prefix is the prefix to all keys passed to storage.Interface methods.
+ Prefix string
+ // ServerList is the list of storage servers to connect with.
+ ServerList []string
+ // TLS credentials
+ KeyFile string
+ CertFile string
+ CAFile string
+ // Quorum indicates that whether read operations should be quorum-level consistent.
+ Quorum bool
+ // Paging indicates whether the server implementation should allow paging (if it is
+ // supported). This is generally configured by feature gating, or by a specific
+ // resource type not wishing to allow paging, and is not intended for end users to
+ // set.
+ Paging bool
+ // DeserializationCacheSize is the size of cache of deserialized objects.
+ // Currently this is only supported in etcd2.
+ // We will drop the cache once using protobuf.
+ DeserializationCacheSize int
+
+ Codec runtime.Codec
+ // Transformer allows the value to be transformed prior to persisting into etcd.
+ Transformer value.Transformer
+
+ // CompactionInterval is an interval of requesting compaction from apiserver.
+ // If the value is 0, no compaction will be issued.
+ CompactionInterval time.Duration
+
+ // CountMetricPollPeriod specifies how often should count metric be updated
+ CountMetricPollPeriod time.Duration
+}
+
+func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
+ return &Config{
+ Prefix: prefix,
+ // Default cache size to 0 - if unset, its size will be set based on target
+ // memory usage.
+ DeserializationCacheSize: 0,
+ Codec: codec,
+ CompactionInterval: DefaultCompactInterval,
+ Quorum: true,
+ }
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go
new file mode 100644
index 0000000..41542cc
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go
@@ -0,0 +1,81 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package factory
+
+import (
+ "net"
+ "net/http"
+ "time"
+
+ etcd2client "github.com/coreos/etcd/client"
+ "github.com/coreos/etcd/pkg/transport"
+
+ utilnet "k8s.io/apimachinery/pkg/util/net"
+ "k8s.io/apiserver/pkg/storage"
+ "k8s.io/apiserver/pkg/storage/etcd"
+ "k8s.io/apiserver/pkg/storage/storagebackend"
+)
+
+func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
+ tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
+ if err != nil {
+ return nil, nil, err
+ }
+ client, err := newETCD2Client(tr, c.ServerList)
+ if err != nil {
+ return nil, nil, err
+ }
+ s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, etcd.IdentityTransformer)
+ return s, tr.CloseIdleConnections, nil
+}
+
+func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {
+ cli, err := etcd2client.New(etcd2client.Config{
+ Endpoints: serverList,
+ Transport: tr,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return cli, nil
+}
+
+func newTransportForETCD2(certFile, keyFile, caFile string) (*http.Transport, error) {
+ info := transport.TLSInfo{
+ CertFile: certFile,
+ KeyFile: keyFile,
+ CAFile: caFile,
+ }
+ cfg, err := info.ClientConfig()
+ if err != nil {
+ return nil, err
+ }
+ // Copied from etcd.DefaultTransport declaration.
+ // TODO: Determine if transport needs optimization
+ tr := utilnet.SetTransportDefaults(&http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ DialContext: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 30 * time.Second,
+ }).DialContext,
+ TLSHandshakeTimeout: 10 * time.Second,
+ MaxIdleConnsPerHost: 500,
+ TLSClientConfig: cfg,
+ })
+ return tr, nil
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
new file mode 100644
index 0000000..06b935a
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
@@ -0,0 +1,82 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package factory
+
+import (
+ "context"
+ "time"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/pkg/transport"
+
+ "k8s.io/apiserver/pkg/storage"
+ "k8s.io/apiserver/pkg/storage/etcd3"
+ "k8s.io/apiserver/pkg/storage/storagebackend"
+ "k8s.io/apiserver/pkg/storage/value"
+)
+
+// The short keepalive timeout and interval have been chosen to aggressively
+// detect a failed etcd server without introducing much overhead.
+const keepaliveTime = 30 * time.Second
+const keepaliveTimeout = 10 * time.Second
+
+// dialTimeout is the timeout for failing to establish a connection.
+// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
+// on heavily loaded arm64 CPUs (issue #64649)
+const dialTimeout = 20 * time.Second
+
+func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
+ tlsInfo := transport.TLSInfo{
+ CertFile: c.CertFile,
+ KeyFile: c.KeyFile,
+ CAFile: c.CAFile,
+ }
+ tlsConfig, err := tlsInfo.ClientConfig()
+ if err != nil {
+ return nil, nil, err
+ }
+ // NOTE: Client relies on nil tlsConfig
+ // for non-secure connections, update the implicit variable
+ if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 {
+ tlsConfig = nil
+ }
+ cfg := clientv3.Config{
+ DialTimeout: dialTimeout,
+ DialKeepAliveTime: keepaliveTime,
+ DialKeepAliveTimeout: keepaliveTimeout,
+ Endpoints: c.ServerList,
+ TLS: tlsConfig,
+ }
+ client, err := clientv3.New(cfg)
+ if err != nil {
+ return nil, nil, err
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ etcd3.StartCompactor(ctx, client, c.CompactionInterval)
+ destroyFunc := func() {
+ cancel()
+ client.Close()
+ }
+ transformer := c.Transformer
+ if transformer == nil {
+ transformer = value.IdentityTransformer
+ }
+ if c.Quorum {
+ return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
+ }
+ return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
new file mode 100644
index 0000000..101207b
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
@@ -0,0 +1,43 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package factory
+
+import (
+ "fmt"
+
+ "k8s.io/apiserver/pkg/storage"
+ "k8s.io/apiserver/pkg/storage/storagebackend"
+)
+
+// DestroyFunc is to destroy any resources used by the storage returned in Create() together.
+type DestroyFunc func()
+
+// Create creates a storage backend based on given config.
+func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
+ switch c.Type {
+ case storagebackend.StorageTypeETCD2:
+ return newETCD2Storage(c)
+ case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
+ // TODO: We have the following features to implement:
+ // - Support secure connection by using key, cert, and CA files.
+ // - Honor "https" scheme to support secure connection in gRPC.
+ // - Support non-quorum read.
+ return newETCD3Storage(c)
+ default:
+ return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
+ }
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/time_budget.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/time_budget.go
new file mode 100644
index 0000000..e619ec6
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/time_budget.go
@@ -0,0 +1,95 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "sync"
+ "time"
+)
+
+const (
+ refreshPerSecond = 50 * time.Millisecond
+ maxBudget = 100 * time.Millisecond
+)
+
+// timeBudget implements a budget of time that you can use and is
+// periodically being refreshed. The pattern to use it is:
+// budget := newTimeBudget(...)
+// ...
+// timeout := budget.takeAvailable()
+// // Now you can spend at most timeout on doing stuff
+// ...
+// // If you didn't use all timeout, return what you didn't use
+// budget.returnUnused(<unused part of timeout>)
+//
+// NOTE: It's not recommended to be used concurrently from multiple threads -
+// if first user takes the whole timeout, the second one will get 0 timeout
+// even though the first one may return something later.
+type timeBudget struct {
+ sync.Mutex
+ budget time.Duration
+
+ refresh time.Duration
+ maxBudget time.Duration
+}
+
+func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
+ result := &timeBudget{
+ budget: time.Duration(0),
+ refresh: refreshPerSecond,
+ maxBudget: maxBudget,
+ }
+ go result.periodicallyRefresh(stopCh)
+ return result
+}
+
+func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ t.Lock()
+ if t.budget = t.budget + t.refresh; t.budget > t.maxBudget {
+ t.budget = t.maxBudget
+ }
+ t.Unlock()
+ case <-stopCh:
+ return
+ }
+ }
+}
+
+func (t *timeBudget) takeAvailable() time.Duration {
+ t.Lock()
+ defer t.Unlock()
+ result := t.budget
+ t.budget = time.Duration(0)
+ return result
+}
+
+func (t *timeBudget) returnUnused(unused time.Duration) {
+ t.Lock()
+ defer t.Unlock()
+ if unused < 0 {
+ // We used more than allowed.
+ return
+ }
+ if t.budget = t.budget + unused; t.budget > t.maxBudget {
+ t.budget = t.maxBudget
+ }
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/util.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/util.go
new file mode 100644
index 0000000..9d437d0
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/util.go
@@ -0,0 +1,115 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "fmt"
+ "strings"
+ "sync/atomic"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/api/validation/path"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
+
+// SimpleUpdateFunc converts SimpleUpdateFunc into UpdateFunc
+func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
+ return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
+ out, err := fn(input)
+ return out, nil, err
+ }
+}
+
+func EverythingFunc(runtime.Object) bool {
+ return true
+}
+
+func NoTriggerFunc() []MatchValue {
+ return nil
+}
+
+func NoTriggerPublisher(runtime.Object) []MatchValue {
+ return nil
+}
+
+func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
+ meta, err := meta.Accessor(obj)
+ if err != nil {
+ return "", err
+ }
+ name := meta.GetName()
+ if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
+ return "", fmt.Errorf("invalid name: %v", msgs)
+ }
+ return prefix + "/" + meta.GetNamespace() + "/" + name, nil
+}
+
+func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
+ meta, err := meta.Accessor(obj)
+ if err != nil {
+ return "", err
+ }
+ name := meta.GetName()
+ if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
+ return "", fmt.Errorf("invalid name: %v", msgs)
+ }
+ return prefix + "/" + name, nil
+}
+
+// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
+func hasPathPrefix(s, pathPrefix string) bool {
+ // Short circuit if s doesn't contain the prefix at all
+ if !strings.HasPrefix(s, pathPrefix) {
+ return false
+ }
+
+ pathPrefixLength := len(pathPrefix)
+
+ if len(s) == pathPrefixLength {
+ // Exact match
+ return true
+ }
+ if strings.HasSuffix(pathPrefix, "/") {
+ // pathPrefix already ensured a path segment boundary
+ return true
+ }
+ if s[pathPrefixLength:pathPrefixLength+1] == "/" {
+ // The next character in s is a path segment boundary
+ // Check this instead of normalizing pathPrefix to avoid allocating on every call
+ return true
+ }
+ return false
+}
+
+// HighWaterMark is a thread-safe object for tracking the maximum value seen
+// for some quantity.
+type HighWaterMark int64
+
+// Update returns true if and only if 'current' is the highest value ever seen.
+func (hwm *HighWaterMark) Update(current int64) bool {
+ for {
+ old := atomic.LoadInt64((*int64)(hwm))
+ if current <= old {
+ return false
+ }
+ if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
+ return true
+ }
+ }
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/value/metrics.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/value/metrics.go
new file mode 100644
index 0000000..1fe3167
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/value/metrics.go
@@ -0,0 +1,124 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package value
+
+import (
+ "sync"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+ namespace = "apiserver"
+ subsystem = "storage"
+)
+
+var (
+ transformerLatencies = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "transformation_latencies_microseconds",
+ Help: "Latencies in microseconds of value transformation operations.",
+ // In-process transformations (ex. AES CBC) complete on the order of 20 microseconds. However, when
+ // external KMS is involved latencies may climb into milliseconds.
+ Buckets: prometheus.ExponentialBuckets(5, 2, 14),
+ },
+ []string{"transformation_type"},
+ )
+ transformerFailuresTotal = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "transformation_failures_total",
+ Help: "Total number of failed transformation operations.",
+ },
+ []string{"transformation_type"},
+ )
+
+ envelopeTransformationCacheMissTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "envelope_transformation_cache_misses_total",
+ Help: "Total number of cache misses while accessing key decryption key(KEK).",
+ },
+ )
+
+ dataKeyGenerationLatencies = prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "data_key_generation_latencies_microseconds",
+ Help: "Latencies in microseconds of data encryption key(DEK) generation operations.",
+ Buckets: prometheus.ExponentialBuckets(5, 2, 14),
+ },
+ )
+ dataKeyGenerationFailuresTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "data_key_generation_failures_total",
+ Help: "Total number of failed data encryption key(DEK) generation operations.",
+ },
+ )
+)
+
+var registerMetrics sync.Once
+
+func RegisterMetrics() {
+ registerMetrics.Do(func() {
+ prometheus.MustRegister(transformerLatencies)
+ prometheus.MustRegister(transformerFailuresTotal)
+ prometheus.MustRegister(envelopeTransformationCacheMissTotal)
+ prometheus.MustRegister(dataKeyGenerationLatencies)
+ prometheus.MustRegister(dataKeyGenerationFailuresTotal)
+ })
+}
+
+// RecordTransformation records latencies and count of TransformFromStorage and TransformToStorage operations.
+func RecordTransformation(transformationType string, start time.Time, err error) {
+ if err != nil {
+ transformerFailuresTotal.WithLabelValues(transformationType).Inc()
+ return
+ }
+
+ since := sinceInMicroseconds(start)
+ transformerLatencies.WithLabelValues(transformationType).Observe(float64(since))
+}
+
+// RecordCacheMiss records a miss on Key Encryption Key(KEK) - call to KMS was required to decrypt KEK.
+func RecordCacheMiss() {
+ envelopeTransformationCacheMissTotal.Inc()
+}
+
+// RecordDataKeyGeneration records latencies and count of Data Encryption Key generation operations.
+func RecordDataKeyGeneration(start time.Time, err error) {
+ if err != nil {
+ dataKeyGenerationFailuresTotal.Inc()
+ return
+ }
+
+ since := sinceInMicroseconds(start)
+ dataKeyGenerationLatencies.Observe(float64(since))
+}
+
+func sinceInMicroseconds(start time.Time) int64 {
+ elapsedNanoseconds := time.Since(start).Nanoseconds()
+ return elapsedNanoseconds / int64(time.Microsecond)
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/value/transformer.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/value/transformer.go
new file mode 100644
index 0000000..bad6ed5
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/value/transformer.go
@@ -0,0 +1,164 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package value contains methods for assisting with transformation of values in storage.
+package value
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+ "time"
+)
+
+func init() {
+ RegisterMetrics()
+}
+
+// Context is additional information that a storage transformation may need to verify the data at rest.
+type Context interface {
+ // AuthenticatedData should return an array of bytes that describes the current value. If the value changes,
+ // the transformer may report the value as unreadable or tampered. This may be nil if no such description exists
+ // or is needed. For additional verification, set this to data that strongly identifies the value, such as
+ // the key and creation version of the stored data.
+ AuthenticatedData() []byte
+}
+
+// Transformer allows a value to be transformed before being read from or written to the underlying store. The methods
+// must be able to undo the transformation caused by the other.
+type Transformer interface {
+ // TransformFromStorage may transform the provided data from its underlying storage representation or return an error.
+ // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
+ // have not changed.
+ TransformFromStorage(data []byte, context Context) (out []byte, stale bool, err error)
+ // TransformToStorage may transform the provided data into the appropriate form in storage or return an error.
+ TransformToStorage(data []byte, context Context) (out []byte, err error)
+}
+
+type identityTransformer struct{}
+
+// IdentityTransformer performs no transformation of the provided data.
+var IdentityTransformer Transformer = identityTransformer{}
+
+func (identityTransformer) TransformFromStorage(b []byte, ctx Context) ([]byte, bool, error) {
+ return b, false, nil
+}
+func (identityTransformer) TransformToStorage(b []byte, ctx Context) ([]byte, error) {
+ return b, nil
+}
+
+// DefaultContext is a simple implementation of Context for a slice of bytes.
+type DefaultContext []byte
+
+// AuthenticatedData returns itself.
+func (c DefaultContext) AuthenticatedData() []byte { return []byte(c) }
+
+// MutableTransformer allows a transformer to be changed safely at runtime.
+type MutableTransformer struct {
+ lock sync.RWMutex
+ transformer Transformer
+}
+
+// NewMutableTransformer creates a transformer that can be updated at any time by calling Set()
+func NewMutableTransformer(transformer Transformer) *MutableTransformer {
+ return &MutableTransformer{transformer: transformer}
+}
+
+// Set updates the nested transformer.
+func (t *MutableTransformer) Set(transformer Transformer) {
+ t.lock.Lock()
+ t.transformer = transformer
+ t.lock.Unlock()
+}
+
+func (t *MutableTransformer) TransformFromStorage(data []byte, context Context) (out []byte, stale bool, err error) {
+ defer func(start time.Time) {
+ RecordTransformation("from_storage", start, err)
+ }(time.Now())
+ t.lock.RLock()
+ transformer := t.transformer
+ t.lock.RUnlock()
+ return transformer.TransformFromStorage(data, context)
+}
+func (t *MutableTransformer) TransformToStorage(data []byte, context Context) (out []byte, err error) {
+ defer func(start time.Time) {
+ RecordTransformation("to_storage", start, err)
+ }(time.Now())
+ t.lock.RLock()
+ transformer := t.transformer
+ t.lock.RUnlock()
+ return transformer.TransformToStorage(data, context)
+}
+
+// PrefixTransformer holds a transformer interface and the prefix that the transformation is located under.
+type PrefixTransformer struct {
+ Prefix []byte
+ Transformer Transformer
+}
+
+type prefixTransformers struct {
+ transformers []PrefixTransformer
+ err error
+}
+
+var _ Transformer = &prefixTransformers{}
+
+// NewPrefixTransformers supports the Transformer interface by checking the incoming data against the provided
+// prefixes in order. The first matching prefix will be used to transform the value (the prefix is stripped
+// before the Transformer interface is invoked). The first provided transformer will be used when writing to
+// the store.
+func NewPrefixTransformers(err error, transformers ...PrefixTransformer) Transformer {
+ if err == nil {
+ err = fmt.Errorf("the provided value does not match any of the supported transformers")
+ }
+ return &prefixTransformers{
+ transformers: transformers,
+ err: err,
+ }
+}
+
+// TransformFromStorage finds the first transformer with a prefix matching the provided data and returns
+// the result of transforming the value. It will always mark any transformation as stale that is not using
+// the first transformer.
+func (t *prefixTransformers) TransformFromStorage(data []byte, context Context) ([]byte, bool, error) {
+ for i, transformer := range t.transformers {
+ if bytes.HasPrefix(data, transformer.Prefix) {
+ result, stale, err := transformer.Transformer.TransformFromStorage(data[len(transformer.Prefix):], context)
+ // To migrate away from encryption, user can specify an identity transformer higher up
+ // (in the config file) than the encryption transformer. In that scenario, the identity transformer needs to
+ // identify (during reads from disk) whether the data being read is encrypted or not. If the data is encrypted,
+ // it shall throw an error, but that error should not prevent subsequent transformers from being tried.
+ if len(transformer.Prefix) == 0 && err != nil {
+ continue
+ }
+ return result, stale || i != 0, err
+ }
+ }
+ return nil, false, t.err
+}
+
+// TransformToStorage uses the first transformer and adds its prefix to the data.
+func (t *prefixTransformers) TransformToStorage(data []byte, context Context) ([]byte, error) {
+ transformer := t.transformers[0]
+ prefixedData := make([]byte, len(transformer.Prefix), len(data)+len(transformer.Prefix))
+ copy(prefixedData, transformer.Prefix)
+ result, err := transformer.Transformer.TransformToStorage(data, context)
+ if err != nil {
+ return nil, err
+ }
+ prefixedData = append(prefixedData, result...)
+ return prefixedData, nil
+}
diff --git a/metrics-server/vendor/k8s.io/apiserver/pkg/storage/watch_cache.go b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/watch_cache.go
new file mode 100644
index 0000000..373e74e
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apiserver/pkg/storage/watch_cache.go
@@ -0,0 +1,484 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+ "fmt"
+ "sort"
+ "strconv"
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/clock"
+ "k8s.io/apimachinery/pkg/watch"
+ utiltrace "k8s.io/apiserver/pkg/util/trace"
+ "k8s.io/client-go/tools/cache"
+)
+
+const (
+ // blockTimeout determines how long we're willing to block the request
+ // to wait for a given resource version to be propagated to cache,
+ // before terminating request and returning Timeout error with retry
+ // after suggestion.
+ blockTimeout = 3 * time.Second
+)
+
+// watchCacheEvent is a single "watch event" that is send to users of
+// watchCache. Additionally to a typical "watch.Event" it contains
+// the previous value of the object to enable proper filtering in the
+// upper layers.
+type watchCacheEvent struct {
+ Type watch.EventType
+ Object runtime.Object
+ ObjLabels labels.Set
+ ObjFields fields.Set
+ ObjUninitialized bool
+ PrevObject runtime.Object
+ PrevObjLabels labels.Set
+ PrevObjFields fields.Set
+ PrevObjUninitialized bool
+ Key string
+ ResourceVersion uint64
+}
+
+// Computing a key of an object is generally non-trivial (it performs
+// e.g. validation underneath). Similarly computing object fields and
+// labels. To avoid computing them multiple times (to serve the event
+// in different List/Watch requests), in the underlying store we are
+// keeping structs (key, object, labels, fields, uninitialized).
+type storeElement struct {
+ Key string
+ Object runtime.Object
+ Labels labels.Set
+ Fields fields.Set
+ Uninitialized bool
+}
+
+func storeElementKey(obj interface{}) (string, error) {
+ elem, ok := obj.(*storeElement)
+ if !ok {
+ return "", fmt.Errorf("not a storeElement: %v", obj)
+ }
+ return elem.Key, nil
+}
+
+// watchCacheElement is a single "watch event" stored in a cache.
+// It contains the resource version of the object and the object
+// itself.
+type watchCacheElement struct {
+ resourceVersion uint64
+ watchCacheEvent *watchCacheEvent
+}
+
+// watchCache implements a Store interface.
+// However, it depends on the elements implementing runtime.Object interface.
+//
+// watchCache is a "sliding window" (with a limited capacity) of objects
+// observed from a watch.
+type watchCache struct {
+ sync.RWMutex
+
+ // Condition on which lists are waiting for the fresh enough
+ // resource version.
+ cond *sync.Cond
+
+ // Maximum size of history window.
+ capacity int
+
+ // keyFunc is used to get a key in the underlying storage for a given object.
+ keyFunc func(runtime.Object) (string, error)
+
+ // getAttrsFunc is used to get labels and fields of an object.
+ getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)
+
+ // cache is used a cyclic buffer - its first element (with the smallest
+ // resourceVersion) is defined by startIndex, its last element is defined
+ // by endIndex (if cache is full it will be startIndex + capacity).
+ // Both startIndex and endIndex can be greater than buffer capacity -
+ // you should always apply modulo capacity to get an index in cache array.
+ cache []watchCacheElement
+ startIndex int
+ endIndex int
+
+ // store will effectively support LIST operation from the "end of cache
+ // history" i.e. from the moment just after the newest cached watched event.
+ // It is necessary to effectively allow clients to start watching at now.
+ // NOTE: We assume that <store> is thread-safe.
+ store cache.Store
+
+ // ResourceVersion up to which the watchCache is propagated.
+ resourceVersion uint64
+
+ // This handler is run at the end of every successful Replace() method.
+ onReplace func()
+
+ // This handler is run at the end of every Add/Update/Delete method
+ // and additionally gets the previous value of the object.
+ onEvent func(*watchCacheEvent)
+
+ // for testing timeouts.
+ clock clock.Clock
+}
+
+func newWatchCache(
+ capacity int,
+ keyFunc func(runtime.Object) (string, error),
+ getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)) *watchCache {
+ wc := &watchCache{
+ capacity: capacity,
+ keyFunc: keyFunc,
+ getAttrsFunc: getAttrsFunc,
+ cache: make([]watchCacheElement, capacity),
+ startIndex: 0,
+ endIndex: 0,
+ store: cache.NewStore(storeElementKey),
+ resourceVersion: 0,
+ clock: clock.RealClock{},
+ }
+ wc.cond = sync.NewCond(wc.RLocker())
+ return wc
+}
+
+// Add takes runtime.Object as an argument.
+func (w *watchCache) Add(obj interface{}) error {
+ object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
+ if err != nil {
+ return err
+ }
+ event := watch.Event{Type: watch.Added, Object: object}
+
+ f := func(elem *storeElement) error { return w.store.Add(elem) }
+ return w.processEvent(event, resourceVersion, f)
+}
+
+// Update takes runtime.Object as an argument.
+func (w *watchCache) Update(obj interface{}) error {
+ object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
+ if err != nil {
+ return err
+ }
+ event := watch.Event{Type: watch.Modified, Object: object}
+
+ f := func(elem *storeElement) error { return w.store.Update(elem) }
+ return w.processEvent(event, resourceVersion, f)
+}
+
+// Delete takes runtime.Object as an argument.
+func (w *watchCache) Delete(obj interface{}) error {
+ object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
+ if err != nil {
+ return err
+ }
+ event := watch.Event{Type: watch.Deleted, Object: object}
+
+ f := func(elem *storeElement) error { return w.store.Delete(elem) }
+ return w.processEvent(event, resourceVersion, f)
+}
+
+func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
+ object, ok := obj.(runtime.Object)
+ if !ok {
+ return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
+ }
+ meta, err := meta.Accessor(object)
+ if err != nil {
+ return nil, 0, err
+ }
+ resourceVersion, err := parseResourceVersion(meta.GetResourceVersion())
+ if err != nil {
+ return nil, 0, err
+ }
+ return object, resourceVersion, nil
+}
+
+func parseResourceVersion(resourceVersion string) (uint64, error) {
+ if resourceVersion == "" {
+ return 0, nil
+ }
+ // Use bitsize being the size of int on the machine.
+ return strconv.ParseUint(resourceVersion, 10, 0)
+}
+
+func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
+ key, err := w.keyFunc(event.Object)
+ if err != nil {
+ return fmt.Errorf("couldn't compute key: %v", err)
+ }
+ elem := &storeElement{Key: key, Object: event.Object}
+ elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object)
+ if err != nil {
+ return err
+ }
+
+ watchCacheEvent := &watchCacheEvent{
+ Type: event.Type,
+ Object: elem.Object,
+ ObjLabels: elem.Labels,
+ ObjFields: elem.Fields,
+ ObjUninitialized: elem.Uninitialized,
+ Key: key,
+ ResourceVersion: resourceVersion,
+ }
+
+ // TODO: We should consider moving this lock below after the watchCacheEvent
+ // is created. In such situation, the only problematic scenario is Replace(
+ // happening after getting object from store and before acquiring a lock.
+ // Maybe introduce another lock for this purpose.
+ w.Lock()
+ defer w.Unlock()
+ previous, exists, err := w.store.Get(elem)
+ if err != nil {
+ return err
+ }
+ if exists {
+ previousElem := previous.(*storeElement)
+ watchCacheEvent.PrevObject = previousElem.Object
+ watchCacheEvent.PrevObjLabels = previousElem.Labels
+ watchCacheEvent.PrevObjFields = previousElem.Fields
+ watchCacheEvent.PrevObjUninitialized = previousElem.Uninitialized
+ }
+
+ if w.onEvent != nil {
+ w.onEvent(watchCacheEvent)
+ }
+ w.updateCache(resourceVersion, watchCacheEvent)
+ w.resourceVersion = resourceVersion
+ w.cond.Broadcast()
+ return updateFunc(elem)
+}
+
+// Assumes that lock is already held for write.
+func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
+ if w.endIndex == w.startIndex+w.capacity {
+ // Cache is full - remove the oldest element.
+ w.startIndex++
+ }
+ w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
+ w.endIndex++
+}
+
+// List returns list of pointers to <storeElement> objects.
+func (w *watchCache) List() []interface{} {
+ return w.store.List()
+}
+
+// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
+// NOTE: This function acquired lock and doesn't release it.
+// You HAVE TO explicitly call w.RUnlock() after this function.
+func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error {
+ startTime := w.clock.Now()
+ go func() {
+ // Wake us up when the time limit has expired. The docs
+ // promise that time.After (well, NewTimer, which it calls)
+ // will wait *at least* the duration given. Since this go
+ // routine starts sometime after we record the start time, and
+ // it will wake up the loop below sometime after the broadcast,
+ // we don't need to worry about waking it up before the time
+ // has expired accidentally.
+ <-w.clock.After(blockTimeout)
+ w.cond.Broadcast()
+ }()
+
+ w.RLock()
+ if trace != nil {
+ trace.Step("watchCache locked acquired")
+ }
+ for w.resourceVersion < resourceVersion {
+ if w.clock.Since(startTime) >= blockTimeout {
+ // Timeout with retry after 1 second.
+ return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
+ }
+ w.cond.Wait()
+ }
+ if trace != nil {
+ trace.Step("watchCache fresh enough")
+ }
+ return nil
+}
+
+// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
+func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
+ err := w.waitUntilFreshAndBlock(resourceVersion, trace)
+ defer w.RUnlock()
+ if err != nil {
+ return nil, 0, err
+ }
+ return w.store.List(), w.resourceVersion, nil
+}
+
+// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
+func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
+ err := w.waitUntilFreshAndBlock(resourceVersion, trace)
+ defer w.RUnlock()
+ if err != nil {
+ return nil, false, 0, err
+ }
+ value, exists, err := w.store.GetByKey(key)
+ return value, exists, w.resourceVersion, err
+}
+
+func (w *watchCache) ListKeys() []string {
+ return w.store.ListKeys()
+}
+
+// Get takes runtime.Object as a parameter. However, it returns
+// pointer to <storeElement>.
+func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
+ object, ok := obj.(runtime.Object)
+ if !ok {
+ return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
+ }
+ key, err := w.keyFunc(object)
+ if err != nil {
+ return nil, false, fmt.Errorf("couldn't compute key: %v", err)
+ }
+
+ return w.store.Get(&storeElement{Key: key, Object: object})
+}
+
+// GetByKey returns pointer to <storeElement>.
+func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
+ return w.store.GetByKey(key)
+}
+
+// Replace takes slice of runtime.Object as a parameter.
+func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
+ version, err := parseResourceVersion(resourceVersion)
+ if err != nil {
+ return err
+ }
+
+ toReplace := make([]interface{}, 0, len(objs))
+ for _, obj := range objs {
+ object, ok := obj.(runtime.Object)
+ if !ok {
+ return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
+ }
+ key, err := w.keyFunc(object)
+ if err != nil {
+ return fmt.Errorf("couldn't compute key: %v", err)
+ }
+ objLabels, objFields, objUninitialized, err := w.getAttrsFunc(object)
+ if err != nil {
+ return err
+ }
+ toReplace = append(toReplace, &storeElement{
+ Key: key,
+ Object: object,
+ Labels: objLabels,
+ Fields: objFields,
+ Uninitialized: objUninitialized,
+ })
+ }
+
+ w.Lock()
+ defer w.Unlock()
+
+ w.startIndex = 0
+ w.endIndex = 0
+ if err := w.store.Replace(toReplace, resourceVersion); err != nil {
+ return err
+ }
+ w.resourceVersion = version
+ if w.onReplace != nil {
+ w.onReplace()
+ }
+ w.cond.Broadcast()
+ return nil
+}
+
+func (w *watchCache) SetOnReplace(onReplace func()) {
+ w.Lock()
+ defer w.Unlock()
+ w.onReplace = onReplace
+}
+
+func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
+ w.Lock()
+ defer w.Unlock()
+ w.onEvent = onEvent
+}
+
+func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
+ size := w.endIndex - w.startIndex
+ // if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher
+ // is the *next* event we'll receive, which will be at least one greater than our current resourceVersion
+ oldest := w.resourceVersion + 1
+ if size > 0 {
+ oldest = w.cache[w.startIndex%w.capacity].resourceVersion
+ }
+ if resourceVersion == 0 {
+ // resourceVersion = 0 means that we don't require any specific starting point
+ // and we would like to start watching from ~now.
+ // However, to keep backward compatibility, we additionally need to return the
+ // current state and only then start watching from that point.
+ //
+ // TODO: In v2 api, we should stop returning the current state - #13969.
+ allItems := w.store.List()
+ result := make([]*watchCacheEvent, len(allItems))
+ for i, item := range allItems {
+ elem, ok := item.(*storeElement)
+ if !ok {
+ return nil, fmt.Errorf("not a storeElement: %v", elem)
+ }
+ objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object)
+ if err != nil {
+ return nil, err
+ }
+ result[i] = &watchCacheEvent{
+ Type: watch.Added,
+ Object: elem.Object,
+ ObjLabels: objLabels,
+ ObjFields: objFields,
+ ObjUninitialized: objUninitialized,
+ Key: elem.Key,
+ ResourceVersion: w.resourceVersion,
+ }
+ }
+ return result, nil
+ }
+ if resourceVersion < oldest-1 {
+ return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
+ }
+
+ // Binary search the smallest index at which resourceVersion is greater than the given one.
+ f := func(i int) bool {
+ return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
+ }
+ first := sort.Search(size, f)
+ result := make([]*watchCacheEvent, size-first)
+ for i := 0; i < size-first; i++ {
+ result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
+ }
+ return result, nil
+}
+
+func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
+ w.RLock()
+ defer w.RUnlock()
+ return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
+}
+
+func (w *watchCache) Resync() error {
+ // Nothing to do
+ return nil
+}