git subrepo clone (merge) https://github.com/kubernetes-incubator/metrics-server.git metrics-server

subrepo:
  subdir:   "metrics-server"
  merged:   "92d8412"
upstream:
  origin:   "https://github.com/kubernetes-incubator/metrics-server.git"
  branch:   "master"
  commit:   "92d8412"
git-subrepo:
  version:  "0.4.0"
  origin:   "???"
  commit:   "???"
diff --git a/metrics-server/vendor/k8s.io/apimachinery/pkg/watch/mux.go b/metrics-server/vendor/k8s.io/apimachinery/pkg/watch/mux.go
new file mode 100644
index 0000000..0ac8dc4
--- /dev/null
+++ b/metrics-server/vendor/k8s.io/apimachinery/pkg/watch/mux.go
@@ -0,0 +1,260 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"sync"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch
+// channel is full.
+type FullChannelBehavior int
+
+const (
+	WaitIfChannelFull FullChannelBehavior = iota // block until the watcher has room or has been stopped
+	DropIfChannelFull                            // skip watchers whose channel is full; they miss the event
+)
+
+// incomingQueueLength is the buffer size of Broadcaster.incoming. The queue
+// should rarely accumulate anything; the small buffer only absorbs bursts that
+// arrive faster than the Broadcaster can move events onto the watchers' queues.
+const incomingQueueLength = 25
+
+// Broadcaster distributes event notifications among any number of watchers. Every event is offered
+// to every registered watcher (one whose channel is full misses it when DropIfChannelFull is set).
+type Broadcaster struct {
+	// TODO: see if this lock is needed now that new watchers go through
+	// the incoming channel.
+	lock sync.Mutex // guards watchers and nextWatcher
+
+	watchers     map[int64]*broadcasterWatcher // registered watchers, keyed by watcher id
+	nextWatcher  int64                         // id given to the next watcher that is created
+	distributing sync.WaitGroup                // held while loop runs; Shutdown waits on it
+
+	incoming chan Event // events (and queued functions) waiting to be handled by loop
+
+	// How large to make watcher's channel.
+	watchQueueLength int
+	// With DropIfChannelFull, if one of the watch channels is full, don't wait for it to become empty.
+	// Instead just deliver it to the watchers that do have space in their
+	// channels and move on to the next event.
+	// It's more fair to do this on a per-watcher basis than to do it on the
+	// "incoming" channel, which would allow one slow watcher to prevent all
+	// other watchers from getting new events.
+	fullChannelBehavior FullChannelBehavior
+}
+
+// NewBroadcaster creates a new Broadcaster and starts its distribution goroutine. queueLength is the
+// maximum number of events to queue per watcher. Events are distributed in the order in which they are
+// queued, but the order in which a single event is distributed among all of the watchers is unspecified.
+func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
+	m := &Broadcaster{
+		watchers:            map[int64]*broadcasterWatcher{},
+		incoming:            make(chan Event, incomingQueueLength),
+		watchQueueLength:    queueLength,
+		fullChannelBehavior: fullChannelBehavior,
+	}
+	m.distributing.Add(1) // balanced by the Done call at the end of loop
+	go m.loop()
+	return m
+}
+
+const internalRunFunctionMarker = "internal-do-function" // Event.Type value telling loop to call Event.Object as a function
+
+// functionFakeRuntimeObject wraps a func() so that it can be placed in Event.Object and shoehorned into the queue.
+type functionFakeRuntimeObject func()
+
+func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind {
+	return schema.EmptyObjectKind // NOTE(review): presumably a placeholder kind, since a func has no real kind - confirm
+}
+func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
+	if obj == nil {
+		return nil // an untyped nil, rather than a nil func wrapped in a non-nil interface
+	}
+	// funcs are immutable. Hence, just return the original func.
+	return obj
+}
+
+// blockQueue runs f on the loop goroutine, in queue order: f is enqueued on
+// b.incoming behind any events already queued, and blockQueue returns only
+// after f has finished. Watchers added via f therefore never see events queued
+// before them and always see events queued after them.
+func (b *Broadcaster) blockQueue(f func()) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	b.incoming <- Event{ // panics if Shutdown has already closed b.incoming
+		Type: internalRunFunctionMarker,
+		Object: functionFakeRuntimeObject(func() {
+			defer wg.Done()
+			f()
+		}),
+	}
+	wg.Wait() // released once loop has executed f
+}
+
+// Watch registers a new watcher and returns an Interface for it. Registration is
+// queued behind events already accepted, so the watcher only receives events that
+// are queued after this call; it does not get a history of previous events.
+func (m *Broadcaster) Watch() Interface {
+	var w *broadcasterWatcher
+	m.blockQueue(func() { // executed by loop, in order with the queued events
+		m.lock.Lock()
+		defer m.lock.Unlock()
+		id := m.nextWatcher
+		m.nextWatcher++
+		w = &broadcasterWatcher{
+			result:  make(chan Event, m.watchQueueLength),
+			stopped: make(chan struct{}),
+			id:      id,
+			m:       m,
+		}
+		m.watchers[id] = w
+	})
+	return w // assigned by the closure above before blockQueue returned
+}
+
+// WatchWithPrefix registers a new watcher and returns an Interface for it. queuedEvents are
+// placed on the new watch first, ahead of any ordinary events from the Broadcaster.
+// The returned watch has a queue length of at least len(queuedEvents)+1 (or the
+// Broadcaster's configured queue length, if that is larger).
+func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
+	var w *broadcasterWatcher
+	m.blockQueue(func() { // executed by loop, in order with the queued events
+		m.lock.Lock()
+		defer m.lock.Unlock()
+		id := m.nextWatcher
+		m.nextWatcher++
+		length := m.watchQueueLength
+		if n := len(queuedEvents) + 1; n > length { // room for the whole prefix plus one more event
+			length = n
+		}
+		w = &broadcasterWatcher{
+			result:  make(chan Event, length),
+			stopped: make(chan struct{}),
+			id:      id,
+			m:       m,
+		}
+		m.watchers[id] = w
+		for _, e := range queuedEvents {
+			w.result <- e // cannot block: the fresh buffer holds at least len(queuedEvents)+1 events
+		}
+	})
+	return w // assigned by the closure above before blockQueue returned
+}
+
+// stopWatching removes the watcher with the given id from the list and closes its result channel.
+func (m *Broadcaster) stopWatching(id int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	w, ok := m.watchers[id]
+	if !ok {
+		// Already removed (by an earlier stop or by closeAll), so result is already closed.
+		return
+	}
+	delete(m.watchers, id)
+	close(w.result)
+}
+
+// closeAll closes every watcher's result channel and empties the watcher map; loop calls it once incoming is closed.
+func (m *Broadcaster) closeAll() {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	for _, w := range m.watchers {
+		close(w.result)
+	}
+	// Replace the map with an empty one: stopWatching relies on absence from the
+	// map to know a result channel is already closed and must not be closed again.
+	m.watchers = map[int64]*broadcasterWatcher{}
+}
+
+// Action queues the given event for distribution among all watchers. It must not be called after Shutdown.
+func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
+	m.incoming <- Event{action, obj} // blocks while the incoming queue is full; panics once incoming is closed
+}
+
+// Shutdown disconnects all watchers (but any queued events will still be distributed).
+// You must not call Action or Watch* after calling Shutdown. This call blocks
+// until all events have been distributed through the outbound channels. Note
+// that since they can be buffered, this means that the watchers might not
+// have received the data yet as it can remain sitting in the buffered
+// channel.
+func (m *Broadcaster) Shutdown() {
+	close(m.incoming)     // ends loop's range after queued events drain; a second Shutdown would panic here
+	m.distributing.Wait() // returns once loop has closed every watcher and called Done
+}
+
+// loop receives from m.incoming until it is closed, running queued functions and distributing events in order.
+func (m *Broadcaster) loop() {
+	// Deliberately not catching crashes here. Yes, bring down the process if there's a
+	// bug in watch.Broadcaster.
+	for event := range m.incoming {
+		if event.Type == internalRunFunctionMarker {
+			event.Object.(functionFakeRuntimeObject)() // queued by blockQueue; run it at its place in the queue
+			continue
+		}
+		m.distribute(event)
+	}
+	m.closeAll()          // incoming was closed: disconnect every remaining watcher
+	m.distributing.Done() // lets Shutdown return
+}
+
+// distribute offers event to every registered watcher while holding m.lock. Blocking unless DropIfChannelFull is set.
+func (m *Broadcaster) distribute(event Event) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if m.fullChannelBehavior == DropIfChannelFull {
+		for _, w := range m.watchers {
+			select {
+			case w.result <- event:
+			case <-w.stopped: // watcher called Stop; nothing to deliver
+			default: // Don't block if the event can't be queued.
+			}
+		}
+	} else {
+		for _, w := range m.watchers {
+			select { // no default: wait for room in the channel or for the watcher to stop
+			case w.result <- event:
+			case <-w.stopped: // Stop closes stopped before it needs m.lock, so a full watcher cannot wedge us
+			}
+		}
+	}
+}
+
+// broadcasterWatcher is the per-watcher state of a Broadcaster; Watch and WatchWithPrefix return it as an Interface.
+type broadcasterWatcher struct {
+	result  chan Event    // events delivered to this watcher; closed by stopWatching or closeAll
+	stopped chan struct{} // closed by Stop so that distribute stops waiting on this watcher
+	stop    sync.Once     // ensures the body of Stop runs only once
+	id      int64         // key of this watcher in Broadcaster.watchers
+	m       *Broadcaster  // the Broadcaster this watcher is registered with
+}
+
+// ResultChan returns the channel on which this watcher's events arrive; it is closed when the watcher is stopped or the Broadcaster shuts down.
+func (mw *broadcasterWatcher) ResultChan() <-chan Event {
+	return mw.result
+}
+
+// Stop stops watching and removes mw from its list. Only the first call has any effect.
+func (mw *broadcasterWatcher) Stop() {
+	mw.stop.Do(func() {
+		close(mw.stopped)        // first, so a distribute blocked on this watcher moves on and releases the lock
+		mw.m.stopWatching(mw.id) // then unregister the watcher and close its result channel
+	})
+}