Matthias Andreas Benkard | 832a54e | 2019-01-29 09:27:38 +0100 | [diff] [blame] | 1 | // Copyright 2018 The Kubernetes Authors. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package sources_test |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "fmt" |
| 20 | "testing" |
| 21 | "time" |
| 22 | |
| 23 | . "github.com/onsi/ginkgo" |
| 24 | . "github.com/onsi/gomega" |
| 25 | "k8s.io/apimachinery/pkg/api/resource" |
| 26 | |
| 27 | . "github.com/kubernetes-incubator/metrics-server/pkg/sources" |
| 28 | fakesrc "github.com/kubernetes-incubator/metrics-server/pkg/sources/fake" |
| 29 | ) |
| 30 | |
| 31 | func TestSourceManager(t *testing.T) { |
| 32 | RegisterFailHandler(Fail) |
| 33 | RunSpecs(t, "Source Manager Suite") |
| 34 | } |
| 35 | |
| 36 | // sleepySource returns a MetricSource that takes some amount of time (respecting |
| 37 | // context timeouts) to collect a MetricsBatch with a single node's data point. |
| 38 | func sleepySource(delay time.Duration, nodeName string, point MetricsPoint) MetricSource { |
| 39 | return &fakesrc.FunctionSource{ |
| 40 | SourceName: "sleepy_source:" + nodeName, |
| 41 | GenerateBatch: func(ctx context.Context) (*MetricsBatch, error) { |
| 42 | select { |
| 43 | case <-time.After(delay): |
| 44 | case <-ctx.Done(): |
| 45 | return nil, fmt.Errorf("timed out") |
| 46 | } |
| 47 | return &MetricsBatch{ |
| 48 | Nodes: []NodeMetricsPoint{ |
| 49 | { |
| 50 | Name: nodeName, |
| 51 | MetricsPoint: point, |
| 52 | }, |
| 53 | }, |
| 54 | }, nil |
| 55 | }, |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | func fullSource(ts time.Time, nodeInd, podStartInd, numPods int) MetricSource { |
| 60 | return &fakesrc.FunctionSource{ |
| 61 | SourceName: fmt.Sprintf("static_source:node%d", nodeInd), |
| 62 | GenerateBatch: func(_ context.Context) (*MetricsBatch, error) { |
| 63 | podPoints := make([]PodMetricsPoint, numPods) |
| 64 | for i := range podPoints { |
| 65 | podInd := int64(podStartInd + i) |
| 66 | podPoints[i].Name = fmt.Sprintf("pod%d", podInd) |
| 67 | podPoints[i].Namespace = fmt.Sprintf("ns%d", nodeInd) |
| 68 | podPoints[i].Containers = []ContainerMetricsPoint{ |
| 69 | { |
| 70 | Name: "container1", |
| 71 | MetricsPoint: MetricsPoint{ |
| 72 | Timestamp: ts, |
| 73 | CpuUsage: *resource.NewQuantity(300+10*podInd, resource.DecimalSI), |
| 74 | MemoryUsage: *resource.NewQuantity(400+10*podInd, resource.DecimalSI), |
| 75 | }, |
| 76 | }, |
| 77 | { |
| 78 | Name: "container2", |
| 79 | MetricsPoint: MetricsPoint{ |
| 80 | Timestamp: ts, |
| 81 | CpuUsage: *resource.NewQuantity(500+10*podInd, resource.DecimalSI), |
| 82 | MemoryUsage: *resource.NewQuantity(600+10*podInd, resource.DecimalSI), |
| 83 | }, |
| 84 | }, |
| 85 | } |
| 86 | } |
| 87 | return &MetricsBatch{ |
| 88 | Nodes: []NodeMetricsPoint{ |
| 89 | { |
| 90 | Name: fmt.Sprintf("node%d", nodeInd), |
| 91 | MetricsPoint: MetricsPoint{ |
| 92 | Timestamp: ts, |
| 93 | CpuUsage: *resource.NewQuantity(100+10*int64(nodeInd), resource.DecimalSI), |
| 94 | MemoryUsage: *resource.NewQuantity(200+10*int64(nodeInd), resource.DecimalSI), |
| 95 | }, |
| 96 | }, |
| 97 | }, |
| 98 | Pods: podPoints, |
| 99 | }, nil |
| 100 | }, |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | var _ = Describe("Source Manager", func() { |
| 105 | var ( |
| 106 | scrapeTime = time.Now() |
| 107 | nodeDataPoint = MetricsPoint{ |
| 108 | Timestamp: scrapeTime, |
| 109 | CpuUsage: *resource.NewQuantity(100, resource.DecimalSI), |
| 110 | MemoryUsage: *resource.NewQuantity(200, resource.DecimalSI), |
| 111 | } |
| 112 | ) |
| 113 | |
| 114 | Context("when all sources return in time", func() { |
| 115 | It("should return the results of all sources, both pods and nodes", func() { |
| 116 | By("setting up sources that take 1 second to complete") |
| 117 | metricsSourceProvider := fakesrc.StaticSourceProvider{ |
| 118 | sleepySource(1*time.Second, "node1", nodeDataPoint), |
| 119 | sleepySource(1*time.Second, "node2", nodeDataPoint), |
| 120 | } |
| 121 | |
| 122 | By("running the source manager with a scrape and context timeout of 3*seconds") |
| 123 | start := time.Now() |
| 124 | manager := NewSourceManager(metricsSourceProvider, 3*time.Second) |
| 125 | timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 3*time.Second) |
| 126 | dataBatch, errs := manager.Collect(timeoutCtx) |
| 127 | doneWithWork() |
| 128 | Expect(errs).NotTo(HaveOccurred()) |
| 129 | |
| 130 | By("ensuring that the full time took at most 3 seconds") |
| 131 | Expect(time.Now().Sub(start)).To(BeNumerically("<=", 3*time.Second)) |
| 132 | |
| 133 | By("ensuring that all the nodes are listed") |
| 134 | Expect(dataBatch.Nodes).To(ConsistOf( |
| 135 | NodeMetricsPoint{Name: "node1", MetricsPoint: nodeDataPoint}, |
| 136 | NodeMetricsPoint{Name: "node2", MetricsPoint: nodeDataPoint}, |
| 137 | )) |
| 138 | }) |
| 139 | |
| 140 | It("should return the results of all sources' nodes and pods", func() { |
| 141 | By("setting up multiple sources") |
| 142 | metricsSourceProvider := fakesrc.StaticSourceProvider{ |
| 143 | fullSource(scrapeTime, 1, 0, 4), |
| 144 | fullSource(scrapeTime, 2, 4, 2), |
| 145 | fullSource(scrapeTime, 3, 6, 1), |
| 146 | } |
| 147 | |
| 148 | By("running the source manager") |
| 149 | manager := NewSourceManager(metricsSourceProvider, 1*time.Second) |
| 150 | dataBatch, errs := manager.Collect(context.Background()) |
| 151 | Expect(errs).NotTo(HaveOccurred()) |
| 152 | |
| 153 | By("figuring out the expected node and pod points") |
| 154 | var expectedNodePoints []interface{} |
| 155 | var expectedPodPoints []interface{} |
| 156 | for _, src := range metricsSourceProvider { |
| 157 | res, err := src.Collect(context.Background()) |
| 158 | Expect(err).NotTo(HaveOccurred()) |
| 159 | for _, pt := range res.Nodes { |
| 160 | expectedNodePoints = append(expectedNodePoints, pt) |
| 161 | } |
| 162 | for _, pt := range res.Pods { |
| 163 | expectedPodPoints = append(expectedPodPoints, pt) |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | By("ensuring that all nodes are present") |
| 168 | Expect(dataBatch.Nodes).To(ConsistOf(expectedNodePoints...)) |
| 169 | |
| 170 | By("ensuring that all pods are present") |
| 171 | Expect(dataBatch.Pods).To(ConsistOf(expectedPodPoints...)) |
| 172 | }) |
| 173 | }) |
| 174 | |
| 175 | Context("when some sources take too long", func() { |
| 176 | It("should pass the scrape timeout to the source context, so that sources can time out", func() { |
| 177 | By("setting up one source to take 4 seconds, and another to take 2") |
| 178 | metricsSourceProvider := fakesrc.StaticSourceProvider{ |
| 179 | sleepySource(4*time.Second, "node1", nodeDataPoint), |
| 180 | sleepySource(2*time.Second, "node2", nodeDataPoint), |
| 181 | } |
| 182 | |
| 183 | By("running the source manager with a scrape timeout of 3 seconds") |
| 184 | start := time.Now() |
| 185 | manager := NewSourceManager(metricsSourceProvider, 3*time.Second) |
| 186 | dataBatch, errs := manager.Collect(context.Background()) |
| 187 | |
| 188 | By("ensuring that scraping took around 3 seconds") |
| 189 | Expect(time.Now().Sub(start)).To(BeNumerically("~", 3*time.Second, 1*time.Millisecond)) |
| 190 | |
| 191 | By("ensuring that an error and partial results (data from source 2) were returned") |
| 192 | Expect(errs).To(HaveOccurred()) |
| 193 | Expect(dataBatch.Nodes).To(ConsistOf( |
| 194 | NodeMetricsPoint{Name: "node2", MetricsPoint: nodeDataPoint}, |
| 195 | )) |
| 196 | }) |
| 197 | |
| 198 | It("should respect the parent context's general timeout, even with a longer scrape timeout", func() { |
| 199 | By("setting up some sources with 4 second delays") |
| 200 | metricsSourceProvider := fakesrc.StaticSourceProvider{ |
| 201 | sleepySource(4*time.Second, "node1", nodeDataPoint), |
| 202 | sleepySource(4*time.Second, "node2", nodeDataPoint), |
| 203 | } |
| 204 | |
| 205 | By("running the source manager with a scrape timeout of 5 seconds, but a context timeout of 1 second") |
| 206 | start := time.Now() |
| 207 | manager := NewSourceManager(metricsSourceProvider, 5*time.Second) |
| 208 | timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 1*time.Second) |
| 209 | dataBatch, errs := manager.Collect(timeoutCtx) |
| 210 | doneWithWork() |
| 211 | |
| 212 | By("ensuring that it times out after 1 second with errors and no data") |
| 213 | Expect(time.Now().Sub(start)).To(BeNumerically("~", 1*time.Second, 1*time.Millisecond)) |
| 214 | Expect(errs).To(HaveOccurred()) |
| 215 | Expect(dataBatch.Nodes).To(BeEmpty()) |
| 216 | }) |
| 217 | }) |
| 218 | }) |