// Copyright 2018 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sources

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/golang/glog"
	"github.com/prometheus/client_golang/prometheus"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"

	utilmetrics "github.com/kubernetes-incubator/metrics-server/pkg/metrics"
)

const (
	maxDelayMs       = 4 * 1000
	delayPerSourceMs = 8
)

var (
	lastScrapeTimestamp = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "metrics_server",
			Subsystem: "scraper",
			Name:      "last_time_seconds",
			Help:      "Last time metrics-server performed a scrape since unix epoch in seconds.",
		},
		[]string{"source"},
	)

	// initialized below to an actual value by a call to RegisterDurationMetrics
	// (it acts as a no-op by default); we can't simply register it in the constructor,
	// since the constructor could be called multiple times during setup.
	scraperDuration *prometheus.HistogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"source"})
)

func init() {
	prometheus.MustRegister(lastScrapeTimestamp)
}

// RegisterDurationMetrics creates and registers a histogram metric for
// scrape duration, suitable for use in the source manager.
func RegisterDurationMetrics(scrapeTimeout time.Duration) {
	scraperDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "metrics_server",
			Subsystem: "scraper",
			Name:      "duration_seconds",
			Help:      "Time spent scraping sources in seconds.",
			Buckets:   utilmetrics.BucketsForScrapeDuration(scrapeTimeout),
		},
		[]string{"source"},
	)
	prometheus.MustRegister(scraperDuration)
}

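// collectAllSources is a hypothetical helper, not part of the upstream file: a
// minimal sketch of the intended wiring. RegisterDurationMetrics is called exactly
// once during setup (MustRegister panics on duplicate registration), after which a
// source manager built by NewSourceManager is scraped via Collect.
func collectAllSources(provider MetricSourceProvider, scrapeTimeout time.Duration) (*MetricsBatch, error) {
	RegisterDurationMetrics(scrapeTimeout)
	mgr := NewSourceManager(provider, scrapeTimeout)

	ctx, cancel := context.WithTimeout(context.Background(), scrapeTimeout)
	defer cancel()
	return mgr.Collect(ctx)
}
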
// NewSourceManager returns a MetricSource that fans out to all sources produced
// by the given MetricSourceProvider, scraping each within the given timeout and
// aggregating the results.
func NewSourceManager(srcProv MetricSourceProvider, scrapeTimeout time.Duration) MetricSource {
	return &sourceManager{
		srcProv:       srcProv,
		scrapeTimeout: scrapeTimeout,
	}
}

type sourceManager struct {
	srcProv       MetricSourceProvider
	scrapeTimeout time.Duration
}

func (m *sourceManager) Name() string {
	return "source_manager"
}

func (m *sourceManager) Collect(baseCtx context.Context) (*MetricsBatch, error) {
	sources, err := m.srcProv.GetMetricSources()
	var errs []error
	if err != nil {
		// save the error, and continue on in case of partial results
		errs = append(errs, err)
	}
	glog.V(1).Infof("Scraping metrics from %v sources", len(sources))

	// Buffered to the number of sources so the scrape goroutines never block on send.
	responseChannel := make(chan *MetricsBatch, len(sources))
	errChannel := make(chan error, len(sources))
	defer close(responseChannel)
	defer close(errChannel)

	startTime := time.Now()

	// TODO(directxman12): re-evaluate this code -- do we really need to stagger fetches like this?
	delayMs := delayPerSourceMs * len(sources)
	if delayMs > maxDelayMs {
		delayMs = maxDelayMs
	}

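	// Each source then sleeps for a random slice of this window before scraping:
	// with delayPerSourceMs = 8, ten sources spread out over an 80 ms window, while
	// 500 or more sources are clamped to the 4 s maxDelayMs cap.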
	for _, source := range sources {
		go func(source MetricSource) {
			// Stagger the fetches to prevent network congestion.
			sleepDuration := time.Duration(rand.Intn(delayMs)) * time.Millisecond
			time.Sleep(sleepDuration)
			// make the timeout a bit shorter to account for staggering, so we still preserve
			// the overall timeout
			ctx, cancelTimeout := context.WithTimeout(baseCtx, m.scrapeTimeout-sleepDuration)
			defer cancelTimeout()

			glog.V(2).Infof("Querying source: %s", source)
			metrics, err := scrapeWithMetrics(ctx, source)
			if err != nil {
				errChannel <- fmt.Errorf("unable to fully scrape metrics from source %s: %v", source.Name(), err)
				responseChannel <- metrics
				return
			}
			responseChannel <- metrics
			errChannel <- nil
		}(source)
	}

	res := &MetricsBatch{}

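	// Collect exactly one error and one batch per source; each goroutine sends one
	// value on both channels, so this loop finishes only after every scrape has
	// either completed or failed.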
	for range sources {
		err := <-errChannel
		srcBatch := <-responseChannel
		if err != nil {
			errs = append(errs, err)
			// NB: partial node results are still worth saving, so
			// don't skip storing results if we got an error
		}
		if srcBatch == nil {
			continue
		}

		res.Nodes = append(res.Nodes, srcBatch.Nodes...)
		res.Pods = append(res.Pods, srcBatch.Pods...)
	}

	glog.V(1).Infof("ScrapeMetrics: time: %s, nodes: %v, pods: %v", time.Since(startTime), len(res.Nodes), len(res.Pods))
	return res, utilerrors.NewAggregate(errs)
}

func scrapeWithMetrics(ctx context.Context, s MetricSource) (*MetricsBatch, error) {
	sourceName := s.Name()
	startTime := time.Now()
	// Record the metrics from a deferred closure: deferring the Set/Observe calls
	// directly would evaluate their arguments up front, so the observed duration
	// would always be ~0 and the timestamp would mark the start of the scrape
	// rather than its end.
	defer func() {
		lastScrapeTimestamp.
			WithLabelValues(sourceName).
			Set(float64(time.Now().Unix()))
		scraperDuration.
			WithLabelValues(sourceName).
			Observe(time.Since(startTime).Seconds())
	}()

	return s.Collect(ctx)
}
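
// For reference, the collectors above surface on the Prometheus endpoint as series
// like the following (the source label value is illustrative; it comes from each
// source's Name method):
//
//	metrics_server_scraper_last_time_seconds{source="kubelet_summary:node-1"} 1.548750458e+09
//	metrics_server_scraper_duration_seconds_count{source="kubelet_summary:node-1"} 42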