git subrepo clone (merge) https://github.com/kubernetes-incubator/metrics-server.git metrics-server

subrepo:
  subdir:   "metrics-server"
  merged:   "92d8412"
upstream:
  origin:   "https://github.com/kubernetes-incubator/metrics-server.git"
  branch:   "master"
  commit:   "92d8412"
git-subrepo:
  version:  "0.4.0"
  origin:   "???"
  commit:   "???"
diff --git a/metrics-server/pkg/sources/manager.go b/metrics-server/pkg/sources/manager.go
new file mode 100644
index 0000000..ff5b4d0
--- /dev/null
+++ b/metrics-server/pkg/sources/manager.go
@@ -0,0 +1,165 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sources
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/prometheus/client_golang/prometheus"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+
+	utilmetrics "github.com/kubernetes-incubator/metrics-server/pkg/metrics"
+)
+
+const (
+	maxDelayMs       = 4 * 1000 // upper bound, in milliseconds, on the random start-delay window used by Collect
+	delayPerSourceMs = 8        // milliseconds added to that window for each source being scraped
+)
+
+var (
+	lastScrapeTimestamp = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "metrics_server",
+			Subsystem: "scraper",
+			Name:      "last_time_seconds",
+			Help:      "Last time metrics-server performed a scrape since unix epoch in seconds.",
+		},
+		[]string{"source"}, // labelled with the name of the scraped source
+	)
+
+	// replaced with a real, registered histogram by a call to RegisterDurationMetrics
+	// (this unregistered placeholder acts as a no-op by default); we can't just register it
+	// in the constructor, since that could be called multiple times during setup.
+	scraperDuration *prometheus.HistogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"source"})
+)
+
+func init() {
+	prometheus.MustRegister(lastScrapeTimestamp) // registered at package load; scraperDuration is registered by RegisterDurationMetrics
+}
+
+// RegisterDurationMetrics replaces the package-level scraperDuration histogram with one whose
+// buckets are derived from scrapeTimeout, and registers it for use in the source manager.
+func RegisterDurationMetrics(scrapeTimeout time.Duration) {
+	scraperDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "metrics_server",
+			Subsystem: "scraper",
+			Name:      "duration_seconds",
+			Help:      "Time spent scraping sources in seconds.",
+			Buckets:   utilmetrics.BucketsForScrapeDuration(scrapeTimeout),
+		},
+		[]string{"source"},
+	)
+	prometheus.MustRegister(scraperDuration) // NOTE(review): a second call presumably panics on duplicate registration — confirm
+}
+
+// NewSourceManager returns a MetricSource whose Collect scrapes every source reported by
+// srcProv and merges the results. scrapeTimeout is the budget given to each source; the
+// random start delay applied before a scrape is subtracted from it.
+func NewSourceManager(srcProv MetricSourceProvider, scrapeTimeout time.Duration) MetricSource {
+	return &sourceManager{srcProv: srcProv, scrapeTimeout: scrapeTimeout}
+}
+
+type sourceManager struct {
+	srcProv       MetricSourceProvider // queried for the list of sources on every Collect call
+	scrapeTimeout time.Duration        // per-source budget; the start delay is subtracted before scraping
+}
+
+// Name returns the fixed identifier of the source manager, which is itself
+// handed out as a MetricSource by NewSourceManager.
+func (m *sourceManager) Name() string { return "source_manager" }
+
+// Collect scrapes every source reported by the provider concurrently, each after a random start
+// delay, and merges the node and pod metrics; errors are aggregated and partial results are kept.
+func (m *sourceManager) Collect(baseCtx context.Context) (*MetricsBatch, error) {
+	sources, err := m.srcProv.GetMetricSources()
+	var errs []error
+	if err != nil {
+		// save the error, and continue on in case of partial results
+		errs = append(errs, err)
+	}
+	glog.V(1).Infof("Scraping metrics from %v sources", len(sources))
+
+	responseChannel := make(chan *MetricsBatch, len(sources)) // one slot per source, so a goroutine's send never blocks
+	errChannel := make(chan error, len(sources))              // likewise; a nil error is sent on success
+	defer close(responseChannel)
+	defer close(errChannel)
+
+	startTime := time.Now()
+
+	// TODO(directxman12): re-evaluate this code -- do we really need to stagger fetches like this?
+	delayMs := delayPerSourceMs * len(sources)
+	if delayMs > maxDelayMs {
+		delayMs = maxDelayMs
+	}
+
+	for _, source := range sources { // body runs only when len(sources) > 0, so delayMs > 0 for rand.Intn
+		go func(source MetricSource) {
+			sleepDuration := time.Duration(rand.Intn(delayMs)) * time.Millisecond
+			time.Sleep(sleepDuration) // stagger the scrapes; prevents network congestion
+			// make the timeout shorter by the stagger delay, so we still preserve the overall timeout
+			ctx, cancelTimeout := context.WithTimeout(baseCtx, m.scrapeTimeout-sleepDuration)
+			defer cancelTimeout()
+
+			glog.V(2).Infof("Querying source: %s", source)
+			metrics, err := scrapeWithMetrics(ctx, source)
+			if err != nil {
+				errChannel <- fmt.Errorf("unable to fully scrape metrics from source %s: %v", source.Name(), err)
+				responseChannel <- metrics // forwarded even on error; the receiver keeps non-nil partial batches
+				return
+			}
+			responseChannel <- metrics
+			errChannel <- nil
+		}(source)
+	}
+
+	res := &MetricsBatch{}
+
+	for range sources {
+		err := <-errChannel           // not necessarily from the same source as srcBatch below;
+		srcBatch := <-responseChannel // harmless, since both values are only accumulated
+		if err != nil {
+			errs = append(errs, err)
+			// NB: partial node results are still worth saving, so
+			// don't skip storing results if we got an error
+		}
+		if srcBatch == nil {
+			continue
+		}
+
+		res.Nodes = append(res.Nodes, srcBatch.Nodes...)
+		res.Pods = append(res.Pods, srcBatch.Pods...)
+	}
+
+	glog.V(1).Infof("ScrapeMetrics: time: %s, nodes: %v, pods: %v", time.Since(startTime), len(res.Nodes), len(res.Pods))
+	return res, utilerrors.NewAggregate(errs)
+}
+
+// scrapeWithMetrics collects from s and records the scrape's duration and completion time.
+func scrapeWithMetrics(ctx context.Context, s MetricSource) (*MetricsBatch, error) {
+	sourceName := s.Name()
+	startTime := time.Now()
+	// Measure inside a closure: arguments of a deferred call are evaluated at the defer statement.
+	defer func() {
+		scraperDuration.WithLabelValues(sourceName).Observe(float64(time.Since(startTime)) / float64(time.Second))
+		lastScrapeTimestamp.WithLabelValues(sourceName).Set(float64(time.Now().Unix()))
+	}()
+
+	return s.Collect(ctx)
+}