/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package filters
18
19import (
20 "fmt"
21 "net/http"
22 "sync"
23 "time"
24
25 "k8s.io/apimachinery/pkg/util/sets"
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/apiserver/pkg/authentication/user"
28 "k8s.io/apiserver/pkg/endpoints/metrics"
29 apirequest "k8s.io/apiserver/pkg/endpoints/request"
30
31 "github.com/golang/glog"
32)
33
// retryAfter is the value placed in the Retry-After header of rate-limited
// responses.
// TODO: maybe make this dynamic? or user-adjustable?
const retryAfter = "1"

// inflightUsageMetricUpdatePeriod is how often the inflight usage metric is
// updated. The metric tracks the maximal value seen over each period, so
// making the period longer will increase the reported value.
const inflightUsageMetricUpdatePeriod = time.Second
44
45var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
46
47func handleError(w http.ResponseWriter, r *http.Request, err error) {
48 w.WriteHeader(http.StatusInternalServerError)
49 fmt.Fprintf(w, "Internal Server Error: %#v", r.RequestURI)
50 glog.Errorf(err.Error())
51}
52
// requestWatermark is used to track the maximal usage of inflight requests,
// separately for read-only and mutating requests.
type requestWatermark struct {
	// lock guards both watermark fields.
	lock sync.Mutex

	readOnlyWatermark int
	mutatingWatermark int
}

// recordMutating raises the mutating watermark to mutatingVal when
// mutatingVal exceeds the stored value; otherwise the watermark is left
// unchanged.
func (w *requestWatermark) recordMutating(mutatingVal int) {
	w.lock.Lock()
	defer w.lock.Unlock()

	if mutatingVal <= w.mutatingWatermark {
		return
	}
	w.mutatingWatermark = mutatingVal
}

// recordReadOnly raises the read-only watermark to readOnlyVal when
// readOnlyVal exceeds the stored value; otherwise the watermark is left
// unchanged.
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
	w.lock.Lock()
	defer w.lock.Unlock()

	if readOnlyVal <= w.readOnlyWatermark {
		return
	}
	w.readOnlyWatermark = readOnlyVal
}
76
77var watermark = &requestWatermark{}
78
79func startRecordingUsage() {
80 go func() {
81 wait.Forever(func() {
82 watermark.lock.Lock()
83 readOnlyWatermark := watermark.readOnlyWatermark
84 mutatingWatermark := watermark.mutatingWatermark
85 watermark.readOnlyWatermark = 0
86 watermark.mutatingWatermark = 0
87 watermark.lock.Unlock()
88
89 metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
90 }, inflightUsageMetricUpdatePeriod)
91 }()
92}
93
94var startOnce sync.Once
95
96// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
97func WithMaxInFlightLimit(
98 handler http.Handler,
99 nonMutatingLimit int,
100 mutatingLimit int,
101 longRunningRequestCheck apirequest.LongRunningRequestCheck,
102) http.Handler {
103 startOnce.Do(startRecordingUsage)
104 if nonMutatingLimit == 0 && mutatingLimit == 0 {
105 return handler
106 }
107 var nonMutatingChan chan bool
108 var mutatingChan chan bool
109 if nonMutatingLimit != 0 {
110 nonMutatingChan = make(chan bool, nonMutatingLimit)
111 }
112 if mutatingLimit != 0 {
113 mutatingChan = make(chan bool, mutatingLimit)
114 }
115
116 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
117 ctx := r.Context()
118 requestInfo, ok := apirequest.RequestInfoFrom(ctx)
119 if !ok {
120 handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
121 return
122 }
123
124 // Skip tracking long running events.
125 if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
126 handler.ServeHTTP(w, r)
127 return
128 }
129
130 var c chan bool
131 isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
132 if isMutatingRequest {
133 c = mutatingChan
134 } else {
135 c = nonMutatingChan
136 }
137
138 if c == nil {
139 handler.ServeHTTP(w, r)
140 } else {
141
142 select {
143 case c <- true:
144 var mutatingLen, readOnlyLen int
145 if isMutatingRequest {
146 mutatingLen = len(mutatingChan)
147 } else {
148 readOnlyLen = len(nonMutatingChan)
149 }
150
151 defer func() {
152 <-c
153 if isMutatingRequest {
154 watermark.recordMutating(mutatingLen)
155 } else {
156 watermark.recordReadOnly(readOnlyLen)
157 }
158
159 }()
160 handler.ServeHTTP(w, r)
161
162 default:
163 // We need to split this data between buckets used for throttling.
164 if isMutatingRequest {
165 metrics.DroppedRequests.WithLabelValues(metrics.MutatingKind).Inc()
166 } else {
167 metrics.DroppedRequests.WithLabelValues(metrics.ReadOnlyKind).Inc()
168 }
169 // at this point we're about to return a 429, BUT not all actors should be rate limited. A system:master is so powerful
170 // that they should always get an answer. It's a super-admin or a loopback connection.
171 if currUser, ok := apirequest.UserFrom(ctx); ok {
172 for _, group := range currUser.GetGroups() {
173 if group == user.SystemPrivilegedGroup {
174 handler.ServeHTTP(w, r)
175 return
176 }
177 }
178 }
179 metrics.Record(r, requestInfo, "", http.StatusTooManyRequests, 0, 0)
180 tooManyRequests(r, w)
181 }
182 }
183 })
184}
185
186func tooManyRequests(req *http.Request, w http.ResponseWriter) {
187 // Return a 429 status indicating "Too Many Requests"
188 w.Header().Set("Retry-After", retryAfter)
189 http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
190}