git subrepo clone (merge) https://github.com/kubernetes-incubator/metrics-server.git metrics-server

subrepo:
  subdir:   "metrics-server"
  merged:   "92d8412"
upstream:
  origin:   "https://github.com/kubernetes-incubator/metrics-server.git"
  branch:   "master"
  commit:   "92d8412"
git-subrepo:
  version:  "0.4.0"
  origin:   "???"
  commit:   "???"
diff --git a/metrics-server/pkg/apiserver/config.go b/metrics-server/pkg/apiserver/config.go
new file mode 100644
index 0000000..fa22c10
--- /dev/null
+++ b/metrics-server/pkg/apiserver/config.go
@@ -0,0 +1,74 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apiserver
+
+import (
+	"strings"
+
+	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
+	genericapiserver "k8s.io/apiserver/pkg/server"
+	"k8s.io/client-go/informers"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/apiserver/generic"
+	generatedopenapi "github.com/kubernetes-incubator/metrics-server/pkg/generated/openapi"
+	"github.com/kubernetes-incubator/metrics-server/pkg/version"
+)
+
+// Config contains configuration for launching an instance of metrics-server.
+type Config struct {
+	GenericConfig  *genericapiserver.Config
+	ProviderConfig generic.ProviderConfig
+}
+
+type completedConfig struct {
+	genericapiserver.CompletedConfig
+	ProviderConfig *generic.ProviderConfig
+}
+
+// Complete fills in any fields not set that are required to have valid data. It mutates the receiver.
+func (c *Config) Complete(informers informers.SharedInformerFactory) completedConfig {
+	c.GenericConfig.Version = version.VersionInfo()
+
+	// enable OpenAPI schemas
+	c.GenericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(generic.Scheme))
+	c.GenericConfig.OpenAPIConfig.Info.Title = "Kubernetes metrics-server"
+	c.GenericConfig.OpenAPIConfig.Info.Version = strings.Split(c.GenericConfig.Version.String(), "-")[0] // TODO(directxman12): remove this once autosetting this doesn't require security definitions
+	c.GenericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
+
+	return completedConfig{
+		CompletedConfig: c.GenericConfig.Complete(informers),
+		ProviderConfig:  &c.ProviderConfig,
+	}
+}
+
+type MetricsServer struct {
+	*genericapiserver.GenericAPIServer
+}
+
+// New returns a new instance of MetricsServer from the given config.
+func (c completedConfig) New() (*MetricsServer, error) {
+	genericServer, err := c.CompletedConfig.New("metrics-server", genericapiserver.NewEmptyDelegate()) // completion is done in Complete, no need for a second time
+	if err != nil {
+		return nil, err
+	}
+
+	if err := generic.InstallStorage(c.ProviderConfig, c.SharedInformerFactory.Core().V1(), genericServer); err != nil {
+		return nil, err
+	}
+
+	return &MetricsServer{
+		GenericAPIServer: genericServer,
+	}, nil
+}
diff --git a/metrics-server/pkg/apiserver/generic/storage.go b/metrics-server/pkg/apiserver/generic/storage.go
new file mode 100644
index 0000000..1cd5304
--- /dev/null
+++ b/metrics-server/pkg/apiserver/generic/storage.go
@@ -0,0 +1,72 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package generic
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
+	"k8s.io/apiserver/pkg/registry/rest"
+	genericapiserver "k8s.io/apiserver/pkg/server"
+	coreinf "k8s.io/client-go/informers/core/v1"
+	"k8s.io/metrics/pkg/apis/metrics"
+	"k8s.io/metrics/pkg/apis/metrics/install"
+	"k8s.io/metrics/pkg/apis/metrics/v1beta1"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/provider"
+	nodemetricsstorage "github.com/kubernetes-incubator/metrics-server/pkg/storage/nodemetrics"
+	podmetricsstorage "github.com/kubernetes-incubator/metrics-server/pkg/storage/podmetrics"
+)
+
+var (
+	// Scheme contains the types needed by the resource metrics API.
+	Scheme = runtime.NewScheme()
+	// Codecs is a codec factory for serving the resource metrics API.
+	Codecs = serializer.NewCodecFactory(Scheme)
+)
+
+func init() {
+	install.Install(Scheme)
+	metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
+}
+
+// ProviderConfig holds the providers for node and pod metrics
+// for serving the resource metrics API.
+type ProviderConfig struct {
+	Node provider.NodeMetricsProvider
+	Pod  provider.PodMetricsProvider
+}
+
+// BuildStorage constructs the APIGroupInfo for the metrics.k8s.io API group using the given providers.
+func BuildStorage(providers *ProviderConfig, informers coreinf.Interface) genericapiserver.APIGroupInfo {
+	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(metrics.GroupName, Scheme, metav1.ParameterCodec, Codecs)
+
+	nodemetricsStorage := nodemetricsstorage.NewStorage(metrics.Resource("nodemetrics"), providers.Node, informers.Nodes().Lister())
+	podmetricsStorage := podmetricsstorage.NewStorage(metrics.Resource("podmetrics"), providers.Pod, informers.Pods().Lister())
+	metricsServerResources := map[string]rest.Storage{
+		"nodes": nodemetricsStorage,
+		"pods":  podmetricsStorage,
+	}
+	apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = metricsServerResources
+
+	return apiGroupInfo
+}
+
+// InstallStorage builds the storage for the metrics.k8s.io API, and then installs it into the given API server.
+func InstallStorage(providers *ProviderConfig, informers coreinf.Interface, server *genericapiserver.GenericAPIServer) error {
+	info := BuildStorage(providers, informers)
+	return server.InstallAPIGroup(&info)
+}
diff --git a/metrics-server/pkg/generated/openapi/doc.go b/metrics-server/pkg/generated/openapi/doc.go
new file mode 100644
index 0000000..554c103
--- /dev/null
+++ b/metrics-server/pkg/generated/openapi/doc.go
@@ -0,0 +1,17 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package openapi contains autogenerated OpenAPI schema definitions
+// for the metrics.k8s.io API.
+package openapi
diff --git a/metrics-server/pkg/manager/manager.go b/metrics-server/pkg/manager/manager.go
new file mode 100644
index 0000000..28c676c
--- /dev/null
+++ b/metrics-server/pkg/manager/manager.go
@@ -0,0 +1,164 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package manager
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"sync"
+	"time"
+
+	utilmetrics "github.com/kubernetes-incubator/metrics-server/pkg/metrics"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sink"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+
+	"github.com/golang/glog"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	// initialized below to an actual value by a call to RegisterDurationMetrics
+	// (acts as a no-op by default), but we can't just register it in the constructor,
+	// since it could be called multiple times during setup.
+	tickDuration prometheus.Histogram = prometheus.NewHistogram(prometheus.HistogramOpts{})
+)
+
+// RegisterDurationMetrics creates and registers a histogram metric for
+// scrape duration, suitable for use in the overall manager.
+func RegisterDurationMetrics(resolution time.Duration) {
+	tickDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "metrics_server",
+			Subsystem: "manager",
+			Name:      "tick_duration_seconds",
+			Help:      "The total time spent collecting and storing metrics in seconds.",
+			Buckets:   utilmetrics.BucketsForScrapeDuration(resolution),
+		},
+	)
+	prometheus.MustRegister(tickDuration)
+}
+
+// Runnable represents something that can be run until a signal is given to stop.
+type Runnable interface {
+	// RunUntil runs this runnable until the given channel is closed.
+	// It should not block -- it will spawn its own goroutine.
+	RunUntil(stopCh <-chan struct{})
+}
+
+type Manager struct {
+	source     sources.MetricSource
+	sink       sink.MetricSink
+	resolution time.Duration
+
+	healthMu      sync.RWMutex
+	lastTickStart time.Time
+	lastOk        bool
+}
+
+func NewManager(metricSrc sources.MetricSource, metricSink sink.MetricSink, resolution time.Duration) *Manager {
+	manager := Manager{
+		source:     metricSrc,
+		sink:       metricSink,
+		resolution: resolution,
+	}
+
+	return &manager
+}
+
+func (rm *Manager) RunUntil(stopCh <-chan struct{}) {
+	go func() {
+		ticker := time.NewTicker(rm.resolution)
+		defer ticker.Stop()
+		rm.Collect(time.Now())
+
+		for {
+			select {
+				case startTime := <-ticker.C:
+					rm.Collect(startTime)
+				case <-stopCh:
+					return
+			}
+                }
+        }()
+}
+
+func (rm *Manager) Collect(startTime time.Time) {
+	rm.healthMu.Lock()
+	rm.lastTickStart = startTime
+	rm.healthMu.Unlock()
+
+	healthyTick := true
+
+	ctx, cancelTimeout := context.WithTimeout(context.Background(), rm.resolution)
+	defer cancelTimeout()
+
+	glog.V(6).Infof("Beginning cycle, collecting metrics...")
+	data, collectErr := rm.source.Collect(ctx)
+	if collectErr != nil {
+		glog.Errorf("unable to fully collect metrics: %v", collectErr)
+
+		// only consider this an indication of bad health if we
+		// couldn't collect from any nodes -- one node going down
+		// shouldn't indicate that metrics-server is unhealthy
+		if len(data.Nodes) == 0 {
+			healthyTick = false
+		}
+
+		// NB: continue on so that we don't lose all metrics
+		// if one node goes down
+	}
+
+	glog.V(6).Infof("...Storing metrics...")
+	recvErr := rm.sink.Receive(data)
+	if recvErr != nil {
+		glog.Errorf("unable to save metrics: %v", recvErr)
+
+		// any failure to save means we're unhealthy
+		healthyTick = false
+	}
+
+	collectTime := time.Now().Sub(startTime)
+	tickDuration.Observe(float64(collectTime) / float64(time.Second))
+	glog.V(6).Infof("...Cycle complete")
+
+	rm.healthMu.Lock()
+	rm.lastOk = healthyTick
+	rm.healthMu.Unlock()
+}
+
+// CheckHealth checks the health of the manager by looking at tick times,
+// and checking if we have at least one node in the collected data.
+// It implements the health checker func part of the healthz checker.
+func (rm *Manager) CheckHealth(_ *http.Request) error {
+	rm.healthMu.RLock()
+	lastTick := rm.lastTickStart
+	healthyTick := rm.lastOk
+	rm.healthMu.RUnlock()
+
+	// use 1.1 for a bit of wiggle room
+	maxTickWait := time.Duration(1.1 * float64(rm.resolution))
+	tickWait := time.Now().Sub(lastTick)
+
+	if tickWait > maxTickWait {
+		return fmt.Errorf("time since last tick (%s) was greater than expected metrics resolution (%s)", tickWait, maxTickWait)
+	}
+
+	if !healthyTick {
+		return fmt.Errorf("there was an error collecting or saving metrics in the last collection tick")
+	}
+
+	return nil
+}
diff --git a/metrics-server/pkg/metrics/util.go b/metrics-server/pkg/metrics/util.go
new file mode 100644
index 0000000..537e2e9
--- /dev/null
+++ b/metrics-server/pkg/metrics/util.go
@@ -0,0 +1,58 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// BucketsForScrapeDuration calculates a variant of the prometheus default histogram
+// buckets that includes relevant buckets around our scrape timeout.
+func BucketsForScrapeDuration(scrapeTimeout time.Duration) []float64 {
+	// set up some buckets that include our scrape timeout,
+	// so that we can easily pinpoint scrape timeout issues.
+	// The default buckets provide a sane starting point for
+	// the smaller numbers.
+	buckets := append([]float64(nil), prometheus.DefBuckets...)
+	maxBucket := buckets[len(buckets)-1]
+	timeoutSeconds := float64(scrapeTimeout) / float64(time.Second)
+	if timeoutSeconds > maxBucket {
+		// [defaults, (maxBucket + (scrapeTimeout - maxBucket)/2), scrapeTimeout, scrapeTimeout*1.5, scrapeTimeout*2]
+		halfwayToScrapeTimeout := maxBucket + (timeoutSeconds-maxBucket)/2
+		buckets = append(buckets, halfwayToScrapeTimeout, timeoutSeconds, timeoutSeconds*1.5, timeoutSeconds*2.0)
+	} else if timeoutSeconds < maxBucket {
+		var i int
+		var bucket float64
+		for i, bucket = range buckets {
+			if bucket > timeoutSeconds {
+				break
+			}
+		}
+
+		if bucket-timeoutSeconds < buckets[0] || (i > 0 && timeoutSeconds-buckets[i-1] < buckets[0]) {
+			// if we're sufficiently close to another bucket, just skip this
+			return buckets
+		}
+
+		// likely that our scrape timeout is close to another bucket, so don't bother injecting more than just our scrape timeout
+		oldRest := append([]float64(nil), buckets[i:]...) // make a copy so we don't overwrite it
+		buckets = append(buckets[:i], timeoutSeconds)
+		buckets = append(buckets, oldRest...)
+	}
+
+	return buckets
+}
diff --git a/metrics-server/pkg/metrics/util_test.go b/metrics-server/pkg/metrics/util_test.go
new file mode 100644
index 0000000..dcac09b
--- /dev/null
+++ b/metrics-server/pkg/metrics/util_test.go
@@ -0,0 +1,80 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics_test
+
+import (
+	"testing"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"github.com/prometheus/client_golang/prometheus"
+
+	. "github.com/kubernetes-incubator/metrics-server/pkg/metrics"
+)
+
+func TestMetricsUtil(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Prometheus Metrics Util Test")
+}
+
+var _ = Describe("Prometheus Bucket Estimator", func() {
+	Context("with a scrape timeout longer than the max default bucket", func() {
+		It("should generate buckets in strictly increasing order", func() {
+			buckets := BucketsForScrapeDuration(15 * time.Second)
+			lastBucket := 0.0
+			for _, bucket := range buckets {
+				Expect(bucket).To(BeNumerically(">", lastBucket))
+				lastBucket = bucket
+			}
+		})
+
+		It("should include some buckets around the scrape timeout", func() {
+			Expect(BucketsForScrapeDuration(15 * time.Second)).To(ContainElement(15.0))
+			Expect(BucketsForScrapeDuration(15 * time.Second)).To(ContainElement(30.0))
+		})
+	})
+	Context("with a scrape timeout shorter than the max default bucket", func() {
+		It("should generate buckets in strictly increasing order", func() {
+			buckets := BucketsForScrapeDuration(5 * time.Second)
+			lastBucket := 0.0
+			for _, bucket := range buckets {
+				Expect(bucket).To(BeNumerically(">", lastBucket))
+				lastBucket = bucket
+			}
+		})
+
+		It("should include a bucket for the scrape timeout", func() {
+			Expect(BucketsForScrapeDuration(5 * time.Second)).To(ContainElement(5.0))
+		})
+	})
+	Context("with a scrape timeout equalt to the max default bucket", func() {
+		maxBucket := prometheus.DefBuckets[len(prometheus.DefBuckets)-1]
+		maxBucketDuration := time.Duration(maxBucket) * time.Second
+
+		It("should generate buckets in strictly increasing order", func() {
+			buckets := BucketsForScrapeDuration(maxBucketDuration)
+			lastBucket := 0.0
+			for _, bucket := range buckets {
+				Expect(bucket).To(BeNumerically(">", lastBucket))
+				lastBucket = bucket
+			}
+		})
+
+		It("should include a bucket for the scrape timeout", func() {
+			Expect(BucketsForScrapeDuration(maxBucketDuration)).To(ContainElement(maxBucket))
+		})
+	})
+})
diff --git a/metrics-server/pkg/provider/interfaces.go b/metrics-server/pkg/provider/interfaces.go
new file mode 100644
index 0000000..31ce625
--- /dev/null
+++ b/metrics-server/pkg/provider/interfaces.go
@@ -0,0 +1,63 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package provider
+
+import (
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	apitypes "k8s.io/apimachinery/pkg/types"
+	metrics "k8s.io/metrics/pkg/apis/metrics"
+)
+
+// MetricsProvider is both a PodMetricsProvider and a NodeMetricsProvider
+type MetricsProvider interface {
+	PodMetricsProvider
+	NodeMetricsProvider
+}
+
+// TimeInfo represents the timing information for a metric, which was
+// potentially calculated over some window of time (e.g. for CPU usage rate).
+type TimeInfo struct {
+	// NB: we consider the earliest timestamp amongst multiple containers
+	// for the purposes of determining if a metric is tainted by a time
+	// period, like pod startup (used by things like the HPA).
+
+	// Timestamp is the time at which the metrics were initially collected.
+	// In the case of a rate metric, it should be the timestamp of the last
+	// data point used in the calculation.  If it represents multiple metric
+	// points, it should be the earliest such timestamp from all of the points.
+	Timestamp time.Time
+
+	// Window represents the window used to calculate rate metrics associated
+	// with this timestamp.
+	Window time.Duration
+}
+
+// PodMetricsProvider knows how to fetch metrics for the containers in a pod.
+type PodMetricsProvider interface {
+	// GetContainerMetrics gets the latest metrics for all containers in each listed pod,
+	// returning both the metrics and the associated collection timestamp.
+	// If a pod is missing, the container metrics should be nil for that pod.
+	GetContainerMetrics(pods ...apitypes.NamespacedName) ([]TimeInfo, [][]metrics.ContainerMetrics, error)
+}
+
+// NodeMetricsProvider knows how to fetch metrics for a node.
+type NodeMetricsProvider interface {
+	// GetNodeMetrics gets the latest metrics for the given nodes,
+	// returning both the metrics and the associated collection timestamp.
+	// If a node is missing, the resourcelist should be nil for that node.
+	GetNodeMetrics(nodes ...string) ([]TimeInfo, []corev1.ResourceList, error)
+}
diff --git a/metrics-server/pkg/provider/sink/sinkprov.go b/metrics-server/pkg/provider/sink/sinkprov.go
new file mode 100644
index 0000000..3427a15
--- /dev/null
+++ b/metrics-server/pkg/provider/sink/sinkprov.go
@@ -0,0 +1,146 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	apitypes "k8s.io/apimachinery/pkg/types"
+	metrics "k8s.io/metrics/pkg/apis/metrics"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/provider"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sink"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+)
+
+// kubernetesCadvisorWindow is the max window used by cAdvisor for calculating
+// CPU usage rate.  While it can vary, it's no more than this number, but may be
+// as low as half this number (when working with no backoff).  It would be really
+// nice if the kubelet told us this in the summary API...
+var kubernetesCadvisorWindow = 30 * time.Second
+
+// sinkMetricsProvider is a provider.MetricsProvider that also acts as a sink.MetricSink
+type sinkMetricsProvider struct {
+	mu    sync.RWMutex
+	nodes map[string]sources.NodeMetricsPoint
+	pods  map[apitypes.NamespacedName]sources.PodMetricsPoint
+}
+
+// NewSinkProvider returns a MetricSink that feeds into a MetricsProvider.
+func NewSinkProvider() (sink.MetricSink, provider.MetricsProvider) {
+	prov := &sinkMetricsProvider{}
+	return prov, prov
+}
+
+// TODO(directxman12): figure out what the right value is for "window" --
+// we don't get the actual window from cAdvisor, so we could just
+// plumb down metric resolution, but that wouldn't be actually correct.
+
+func (p *sinkMetricsProvider) GetNodeMetrics(nodes ...string) ([]provider.TimeInfo, []corev1.ResourceList, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	timestamps := make([]provider.TimeInfo, len(nodes))
+	resMetrics := make([]corev1.ResourceList, len(nodes))
+
+	for i, node := range nodes {
+		metricPoint, present := p.nodes[node]
+		if !present {
+			continue
+		}
+
+		timestamps[i] = provider.TimeInfo{
+			Timestamp: metricPoint.Timestamp,
+			Window:    kubernetesCadvisorWindow,
+		}
+		resMetrics[i] = corev1.ResourceList{
+			corev1.ResourceName(corev1.ResourceCPU):    metricPoint.CpuUsage,
+			corev1.ResourceName(corev1.ResourceMemory): metricPoint.MemoryUsage,
+		}
+	}
+
+	return timestamps, resMetrics, nil
+}
+
+func (p *sinkMetricsProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	timestamps := make([]provider.TimeInfo, len(pods))
+	resMetrics := make([][]metrics.ContainerMetrics, len(pods))
+
+	for i, pod := range pods {
+		metricPoint, present := p.pods[pod]
+		if !present {
+			continue
+		}
+
+		contMetrics := make([]metrics.ContainerMetrics, len(metricPoint.Containers))
+		var earliestTS *time.Time
+		for i, contPoint := range metricPoint.Containers {
+			contMetrics[i] = metrics.ContainerMetrics{
+				Name: contPoint.Name,
+				Usage: corev1.ResourceList{
+					corev1.ResourceName(corev1.ResourceCPU):    contPoint.CpuUsage,
+					corev1.ResourceName(corev1.ResourceMemory): contPoint.MemoryUsage,
+				},
+			}
+			if earliestTS == nil || earliestTS.After(contPoint.Timestamp) {
+				ts := contPoint.Timestamp // copy to avoid loop iteration variable issues
+				earliestTS = &ts
+			}
+		}
+		if earliestTS == nil {
+			// we had no containers
+			earliestTS = &time.Time{}
+		}
+		timestamps[i] = provider.TimeInfo{
+			Timestamp: *earliestTS,
+			Window:    kubernetesCadvisorWindow,
+		}
+		resMetrics[i] = contMetrics
+	}
+	return timestamps, resMetrics, nil
+}
+
+func (p *sinkMetricsProvider) Receive(batch *sources.MetricsBatch) error {
+	newNodes := make(map[string]sources.NodeMetricsPoint, len(batch.Nodes))
+	for _, nodePoint := range batch.Nodes {
+		if _, exists := newNodes[nodePoint.Name]; exists {
+			return fmt.Errorf("duplicate node %s received", nodePoint.Name)
+		}
+		newNodes[nodePoint.Name] = nodePoint
+	}
+
+	newPods := make(map[apitypes.NamespacedName]sources.PodMetricsPoint, len(batch.Pods))
+	for _, podPoint := range batch.Pods {
+		podIdent := apitypes.NamespacedName{Name: podPoint.Name, Namespace: podPoint.Namespace}
+		if _, exists := newPods[podIdent]; exists {
+			return fmt.Errorf("duplicate pod %s received", podIdent)
+		}
+		newPods[podIdent] = podPoint
+	}
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	p.nodes = newNodes
+	p.pods = newPods
+
+	return nil
+}
diff --git a/metrics-server/pkg/provider/sink/sinkprov_test.go b/metrics-server/pkg/provider/sink/sinkprov_test.go
new file mode 100644
index 0000000..8980eeb
--- /dev/null
+++ b/metrics-server/pkg/provider/sink/sinkprov_test.go
@@ -0,0 +1,281 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink_test
+
+import (
+	"testing"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	apitypes "k8s.io/apimachinery/pkg/types"
+	metrics "k8s.io/metrics/pkg/apis/metrics"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/provider"
+	. "github.com/kubernetes-incubator/metrics-server/pkg/provider/sink"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sink"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+)
+
+var defaultWindow = 30 * time.Second
+
+func TestSourceManager(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Provider/Sink Suite")
+}
+
+func newMilliPoint(ts time.Time, cpu, memory int64) sources.MetricsPoint {
+	return sources.MetricsPoint{
+		Timestamp:   ts,
+		CpuUsage:    *resource.NewMilliQuantity(cpu, resource.DecimalSI),
+		MemoryUsage: *resource.NewMilliQuantity(memory, resource.BinarySI),
+	}
+}
+
+var _ = Describe("In-memory Sink Provider", func() {
+	var (
+		batch    *sources.MetricsBatch
+		prov     provider.MetricsProvider
+		provSink sink.MetricSink
+		now      time.Time
+	)
+
+	BeforeEach(func() {
+		now = time.Now()
+		batch = &sources.MetricsBatch{
+			Nodes: []sources.NodeMetricsPoint{
+				{Name: "node1", MetricsPoint: newMilliPoint(now.Add(100*time.Millisecond), 110, 120)},
+				{Name: "node2", MetricsPoint: newMilliPoint(now.Add(200*time.Millisecond), 210, 220)},
+				{Name: "node3", MetricsPoint: newMilliPoint(now.Add(300*time.Millisecond), 310, 320)},
+			},
+			Pods: []sources.PodMetricsPoint{
+				{Name: "pod1", Namespace: "ns1", Containers: []sources.ContainerMetricsPoint{
+					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(400*time.Millisecond), 410, 420)},
+					{Name: "container2", MetricsPoint: newMilliPoint(now.Add(500*time.Millisecond), 510, 520)},
+				}},
+				{Name: "pod2", Namespace: "ns1", Containers: []sources.ContainerMetricsPoint{
+					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(600*time.Millisecond), 610, 620)},
+				}},
+				{Name: "pod1", Namespace: "ns2", Containers: []sources.ContainerMetricsPoint{
+					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(700*time.Millisecond), 710, 720)},
+					{Name: "container2", MetricsPoint: newMilliPoint(now.Add(800*time.Millisecond), 810, 820)},
+				}},
+			},
+		}
+
+		provSink, prov = NewSinkProvider()
+	})
+
+	It("should receive batches of metrics", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("making sure that the provider contains all nodes received")
+		for _, node := range batch.Nodes {
+			_, _, err := prov.GetNodeMetrics(node.Name)
+			Expect(err).NotTo(HaveOccurred())
+		}
+
+		By("making sure that the provider contains all pods received")
+		for _, pod := range batch.Pods {
+			_, _, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+			})
+			Expect(err).NotTo(HaveOccurred())
+		}
+	})
+
+	It("should error out if duplicate nodes were received, without a partial store", func() {
+		By("adding a duplicate node to the batch")
+		batch.Nodes = append(batch.Nodes, batch.Nodes[0])
+
+		By("sending the batch to the sink and checking for an error")
+		Expect(provSink.Receive(batch)).NotTo(Succeed())
+
+		By("making sure none of the data is in the sink")
+		for _, node := range batch.Nodes {
+			_, res, err := prov.GetNodeMetrics(node.Name)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(ConsistOf(corev1.ResourceList(nil)))
+		}
+		for _, pod := range batch.Pods {
+			_, res, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+			})
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(Equal([][]metrics.ContainerMetrics{nil}))
+		}
+	})
+
+	It("should error out if duplicate pods were received, without a partial store", func() {
+		By("adding a duplicate pod to the batch")
+		batch.Pods = append(batch.Pods, batch.Pods[0])
+
+		By("sending the batch to the sink and checking for an error")
+		Expect(provSink.Receive(batch)).NotTo(Succeed())
+
+		By("making sure none of the data is in the sink")
+		for _, node := range batch.Nodes {
+			_, res, err := prov.GetNodeMetrics(node.Name)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(ConsistOf(corev1.ResourceList(nil)))
+		}
+		for _, pod := range batch.Pods {
+			_, res, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+			})
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).To(Equal([][]metrics.ContainerMetrics{nil}))
+		}
+	})
+
+	It("should retrieve metrics for all containers in a pod, with overall latest scrape time", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("fetching the pod")
+		ts, containerMetrics, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+			Name:      "pod1",
+			Namespace: "ns1",
+		})
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the timestamp is the smallest time amongst all containers")
+		Expect(ts).To(ConsistOf(provider.TimeInfo{Timestamp: now.Add(400 * time.Millisecond), Window: defaultWindow}))
+
+		By("verifying that all containers have data")
+		Expect(containerMetrics).To(Equal(
+			[][]metrics.ContainerMetrics{
+				{
+					{
+						Name: "container1",
+						Usage: corev1.ResourceList{
+							corev1.ResourceCPU:    *resource.NewMilliQuantity(410, resource.DecimalSI),
+							corev1.ResourceMemory: *resource.NewMilliQuantity(420, resource.BinarySI),
+						},
+					},
+					{
+						Name: "container2",
+						Usage: corev1.ResourceList{
+							corev1.ResourceCPU:    *resource.NewMilliQuantity(510, resource.DecimalSI),
+							corev1.ResourceMemory: *resource.NewMilliQuantity(520, resource.BinarySI),
+						},
+					},
+				},
+			},
+		))
+	})
+
+	It("should return nil metrics for missing pods", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("fetching the a present pod and a missing pod")
+		ts, containerMetrics, err := prov.GetContainerMetrics(apitypes.NamespacedName{
+			Name:      "pod1",
+			Namespace: "ns1",
+		}, apitypes.NamespacedName{
+			Name:      "pod2",
+			Namespace: "ns42",
+		})
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the timestamp is the smallest time amongst all containers")
+		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(400 * time.Millisecond), Window: defaultWindow}, {}}))
+
+		By("verifying that all present containers have data")
+		Expect(containerMetrics).To(Equal(
+			[][]metrics.ContainerMetrics{
+				{
+					{
+						Name: "container1",
+						Usage: corev1.ResourceList{
+							corev1.ResourceCPU:    *resource.NewMilliQuantity(410, resource.DecimalSI),
+							corev1.ResourceMemory: *resource.NewMilliQuantity(420, resource.BinarySI),
+						},
+					},
+					{
+						Name: "container2",
+						Usage: corev1.ResourceList{
+							corev1.ResourceCPU:    *resource.NewMilliQuantity(510, resource.DecimalSI),
+							corev1.ResourceMemory: *resource.NewMilliQuantity(520, resource.BinarySI),
+						},
+					},
+				},
+				nil,
+			},
+		))
+
+	})
+
+	It("should retrieve metrics for a node, with overall latest scrape time", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("fetching the nodes")
+		ts, nodeMetrics, err := prov.GetNodeMetrics("node1", "node2")
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the timestamp is the smallest time amongst all containers")
+		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(100 * time.Millisecond), Window: defaultWindow}, {Timestamp: now.Add(200 * time.Millisecond), Window: defaultWindow}}))
+
+		By("verifying that all nodes have data")
+		Expect(nodeMetrics).To(Equal(
+			[]corev1.ResourceList{
+				{
+					corev1.ResourceCPU:    *resource.NewMilliQuantity(110, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewMilliQuantity(120, resource.BinarySI),
+				},
+				{
+					corev1.ResourceCPU:    *resource.NewMilliQuantity(210, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewMilliQuantity(220, resource.BinarySI),
+				},
+			},
+		))
+	})
+
+	It("should return nil metrics for missing nodes", func() {
+		By("sending the batch to the sink")
+		Expect(provSink.Receive(batch)).To(Succeed())
+
+		By("fetching the nodes, plus a missing node")
+		ts, nodeMetrics, err := prov.GetNodeMetrics("node1", "node2", "node42")
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the timestamp is the smallest time amongst all containers")
+		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(100 * time.Millisecond), Window: defaultWindow}, {Timestamp: now.Add(200 * time.Millisecond), Window: defaultWindow}, {}}))
+
+		By("verifying that all present nodes have data")
+		Expect(nodeMetrics).To(Equal(
+			[]corev1.ResourceList{
+				{
+					corev1.ResourceCPU:    *resource.NewMilliQuantity(110, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewMilliQuantity(120, resource.BinarySI),
+				},
+				{
+					corev1.ResourceCPU:    *resource.NewMilliQuantity(210, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewMilliQuantity(220, resource.BinarySI),
+				},
+				nil,
+			},
+		))
+
+	})
+})
diff --git a/metrics-server/pkg/sink/interfaces.go b/metrics-server/pkg/sink/interfaces.go
new file mode 100644
index 0000000..a8faf68
--- /dev/null
+++ b/metrics-server/pkg/sink/interfaces.go
@@ -0,0 +1,25 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink
+
+import (
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+)
+
+// MetricSink knows how to receive metrics batches from a source.
+type MetricSink interface {
+	// Receive ingests a new batch of metrics.
+	Receive(*sources.MetricsBatch) error
+}
diff --git a/metrics-server/pkg/sources/fake/fakes.go b/metrics-server/pkg/sources/fake/fakes.go
new file mode 100644
index 0000000..4bbc9f9
--- /dev/null
+++ b/metrics-server/pkg/sources/fake/fakes.go
@@ -0,0 +1,46 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fake
+
+import (
+	"context"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+)
+
+// StaticSourceProvider is a fake sources.MetricSourceProvider that returns
+// metrics from a static list.
+type StaticSourceProvider []sources.MetricSource
+
+func (p StaticSourceProvider) GetMetricSources() ([]sources.MetricSource, error) { return p, nil }
+
+// FunctionSource is a sources.MetricSource that calls a function to
+// return the given data points
+type FunctionSource struct {
+	SourceName    string
+	GenerateBatch CollectFunc
+}
+
+func (f *FunctionSource) Name() string {
+	return f.SourceName
+}
+
+func (f *FunctionSource) Collect(ctx context.Context) (*sources.MetricsBatch, error) {
+	return f.GenerateBatch(ctx)
+}
+
+// CollectFunc is the function signature of FunctionSource#GenerateBatch,
+// and knows how to generate a MetricsBatch.
+type CollectFunc func(context.Context) (*sources.MetricsBatch, error)
diff --git a/metrics-server/pkg/sources/interfaces.go b/metrics-server/pkg/sources/interfaces.go
new file mode 100644
index 0000000..74a4d14
--- /dev/null
+++ b/metrics-server/pkg/sources/interfaces.go
@@ -0,0 +1,76 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sources
+
+import (
+	"context"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/resource"
+)
+
+// MetricsBatch is a single batch of pod, container, and node metrics from some source.
+type MetricsBatch struct {
+	Nodes []NodeMetricsPoint
+	Pods  []PodMetricsPoint
+}
+
+// NodeMetricsPoint contains the metrics for some node at some point in time.
+type NodeMetricsPoint struct {
+	Name string
+	MetricsPoint
+}
+
+// PodMetricsPoint contains the metrics for some pod's containers.
+type PodMetricsPoint struct {
+	Name      string
+	Namespace string
+
+	Containers []ContainerMetricsPoint
+}
+
+// ContainerMetricsPoint contains the metrics for some container at some point in time.
+type ContainerMetricsPoint struct {
+	Name string
+	MetricsPoint
+}
+
+// MetricsPoint represents a set of specific metrics at some point in time.
+type MetricsPoint struct {
+	Timestamp time.Time
+	// CpuUsage is the CPU usage rate, in cores.
+	CpuUsage resource.Quantity
+	// MemoryUsage is the working set size, in bytes.
+	MemoryUsage resource.Quantity
+}
+
+// MetricSource knows how to collect pod, container, and node metrics from some location.
+// It is expected that the batch returned contains unique values (i.e. it does not return
+// the same node, pod, or container as any other source).
+type MetricSource interface {
+	// Collect fetches a batch of metrics.  It may return both a partial result and an error,
+	// and non-nil results thus must be well-formed and meaningful even when accompanied by
+	// an error.
+	Collect(context.Context) (*MetricsBatch, error)
+	// Name names the metrics source for identification purposes.
+	Name() string
+}
+
+// MetricSourceProvider provides metric sources to collect from.
+type MetricSourceProvider interface {
+	// GetMetricSources fetches all sources known to this metrics provider.
+	// It may return both partial results and an error.
+	GetMetricSources() ([]MetricSource, error)
+}
diff --git a/metrics-server/pkg/sources/manager.go b/metrics-server/pkg/sources/manager.go
new file mode 100644
index 0000000..ff5b4d0
--- /dev/null
+++ b/metrics-server/pkg/sources/manager.go
@@ -0,0 +1,165 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sources
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/prometheus/client_golang/prometheus"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+
+	utilmetrics "github.com/kubernetes-incubator/metrics-server/pkg/metrics"
+)
+
+const (
+	maxDelayMs       = 4 * 1000
+	delayPerSourceMs = 8
+)
+
+var (
+	lastScrapeTimestamp = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "metrics_server",
+			Subsystem: "scraper",
+			Name:      "last_time_seconds",
+			Help:      "Last time metrics-server performed a scrape since unix epoch in seconds.",
+		},
+		[]string{"source"},
+	)
+
+	// scraperDuration starts out as an unregistered placeholder (acting as a no-op by
+	// default) and is replaced by a call to RegisterDurationMetrics.  We can't just register
+	// it in the constructor, since that could be called multiple times during setup.
+	scraperDuration *prometheus.HistogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"source"})
+)
+
+func init() {
+	prometheus.MustRegister(lastScrapeTimestamp)
+}
+
+// RegisterDurationMetrics creates and registers a histogram metric for scrape
+// duration (with buckets derived from scrapeTimeout), for use in the source manager.
+func RegisterDurationMetrics(scrapeTimeout time.Duration) {
+	scraperDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "metrics_server",
+			Subsystem: "scraper",
+			Name:      "duration_seconds",
+			Help:      "Time spent scraping sources in seconds.",
+			Buckets:   utilmetrics.BucketsForScrapeDuration(scrapeTimeout),
+		},
+		[]string{"source"},
+	)
+	prometheus.MustRegister(scraperDuration)
+}
+
+func NewSourceManager(srcProv MetricSourceProvider, scrapeTimeout time.Duration) MetricSource {
+	return &sourceManager{
+		srcProv:       srcProv,
+		scrapeTimeout: scrapeTimeout,
+	}
+}
+
+type sourceManager struct {
+	srcProv       MetricSourceProvider
+	scrapeTimeout time.Duration
+}
+
+func (m *sourceManager) Name() string {
+	return "source_manager"
+}
+
+// Collect scrapes all sources known to the provider concurrently (staggering their start times)
+// and merges the batches, returning any partial results alongside an aggregate of the errors hit.
+func (m *sourceManager) Collect(baseCtx context.Context) (*MetricsBatch, error) {
+	sources, err := m.srcProv.GetMetricSources()
+	var errs []error
+	if err != nil {
+		// save the error, and continue on in case of partial results
+		errs = append(errs, err)
+	}
+	glog.V(1).Infof("Scraping metrics from %v sources", len(sources))
+
+	responseChannel := make(chan *MetricsBatch, len(sources))
+	errChannel := make(chan error, len(sources))
+	defer close(responseChannel)
+	defer close(errChannel)
+
+	startTime := time.Now()
+
+	// TODO(directxman12): re-evaluate this code -- do we really need to stagger fetches like this?
+	delayMs := delayPerSourceMs * len(sources)
+	if delayMs > maxDelayMs {
+		delayMs = maxDelayMs
+	}
+
+	for _, source := range sources {
+		go func(source MetricSource) {
+			// Prevents network congestion.
+			sleepDuration := time.Duration(rand.Intn(delayMs)) * time.Millisecond
+			time.Sleep(sleepDuration)
+			// make the timeout a bit shorter to account for staggering, so we still preserve
+			// the overall timeout
+			ctx, cancelTimeout := context.WithTimeout(baseCtx, m.scrapeTimeout-sleepDuration)
+			defer cancelTimeout()
+
+			glog.V(2).Infof("Querying source: %s", source.Name())
+			metrics, err := scrapeWithMetrics(ctx, source)
+			if err != nil {
+				err = fmt.Errorf("unable to fully scrape metrics from source %s: %v", source.Name(), err)
+			}
+			responseChannel <- metrics
+			errChannel <- err
+		}(source)
+	}
+
+	res := &MetricsBatch{}
+
+	for range sources {
+		err := <-errChannel
+		srcBatch := <-responseChannel
+		if err != nil {
+			errs = append(errs, err)
+			// NB: partial node results are still worth saving, so
+			// don't skip storing results if we got an error
+		}
+		if srcBatch == nil {
+			continue
+		}
+
+		res.Nodes = append(res.Nodes, srcBatch.Nodes...)
+		res.Pods = append(res.Pods, srcBatch.Pods...)
+	}
+
+	glog.V(1).Infof("ScrapeMetrics: time: %s, nodes: %v, pods: %v", time.Since(startTime), len(res.Nodes), len(res.Pods))
+	return res, utilerrors.NewAggregate(errs)
+}
+
+// scrapeWithMetrics collects from the given source, recording (under the source's name)
+// the time at which the scrape finished and how long the scrape took.
+func scrapeWithMetrics(ctx context.Context, s MetricSource) (*MetricsBatch, error) {
+	sourceName := s.Name()
+	startTime := time.Now()
+	// deferred call arguments are evaluated at the defer statement, so read the clock in a closure
+	defer func() {
+		lastScrapeTimestamp.WithLabelValues(sourceName).Set(float64(time.Now().Unix()))
+		scraperDuration.WithLabelValues(sourceName).Observe(float64(time.Since(startTime)) / float64(time.Second))
+	}()
+	return s.Collect(ctx)
+}
diff --git a/metrics-server/pkg/sources/manager_test.go b/metrics-server/pkg/sources/manager_test.go
new file mode 100644
index 0000000..78bf436
--- /dev/null
+++ b/metrics-server/pkg/sources/manager_test.go
@@ -0,0 +1,218 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sources_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	. "github.com/kubernetes-incubator/metrics-server/pkg/sources"
+	fakesrc "github.com/kubernetes-incubator/metrics-server/pkg/sources/fake"
+)
+
+func TestSourceManager(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Source Manager Suite")
+}
+
+// sleepySource returns a MetricSource that takes some amount of time (respecting
+// context timeouts) to collect a MetricsBatch with a single node's data point.
+func sleepySource(delay time.Duration, nodeName string, point MetricsPoint) MetricSource {
+	return &fakesrc.FunctionSource{
+		SourceName: "sleepy_source:" + nodeName,
+		GenerateBatch: func(ctx context.Context) (*MetricsBatch, error) {
+			select {
+			case <-time.After(delay):
+			case <-ctx.Done():
+				return nil, fmt.Errorf("timed out")
+			}
+			return &MetricsBatch{
+				Nodes: []NodeMetricsPoint{
+					{
+						Name:         nodeName,
+						MetricsPoint: point,
+					},
+				},
+			}, nil
+		},
+	}
+}
+
+func fullSource(ts time.Time, nodeInd, podStartInd, numPods int) MetricSource {
+	return &fakesrc.FunctionSource{
+		SourceName: fmt.Sprintf("static_source:node%d", nodeInd),
+		GenerateBatch: func(_ context.Context) (*MetricsBatch, error) {
+			podPoints := make([]PodMetricsPoint, numPods)
+			for i := range podPoints {
+				podInd := int64(podStartInd + i)
+				podPoints[i].Name = fmt.Sprintf("pod%d", podInd)
+				podPoints[i].Namespace = fmt.Sprintf("ns%d", nodeInd)
+				podPoints[i].Containers = []ContainerMetricsPoint{
+					{
+						Name: "container1",
+						MetricsPoint: MetricsPoint{
+							Timestamp:   ts,
+							CpuUsage:    *resource.NewQuantity(300+10*podInd, resource.DecimalSI),
+							MemoryUsage: *resource.NewQuantity(400+10*podInd, resource.DecimalSI),
+						},
+					},
+					{
+						Name: "container2",
+						MetricsPoint: MetricsPoint{
+							Timestamp:   ts,
+							CpuUsage:    *resource.NewQuantity(500+10*podInd, resource.DecimalSI),
+							MemoryUsage: *resource.NewQuantity(600+10*podInd, resource.DecimalSI),
+						},
+					},
+				}
+			}
+			return &MetricsBatch{
+				Nodes: []NodeMetricsPoint{
+					{
+						Name: fmt.Sprintf("node%d", nodeInd),
+						MetricsPoint: MetricsPoint{
+							Timestamp:   ts,
+							CpuUsage:    *resource.NewQuantity(100+10*int64(nodeInd), resource.DecimalSI),
+							MemoryUsage: *resource.NewQuantity(200+10*int64(nodeInd), resource.DecimalSI),
+						},
+					},
+				},
+				Pods: podPoints,
+			}, nil
+		},
+	}
+}
+
+var _ = Describe("Source Manager", func() {
+	var (
+		scrapeTime    = time.Now()
+		nodeDataPoint = MetricsPoint{
+			Timestamp:   scrapeTime,
+			CpuUsage:    *resource.NewQuantity(100, resource.DecimalSI),
+			MemoryUsage: *resource.NewQuantity(200, resource.DecimalSI),
+		}
+	)
+
+	Context("when all sources return in time", func() {
+		It("should return the results of all sources, both pods and nodes", func() {
+			By("setting up sources that take 1 second to complete")
+			metricsSourceProvider := fakesrc.StaticSourceProvider{
+				sleepySource(1*time.Second, "node1", nodeDataPoint),
+				sleepySource(1*time.Second, "node2", nodeDataPoint),
+			}
+
+			By("running the source manager with a scrape and context timeout of 3*seconds")
+			start := time.Now()
+			manager := NewSourceManager(metricsSourceProvider, 3*time.Second)
+			timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 3*time.Second)
+			dataBatch, errs := manager.Collect(timeoutCtx)
+			doneWithWork()
+			Expect(errs).NotTo(HaveOccurred())
+
+			By("ensuring that the full time took at most 3 seconds")
+			Expect(time.Now().Sub(start)).To(BeNumerically("<=", 3*time.Second))
+
+			By("ensuring that all the nodes are listed")
+			Expect(dataBatch.Nodes).To(ConsistOf(
+				NodeMetricsPoint{Name: "node1", MetricsPoint: nodeDataPoint},
+				NodeMetricsPoint{Name: "node2", MetricsPoint: nodeDataPoint},
+			))
+		})
+
+		It("should return the results of all sources' nodes and pods", func() {
+			By("setting up multiple sources")
+			metricsSourceProvider := fakesrc.StaticSourceProvider{
+				fullSource(scrapeTime, 1, 0, 4),
+				fullSource(scrapeTime, 2, 4, 2),
+				fullSource(scrapeTime, 3, 6, 1),
+			}
+
+			By("running the source manager")
+			manager := NewSourceManager(metricsSourceProvider, 1*time.Second)
+			dataBatch, errs := manager.Collect(context.Background())
+			Expect(errs).NotTo(HaveOccurred())
+
+			By("figuring out the expected node and pod points")
+			var expectedNodePoints []interface{}
+			var expectedPodPoints []interface{}
+			for _, src := range metricsSourceProvider {
+				res, err := src.Collect(context.Background())
+				Expect(err).NotTo(HaveOccurred())
+				for _, pt := range res.Nodes {
+					expectedNodePoints = append(expectedNodePoints, pt)
+				}
+				for _, pt := range res.Pods {
+					expectedPodPoints = append(expectedPodPoints, pt)
+				}
+			}
+
+			By("ensuring that all nodes are present")
+			Expect(dataBatch.Nodes).To(ConsistOf(expectedNodePoints...))
+
+			By("ensuring that all pods are present")
+			Expect(dataBatch.Pods).To(ConsistOf(expectedPodPoints...))
+		})
+	})
+
+	Context("when some sources take too long", func() {
+		It("should pass the scrape timeout to the source context, so that sources can time out", func() {
+			By("setting up one source to take 4 seconds, and another to take 2")
+			metricsSourceProvider := fakesrc.StaticSourceProvider{
+				sleepySource(4*time.Second, "node1", nodeDataPoint),
+				sleepySource(2*time.Second, "node2", nodeDataPoint),
+			}
+
+			By("running the source manager with a scrape timeout of 3 seconds")
+			start := time.Now()
+			manager := NewSourceManager(metricsSourceProvider, 3*time.Second)
+			dataBatch, errs := manager.Collect(context.Background())
+
+			By("ensuring that scraping took around 3 seconds")
+			Expect(time.Now().Sub(start)).To(BeNumerically("~", 3*time.Second, 1*time.Millisecond))
+
+			By("ensuring that an error and partial results (data from source 2) were returned")
+			Expect(errs).To(HaveOccurred())
+			Expect(dataBatch.Nodes).To(ConsistOf(
+				NodeMetricsPoint{Name: "node2", MetricsPoint: nodeDataPoint},
+			))
+		})
+
+		It("should respect the parent context's general timeout, even with a longer scrape timeout", func() {
+			By("setting up some sources with 4 second delays")
+			metricsSourceProvider := fakesrc.StaticSourceProvider{
+				sleepySource(4*time.Second, "node1", nodeDataPoint),
+				sleepySource(4*time.Second, "node2", nodeDataPoint),
+			}
+
+			By("running the source manager with a scrape timeout of 5 seconds, but a context timeout of 1 second")
+			start := time.Now()
+			manager := NewSourceManager(metricsSourceProvider, 5*time.Second)
+			timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 1*time.Second)
+			dataBatch, errs := manager.Collect(timeoutCtx)
+			doneWithWork()
+
+			By("ensuring that it times out after 1 second with errors and no data")
+			Expect(time.Now().Sub(start)).To(BeNumerically("~", 1*time.Second, 1*time.Millisecond))
+			Expect(errs).To(HaveOccurred())
+			Expect(dataBatch.Nodes).To(BeEmpty())
+		})
+	})
+})
diff --git a/metrics-server/pkg/sources/summary/addrs.go b/metrics-server/pkg/sources/summary/addrs.go
new file mode 100644
index 0000000..49e7fc1
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/addrs.go
@@ -0,0 +1,76 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+)
+
+var (
+	// DefaultAddressTypePriority is the default node address type
+	// priority list, as taken from the Kubernetes API server options.
+	// In general, we prefer overrides to others, internal to external,
+	// and DNS to IPs.
+	DefaultAddressTypePriority = []corev1.NodeAddressType{
+		// --override-hostname
+		corev1.NodeHostName,
+
+		// internal, preferring DNS if reported
+		corev1.NodeInternalDNS,
+		corev1.NodeInternalIP,
+
+		// external, preferring DNS if reported
+		corev1.NodeExternalDNS,
+		corev1.NodeExternalIP,
+	}
+)
+
+// NodeAddressResolver knows how to find the preferred connection
+// address for a given node.
+type NodeAddressResolver interface {
+	// NodeAddress finds the preferred address to use to connect to
+	// the given node.
+	NodeAddress(node *corev1.Node) (address string, err error)
+}
+
+// prioNodeAddrResolver finds node addresses according to a list of
+// priorities of types of addresses.
+type prioNodeAddrResolver struct {
+	addrTypePriority []corev1.NodeAddressType
+}
+
+func (r *prioNodeAddrResolver) NodeAddress(node *corev1.Node) (string, error) {
+	// adapted from k8s.io/kubernetes/pkg/util/node
+	for _, addrType := range r.addrTypePriority {
+		for _, addr := range node.Status.Addresses {
+			if addr.Type == addrType {
+				return addr.Address, nil
+			}
+		}
+	}
+
+	return "", fmt.Errorf("node %s had no addresses that matched types %v", node.Name, r.addrTypePriority)
+}
+
+// NewPriorityNodeAddressResolver creates a new NodeAddressResolver that resolves
+// addresses first based on a list of prioritized address types, then based on
+// address order (first to last) within a particular address type.
+func NewPriorityNodeAddressResolver(typePriority []corev1.NodeAddressType) NodeAddressResolver {
+	return &prioNodeAddrResolver{
+		addrTypePriority: typePriority,
+	}
+}
diff --git a/metrics-server/pkg/sources/summary/client.go b/metrics-server/pkg/sources/summary/client.go
new file mode 100644
index 0000000..21e3aed
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/client.go
@@ -0,0 +1,119 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"net/url"
+	"strconv"
+
+	"github.com/golang/glog"
+	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
+)
+
+// KubeletInterface knows how to fetch metrics from the Kubelet
+type KubeletInterface interface {
+	// GetSummary fetches summary metrics from the given Kubelet
+	GetSummary(ctx context.Context, host string) (*stats.Summary, error)
+}
+
+type kubeletClient struct {
+	port            int
+	deprecatedNoTLS bool
+	client          *http.Client
+}
+
+type ErrNotFound struct {
+	endpoint string
+}
+
+func (err *ErrNotFound) Error() string {
+	return fmt.Sprintf("%q not found", err.endpoint)
+}
+
+func IsNotFoundError(err error) bool {
+	_, isNotFound := err.(*ErrNotFound)
+	return isNotFound
+}
+
+func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.Request, value interface{}) error {
+	// TODO(directxman12): support validating certs by hostname
+	response, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer response.Body.Close()
+	body, err := ioutil.ReadAll(response.Body)
+	if err != nil {
+		return fmt.Errorf("failed to read response body - %v", err)
+	}
+	if response.StatusCode == http.StatusNotFound {
+		return &ErrNotFound{req.URL.String()}
+	} else if response.StatusCode != http.StatusOK {
+		return fmt.Errorf("request failed - %q, response: %q", response.Status, string(body))
+	}
+
+	kubeletAddr := "[unknown]"
+	if req.URL != nil {
+		kubeletAddr = req.URL.Host
+	}
+	glog.V(10).Infof("Raw response from Kubelet at %s: %s", kubeletAddr, string(body))
+
+	err = json.Unmarshal(body, value)
+	if err != nil {
+		return fmt.Errorf("failed to parse output. Response: %q. Error: %v", string(body), err)
+	}
+	return nil
+}
+
+// GetSummary fetches the stats summary from the Kubelet at host (over plain HTTP if deprecatedNoTLS is set).
+func (kc *kubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
+	scheme := "https"
+	if kc.deprecatedNoTLS {
+		scheme = "http"
+	}
+	summaryURL := url.URL{
+		Scheme: scheme,
+		Host:   net.JoinHostPort(host, strconv.Itoa(kc.port)),
+		Path:   "/stats/summary/",
+	}
+	req, err := http.NewRequest("GET", summaryURL.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+	summary := &stats.Summary{}
+	client := kc.client
+	if client == nil {
+		client = http.DefaultClient
+	}
+	err = kc.makeRequestAndGetValue(client, req.WithContext(ctx), summary)
+	return summary, err
+}
+
+func NewKubeletClient(transport http.RoundTripper, port int, deprecatedNoTLS bool) (KubeletInterface, error) {
+	c := &http.Client{
+		Transport: transport,
+	}
+	return &kubeletClient{
+		port:            port,
+		client:          c,
+		deprecatedNoTLS: deprecatedNoTLS,
+	}, nil
+}
diff --git a/metrics-server/pkg/sources/summary/configs.go b/metrics-server/pkg/sources/summary/configs.go
new file mode 100644
index 0000000..e5da658
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/configs.go
@@ -0,0 +1,58 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+	"fmt"
+
+	"k8s.io/client-go/rest"
+)
+
+// GetKubeletConfig fetches connection config for connecting to the Kubelet.
+func GetKubeletConfig(baseKubeConfig *rest.Config, port int, insecureTLS bool, completelyInsecure bool) *KubeletClientConfig {
+	cfg := rest.CopyConfig(baseKubeConfig)
+	if completelyInsecure {
+		cfg = rest.AnonymousClientConfig(cfg)        // don't use auth to avoid leaking auth details to insecure endpoints
+		cfg.TLSClientConfig = rest.TLSClientConfig{} // empty TLS config --> no TLS
+	} else if insecureTLS {
+		cfg.TLSClientConfig.Insecure = true
+		cfg.TLSClientConfig.CAData = nil
+		cfg.TLSClientConfig.CAFile = ""
+	}
+	kubeletConfig := &KubeletClientConfig{
+		Port:                         port,
+		RESTConfig:                   cfg,
+		DeprecatedCompletelyInsecure: completelyInsecure,
+	}
+
+	return kubeletConfig
+}
+
+// KubeletClientConfig represents configuration for connecting to Kubelets.
+type KubeletClientConfig struct {
+	Port                         int
+	RESTConfig                   *rest.Config
+	DeprecatedCompletelyInsecure bool
+}
+
+// KubeletClientFor constructs a new KubeletInterface for the given configuration.
+func KubeletClientFor(config *KubeletClientConfig) (KubeletInterface, error) {
+	transport, err := rest.TransportFor(config.RESTConfig)
+	if err != nil {
+		return nil, fmt.Errorf("unable to construct transport: %v", err)
+	}
+
+	return NewKubeletClient(transport, config.Port, config.DeprecatedCompletelyInsecure)
+}
diff --git a/metrics-server/pkg/sources/summary/summary.go b/metrics-server/pkg/sources/summary/summary.go
new file mode 100644
index 0000000..667d74b
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/summary.go
@@ -0,0 +1,295 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+	"github.com/prometheus/client_golang/prometheus"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/labels"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	v1listers "k8s.io/client-go/listers/core/v1"
+	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
+)
+
+var (
+	summaryRequestLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "metrics_server",
+			Subsystem: "kubelet_summary",
+			Name:      "request_duration_seconds",
+			Help:      "The Kubelet summary request latencies in seconds.",
+			// TODO(directxman12): it would be nice to calculate these buckets off of scrape duration,
+			// like we do elsewhere, but we're not passed the scrape duration at this level.
+			Buckets: prometheus.DefBuckets,
+		},
+		[]string{"node"},
+	)
+	scrapeTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "metrics_server",
+			Subsystem: "kubelet_summary",
+			Name:      "scrapes_total",
+			Help:      "Total number of attempted Summary API scrapes done by Metrics Server",
+		},
+		[]string{"success"},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(summaryRequestLatency)
+	prometheus.MustRegister(scrapeTotal)
+}
+
+// NodeInfo contains the information needed to identify and connect to a particular node
+// (node name and preferred address).
+type NodeInfo struct {
+	Name           string
+	ConnectAddress string
+}
+
+// Kubelet-provided metrics for pod and system container.
+type summaryMetricsSource struct {
+	node          NodeInfo
+	kubeletClient KubeletInterface
+}
+
+func NewSummaryMetricsSource(node NodeInfo, client KubeletInterface) sources.MetricSource {
+	return &summaryMetricsSource{
+		node:          node,
+		kubeletClient: client,
+	}
+}
+
+func (src *summaryMetricsSource) Name() string {
+	return src.String()
+}
+
+func (src *summaryMetricsSource) String() string {
+	return fmt.Sprintf("kubelet_summary:%s", src.node.Name)
+}
+
+// Collect scrapes the summary from this node's Kubelet and decodes it into a MetricsBatch,
+// recording request latency and scrape success; pods with partial data are dropped.
+func (src *summaryMetricsSource) Collect(ctx context.Context) (*sources.MetricsBatch, error) {
+	summary, err := func() (*stats.Summary, error) {
+		startTime := time.Now()
+		// defer evaluates its call arguments immediately, so the elapsed time must be
+		// computed inside a deferred closure to measure the request rather than ~0s.
+		defer func() {
+			summaryRequestLatency.WithLabelValues(src.node.Name).Observe(float64(time.Since(startTime)) / float64(time.Second))
+		}()
+		return src.kubeletClient.GetSummary(ctx, src.node.ConnectAddress)
+	}()
+	if err != nil {
+		scrapeTotal.WithLabelValues("false").Inc()
+		return nil, fmt.Errorf("unable to fetch metrics from Kubelet %s (%s): %v", src.node.Name, src.node.ConnectAddress, err)
+	}
+	scrapeTotal.WithLabelValues("true").Inc()
+
+	res := &sources.MetricsBatch{
+		Nodes: make([]sources.NodeMetricsPoint, 1),
+		Pods:  make([]sources.PodMetricsPoint, len(summary.Pods)),
+	}
+
+	var errs []error
+	errs = append(errs, src.decodeNodeStats(&summary.Node, &res.Nodes[0])...)
+	if len(errs) != 0 {
+		// NOTE(review): the stated intent is to discard the node point on error, but [:1] is a
+		// no-op, so the partial point is kept; the tests rely on that -- confirm before using [:0].
+		res.Nodes = res.Nodes[:1]
+	}
+
+	num := 0
+	for i := range summary.Pods {
+		podErrs := src.decodePodStats(&summary.Pods[i], &res.Pods[num])
+		errs = append(errs, podErrs...)
+		if len(podErrs) != 0 {
+			// NB: pods with partial results are discarded on purpose: the horizontal pod
+			// autoscaler treats missing metrics specially, and zero CPU/memory is not "missing".
+			// The slot is simply reused, since decodePodStats overwrites it completely.
+			continue
+		}
+		num++
+	}
+	res.Pods = res.Pods[:num]
+	return res, utilerrors.NewAggregate(errs)
+}
+
+func (src *summaryMetricsSource) decodeNodeStats(nodeStats *stats.NodeStats, target *sources.NodeMetricsPoint) []error {
+	timestamp, err := getScrapeTime(nodeStats.CPU, nodeStats.Memory)
+	if err != nil {
+		// if we can't get a timestamp, assume bad data in general
+		return []error{fmt.Errorf("unable to get valid timestamp for metric point for node %q, discarding data: %v", src.node.ConnectAddress, err)}
+	}
+	*target = sources.NodeMetricsPoint{
+		Name: src.node.Name,
+		MetricsPoint: sources.MetricsPoint{
+			Timestamp: timestamp,
+		},
+	}
+	var errs []error
+	if err := decodeCPU(&target.CpuUsage, nodeStats.CPU); err != nil {
+		errs = append(errs, fmt.Errorf("unable to get CPU for node %q, discarding data: %v", src.node.ConnectAddress, err))
+	}
+	if err := decodeMemory(&target.MemoryUsage, nodeStats.Memory); err != nil {
+		errs = append(errs, fmt.Errorf("unable to get memory for node %q, discarding data: %v", src.node.ConnectAddress, err))
+	}
+	return errs
+}
+
+func (src *summaryMetricsSource) decodePodStats(podStats *stats.PodStats, target *sources.PodMetricsPoint) []error {
+	// completely overwrite data in the target
+	*target = sources.PodMetricsPoint{
+		Name:       podStats.PodRef.Name,
+		Namespace:  podStats.PodRef.Namespace,
+		Containers: make([]sources.ContainerMetricsPoint, len(podStats.Containers)),
+	}
+
+	var errs []error
+	for i, container := range podStats.Containers {
+		timestamp, err := getScrapeTime(container.CPU, container.Memory)
+		if err != nil {
+			// if we can't get a timestamp, assume bad data in general
+			errs = append(errs, fmt.Errorf("unable to get a valid timestamp for metric point for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
+			continue
+		}
+		point := sources.ContainerMetricsPoint{
+			Name: container.Name,
+			MetricsPoint: sources.MetricsPoint{
+				Timestamp: timestamp,
+			},
+		}
+		if err := decodeCPU(&point.CpuUsage, container.CPU); err != nil {
+			errs = append(errs, fmt.Errorf("unable to get CPU for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
+		}
+		if err := decodeMemory(&point.MemoryUsage, container.Memory); err != nil {
+			errs = append(errs, fmt.Errorf("unable to get memory for container %q in pod %s/%s on node %q: %v, discarding data", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
+		}
+
+		target.Containers[i] = point
+	}
+
+	return errs
+}
+
+// decodeCPU stores the nanocore CPU usage from cpuStats into target, or errors if it is absent.
+func decodeCPU(target *resource.Quantity, cpuStats *stats.CPUStats) error {
+	if cpuStats != nil && cpuStats.UsageNanoCores != nil {
+		*target = *uint64Quantity(*cpuStats.UsageNanoCores, -9)
+		return nil
+	}
+	return fmt.Errorf("missing cpu usage metric")
+}
+
+// decodeMemory stores the working-set bytes from memStats into target (in BinarySI format),
+// or returns an error if the value is absent.
+func decodeMemory(target *resource.Quantity, memStats *stats.MemoryStats) error {
+	if memStats != nil && memStats.WorkingSetBytes != nil {
+		*target = *uint64Quantity(*memStats.WorkingSetBytes, 0)
+		target.Format = resource.BinarySI
+		return nil
+	}
+	return fmt.Errorf("missing memory usage metric")
+}
+
+// getScrapeTime returns the earlier of the non-zero CPU and memory timestamps, so that we can
+// tell if a given data point was tainted by pod initialization; it errors if both are unset.
+func getScrapeTime(cpu *stats.CPUStats, memory *stats.MemoryStats) (time.Time, error) {
+	var (
+		scrapeTime time.Time
+		found      bool
+	)
+	if cpu != nil && !cpu.Time.IsZero() {
+		scrapeTime, found = cpu.Time.Time, true
+	}
+	if memory != nil && !memory.Time.IsZero() && (!found || scrapeTime.After(memory.Time.Time)) {
+		scrapeTime, found = memory.Time.Time, true
+	}
+	if !found {
+		return time.Time{}, fmt.Errorf("no non-zero timestamp on either CPU or memory")
+	}
+
+	return scrapeTime, nil
+}
+
+// uint64Quantity converts a uint64 into a Quantity, which only has constructors
+// that work with int64 (except for parse, which requires costly round-trips to string).
+// Values above math.MaxInt64 are divided by 10 and the scale raised by one, losing the last digit.
+func uint64Quantity(val uint64, scale resource.Scale) *resource.Quantity {
+	// easy path -- we can safely fit val into an int64
+	if val <= math.MaxInt64 {
+		return resource.NewScaledQuantity(int64(val), scale)
+	}
+
+	glog.V(1).Infof("unexpectedly large resource value %v, loosing precision to fit in scaled resource.Quantity", val)
+
+	// otherwise, lose one decimal order of magnitude of precision (any uint64 divided
+	// by 10 fits in an int64), so we can fit into a scaled quantity
+	return resource.NewScaledQuantity(int64(val/10), resource.Scale(1)+scale)
+}
+
+type summaryProvider struct {
+	nodeLister    v1listers.NodeLister
+	kubeletClient KubeletInterface
+	addrResolver  NodeAddressResolver
+}
+
+// GetMetricSources returns one source per listed node with a resolvable address; per-node failures are aggregated into the error.
+func (p *summaryProvider) GetMetricSources() ([]sources.MetricSource, error) {
+	nodes, err := p.nodeLister.List(labels.Everything())
+	if err != nil {
+		return nil, fmt.Errorf("unable to list nodes: %v", err)
+	}
+	var errs []error
+	metricSources := []sources.MetricSource{}
+	for _, node := range nodes {
+		info, err := p.getNodeInfo(node)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err))
+			continue
+		}
+		metricSources = append(metricSources, NewSummaryMetricsSource(info, p.kubeletClient))
+	}
+	return metricSources, utilerrors.NewAggregate(errs)
+}
+
+// getNodeInfo resolves the address to connect to for node via the configured
+// NodeAddressResolver and pairs it with the node's name.
+func (p *summaryProvider) getNodeInfo(node *corev1.Node) (NodeInfo, error) {
+	addr, err := p.addrResolver.NodeAddress(node)
+	if err != nil {
+		return NodeInfo{}, err
+	}
+	return NodeInfo{
+		Name:           node.Name,
+		ConnectAddress: addr,
+	}, nil
+}
+
+func NewSummaryProvider(nodeLister v1listers.NodeLister, kubeletClient KubeletInterface, addrResolver NodeAddressResolver) sources.MetricSourceProvider {
+	return &summaryProvider{
+		nodeLister:    nodeLister,
+		kubeletClient: kubeletClient,
+		addrResolver:  addrResolver,
+	}
+}
diff --git a/metrics-server/pkg/sources/summary/summary_test.go b/metrics-server/pkg/sources/summary/summary_test.go
new file mode 100644
index 0000000..69672e0
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/summary_test.go
@@ -0,0 +1,492 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary_test
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"testing"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/sources"
+	. "github.com/kubernetes-incubator/metrics-server/pkg/sources/summary"
+)
+
+func TestSummarySource(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Summary Source Test Suite")
+}
+
+type fakeKubeletClient struct {
+	delay   time.Duration
+	metrics *stats.Summary
+
+	lastHost string
+}
+
+func (c *fakeKubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
+	select {
+	case <-ctx.Done():
+		return nil, fmt.Errorf("timed out")
+	case <-time.After(c.delay):
+	}
+
+	c.lastHost = host
+
+	return c.metrics, nil
+}
+
+func cpuStats(usageNanocores uint64, ts time.Time) *stats.CPUStats {
+	return &stats.CPUStats{
+		Time:           metav1.Time{ts},
+		UsageNanoCores: &usageNanocores,
+	}
+}
+
+func memStats(workingSetBytes uint64, ts time.Time) *stats.MemoryStats {
+	return &stats.MemoryStats{
+		Time:            metav1.Time{ts},
+		WorkingSetBytes: &workingSetBytes,
+	}
+}
+
+func podStats(namespace, name string, containers ...stats.ContainerStats) stats.PodStats {
+	return stats.PodStats{
+		PodRef: stats.PodReference{
+			Name:      name,
+			Namespace: namespace,
+		},
+		Containers: containers,
+	}
+}
+
+func containerStats(name string, cpu, mem uint64, baseTime time.Time) stats.ContainerStats {
+	return stats.ContainerStats{
+		Name:   name,
+		CPU:    cpuStats(cpu, baseTime.Add(2*time.Millisecond)),
+		Memory: memStats(mem, baseTime.Add(4*time.Millisecond)),
+	}
+}
+
+func verifyNode(nodeName string, summary *stats.Summary, batch *sources.MetricsBatch) {
+	var cpuUsage, memoryUsage resource.Quantity
+	var timestamp time.Time
+	if summary.Node.CPU != nil {
+		if summary.Node.CPU.UsageNanoCores != nil {
+			cpuUsage = *resource.NewScaledQuantity(int64(*summary.Node.CPU.UsageNanoCores), -9)
+		}
+		timestamp = summary.Node.CPU.Time.Time
+	}
+	if summary.Node.Memory != nil {
+		if summary.Node.Memory.WorkingSetBytes != nil {
+			memoryUsage = *resource.NewQuantity(int64(*summary.Node.Memory.WorkingSetBytes), resource.BinarySI)
+		}
+		if timestamp.IsZero() {
+			timestamp = summary.Node.Memory.Time.Time
+		}
+	}
+
+	Expect(batch.Nodes).To(ConsistOf(
+		sources.NodeMetricsPoint{
+			Name: nodeName,
+			MetricsPoint: sources.MetricsPoint{
+				Timestamp:   timestamp,
+				CpuUsage:    cpuUsage,
+				MemoryUsage: memoryUsage,
+			},
+		},
+	))
+}
+
+func verifyPods(summary *stats.Summary, batch *sources.MetricsBatch) {
+	var expectedPods []interface{}
+	for _, pod := range summary.Pods {
+		containers := make([]sources.ContainerMetricsPoint, len(pod.Containers))
+		missingData := false
+		for i, container := range pod.Containers {
+			var cpuUsage, memoryUsage resource.Quantity
+			var timestamp time.Time
+			if container.CPU == nil || container.CPU.UsageNanoCores == nil {
+				missingData = true
+				break
+			}
+			cpuUsage = *resource.NewScaledQuantity(int64(*container.CPU.UsageNanoCores), -9)
+			timestamp = container.CPU.Time.Time
+			if container.Memory == nil || container.Memory.WorkingSetBytes == nil {
+				missingData = true
+				break
+			}
+			memoryUsage = *resource.NewQuantity(int64(*container.Memory.WorkingSetBytes), resource.BinarySI)
+			if timestamp.IsZero() {
+				timestamp = container.Memory.Time.Time
+			}
+
+			containers[i] = sources.ContainerMetricsPoint{
+				Name: container.Name,
+				MetricsPoint: sources.MetricsPoint{
+					Timestamp:   timestamp,
+					CpuUsage:    cpuUsage,
+					MemoryUsage: memoryUsage,
+				},
+			}
+		}
+		if missingData {
+			continue
+		}
+		expectedPods = append(expectedPods, sources.PodMetricsPoint{
+			Name:       pod.PodRef.Name,
+			Namespace:  pod.PodRef.Namespace,
+			Containers: containers,
+		})
+	}
+	Expect(batch.Pods).To(ConsistOf(expectedPods...))
+}
+
+var _ = Describe("Summary Source", func() {
+	var (
+		src        sources.MetricSource
+		client     *fakeKubeletClient
+		scrapeTime time.Time = time.Now()
+		nodeInfo   NodeInfo  = NodeInfo{
+			ConnectAddress: "10.0.1.2",
+			Name:           "node1",
+		}
+	)
+	BeforeEach(func() {
+		client = &fakeKubeletClient{
+			metrics: &stats.Summary{
+				Node: stats.NodeStats{
+					CPU:    cpuStats(100, scrapeTime.Add(100*time.Millisecond)),
+					Memory: memStats(200, scrapeTime.Add(200*time.Millisecond)),
+				},
+				Pods: []stats.PodStats{
+					podStats("ns1", "pod1",
+						containerStats("container1", 300, 400, scrapeTime.Add(10*time.Millisecond)),
+						containerStats("container2", 500, 600, scrapeTime.Add(20*time.Millisecond))),
+					podStats("ns1", "pod2",
+						containerStats("container1", 700, 800, scrapeTime.Add(30*time.Millisecond))),
+					podStats("ns2", "pod1",
+						containerStats("container1", 900, 1000, scrapeTime.Add(40*time.Millisecond))),
+					podStats("ns3", "pod1",
+						containerStats("container1", 1100, 1200, scrapeTime.Add(50*time.Millisecond))),
+				},
+			},
+		}
+		src = NewSummaryMetricsSource(nodeInfo, client)
+	})
+
+	It("should pass the provided context to the kubelet client to time out requests", func() {
+		By("setting up a context with a 1 second timeout")
+		ctx, workDone := context.WithTimeout(context.Background(), 1*time.Second)
+		defer workDone()
+
+		By("collecting the batch with a 4 second delay")
+		start := time.Now()
+		client.delay = 4 * time.Second
+		_, err := src.Collect(ctx)
+
+		By("ensuring it timed out with an error after 1 second")
+		// allow scheduling slack: the point is that we return near 1s rather than after the 4s delay
+		Expect(time.Since(start)).To(BeNumerically("~", 1*time.Second, 250*time.Millisecond))
+		Expect(err).To(HaveOccurred())
+	})
+
+	It("should fetch by connection address", func() {
+		By("collecting the batch")
+		_, err := src.Collect(context.Background())
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that it submitted the right host to the client")
+		Expect(client.lastHost).To(Equal(nodeInfo.ConnectAddress))
+	})
+
+	It("should return the working set and cpu usage for the node, and all pods on the node", func() {
+		By("collecting the batch")
+		batch, err := src.Collect(context.Background())
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the batch contains the right node data")
+		verifyNode(nodeInfo.Name, client.metrics, batch)
+
+		By("verifying that the batch contains the right pod data")
+		verifyPods(client.metrics, batch)
+	})
+
+	It("should use the scrape time from the CPU, falling back to memory if missing", func() {
+		By("removing some times from the data")
+		client.metrics.Pods[0].Containers[0].CPU.Time = metav1.Time{}
+		client.metrics.Node.CPU.Time = metav1.Time{}
+
+		By("collecting the batch")
+		batch, err := src.Collect(context.Background())
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the scrape time is as expected")
+		Expect(batch.Nodes[0].Timestamp).To(Equal(client.metrics.Node.Memory.Time.Time))
+		Expect(batch.Pods[0].Containers[0].Timestamp).To(Equal(client.metrics.Pods[0].Containers[0].Memory.Time.Time))
+		Expect(batch.Pods[1].Containers[0].Timestamp).To(Equal(client.metrics.Pods[1].Containers[0].CPU.Time.Time))
+	})
+
+	It("should continue on missing CPU or memory metrics", func() {
+		By("removing some data from the raw summary")
+		client.metrics.Node.Memory = nil
+		client.metrics.Pods[0].Containers[1].CPU = nil
+		client.metrics.Pods[1].Containers[0].CPU.UsageNanoCores = nil
+		client.metrics.Pods[2].Containers[0].Memory = nil
+		client.metrics.Pods[3].Containers[0].Memory.WorkingSetBytes = nil
+
+		By("collecting the batch")
+		batch, err := src.Collect(context.Background())
+		Expect(err).To(HaveOccurred())
+
+		By("verifying that the batch has all the data, save for what was missing")
+		verifyNode(nodeInfo.Name, client.metrics, batch)
+		verifyPods(client.metrics, batch)
+	})
+
+	It("should handle larger-than-int64 CPU or memory values gracefully", func() {
+		By("setting some data in the summary to be above math.MaxInt64")
+		plusTen := uint64(math.MaxInt64 + 10)
+		plusTwenty := uint64(math.MaxInt64 + 20)
+		minusTen := uint64(math.MaxUint64 - 10)
+		minusOneHundred := uint64(math.MaxUint64 - 100)
+
+		client.metrics.Node.Memory.WorkingSetBytes = &plusTen // RAM is cheap, right?
+		client.metrics.Node.CPU.UsageNanoCores = &plusTwenty  // a mainframe, probably
+		client.metrics.Pods[0].Containers[1].CPU.UsageNanoCores = &minusTen
+		client.metrics.Pods[1].Containers[0].Memory.WorkingSetBytes = &minusOneHundred
+
+		By("collecting the batch")
+		batch, err := src.Collect(context.Background())
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the data is still present, at lower precision")
+		nodeMem := *resource.NewScaledQuantity(int64(plusTen/10), 1)
+		nodeMem.Format = resource.BinarySI
+		podMem := *resource.NewScaledQuantity(int64(minusOneHundred/10), 1)
+		podMem.Format = resource.BinarySI
+		Expect(batch.Nodes[0].MemoryUsage).To(Equal(nodeMem))
+		Expect(batch.Nodes[0].CpuUsage).To(Equal(*resource.NewScaledQuantity(int64(plusTwenty/10), -8)))
+		Expect(batch.Pods[0].Containers[1].CpuUsage).To(Equal(*resource.NewScaledQuantity(int64(minusTen/10), -8)))
+		Expect(batch.Pods[1].Containers[0].MemoryUsage).To(Equal(podMem))
+	})
+})
+
+type fakeNodeLister struct {
+	nodes   []*corev1.Node
+	listErr error
+}
+
+func (l *fakeNodeLister) List(_ labels.Selector) (ret []*corev1.Node, err error) {
+	if l.listErr != nil {
+		return nil, l.listErr
+	}
+	// NB: this ignores the selector for the moment and returns every configured node
+	return l.nodes, nil
+}
+
+func (l *fakeNodeLister) ListWithPredicate(_ corelisters.NodeConditionPredicate) ([]*corev1.Node, error) {
+	// NB: this ignores the predicate for the moment and simply delegates to List
+	return l.List(labels.Everything())
+}
+
+func (l *fakeNodeLister) Get(name string) (*corev1.Node, error) {
+	for _, node := range l.nodes {
+		if node.Name == name {
+			return node, nil
+		}
+	}
+	return nil, fmt.Errorf("no such node %q", name)
+}
+
+func nodeNames(nodes []*corev1.Node, addrs []string) []string {
+	var res []string
+	for i, node := range nodes {
+		res = append(res, NewSummaryMetricsSource(NodeInfo{ConnectAddress: addrs[i], Name: node.Name}, nil).Name())
+	}
+	return res
+}
+
+// makeNode builds a test Node called name with a hostname address, an internal IP address,
+// and a single NodeReady condition whose status reflects ready.
+func makeNode(name, hostName, addr string, ready bool) *corev1.Node {
+	readyStatus := corev1.ConditionFalse
+	if ready {
+		readyStatus = corev1.ConditionTrue
+	}
+	return &corev1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: name},
+		Status: corev1.NodeStatus{
+			Addresses: []corev1.NodeAddress{
+				{Type: corev1.NodeHostName, Address: hostName},
+				{Type: corev1.NodeInternalIP, Address: addr},
+			},
+			Conditions: []corev1.NodeCondition{
+				{Type: corev1.NodeReady, Status: readyStatus},
+			},
+		},
+	}
+}
+
+var _ = Describe("Summary Source Provider", func() {
+	var (
+		nodeLister *fakeNodeLister
+		nodeAddrs  []string
+		provider   sources.MetricSourceProvider
+		fakeClient *fakeKubeletClient
+	)
+	BeforeEach(func() {
+		nodeLister = &fakeNodeLister{
+			nodes: []*corev1.Node{
+				makeNode("node1", "node1.somedomain", "10.0.1.2", true),
+				makeNode("node-no-host", "", "10.0.1.3", true),
+				makeNode("node3", "node3.somedomain", "10.0.1.4", false),
+				makeNode("node4", "node4.somedomain", "10.0.1.5", true),
+			},
+		}
+		nodeAddrs = []string{
+			"10.0.1.2",
+			"10.0.1.3",
+			"10.0.1.4",
+			"10.0.1.5",
+		}
+		fakeClient = &fakeKubeletClient{}
+		addrResolver := NewPriorityNodeAddressResolver(DefaultAddressTypePriority)
+		provider = NewSummaryProvider(nodeLister, fakeClient, addrResolver)
+	})
+
+	It("should return a metrics source for all nodes", func() {
+		By("listing the sources")
+		sources, err := provider.GetMetricSources()
+		Expect(err).To(Succeed())
+
+		By("verifying that a source is present for each node")
+		nodeNames := nodeNames(nodeLister.nodes, nodeAddrs)
+		sourceNames := make([]string, len(nodeNames))
+		for i, src := range sources {
+			sourceNames[i] = src.Name()
+		}
+		Expect(sourceNames).To(Equal(nodeNames))
+	})
+
+	It("should continue on error fetching node information for a particular node", func() {
+		By("deleting the IP of a node")
+		nodeLister.nodes[0].Status.Addresses = nil
+
+		By("listing the sources")
+		sources, err := provider.GetMetricSources()
+		Expect(err).To(HaveOccurred())
+
+		By("verifying that a source is present for each node")
+		nodeNames := nodeNames(nodeLister.nodes, nodeAddrs)
+		sourceNames := make([]string, len(nodeNames[1:]))
+		for i, src := range sources {
+			sourceNames[i] = src.Name()
+		}
+		// skip the bad node (the first one)
+		Expect(sourceNames).To(Equal(nodeNames[1:]))
+	})
+
+	It("should gracefully handle list errors", func() {
+		By("setting a fake error from the lister")
+		nodeLister.listErr = fmt.Errorf("something went wrong, expectedly")
+
+		By("listing the sources")
+		_, err := provider.GetMetricSources()
+		Expect(err).To(HaveOccurred())
+	})
+
+	Describe("when choosing node addresses", func() {
+		JustBeforeEach(func() {
+			// set up the metrics so we can call collect safely
+			fakeClient.metrics = &stats.Summary{
+				Node: stats.NodeStats{
+					CPU:    cpuStats(100, time.Now()),
+					Memory: memStats(200, time.Now()),
+				},
+			}
+		})
+
+		It("should prefer addresses according to the order of the types first", func() {
+			By("setting the first node to have multiple addresses and setting all nodes to ready")
+			nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
+				{Type: DefaultAddressTypePriority[3], Address: "skip-val1"},
+				{Type: DefaultAddressTypePriority[2], Address: "skip-val2"},
+				{Type: DefaultAddressTypePriority[1], Address: "correct-val"},
+			}
+			for _, node := range nodeLister.nodes {
+				node.Status.Conditions = []corev1.NodeCondition{
+					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
+				}
+			}
+
+			By("listing all sources")
+			srcs, err := provider.GetMetricSources()
+			Expect(err).NotTo(HaveOccurred())
+
+			By("making sure that the first source scrapes from the correct location")
+			_, err = srcs[0].Collect(context.Background())
+			Expect(err).NotTo(HaveOccurred())
+			Expect(fakeClient.lastHost).To(Equal("correct-val"))
+		})
+
+		It("should prefer the first address that matches within a given type", func() {
+			By("setting the first node to have multiple addresses and setting all nodes to ready")
+			nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
+				{Type: DefaultAddressTypePriority[1], Address: "skip-val1"},
+				{Type: DefaultAddressTypePriority[0], Address: "correct-val"},
+				{Type: DefaultAddressTypePriority[1], Address: "skip-val2"},
+				{Type: DefaultAddressTypePriority[0], Address: "second-val"},
+			}
+			for _, node := range nodeLister.nodes {
+				node.Status.Conditions = []corev1.NodeCondition{
+					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
+				}
+			}
+
+			By("listing all sources")
+			srcs, err := provider.GetMetricSources()
+			Expect(err).NotTo(HaveOccurred())
+
+			By("making sure that the first source scrapes from the correct location")
+			_, err = srcs[0].Collect(context.Background())
+			Expect(err).NotTo(HaveOccurred())
+			Expect(fakeClient.lastHost).To(Equal("correct-val"))
+		})
+
+		It("should return an error if no preferred addresses are found", func() {
+			By("wiping out the addresses of one of the nodes and setting all nodes to ready")
+			nodeLister.nodes[0].Status.Addresses = nil
+			for _, node := range nodeLister.nodes {
+				node.Status.Conditions = []corev1.NodeCondition{
+					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
+				}
+			}
+
+			By("asking for source providers for all nodes")
+			_, err := provider.GetMetricSources()
+			Expect(err).To(HaveOccurred())
+		})
+	})
+})
diff --git a/metrics-server/pkg/storage/nodemetrics/reststorage.go b/metrics-server/pkg/storage/nodemetrics/reststorage.go
new file mode 100644
index 0000000..909e8bf
--- /dev/null
+++ b/metrics-server/pkg/storage/nodemetrics/reststorage.go
@@ -0,0 +1,149 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package nodemetrics
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/golang/glog"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/provider"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apiserver/pkg/registry/rest"
+	v1listers "k8s.io/client-go/listers/core/v1"
+	"k8s.io/metrics/pkg/apis/metrics"
+	_ "k8s.io/metrics/pkg/apis/metrics/install"
+)
+
+type MetricStorage struct {
+	groupResource schema.GroupResource
+	prov          provider.NodeMetricsProvider
+	nodeLister    v1listers.NodeLister
+}
+
+var _ rest.KindProvider = &MetricStorage{}
+var _ rest.Storage = &MetricStorage{}
+var _ rest.Getter = &MetricStorage{}
+var _ rest.Lister = &MetricStorage{}
+var _ rest.Scoper = &MetricStorage{}
+
+func NewStorage(groupResource schema.GroupResource, prov provider.NodeMetricsProvider, nodeLister v1listers.NodeLister) *MetricStorage {
+	return &MetricStorage{
+		groupResource: groupResource,
+		prov:          prov,
+		nodeLister:    nodeLister,
+	}
+}
+
+// New implements rest.Storage and returns an empty NodeMetrics object.
+func (m *MetricStorage) New() runtime.Object {
+	return &metrics.NodeMetrics{}
+}
+
+// Kind implements rest.KindProvider and returns the kind name "NodeMetrics".
+func (m *MetricStorage) Kind() string {
+	return "NodeMetrics"
+}
+
+// NewList implements rest.Lister and returns an empty NodeMetricsList.
+func (m *MetricStorage) NewList() runtime.Object {
+	return &metrics.NodeMetricsList{}
+}
+
+// List implements rest.Lister: it returns metrics for the nodes matching options.LabelSelector (all nodes if unset); errors are logged and returned.
+func (m *MetricStorage) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
+	labelSelector := labels.Everything()
+	if options != nil && options.LabelSelector != nil {
+		labelSelector = options.LabelSelector
+	}
+	nodes, err := m.nodeLister.ListWithPredicate(func(node *v1.Node) bool {
+		if labelSelector.Empty() {
+			return true
+		}
+		return labelSelector.Matches(labels.Set(node.Labels))
+	})
+	if err != nil {
+		errMsg := fmt.Errorf("Error while listing nodes for selector %v: %v", labelSelector, err)
+		glog.Error(errMsg)
+		return &metrics.NodeMetricsList{}, errMsg
+	}
+
+	names := make([]string, len(nodes))
+	for i, node := range nodes {
+		names[i] = node.Name
+	}
+
+	metricsItems, err := m.getNodeMetrics(names...)
+	if err != nil {
+		errMsg := fmt.Errorf("Error while fetching node metrics for selector %v: %v", labelSelector, err)
+		glog.Error(errMsg)
+		return &metrics.NodeMetricsList{}, errMsg
+	}
+
+	return &metrics.NodeMetricsList{Items: metricsItems}, nil
+}
+
+// Get implements rest.Getter; a provider error or an absent metric is logged and reported as NotFound.
+func (m *MetricStorage) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
+	nodeMetrics, err := m.getNodeMetrics(name)
+	if err == nil {
+		if len(nodeMetrics) > 0 {
+			return &nodeMetrics[0], nil
+		}
+		err = fmt.Errorf("no metrics known for node %q", name)
+	}
+	glog.Errorf("unable to fetch node metrics for node %q: %v", name, err)
+	return nil, errors.NewNotFound(m.groupResource, name)
+}
+
+func (m *MetricStorage) getNodeMetrics(names ...string) ([]metrics.NodeMetrics, error) {
+	timestamps, usages, err := m.prov.GetNodeMetrics(names...)
+	if err != nil {
+		return nil, err
+	}
+
+	res := make([]metrics.NodeMetrics, 0, len(names))
+
+	for i, name := range names {
+		if usages[i] == nil {
+			glog.Errorf("unable to fetch node metrics for node %q: no metrics known for node", name)
+
+			continue
+		}
+		res = append(res, metrics.NodeMetrics{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:              name,
+				CreationTimestamp: metav1.NewTime(time.Now()),
+			},
+			Timestamp: metav1.NewTime(timestamps[i].Timestamp),
+			Window:    metav1.Duration{Duration: timestamps[i].Window},
+			Usage:     usages[i],
+		})
+	}
+
+	return res, nil
+}
+
+func (m *MetricStorage) NamespaceScoped() bool {
+	return false
+}
diff --git a/metrics-server/pkg/storage/podmetrics/reststorage.go b/metrics-server/pkg/storage/podmetrics/reststorage.go
new file mode 100644
index 0000000..59edb17
--- /dev/null
+++ b/metrics-server/pkg/storage/podmetrics/reststorage.go
@@ -0,0 +1,168 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package podmetrics
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/golang/glog"
+
+	"github.com/kubernetes-incubator/metrics-server/pkg/provider"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	apitypes "k8s.io/apimachinery/pkg/types"
+	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/registry/rest"
+	v1listers "k8s.io/client-go/listers/core/v1"
+	"k8s.io/metrics/pkg/apis/metrics"
+	_ "k8s.io/metrics/pkg/apis/metrics/install"
+)
+
+type MetricStorage struct {
+	groupResource schema.GroupResource        // resource reported in NotFound errors for missing metrics
+	prov          provider.PodMetricsProvider // provider queried for container metrics
+	podLister     v1listers.PodLister         // lister used to look up pods by namespace
+}
+
+var _ rest.KindProvider = (*MetricStorage)(nil)
+var _ rest.Storage = (*MetricStorage)(nil)
+var _ rest.Getter = (*MetricStorage)(nil)
+var _ rest.Lister = (*MetricStorage)(nil)
+
+// NewStorage returns a MetricStorage for pod metrics. groupResource is used when
+// building NotFound errors, prov supplies the container metrics and podLister is
+// used to look up the pods a request refers to.
+func NewStorage(groupResource schema.GroupResource, prov provider.PodMetricsProvider, podLister v1listers.PodLister) *MetricStorage {
+	storage := MetricStorage{groupResource: groupResource, prov: prov, podLister: podLister}
+	return &storage
+}
+
+// New implements rest.Storage and returns an empty PodMetrics object.
+func (m *MetricStorage) New() runtime.Object {
+	return new(metrics.PodMetrics)
+}
+
+// Kind implements rest.KindProvider.
+//
+// The kind reported for objects served by this storage is "PodMetrics".
+func (m *MetricStorage) Kind() string { return "PodMetrics" }
+
+// NewList implements rest.Lister and returns an empty PodMetricsList object.
+func (m *MetricStorage) NewList() runtime.Object {
+	return new(metrics.PodMetricsList)
+}
+
+// List implements rest.Lister. It lists the pods in the namespace carried by ctx that match
+// the label selector from options (labels.Everything() when none is set) and returns their metrics.
+func (m *MetricStorage) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
+	selector := labels.Everything()
+	if options != nil && options.LabelSelector != nil {
+		selector = options.LabelSelector
+	}
+	ns := genericapirequest.NamespaceValue(ctx)
+	matched, err := m.podLister.Pods(ns).List(selector)
+	if err != nil {
+		listErr := fmt.Errorf("Error while listing pods for selector %v in namespace %q: %v", selector, ns, err)
+		glog.Error(listErr)
+		return &metrics.PodMetricsList{}, listErr
+	}
+
+	items, err := m.getPodMetrics(matched...)
+	if err != nil {
+		fetchErr := fmt.Errorf("Error while fetching pod metrics for selector %v in namespace %q: %v", selector, ns, err)
+		glog.Error(fetchErr)
+		return &metrics.PodMetricsList{}, fetchErr
+	}
+	return &metrics.PodMetricsList{Items: items}, nil
+}
+
+// Get implements rest.Getter. It looks up the pod called name in the namespace carried by ctx and
+// returns its metrics. A missing pod, or metrics that cannot be fetched, yield a NotFound error.
+func (m *MetricStorage) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
+	ns := genericapirequest.NamespaceValue(ctx)
+	target, err := m.podLister.Pods(ns).Get(name)
+	if err != nil {
+		wrapped := fmt.Errorf("Error while getting pod %v: %v", name, err)
+		glog.Error(wrapped)
+		if !errors.IsNotFound(err) {
+			return &metrics.PodMetrics{}, wrapped
+		}
+		// not-found errors from the lister are handed back unwrapped
+		return &metrics.PodMetrics{}, err
+	}
+	if target == nil {
+		return &metrics.PodMetrics{}, errors.NewNotFound(v1.Resource("pods"), fmt.Sprintf("%v/%v", ns, name))
+	}
+
+	found, err := m.getPodMetrics(target)
+	if err == nil && len(found) == 0 {
+		err = fmt.Errorf("no metrics known for pod \"%s/%s\"", target.Namespace, target.Name)
+	}
+	if err == nil {
+		return &found[0], nil
+	}
+	glog.Errorf("unable to fetch pod metrics for pod %s/%s: %v", target.Namespace, target.Name, err)
+	return nil, errors.NewNotFound(m.groupResource, fmt.Sprintf("%v/%v", ns, name))
+}
+
+// getPodMetrics fetches container metrics for the given pods from the provider and
+// converts them into metrics.PodMetrics items. Pods that are not in the Running phase
+// are skipped silently; Running pods with nil container metrics are logged and skipped.
+func (m *MetricStorage) getPodMetrics(pods ...*v1.Pod) ([]metrics.PodMetrics, error) {
+	keys := make([]apitypes.NamespacedName, 0, len(pods))
+	for _, p := range pods {
+		keys = append(keys, apitypes.NamespacedName{Name: p.Name, Namespace: p.Namespace})
+	}
+	timeInfos, perPod, err := m.prov.GetContainerMetrics(keys...)
+	if err != nil {
+		return nil, err
+	}
+
+	items := make([]metrics.PodMetrics, 0, len(pods))
+	for idx, p := range pods {
+		switch {
+		case p.Status.Phase != v1.PodRunning:
+			// pods outside the Running phase are ignored
+			continue
+		case perPod[idx] == nil:
+			glog.Errorf("unable to fetch pod metrics for pod %s/%s: no metrics known for pod", p.Namespace, p.Name)
+			continue
+		}
+
+		meta := metav1.ObjectMeta{
+			Name:              p.Name,
+			Namespace:         p.Namespace,
+			CreationTimestamp: metav1.NewTime(time.Now()),
+		}
+		items = append(items, metrics.PodMetrics{
+			ObjectMeta: meta,
+			Timestamp:  metav1.NewTime(timeInfos[idx].Timestamp),
+			Window:     metav1.Duration{Duration: timeInfos[idx].Window},
+			Containers: perPod[idx],
+		})
+	}
+	return items, nil
+}
+
+// NamespaceScoped always returns true: the pod metrics handled by this storage are
+// namespaced (List and Get read the namespace from the request context).
+func (m *MetricStorage) NamespaceScoped() bool { return true }
diff --git a/metrics-server/pkg/version/version.go b/metrics-server/pkg/version/version.go
new file mode 100644
index 0000000..72dd1ca
--- /dev/null
+++ b/metrics-server/pkg/version/version.go
@@ -0,0 +1,43 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package version
+
+import (
+	"fmt"
+	"runtime"
+
+	genericversion "k8s.io/apimachinery/pkg/version"
+)
+
+// Build metadata; the placeholder values below are meant to be supplied through ldflags.
+var (
+	gitVersion   = "v0.0.0-master+$Format:%h$"
+	gitCommit    = "$Format:%H$"          // sha1 from git, output of $(git rev-parse HEAD)
+	gitTreeState = ""                     // state of git tree, either "clean" or "dirty"
+	buildDate    = "1970-01-01T00:00:00Z" // build date in ISO8601 format, output of $(date -u +'%Y-%m-%dT%H:%M:%SZ')
+)
+
+// VersionInfo returns the metrics-server version information: the package-level build metadata plus Go runtime details.
+func VersionInfo() *genericversion.Info {
+	info := new(genericversion.Info)
+	info.GitVersion = gitVersion
+	info.GitCommit = gitCommit
+	info.GitTreeState = gitTreeState
+	info.BuildDate = buildDate
+	info.GoVersion = runtime.Version()
+	info.Compiler = runtime.Compiler
+	info.Platform = fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH)
+	return info
+}