// Copyright 2018 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15package sink
16
17import (
18 "fmt"
19 "sync"
20 "time"
21
22 corev1 "k8s.io/api/core/v1"
23 apitypes "k8s.io/apimachinery/pkg/types"
24 metrics "k8s.io/metrics/pkg/apis/metrics"
25
26 "github.com/kubernetes-incubator/metrics-server/pkg/provider"
27 "github.com/kubernetes-incubator/metrics-server/pkg/sink"
28 "github.com/kubernetes-incubator/metrics-server/pkg/sources"
29)
30
// kubernetesCadvisorWindow is the max window used by cAdvisor for calculating
// CPU usage rate. While it can vary, it's no more than this number, but may be
// as low as half this number (when working with no backoff). It would be really
// nice if the kubelet told us this in the summary API...
//
// Reported as the Window field of every provider.TimeInfo produced by this
// package. Kept as a var (not a const); NOTE(review): presumably so tests can
// override it -- confirm before changing to const.
var kubernetesCadvisorWindow = 30 * time.Second
36
// sinkMetricsProvider is a provider.MetricsProvider that also acts as a sink.MetricSink
type sinkMetricsProvider struct {
	// mu guards nodes and pods: Receive swaps both maps under the write
	// lock, the Get* methods read them under the read lock.
	mu sync.RWMutex
	// nodes maps node name to its most recently received metrics point.
	nodes map[string]sources.NodeMetricsPoint
	// pods maps namespace/name to the pod's most recently received metrics point.
	pods map[apitypes.NamespacedName]sources.PodMetricsPoint
}
43
44// NewSinkProvider returns a MetricSink that feeds into a MetricsProvider.
45func NewSinkProvider() (sink.MetricSink, provider.MetricsProvider) {
46 prov := &sinkMetricsProvider{}
47 return prov, prov
48}
49
50// TODO(directxman12): figure out what the right value is for "window" --
51// we don't get the actual window from cAdvisor, so we could just
52// plumb down metric resolution, but that wouldn't be actually correct.
53
54func (p *sinkMetricsProvider) GetNodeMetrics(nodes ...string) ([]provider.TimeInfo, []corev1.ResourceList, error) {
55 p.mu.RLock()
56 defer p.mu.RUnlock()
57
58 timestamps := make([]provider.TimeInfo, len(nodes))
59 resMetrics := make([]corev1.ResourceList, len(nodes))
60
61 for i, node := range nodes {
62 metricPoint, present := p.nodes[node]
63 if !present {
64 continue
65 }
66
67 timestamps[i] = provider.TimeInfo{
68 Timestamp: metricPoint.Timestamp,
69 Window: kubernetesCadvisorWindow,
70 }
71 resMetrics[i] = corev1.ResourceList{
72 corev1.ResourceName(corev1.ResourceCPU): metricPoint.CpuUsage,
73 corev1.ResourceName(corev1.ResourceMemory): metricPoint.MemoryUsage,
74 }
75 }
76
77 return timestamps, resMetrics, nil
78}
79
80func (p *sinkMetricsProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) {
81 p.mu.RLock()
82 defer p.mu.RUnlock()
83
84 timestamps := make([]provider.TimeInfo, len(pods))
85 resMetrics := make([][]metrics.ContainerMetrics, len(pods))
86
87 for i, pod := range pods {
88 metricPoint, present := p.pods[pod]
89 if !present {
90 continue
91 }
92
93 contMetrics := make([]metrics.ContainerMetrics, len(metricPoint.Containers))
94 var earliestTS *time.Time
95 for i, contPoint := range metricPoint.Containers {
96 contMetrics[i] = metrics.ContainerMetrics{
97 Name: contPoint.Name,
98 Usage: corev1.ResourceList{
99 corev1.ResourceName(corev1.ResourceCPU): contPoint.CpuUsage,
100 corev1.ResourceName(corev1.ResourceMemory): contPoint.MemoryUsage,
101 },
102 }
103 if earliestTS == nil || earliestTS.After(contPoint.Timestamp) {
104 ts := contPoint.Timestamp // copy to avoid loop iteration variable issues
105 earliestTS = &ts
106 }
107 }
108 if earliestTS == nil {
109 // we had no containers
110 earliestTS = &time.Time{}
111 }
112 timestamps[i] = provider.TimeInfo{
113 Timestamp: *earliestTS,
114 Window: kubernetesCadvisorWindow,
115 }
116 resMetrics[i] = contMetrics
117 }
118 return timestamps, resMetrics, nil
119}
120
121func (p *sinkMetricsProvider) Receive(batch *sources.MetricsBatch) error {
122 newNodes := make(map[string]sources.NodeMetricsPoint, len(batch.Nodes))
123 for _, nodePoint := range batch.Nodes {
124 if _, exists := newNodes[nodePoint.Name]; exists {
125 return fmt.Errorf("duplicate node %s received", nodePoint.Name)
126 }
127 newNodes[nodePoint.Name] = nodePoint
128 }
129
130 newPods := make(map[apitypes.NamespacedName]sources.PodMetricsPoint, len(batch.Pods))
131 for _, podPoint := range batch.Pods {
132 podIdent := apitypes.NamespacedName{Name: podPoint.Name, Namespace: podPoint.Namespace}
133 if _, exists := newPods[podIdent]; exists {
134 return fmt.Errorf("duplicate pod %s received", podIdent)
135 }
136 newPods[podIdent] = podPoint
137 }
138
139 p.mu.Lock()
140 defer p.mu.Unlock()
141
142 p.nodes = newNodes
143 p.pods = newPods
144
145 return nil
146}