blob: 667d74bf33c4e07d869792a91fe451caca8d4aeb [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001// Copyright 2018 The Kubernetes Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package summary
16
17import (
18 "context"
19 "fmt"
20 "math"
21 "time"
22
23 "github.com/golang/glog"
24 "github.com/kubernetes-incubator/metrics-server/pkg/sources"
25 "github.com/prometheus/client_golang/prometheus"
26 corev1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/resource"
28 "k8s.io/apimachinery/pkg/labels"
29 utilerrors "k8s.io/apimachinery/pkg/util/errors"
30 v1listers "k8s.io/client-go/listers/core/v1"
31 stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
32)
33
34var (
35 summaryRequestLatency = prometheus.NewHistogramVec(
36 prometheus.HistogramOpts{
37 Namespace: "metrics_server",
38 Subsystem: "kubelet_summary",
39 Name: "request_duration_seconds",
40 Help: "The Kubelet summary request latencies in seconds.",
41 // TODO(directxman12): it would be nice to calculate these buckets off of scrape duration,
42 // like we do elsewhere, but we're not passed the scrape duration at this level.
43 Buckets: prometheus.DefBuckets,
44 },
45 []string{"node"},
46 )
47 scrapeTotal = prometheus.NewCounterVec(
48 prometheus.CounterOpts{
49 Namespace: "metrics_server",
50 Subsystem: "kubelet_summary",
51 Name: "scrapes_total",
52 Help: "Total number of attempted Summary API scrapes done by Metrics Server",
53 },
54 []string{"success"},
55 )
56)
57
58func init() {
59 prometheus.MustRegister(summaryRequestLatency)
60 prometheus.MustRegister(scrapeTotal)
61}
62
// NodeInfo contains the information needed to identify and connect to a
// particular node: its name and its preferred connection address.
type NodeInfo struct {
	// Name is the name of the node.
	Name string
	// ConnectAddress is the preferred address used to reach the node's Kubelet.
	ConnectAddress string
}
69
70// Kubelet-provided metrics for pod and system container.
71type summaryMetricsSource struct {
72 node NodeInfo
73 kubeletClient KubeletInterface
74}
75
76func NewSummaryMetricsSource(node NodeInfo, client KubeletInterface) sources.MetricSource {
77 return &summaryMetricsSource{
78 node: node,
79 kubeletClient: client,
80 }
81}
82
83func (src *summaryMetricsSource) Name() string {
84 return src.String()
85}
86
87func (src *summaryMetricsSource) String() string {
88 return fmt.Sprintf("kubelet_summary:%s", src.node.Name)
89}
90
91func (src *summaryMetricsSource) Collect(ctx context.Context) (*sources.MetricsBatch, error) {
92 summary, err := func() (*stats.Summary, error) {
93 startTime := time.Now()
94 defer summaryRequestLatency.WithLabelValues(src.node.Name).Observe(float64(time.Since(startTime)) / float64(time.Second))
95 return src.kubeletClient.GetSummary(ctx, src.node.ConnectAddress)
96 }()
97
98 if err != nil {
99 scrapeTotal.WithLabelValues("false").Inc()
100 return nil, fmt.Errorf("unable to fetch metrics from Kubelet %s (%s): %v", src.node.Name, src.node.ConnectAddress, err)
101 }
102
103 scrapeTotal.WithLabelValues("true").Inc()
104
105 res := &sources.MetricsBatch{
106 Nodes: make([]sources.NodeMetricsPoint, 1),
107 Pods: make([]sources.PodMetricsPoint, len(summary.Pods)),
108 }
109
110 var errs []error
111 errs = append(errs, src.decodeNodeStats(&summary.Node, &res.Nodes[0])...)
112 if len(errs) != 0 {
113 // if we had errors providing node metrics, discard the data point
114 // so that we don't incorrectly report metric values as zero.
115 res.Nodes = res.Nodes[:1]
116 }
117
118 num := 0
119 for _, pod := range summary.Pods {
120 podErrs := src.decodePodStats(&pod, &res.Pods[num])
121 errs = append(errs, podErrs...)
122 if len(podErrs) != 0 {
123 // NB: we explicitly want to discard pods with partial results, since
124 // the horizontal pod autoscaler takes special action when a pod is missing
125 // metrics (and zero CPU or memory does not count as "missing metrics")
126
127 // we don't care if we reuse slots in the result array,
128 // because they get completely overwritten in decodePodStats
129 continue
130 }
131 num++
132 }
133 res.Pods = res.Pods[:num]
134
135 return res, utilerrors.NewAggregate(errs)
136}
137
138func (src *summaryMetricsSource) decodeNodeStats(nodeStats *stats.NodeStats, target *sources.NodeMetricsPoint) []error {
139 timestamp, err := getScrapeTime(nodeStats.CPU, nodeStats.Memory)
140 if err != nil {
141 // if we can't get a timestamp, assume bad data in general
142 return []error{fmt.Errorf("unable to get valid timestamp for metric point for node %q, discarding data: %v", src.node.ConnectAddress, err)}
143 }
144 *target = sources.NodeMetricsPoint{
145 Name: src.node.Name,
146 MetricsPoint: sources.MetricsPoint{
147 Timestamp: timestamp,
148 },
149 }
150 var errs []error
151 if err := decodeCPU(&target.CpuUsage, nodeStats.CPU); err != nil {
152 errs = append(errs, fmt.Errorf("unable to get CPU for node %q, discarding data: %v", src.node.ConnectAddress, err))
153 }
154 if err := decodeMemory(&target.MemoryUsage, nodeStats.Memory); err != nil {
155 errs = append(errs, fmt.Errorf("unable to get memory for node %q, discarding data: %v", src.node.ConnectAddress, err))
156 }
157 return errs
158}
159
160func (src *summaryMetricsSource) decodePodStats(podStats *stats.PodStats, target *sources.PodMetricsPoint) []error {
161 // completely overwrite data in the target
162 *target = sources.PodMetricsPoint{
163 Name: podStats.PodRef.Name,
164 Namespace: podStats.PodRef.Namespace,
165 Containers: make([]sources.ContainerMetricsPoint, len(podStats.Containers)),
166 }
167
168 var errs []error
169 for i, container := range podStats.Containers {
170 timestamp, err := getScrapeTime(container.CPU, container.Memory)
171 if err != nil {
172 // if we can't get a timestamp, assume bad data in general
173 errs = append(errs, fmt.Errorf("unable to get a valid timestamp for metric point for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
174 continue
175 }
176 point := sources.ContainerMetricsPoint{
177 Name: container.Name,
178 MetricsPoint: sources.MetricsPoint{
179 Timestamp: timestamp,
180 },
181 }
182 if err := decodeCPU(&point.CpuUsage, container.CPU); err != nil {
183 errs = append(errs, fmt.Errorf("unable to get CPU for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
184 }
185 if err := decodeMemory(&point.MemoryUsage, container.Memory); err != nil {
186 errs = append(errs, fmt.Errorf("unable to get memory for container %q in pod %s/%s on node %q: %v, discarding data", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
187 }
188
189 target.Containers[i] = point
190 }
191
192 return errs
193}
194
195func decodeCPU(target *resource.Quantity, cpuStats *stats.CPUStats) error {
196 if cpuStats == nil || cpuStats.UsageNanoCores == nil {
197 return fmt.Errorf("missing cpu usage metric")
198 }
199
200 *target = *uint64Quantity(*cpuStats.UsageNanoCores, -9)
201 return nil
202}
203
204func decodeMemory(target *resource.Quantity, memStats *stats.MemoryStats) error {
205 if memStats == nil || memStats.WorkingSetBytes == nil {
206 return fmt.Errorf("missing memory usage metric")
207 }
208
209 *target = *uint64Quantity(*memStats.WorkingSetBytes, 0)
210 target.Format = resource.BinarySI
211
212 return nil
213}
214
215func getScrapeTime(cpu *stats.CPUStats, memory *stats.MemoryStats) (time.Time, error) {
216 // Ensure we get the earlier timestamp so that we can tell if a given data
217 // point was tainted by pod initialization.
218
219 var earliest *time.Time
220 if cpu != nil && !cpu.Time.IsZero() && (earliest == nil || earliest.After(cpu.Time.Time)) {
221 earliest = &cpu.Time.Time
222 }
223
224 if memory != nil && !memory.Time.IsZero() && (earliest == nil || earliest.After(memory.Time.Time)) {
225 earliest = &memory.Time.Time
226 }
227
228 if earliest == nil {
229 return time.Time{}, fmt.Errorf("no non-zero timestamp on either CPU or memory")
230 }
231
232 return *earliest, nil
233}
234
235// uint64Quantity converts a uint64 into a Quantity, which only has constructors
236// that work with int64 (except for parse, which requires costly round-trips to string).
237// We lose precision until we fit in an int64 if greater than the max int64 value.
238func uint64Quantity(val uint64, scale resource.Scale) *resource.Quantity {
239 // easy path -- we can safely fit val into an int64
240 if val <= math.MaxInt64 {
241 return resource.NewScaledQuantity(int64(val), scale)
242 }
243
244 glog.V(1).Infof("unexpectedly large resource value %v, loosing precision to fit in scaled resource.Quantity", val)
245
246 // otherwise, lose an decimal order-of-magnitude precision,
247 // so we can fit into a scaled quantity
248 return resource.NewScaledQuantity(int64(val/10), resource.Scale(1)+scale)
249}
250
251type summaryProvider struct {
252 nodeLister v1listers.NodeLister
253 kubeletClient KubeletInterface
254 addrResolver NodeAddressResolver
255}
256
257func (p *summaryProvider) GetMetricSources() ([]sources.MetricSource, error) {
258 sources := []sources.MetricSource{}
259 nodes, err := p.nodeLister.List(labels.Everything())
260 if err != nil {
261 return nil, fmt.Errorf("unable to list nodes: %v", err)
262 }
263
264 var errs []error
265 for _, node := range nodes {
266 info, err := p.getNodeInfo(node)
267 if err != nil {
268 errs = append(errs, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err))
269 continue
270 }
271 sources = append(sources, NewSummaryMetricsSource(info, p.kubeletClient))
272 }
273 return sources, utilerrors.NewAggregate(errs)
274}
275
276func (p *summaryProvider) getNodeInfo(node *corev1.Node) (NodeInfo, error) {
277 addr, err := p.addrResolver.NodeAddress(node)
278 if err != nil {
279 return NodeInfo{}, err
280 }
281 info := NodeInfo{
282 Name: node.Name,
283 ConnectAddress: addr,
284 }
285
286 return info, nil
287}
288
289func NewSummaryProvider(nodeLister v1listers.NodeLister, kubeletClient KubeletInterface, addrResolver NodeAddressResolver) sources.MetricSourceProvider {
290 return &summaryProvider{
291 nodeLister: nodeLister,
292 kubeletClient: kubeletClient,
293 addrResolver: addrResolver,
294 }
295}