blob: 78bf4363114c66d173b377d5b0aa659cec0a740d [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001// Copyright 2018 The Kubernetes Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package sources_test
16
17import (
18 "context"
19 "fmt"
20 "testing"
21 "time"
22
23 . "github.com/onsi/ginkgo"
24 . "github.com/onsi/gomega"
25 "k8s.io/apimachinery/pkg/api/resource"
26
27 . "github.com/kubernetes-incubator/metrics-server/pkg/sources"
28 fakesrc "github.com/kubernetes-incubator/metrics-server/pkg/sources/fake"
29)
30
31func TestSourceManager(t *testing.T) {
32 RegisterFailHandler(Fail)
33 RunSpecs(t, "Source Manager Suite")
34}
35
36// sleepySource returns a MetricSource that takes some amount of time (respecting
37// context timeouts) to collect a MetricsBatch with a single node's data point.
38func sleepySource(delay time.Duration, nodeName string, point MetricsPoint) MetricSource {
39 return &fakesrc.FunctionSource{
40 SourceName: "sleepy_source:" + nodeName,
41 GenerateBatch: func(ctx context.Context) (*MetricsBatch, error) {
42 select {
43 case <-time.After(delay):
44 case <-ctx.Done():
45 return nil, fmt.Errorf("timed out")
46 }
47 return &MetricsBatch{
48 Nodes: []NodeMetricsPoint{
49 {
50 Name: nodeName,
51 MetricsPoint: point,
52 },
53 },
54 }, nil
55 },
56 }
57}
58
59func fullSource(ts time.Time, nodeInd, podStartInd, numPods int) MetricSource {
60 return &fakesrc.FunctionSource{
61 SourceName: fmt.Sprintf("static_source:node%d", nodeInd),
62 GenerateBatch: func(_ context.Context) (*MetricsBatch, error) {
63 podPoints := make([]PodMetricsPoint, numPods)
64 for i := range podPoints {
65 podInd := int64(podStartInd + i)
66 podPoints[i].Name = fmt.Sprintf("pod%d", podInd)
67 podPoints[i].Namespace = fmt.Sprintf("ns%d", nodeInd)
68 podPoints[i].Containers = []ContainerMetricsPoint{
69 {
70 Name: "container1",
71 MetricsPoint: MetricsPoint{
72 Timestamp: ts,
73 CpuUsage: *resource.NewQuantity(300+10*podInd, resource.DecimalSI),
74 MemoryUsage: *resource.NewQuantity(400+10*podInd, resource.DecimalSI),
75 },
76 },
77 {
78 Name: "container2",
79 MetricsPoint: MetricsPoint{
80 Timestamp: ts,
81 CpuUsage: *resource.NewQuantity(500+10*podInd, resource.DecimalSI),
82 MemoryUsage: *resource.NewQuantity(600+10*podInd, resource.DecimalSI),
83 },
84 },
85 }
86 }
87 return &MetricsBatch{
88 Nodes: []NodeMetricsPoint{
89 {
90 Name: fmt.Sprintf("node%d", nodeInd),
91 MetricsPoint: MetricsPoint{
92 Timestamp: ts,
93 CpuUsage: *resource.NewQuantity(100+10*int64(nodeInd), resource.DecimalSI),
94 MemoryUsage: *resource.NewQuantity(200+10*int64(nodeInd), resource.DecimalSI),
95 },
96 },
97 },
98 Pods: podPoints,
99 }, nil
100 },
101 }
102}
103
104var _ = Describe("Source Manager", func() {
105 var (
106 scrapeTime = time.Now()
107 nodeDataPoint = MetricsPoint{
108 Timestamp: scrapeTime,
109 CpuUsage: *resource.NewQuantity(100, resource.DecimalSI),
110 MemoryUsage: *resource.NewQuantity(200, resource.DecimalSI),
111 }
112 )
113
114 Context("when all sources return in time", func() {
115 It("should return the results of all sources, both pods and nodes", func() {
116 By("setting up sources that take 1 second to complete")
117 metricsSourceProvider := fakesrc.StaticSourceProvider{
118 sleepySource(1*time.Second, "node1", nodeDataPoint),
119 sleepySource(1*time.Second, "node2", nodeDataPoint),
120 }
121
122 By("running the source manager with a scrape and context timeout of 3*seconds")
123 start := time.Now()
124 manager := NewSourceManager(metricsSourceProvider, 3*time.Second)
125 timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 3*time.Second)
126 dataBatch, errs := manager.Collect(timeoutCtx)
127 doneWithWork()
128 Expect(errs).NotTo(HaveOccurred())
129
130 By("ensuring that the full time took at most 3 seconds")
131 Expect(time.Now().Sub(start)).To(BeNumerically("<=", 3*time.Second))
132
133 By("ensuring that all the nodes are listed")
134 Expect(dataBatch.Nodes).To(ConsistOf(
135 NodeMetricsPoint{Name: "node1", MetricsPoint: nodeDataPoint},
136 NodeMetricsPoint{Name: "node2", MetricsPoint: nodeDataPoint},
137 ))
138 })
139
140 It("should return the results of all sources' nodes and pods", func() {
141 By("setting up multiple sources")
142 metricsSourceProvider := fakesrc.StaticSourceProvider{
143 fullSource(scrapeTime, 1, 0, 4),
144 fullSource(scrapeTime, 2, 4, 2),
145 fullSource(scrapeTime, 3, 6, 1),
146 }
147
148 By("running the source manager")
149 manager := NewSourceManager(metricsSourceProvider, 1*time.Second)
150 dataBatch, errs := manager.Collect(context.Background())
151 Expect(errs).NotTo(HaveOccurred())
152
153 By("figuring out the expected node and pod points")
154 var expectedNodePoints []interface{}
155 var expectedPodPoints []interface{}
156 for _, src := range metricsSourceProvider {
157 res, err := src.Collect(context.Background())
158 Expect(err).NotTo(HaveOccurred())
159 for _, pt := range res.Nodes {
160 expectedNodePoints = append(expectedNodePoints, pt)
161 }
162 for _, pt := range res.Pods {
163 expectedPodPoints = append(expectedPodPoints, pt)
164 }
165 }
166
167 By("ensuring that all nodes are present")
168 Expect(dataBatch.Nodes).To(ConsistOf(expectedNodePoints...))
169
170 By("ensuring that all pods are present")
171 Expect(dataBatch.Pods).To(ConsistOf(expectedPodPoints...))
172 })
173 })
174
175 Context("when some sources take too long", func() {
176 It("should pass the scrape timeout to the source context, so that sources can time out", func() {
177 By("setting up one source to take 4 seconds, and another to take 2")
178 metricsSourceProvider := fakesrc.StaticSourceProvider{
179 sleepySource(4*time.Second, "node1", nodeDataPoint),
180 sleepySource(2*time.Second, "node2", nodeDataPoint),
181 }
182
183 By("running the source manager with a scrape timeout of 3 seconds")
184 start := time.Now()
185 manager := NewSourceManager(metricsSourceProvider, 3*time.Second)
186 dataBatch, errs := manager.Collect(context.Background())
187
188 By("ensuring that scraping took around 3 seconds")
189 Expect(time.Now().Sub(start)).To(BeNumerically("~", 3*time.Second, 1*time.Millisecond))
190
191 By("ensuring that an error and partial results (data from source 2) were returned")
192 Expect(errs).To(HaveOccurred())
193 Expect(dataBatch.Nodes).To(ConsistOf(
194 NodeMetricsPoint{Name: "node2", MetricsPoint: nodeDataPoint},
195 ))
196 })
197
198 It("should respect the parent context's general timeout, even with a longer scrape timeout", func() {
199 By("setting up some sources with 4 second delays")
200 metricsSourceProvider := fakesrc.StaticSourceProvider{
201 sleepySource(4*time.Second, "node1", nodeDataPoint),
202 sleepySource(4*time.Second, "node2", nodeDataPoint),
203 }
204
205 By("running the source manager with a scrape timeout of 5 seconds, but a context timeout of 1 second")
206 start := time.Now()
207 manager := NewSourceManager(metricsSourceProvider, 5*time.Second)
208 timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 1*time.Second)
209 dataBatch, errs := manager.Collect(timeoutCtx)
210 doneWithWork()
211
212 By("ensuring that it times out after 1 second with errors and no data")
213 Expect(time.Now().Sub(start)).To(BeNumerically("~", 1*time.Second, 1*time.Millisecond))
214 Expect(errs).To(HaveOccurred())
215 Expect(dataBatch.Nodes).To(BeEmpty())
216 })
217 })
218})