git subrepo clone (merge) https://github.com/kubernetes-incubator/metrics-server.git metrics-server
subrepo:
subdir: "metrics-server"
merged: "92d8412"
upstream:
origin: "https://github.com/kubernetes-incubator/metrics-server.git"
branch: "master"
commit: "92d8412"
git-subrepo:
version: "0.4.0"
origin: "???"
commit: "???"
diff --git a/metrics-server/pkg/sources/fake/fakes.go b/metrics-server/pkg/sources/fake/fakes.go
new file mode 100644
index 0000000..4bbc9f9
--- /dev/null
+++ b/metrics-server/pkg/sources/fake/fakes.go
@@ -0,0 +1,46 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fake
+
+import (
+ "context"
+
+ "github.com/kubernetes-incubator/metrics-server/pkg/sources"
+)
+
+// StaticSourceProvider is a fake sources.MetricSourceProvider whose sources
+// are a fixed, caller-supplied list.
+type StaticSourceProvider []sources.MetricSource
+
+// GetMetricSources hands back the provider's list itself (same backing
+// array, not a copy) and never reports an error.
+func (sp StaticSourceProvider) GetMetricSources() ([]sources.MetricSource, error) {
+	var list []sources.MetricSource = sp
+	return list, nil
+}
+
+// FunctionSource is a fake sources.MetricSource whose Collect delegates
+// entirely to a caller-supplied function.
+type FunctionSource struct {
+	// SourceName is returned verbatim by Name.
+	SourceName string
+	// GenerateBatch is called by Collect; a nil value makes Collect panic.
+	GenerateBatch CollectFunc
+}
+
+// Name returns the configured SourceName.
+func (fs *FunctionSource) Name() string { return fs.SourceName }
+
+// Collect invokes GenerateBatch with ctx and returns its results unchanged.
+func (fs *FunctionSource) Collect(ctx context.Context) (*sources.MetricsBatch, error) {
+	generate := fs.GenerateBatch
+	return generate(ctx)
+}
+
+// CollectFunc is the type of FunctionSource.GenerateBatch: given the context
+// passed to FunctionSource.Collect, it produces a MetricsBatch (or an error).
+type CollectFunc func(ctx context.Context) (*sources.MetricsBatch, error)
diff --git a/metrics-server/pkg/sources/interfaces.go b/metrics-server/pkg/sources/interfaces.go
new file mode 100644
index 0000000..74a4d14
--- /dev/null
+++ b/metrics-server/pkg/sources/interfaces.go
@@ -0,0 +1,76 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sources
+
+import (
+ "context"
+ "time"
+
+ "k8s.io/apimachinery/pkg/api/resource"
+)
+
+// MetricsBatch is a single batch of pod, container, and node metrics from some source.
+// Container metrics are not listed separately; they are nested inside each
+// PodMetricsPoint.
+type MetricsBatch struct {
+ // Nodes holds node-level metrics points.
+ Nodes []NodeMetricsPoint
+ // Pods holds pod points, each carrying its containers' metrics.
+ Pods []PodMetricsPoint
+}
+
+// NodeMetricsPoint contains the metrics for some node at some point in time.
+type NodeMetricsPoint struct {
+ // Name is the node's name.
+ Name string
+ // MetricsPoint is embedded, so Timestamp, CpuUsage and MemoryUsage are
+ // promoted onto NodeMetricsPoint.
+ MetricsPoint
+}
+
+// PodMetricsPoint contains the metrics for some pod's containers.
+// It carries no pod-level usage of its own; usage is reported per container.
+type PodMetricsPoint struct {
+ // Name and Namespace identify the pod.
+ Name string
+ Namespace string
+
+ // Containers holds the metrics points for the pod's containers.
+ Containers []ContainerMetricsPoint
+}
+
+// ContainerMetricsPoint contains the metrics for some container at some point in time.
+type ContainerMetricsPoint struct {
+ // Name is the container's name within its pod.
+ Name string
+ // MetricsPoint is embedded, so Timestamp, CpuUsage and MemoryUsage are
+ // promoted onto ContainerMetricsPoint.
+ MetricsPoint
+}
+
+// MetricsPoint represents a set of specific metrics at some point in time.
+type MetricsPoint struct {
+	// Timestamp is the point in time the metric values refer to.
+	Timestamp time.Time
+	// CpuUsage is the CPU usage rate, in cores.
+	CpuUsage resource.Quantity
+	// MemoryUsage is the working set size, in bytes.
+	MemoryUsage resource.Quantity
+}
+
+// MetricSource knows how to collect pod, container, and node metrics from some location.
+// It is expected that the batch returned contains unique values (i.e. it does not return
+// the same node, pod, or container as any other source).
+type MetricSource interface {
+	// Collect fetches a batch of metrics. It may return both a partial result and an error,
+	// and non-nil results thus must be well-formed and meaningful even when accompanied by
+	// an error.
+	Collect(context.Context) (*MetricsBatch, error)
+	// Name names the metrics source for identification purposes.
+	Name() string
+}
+
+// MetricSourceProvider provides metric sources to collect from.
+type MetricSourceProvider interface {
+ // GetMetricSources fetches all sources known to this metrics provider.
+ // It may return both partial results and an error; callers such as the
+ // source manager still scrape whatever sources were returned.
+ GetMetricSources() ([]MetricSource, error)
+}
diff --git a/metrics-server/pkg/sources/manager.go b/metrics-server/pkg/sources/manager.go
new file mode 100644
index 0000000..ff5b4d0
--- /dev/null
+++ b/metrics-server/pkg/sources/manager.go
@@ -0,0 +1,165 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sources
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "time"
+
+ "github.com/golang/glog"
+ "github.com/prometheus/client_golang/prometheus"
+ utilerrors "k8s.io/apimachinery/pkg/util/errors"
+
+ utilmetrics "github.com/kubernetes-incubator/metrics-server/pkg/metrics"
+)
+
+// Scrape start times are staggered by a random delay to spread out load:
+// up to delayPerSourceMs milliseconds per source, capped at maxDelayMs.
+const (
+	delayPerSourceMs = 8
+	maxDelayMs       = 4000
+)
+
+var (
+	// lastScrapeTimestamp records, per source, the Unix time in seconds of the
+	// most recent scrape. It is registered in init.
+	lastScrapeTimestamp = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "metrics_server",
+			Subsystem: "scraper",
+			Name:      "last_time_seconds",
+			Help:      "Last time metrics-server performed a scrape since unix epoch in seconds.",
+		},
+		[]string{"source"},
+	)
+
+	// scraperDuration is replaced with a real, registered histogram by a call to
+	// RegisterDurationMetrics. Until then it is an unregistered placeholder, so
+	// observations on it are not exported. It is not registered here because
+	// its buckets depend on the scrape timeout, and it is not registered in the
+	// constructor because that could be called multiple times during setup.
+	scraperDuration *prometheus.HistogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"source"})
+)
+
+// init registers the last-scrape gauge with the default Prometheus registry.
+// The duration histogram is registered separately by RegisterDurationMetrics.
+func init() {
+ prometheus.MustRegister(lastScrapeTimestamp)
+}
+
+// RegisterDurationMetrics creates the scrape-duration histogram, with buckets
+// derived from scrapeTimeout, and registers it with the default Prometheus
+// registry so the source manager's scrapes are recorded.
+//
+// Call it at most once: prometheus.MustRegister panics if a collector with
+// the same metric name is already registered.
+func RegisterDurationMetrics(scrapeTimeout time.Duration) {
+	scraperDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "metrics_server",
+			Subsystem: "scraper",
+			Name:      "duration_seconds",
+			Help:      "Time spent scraping sources in seconds.",
+			Buckets:   utilmetrics.BucketsForScrapeDuration(scrapeTimeout),
+		},
+		[]string{"source"},
+	)
+	prometheus.MustRegister(scraperDuration)
+}
+
+// NewSourceManager returns a MetricSource that, on each Collect, scrapes all
+// sources reported by srcProv concurrently (each bounded by scrapeTimeout)
+// and merges their results into one batch.
+func NewSourceManager(srcProv MetricSourceProvider, scrapeTimeout time.Duration) MetricSource {
+	manager := new(sourceManager)
+	manager.srcProv = srcProv
+	manager.scrapeTimeout = scrapeTimeout
+	return manager
+}
+
+// sourceManager aggregates the metrics of every source a provider reports.
+type sourceManager struct {
+ // srcProv supplies the sources to scrape on each Collect.
+ srcProv MetricSourceProvider
+ // scrapeTimeout bounds each source's scrape, including its start-up stagger delay.
+ scrapeTimeout time.Duration
+}
+
+// Name identifies the manager itself with the constant "source_manager".
+func (m *sourceManager) Name() string {
+	const managerName = "source_manager"
+	return managerName
+}
+
+// Collect scrapes every source reported by the provider concurrently and
+// merges their batches into a single MetricsBatch.
+//
+// Partial results are kept. A provider error or a per-source error is added
+// to the returned aggregate error, but any non-nil batch a source returned is
+// still merged. Each source's scrape start is staggered by a random delay to
+// avoid bursts of traffic. Its timeout is shortened by that delay, so the call
+// stays within roughly scrapeTimeout. If baseCtx is done while a source is
+// still waiting out its stagger delay, that source is not scraped; an error is
+// reported for it instead.
+func (m *sourceManager) Collect(baseCtx context.Context) (*MetricsBatch, error) {
+	sources, err := m.srcProv.GetMetricSources()
+	var errs []error
+	if err != nil {
+		// save the error, and continue on in case of partial results
+		errs = append(errs, err)
+	}
+	glog.V(1).Infof("Scraping metrics from %v sources", len(sources))
+
+	// Every goroutine sends exactly one result, so with a buffer of
+	// len(sources) no sender ever blocks and none can leak.
+	type scrapeResult struct {
+		batch *MetricsBatch
+		err   error
+	}
+	results := make(chan scrapeResult, len(sources))
+
+	startTime := time.Now()
+
+	// TODO(directxman12): re-evaluate this code -- do we really need to stagger fetches like this?
+	delayMs := delayPerSourceMs * len(sources)
+	if delayMs > maxDelayMs {
+		delayMs = maxDelayMs
+	}
+
+	for _, source := range sources {
+		go func(source MetricSource) {
+			// Stagger start times to prevent network congestion. rand.Intn
+			// panics on a non-positive bound, hence the guard.
+			var sleepDuration time.Duration
+			if delayMs > 0 {
+				sleepDuration = time.Duration(rand.Intn(delayMs)) * time.Millisecond
+			}
+			// Unlike a plain time.Sleep, give up early if the caller's context
+			// ends, so Collect never outlives its parent deadline by the stagger.
+			timer := time.NewTimer(sleepDuration)
+			select {
+			case <-timer.C:
+			case <-baseCtx.Done():
+				timer.Stop()
+				results <- scrapeResult{err: fmt.Errorf("unable to fully scrape metrics from source %s: %v", source.Name(), baseCtx.Err())}
+				return
+			}
+
+			// make the timeout a bit shorter to account for staggering, so we still preserve
+			// the overall timeout
+			ctx, cancelTimeout := context.WithTimeout(baseCtx, m.scrapeTimeout-sleepDuration)
+			defer cancelTimeout()
+
+			glog.V(2).Infof("Querying source: %s", source.Name())
+			metrics, err := scrapeWithMetrics(ctx, source)
+			if err != nil {
+				err = fmt.Errorf("unable to fully scrape metrics from source %s: %v", source.Name(), err)
+			}
+			// NB: partial results are still worth saving, so the batch is sent
+			// even when err is non-nil
+			results <- scrapeResult{batch: metrics, err: err}
+		}(source)
+	}
+
+	res := &MetricsBatch{}
+	for range sources {
+		result := <-results
+		if result.err != nil {
+			errs = append(errs, result.err)
+		}
+		if result.batch == nil {
+			continue
+		}
+		res.Nodes = append(res.Nodes, result.batch.Nodes...)
+		res.Pods = append(res.Pods, result.batch.Pods...)
+	}
+
+	glog.V(1).Infof("ScrapeMetrics: time: %s, nodes: %v, pods: %v", time.Since(startTime), len(res.Nodes), len(res.Pods))
+	return res, utilerrors.NewAggregate(errs)
+}
+
+// scrapeWithMetrics collects from s and, once the collection returns, records
+// the scrape's duration and completion time under the source's name.
+func scrapeWithMetrics(ctx context.Context, s MetricSource) (*MetricsBatch, error) {
+	sourceName := s.Name()
+	startTime := time.Now()
+	// The metric updates must run inside a closure. Arguments to a deferred
+	// call are evaluated when the defer statement executes, which would record
+	// a ~0 duration and the start time instead of the completion time.
+	defer func() {
+		lastScrapeTimestamp.WithLabelValues(sourceName).Set(float64(time.Now().Unix()))
+		scraperDuration.WithLabelValues(sourceName).Observe(time.Since(startTime).Seconds())
+	}()
+
+	return s.Collect(ctx)
+}
diff --git a/metrics-server/pkg/sources/manager_test.go b/metrics-server/pkg/sources/manager_test.go
new file mode 100644
index 0000000..78bf436
--- /dev/null
+++ b/metrics-server/pkg/sources/manager_test.go
@@ -0,0 +1,218 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sources_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "k8s.io/apimachinery/pkg/api/resource"
+
+ . "github.com/kubernetes-incubator/metrics-server/pkg/sources"
+ fakesrc "github.com/kubernetes-incubator/metrics-server/pkg/sources/fake"
+)
+
+// TestSourceManager runs this package's Ginkgo specs under `go test`,
+// routing Gomega assertion failures to Ginkgo's Fail.
+func TestSourceManager(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Source Manager Suite")
+}
+
+// sleepySource returns a MetricSource named "sleepy_source:<nodeName>" that
+// waits for delay and then reports a single node point built from nodeName
+// and point. If the context is done first, it returns a "timed out" error and
+// no batch.
+func sleepySource(delay time.Duration, nodeName string, point MetricsPoint) MetricSource {
+	generate := func(ctx context.Context) (*MetricsBatch, error) {
+		select {
+		case <-ctx.Done():
+			return nil, fmt.Errorf("timed out")
+		case <-time.After(delay):
+		}
+		nodePoint := NodeMetricsPoint{Name: nodeName, MetricsPoint: point}
+		return &MetricsBatch{Nodes: []NodeMetricsPoint{nodePoint}}, nil
+	}
+	return &fakesrc.FunctionSource{
+		SourceName:    "sleepy_source:" + nodeName,
+		GenerateBatch: generate,
+	}
+}
+
+// fullSource returns a MetricSource named "static_source:node<nodeInd>". Its
+// batch contains:
+//   - one node, "node<nodeInd>";
+//   - numPods pods, "pod<podStartInd>" onward, in namespace "ns<nodeInd>",
+//     each with two containers.
+// Every usage value is derived from the node or pod index, and every point
+// carries timestamp ts.
+func fullSource(ts time.Time, nodeInd, podStartInd, numPods int) MetricSource {
+	point := func(cpu, mem int64) MetricsPoint {
+		return MetricsPoint{
+			Timestamp:   ts,
+			CpuUsage:    *resource.NewQuantity(cpu, resource.DecimalSI),
+			MemoryUsage: *resource.NewQuantity(mem, resource.DecimalSI),
+		}
+	}
+	generate := func(_ context.Context) (*MetricsBatch, error) {
+		pods := make([]PodMetricsPoint, 0, numPods)
+		for podInd := int64(podStartInd); podInd < int64(podStartInd+numPods); podInd++ {
+			offset := 10 * podInd
+			pods = append(pods, PodMetricsPoint{
+				Name:      fmt.Sprintf("pod%d", podInd),
+				Namespace: fmt.Sprintf("ns%d", nodeInd),
+				Containers: []ContainerMetricsPoint{
+					{Name: "container1", MetricsPoint: point(300+offset, 400+offset)},
+					{Name: "container2", MetricsPoint: point(500+offset, 600+offset)},
+				},
+			})
+		}
+		nodeOffset := 10 * int64(nodeInd)
+		return &MetricsBatch{
+			Nodes: []NodeMetricsPoint{
+				{Name: fmt.Sprintf("node%d", nodeInd), MetricsPoint: point(100+nodeOffset, 200+nodeOffset)},
+			},
+			Pods: pods,
+		}, nil
+	}
+	return &fakesrc.FunctionSource{
+		SourceName:    fmt.Sprintf("static_source:node%d", nodeInd),
+		GenerateBatch: generate,
+	}
+}
+
+var _ = Describe("Source Manager", func() {
+	// timingTolerance bounds how far measured wall-clock durations may stray
+	// from their expected values. Goroutine scheduling, timer latency and the
+	// manager's random start-up stagger make a 1ms tolerance flaky.
+	const timingTolerance = 100 * time.Millisecond
+
+	var (
+		scrapeTime    = time.Now()
+		nodeDataPoint = MetricsPoint{
+			Timestamp:   scrapeTime,
+			CpuUsage:    *resource.NewQuantity(100, resource.DecimalSI),
+			MemoryUsage: *resource.NewQuantity(200, resource.DecimalSI),
+		}
+	)
+
+	Context("when all sources return in time", func() {
+		It("should return the results of all sources, both pods and nodes", func() {
+			By("setting up sources that take 1 second to complete")
+			metricsSourceProvider := fakesrc.StaticSourceProvider{
+				sleepySource(1*time.Second, "node1", nodeDataPoint),
+				sleepySource(1*time.Second, "node2", nodeDataPoint),
+			}
+
+			By("running the source manager with a scrape and context timeout of 3*seconds")
+			start := time.Now()
+			manager := NewSourceManager(metricsSourceProvider, 3*time.Second)
+			timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 3*time.Second)
+			dataBatch, errs := manager.Collect(timeoutCtx)
+			doneWithWork()
+			Expect(errs).NotTo(HaveOccurred())
+
+			By("ensuring that the full time took at most 3 seconds")
+			Expect(time.Since(start)).To(BeNumerically("<=", 3*time.Second))
+
+			By("ensuring that all the nodes are listed")
+			Expect(dataBatch.Nodes).To(ConsistOf(
+				NodeMetricsPoint{Name: "node1", MetricsPoint: nodeDataPoint},
+				NodeMetricsPoint{Name: "node2", MetricsPoint: nodeDataPoint},
+			))
+		})
+
+		It("should return the results of all sources' nodes and pods", func() {
+			By("setting up multiple sources")
+			metricsSourceProvider := fakesrc.StaticSourceProvider{
+				fullSource(scrapeTime, 1, 0, 4),
+				fullSource(scrapeTime, 2, 4, 2),
+				fullSource(scrapeTime, 3, 6, 1),
+			}
+
+			By("running the source manager")
+			manager := NewSourceManager(metricsSourceProvider, 1*time.Second)
+			dataBatch, errs := manager.Collect(context.Background())
+			Expect(errs).NotTo(HaveOccurred())
+
+			By("figuring out the expected node and pod points")
+			var expectedNodePoints []interface{}
+			var expectedPodPoints []interface{}
+			for _, src := range metricsSourceProvider {
+				res, err := src.Collect(context.Background())
+				Expect(err).NotTo(HaveOccurred())
+				for _, pt := range res.Nodes {
+					expectedNodePoints = append(expectedNodePoints, pt)
+				}
+				for _, pt := range res.Pods {
+					expectedPodPoints = append(expectedPodPoints, pt)
+				}
+			}
+
+			By("ensuring that all nodes are present")
+			Expect(dataBatch.Nodes).To(ConsistOf(expectedNodePoints...))
+
+			By("ensuring that all pods are present")
+			Expect(dataBatch.Pods).To(ConsistOf(expectedPodPoints...))
+		})
+	})
+
+	Context("when some sources take too long", func() {
+		It("should pass the scrape timeout to the source context, so that sources can time out", func() {
+			By("setting up one source to take 4 seconds, and another to take 2")
+			metricsSourceProvider := fakesrc.StaticSourceProvider{
+				sleepySource(4*time.Second, "node1", nodeDataPoint),
+				sleepySource(2*time.Second, "node2", nodeDataPoint),
+			}
+
+			By("running the source manager with a scrape timeout of 3 seconds")
+			start := time.Now()
+			manager := NewSourceManager(metricsSourceProvider, 3*time.Second)
+			dataBatch, errs := manager.Collect(context.Background())
+
+			By("ensuring that scraping took around 3 seconds")
+			Expect(time.Since(start)).To(BeNumerically("~", 3*time.Second, timingTolerance))
+
+			By("ensuring that an error and partial results (data from source 2) were returned")
+			Expect(errs).To(HaveOccurred())
+			Expect(dataBatch.Nodes).To(ConsistOf(
+				NodeMetricsPoint{Name: "node2", MetricsPoint: nodeDataPoint},
+			))
+		})
+
+		It("should respect the parent context's general timeout, even with a longer scrape timeout", func() {
+			By("setting up some sources with 4 second delays")
+			metricsSourceProvider := fakesrc.StaticSourceProvider{
+				sleepySource(4*time.Second, "node1", nodeDataPoint),
+				sleepySource(4*time.Second, "node2", nodeDataPoint),
+			}
+
+			By("running the source manager with a scrape timeout of 5 seconds, but a context timeout of 1 second")
+			start := time.Now()
+			manager := NewSourceManager(metricsSourceProvider, 5*time.Second)
+			timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 1*time.Second)
+			dataBatch, errs := manager.Collect(timeoutCtx)
+			doneWithWork()
+
+			By("ensuring that it times out after 1 second with errors and no data")
+			Expect(time.Since(start)).To(BeNumerically("~", 1*time.Second, timingTolerance))
+			Expect(errs).To(HaveOccurred())
+			Expect(dataBatch.Nodes).To(BeEmpty())
+		})
+	})
+})
diff --git a/metrics-server/pkg/sources/summary/addrs.go b/metrics-server/pkg/sources/summary/addrs.go
new file mode 100644
index 0000000..49e7fc1
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/addrs.go
@@ -0,0 +1,76 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+ "fmt"
+
+ corev1 "k8s.io/api/core/v1"
+)
+
+// DefaultAddressTypePriority is the default order in which node address types
+// are tried, as taken from the Kubernetes API server options. In general, we
+// prefer overrides to others, internal to external, and DNS to IPs.
+var DefaultAddressTypePriority = []corev1.NodeAddressType{
+	corev1.NodeHostName,    // --override-hostname
+	corev1.NodeInternalDNS, // internal, preferring DNS if reported
+	corev1.NodeInternalIP,
+	corev1.NodeExternalDNS, // external, preferring DNS if reported
+	corev1.NodeExternalIP,
+}
+
+// NodeAddressResolver knows how to find the preferred connection
+// address for a given node.
+type NodeAddressResolver interface {
+ // NodeAddress finds the preferred address to use to connect to
+ // the given node, or returns an error when no acceptable address exists
+ // (for the priority-based resolver: no address has a listed type).
+ NodeAddress(node *corev1.Node) (address string, err error)
+}
+
+// prioNodeAddrResolver resolves a node's address by trying each address type
+// in addrTypePriority in turn.
+type prioNodeAddrResolver struct {
+	addrTypePriority []corev1.NodeAddressType
+}
+
+// NodeAddress returns the first listed address of the highest-priority type
+// present in node.Status.Addresses, or an error if no address has any of the
+// configured types.
+func (r *prioNodeAddrResolver) NodeAddress(node *corev1.Node) (string, error) {
+	// adapted from k8s.io/kubernetes/pkg/util/node
+	firstOfType := func(wanted corev1.NodeAddressType) (string, bool) {
+		for _, candidate := range node.Status.Addresses {
+			if candidate.Type == wanted {
+				return candidate.Address, true
+			}
+		}
+		return "", false
+	}
+
+	for _, addrType := range r.addrTypePriority {
+		if address, found := firstOfType(addrType); found {
+			return address, nil
+		}
+	}
+	return "", fmt.Errorf("node %s had no addresses that matched types %v", node.Name, r.addrTypePriority)
+}
+
+// NewPriorityNodeAddressResolver creates a NodeAddressResolver that picks an
+// address first by type, following typePriority, and then by position (first
+// to last) among the node's addresses of that type. typePriority is stored
+// as given, not copied.
+func NewPriorityNodeAddressResolver(typePriority []corev1.NodeAddressType) NodeAddressResolver {
+	resolver := &prioNodeAddrResolver{addrTypePriority: typePriority}
+	return resolver
+}
diff --git a/metrics-server/pkg/sources/summary/client.go b/metrics-server/pkg/sources/summary/client.go
new file mode 100644
index 0000000..21e3aed
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/client.go
@@ -0,0 +1,119 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "strconv"
+
+ "github.com/golang/glog"
+ stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
+)
+
+// KubeletInterface knows how to fetch metrics from the Kubelet
+type KubeletInterface interface {
+ // GetSummary fetches summary metrics from the given Kubelet.
+ // host is the Kubelet's address; the kubeletClient implementation appends
+ // its configured port to it.
+ GetSummary(ctx context.Context, host string) (*stats.Summary, error)
+}
+
+// kubeletClient is the HTTP-based implementation of KubeletInterface.
+type kubeletClient struct {
+ // port is the Kubelet port requests are sent to.
+ port int
+ // deprecatedNoTLS switches requests from https to plain http.
+ deprecatedNoTLS bool
+ // client sends the requests; GetSummary falls back to http.DefaultClient when it is nil.
+ client *http.Client
+}
+
+// ErrNotFound is returned when the Kubelet answers a request with HTTP 404;
+// endpoint holds the requested URL.
+type ErrNotFound struct {
+	endpoint string
+}
+
+// Error reports the quoted endpoint followed by "not found".
+func (e *ErrNotFound) Error() string {
+	msg := fmt.Sprintf("%q not found", e.endpoint)
+	return msg
+}
+
+// IsNotFoundError reports whether err itself is an *ErrNotFound. Wrapped
+// errors are not unwrapped; a nil err yields false.
+func IsNotFoundError(err error) bool {
+	switch err.(type) {
+	case *ErrNotFound:
+		return true
+	default:
+		return false
+	}
+}
+
+// makeRequestAndGetValue sends req with client and JSON-decodes a 200 OK
+// response body into value.
+//
+// Error cases:
+//   - a transport error is returned as is;
+//   - a 404 yields *ErrNotFound for the request URL;
+//   - any other non-200 status yields an error carrying the status and body.
+// Successful raw bodies are logged at glog verbosity 10 before decoding.
+func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.Request, value interface{}) error {
+	// TODO(directxman12): support validating certs by hostname
+	response, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer response.Body.Close()
+
+	body, err := ioutil.ReadAll(response.Body)
+	if err != nil {
+		return fmt.Errorf("failed to read response body - %v", err)
+	}
+	raw := string(body)
+
+	switch response.StatusCode {
+	case http.StatusOK:
+		// decoded below
+	case http.StatusNotFound:
+		return &ErrNotFound{req.URL.String()}
+	default:
+		return fmt.Errorf("request failed - %q, response: %q", response.Status, raw)
+	}
+
+	kubeletAddr := "[unknown]"
+	if req.URL != nil {
+		kubeletAddr = req.URL.Host
+	}
+	glog.V(10).Infof("Raw response from Kubelet at %s: %s", kubeletAddr, raw)
+
+	if err := json.Unmarshal(body, value); err != nil {
+		return fmt.Errorf("failed to parse output. Response: %q. Error: %v", raw, err)
+	}
+	return nil
+}
+
+// GetSummary fetches "/stats/summary/" from host on the configured port and
+// binds the request to ctx. It uses https unless deprecatedNoTLS is set.
+// Once a request has been built, the returned *stats.Summary is non-nil even
+// when err is non-nil, and its contents should then not be relied upon.
+func (kc *kubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
+	scheme := "https"
+	if kc.deprecatedNoTLS {
+		scheme = "http"
+	}
+	summaryURL := url.URL{
+		Scheme: scheme,
+		Host:   net.JoinHostPort(host, strconv.Itoa(kc.port)),
+		Path:   "/stats/summary/",
+	}
+
+	req, err := http.NewRequest("GET", summaryURL.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	httpClient := kc.client
+	if httpClient == nil {
+		httpClient = http.DefaultClient
+	}
+	summary := new(stats.Summary)
+	err = kc.makeRequestAndGetValue(httpClient, req.WithContext(ctx), summary)
+	return summary, err
+}
+
+// NewKubeletClient returns a KubeletInterface that reaches Kubelets on port
+// through an http.Client built on transport. deprecatedNoTLS selects plain
+// http. The returned error is currently always nil.
+func NewKubeletClient(transport http.RoundTripper, port int, deprecatedNoTLS bool) (KubeletInterface, error) {
+	kc := &kubeletClient{
+		port:            port,
+		deprecatedNoTLS: deprecatedNoTLS,
+	}
+	kc.client = &http.Client{Transport: transport}
+	return kc, nil
+}
diff --git a/metrics-server/pkg/sources/summary/configs.go b/metrics-server/pkg/sources/summary/configs.go
new file mode 100644
index 0000000..e5da658
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/configs.go
@@ -0,0 +1,58 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+ "fmt"
+
+ "k8s.io/client-go/rest"
+)
+
+// GetKubeletConfig derives the Kubelet connection config from a copy of
+// baseKubeConfig; the base config itself is not modified.
+//
+// With completelyInsecure, credentials and all TLS settings are dropped, and
+// the resulting client speaks plain http. Otherwise insecureTLS keeps the
+// credentials but disables server certificate verification and clears any
+// configured CA. completelyInsecure takes precedence when both are set.
+func GetKubeletConfig(baseKubeConfig *rest.Config, port int, insecureTLS bool, completelyInsecure bool) *KubeletClientConfig {
+	cfg := rest.CopyConfig(baseKubeConfig)
+	switch {
+	case completelyInsecure:
+		// don't use auth to avoid leaking auth details to insecure endpoints
+		cfg = rest.AnonymousClientConfig(cfg)
+		// empty TLS config --> no TLS
+		cfg.TLSClientConfig = rest.TLSClientConfig{}
+	case insecureTLS:
+		cfg.TLSClientConfig.Insecure = true
+		cfg.TLSClientConfig.CAData = nil
+		cfg.TLSClientConfig.CAFile = ""
+	}
+
+	return &KubeletClientConfig{
+		Port:                         port,
+		RESTConfig:                   cfg,
+		DeprecatedCompletelyInsecure: completelyInsecure,
+	}
+}
+
+// KubeletClientConfig represents configuration for connecting to Kubelets.
+type KubeletClientConfig struct {
+ // Port is the Kubelet port to connect to.
+ Port int
+ // RESTConfig supplies the credentials and TLS settings used to build the HTTP transport.
+ RESTConfig *rest.Config
+ // DeprecatedCompletelyInsecure makes the client use plain http instead of https.
+ DeprecatedCompletelyInsecure bool
+}
+
+// KubeletClientFor constructs a new KubeletInterface for the given
+// configuration, building its HTTP transport from config.RESTConfig.
+func KubeletClientFor(config *KubeletClientConfig) (KubeletInterface, error) {
+	transport, err := rest.TransportFor(config.RESTConfig)
+	if err != nil {
+		return nil, fmt.Errorf("unable to construct transport: %v", err)
+	}
+	port, noTLS := config.Port, config.DeprecatedCompletelyInsecure
+	return NewKubeletClient(transport, port, noTLS)
+}
diff --git a/metrics-server/pkg/sources/summary/summary.go b/metrics-server/pkg/sources/summary/summary.go
new file mode 100644
index 0000000..667d74b
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/summary.go
@@ -0,0 +1,295 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "time"
+
+ "github.com/golang/glog"
+ "github.com/kubernetes-incubator/metrics-server/pkg/sources"
+ "github.com/prometheus/client_golang/prometheus"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
+ "k8s.io/apimachinery/pkg/labels"
+ utilerrors "k8s.io/apimachinery/pkg/util/errors"
+ v1listers "k8s.io/client-go/listers/core/v1"
+ stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
+)
+
+// summaryRequestLatency observes Kubelet summary request latencies, labelled
+// by node name.
+// TODO(directxman12): it would be nice to calculate these buckets off of scrape duration,
+// like we do elsewhere, but we're not passed the scrape duration at this level.
+var summaryRequestLatency = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "metrics_server",
+		Subsystem: "kubelet_summary",
+		Name:      "request_duration_seconds",
+		Help:      "The Kubelet summary request latencies in seconds.",
+		Buckets:   prometheus.DefBuckets,
+	},
+	[]string{"node"},
+)
+
+// scrapeTotal counts attempted summary scrapes, labelled by outcome
+// ("true" on success, "false" on failure).
+var scrapeTotal = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Namespace: "metrics_server",
+		Subsystem: "kubelet_summary",
+		Name:      "scrapes_total",
+		Help:      "Total number of attempted Summary API scrapes done by Metrics Server",
+	},
+	[]string{"success"},
+)
+
+// init registers the Kubelet summary metrics with the default Prometheus registry.
+func init() {
+	prometheus.MustRegister(summaryRequestLatency, scrapeTotal)
+}
+
+// NodeInfo contains the information needed to identify and connect to a particular node
+// (node name and preferred address).
+type NodeInfo struct {
+ // Name is the node's name; it labels metrics and names the source.
+ Name string
+ // ConnectAddress is the host handed to the Kubelet client when fetching the summary.
+ ConnectAddress string
+}
+
+// summaryMetricsSource provides Kubelet-provided metrics for pods and system
+// containers of a single node, read from the Kubelet summary API.
+type summaryMetricsSource struct {
+ // node identifies the Kubelet to scrape.
+ node NodeInfo
+ // kubeletClient performs the summary request.
+ kubeletClient KubeletInterface
+}
+
+// NewSummaryMetricsSource returns a MetricSource that collects metrics for
+// node by fetching its Kubelet summary through client.
+func NewSummaryMetricsSource(node NodeInfo, client KubeletInterface) sources.MetricSource {
+	src := new(summaryMetricsSource)
+	src.node, src.kubeletClient = node, client
+	return src
+}
+
+// Name identifies this source as "kubelet_summary:<node name>".
+func (src *summaryMetricsSource) Name() string {
+	return "kubelet_summary:" + src.node.Name
+}
+
+// String returns the same identifier as Name.
+func (src *summaryMetricsSource) String() string {
+	return src.Name()
+}
+
+// Collect fetches the summary from this source's Kubelet and converts it into
+// a MetricsBatch.
+//
+// If the fetch fails, it returns a nil batch and the error. Otherwise it
+// returns whatever could be decoded, plus an aggregate of the decoding errors:
+//   - the node point is dropped if any of its metrics failed to decode;
+//   - any pod with a container metric that failed to decode is dropped
+//     entirely.
+// This ensures missing data is never reported as zero usage.
+func (src *summaryMetricsSource) Collect(ctx context.Context) (*sources.MetricsBatch, error) {
+	summary, err := func() (*stats.Summary, error) {
+		startTime := time.Now()
+		// The observation must be wrapped in a closure: arguments to a deferred
+		// call are evaluated immediately, which would always record ~0s.
+		defer func() {
+			summaryRequestLatency.WithLabelValues(src.node.Name).Observe(time.Since(startTime).Seconds())
+		}()
+		return src.kubeletClient.GetSummary(ctx, src.node.ConnectAddress)
+	}()
+
+	if err != nil {
+		scrapeTotal.WithLabelValues("false").Inc()
+		return nil, fmt.Errorf("unable to fetch metrics from Kubelet %s (%s): %v", src.node.Name, src.node.ConnectAddress, err)
+	}
+
+	scrapeTotal.WithLabelValues("true").Inc()
+
+	res := &sources.MetricsBatch{
+		Nodes: make([]sources.NodeMetricsPoint, 1),
+		Pods:  make([]sources.PodMetricsPoint, len(summary.Pods)),
+	}
+
+	var errs []error
+	errs = append(errs, src.decodeNodeStats(&summary.Node, &res.Nodes[0])...)
+	if len(errs) != 0 {
+		// if we had errors providing node metrics, discard the data point
+		// so that we don't incorrectly report metric values as zero.
+		res.Nodes = res.Nodes[:0]
+	}
+
+	num := 0
+	// index instead of ranging by value to avoid copying each PodStats
+	for i := range summary.Pods {
+		podErrs := src.decodePodStats(&summary.Pods[i], &res.Pods[num])
+		errs = append(errs, podErrs...)
+		if len(podErrs) != 0 {
+			// NB: we explicitly want to discard pods with partial results, since
+			// the horizontal pod autoscaler takes special action when a pod is missing
+			// metrics (and zero CPU or memory does not count as "missing metrics")
+
+			// we don't care if we reuse slots in the result array,
+			// because they get completely overwritten in decodePodStats
+			continue
+		}
+		num++
+	}
+	res.Pods = res.Pods[:num]
+
+	return res, utilerrors.NewAggregate(errs)
+}
+
+// decodeNodeStats fills target with the node's name, scrape timestamp, CPU and
+// memory usage.
+//
+// If no valid timestamp can be derived, target is left untouched and a single
+// error is returned. Otherwise target is overwritten. Any CPU or memory
+// decoding failure is returned, and that field is left zero.
+func (src *summaryMetricsSource) decodeNodeStats(nodeStats *stats.NodeStats, target *sources.NodeMetricsPoint) []error {
+	timestamp, err := getScrapeTime(nodeStats.CPU, nodeStats.Memory)
+	if err != nil {
+		// if we can't get a timestamp, assume bad data in general
+		return []error{fmt.Errorf("unable to get valid timestamp for metric point for node %q, discarding data: %v", src.node.ConnectAddress, err)}
+	}
+
+	point := sources.NodeMetricsPoint{Name: src.node.Name}
+	point.Timestamp = timestamp
+
+	var errs []error
+	if cpuErr := decodeCPU(&point.CpuUsage, nodeStats.CPU); cpuErr != nil {
+		errs = append(errs, fmt.Errorf("unable to get CPU for node %q, discarding data: %v", src.node.ConnectAddress, cpuErr))
+	}
+	if memErr := decodeMemory(&point.MemoryUsage, nodeStats.Memory); memErr != nil {
+		errs = append(errs, fmt.Errorf("unable to get memory for node %q, discarding data: %v", src.node.ConnectAddress, memErr))
+	}
+
+	*target = point
+	return errs
+}
+
+// decodePodStats overwrites target with the pod's name and namespace and one
+// ContainerMetricsPoint slot per container in podStats. It then decodes each
+// container's timestamp, CPU and memory, and returns every decoding error.
+// A container without a valid timestamp keeps a zero-valued slot. A container
+// whose CPU or memory fails to decode keeps zero for that field.
+func (src *summaryMetricsSource) decodePodStats(podStats *stats.PodStats, target *sources.PodMetricsPoint) []error {
+	// completely overwrite data in the target
+	*target = sources.PodMetricsPoint{
+		Name:       podStats.PodRef.Name,
+		Namespace:  podStats.PodRef.Namespace,
+		Containers: make([]sources.ContainerMetricsPoint, len(podStats.Containers)),
+	}
+	namespace, name, node := target.Namespace, target.Name, src.node.ConnectAddress
+
+	var errs []error
+	for i := range podStats.Containers {
+		container := &podStats.Containers[i]
+		timestamp, err := getScrapeTime(container.CPU, container.Memory)
+		if err != nil {
+			// if we can't get a timestamp, assume bad data in general
+			errs = append(errs, fmt.Errorf("unable to get a valid timestamp for metric point for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, namespace, name, node, err))
+			continue
+		}
+
+		point := &target.Containers[i]
+		point.Name = container.Name
+		point.Timestamp = timestamp
+		if err := decodeCPU(&point.CpuUsage, container.CPU); err != nil {
+			errs = append(errs, fmt.Errorf("unable to get CPU for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, namespace, name, node, err))
+		}
+		if err := decodeMemory(&point.MemoryUsage, container.Memory); err != nil {
+			errs = append(errs, fmt.Errorf("unable to get memory for container %q in pod %s/%s on node %q: %v, discarding data", container.Name, namespace, name, node, err))
+		}
+	}
+
+	return errs
+}
+
+// decodeCPU stores the CPU usage from cpuStats into target. The kubelet value
+// is in nanocores, so it is recorded with a 10^-9 scale. When the usage value
+// is absent, an error is returned and target is left untouched.
+func decodeCPU(target *resource.Quantity, cpuStats *stats.CPUStats) error {
+	if cpuStats != nil && cpuStats.UsageNanoCores != nil {
+		*target = *uint64Quantity(*cpuStats.UsageNanoCores, -9)
+		return nil
+	}
+	return fmt.Errorf("missing cpu usage metric")
+}
+
+// decodeMemory stores the working-set memory (in bytes) from memStats into
+// target, formatted as a binary-SI quantity. When the working-set value is
+// absent, an error is returned and target is left untouched.
+func decodeMemory(target *resource.Quantity, memStats *stats.MemoryStats) error {
+	if memStats != nil && memStats.WorkingSetBytes != nil {
+		usage := uint64Quantity(*memStats.WorkingSetBytes, 0)
+		usage.Format = resource.BinarySI
+		*target = *usage
+		return nil
+	}
+	return fmt.Errorf("missing memory usage metric")
+}
+
+// getScrapeTime returns the earlier of the non-zero CPU and memory sample
+// times. Nil stats and zero timestamps are ignored. When the two times are
+// equal, the CPU time is returned. An error is returned when neither sample
+// carries a usable timestamp.
+func getScrapeTime(cpu *stats.CPUStats, memory *stats.MemoryStats) (time.Time, error) {
+	// Use the earlier timestamp so that we can tell if a given data point was
+	// tainted by pod initialization.
+	var earliest time.Time
+	found := false
+
+	if cpu != nil && !cpu.Time.IsZero() {
+		earliest, found = cpu.Time.Time, true
+	}
+	if memory != nil && !memory.Time.IsZero() {
+		if !found || memory.Time.Time.Before(earliest) {
+			earliest, found = memory.Time.Time, true
+		}
+	}
+
+	if !found {
+		return time.Time{}, fmt.Errorf("no non-zero timestamp on either CPU or memory")
+	}
+	return earliest, nil
+}
+
+// uint64Quantity converts a uint64 into a Quantity, which only has constructors
+// that work with int64 (except for Parse, which requires a costly round-trip
+// through a string).
+//
+// Values up to math.MaxInt64 are converted exactly at the given scale. Larger
+// values are divided by ten, dropping one decimal digit of precision, and the
+// scale is raised by one to compensate. Because MaxUint64/10 < MaxInt64, a
+// single division always suffices.
+func uint64Quantity(val uint64, scale resource.Scale) *resource.Quantity {
+	// easy path -- we can safely fit val into an int64
+	if val <= math.MaxInt64 {
+		return resource.NewScaledQuantity(int64(val), scale)
+	}
+
+	glog.V(1).Infof("unexpectedly large resource value %v, losing precision to fit in scaled resource.Quantity", val)
+
+	// otherwise, drop one decimal order of magnitude of precision so the value
+	// fits into an int64-backed scaled quantity
+	return resource.NewScaledQuantity(int64(val/10), scale+1)
+}
+
+// summaryProvider is the sources.MetricSourceProvider returned by
+// NewSummaryProvider. It creates one summary metrics source for each listed
+// node whose connection address can be resolved.
+type summaryProvider struct {
+ // nodeLister supplies the nodes to create sources for.
+ nodeLister v1listers.NodeLister
+ // kubeletClient is handed to every source created by this provider.
+ kubeletClient KubeletInterface
+ // addrResolver chooses the ConnectAddress recorded for each node.
+ addrResolver NodeAddressResolver
+}
+
+// GetMetricSources returns one summary metrics source per node in the lister.
+// A failure to list nodes is returned on its own with a nil slice. A node
+// whose connection information cannot be determined is skipped. Those
+// per-node problems are returned as an aggregate error alongside the sources
+// that could be created, so callers can still use the partial result.
+func (p *summaryProvider) GetMetricSources() ([]sources.MetricSource, error) {
+	nodes, err := p.nodeLister.List(labels.Everything())
+	if err != nil {
+		return nil, fmt.Errorf("unable to list nodes: %v", err)
+	}
+
+	// NB: deliberately not called "sources", which would shadow the imported
+	// sources package for the rest of the function.
+	srcs := make([]sources.MetricSource, 0, len(nodes))
+	var errs []error
+	for _, node := range nodes {
+		info, err := p.getNodeInfo(node)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err))
+			continue
+		}
+		srcs = append(srcs, NewSummaryMetricsSource(info, p.kubeletClient))
+	}
+	return srcs, utilerrors.NewAggregate(errs)
+}
+
+// getNodeInfo resolves the address used to connect to node and pairs it with
+// the node's name. Any resolver error is returned unchanged, together with an
+// empty NodeInfo.
+func (p *summaryProvider) getNodeInfo(node *corev1.Node) (NodeInfo, error) {
+	addr, err := p.addrResolver.NodeAddress(node)
+	if err != nil {
+		return NodeInfo{}, err
+	}
+	return NodeInfo{Name: node.Name, ConnectAddress: addr}, nil
+}
+
+// NewSummaryProvider returns a sources.MetricSourceProvider that builds a
+// summary metrics source for each node listed by nodeLister. Each source uses
+// the address picked by addrResolver and is constructed with kubeletClient.
+func NewSummaryProvider(nodeLister v1listers.NodeLister, kubeletClient KubeletInterface, addrResolver NodeAddressResolver) sources.MetricSourceProvider {
+	provider := &summaryProvider{
+		nodeLister:    nodeLister,
+		kubeletClient: kubeletClient,
+		addrResolver:  addrResolver,
+	}
+	return provider
+}
diff --git a/metrics-server/pkg/sources/summary/summary_test.go b/metrics-server/pkg/sources/summary/summary_test.go
new file mode 100644
index 0000000..69672e0
--- /dev/null
+++ b/metrics-server/pkg/sources/summary/summary_test.go
@@ -0,0 +1,492 @@
+// Copyright 2018 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package summary_test
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ corelisters "k8s.io/client-go/listers/core/v1"
+ stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
+
+ "github.com/kubernetes-incubator/metrics-server/pkg/sources"
+ . "github.com/kubernetes-incubator/metrics-server/pkg/sources/summary"
+)
+
+// TestSummarySource hooks the Ginkgo specs in this package into "go test",
+// routing Gomega assertion failures to Ginkgo's Fail handler.
+func TestSummarySource(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Summary Source Test Suite")
+}
+
+// fakeKubeletClient is a test double for the kubelet client. It returns a
+// canned summary after an optional delay and records the host it was asked for.
+type fakeKubeletClient struct {
+ // delay is how long GetSummary waits before answering.
+ delay time.Duration
+ // metrics is the summary returned by every GetSummary call that does not time out.
+ metrics *stats.Summary
+
+ // lastHost is the host passed to the most recent GetSummary call that did not time out.
+ lastHost string
+}
+
+// GetSummary waits for c.delay or until ctx is done, whichever comes first.
+// After the delay it records host in c.lastHost and returns the canned
+// metrics. If ctx finishes first, it returns a "timed out" error and leaves
+// lastHost unchanged.
+func (c *fakeKubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
+	timer := time.NewTimer(c.delay)
+	defer timer.Stop()
+
+	select {
+	case <-timer.C:
+		c.lastHost = host
+		return c.metrics, nil
+	case <-ctx.Done():
+		return nil, fmt.Errorf("timed out")
+	}
+}
+
+// cpuStats builds a kubelet CPU sample reporting usageNanocores, taken at ts.
+func cpuStats(usageNanocores uint64, ts time.Time) *stats.CPUStats {
+	return &stats.CPUStats{
+		// keyed field: go vet flags unkeyed literals of imported struct types
+		Time:           metav1.Time{Time: ts},
+		UsageNanoCores: &usageNanocores,
+	}
+}
+
+// memStats builds a kubelet memory sample reporting workingSetBytes, taken at ts.
+func memStats(workingSetBytes uint64, ts time.Time) *stats.MemoryStats {
+	return &stats.MemoryStats{
+		// keyed field: go vet flags unkeyed literals of imported struct types
+		Time:            metav1.Time{Time: ts},
+		WorkingSetBytes: &workingSetBytes,
+	}
+}
+
+func podStats(namespace, name string, containers ...stats.ContainerStats) stats.PodStats {
+ return stats.PodStats{
+ PodRef: stats.PodReference{
+ Name: name,
+ Namespace: namespace,
+ },
+ Containers: containers,
+ }
+}
+
+// containerStats builds a sample for the container called name, with CPU usage
+// (nanocores) and working-set memory (bytes). The CPU sample is stamped 2ms
+// after baseTime and the memory sample 4ms after it, so the CPU time is always
+// the earlier of the two.
+func containerStats(name string, cpu, mem uint64, baseTime time.Time) stats.ContainerStats {
+	cpuTime := baseTime.Add(2 * time.Millisecond)
+	memTime := baseTime.Add(4 * time.Millisecond)
+	return stats.ContainerStats{
+		Name:   name,
+		CPU:    cpuStats(cpu, cpuTime),
+		Memory: memStats(mem, memTime),
+	}
+}
+
+func verifyNode(nodeName string, summary *stats.Summary, batch *sources.MetricsBatch) {
+ var cpuUsage, memoryUsage resource.Quantity
+ var timestamp time.Time
+ if summary.Node.CPU != nil {
+ if summary.Node.CPU.UsageNanoCores != nil {
+ cpuUsage = *resource.NewScaledQuantity(int64(*summary.Node.CPU.UsageNanoCores), -9)
+ }
+ timestamp = summary.Node.CPU.Time.Time
+ }
+ if summary.Node.Memory != nil {
+ if summary.Node.Memory.WorkingSetBytes != nil {
+ memoryUsage = *resource.NewQuantity(int64(*summary.Node.Memory.WorkingSetBytes), resource.BinarySI)
+ }
+ if timestamp.IsZero() {
+ timestamp = summary.Node.Memory.Time.Time
+ }
+ }
+
+ Expect(batch.Nodes).To(ConsistOf(
+ sources.NodeMetricsPoint{
+ Name: nodeName,
+ MetricsPoint: sources.MetricsPoint{
+ Timestamp: timestamp,
+ CpuUsage: cpuUsage,
+ MemoryUsage: memoryUsage,
+ },
+ },
+ ))
+}
+
+// verifyPods asserts that batch.Pods contains, in any order, exactly the pods
+// of summary whose every container has both CPU usage and working-set memory.
+// Pods with any missing container value are expected to be absent.
+// Container timestamps come from the CPU sample, falling back to the memory
+// sample when the CPU time is unset.
+func verifyPods(summary *stats.Summary, batch *sources.MetricsBatch) {
+	var expectedPods []interface{}
+
+podLoop:
+	for _, pod := range summary.Pods {
+		containers := make([]sources.ContainerMetricsPoint, len(pod.Containers))
+		for i, container := range pod.Containers {
+			cpu, mem := container.CPU, container.Memory
+			if cpu == nil || cpu.UsageNanoCores == nil || mem == nil || mem.WorkingSetBytes == nil {
+				// partial data: the source is expected to drop this pod entirely
+				continue podLoop
+			}
+
+			timestamp := cpu.Time.Time
+			if timestamp.IsZero() {
+				timestamp = mem.Time.Time
+			}
+			containers[i] = sources.ContainerMetricsPoint{
+				Name: container.Name,
+				MetricsPoint: sources.MetricsPoint{
+					Timestamp:   timestamp,
+					CpuUsage:    *resource.NewScaledQuantity(int64(*cpu.UsageNanoCores), -9),
+					MemoryUsage: *resource.NewQuantity(int64(*mem.WorkingSetBytes), resource.BinarySI),
+				},
+			}
+		}
+
+		expectedPods = append(expectedPods, sources.PodMetricsPoint{
+			Name:       pod.PodRef.Name,
+			Namespace:  pod.PodRef.Namespace,
+			Containers: containers,
+		})
+	}
+
+	Expect(batch.Pods).To(ConsistOf(expectedPods...))
+}
+
+// Specs for the per-node summary metrics source. They cover request plumbing
+// (context and host), decoding of node and pod metrics, timestamp selection,
+// handling of missing values, and precision loss above math.MaxInt64.
+var _ = Describe("Summary Source", func() {
+	var (
+		src        sources.MetricSource
+		client     *fakeKubeletClient
+		scrapeTime = time.Now()
+		nodeInfo   = NodeInfo{
+			ConnectAddress: "10.0.1.2",
+			Name:           "node1",
+		}
+	)
+	BeforeEach(func() {
+		client = &fakeKubeletClient{
+			metrics: &stats.Summary{
+				Node: stats.NodeStats{
+					CPU:    cpuStats(100, scrapeTime.Add(100*time.Millisecond)),
+					Memory: memStats(200, scrapeTime.Add(200*time.Millisecond)),
+				},
+				Pods: []stats.PodStats{
+					podStats("ns1", "pod1",
+						containerStats("container1", 300, 400, scrapeTime.Add(10*time.Millisecond)),
+						containerStats("container2", 500, 600, scrapeTime.Add(20*time.Millisecond))),
+					podStats("ns1", "pod2",
+						containerStats("container1", 700, 800, scrapeTime.Add(30*time.Millisecond))),
+					podStats("ns2", "pod1",
+						containerStats("container1", 900, 1000, scrapeTime.Add(40*time.Millisecond))),
+					podStats("ns3", "pod1",
+						containerStats("container1", 1100, 1200, scrapeTime.Add(50*time.Millisecond))),
+				},
+			},
+		}
+		src = NewSummaryMetricsSource(nodeInfo, client)
+	})
+
+	It("should pass the provided context to the kubelet client to time out requests", func() {
+		By("setting up a context with a 1 second timeout")
+		ctx, workDone := context.WithTimeout(context.Background(), 1*time.Second)
+
+		By("collecting the batch with a 4 second delay")
+		start := time.Now()
+		client.delay = 4 * time.Second
+		_, err := src.Collect(ctx)
+		elapsed := time.Since(start)
+		workDone()
+
+		By("ensuring it timed out with an error after about 1 second")
+		// A 1ms tolerance is too tight for scheduler jitter on loaded machines.
+		// Anything well below the 4s delay already proves the deadline was honored.
+		Expect(elapsed).To(BeNumerically("~", 1*time.Second, 250*time.Millisecond))
+		Expect(err).To(HaveOccurred())
+	})
+
+	It("should fetch by connection address", func() {
+		By("collecting the batch")
+		_, err := src.Collect(context.Background())
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that it submitted the right host to the client")
+		Expect(client.lastHost).To(Equal(nodeInfo.ConnectAddress))
+	})
+
+	It("should return the working set and cpu usage for the node, and all pods on the node", func() {
+		By("collecting the batch")
+		batch, err := src.Collect(context.Background())
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the batch contains the right node data")
+		verifyNode(nodeInfo.Name, client.metrics, batch)
+
+		By("verifying that the batch contains the right pod data")
+		verifyPods(client.metrics, batch)
+	})
+
+	It("should use the scrape time from the CPU, falling back to memory if missing", func() {
+		By("removing some times from the data")
+		client.metrics.Pods[0].Containers[0].CPU.Time = metav1.Time{}
+		client.metrics.Node.CPU.Time = metav1.Time{}
+
+		By("collecting the batch")
+		batch, err := src.Collect(context.Background())
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the scrape time is as expected")
+		Expect(batch.Nodes[0].Timestamp).To(Equal(client.metrics.Node.Memory.Time.Time))
+		Expect(batch.Pods[0].Containers[0].Timestamp).To(Equal(client.metrics.Pods[0].Containers[0].Memory.Time.Time))
+		Expect(batch.Pods[1].Containers[0].Timestamp).To(Equal(client.metrics.Pods[1].Containers[0].CPU.Time.Time))
+	})
+
+	It("should continue on missing CPU or memory metrics", func() {
+		By("removing some data from the raw summary")
+		client.metrics.Node.Memory = nil
+		client.metrics.Pods[0].Containers[1].CPU = nil
+		client.metrics.Pods[1].Containers[0].CPU.UsageNanoCores = nil
+		client.metrics.Pods[2].Containers[0].Memory = nil
+		client.metrics.Pods[3].Containers[0].Memory.WorkingSetBytes = nil
+
+		By("collecting the batch")
+		batch, err := src.Collect(context.Background())
+		Expect(err).To(HaveOccurred())
+
+		By("verifying that the batch has all the data, save for what was missing")
+		verifyNode(nodeInfo.Name, client.metrics, batch)
+		verifyPods(client.metrics, batch)
+	})
+
+	It("should handle larger-than-int64 CPU or memory values gracefully", func() {
+		By("setting some data in the summary to be above math.MaxInt64")
+		plusTen := uint64(math.MaxInt64 + 10)
+		plusTwenty := uint64(math.MaxInt64 + 20)
+		minusTen := uint64(math.MaxUint64 - 10)
+		minusOneHundred := uint64(math.MaxUint64 - 100)
+
+		client.metrics.Node.Memory.WorkingSetBytes = &plusTen // RAM is cheap, right?
+		client.metrics.Node.CPU.UsageNanoCores = &plusTwenty  // a mainframe, probably
+		client.metrics.Pods[0].Containers[1].CPU.UsageNanoCores = &minusTen
+		client.metrics.Pods[1].Containers[0].Memory.WorkingSetBytes = &minusOneHundred
+
+		By("collecting the batch")
+		batch, err := src.Collect(context.Background())
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that the data is still present, at lower precision")
+		nodeMem := *resource.NewScaledQuantity(int64(plusTen/10), 1)
+		nodeMem.Format = resource.BinarySI
+		podMem := *resource.NewScaledQuantity(int64(minusOneHundred/10), 1)
+		podMem.Format = resource.BinarySI
+		Expect(batch.Nodes[0].MemoryUsage).To(Equal(nodeMem))
+		Expect(batch.Nodes[0].CpuUsage).To(Equal(*resource.NewScaledQuantity(int64(plusTwenty/10), -8)))
+		Expect(batch.Pods[0].Containers[1].CpuUsage).To(Equal(*resource.NewScaledQuantity(int64(minusTen/10), -8)))
+		Expect(batch.Pods[1].Containers[0].MemoryUsage).To(Equal(podMem))
+	})
+})
+
+// fakeNodeLister is an in-memory node lister for tests. It serves the nodes in
+// nodes, or fails List with listErr when that is set.
+type fakeNodeLister struct {
+ // nodes is returned, in order, by List and searched by Get.
+ nodes []*corev1.Node
+ // listErr, when non-nil, is returned by List instead of nodes.
+ listErr error
+}
+
+// List returns every node held by the fake, or listErr when one is configured.
+func (l *fakeNodeLister) List(_ labels.Selector) (ret []*corev1.Node, err error) {
+	if l.listErr == nil {
+		// NB: the selector is ignored for the moment
+		return l.nodes, nil
+	}
+	return nil, l.listErr
+}
+
+// ListWithPredicate behaves exactly like List with an all-matching selector.
+// NB: the predicate is ignored for the moment, so every node is returned
+// regardless of its conditions.
+func (l *fakeNodeLister) ListWithPredicate(_ corelisters.NodeConditionPredicate) ([]*corev1.Node, error) {
+	everything := labels.Everything()
+	return l.List(everything)
+}
+
+// Get returns the first node whose name equals name, or an error when the fake
+// holds no such node.
+func (l *fakeNodeLister) Get(name string) (*corev1.Node, error) {
+	for i := range l.nodes {
+		if l.nodes[i].Name == name {
+			return l.nodes[i], nil
+		}
+	}
+	return nil, fmt.Errorf("no such node %q", name)
+}
+
+// nodeNames returns the source name that NewSummaryMetricsSource reports for
+// each node, when that node is reached at the address with the same index in
+// addrs. addrs must be at least as long as nodes.
+func nodeNames(nodes []*corev1.Node, addrs []string) []string {
+	var names []string
+	for i := 0; i < len(nodes); i++ {
+		info := NodeInfo{ConnectAddress: addrs[i], Name: nodes[i].Name}
+		names = append(names, NewSummaryMetricsSource(info, nil).Name())
+	}
+	return names
+}
+
+// makeNode builds a node called name. It advertises hostName as its hostname
+// and addr as its internal IP, and carries a single Ready condition whose
+// status is True when ready is set and False otherwise.
+func makeNode(name, hostName, addr string, ready bool) *corev1.Node {
+	readyStatus := corev1.ConditionFalse
+	if ready {
+		readyStatus = corev1.ConditionTrue
+	}
+
+	return &corev1.Node{
+		// honor the requested name so that every fixture node is distinct
+		ObjectMeta: metav1.ObjectMeta{Name: name},
+		Status: corev1.NodeStatus{
+			Addresses: []corev1.NodeAddress{
+				{Type: corev1.NodeHostName, Address: hostName},
+				{Type: corev1.NodeInternalIP, Address: addr},
+			},
+			Conditions: []corev1.NodeCondition{
+				{Type: corev1.NodeReady, Status: readyStatus},
+			},
+		},
+	}
+}
+
+// Specs for the summary source provider. They cover one source per node,
+// tolerance of per-node address errors and lister failures, and node address
+// selection.
+var _ = Describe("Summary Source Provider", func() {
+	var (
+		nodeLister *fakeNodeLister
+		nodeAddrs  []string
+		provider   sources.MetricSourceProvider
+		fakeClient *fakeKubeletClient
+	)
+
+	// namesOf collects the Name() of every source, preserving order. Appending,
+	// rather than indexing a slice pre-sized to the expected length, turns an
+	// unexpected extra source into an assertion failure instead of a panic.
+	namesOf := func(srcs []sources.MetricSource) []string {
+		var names []string
+		for _, src := range srcs {
+			names = append(names, src.Name())
+		}
+		return names
+	}
+
+	BeforeEach(func() {
+		nodeLister = &fakeNodeLister{
+			nodes: []*corev1.Node{
+				makeNode("node1", "node1.somedomain", "10.0.1.2", true),
+				makeNode("node-no-host", "", "10.0.1.3", true),
+				makeNode("node3", "node3.somedomain", "10.0.1.4", false),
+				makeNode("node4", "node4.somedomain", "10.0.1.5", true),
+			},
+		}
+		nodeAddrs = []string{
+			"10.0.1.2",
+			"10.0.1.3",
+			"10.0.1.4",
+			"10.0.1.5",
+		}
+		fakeClient = &fakeKubeletClient{}
+		addrResolver := NewPriorityNodeAddressResolver(DefaultAddressTypePriority)
+		provider = NewSummaryProvider(nodeLister, fakeClient, addrResolver)
+	})
+
+	It("should return a metrics source for all nodes", func() {
+		By("listing the sources")
+		srcs, err := provider.GetMetricSources()
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying that a source is present for each node")
+		expectedNames := nodeNames(nodeLister.nodes, nodeAddrs)
+		Expect(namesOf(srcs)).To(Equal(expectedNames))
+	})
+
+	It("should continue on error fetching node information for a particular node", func() {
+		By("deleting the IP of a node")
+		nodeLister.nodes[0].Status.Addresses = nil
+
+		By("listing the sources")
+		srcs, err := provider.GetMetricSources()
+		Expect(err).To(HaveOccurred())
+
+		By("verifying that a source is present for each remaining node")
+		expectedNames := nodeNames(nodeLister.nodes, nodeAddrs)
+		// skip the bad node (the first one)
+		Expect(namesOf(srcs)).To(Equal(expectedNames[1:]))
+	})
+
+	It("should gracefully handle list errors", func() {
+		By("setting a fake error from the lister")
+		nodeLister.listErr = fmt.Errorf("something went wrong, expectedly")
+
+		By("listing the sources")
+		_, err := provider.GetMetricSources()
+		Expect(err).To(HaveOccurred())
+	})
+
+	Describe("when choosing node addresses", func() {
+		JustBeforeEach(func() {
+			// set up the metrics so we can call collect safely
+			fakeClient.metrics = &stats.Summary{
+				Node: stats.NodeStats{
+					CPU:    cpuStats(100, time.Now()),
+					Memory: memStats(200, time.Now()),
+				},
+			}
+		})
+
+		It("should prefer addresses according to the order of the types first", func() {
+			By("setting the first node to have multiple addresses and setting all nodes to ready")
+			nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
+				{Type: DefaultAddressTypePriority[3], Address: "skip-val1"},
+				{Type: DefaultAddressTypePriority[2], Address: "skip-val2"},
+				{Type: DefaultAddressTypePriority[1], Address: "correct-val"},
+			}
+			for _, node := range nodeLister.nodes {
+				node.Status.Conditions = []corev1.NodeCondition{
+					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
+				}
+			}
+
+			By("listing all sources")
+			srcs, err := provider.GetMetricSources()
+			Expect(err).NotTo(HaveOccurred())
+
+			By("making sure that the first source scrapes from the correct location")
+			_, err = srcs[0].Collect(context.Background())
+			Expect(err).NotTo(HaveOccurred())
+			Expect(fakeClient.lastHost).To(Equal("correct-val"))
+		})
+
+		It("should prefer the first address that matches within a given type", func() {
+			By("setting the first node to have multiple addresses and setting all nodes to ready")
+			nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
+				{Type: DefaultAddressTypePriority[1], Address: "skip-val1"},
+				{Type: DefaultAddressTypePriority[0], Address: "correct-val"},
+				{Type: DefaultAddressTypePriority[1], Address: "skip-val2"},
+				{Type: DefaultAddressTypePriority[0], Address: "second-val"},
+			}
+			for _, node := range nodeLister.nodes {
+				node.Status.Conditions = []corev1.NodeCondition{
+					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
+				}
+			}
+
+			By("listing all sources")
+			srcs, err := provider.GetMetricSources()
+			Expect(err).NotTo(HaveOccurred())
+
+			By("making sure that the first source scrapes from the correct location")
+			_, err = srcs[0].Collect(context.Background())
+			Expect(err).NotTo(HaveOccurred())
+			Expect(fakeClient.lastHost).To(Equal("correct-val"))
+		})
+
+		It("should return an error if no preferred addresses are found", func() {
+			By("wiping out the addresses of one of the nodes and setting all nodes to ready")
+			nodeLister.nodes[0].Status.Addresses = nil
+			for _, node := range nodeLister.nodes {
+				node.Status.Conditions = []corev1.NodeCondition{
+					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
+				}
+			}
+
+			By("asking for source providers for all nodes")
+			_, err := provider.GetMetricSources()
+			Expect(err).To(HaveOccurred())
+		})
+	})
+})