git subrepo clone (merge) https://github.com/kubernetes-incubator/metrics-server.git metrics-server

subrepo:
  subdir:   "metrics-server"
  merged:   "92d8412"
upstream:
  origin:   "https://github.com/kubernetes-incubator/metrics-server.git"
  branch:   "master"
  commit:   "92d8412"
git-subrepo:
  version:  "0.4.0"
  origin:   "???"
  commit:   "???"
diff --git a/metrics-server/pkg/provider/sink/sinkprov.go b/metrics-server/pkg/provider/sink/sinkprov.go
new file mode 100644
index 0000000..3427a15
--- /dev/null
+++ b/metrics-server/pkg/provider/sink/sinkprov.go
@@ -0,0 +1,146 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	apitypes "k8s.io/apimachinery/pkg/types"
+	metrics "k8s.io/metrics/pkg/apis/metrics"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/provider"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sink"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+)
+
+// kubernetesCadvisorWindow is the max window used by cAdvisor for calculating
+// CPU usage rate.  While it can vary, it's no more than this number, but may be
+// as low as half this number (when working with no backoff).  It is a fixed
+// estimate because the kubelet does not tell us the window in the summary API.
+const kubernetesCadvisorWindow = 30 * time.Second
+
+// sinkMetricsProvider is a provider.MetricsProvider that also acts as a sink.MetricSink.
+type sinkMetricsProvider struct {
+	mu    sync.RWMutex                                        // guards nodes and pods
+	nodes map[string]sources.NodeMetricsPoint                 // node points of the last accepted batch, keyed by node name
+	pods  map[apitypes.NamespacedName]sources.PodMetricsPoint // pod points of the last accepted batch, keyed by namespace/name
+}
+
+// NewSinkProvider returns one shared in-memory store, once as a MetricSink and once as a MetricsProvider.
+func NewSinkProvider() (sink.MetricSink, provider.MetricsProvider) {
+	store := new(sinkMetricsProvider)
+	return store, store
+}
+
+// TODO(directxman12): figure out what the right value is for "window" --
+// we don't get the actual window from cAdvisor, so we could just
+// plumb down metric resolution, but that wouldn't be actually correct.
+
+// GetNodeMetrics looks up the stored metrics point of every requested node.
+// Both returned slices are positional with respect to nodes; a node without
+// a stored point keeps a zero TimeInfo and a nil ResourceList.  The returned
+// error is always nil.
+func (p *sinkMetricsProvider) GetNodeMetrics(nodes ...string) ([]provider.TimeInfo, []corev1.ResourceList, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	times := make([]provider.TimeInfo, len(nodes))
+	usages := make([]corev1.ResourceList, len(nodes))
+	for idx, name := range nodes {
+		if point, found := p.nodes[name]; found {
+			times[idx] = provider.TimeInfo{
+				Timestamp: point.Timestamp,
+				Window:    kubernetesCadvisorWindow,
+			}
+			usages[idx] = corev1.ResourceList{
+				corev1.ResourceCPU:    point.CpuUsage,
+				corev1.ResourceMemory: point.MemoryUsage,
+			}
+		}
+	}
+
+	return times, usages, nil
+}
+
+// GetContainerMetrics looks up the stored container metrics of every requested
+// pod.  Both returned slices are positional with respect to pods; a pod without
+// a stored point keeps a zero TimeInfo and a nil container slice.  Each pod's
+// timestamp is the earliest one among its containers.  The error is always nil.
+func (p *sinkMetricsProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	timestamps := make([]provider.TimeInfo, len(pods))
+	resMetrics := make([][]metrics.ContainerMetrics, len(pods))
+	for podIdx, pod := range pods {
+		metricPoint, present := p.pods[pod]
+		if !present {
+			continue
+		}
+
+		// earliest stays the zero time when the pod has no containers.
+		var earliest time.Time
+		contMetrics := make([]metrics.ContainerMetrics, len(metricPoint.Containers))
+		for contIdx, contPoint := range metricPoint.Containers {
+			contMetrics[contIdx] = metrics.ContainerMetrics{
+				Name: contPoint.Name,
+				Usage: corev1.ResourceList{
+					corev1.ResourceCPU:    contPoint.CpuUsage,
+					corev1.ResourceMemory: contPoint.MemoryUsage,
+				},
+			}
+			if contIdx == 0 || contPoint.Timestamp.Before(earliest) {
+				earliest = contPoint.Timestamp
+			}
+		}
+
+		timestamps[podIdx] = provider.TimeInfo{
+			Timestamp: earliest,
+			Window:    kubernetesCadvisorWindow,
+		}
+		resMetrics[podIdx] = contMetrics
+	}
+	return timestamps, resMetrics, nil
+}
+
+// Receive replaces the stored node and pod points with the contents of batch.
+// The new lookup tables are built before the lock is taken, so a batch with a
+// duplicate node or pod is rejected without touching what is already stored.
+func (p *sinkMetricsProvider) Receive(batch *sources.MetricsBatch) error {
+	nodeIndex := make(map[string]sources.NodeMetricsPoint, len(batch.Nodes))
+	for _, n := range batch.Nodes {
+		if _, seen := nodeIndex[n.Name]; seen {
+			return fmt.Errorf("duplicate node %s received", n.Name)
+		}
+		nodeIndex[n.Name] = n
+	}
+
+	podIndex := make(map[apitypes.NamespacedName]sources.PodMetricsPoint, len(batch.Pods))
+	for _, pd := range batch.Pods {
+		key := apitypes.NamespacedName{Namespace: pd.Namespace, Name: pd.Name}
+		if _, seen := podIndex[key]; seen {
+			return fmt.Errorf("duplicate pod %s received", key)
+		}
+		podIndex[key] = pd
+	}
+
+	p.mu.Lock()
+	p.nodes, p.pods = nodeIndex, podIndex
+	p.mu.Unlock()
+	return nil
+}
diff --git a/metrics-server/pkg/provider/sink/sinkprov_test.go b/metrics-server/pkg/provider/sink/sinkprov_test.go
new file mode 100644
index 0000000..8980eeb
--- /dev/null
+++ b/metrics-server/pkg/provider/sink/sinkprov_test.go
@@ -0,0 +1,281 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink_test
+
+import (
+	"testing"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	apitypes "k8s.io/apimachinery/pkg/types"
+	metrics "k8s.io/metrics/pkg/apis/metrics"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/provider"
+	. "github.com/kubernetes-incubator/metrics-server/pkg/provider/sink"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sink"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+)
+
+var defaultWindow = 30 * time.Second // Window value the specs expect in every populated TimeInfo
+
+func TestSourceManager(t *testing.T) { // NOTE(review): name looks carried over from another suite; it runs the Provider/Sink specs
+	RegisterFailHandler(Fail) // report Gomega assertion failures through Ginkgo's Fail
+	RunSpecs(t, "Provider/Sink Suite")
+}
+
+// newMilliPoint builds a MetricsPoint stamped ts whose CPU and memory usage are
+// milli-quantities of cpu (DecimalSI) and memory (BinarySI) respectively.
+func newMilliPoint(ts time.Time, cpu, memory int64) sources.MetricsPoint {
+	cpuQty := resource.NewMilliQuantity(cpu, resource.DecimalSI)
+	memQty := resource.NewMilliQuantity(memory, resource.BinarySI)
+	return sources.MetricsPoint{Timestamp: ts, CpuUsage: *cpuQty, MemoryUsage: *memQty}
+}
+
+// Specs for the in-memory sink provider: all-or-nothing Receive and positional lookups.
+var _ = Describe("In-memory Sink Provider", func() {
+	var (
+		batch    *sources.MetricsBatch
+		prov     provider.MetricsProvider
+		provSink sink.MetricSink
+		now      time.Time
+	)
+
+	// Every spec starts from a fresh batch (3 nodes, 3 pods in 2 namespaces) and an empty provider.
+	BeforeEach(func() {
+		now = time.Now()
+		batch = &sources.MetricsBatch{
+			Nodes: []sources.NodeMetricsPoint{
+				{Name: "node1", MetricsPoint: newMilliPoint(now.Add(100*time.Millisecond), 110, 120)},
+				{Name: "node2", MetricsPoint: newMilliPoint(now.Add(200*time.Millisecond), 210, 220)},
+				{Name: "node3", MetricsPoint: newMilliPoint(now.Add(300*time.Millisecond), 310, 320)},
+			},
+			Pods: []sources.PodMetricsPoint{
+				{Name: "pod1", Namespace: "ns1", Containers: []sources.ContainerMetricsPoint{
+					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(400*time.Millisecond), 410, 420)},
+					{Name: "container2", MetricsPoint: newMilliPoint(now.Add(500*time.Millisecond), 510, 520)},
+				}},
+				{Name: "pod2", Namespace: "ns1", Containers: []sources.ContainerMetricsPoint{
+					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(600*time.Millisecond), 610, 620)},
+				}},
+				{Name: "pod1", Namespace: "ns2", Containers: []sources.ContainerMetricsPoint{
+					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(700*time.Millisecond), 710, 720)},
+					{Name: "container2", MetricsPoint: newMilliPoint(now.Add(800*time.Millisecond), 810, 820)},
+				}},
+			},
+		}
+		provSink, prov = NewSinkProvider()
+	})
+
+	It("should receive batches of metrics", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		// The getters never return an error, so presence is checked on the returned data itself.
+		By("making sure that the provider contains all nodes received")
+		for _, node := range batch.Nodes {
+			_, res, err := prov.GetNodeMetrics(node.Name)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(ConsistOf(Not(BeEmpty())))
+		}
+
+		By("making sure that the provider contains all pods received")
+		for _, pod := range batch.Pods {
+			podIdent := apitypes.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
+			_, res, err := prov.GetContainerMetrics(podIdent)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(ConsistOf(Not(BeEmpty())))
+		}
+	})
+
+	It("should error out if duplicate nodes were received, without a partial store", func() {
+		By("adding a duplicate node to the batch")
+		batch.Nodes = append(batch.Nodes, batch.Nodes[0])
+
+		By("sending the batch to the sink and checking for an error")
+		Expect(provSink.Receive(batch)).NotTo(Succeed())
+
+		By("making sure none of the data is in the sink")
+		for _, node := range batch.Nodes {
+			_, res, err := prov.GetNodeMetrics(node.Name)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(ConsistOf(corev1.ResourceList(nil)))
+		}
+		for _, pod := range batch.Pods {
+			_, res, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+			})
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(Equal([][]metrics.ContainerMetrics{nil}))
+		}
+	})
+
+	It("should error out if duplicate pods were received, without a partial store", func() {
+		By("adding a duplicate pod to the batch")
+		batch.Pods = append(batch.Pods, batch.Pods[0])
+
+		By("sending the batch to the sink and checking for an error")
+		Expect(provSink.Receive(batch)).NotTo(Succeed())
+
+		By("making sure none of the data is in the sink")
+		for _, node := range batch.Nodes {
+			_, res, err := prov.GetNodeMetrics(node.Name)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(ConsistOf(corev1.ResourceList(nil)))
+		}
+		for _, pod := range batch.Pods {
+			_, res, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+			})
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(Equal([][]metrics.ContainerMetrics{nil}))
+		}
+	})
+
+	It("should retrieve metrics for all containers in a pod, with overall earliest scrape time", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("fetching the pod")
+		ts, containerMetrics, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+			Name:      "pod1",
+			Namespace: "ns1",
+		})
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the timestamp is the smallest time amongst all containers")
+		Expect(ts).To(ConsistOf(provider.TimeInfo{Timestamp: now.Add(400 * time.Millisecond), Window: defaultWindow}))
+
+		By("verifying that all containers have data")
+		Expect(containerMetrics).To(Equal(
+			[][]metrics.ContainerMetrics{
+				{
+					{
+						Name: "container1",
+						Usage: corev1.ResourceList{
+							corev1.ResourceCPU:    *resource.NewMilliQuantity(410, resource.DecimalSI),
+							corev1.ResourceMemory: *resource.NewMilliQuantity(420, resource.BinarySI),
+						},
+					},
+					{
+						Name: "container2",
+						Usage: corev1.ResourceList{
+							corev1.ResourceCPU:    *resource.NewMilliQuantity(510, resource.DecimalSI),
+							corev1.ResourceMemory: *resource.NewMilliQuantity(520, resource.BinarySI),
+						},
+					},
+				},
+			},
+		))
+	})
+
+	It("should return nil metrics for missing pods", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("fetching a present pod and a missing pod")
+		ts, containerMetrics, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+			Name:      "pod1",
+			Namespace: "ns1",
+		}, apitypes.NamespacedName{
+			Name:      "pod2",
+			Namespace: "ns42",
+		})
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the timestamp is the smallest time amongst all containers")
+		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(400 * time.Millisecond), Window: defaultWindow}, {}}))
+
+		By("verifying that all present containers have data")
+		Expect(containerMetrics).To(Equal(
+			[][]metrics.ContainerMetrics{
+				{
+					{
+						Name: "container1",
+						Usage: corev1.ResourceList{
+							corev1.ResourceCPU:    *resource.NewMilliQuantity(410, resource.DecimalSI),
+							corev1.ResourceMemory: *resource.NewMilliQuantity(420, resource.BinarySI),
+						},
+					},
+					{
+						Name: "container2",
+						Usage: corev1.ResourceList{
+							corev1.ResourceCPU:    *resource.NewMilliQuantity(510, resource.DecimalSI),
+							corev1.ResourceMemory: *resource.NewMilliQuantity(520, resource.BinarySI),
+						},
+					},
+				},
+				nil,
+			},
+		))
+	})
+
+	It("should retrieve metrics for nodes, with each node's own scrape time", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("fetching the nodes")
+		ts, nodeMetrics, err := prov.GetNodeMetrics("node1", "node2")
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that each timestamp is the scrape time of the corresponding node")
+		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(100 * time.Millisecond), Window: defaultWindow}, {Timestamp: now.Add(200 * time.Millisecond), Window: defaultWindow}}))
+
+		By("verifying that all nodes have data")
+		Expect(nodeMetrics).To(Equal(
+			[]corev1.ResourceList{
+				{
+					corev1.ResourceCPU:    *resource.NewMilliQuantity(110, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewMilliQuantity(120, resource.BinarySI),
+				},
+				{
+					corev1.ResourceCPU:    *resource.NewMilliQuantity(210, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewMilliQuantity(220, resource.BinarySI),
+				},
+			},
+		))
+	})
+
+	It("should return nil metrics for missing nodes", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("fetching the nodes, plus a missing node")
+		ts, nodeMetrics, err := prov.GetNodeMetrics("node1", "node2", "node42")
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that each timestamp is the scrape time of the corresponding node")
+		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(100 * time.Millisecond), Window: defaultWindow}, {Timestamp: now.Add(200 * time.Millisecond), Window: defaultWindow}, {}}))
+
+		By("verifying that all present nodes have data")
+		Expect(nodeMetrics).To(Equal(
+			[]corev1.ResourceList{
+				{
+					corev1.ResourceCPU:    *resource.NewMilliQuantity(110, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewMilliQuantity(120, resource.BinarySI),
+				},
+				{
+					corev1.ResourceCPU:    *resource.NewMilliQuantity(210, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewMilliQuantity(220, resource.BinarySI),
+				},
+				nil,
+			},
+		))
+	})
+})