// Copyright 2018 The Kubernetes Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
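// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and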
// limitations under the License.
package summary_test
import (
. ""
. ""
corev1 ""
metav1 ""
corelisters ""
stats ""
. ""
func TestSummarySource(t *testing.T) {
RunSpecs(t, "Summary Source Test Suite")
type fakeKubeletClient struct {
delay time.Duration
metrics *stats.Summary
lastHost string
func (c *fakeKubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
select {
case <-ctx.Done():
return nil, fmt.Errorf("timed out")
case <-time.After(c.delay):
c.lastHost = host
return c.metrics, nil
func cpuStats(usageNanocores uint64, ts time.Time) *stats.CPUStats {
return &stats.CPUStats{
Time: metav1.Time{ts},
UsageNanoCores: &usageNanocores,
func memStats(workingSetBytes uint64, ts time.Time) *stats.MemoryStats {
return &stats.MemoryStats{
Time: metav1.Time{ts},
WorkingSetBytes: &workingSetBytes,
func podStats(namespace, name string, containers ...stats.ContainerStats) stats.PodStats {
return stats.PodStats{
PodRef: stats.PodReference{
Name: name,
Namespace: namespace,
Containers: containers,
func containerStats(name string, cpu, mem uint64, baseTime time.Time) stats.ContainerStats {
return stats.ContainerStats{
Name: name,
CPU: cpuStats(cpu, baseTime.Add(2*time.Millisecond)),
Memory: memStats(mem, baseTime.Add(4*time.Millisecond)),
// verifyNode checks that batch carries the node-level metrics point that the
// summary source should derive from summary for the node called nodeName.
//
// Expected values:
//   - CPU usage: UsageNanoCores as a Quantity with scale -9.
//   - Memory usage: WorkingSetBytes as a BinarySI Quantity.
//   - Either usage stays the zero Quantity when its stats struct (or the field
//     inside it) is nil.
//   - Timestamp: the CPU stats' Time, falling back to the memory stats' Time
//     when that is still zero.
//
// NOTE(review): the assertion wrapping the Name/MetricsPoint literal at the
// end, and the exact brace nesting of the nil checks, are not visible in this
// view. Presumably this is an Expect on batch.Nodes; confirm against the full
// file.
// NOTE(review): int64(uint64) wraps for values above math.MaxInt64, so this
// helper is only suitable for summaries with in-range values.
func verifyNode(nodeName string, summary *stats.Summary, batch *sources.MetricsBatch) {
var cpuUsage, memoryUsage resource.Quantity
var timestamp time.Time
if summary.Node.CPU != nil {
if summary.Node.CPU.UsageNanoCores != nil {
// Scale -9: the raw value is interpreted as nano-units (nanocores).
cpuUsage = *resource.NewScaledQuantity(int64(*summary.Node.CPU.UsageNanoCores), -9)
timestamp = summary.Node.CPU.Time.Time
if summary.Node.Memory != nil {
if summary.Node.Memory.WorkingSetBytes != nil {
memoryUsage = *resource.NewQuantity(int64(*summary.Node.Memory.WorkingSetBytes), resource.BinarySI)
// Memory's time is only used when no CPU timestamp was picked up above.
if timestamp.IsZero() {
timestamp = summary.Node.Memory.Time.Time
Name: nodeName,
MetricsPoint: sources.MetricsPoint{
Timestamp: timestamp,
CpuUsage: cpuUsage,
MemoryUsage: memoryUsage,
// verifyPods builds the pod metrics points the summary source should emit for
// summary, then checks them against batch.
//
// For each container:
//   - CPU usage is UsageNanoCores at scale -9.
//   - Memory usage is WorkingSetBytes as a BinarySI Quantity.
//   - Timestamp is the CPU time, falling back to the memory time.
//   - missingData is set when the CPU stats / UsageNanoCores or the memory
//     stats / WorkingSetBytes are nil.
//
// NOTE(review): several lines are not visible in this view.
//   - The dereferences below would panic when the preceding nil check is true,
//     so each check presumably has a hidden `else` branch.
//   - The body of `if missingData {` is hidden. Presumably pods with missing
//     data are skipped, matching the "save for what was missing" spec. As
//     displayed, the append would only run for such pods.
//   - The final assertion against batch.Pods is hidden.
// Confirm all of this against the full file.
func verifyPods(summary *stats.Summary, batch *sources.MetricsBatch) {
var expectedPods []interface{}
for _, pod := range summary.Pods {
containers := make([]sources.ContainerMetricsPoint, len(pod.Containers))
missingData := false
for i, container := range pod.Containers {
var cpuUsage, memoryUsage resource.Quantity
var timestamp time.Time
if container.CPU == nil || container.CPU.UsageNanoCores == nil {
missingData = true
cpuUsage = *resource.NewScaledQuantity(int64(*container.CPU.UsageNanoCores), -9)
timestamp = container.CPU.Time.Time
if container.Memory == nil || container.Memory.WorkingSetBytes == nil {
missingData = true
memoryUsage = *resource.NewQuantity(int64(*container.Memory.WorkingSetBytes), resource.BinarySI)
// Fall back to the memory timestamp only when CPU did not provide one.
if timestamp.IsZero() {
timestamp = container.Memory.Time.Time
containers[i] = sources.ContainerMetricsPoint{
Name: container.Name,
MetricsPoint: sources.MetricsPoint{
Timestamp: timestamp,
CpuUsage: cpuUsage,
MemoryUsage: memoryUsage,
if missingData {
expectedPods = append(expectedPods, sources.PodMetricsPoint{
Name: pod.PodRef.Name,
Namespace: pod.PodRef.Namespace,
Containers: containers,
// Specs for a single node's summary metrics source: context propagation, the
// host it scrapes, conversion of kubelet summary stats into a metrics batch,
// timestamp fallback, tolerance of missing fields, and values above
// math.MaxInt64.
var _ = Describe("Summary Source", func() {
var (
src sources.MetricSource
client *fakeKubeletClient
// Initialized once, when this Describe body runs, and shared by all specs.
scrapeTime time.Time = time.Now()
// NOTE(review): the ConnectAddress literal appears blank in this view.
nodeInfo NodeInfo = NodeInfo{
ConnectAddress: "",
Name: "node1",
// Rebuilt before every spec so that the mutations individual specs make to
// client.metrics and client.delay do not leak into other specs.
BeforeEach(func() {
client = &fakeKubeletClient{
metrics: &stats.Summary{
Node: stats.NodeStats{
CPU: cpuStats(100, scrapeTime.Add(100*time.Millisecond)),
Memory: memStats(200, scrapeTime.Add(200*time.Millisecond)),
Pods: []stats.PodStats{
podStats("ns1", "pod1",
containerStats("container1", 300, 400, scrapeTime.Add(10*time.Millisecond)),
containerStats("container2", 500, 600, scrapeTime.Add(20*time.Millisecond))),
podStats("ns1", "pod2",
containerStats("container1", 700, 800, scrapeTime.Add(30*time.Millisecond))),
podStats("ns2", "pod1",
containerStats("container1", 900, 1000, scrapeTime.Add(40*time.Millisecond))),
podStats("ns3", "pod1",
containerStats("container1", 1100, 1200, scrapeTime.Add(50*time.Millisecond))),
src = NewSummaryMetricsSource(nodeInfo, client)
It("should pass the provided context to the kubelet client to time out requests", func() {
By("setting up a context with a 1 second timeout")
// NOTE(review): the use of workDone (the cancel func) is not visible here.
// go vet's lostcancel check expects it to be called, typically via defer.
ctx, workDone := context.WithTimeout(context.Background(), 1*time.Second)
By("collecting the batch with a 4 second delay")
start := time.Now()
client.delay = 4 * time.Second
_, err := src.Collect(ctx)
By("ensuring it timed out with an error after 1 second")
// NOTE(review): a 1ms tolerance on wall-clock time is tight and may flake on
// a loaded machine.
Expect(time.Now().Sub(start)).To(BeNumerically("~", 1*time.Second, 1*time.Millisecond))
It("should fetch by connection address", func() {
By("collecting the batch")
_, err := src.Collect(context.Background())
By("verifying that it submitted the right host to the client")
It("should return the working set and cpu usage for the node, and all pods on the node", func() {
By("collecting the batch")
batch, err := src.Collect(context.Background())
By("verifying that the batch contains the right node data")
verifyNode(nodeInfo.Name, client.metrics, batch)
By("verifying that the batch contains the right pod data")
verifyPods(client.metrics, batch)
It("should use the scrape time from the CPU, falling back to memory if missing", func() {
By("removing some times from the data")
client.metrics.Pods[0].Containers[0].CPU.Time = metav1.Time{}
client.metrics.Node.CPU.Time = metav1.Time{}
By("collecting the batch")
batch, err := src.Collect(context.Background())
By("verifying that the scrape time is as expected")
It("should continue on missing CPU or memory metrics", func() {
By("removing some data from the raw summary")
// Covers both shapes of "missing": a nil stats struct and a nil field
// inside a present struct.
client.metrics.Node.Memory = nil
client.metrics.Pods[0].Containers[1].CPU = nil
client.metrics.Pods[1].Containers[0].CPU.UsageNanoCores = nil
client.metrics.Pods[2].Containers[0].Memory = nil
client.metrics.Pods[3].Containers[0].Memory.WorkingSetBytes = nil
By("collecting the batch")
batch, err := src.Collect(context.Background())
By("verifying that the batch has all the data, save for what was missing")
verifyNode(nodeInfo.Name, client.metrics, batch)
verifyPods(client.metrics, batch)
It("should handle larger-than-int64 CPU or memory values gracefully", func() {
By("setting some data in the summary to be above math.MaxInt64")
plusTen := uint64(math.MaxInt64 + 10)
plusTwenty := uint64(math.MaxInt64 + 20)
minusTen := uint64(math.MaxUint64 - 10)
minusOneHundred := uint64(math.MaxUint64 - 100)
client.metrics.Node.Memory.WorkingSetBytes = &plusTen // RAM is cheap, right?
client.metrics.Node.CPU.UsageNanoCores = &plusTwenty // a mainframe, probably
client.metrics.Pods[0].Containers[1].CPU.UsageNanoCores = &minusTen
client.metrics.Pods[1].Containers[0].Memory.WorkingSetBytes = &minusOneHundred
By("collecting the batch")
batch, err := src.Collect(context.Background())
By("verifying that the data is still present, at lower precision")
// Expected values keep one fewer digit of precision. NewScaledQuantity(v/10, 1)
// is v rounded down to a multiple of 10, and (v/10, -8) is the nano-scaled
// (-9) CPU value with its last digit dropped.
nodeMem := *resource.NewScaledQuantity(int64(plusTen/10), 1)
nodeMem.Format = resource.BinarySI
podMem := *resource.NewScaledQuantity(int64(minusOneHundred/10), 1)
podMem.Format = resource.BinarySI
Expect(batch.Nodes[0].CpuUsage).To(Equal(*resource.NewScaledQuantity(int64(plusTwenty/10), -8)))
Expect(batch.Pods[0].Containers[1].CpuUsage).To(Equal(*resource.NewScaledQuantity(int64(minusTen/10), -8)))
type fakeNodeLister struct {
nodes []*corev1.Node
listErr error
func (l *fakeNodeLister) List(_ labels.Selector) (ret []*corev1.Node, err error) {
if l.listErr != nil {
return nil, l.listErr
// NB: this is ignores selector for the moment
return l.nodes, nil
func (l *fakeNodeLister) ListWithPredicate(_ corelisters.NodeConditionPredicate) ([]*corev1.Node, error) {
// NB: this is ignores predicate for the moment
return l.List(labels.Everything())
func (l *fakeNodeLister) Get(name string) (*corev1.Node, error) {
for _, node := range l.nodes {
if node.Name == name {
return node, nil
return nil, fmt.Errorf("no such node %q", name)
func nodeNames(nodes []*corev1.Node, addrs []string) []string {
var res []string
for i, node := range nodes {
res = append(res, NewSummaryMetricsSource(NodeInfo{ConnectAddress: addrs[i], Name: node.Name}, nil).Name())
return res
func makeNode(name, hostName, addr string, ready bool) *corev1.Node {
res := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{Type: corev1.NodeHostName, Address: hostName},
{Type: corev1.NodeInternalIP, Address: addr},
Conditions: []corev1.NodeCondition{
{Type: corev1.NodeReady},
if ready {
res.Status.Conditions[0].Status = corev1.ConditionTrue
} else {
res.Status.Conditions[0].Status = corev1.ConditionFalse
return res
// Specs for the summary source provider. The provider turns the nodes held by
// a (fake) node lister into one metrics source per node and picks which node
// address each source scrapes.
var _ = Describe("Summary Source Provider", func() {
var (
nodeLister *fakeNodeLister
nodeAddrs []string
provider sources.MetricSourceProvider
fakeClient *fakeKubeletClient
// Rebuilt before every spec so that mutations made by one spec do not leak
// into the next (cleared addresses, injected list errors, changed readiness).
BeforeEach(func() {
nodeLister = &fakeNodeLister{
nodes: []*corev1.Node{
// node-no-host has an empty hostname and node3 is not ready.
// NOTE(review): the address literals appear blank in this view; confirm.
makeNode("node1", "node1.somedomain", "", true),
makeNode("node-no-host", "", "", true),
makeNode("node3", "node3.somedomain", "", false),
makeNode("node4", "node4.somedomain", "", true),
// nodeNames() indexes nodeAddrs in parallel with nodeLister.nodes.
// NOTE(review): its contents are not visible in this view.
nodeAddrs = []string{
fakeClient = &fakeKubeletClient{}
addrResolver := NewPriorityNodeAddressResolver(DefaultAddressTypePriority)
provider = NewSummaryProvider(nodeLister, fakeClient, addrResolver)
It("should return a metrics source for all nodes", func() {
By("listing the sources")
// NOTE(review): `sources` shadows the imported sources package for the rest
// of this closure.
sources, err := provider.GetMetricSources()
By("verifying that a source is present for each node")
// Legal shadowing: the nodeNames variable only comes into scope after this
// statement, so the right-hand side still calls the nodeNames helper.
nodeNames := nodeNames(nodeLister.nodes, nodeAddrs)
sourceNames := make([]string, len(nodeNames))
// NOTE(review): this panics if the provider returns more sources than nodes.
for i, src := range sources {
sourceNames[i] = src.Name()
It("should continue on error fetching node information for a particular node", func() {
By("deleting the IP of a node")
nodeLister.nodes[0].Status.Addresses = nil
By("listing the sources")
sources, err := provider.GetMetricSources()
By("verifying that a source is present for each node")
nodeNames := nodeNames(nodeLister.nodes, nodeAddrs)
// One fewer source is expected because the first node has no addresses.
sourceNames := make([]string, len(nodeNames[1:]))
for i, src := range sources {
sourceNames[i] = src.Name()
// skip the bad node (the first one)
It("should gracefully handle list errors", func() {
By("setting a fake error from the lister")
nodeLister.listErr = fmt.Errorf("something went wrong, expectedly")
By("listing the sources")
_, err := provider.GetMetricSources()
Describe("when choosing node addresses", func() {
JustBeforeEach(func() {
// set up the metrics so we can call collect safely
fakeClient.metrics = &stats.Summary{
Node: stats.NodeStats{
CPU: cpuStats(100, time.Now()),
Memory: memStats(200, time.Now()),
It("should prefer addresses according to the order of the types first", func() {
By("setting the first node to have multiple addresses and setting all nodes to ready")
// "correct-val" is listed last but has the highest-priority type present
// (index 1 in DefaultAddressTypePriority).
nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
{Type: DefaultAddressTypePriority[3], Address: "skip-val1"},
{Type: DefaultAddressTypePriority[2], Address: "skip-val2"},
{Type: DefaultAddressTypePriority[1], Address: "correct-val"},
for _, node := range nodeLister.nodes {
node.Status.Conditions = []corev1.NodeCondition{
{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
By("listing all sources")
srcs, err := provider.GetMetricSources()
By("making sure that the first source scrapes from the correct location")
_, err = srcs[0].Collect(context.Background())
It("should prefer the first address that matches within a given type", func() {
By("setting the first node to have multiple addresses and setting all nodes to ready")
// Two addresses share the top-priority type. "correct-val" is the earlier
// of the two, so it should win over "second-val".
nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
{Type: DefaultAddressTypePriority[1], Address: "skip-val1"},
{Type: DefaultAddressTypePriority[0], Address: "correct-val"},
{Type: DefaultAddressTypePriority[1], Address: "skip-val2"},
{Type: DefaultAddressTypePriority[0], Address: "second-val"},
for _, node := range nodeLister.nodes {
node.Status.Conditions = []corev1.NodeCondition{
{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
By("listing all sources")
srcs, err := provider.GetMetricSources()
By("making sure that the first source scrapes from the correct location")
_, err = srcs[0].Collect(context.Background())
It("should return an error if no preferred addresses are found", func() {
By("wiping out the addresses of one of the nodes and setting all nodes to ready")
// NOTE(review): this uses the same setup as the "continue on error" spec
// above. Presumably GetMetricSources returns both the remaining sources and
// an error; confirm the hidden expectations agree.
nodeLister.nodes[0].Status.Addresses = nil
for _, node := range nodeLister.nodes {
node.Status.Conditions = []corev1.NodeCondition{
{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
By("asking for source providers for all nodes")
_, err := provider.GetMetricSources()