// Copyright 2018 The Kubernetes Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package summary_test
16
17import (
18 "context"
19 "fmt"
20 "math"
21 "testing"
22 "time"
23
24 . "github.com/onsi/ginkgo"
25 . "github.com/onsi/gomega"
26 corev1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/resource"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 corelisters "k8s.io/client-go/listers/core/v1"
31 stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
32
33 "github.com/kubernetes-incubator/metrics-server/pkg/sources"
34 . "github.com/kubernetes-incubator/metrics-server/pkg/sources/summary"
35)
36
// TestSummarySource hooks the Ginkgo specs in this package into the
// standard `go test` runner, so all Describe blocks below run as one suite.
func TestSummarySource(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Summary Source Test Suite")
}
41
// fakeKubeletClient is a canned-response kubelet client used by the specs
// below: it serves a fixed summary, optionally after an artificial delay,
// and records the host it was last asked to scrape.
type fakeKubeletClient struct {
	// delay is how long GetSummary blocks before answering, letting tests
	// exercise context-based timeouts.
	delay   time.Duration
	// metrics is the fixed summary returned by every GetSummary call.
	metrics *stats.Summary

	// lastHost records the host argument of the most recent GetSummary
	// call, so tests can assert which address was scraped.
	lastHost string
}
48
49func (c *fakeKubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
50 select {
51 case <-ctx.Done():
52 return nil, fmt.Errorf("timed out")
53 case <-time.After(c.delay):
54 }
55
56 c.lastHost = host
57
58 return c.metrics, nil
59}
60
61func cpuStats(usageNanocores uint64, ts time.Time) *stats.CPUStats {
62 return &stats.CPUStats{
63 Time: metav1.Time{ts},
64 UsageNanoCores: &usageNanocores,
65 }
66}
67
68func memStats(workingSetBytes uint64, ts time.Time) *stats.MemoryStats {
69 return &stats.MemoryStats{
70 Time: metav1.Time{ts},
71 WorkingSetBytes: &workingSetBytes,
72 }
73}
74
75func podStats(namespace, name string, containers ...stats.ContainerStats) stats.PodStats {
76 return stats.PodStats{
77 PodRef: stats.PodReference{
78 Name: name,
79 Namespace: namespace,
80 },
81 Containers: containers,
82 }
83}
84
85func containerStats(name string, cpu, mem uint64, baseTime time.Time) stats.ContainerStats {
86 return stats.ContainerStats{
87 Name: name,
88 CPU: cpuStats(cpu, baseTime.Add(2*time.Millisecond)),
89 Memory: memStats(mem, baseTime.Add(4*time.Millisecond)),
90 }
91}
92
93func verifyNode(nodeName string, summary *stats.Summary, batch *sources.MetricsBatch) {
94 var cpuUsage, memoryUsage resource.Quantity
95 var timestamp time.Time
96 if summary.Node.CPU != nil {
97 if summary.Node.CPU.UsageNanoCores != nil {
98 cpuUsage = *resource.NewScaledQuantity(int64(*summary.Node.CPU.UsageNanoCores), -9)
99 }
100 timestamp = summary.Node.CPU.Time.Time
101 }
102 if summary.Node.Memory != nil {
103 if summary.Node.Memory.WorkingSetBytes != nil {
104 memoryUsage = *resource.NewQuantity(int64(*summary.Node.Memory.WorkingSetBytes), resource.BinarySI)
105 }
106 if timestamp.IsZero() {
107 timestamp = summary.Node.Memory.Time.Time
108 }
109 }
110
111 Expect(batch.Nodes).To(ConsistOf(
112 sources.NodeMetricsPoint{
113 Name: nodeName,
114 MetricsPoint: sources.MetricsPoint{
115 Timestamp: timestamp,
116 CpuUsage: cpuUsage,
117 MemoryUsage: memoryUsage,
118 },
119 },
120 ))
121}
122
123func verifyPods(summary *stats.Summary, batch *sources.MetricsBatch) {
124 var expectedPods []interface{}
125 for _, pod := range summary.Pods {
126 containers := make([]sources.ContainerMetricsPoint, len(pod.Containers))
127 missingData := false
128 for i, container := range pod.Containers {
129 var cpuUsage, memoryUsage resource.Quantity
130 var timestamp time.Time
131 if container.CPU == nil || container.CPU.UsageNanoCores == nil {
132 missingData = true
133 break
134 }
135 cpuUsage = *resource.NewScaledQuantity(int64(*container.CPU.UsageNanoCores), -9)
136 timestamp = container.CPU.Time.Time
137 if container.Memory == nil || container.Memory.WorkingSetBytes == nil {
138 missingData = true
139 break
140 }
141 memoryUsage = *resource.NewQuantity(int64(*container.Memory.WorkingSetBytes), resource.BinarySI)
142 if timestamp.IsZero() {
143 timestamp = container.Memory.Time.Time
144 }
145
146 containers[i] = sources.ContainerMetricsPoint{
147 Name: container.Name,
148 MetricsPoint: sources.MetricsPoint{
149 Timestamp: timestamp,
150 CpuUsage: cpuUsage,
151 MemoryUsage: memoryUsage,
152 },
153 }
154 }
155 if missingData {
156 continue
157 }
158 expectedPods = append(expectedPods, sources.PodMetricsPoint{
159 Name: pod.PodRef.Name,
160 Namespace: pod.PodRef.Namespace,
161 Containers: containers,
162 })
163 }
164 Expect(batch.Pods).To(ConsistOf(expectedPods...))
165}
166
// Specs for a single summary metrics source backed by the fake kubelet
// client above.
var _ = Describe("Summary Source", func() {
	var (
		src        sources.MetricSource
		client     *fakeKubeletClient
		scrapeTime time.Time = time.Now()
		nodeInfo   NodeInfo  = NodeInfo{
			ConnectAddress: "10.0.1.2",
			Name:           "node1",
		}
	)
	BeforeEach(func() {
		// Canned summary: one node plus four pods across three namespaces,
		// each with distinct usage values and staggered sample times so
		// assertions can tell the data points apart.
		client = &fakeKubeletClient{
			metrics: &stats.Summary{
				Node: stats.NodeStats{
					CPU:    cpuStats(100, scrapeTime.Add(100*time.Millisecond)),
					Memory: memStats(200, scrapeTime.Add(200*time.Millisecond)),
				},
				Pods: []stats.PodStats{
					podStats("ns1", "pod1",
						containerStats("container1", 300, 400, scrapeTime.Add(10*time.Millisecond)),
						containerStats("container2", 500, 600, scrapeTime.Add(20*time.Millisecond))),
					podStats("ns1", "pod2",
						containerStats("container1", 700, 800, scrapeTime.Add(30*time.Millisecond))),
					podStats("ns2", "pod1",
						containerStats("container1", 900, 1000, scrapeTime.Add(40*time.Millisecond))),
					podStats("ns3", "pod1",
						containerStats("container1", 1100, 1200, scrapeTime.Add(50*time.Millisecond))),
				},
			},
		}
		src = NewSummaryMetricsSource(nodeInfo, client)
	})

	It("should pass the provided context to the kubelet client to time out requests", func() {
		By("setting up a context with a 1 second timeout")
		ctx, workDone := context.WithTimeout(context.Background(), 1*time.Second)

		By("collecting the batch with a 4 second delay")
		start := time.Now()
		// The client delays longer than the context allows, so Collect
		// must abort when the context expires rather than wait it out.
		client.delay = 4 * time.Second
		_, err := src.Collect(ctx)
		workDone()

		By("ensuring it timed out with an error after 1 second")
		Expect(time.Now().Sub(start)).To(BeNumerically("~", 1*time.Second, 1*time.Millisecond))
		Expect(err).To(HaveOccurred())
	})

	It("should fetch by connection address", func() {
		By("collecting the batch")
		_, err := src.Collect(context.Background())
		Expect(err).NotTo(HaveOccurred())

		By("verifying that it submitted the right host to the client")
		Expect(client.lastHost).To(Equal(nodeInfo.ConnectAddress))
	})

	It("should return the working set and cpu usage for the node, and all pods on the node", func() {
		By("collecting the batch")
		batch, err := src.Collect(context.Background())
		Expect(err).NotTo(HaveOccurred())

		By("verifying that the batch contains the right node data")
		verifyNode(nodeInfo.Name, client.metrics, batch)

		By("verifying that the batch contains the right pod data")
		verifyPods(client.metrics, batch)
	})

	It("should use the scrape time from the CPU, falling back to memory if missing", func() {
		By("removing some times from the data")
		// Zero the CPU times so the source must fall back to the memory
		// sample time for these points.
		client.metrics.Pods[0].Containers[0].CPU.Time = metav1.Time{}
		client.metrics.Node.CPU.Time = metav1.Time{}

		By("collecting the batch")
		batch, err := src.Collect(context.Background())
		Expect(err).NotTo(HaveOccurred())

		By("verifying that the scrape time is as expected")
		Expect(batch.Nodes[0].Timestamp).To(Equal(client.metrics.Node.Memory.Time.Time))
		Expect(batch.Pods[0].Containers[0].Timestamp).To(Equal(client.metrics.Pods[0].Containers[0].Memory.Time.Time))
		Expect(batch.Pods[1].Containers[0].Timestamp).To(Equal(client.metrics.Pods[1].Containers[0].CPU.Time.Time))
	})

	It("should continue on missing CPU or memory metrics", func() {
		By("removing some data from the raw summary")
		// Knock out stats at several levels (whole struct vs. individual
		// field) to cover both nil-struct and nil-field handling.
		client.metrics.Node.Memory = nil
		client.metrics.Pods[0].Containers[1].CPU = nil
		client.metrics.Pods[1].Containers[0].CPU.UsageNanoCores = nil
		client.metrics.Pods[2].Containers[0].Memory = nil
		client.metrics.Pods[3].Containers[0].Memory.WorkingSetBytes = nil

		By("collecting the batch")
		batch, err := src.Collect(context.Background())
		// NOTE(review): an error is expected here even though partial
		// results are still returned.
		Expect(err).To(HaveOccurred())

		By("verifying that the batch has all the data, save for what was missing")
		verifyNode(nodeInfo.Name, client.metrics, batch)
		verifyPods(client.metrics, batch)
	})

	It("should handle larger-than-int64 CPU or memory values gracefully", func() {
		By("setting some data in the summary to be above math.MaxInt64")
		plusTen := uint64(math.MaxInt64 + 10)
		plusTwenty := uint64(math.MaxInt64 + 20)
		minusTen := uint64(math.MaxUint64 - 10)
		minusOneHundred := uint64(math.MaxUint64 - 100)

		client.metrics.Node.Memory.WorkingSetBytes = &plusTen // RAM is cheap, right?
		client.metrics.Node.CPU.UsageNanoCores = &plusTwenty  // a mainframe, probably
		client.metrics.Pods[0].Containers[1].CPU.UsageNanoCores = &minusTen
		client.metrics.Pods[1].Containers[0].Memory.WorkingSetBytes = &minusOneHundred

		By("collecting the batch")
		batch, err := src.Collect(context.Background())
		Expect(err).NotTo(HaveOccurred())

		By("verifying that the data is still present, at lower precision")
		// Values beyond MaxInt64 are expected to be divided by 10 and
		// stored at one-higher scale, trading a digit of precision for
		// fitting into the signed Quantity representation.
		nodeMem := *resource.NewScaledQuantity(int64(plusTen/10), 1)
		nodeMem.Format = resource.BinarySI
		podMem := *resource.NewScaledQuantity(int64(minusOneHundred/10), 1)
		podMem.Format = resource.BinarySI
		Expect(batch.Nodes[0].MemoryUsage).To(Equal(nodeMem))
		Expect(batch.Nodes[0].CpuUsage).To(Equal(*resource.NewScaledQuantity(int64(plusTwenty/10), -8)))
		Expect(batch.Pods[0].Containers[1].CpuUsage).To(Equal(*resource.NewScaledQuantity(int64(minusTen/10), -8)))
		Expect(batch.Pods[1].Containers[0].MemoryUsage).To(Equal(podMem))
	})
})
295
// fakeNodeLister is a stub node lister that serves a fixed node slice —
// or a fixed error — to the summary source provider under test.
type fakeNodeLister struct {
	// nodes is the canned result returned by List/ListWithPredicate/Get.
	nodes   []*corev1.Node
	// listErr, when non-nil, is returned by List instead of nodes.
	listErr error
}
300
301func (l *fakeNodeLister) List(_ labels.Selector) (ret []*corev1.Node, err error) {
302 if l.listErr != nil {
303 return nil, l.listErr
304 }
305 // NB: this is ignores selector for the moment
306 return l.nodes, nil
307}
308
309func (l *fakeNodeLister) ListWithPredicate(_ corelisters.NodeConditionPredicate) ([]*corev1.Node, error) {
310 // NB: this is ignores predicate for the moment
311 return l.List(labels.Everything())
312}
313
314func (l *fakeNodeLister) Get(name string) (*corev1.Node, error) {
315 for _, node := range l.nodes {
316 if node.Name == name {
317 return node, nil
318 }
319 }
320 return nil, fmt.Errorf("no such node %q", name)
321}
322
323func nodeNames(nodes []*corev1.Node, addrs []string) []string {
324 var res []string
325 for i, node := range nodes {
326 res = append(res, NewSummaryMetricsSource(NodeInfo{ConnectAddress: addrs[i], Name: node.Name}, nil).Name())
327 }
328 return res
329}
330
331func makeNode(name, hostName, addr string, ready bool) *corev1.Node {
332 res := &corev1.Node{
333 ObjectMeta: metav1.ObjectMeta{Name: "node1"},
334 Status: corev1.NodeStatus{
335 Addresses: []corev1.NodeAddress{
336 {Type: corev1.NodeHostName, Address: hostName},
337 {Type: corev1.NodeInternalIP, Address: addr},
338 },
339 Conditions: []corev1.NodeCondition{
340 {Type: corev1.NodeReady},
341 },
342 },
343 }
344 if ready {
345 res.Status.Conditions[0].Status = corev1.ConditionTrue
346 } else {
347 res.Status.Conditions[0].Status = corev1.ConditionFalse
348 }
349 return res
350}
351
// Specs for the provider that turns a node lister plus kubelet client
// into one metrics source per node.
var _ = Describe("Summary Source Provider", func() {
	var (
		nodeLister *fakeNodeLister
		nodeAddrs  []string
		provider   sources.MetricSourceProvider
		fakeClient *fakeKubeletClient
	)
	BeforeEach(func() {
		// Four fixture nodes: one normal, one without a hostname, one not
		// ready, one normal — with index-aligned expected addresses below.
		nodeLister = &fakeNodeLister{
			nodes: []*corev1.Node{
				makeNode("node1", "node1.somedomain", "10.0.1.2", true),
				makeNode("node-no-host", "", "10.0.1.3", true),
				makeNode("node3", "node3.somedomain", "10.0.1.4", false),
				makeNode("node4", "node4.somedomain", "10.0.1.5", true),
			},
		}
		nodeAddrs = []string{
			"10.0.1.2",
			"10.0.1.3",
			"10.0.1.4",
			"10.0.1.5",
		}
		fakeClient = &fakeKubeletClient{}
		addrResolver := NewPriorityNodeAddressResolver(DefaultAddressTypePriority)
		provider = NewSummaryProvider(nodeLister, fakeClient, addrResolver)
	})

	It("should return a metrics source for all nodes", func() {
		By("listing the sources")
		sources, err := provider.GetMetricSources()
		Expect(err).To(Succeed())

		By("verifying that a source is present for each node")
		nodeNames := nodeNames(nodeLister.nodes, nodeAddrs)
		sourceNames := make([]string, len(nodeNames))
		for i, src := range sources {
			sourceNames[i] = src.Name()
		}
		Expect(sourceNames).To(Equal(nodeNames))
	})

	It("should continue on error fetching node information for a particular node", func() {
		By("deleting the IP of a node")
		nodeLister.nodes[0].Status.Addresses = nil

		By("listing the sources")
		sources, err := provider.GetMetricSources()
		// NOTE(review): an error is reported, but the remaining nodes'
		// sources are still expected below.
		Expect(err).To(HaveOccurred())

		By("verifying that a source is present for each node")
		nodeNames := nodeNames(nodeLister.nodes, nodeAddrs)
		sourceNames := make([]string, len(nodeNames[1:]))
		for i, src := range sources {
			sourceNames[i] = src.Name()
		}
		// skip the bad node (the first one)
		Expect(sourceNames).To(Equal(nodeNames[1:]))
	})

	It("should gracefully handle list errors", func() {
		By("setting a fake error from the lister")
		nodeLister.listErr = fmt.Errorf("something went wrong, expectedly")

		By("listing the sources")
		_, err := provider.GetMetricSources()
		Expect(err).To(HaveOccurred())
	})

	Describe("when choosing node addresses", func() {
		JustBeforeEach(func() {
			// set up the metrics so we can call collect safely
			fakeClient.metrics = &stats.Summary{
				Node: stats.NodeStats{
					CPU:    cpuStats(100, time.Now()),
					Memory: memStats(200, time.Now()),
				},
			}
		})

		It("should prefer addresses according to the order of the types first", func() {
			By("setting the first node to have multiple addresses and setting all nodes to ready")
			// Addresses are listed in reverse priority order, so the
			// resolver must pick by type priority, not list position.
			nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
				{Type: DefaultAddressTypePriority[3], Address: "skip-val1"},
				{Type: DefaultAddressTypePriority[2], Address: "skip-val2"},
				{Type: DefaultAddressTypePriority[1], Address: "correct-val"},
			}
			for _, node := range nodeLister.nodes {
				node.Status.Conditions = []corev1.NodeCondition{
					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
				}
			}

			By("listing all sources")
			srcs, err := provider.GetMetricSources()
			Expect(err).NotTo(HaveOccurred())

			By("making sure that the first source scrapes from the correct location")
			_, err = srcs[0].Collect(context.Background())
			Expect(err).NotTo(HaveOccurred())
			Expect(fakeClient.lastHost).To(Equal("correct-val"))
		})

		It("should prefer the first address that matches within a given type", func() {
			By("setting the first node to have multiple addresses and setting all nodes to ready")
			// Two addresses of the top-priority type: the first listed one
			// ("correct-val") must win over the later "second-val".
			nodeLister.nodes[0].Status.Addresses = []corev1.NodeAddress{
				{Type: DefaultAddressTypePriority[1], Address: "skip-val1"},
				{Type: DefaultAddressTypePriority[0], Address: "correct-val"},
				{Type: DefaultAddressTypePriority[1], Address: "skip-val2"},
				{Type: DefaultAddressTypePriority[0], Address: "second-val"},
			}
			for _, node := range nodeLister.nodes {
				node.Status.Conditions = []corev1.NodeCondition{
					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
				}
			}

			By("listing all sources")
			srcs, err := provider.GetMetricSources()
			Expect(err).NotTo(HaveOccurred())

			By("making sure that the first source scrapes from the correct location")
			_, err = srcs[0].Collect(context.Background())
			Expect(err).NotTo(HaveOccurred())
			Expect(fakeClient.lastHost).To(Equal("correct-val"))
		})

		It("should return an error if no preferred addresses are found", func() {
			By("wiping out the addresses of one of the nodes and setting all nodes to ready")
			nodeLister.nodes[0].Status.Addresses = nil
			for _, node := range nodeLister.nodes {
				node.Status.Conditions = []corev1.NodeCondition{
					{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
				}
			}

			By("asking for source providers for all nodes")
			_, err := provider.GetMetricSources()
			Expect(err).To(HaveOccurred())
		})
	})
})