git subrepo clone (merge) https://github.com/kubernetes-incubator/metrics-server.git metrics-server

subrepo:
  subdir:   "metrics-server"
  merged:   "92d8412"
upstream:
  origin:   "https://github.com/kubernetes-incubator/metrics-server.git"
  branch:   "master"
  commit:   "92d8412"
git-subrepo:
  version:  "0.4.0"
  origin:   "???"
  commit:   "???"
diff --git a/metrics-server/pkg/sources/summary/summary.go b/metrics-server/pkg/sources/summary/summary.go
new file mode 100644
index 0000000..667d74b
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/summary.go
@@ -0,0 +1,295 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+	"github.com/prometheus/client_golang/prometheus"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/labels"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	v1listers "k8s.io/client-go/listers/core/v1"
+	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
+)
+
+var (
+	summaryRequestLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "metrics_server",
+			Subsystem: "kubelet_summary",
+			Name:      "request_duration_seconds",
+			Help:      "The Kubelet summary request latencies in seconds.",
+			// TODO(directxman12): it would be nice to calculate these buckets off of scrape duration,
+			// like we do elsewhere, but we're not passed the scrape duration at this level.
+			Buckets: prometheus.DefBuckets,
+		},
+		[]string{"node"},
+	)
+	scrapeTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "metrics_server",
+			Subsystem: "kubelet_summary",
+			Name:      "scrapes_total",
+			Help:      "Total number of attempted Summary API scrapes done by Metrics Server",
+		},
+		[]string{"success"},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(summaryRequestLatency)
+	prometheus.MustRegister(scrapeTotal)
+}
+
+// NodeInfo contains the information needed to identify and connect to a particular node
+// (node name and preferred address).
+type NodeInfo struct {
+	Name           string
+	ConnectAddress string
+}
+
+// Kubelet-provided metrics for pod and system container.
+type summaryMetricsSource struct {
+	node          NodeInfo
+	kubeletClient KubeletInterface
+}
+
+func NewSummaryMetricsSource(node NodeInfo, client KubeletInterface) sources.MetricSource {
+	return &summaryMetricsSource{
+		node:          node,
+		kubeletClient: client,
+	}
+}
+
+func (src *summaryMetricsSource) Name() string {
+	return src.String()
+}
+
+func (src *summaryMetricsSource) String() string {
+	return fmt.Sprintf("kubelet_summary:%s", src.node.Name)
+}
+
+func (src *summaryMetricsSource) Collect(ctx context.Context) (*sources.MetricsBatch, error) {
+	summary, err := func() (*stats.Summary, error) {
+		startTime := time.Now()
+		defer summaryRequestLatency.WithLabelValues(src.node.Name).Observe(float64(time.Since(startTime)) / float64(time.Second))
+		return src.kubeletClient.GetSummary(ctx, src.node.ConnectAddress)
+	}()
+
+	if err != nil {
+		scrapeTotal.WithLabelValues("false").Inc()
+		return nil, fmt.Errorf("unable to fetch metrics from Kubelet %s (%s): %v", src.node.Name, src.node.ConnectAddress, err)
+	}
+
+	scrapeTotal.WithLabelValues("true").Inc()
+
+	res := &sources.MetricsBatch{
+		Nodes: make([]sources.NodeMetricsPoint, 1),
+		Pods:  make([]sources.PodMetricsPoint, len(summary.Pods)),
+	}
+
+	var errs []error
+	errs = append(errs, src.decodeNodeStats(&summary.Node, &res.Nodes[0])...)
+	if len(errs) != 0 {
+		// if we had errors providing node metrics, discard the data point
+		// so that we don't incorrectly report metric values as zero.
+		res.Nodes = res.Nodes[:1]
+	}
+
+	num := 0
+	for _, pod := range summary.Pods {
+		podErrs := src.decodePodStats(&pod, &res.Pods[num])
+		errs = append(errs, podErrs...)
+		if len(podErrs) != 0 {
+			// NB: we explicitly want to discard pods with partial results, since
+			// the horizontal pod autoscaler takes special action when a pod is missing
+			// metrics (and zero CPU or memory does not count as "missing metrics")
+
+			// we don't care if we reuse slots in the result array,
+			// because they get completely overwritten in decodePodStats
+			continue
+		}
+		num++
+	}
+	res.Pods = res.Pods[:num]
+
+	return res, utilerrors.NewAggregate(errs)
+}
+
+func (src *summaryMetricsSource) decodeNodeStats(nodeStats *stats.NodeStats, target *sources.NodeMetricsPoint) []error {
+	timestamp, err := getScrapeTime(nodeStats.CPU, nodeStats.Memory)
+	if err != nil {
+		// if we can't get a timestamp, assume bad data in general
+		return []error{fmt.Errorf("unable to get valid timestamp for metric point for node %q, discarding data: %v", src.node.ConnectAddress, err)}
+	}
+	*target = sources.NodeMetricsPoint{
+		Name: src.node.Name,
+		MetricsPoint: sources.MetricsPoint{
+			Timestamp: timestamp,
+		},
+	}
+	var errs []error
+	if err := decodeCPU(&target.CpuUsage, nodeStats.CPU); err != nil {
+		errs = append(errs, fmt.Errorf("unable to get CPU for node %q, discarding data: %v", src.node.ConnectAddress, err))
+	}
+	if err := decodeMemory(&target.MemoryUsage, nodeStats.Memory); err != nil {
+		errs = append(errs, fmt.Errorf("unable to get memory for node %q, discarding data: %v", src.node.ConnectAddress, err))
+	}
+	return errs
+}
+
+func (src *summaryMetricsSource) decodePodStats(podStats *stats.PodStats, target *sources.PodMetricsPoint) []error {
+	// completely overwrite data in the target
+	*target = sources.PodMetricsPoint{
+		Name:       podStats.PodRef.Name,
+		Namespace:  podStats.PodRef.Namespace,
+		Containers: make([]sources.ContainerMetricsPoint, len(podStats.Containers)),
+	}
+
+	var errs []error
+	for i, container := range podStats.Containers {
+		timestamp, err := getScrapeTime(container.CPU, container.Memory)
+		if err != nil {
+			// if we can't get a timestamp, assume bad data in general
+			errs = append(errs, fmt.Errorf("unable to get a valid timestamp for metric point for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
+			continue
+		}
+		point := sources.ContainerMetricsPoint{
+			Name: container.Name,
+			MetricsPoint: sources.MetricsPoint{
+				Timestamp: timestamp,
+			},
+		}
+		if err := decodeCPU(&point.CpuUsage, container.CPU); err != nil {
+			errs = append(errs, fmt.Errorf("unable to get CPU for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
+		}
+		if err := decodeMemory(&point.MemoryUsage, container.Memory); err != nil {
+			errs = append(errs, fmt.Errorf("unable to get memory for container %q in pod %s/%s on node %q: %v, discarding data", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
+		}
+
+		target.Containers[i] = point
+	}
+
+	return errs
+}
+
+func decodeCPU(target *resource.Quantity, cpuStats *stats.CPUStats) error {
+	if cpuStats == nil || cpuStats.UsageNanoCores == nil {
+		return fmt.Errorf("missing cpu usage metric")
+	}
+
+	*target = *uint64Quantity(*cpuStats.UsageNanoCores, -9)
+	return nil
+}
+
+func decodeMemory(target *resource.Quantity, memStats *stats.MemoryStats) error {
+	if memStats == nil || memStats.WorkingSetBytes == nil {
+		return fmt.Errorf("missing memory usage metric")
+	}
+
+	*target = *uint64Quantity(*memStats.WorkingSetBytes, 0)
+	target.Format = resource.BinarySI
+
+	return nil
+}
+
+func getScrapeTime(cpu *stats.CPUStats, memory *stats.MemoryStats) (time.Time, error) {
+	// Ensure we get the earlier timestamp so that we can tell if a given data
+	// point was tainted by pod initialization.
+
+	var earliest *time.Time
+	if cpu != nil && !cpu.Time.IsZero() && (earliest == nil || earliest.After(cpu.Time.Time)) {
+		earliest = &cpu.Time.Time
+	}
+
+	if memory != nil && !memory.Time.IsZero() && (earliest == nil || earliest.After(memory.Time.Time)) {
+		earliest = &memory.Time.Time
+	}
+
+	if earliest == nil {
+		return time.Time{}, fmt.Errorf("no non-zero timestamp on either CPU or memory")
+	}
+
+	return *earliest, nil
+}
+
+// uint64Quantity converts a uint64 into a Quantity, which only has constructors
+// that work with int64 (except for parse, which requires costly round-trips to string).
+// We lose precision until we fit in an int64 if greater than the max int64 value.
+func uint64Quantity(val uint64, scale resource.Scale) *resource.Quantity {
+	// easy path -- we can safely fit val into an int64
+	if val <= math.MaxInt64 {
+		return resource.NewScaledQuantity(int64(val), scale)
+	}
+
+	glog.V(1).Infof("unexpectedly large resource value %v, loosing precision to fit in scaled resource.Quantity", val)
+
+	// otherwise, lose an decimal order-of-magnitude precision,
+	// so we can fit into a scaled quantity
+	return resource.NewScaledQuantity(int64(val/10), resource.Scale(1)+scale)
+}
+
+type summaryProvider struct {
+	nodeLister    v1listers.NodeLister
+	kubeletClient KubeletInterface
+	addrResolver  NodeAddressResolver
+}
+
+func (p *summaryProvider) GetMetricSources() ([]sources.MetricSource, error) {
+	sources := []sources.MetricSource{}
+	nodes, err := p.nodeLister.List(labels.Everything())
+	if err != nil {
+		return nil, fmt.Errorf("unable to list nodes: %v", err)
+	}
+
+	var errs []error
+	for _, node := range nodes {
+		info, err := p.getNodeInfo(node)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err))
+			continue
+		}
+		sources = append(sources, NewSummaryMetricsSource(info, p.kubeletClient))
+	}
+	return sources, utilerrors.NewAggregate(errs)
+}
+
+func (p *summaryProvider) getNodeInfo(node *corev1.Node) (NodeInfo, error) {
+	addr, err := p.addrResolver.NodeAddress(node)
+	if err != nil {
+		return NodeInfo{}, err
+	}
+	info := NodeInfo{
+		Name:           node.Name,
+		ConnectAddress: addr,
+	}
+
+	return info, nil
+}
+
+func NewSummaryProvider(nodeLister v1listers.NodeLister, kubeletClient KubeletInterface, addrResolver NodeAddressResolver) sources.MetricSourceProvider {
+	return &summaryProvider{
+		nodeLister:    nodeLister,
+		kubeletClient: kubeletClient,
+		addrResolver:  addrResolver,
+	}
+}