// Copyright 2018 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package manager

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	utilmetrics "github.com/kubernetes-incubator/metrics-server/pkg/metrics"
	"github.com/kubernetes-incubator/metrics-server/pkg/sink"
	"github.com/kubernetes-incubator/metrics-server/pkg/sources"

	"github.com/golang/glog"
	"github.com/prometheus/client_golang/prometheus"
)

var (
	// initialized below to an actual value by a call to RegisterDurationMetrics
	// (acts as a no-op by default), but we can't just register it in the constructor,
	// since it could be called multiple times during setup.
	tickDuration prometheus.Histogram = prometheus.NewHistogram(prometheus.HistogramOpts{})
)

// RegisterDurationMetrics creates and registers a histogram metric for
// scrape duration, suitable for use in the overall manager.
func RegisterDurationMetrics(resolution time.Duration) {
	tickDuration = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "metrics_server",
			Subsystem: "manager",
			Name:      "tick_duration_seconds",
			Help:      "The total time spent collecting and storing metrics in seconds.",
			Buckets:   utilmetrics.BucketsForScrapeDuration(resolution),
		},
	)
	prometheus.MustRegister(tickDuration)
}

// Runnable represents something that can be run until a signal is given to stop.
type Runnable interface {
	// RunUntil runs this runnable until the given channel is closed.
	// It should not block -- it will spawn its own goroutine.
	RunUntil(stopCh <-chan struct{})
}

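// Manager periodically collects metrics from a source and stores them in a
// sink, tracking the health of each collection tick as it goes.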
type Manager struct {
	source     sources.MetricSource
	sink       sink.MetricSink
	resolution time.Duration

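	// health check bookkeeping, guarded by healthMu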
	healthMu      sync.RWMutex
	lastTickStart time.Time
	lastOk        bool
}

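// NewManager returns a Manager that collects from metricSrc and stores into
// metricSink once per resolution period.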
func NewManager(metricSrc sources.MetricSource, metricSink sink.MetricSink, resolution time.Duration) *Manager {
	manager := Manager{
		source:     metricSrc,
		sink:       metricSink,
		resolution: resolution,
	}

	return &manager
}

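// RunUntil implements Runnable: it starts the collection loop in its own
// goroutine and collects once per resolution period until stopCh is closed.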
func (rm *Manager) RunUntil(stopCh <-chan struct{}) {
	go func() {
		ticker := time.NewTicker(rm.resolution)
		defer ticker.Stop()
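		// collect once immediately rather than waiting for the first tick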
		rm.Collect(time.Now())

		for {
			select {
			case startTime := <-ticker.C:
				rm.Collect(startTime)
			case <-stopCh:
				return
			}
		}
	}()
}

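// Collect performs a single collection tick: it scrapes metrics from the
// source, stores them in the sink, records the tick duration, and updates
// the health state consulted by CheckHealth.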
func (rm *Manager) Collect(startTime time.Time) {
	rm.healthMu.Lock()
	rm.lastTickStart = startTime
	rm.healthMu.Unlock()

	healthyTick := true

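	// bound the whole tick to a single resolution period so that a slow
	// scrape can't overlap the next one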
	ctx, cancelTimeout := context.WithTimeout(context.Background(), rm.resolution)
	defer cancelTimeout()

	glog.V(6).Infof("Beginning cycle, collecting metrics...")
	data, collectErr := rm.source.Collect(ctx)
	if collectErr != nil {
		glog.Errorf("unable to fully collect metrics: %v", collectErr)

		// only consider this an indication of bad health if we
		// couldn't collect from any nodes -- one node going down
		// shouldn't indicate that metrics-server is unhealthy
		if len(data.Nodes) == 0 {
			healthyTick = false
		}

		// NB: continue on so that we don't lose all metrics
		// if one node goes down
	}

	glog.V(6).Infof("...Storing metrics...")
	recvErr := rm.sink.Receive(data)
	if recvErr != nil {
		glog.Errorf("unable to save metrics: %v", recvErr)

		// any failure to save means we're unhealthy
		healthyTick = false
	}

	collectTime := time.Since(startTime)
	tickDuration.Observe(float64(collectTime) / float64(time.Second))
	glog.V(6).Infof("...Cycle complete")

	rm.healthMu.Lock()
	rm.lastOk = healthyTick
	rm.healthMu.Unlock()
}

// CheckHealth checks the health of the manager by looking at tick times,
// and checking if we have at least one node in the collected data.
// It implements the health checker func part of the healthz checker.
func (rm *Manager) CheckHealth(_ *http.Request) error {
	rm.healthMu.RLock()
	lastTick := rm.lastTickStart
	healthyTick := rm.lastOk
	rm.healthMu.RUnlock()

	// use 1.1 for a bit of wiggle room
	maxTickWait := time.Duration(1.1 * float64(rm.resolution))
	tickWait := time.Since(lastTick)

	if tickWait > maxTickWait {
		return fmt.Errorf("time since last tick (%s) was greater than the maximum allowed tick interval (%s)", tickWait, maxTickWait)
	}

	if !healthyTick {
		return fmt.Errorf("there was an error collecting or saving metrics in the last collection tick")
	}

	return nil
}