blob: 09ffd76dd40051a1b946935f5c4f7537e0e6695d [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package rest
18
19import (
20 "bytes"
21 "context"
22 "encoding/hex"
23 "fmt"
24 "io"
25 "io/ioutil"
26 "mime"
27 "net/http"
28 "net/url"
29 "path"
30 "reflect"
31 "strconv"
32 "strings"
33 "time"
34
35 "github.com/golang/glog"
36 "golang.org/x/net/http2"
37 "k8s.io/apimachinery/pkg/api/errors"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/runtime"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
42 "k8s.io/apimachinery/pkg/util/net"
43 "k8s.io/apimachinery/pkg/watch"
44 restclientwatch "k8s.io/client-go/rest/watch"
45 "k8s.io/client-go/tools/metrics"
46 "k8s.io/client-go/util/flowcontrol"
47)
48
// longThrottleLatency is the threshold for logging throttled requests: any
// request that waits on the client-side rate limiter for longer than this
// duration is logged.
var longThrottleLatency = 50 * time.Millisecond
54
// HTTPClient is the minimal interface a Request needs in order to send an
// HTTP request and obtain its response. It is an interface so that a request
// object can be tested with a substitute client.
type HTTPClient interface {
	// Do sends req and returns the resulting response, or an error.
	Do(req *http.Request) (*http.Response, error)
}
59
// ResponseWrapper is an interface for getting a response.
// The response may be accessed either as raw data (the whole output is read
// into memory) or as a stream.
type ResponseWrapper interface {
	// DoRaw returns the complete response body as a byte slice.
	DoRaw() ([]byte, error)
	// Stream returns the response body as a reader that the caller consumes
	// and closes.
	Stream() (io.ReadCloser, error)
}
66
// RequestConstructionError is returned when there is an error assembling a
// request. Err holds the underlying cause.
type RequestConstructionError struct {
	Err error
}

// Error implements the error interface. The message is a fixed prefix followed
// by the wrapped error's default formatting in single quotes.
func (r *RequestConstructionError) Error() string {
	return "request construction error: '" + fmt.Sprint(r.Err) + "'"
}
76
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
	// required

	// client sends the request; http.DefaultClient is substituted when it is nil.
	client HTTPClient
	// verb is the HTTP method handed to http.NewRequest.
	verb string

	// baseURL, when non-nil, is copied into the URL produced by URL() before
	// the path and query are filled in.
	baseURL *url.URL
	// content supplies the content type, accepted content types and group version.
	content ContentConfig
	// serializers supplies the encoder, decoders and framer used for bodies.
	serializers Serializers

	// generic components accessible via method setters

	// pathPrefix is the leading part of the URL path.
	pathPrefix string
	// subpath is appended after the resource name and subresource.
	subpath string
	// params holds the query parameters.
	params url.Values
	// headers is assigned to the outgoing http.Request's Header.
	headers http.Header

	// structural elements of the request that are part of the Kubernetes API conventions

	namespace string
	// namespaceSet records that Namespace was called, even with an empty value.
	namespaceSet bool
	resource     string
	resourceName string
	subresource  string
	// timeout, when non-zero, is emitted as the "timeout" query parameter by
	// URL(); when positive, request() also applies it as a context timeout.
	timeout time.Duration

	// output

	// err is the first error recorded by a builder method; it is returned
	// when the request is executed.
	err error
	// body is the request body passed to http.NewRequest.
	body io.Reader

	// This is only used for per-request timeouts, deadlines, and cancellations.
	ctx context.Context

	// backoffMgr computes and applies the delay before each attempt.
	backoffMgr BackoffManager
	// throttle is the optional client-side rate limiter; nil disables throttling.
	throttle flowcontrol.RateLimiter
}
113
114// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
115func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
116 if backoff == nil {
117 glog.V(2).Infof("Not implementing request backoff strategy.")
118 backoff = &NoBackoff{}
119 }
120
121 pathPrefix := "/"
122 if baseURL != nil {
123 pathPrefix = path.Join(pathPrefix, baseURL.Path)
124 }
125 r := &Request{
126 client: client,
127 verb: verb,
128 baseURL: baseURL,
129 pathPrefix: path.Join(pathPrefix, versionedAPIPath),
130 content: content,
131 serializers: serializers,
132 backoffMgr: backoff,
133 throttle: throttle,
134 timeout: timeout,
135 }
136 switch {
137 case len(content.AcceptContentTypes) > 0:
138 r.SetHeader("Accept", content.AcceptContentTypes)
139 case len(content.ContentType) > 0:
140 r.SetHeader("Accept", content.ContentType+", */*")
141 }
142 return r
143}
144
145// Prefix adds segments to the relative beginning to the request path. These
146// items will be placed before the optional Namespace, Resource, or Name sections.
147// Setting AbsPath will clear any previously set Prefix segments
148func (r *Request) Prefix(segments ...string) *Request {
149 if r.err != nil {
150 return r
151 }
152 r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
153 return r
154}
155
156// Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
157// Namespace, Resource, or Name sections.
158func (r *Request) Suffix(segments ...string) *Request {
159 if r.err != nil {
160 return r
161 }
162 r.subpath = path.Join(r.subpath, path.Join(segments...))
163 return r
164}
165
166// Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
167func (r *Request) Resource(resource string) *Request {
168 if r.err != nil {
169 return r
170 }
171 if len(r.resource) != 0 {
172 r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
173 return r
174 }
175 if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
176 r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
177 return r
178 }
179 r.resource = resource
180 return r
181}
182
183// BackOff sets the request's backoff manager to the one specified,
184// or defaults to the stub implementation if nil is provided
185func (r *Request) BackOff(manager BackoffManager) *Request {
186 if manager == nil {
187 r.backoffMgr = &NoBackoff{}
188 return r
189 }
190
191 r.backoffMgr = manager
192 return r
193}
194
// Throttle receives a rate-limiter and sets or replaces an existing request limiter.
// The assignment is unconditional: it is performed even if an error has already
// been recorded on the request, and a nil limiter is stored as-is.
func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
	r.throttle = limiter
	return r
}
200
201// SubResource sets a sub-resource path which can be multiple segments segment after the resource
202// name but before the suffix.
203func (r *Request) SubResource(subresources ...string) *Request {
204 if r.err != nil {
205 return r
206 }
207 subresource := path.Join(subresources...)
208 if len(r.subresource) != 0 {
209 r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.resource, subresource)
210 return r
211 }
212 for _, s := range subresources {
213 if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
214 r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
215 return r
216 }
217 }
218 r.subresource = subresource
219 return r
220}
221
222// Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
223func (r *Request) Name(resourceName string) *Request {
224 if r.err != nil {
225 return r
226 }
227 if len(resourceName) == 0 {
228 r.err = fmt.Errorf("resource name may not be empty")
229 return r
230 }
231 if len(r.resourceName) != 0 {
232 r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
233 return r
234 }
235 if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
236 r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
237 return r
238 }
239 r.resourceName = resourceName
240 return r
241}
242
243// Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
244func (r *Request) Namespace(namespace string) *Request {
245 if r.err != nil {
246 return r
247 }
248 if r.namespaceSet {
249 r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
250 return r
251 }
252 if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
253 r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
254 return r
255 }
256 r.namespaceSet = true
257 r.namespace = namespace
258 return r
259}
260
261// NamespaceIfScoped is a convenience function to set a namespace if scoped is true
262func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
263 if scoped {
264 return r.Namespace(namespace)
265 }
266 return r
267}
268
269// AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
270// when a single segment is passed.
271func (r *Request) AbsPath(segments ...string) *Request {
272 if r.err != nil {
273 return r
274 }
275 r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
276 if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
277 // preserve any trailing slashes for legacy behavior
278 r.pathPrefix += "/"
279 }
280 return r
281}
282
283// RequestURI overwrites existing path and parameters with the value of the provided server relative
284// URI.
285func (r *Request) RequestURI(uri string) *Request {
286 if r.err != nil {
287 return r
288 }
289 locator, err := url.Parse(uri)
290 if err != nil {
291 r.err = err
292 return r
293 }
294 r.pathPrefix = locator.Path
295 if len(locator.Query()) > 0 {
296 if r.params == nil {
297 r.params = make(url.Values)
298 }
299 for k, v := range locator.Query() {
300 r.params[k] = v
301 }
302 }
303 return r
304}
305
306// Param creates a query parameter with the given string value.
307func (r *Request) Param(paramName, s string) *Request {
308 if r.err != nil {
309 return r
310 }
311 return r.setParam(paramName, s)
312}
313
314// VersionedParams will take the provided object, serialize it to a map[string][]string using the
315// implicit RESTClient API version and the default parameter codec, and then add those as parameters
316// to the request. Use this to provide versioned query parameters from client libraries.
317// VersionedParams will not write query parameters that have omitempty set and are empty. If a
318// parameter has already been set it is appended to (Params and VersionedParams are additive).
319func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
320 return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
321}
322
323func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
324 if r.err != nil {
325 return r
326 }
327 params, err := codec.EncodeParameters(obj, version)
328 if err != nil {
329 r.err = err
330 return r
331 }
332 for k, v := range params {
333 if r.params == nil {
334 r.params = make(url.Values)
335 }
336 r.params[k] = append(r.params[k], v...)
337 }
338 return r
339}
340
341func (r *Request) setParam(paramName, value string) *Request {
342 if r.params == nil {
343 r.params = make(url.Values)
344 }
345 r.params[paramName] = append(r.params[paramName], value)
346 return r
347}
348
349func (r *Request) SetHeader(key string, values ...string) *Request {
350 if r.headers == nil {
351 r.headers = http.Header{}
352 }
353 r.headers.Del(key)
354 for _, value := range values {
355 r.headers.Add(key, value)
356 }
357 return r
358}
359
360// Timeout makes the request use the given duration as an overall timeout for the
361// request. Additionally, if set passes the value as "timeout" parameter in URL.
362func (r *Request) Timeout(d time.Duration) *Request {
363 if r.err != nil {
364 return r
365 }
366 r.timeout = d
367 return r
368}
369
370// Body makes the request use obj as the body. Optional.
371// If obj is a string, try to read a file of that name.
372// If obj is a []byte, send it directly.
373// If obj is an io.Reader, use it directly.
374// If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
375// If obj is a runtime.Object and nil, do nothing.
376// Otherwise, set an error.
377func (r *Request) Body(obj interface{}) *Request {
378 if r.err != nil {
379 return r
380 }
381 switch t := obj.(type) {
382 case string:
383 data, err := ioutil.ReadFile(t)
384 if err != nil {
385 r.err = err
386 return r
387 }
388 glogBody("Request Body", data)
389 r.body = bytes.NewReader(data)
390 case []byte:
391 glogBody("Request Body", t)
392 r.body = bytes.NewReader(t)
393 case io.Reader:
394 r.body = t
395 case runtime.Object:
396 // callers may pass typed interface pointers, therefore we must check nil with reflection
397 if reflect.ValueOf(t).IsNil() {
398 return r
399 }
400 data, err := runtime.Encode(r.serializers.Encoder, t)
401 if err != nil {
402 r.err = err
403 return r
404 }
405 glogBody("Request Body", data)
406 r.body = bytes.NewReader(data)
407 r.SetHeader("Content-Type", r.content.ContentType)
408 default:
409 r.err = fmt.Errorf("unknown type used for body: %+v", obj)
410 }
411 return r
412}
413
// Context adds a context to the request. Contexts are only used for
// timeouts, deadlines, and cancellations.
// The assignment is unconditional: it replaces any context set earlier and is
// performed even if an error has already been recorded on the request.
func (r *Request) Context(ctx context.Context) *Request {
	r.ctx = ctx
	return r
}
420
421// URL returns the current working URL.
422func (r *Request) URL() *url.URL {
423 p := r.pathPrefix
424 if r.namespaceSet && len(r.namespace) > 0 {
425 p = path.Join(p, "namespaces", r.namespace)
426 }
427 if len(r.resource) != 0 {
428 p = path.Join(p, strings.ToLower(r.resource))
429 }
430 // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
431 if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
432 p = path.Join(p, r.resourceName, r.subresource, r.subpath)
433 }
434
435 finalURL := &url.URL{}
436 if r.baseURL != nil {
437 *finalURL = *r.baseURL
438 }
439 finalURL.Path = p
440
441 query := url.Values{}
442 for key, values := range r.params {
443 for _, value := range values {
444 query.Add(key, value)
445 }
446 }
447
448 // timeout is handled specially here.
449 if r.timeout != 0 {
450 query.Set("timeout", r.timeout.String())
451 }
452 finalURL.RawQuery = query.Encode()
453 return finalURL
454}
455
456// finalURLTemplate is similar to URL(), but will make all specific parameter values equal
457// - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
458// parameters will be reset. This creates a copy of the request so as not to change the
459// underlying object. This means some useful request info (like the types of field
460// selectors in use) will be lost.
461// TODO: preserve field selector keys
462func (r Request) finalURLTemplate() url.URL {
463 if len(r.resourceName) != 0 {
464 r.resourceName = "{name}"
465 }
466 if r.namespaceSet && len(r.namespace) != 0 {
467 r.namespace = "{namespace}"
468 }
469 newParams := url.Values{}
470 v := []string{"{value}"}
471 for k := range r.params {
472 newParams[k] = v
473 }
474 r.params = newParams
475 url := r.URL()
476 return *url
477}
478
479func (r *Request) tryThrottle() {
480 now := time.Now()
481 if r.throttle != nil {
482 r.throttle.Accept()
483 }
484 if latency := time.Since(now); latency > longThrottleLatency {
485 glog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
486 }
487}
488
489// Watch attempts to begin watching the requested location.
490// Returns a watch.Interface, or an error.
491func (r *Request) Watch() (watch.Interface, error) {
492 return r.WatchWithSpecificDecoders(
493 func(body io.ReadCloser) streaming.Decoder {
494 framer := r.serializers.Framer.NewFrameReader(body)
495 return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
496 },
497 r.serializers.Decoder,
498 )
499}
500
501// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
502// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
503// Returns a watch.Interface, or an error.
504func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
505 // We specifically don't want to rate limit watches, so we
506 // don't use r.throttle here.
507 if r.err != nil {
508 return nil, r.err
509 }
510 if r.serializers.Framer == nil {
511 return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
512 }
513
514 url := r.URL().String()
515 req, err := http.NewRequest(r.verb, url, r.body)
516 if err != nil {
517 return nil, err
518 }
519 if r.ctx != nil {
520 req = req.WithContext(r.ctx)
521 }
522 req.Header = r.headers
523 client := r.client
524 if client == nil {
525 client = http.DefaultClient
526 }
527 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
528 resp, err := client.Do(req)
529 updateURLMetrics(r, resp, err)
530 if r.baseURL != nil {
531 if err != nil {
532 r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
533 } else {
534 r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
535 }
536 }
537 if err != nil {
538 // The watch stream mechanism handles many common partial data errors, so closed
539 // connections can be retried in many cases.
540 if net.IsProbableEOF(err) {
541 return watch.NewEmptyWatch(), nil
542 }
543 return nil, err
544 }
545 if resp.StatusCode != http.StatusOK {
546 defer resp.Body.Close()
547 if result := r.transformResponse(resp, req); result.err != nil {
548 return nil, result.err
549 }
550 return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
551 }
552 wrapperDecoder := wrapperDecoderFn(resp.Body)
553 return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
554}
555
556// updateURLMetrics is a convenience function for pushing metrics.
557// It also handles corner cases for incomplete/invalid request data.
558func updateURLMetrics(req *Request, resp *http.Response, err error) {
559 url := "none"
560 if req.baseURL != nil {
561 url = req.baseURL.Host
562 }
563
564 // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
565 // system so we just report them as `<error>`.
566 if err != nil {
567 metrics.RequestResult.Increment("<error>", req.verb, url)
568 } else {
569 //Metrics for failure codes
570 metrics.RequestResult.Increment(strconv.Itoa(resp.StatusCode), req.verb, url)
571 }
572}
573
574// Stream formats and executes the request, and offers streaming of the response.
575// Returns io.ReadCloser which could be used for streaming of the response, or an error
576// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
577// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
578func (r *Request) Stream() (io.ReadCloser, error) {
579 if r.err != nil {
580 return nil, r.err
581 }
582
583 r.tryThrottle()
584
585 url := r.URL().String()
586 req, err := http.NewRequest(r.verb, url, nil)
587 if err != nil {
588 return nil, err
589 }
590 if r.ctx != nil {
591 req = req.WithContext(r.ctx)
592 }
593 req.Header = r.headers
594 client := r.client
595 if client == nil {
596 client = http.DefaultClient
597 }
598 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
599 resp, err := client.Do(req)
600 updateURLMetrics(r, resp, err)
601 if r.baseURL != nil {
602 if err != nil {
603 r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
604 } else {
605 r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
606 }
607 }
608 if err != nil {
609 return nil, err
610 }
611
612 switch {
613 case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
614 return resp.Body, nil
615
616 default:
617 // ensure we close the body before returning the error
618 defer resp.Body.Close()
619
620 result := r.transformResponse(resp, req)
621 err := result.Error()
622 if err == nil {
623 err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
624 }
625 return nil, err
626 }
627}
628
629// request connects to the server and invokes the provided function when a server response is
630// received. It handles retry behavior and up front validation of requests. It will invoke
631// fn at most once. It will return an error if a problem occurred prior to connecting to the
632// server - the provided function is responsible for handling server errors.
633func (r *Request) request(fn func(*http.Request, *http.Response)) error {
634 //Metrics for total request latency
635 start := time.Now()
636 defer func() {
637 metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
638 }()
639
640 if r.err != nil {
641 glog.V(4).Infof("Error in request: %v", r.err)
642 return r.err
643 }
644
645 // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
646 if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
647 return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
648 }
649 if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
650 return fmt.Errorf("an empty namespace may not be set during creation")
651 }
652
653 client := r.client
654 if client == nil {
655 client = http.DefaultClient
656 }
657
658 // Right now we make about ten retry attempts if we get a Retry-After response.
659 maxRetries := 10
660 retries := 0
661 for {
662 url := r.URL().String()
663 req, err := http.NewRequest(r.verb, url, r.body)
664 if err != nil {
665 return err
666 }
667 if r.timeout > 0 {
668 if r.ctx == nil {
669 r.ctx = context.Background()
670 }
671 var cancelFn context.CancelFunc
672 r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
673 defer cancelFn()
674 }
675 if r.ctx != nil {
676 req = req.WithContext(r.ctx)
677 }
678 req.Header = r.headers
679
680 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
681 if retries > 0 {
682 // We are retrying the request that we already send to apiserver
683 // at least once before.
684 // This request should also be throttled with the client-internal throttler.
685 r.tryThrottle()
686 }
687 resp, err := client.Do(req)
688 updateURLMetrics(r, resp, err)
689 if err != nil {
690 r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
691 } else {
692 r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
693 }
694 if err != nil {
695 // "Connection reset by peer" is usually a transient error.
696 // Thus in case of "GET" operations, we simply retry it.
697 // We are not automatically retrying "write" operations, as
698 // they are not idempotent.
699 if !net.IsConnectionReset(err) || r.verb != "GET" {
700 return err
701 }
702 // For the purpose of retry, we set the artificial "retry-after" response.
703 // TODO: Should we clean the original response if it exists?
704 resp = &http.Response{
705 StatusCode: http.StatusInternalServerError,
706 Header: http.Header{"Retry-After": []string{"1"}},
707 Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
708 }
709 }
710
711 done := func() bool {
712 // Ensure the response body is fully read and closed
713 // before we reconnect, so that we reuse the same TCP
714 // connection.
715 defer func() {
716 const maxBodySlurpSize = 2 << 10
717 if resp.ContentLength <= maxBodySlurpSize {
718 io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
719 }
720 resp.Body.Close()
721 }()
722
723 retries++
724 if seconds, wait := checkWait(resp); wait && retries < maxRetries {
725 if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
726 _, err := seeker.Seek(0, 0)
727 if err != nil {
728 glog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
729 fn(req, resp)
730 return true
731 }
732 }
733
734 glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url)
735 r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
736 return false
737 }
738 fn(req, resp)
739 return true
740 }()
741 if done {
742 return nil
743 }
744 }
745}
746
747// Do formats and executes the request. Returns a Result object for easy response
748// processing.
749//
750// Error type:
751// * If the request can't be constructed, or an error happened earlier while building its
752// arguments: *RequestConstructionError
753// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
754// * http.Client.Do errors are returned directly.
755func (r *Request) Do() Result {
756 r.tryThrottle()
757
758 var result Result
759 err := r.request(func(req *http.Request, resp *http.Response) {
760 result = r.transformResponse(resp, req)
761 })
762 if err != nil {
763 return Result{err: err}
764 }
765 return result
766}
767
768// DoRaw executes the request but does not process the response body.
769func (r *Request) DoRaw() ([]byte, error) {
770 r.tryThrottle()
771
772 var result Result
773 err := r.request(func(req *http.Request, resp *http.Response) {
774 result.body, result.err = ioutil.ReadAll(resp.Body)
775 glogBody("Response Body", result.body)
776 if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
777 result.err = r.transformUnstructuredResponseError(resp, req, result.body)
778 }
779 })
780 if err != nil {
781 return nil, err
782 }
783 return result.body, result.err
784}
785
786// transformResponse converts an API response into a structured API object
787func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
788 var body []byte
789 if resp.Body != nil {
790 data, err := ioutil.ReadAll(resp.Body)
791 switch err.(type) {
792 case nil:
793 body = data
794 case http2.StreamError:
795 // This is trying to catch the scenario that the server may close the connection when sending the
796 // response body. This can be caused by server timeout due to a slow network connection.
797 // TODO: Add test for this. Steps may be:
798 // 1. client-go (or kubectl) sends a GET request.
799 // 2. Apiserver sends back the headers and then part of the body
800 // 3. Apiserver closes connection.
801 // 4. client-go should catch this and return an error.
802 glog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
803 streamErr := fmt.Errorf("Stream error %#v when reading response body, may be caused by closed connection. Please retry.", err)
804 return Result{
805 err: streamErr,
806 }
807 default:
808 glog.Errorf("Unexpected error when reading response body: %#v", err)
809 unexpectedErr := fmt.Errorf("Unexpected error %#v when reading response body. Please retry.", err)
810 return Result{
811 err: unexpectedErr,
812 }
813 }
814 }
815
816 glogBody("Response Body", body)
817
818 // verify the content type is accurate
819 contentType := resp.Header.Get("Content-Type")
820 decoder := r.serializers.Decoder
821 if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
822 mediaType, params, err := mime.ParseMediaType(contentType)
823 if err != nil {
824 return Result{err: errors.NewInternalError(err)}
825 }
826 decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
827 if err != nil {
828 // if we fail to negotiate a decoder, treat this as an unstructured error
829 switch {
830 case resp.StatusCode == http.StatusSwitchingProtocols:
831 // no-op, we've been upgraded
832 case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
833 return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
834 }
835 return Result{
836 body: body,
837 contentType: contentType,
838 statusCode: resp.StatusCode,
839 }
840 }
841 }
842
843 switch {
844 case resp.StatusCode == http.StatusSwitchingProtocols:
845 // no-op, we've been upgraded
846 case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
847 // calculate an unstructured error from the response which the Result object may use if the caller
848 // did not return a structured error.
849 retryAfter, _ := retryAfterSeconds(resp)
850 err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
851 return Result{
852 body: body,
853 contentType: contentType,
854 statusCode: resp.StatusCode,
855 decoder: decoder,
856 err: err,
857 }
858 }
859
860 return Result{
861 body: body,
862 contentType: contentType,
863 statusCode: resp.StatusCode,
864 decoder: decoder,
865 }
866}
867
868// truncateBody decides if the body should be truncated, based on the glog Verbosity.
869func truncateBody(body string) string {
870 max := 0
871 switch {
872 case bool(glog.V(10)):
873 return body
874 case bool(glog.V(9)):
875 max = 10240
876 case bool(glog.V(8)):
877 max = 1024
878 }
879
880 if len(body) <= max {
881 return body
882 }
883
884 return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
885}
886
887// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
888// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
889// whether the body is printable.
890func glogBody(prefix string, body []byte) {
891 if glog.V(8) {
892 if bytes.IndexFunc(body, func(r rune) bool {
893 return r < 0x0a
894 }) != -1 {
895 glog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
896 } else {
897 glog.Infof("%s: %s", prefix, truncateBody(string(body)))
898 }
899 }
900}
901
// maxUnstructuredResponseTextBytes is an upper bound, in bytes, on how much of
// a response body is read for and included in an unstructured error.
const maxUnstructuredResponseTextBytes = 2048
904
905// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
906// It is expected to transform any response that is not recognizable as a clear server sent error from the
907// K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
908// introduce a level of uncertainty to the responses returned by servers that in common use result in
909// unexpected responses. The rough structure is:
910//
911// 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
912// - this is the happy path
913// - when you get this output, trust what the server sends
914// 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
915// generate a reasonable facsimile of the original failure.
916// - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
917// 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
918// 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
919// initial contact, the presence of mismatched body contents from posted content types
920// - Give these a separate distinct error type and capture as much as possible of the original message
921//
922// TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
923func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
924 if body == nil && resp.Body != nil {
925 if data, err := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
926 body = data
927 }
928 }
929 retryAfter, _ := retryAfterSeconds(resp)
930 return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
931}
932
933// newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
934func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
935 // cap the amount of output we create
936 if len(body) > maxUnstructuredResponseTextBytes {
937 body = body[:maxUnstructuredResponseTextBytes]
938 }
939
940 message := "unknown"
941 if isTextResponse {
942 message = strings.TrimSpace(string(body))
943 }
944 var groupResource schema.GroupResource
945 if len(r.resource) > 0 {
946 groupResource.Group = r.content.GroupVersion.Group
947 groupResource.Resource = r.resource
948 }
949 return errors.NewGenericServerResponse(
950 statusCode,
951 method,
952 groupResource,
953 r.resourceName,
954 message,
955 retryAfter,
956 true,
957 )
958}
959
// isTextResponse reports whether the response should be treated as text.
// A response without a Content-Type header is assumed to be text, a Content-Type
// that cannot be parsed is not, and otherwise the media type must start with "text/".
func isTextResponse(resp *http.Response) bool {
	header := resp.Header.Get("Content-Type")
	if header == "" {
		return true
	}
	mediaType, _, err := mime.ParseMediaType(header)
	return err == nil && strings.HasPrefix(mediaType, "text/")
}
972
973// checkWait returns true along with a number of seconds if the server instructed us to wait
974// before retrying.
975func checkWait(resp *http.Response) (int, bool) {
976 switch r := resp.StatusCode; {
977 // any 500 error code and 429 can trigger a wait
978 case r == http.StatusTooManyRequests, r >= 500:
979 default:
980 return 0, false
981 }
982 i, ok := retryAfterSeconds(resp)
983 return i, ok
984}
985
// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and
// false if the header was missing or not a valid number of seconds.
//
// Only the integer delay-seconds form is understood; any value that does not parse
// as an integer (for example an HTTP-date) is reported as not present. A negative
// value is not a valid delay and is likewise rejected, so callers never receive a
// negative wait.
func retryAfterSeconds(resp *http.Response) (int, bool) {
	h := resp.Header.Get("Retry-After")
	if len(h) == 0 {
		return 0, false
	}
	seconds, err := strconv.Atoi(h)
	if err != nil || seconds < 0 {
		return 0, false
	}
	return seconds, true
}
996
997// Result contains the result of calling Request.Do().
998type Result struct {
999 body []byte
1000 contentType string
1001 err error
1002 statusCode int
1003
1004 decoder runtime.Decoder
1005}
1006
1007// Raw returns the raw result.
1008func (r Result) Raw() ([]byte, error) {
1009 return r.body, r.err
1010}
1011
1012// Get returns the result as an object, which means it passes through the decoder.
1013// If the returned object is of type Status and has .Status != StatusSuccess, the
1014// additional information in Status will be used to enrich the error.
1015func (r Result) Get() (runtime.Object, error) {
1016 if r.err != nil {
1017 // Check whether the result has a Status object in the body and prefer that.
1018 return nil, r.Error()
1019 }
1020 if r.decoder == nil {
1021 return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1022 }
1023
1024 // decode, but if the result is Status return that as an error instead.
1025 out, _, err := r.decoder.Decode(r.body, nil, nil)
1026 if err != nil {
1027 return nil, err
1028 }
1029 switch t := out.(type) {
1030 case *metav1.Status:
1031 // any status besides StatusSuccess is considered an error.
1032 if t.Status != metav1.StatusSuccess {
1033 return nil, errors.FromObject(t)
1034 }
1035 }
1036 return out, nil
1037}
1038
1039// StatusCode returns the HTTP status code of the request. (Only valid if no
1040// error was returned.)
1041func (r Result) StatusCode(statusCode *int) Result {
1042 *statusCode = r.statusCode
1043 return r
1044}
1045
1046// Into stores the result into obj, if possible. If obj is nil it is ignored.
1047// If the returned object is of type Status and has .Status != StatusSuccess, the
1048// additional information in Status will be used to enrich the error.
1049func (r Result) Into(obj runtime.Object) error {
1050 if r.err != nil {
1051 // Check whether the result has a Status object in the body and prefer that.
1052 return r.Error()
1053 }
1054 if r.decoder == nil {
1055 return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1056 }
1057 if len(r.body) == 0 {
1058 return fmt.Errorf("0-length response")
1059 }
1060
1061 out, _, err := r.decoder.Decode(r.body, nil, obj)
1062 if err != nil || out == obj {
1063 return err
1064 }
1065 // if a different object is returned, see if it is Status and avoid double decoding
1066 // the object.
1067 switch t := out.(type) {
1068 case *metav1.Status:
1069 // any status besides StatusSuccess is considered an error.
1070 if t.Status != metav1.StatusSuccess {
1071 return errors.FromObject(t)
1072 }
1073 }
1074 return nil
1075}
1076
1077// WasCreated updates the provided bool pointer to whether the server returned
1078// 201 created or a different response.
1079func (r Result) WasCreated(wasCreated *bool) Result {
1080 *wasCreated = r.statusCode == http.StatusCreated
1081 return r
1082}
1083
1084// Error returns the error executing the request, nil if no error occurred.
1085// If the returned object is of type Status and has Status != StatusSuccess, the
1086// additional information in Status will be used to enrich the error.
1087// See the Request.Do() comment for what errors you might get.
1088func (r Result) Error() error {
1089 // if we have received an unexpected server error, and we have a body and decoder, we can try to extract
1090 // a Status object.
1091 if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
1092 return r.err
1093 }
1094
1095 // attempt to convert the body into a Status object
1096 // to be backwards compatible with old servers that do not return a version, default to "v1"
1097 out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
1098 if err != nil {
1099 glog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
1100 return r.err
1101 }
1102 switch t := out.(type) {
1103 case *metav1.Status:
1104 // because we default the kind, we *must* check for StatusFailure
1105 if t.Status == metav1.StatusFailure {
1106 return errors.FromObject(t)
1107 }
1108 }
1109 return r.err
1110}
1111
// Restrictions on names that are used as path segments (like the REST API or etcd store).
var (
	// NameMayNotBe specifies strings that cannot be used as names specified as path segments.
	NameMayNotBe = []string{".", ".."}

	// NameMayNotContain specifies substrings that cannot be used in names specified as path segments.
	NameMayNotContain = []string{"/", "%"}
)
1117
1118// IsValidPathSegmentName validates the name can be safely encoded as a path segment
1119func IsValidPathSegmentName(name string) []string {
1120 for _, illegalName := range NameMayNotBe {
1121 if name == illegalName {
1122 return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
1123 }
1124 }
1125
1126 var errors []string
1127 for _, illegalContent := range NameMayNotContain {
1128 if strings.Contains(name, illegalContent) {
1129 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1130 }
1131 }
1132
1133 return errors
1134}
1135
1136// IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
1137// It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
1138func IsValidPathSegmentPrefix(name string) []string {
1139 var errors []string
1140 for _, illegalContent := range NameMayNotContain {
1141 if strings.Contains(name, illegalContent) {
1142 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1143 }
1144 }
1145
1146 return errors
1147}
1148
1149// ValidatePathSegmentName validates the name can be safely encoded as a path segment
1150func ValidatePathSegmentName(name string, prefix bool) []string {
1151 if prefix {
1152 return IsValidPathSegmentPrefix(name)
1153 } else {
1154 return IsValidPathSegmentName(name)
1155 }
1156}