// Copyright 2018 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15package sink_test
16
17import (
18 "testing"
19 "time"
20
21 . "github.com/onsi/ginkgo"
22 . "github.com/onsi/gomega"
23 corev1 "k8s.io/api/core/v1"
24 "k8s.io/apimachinery/pkg/api/resource"
25 apitypes "k8s.io/apimachinery/pkg/types"
26 metrics "k8s.io/metrics/pkg/apis/metrics"
27
28 "github.com/kubernetes-incubator/metrics-server/pkg/provider"
29 . "github.com/kubernetes-incubator/metrics-server/pkg/provider/sink"
30 "github.com/kubernetes-incubator/metrics-server/pkg/sink"
31 "github.com/kubernetes-incubator/metrics-server/pkg/sources"
32)
33
// defaultWindow is the metrics window every returned provider.TimeInfo is
// expected to carry; presumably this matches the default window used by the
// sink implementation under test — confirm against the sink package.
var defaultWindow = 30 * time.Second
35
36func TestSourceManager(t *testing.T) {
37 RegisterFailHandler(Fail)
38 RunSpecs(t, "Provider/Sink Suite")
39}
40
41func newMilliPoint(ts time.Time, cpu, memory int64) sources.MetricsPoint {
42 return sources.MetricsPoint{
43 Timestamp: ts,
44 CpuUsage: *resource.NewMilliQuantity(cpu, resource.DecimalSI),
45 MemoryUsage: *resource.NewMilliQuantity(memory, resource.BinarySI),
46 }
47}
48
// Ginkgo spec for the in-memory sink provider: the object returned by
// NewSinkProvider exposes a write side (sink.MetricSink.Receive) and a read
// side (provider.MetricsProvider) over the same storage.
var _ = Describe("In-memory Sink Provider", func() {
	var (
		batch    *sources.MetricsBatch    // fixture batch fed to the sink in each spec
		prov     provider.MetricsProvider // read side under test
		provSink sink.MetricSink          // write side under test
		now      time.Time                // base time; every point's timestamp is an offset from it
	)

	BeforeEach(func() {
		now = time.Now()
		// Three nodes plus three pods across two namespaces. Every point
		// carries a distinct timestamp offset and distinct CPU/memory values
		// so the assertions below can tell individual entries apart.
		batch = &sources.MetricsBatch{
			Nodes: []sources.NodeMetricsPoint{
				{Name: "node1", MetricsPoint: newMilliPoint(now.Add(100*time.Millisecond), 110, 120)},
				{Name: "node2", MetricsPoint: newMilliPoint(now.Add(200*time.Millisecond), 210, 220)},
				{Name: "node3", MetricsPoint: newMilliPoint(now.Add(300*time.Millisecond), 310, 320)},
			},
			Pods: []sources.PodMetricsPoint{
				{Name: "pod1", Namespace: "ns1", Containers: []sources.ContainerMetricsPoint{
					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(400*time.Millisecond), 410, 420)},
					{Name: "container2", MetricsPoint: newMilliPoint(now.Add(500*time.Millisecond), 510, 520)},
				}},
				{Name: "pod2", Namespace: "ns1", Containers: []sources.ContainerMetricsPoint{
					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(600*time.Millisecond), 610, 620)},
				}},
				{Name: "pod1", Namespace: "ns2", Containers: []sources.ContainerMetricsPoint{
					{Name: "container1", MetricsPoint: newMilliPoint(now.Add(700*time.Millisecond), 710, 720)},
					{Name: "container2", MetricsPoint: newMilliPoint(now.Add(800*time.Millisecond), 810, 820)},
				}},
			},
		}

		// A fresh sink/provider pair per spec, so no state leaks between specs.
		provSink, prov = NewSinkProvider()
	})

	It("should receive batches of metrics", func() {
		By("sending the batch to the sink")
		Expect(provSink.Receive(batch)).To(Succeed())

		// Presence checks only: these loops assert that each stored entry is
		// retrievable without error, not that the returned values are correct
		// (value correctness is covered by the retrieval specs below).
		By("making sure that the provider contains all nodes received")
		for _, node := range batch.Nodes {
			_, _, err := prov.GetNodeMetrics(node.Name)
			Expect(err).NotTo(HaveOccurred())
		}

		By("making sure that the provider contains all pods received")
		for _, pod := range batch.Pods {
			_, _, err := prov.GetContainerMetrics(apitypes.NamespacedName{
				Name:      pod.Name,
				Namespace: pod.Namespace,
			})
			Expect(err).NotTo(HaveOccurred())
		}
	})

	It("should error out if duplicate nodes were received, without a partial store", func() {
		By("adding a duplicate node to the batch")
		batch.Nodes = append(batch.Nodes, batch.Nodes[0])

		By("sending the batch to the sink and checking for an error")
		Expect(provSink.Receive(batch)).NotTo(Succeed())

		// Per these expectations, a rejected batch must be atomic: lookups
		// succeed but every result is nil, i.e. nothing was stored.
		By("making sure none of the data is in the sink")
		for _, node := range batch.Nodes {
			_, res, err := prov.GetNodeMetrics(node.Name)
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(ConsistOf(corev1.ResourceList(nil)))
		}
		for _, pod := range batch.Pods {
			_, res, err := prov.GetContainerMetrics(apitypes.NamespacedName{
				Name:      pod.Name,
				Namespace: pod.Namespace,
			})
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(Equal([][]metrics.ContainerMetrics{nil}))
		}
	})

	It("should error out if duplicate pods were received, without a partial store", func() {
		By("adding a duplicate pod to the batch")
		batch.Pods = append(batch.Pods, batch.Pods[0])

		By("sending the batch to the sink and checking for an error")
		Expect(provSink.Receive(batch)).NotTo(Succeed())

		// Same atomicity requirement as the duplicate-node spec: no node or
		// pod data may be visible after a rejected batch.
		By("making sure none of the data is in the sink")
		for _, node := range batch.Nodes {
			_, res, err := prov.GetNodeMetrics(node.Name)
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(ConsistOf(corev1.ResourceList(nil)))
		}
		for _, pod := range batch.Pods {
			_, res, err := prov.GetContainerMetrics(apitypes.NamespacedName{
				Name:      pod.Name,
				Namespace: pod.Namespace,
			})
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(Equal([][]metrics.ContainerMetrics{nil}))
		}
	})

	It("should retrieve metrics for all containers in a pod, with overall latest scrape time", func() {
		By("sending the batch to the sink")
		Expect(provSink.Receive(batch)).To(Succeed())

		By("fetching the pod")
		ts, containerMetrics, err := prov.GetContainerMetrics(apitypes.NamespacedName{
			Name:      "pod1",
			Namespace: "ns1",
		})
		Expect(err).NotTo(HaveOccurred())

		// NOTE(review): the spec title says "latest" but the By text and the
		// asserted value (now+400ms, the earliest of pod1/ns1's containers at
		// +400ms and +500ms) say "smallest" — the wording of the title looks
		// wrong; the behavior pinned here is the earliest container timestamp.
		By("verifying that the timestamp is the smallest time amongst all containers")
		Expect(ts).To(ConsistOf(provider.TimeInfo{Timestamp: now.Add(400 * time.Millisecond), Window: defaultWindow}))

		By("verifying that all containers have data")
		Expect(containerMetrics).To(Equal(
			[][]metrics.ContainerMetrics{
				{
					{
						Name: "container1",
						Usage: corev1.ResourceList{
							corev1.ResourceCPU:    *resource.NewMilliQuantity(410, resource.DecimalSI),
							corev1.ResourceMemory: *resource.NewMilliQuantity(420, resource.BinarySI),
						},
					},
					{
						Name: "container2",
						Usage: corev1.ResourceList{
							corev1.ResourceCPU:    *resource.NewMilliQuantity(510, resource.DecimalSI),
							corev1.ResourceMemory: *resource.NewMilliQuantity(520, resource.BinarySI),
						},
					},
				},
			},
		))
	})

	It("should return nil metrics for missing pods", func() {
		By("sending the batch to the sink")
		Expect(provSink.Receive(batch)).To(Succeed())

		By("fetching the a present pod and a missing pod")
		ts, containerMetrics, err := prov.GetContainerMetrics(apitypes.NamespacedName{
			Name:      "pod1",
			Namespace: "ns1",
		}, apitypes.NamespacedName{
			Name:      "pod2",
			Namespace: "ns42", // namespace not in the batch, so this pod is missing
		})
		Expect(err).NotTo(HaveOccurred())

		// Missing pods yield a zero-valued TimeInfo at the matching index
		// rather than an error; results stay positionally aligned with the
		// requested names.
		By("verifying that the timestamp is the smallest time amongst all containers")
		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(400 * time.Millisecond), Window: defaultWindow}, {}}))

		By("verifying that all present containers have data")
		Expect(containerMetrics).To(Equal(
			[][]metrics.ContainerMetrics{
				{
					{
						Name: "container1",
						Usage: corev1.ResourceList{
							corev1.ResourceCPU:    *resource.NewMilliQuantity(410, resource.DecimalSI),
							corev1.ResourceMemory: *resource.NewMilliQuantity(420, resource.BinarySI),
						},
					},
					{
						Name: "container2",
						Usage: corev1.ResourceList{
							corev1.ResourceCPU:    *resource.NewMilliQuantity(510, resource.DecimalSI),
							corev1.ResourceMemory: *resource.NewMilliQuantity(520, resource.BinarySI),
						},
					},
				},
				nil, // the missing pod's slot
			},
		))

	})

	It("should retrieve metrics for a node, with overall latest scrape time", func() {
		By("sending the batch to the sink")
		Expect(provSink.Receive(batch)).To(Succeed())

		By("fetching the nodes")
		ts, nodeMetrics, err := prov.GetNodeMetrics("node1", "node2")
		Expect(err).NotTo(HaveOccurred())

		// Each node carries a single point, so the returned TimeInfo is just
		// that point's timestamp with the default window.
		By("verifying that the timestamp is the smallest time amongst all containers")
		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(100 * time.Millisecond), Window: defaultWindow}, {Timestamp: now.Add(200 * time.Millisecond), Window: defaultWindow}}))

		By("verifying that all nodes have data")
		Expect(nodeMetrics).To(Equal(
			[]corev1.ResourceList{
				{
					corev1.ResourceCPU:    *resource.NewMilliQuantity(110, resource.DecimalSI),
					corev1.ResourceMemory: *resource.NewMilliQuantity(120, resource.BinarySI),
				},
				{
					corev1.ResourceCPU:    *resource.NewMilliQuantity(210, resource.DecimalSI),
					corev1.ResourceMemory: *resource.NewMilliQuantity(220, resource.BinarySI),
				},
			},
		))
	})

	It("should return nil metrics for missing nodes", func() {
		By("sending the batch to the sink")
		Expect(provSink.Receive(batch)).To(Succeed())

		By("fetching the nodes, plus a missing node")
		ts, nodeMetrics, err := prov.GetNodeMetrics("node1", "node2", "node42")
		Expect(err).NotTo(HaveOccurred())

		// As with pods, a missing node produces a zero TimeInfo and a nil
		// ResourceList at its index, not an error.
		By("verifying that the timestamp is the smallest time amongst all containers")
		Expect(ts).To(Equal([]provider.TimeInfo{{Timestamp: now.Add(100 * time.Millisecond), Window: defaultWindow}, {Timestamp: now.Add(200 * time.Millisecond), Window: defaultWindow}, {}}))

		By("verifying that all present nodes have data")
		Expect(nodeMetrics).To(Equal(
			[]corev1.ResourceList{
				{
					corev1.ResourceCPU:    *resource.NewMilliQuantity(110, resource.DecimalSI),
					corev1.ResourceMemory: *resource.NewMilliQuantity(120, resource.BinarySI),
				},
				{
					corev1.ResourceCPU:    *resource.NewMilliQuantity(210, resource.DecimalSI),
					corev1.ResourceMemory: *resource.NewMilliQuantity(220, resource.BinarySI),
				},
				nil, // the missing node's slot
			},
		))

	})
})
281})