blob: c1bc984e64c9248cf25e5d7ca3ee1a93ddfb1b57 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package handlers
18
19import (
20 "bytes"
21 "fmt"
22 "net/http"
23 "reflect"
24 "time"
25
26 "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/runtime"
29 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/watch"
32 "k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
33 "k8s.io/apiserver/pkg/endpoints/metrics"
34 "k8s.io/apiserver/pkg/endpoints/request"
35 "k8s.io/apiserver/pkg/server/httplog"
36 "k8s.io/apiserver/pkg/util/wsstream"
37
38 "golang.org/x/net/websocket"
39)
40
// neverExitWatch is the timeout channel used when no timeout is configured;
// nothing will ever be sent down this channel.
var neverExitWatch <-chan time.Time = make(chan time.Time)

// TimeoutFactory abstracts watch timeout logic for testing.
type TimeoutFactory interface {
	// TimeoutCh returns the channel that signals the watch timeout together
	// with a cleanup function for the caller to invoke when it is done.
	TimeoutCh() (<-chan time.Time, func() bool)
}

// realTimeoutFactory implements TimeoutFactory on top of time.Timer.
type realTimeoutFactory struct {
	// timeout is the duration after which the watch times out; zero means
	// the watch never times out.
	timeout time.Duration
}

// TimeoutCh returns a channel which will receive something when the watch times out,
// and a cleanup function to call when this happens.
//
// With a non-zero timeout the channel and cleanup function are those of a
// freshly started time.Timer (its C field and Stop method). With a zero
// timeout the shared neverExitWatch channel is returned along with a no-op
// cleanup function that always reports false.
func (f *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
	if f.timeout != 0 {
		timer := time.NewTimer(f.timeout)
		return timer.C, timer.Stop
	}
	noop := func() bool { return false }
	return neverExitWatch, noop
}
63
64// serveWatch handles serving requests to the server
65// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
66func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
67 // negotiate for the stream serializer
68 serializer, err := negotiation.NegotiateOutputStreamSerializer(req, scope.Serializer)
69 if err != nil {
70 scope.err(err, w, req)
71 return
72 }
73 framer := serializer.StreamSerializer.Framer
74 streamSerializer := serializer.StreamSerializer.Serializer
75 embedded := serializer.Serializer
76 if framer == nil {
77 scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), w, req)
78 return
79 }
80 encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
81
82 useTextFraming := serializer.EncodesAsText
83
84 // find the embedded serializer matching the media type
85 embeddedEncoder := scope.Serializer.EncoderForVersion(embedded, scope.Kind.GroupVersion())
86
87 // TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
88 mediaType := serializer.MediaType
89 if mediaType != runtime.ContentTypeJSON {
90 mediaType += ";stream=watch"
91 }
92
93 ctx := req.Context()
94 requestInfo, ok := request.RequestInfoFrom(ctx)
95 if !ok {
96 scope.err(fmt.Errorf("missing requestInfo"), w, req)
97 return
98 }
99
100 server := &WatchServer{
101 Watching: watcher,
102 Scope: scope,
103
104 UseTextFraming: useTextFraming,
105 MediaType: mediaType,
106 Framer: framer,
107 Encoder: encoder,
108 EmbeddedEncoder: embeddedEncoder,
109 Fixup: func(obj runtime.Object) {
110 if err := setSelfLink(obj, requestInfo, scope.Namer); err != nil {
111 utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
112 }
113 },
114
115 TimeoutFactory: &realTimeoutFactory{timeout},
116 }
117
118 server.ServeHTTP(w, req)
119}
120
121// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
122type WatchServer struct {
123 Watching watch.Interface
124 Scope RequestScope
125
126 // true if websocket messages should use text framing (as opposed to binary framing)
127 UseTextFraming bool
128 // the media type this watch is being served with
129 MediaType string
130 // used to frame the watch stream
131 Framer runtime.Framer
132 // used to encode the watch stream event itself
133 Encoder runtime.Encoder
134 // used to encode the nested object in the watch stream
135 EmbeddedEncoder runtime.Encoder
136 Fixup func(runtime.Object)
137
138 TimeoutFactory TimeoutFactory
139}
140
141// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
142// or over a websocket connection.
143func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
144 kind := s.Scope.Kind
145 metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
146 defer metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()
147
148 w = httplog.Unlogged(w)
149
150 if wsstream.IsWebSocketRequest(req) {
151 w.Header().Set("Content-Type", s.MediaType)
152 websocket.Handler(s.HandleWS).ServeHTTP(w, req)
153 return
154 }
155
156 cn, ok := w.(http.CloseNotifier)
157 if !ok {
158 err := fmt.Errorf("unable to start watch - can't get http.CloseNotifier: %#v", w)
159 utilruntime.HandleError(err)
160 s.Scope.err(errors.NewInternalError(err), w, req)
161 return
162 }
163 flusher, ok := w.(http.Flusher)
164 if !ok {
165 err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
166 utilruntime.HandleError(err)
167 s.Scope.err(errors.NewInternalError(err), w, req)
168 return
169 }
170
171 framer := s.Framer.NewFrameWriter(w)
172 if framer == nil {
173 // programmer error
174 err := fmt.Errorf("no stream framing support is available for media type %q", s.MediaType)
175 utilruntime.HandleError(err)
176 s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
177 return
178 }
179 e := streaming.NewEncoder(framer, s.Encoder)
180
181 // ensure the connection times out
182 timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
183 defer cleanup()
184 defer s.Watching.Stop()
185
186 // begin the stream
187 w.Header().Set("Content-Type", s.MediaType)
188 w.Header().Set("Transfer-Encoding", "chunked")
189 w.WriteHeader(http.StatusOK)
190 flusher.Flush()
191
192 var unknown runtime.Unknown
193 internalEvent := &metav1.InternalEvent{}
194 buf := &bytes.Buffer{}
195 ch := s.Watching.ResultChan()
196 for {
197 select {
198 case <-cn.CloseNotify():
199 return
200 case <-timeoutCh:
201 return
202 case event, ok := <-ch:
203 if !ok {
204 // End of results.
205 return
206 }
207
208 obj := event.Object
209 s.Fixup(obj)
210 if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
211 // unexpected error
212 utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
213 return
214 }
215
216 // ContentType is not required here because we are defaulting to the serializer
217 // type
218 unknown.Raw = buf.Bytes()
219 event.Object = &unknown
220
221 // create the external type directly and encode it. Clients will only recognize the serialization we provide.
222 // The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
223 // and we get the benefit of using conversion functions which already have to stay in sync
224 outEvent := &metav1.WatchEvent{}
225 *internalEvent = metav1.InternalEvent(event)
226 err := metav1.Convert_versioned_InternalEvent_to_versioned_Event(internalEvent, outEvent, nil)
227 if err != nil {
228 utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
229 // client disconnect.
230 return
231 }
232 if err := e.Encode(outEvent); err != nil {
233 utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
234 // client disconnect.
235 return
236 }
237 if len(ch) == 0 {
238 flusher.Flush()
239 }
240
241 buf.Reset()
242 }
243 }
244}
245
246// HandleWS implements a websocket handler.
247func (s *WatchServer) HandleWS(ws *websocket.Conn) {
248 defer ws.Close()
249 done := make(chan struct{})
250
251 go func() {
252 defer utilruntime.HandleCrash()
253 // This blocks until the connection is closed.
254 // Client should not send anything.
255 wsstream.IgnoreReceives(ws, 0)
256 // Once the client closes, we should also close
257 close(done)
258 }()
259
260 var unknown runtime.Unknown
261 internalEvent := &metav1.InternalEvent{}
262 buf := &bytes.Buffer{}
263 streamBuf := &bytes.Buffer{}
264 ch := s.Watching.ResultChan()
265 for {
266 select {
267 case <-done:
268 s.Watching.Stop()
269 return
270 case event, ok := <-ch:
271 if !ok {
272 // End of results.
273 return
274 }
275 obj := event.Object
276 s.Fixup(obj)
277 if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
278 // unexpected error
279 utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
280 return
281 }
282
283 // ContentType is not required here because we are defaulting to the serializer
284 // type
285 unknown.Raw = buf.Bytes()
286 event.Object = &unknown
287
288 // the internal event will be versioned by the encoder
289 // create the external type directly and encode it. Clients will only recognize the serialization we provide.
290 // The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
291 // and we get the benefit of using conversion functions which already have to stay in sync
292 outEvent := &metav1.WatchEvent{}
293 *internalEvent = metav1.InternalEvent(event)
294 err := metav1.Convert_versioned_InternalEvent_to_versioned_Event(internalEvent, outEvent, nil)
295 if err != nil {
296 utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
297 // client disconnect.
298 s.Watching.Stop()
299 return
300 }
301 if err := s.Encoder.Encode(outEvent, streamBuf); err != nil {
302 // encoding error
303 utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
304 s.Watching.Stop()
305 return
306 }
307 if s.UseTextFraming {
308 if err := websocket.Message.Send(ws, streamBuf.String()); err != nil {
309 // Client disconnect.
310 s.Watching.Stop()
311 return
312 }
313 } else {
314 if err := websocket.Message.Send(ws, streamBuf.Bytes()); err != nil {
315 // Client disconnect.
316 s.Watching.Stop()
317 return
318 }
319 }
320 buf.Reset()
321 streamBuf.Reset()
322 }
323 }
324}