Matthias Andreas Benkard | 832a54e | 2019-01-29 09:27:38 +0100 | [diff] [blame] | 1 | /* |
| 2 | Copyright 2017 The Kubernetes Authors. |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package handlers |
| 18 | |
| 19 | import ( |
| 20 | "context" |
| 21 | "fmt" |
| 22 | "net/http" |
| 23 | "strings" |
| 24 | "time" |
| 25 | |
| 26 | "github.com/evanphx/json-patch" |
| 27 | |
| 28 | "k8s.io/apimachinery/pkg/api/errors" |
| 29 | "k8s.io/apimachinery/pkg/runtime" |
| 30 | "k8s.io/apimachinery/pkg/runtime/schema" |
| 31 | "k8s.io/apimachinery/pkg/types" |
| 32 | "k8s.io/apimachinery/pkg/util/json" |
| 33 | "k8s.io/apimachinery/pkg/util/mergepatch" |
| 34 | "k8s.io/apimachinery/pkg/util/sets" |
| 35 | "k8s.io/apimachinery/pkg/util/strategicpatch" |
| 36 | "k8s.io/apiserver/pkg/admission" |
| 37 | "k8s.io/apiserver/pkg/audit" |
| 38 | "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" |
| 39 | "k8s.io/apiserver/pkg/endpoints/request" |
| 40 | "k8s.io/apiserver/pkg/registry/rest" |
| 41 | utiltrace "k8s.io/apiserver/pkg/util/trace" |
| 42 | ) |
| 43 | |
| 44 | // PatchResource returns a function that will handle a resource patch. |
| 45 | func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface, patchTypes []string) http.HandlerFunc { |
| 46 | return func(w http.ResponseWriter, req *http.Request) { |
| 47 | // For performance tracking purposes. |
| 48 | trace := utiltrace.New("Patch " + req.URL.Path) |
| 49 | defer trace.LogIfLong(500 * time.Millisecond) |
| 50 | |
| 51 | if isDryRun(req.URL) { |
| 52 | scope.err(errors.NewBadRequest("dryRun is not supported yet"), w, req) |
| 53 | return |
| 54 | } |
| 55 | |
| 56 | // Do this first, otherwise name extraction can fail for unrecognized content types |
| 57 | // TODO: handle this in negotiation |
| 58 | contentType := req.Header.Get("Content-Type") |
| 59 | // Remove "; charset=" if included in header. |
| 60 | if idx := strings.Index(contentType, ";"); idx > 0 { |
| 61 | contentType = contentType[:idx] |
| 62 | } |
| 63 | patchType := types.PatchType(contentType) |
| 64 | |
| 65 | // Ensure the patchType is one we support |
| 66 | if !sets.NewString(patchTypes...).Has(contentType) { |
| 67 | scope.err(negotiation.NewUnsupportedMediaTypeError(patchTypes), w, req) |
| 68 | return |
| 69 | } |
| 70 | |
| 71 | // TODO: we either want to remove timeout or document it (if we |
| 72 | // document, move timeout out of this function and declare it in |
| 73 | // api_installer) |
| 74 | timeout := parseTimeout(req.URL.Query().Get("timeout")) |
| 75 | |
| 76 | namespace, name, err := scope.Namer.Name(req) |
| 77 | if err != nil { |
| 78 | scope.err(err, w, req) |
| 79 | return |
| 80 | } |
| 81 | |
| 82 | ctx := req.Context() |
| 83 | ctx = request.WithNamespace(ctx, namespace) |
| 84 | |
| 85 | patchJS, err := readBody(req) |
| 86 | if err != nil { |
| 87 | scope.err(err, w, req) |
| 88 | return |
| 89 | } |
| 90 | |
| 91 | ae := request.AuditEventFrom(ctx) |
| 92 | admit = admission.WithAudit(admit, ae) |
| 93 | |
| 94 | audit.LogRequestPatch(ae, patchJS) |
| 95 | trace.Step("Recorded the audit event") |
| 96 | |
| 97 | s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), runtime.ContentTypeJSON) |
| 98 | if !ok { |
| 99 | scope.err(fmt.Errorf("no serializer defined for JSON"), w, req) |
| 100 | return |
| 101 | } |
| 102 | gv := scope.Kind.GroupVersion() |
| 103 | codec := runtime.NewCodec( |
| 104 | scope.Serializer.EncoderForVersion(s.Serializer, gv), |
| 105 | scope.Serializer.DecoderToVersion(s.Serializer, schema.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal}), |
| 106 | ) |
| 107 | |
| 108 | userInfo, _ := request.UserFrom(ctx) |
| 109 | staticAdmissionAttributes := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo) |
| 110 | admissionCheck := func(updatedObject runtime.Object, currentObject runtime.Object) error { |
| 111 | if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && admit.Handles(admission.Update) { |
| 112 | return mutatingAdmission.Admit(admission.NewAttributesRecord(updatedObject, currentObject, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo)) |
| 113 | } |
| 114 | return nil |
| 115 | } |
| 116 | |
| 117 | p := patcher{ |
| 118 | namer: scope.Namer, |
| 119 | creater: scope.Creater, |
| 120 | defaulter: scope.Defaulter, |
| 121 | unsafeConvertor: scope.UnsafeConvertor, |
| 122 | kind: scope.Kind, |
| 123 | resource: scope.Resource, |
| 124 | |
| 125 | createValidation: rest.AdmissionToValidateObjectFunc(admit, staticAdmissionAttributes), |
| 126 | updateValidation: rest.AdmissionToValidateObjectUpdateFunc(admit, staticAdmissionAttributes), |
| 127 | admissionCheck: admissionCheck, |
| 128 | |
| 129 | codec: codec, |
| 130 | |
| 131 | timeout: timeout, |
| 132 | |
| 133 | restPatcher: r, |
| 134 | name: name, |
| 135 | patchType: patchType, |
| 136 | patchJS: patchJS, |
| 137 | |
| 138 | trace: trace, |
| 139 | } |
| 140 | |
| 141 | result, err := p.patchResource(ctx) |
| 142 | if err != nil { |
| 143 | scope.err(err, w, req) |
| 144 | return |
| 145 | } |
| 146 | trace.Step("Object stored in database") |
| 147 | |
| 148 | requestInfo, ok := request.RequestInfoFrom(ctx) |
| 149 | if !ok { |
| 150 | scope.err(fmt.Errorf("missing requestInfo"), w, req) |
| 151 | return |
| 152 | } |
| 153 | if err := setSelfLink(result, requestInfo, scope.Namer); err != nil { |
| 154 | scope.err(err, w, req) |
| 155 | return |
| 156 | } |
| 157 | trace.Step("Self-link added") |
| 158 | |
| 159 | transformResponseObject(ctx, scope, req, w, http.StatusOK, result) |
| 160 | } |
| 161 | } |
| 162 | |
| 163 | type mutateObjectUpdateFunc func(obj, old runtime.Object) error |
| 164 | |
| 165 | // patcher breaks the process of patch application and retries into smaller |
| 166 | // pieces of functionality. |
| 167 | // TODO: Use builder pattern to construct this object? |
| 168 | // TODO: As part of that effort, some aspects of PatchResource above could be |
| 169 | // moved into this type. |
| 170 | type patcher struct { |
| 171 | // Pieces of RequestScope |
| 172 | namer ScopeNamer |
| 173 | creater runtime.ObjectCreater |
| 174 | defaulter runtime.ObjectDefaulter |
| 175 | unsafeConvertor runtime.ObjectConvertor |
| 176 | resource schema.GroupVersionResource |
| 177 | kind schema.GroupVersionKind |
| 178 | |
| 179 | // Validation functions |
| 180 | createValidation rest.ValidateObjectFunc |
| 181 | updateValidation rest.ValidateObjectUpdateFunc |
| 182 | admissionCheck mutateObjectUpdateFunc |
| 183 | |
| 184 | codec runtime.Codec |
| 185 | |
| 186 | timeout time.Duration |
| 187 | |
| 188 | // Operation information |
| 189 | restPatcher rest.Patcher |
| 190 | name string |
| 191 | patchType types.PatchType |
| 192 | patchJS []byte |
| 193 | |
| 194 | trace *utiltrace.Trace |
| 195 | |
| 196 | // Set at invocation-time (by applyPatch) and immutable thereafter |
| 197 | namespace string |
| 198 | updatedObjectInfo rest.UpdatedObjectInfo |
| 199 | mechanism patchMechanism |
| 200 | } |
| 201 | |
| 202 | func (p *patcher) toUnversioned(versionedObj runtime.Object) (runtime.Object, error) { |
| 203 | gvk := p.kind.GroupKind().WithVersion(runtime.APIVersionInternal) |
| 204 | return p.unsafeConvertor.ConvertToVersion(versionedObj, gvk.GroupVersion()) |
| 205 | } |
| 206 | |
| 207 | type patchMechanism interface { |
| 208 | applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) |
| 209 | } |
| 210 | |
| 211 | type jsonPatcher struct { |
| 212 | *patcher |
| 213 | } |
| 214 | |
| 215 | func (p *jsonPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) { |
| 216 | // Encode will convert & return a versioned object in JSON. |
| 217 | currentObjJS, err := runtime.Encode(p.codec, currentObject) |
| 218 | if err != nil { |
| 219 | return nil, err |
| 220 | } |
| 221 | |
| 222 | // Apply the patch. |
| 223 | patchedObjJS, err := p.applyJSPatch(currentObjJS) |
| 224 | if err != nil { |
| 225 | return nil, interpretPatchError(err) |
| 226 | } |
| 227 | |
| 228 | // Construct the resulting typed, unversioned object. |
| 229 | objToUpdate := p.restPatcher.New() |
| 230 | if err := runtime.DecodeInto(p.codec, patchedObjJS, objToUpdate); err != nil { |
| 231 | return nil, err |
| 232 | } |
| 233 | |
| 234 | return objToUpdate, nil |
| 235 | } |
| 236 | |
| 237 | // patchJS applies the patch. Input and output objects must both have |
| 238 | // the external version, since that is what the patch must have been constructed against. |
| 239 | func (p *jsonPatcher) applyJSPatch(versionedJS []byte) (patchedJS []byte, retErr error) { |
| 240 | switch p.patchType { |
| 241 | case types.JSONPatchType: |
| 242 | patchObj, err := jsonpatch.DecodePatch(p.patchJS) |
| 243 | if err != nil { |
| 244 | return nil, err |
| 245 | } |
| 246 | return patchObj.Apply(versionedJS) |
| 247 | case types.MergePatchType: |
| 248 | return jsonpatch.MergePatch(versionedJS, p.patchJS) |
| 249 | default: |
| 250 | // only here as a safety net - go-restful filters content-type |
| 251 | return nil, fmt.Errorf("unknown Content-Type header for patch: %v", p.patchType) |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | type smpPatcher struct { |
| 256 | *patcher |
| 257 | |
| 258 | // Schema |
| 259 | schemaReferenceObj runtime.Object |
| 260 | } |
| 261 | |
| 262 | func (p *smpPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) { |
| 263 | // Since the patch is applied on versioned objects, we need to convert the |
| 264 | // current object to versioned representation first. |
| 265 | currentVersionedObject, err := p.unsafeConvertor.ConvertToVersion(currentObject, p.kind.GroupVersion()) |
| 266 | if err != nil { |
| 267 | return nil, err |
| 268 | } |
| 269 | versionedObjToUpdate, err := p.creater.New(p.kind) |
| 270 | if err != nil { |
| 271 | return nil, err |
| 272 | } |
| 273 | if err := strategicPatchObject(p.codec, p.defaulter, currentVersionedObject, p.patchJS, versionedObjToUpdate, p.schemaReferenceObj); err != nil { |
| 274 | return nil, err |
| 275 | } |
| 276 | // Convert the object back to unversioned (aka internal version). |
| 277 | unversionedObjToUpdate, err := p.toUnversioned(versionedObjToUpdate) |
| 278 | if err != nil { |
| 279 | return nil, err |
| 280 | } |
| 281 | |
| 282 | return unversionedObjToUpdate, nil |
| 283 | } |
| 284 | |
| 285 | // strategicPatchObject applies a strategic merge patch of <patchJS> to |
| 286 | // <originalObject> and stores the result in <objToUpdate>. |
| 287 | // It additionally returns the map[string]interface{} representation of the |
| 288 | // <originalObject> and <patchJS>. |
| 289 | // NOTE: Both <originalObject> and <objToUpdate> are supposed to be versioned. |
| 290 | func strategicPatchObject( |
| 291 | codec runtime.Codec, |
| 292 | defaulter runtime.ObjectDefaulter, |
| 293 | originalObject runtime.Object, |
| 294 | patchJS []byte, |
| 295 | objToUpdate runtime.Object, |
| 296 | schemaReferenceObj runtime.Object, |
| 297 | ) error { |
| 298 | originalObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(originalObject) |
| 299 | if err != nil { |
| 300 | return err |
| 301 | } |
| 302 | |
| 303 | patchMap := make(map[string]interface{}) |
| 304 | if err := json.Unmarshal(patchJS, &patchMap); err != nil { |
| 305 | return errors.NewBadRequest(err.Error()) |
| 306 | } |
| 307 | |
| 308 | if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, schemaReferenceObj); err != nil { |
| 309 | return err |
| 310 | } |
| 311 | return nil |
| 312 | } |
| 313 | |
| 314 | // applyPatch is called every time GuaranteedUpdate asks for the updated object, |
| 315 | // and is given the currently persisted object as input. |
| 316 | func (p *patcher) applyPatch(_ context.Context, _, currentObject runtime.Object) (runtime.Object, error) { |
| 317 | // Make sure we actually have a persisted currentObject |
| 318 | p.trace.Step("About to apply patch") |
| 319 | if hasUID, err := hasUID(currentObject); err != nil { |
| 320 | return nil, err |
| 321 | } else if !hasUID { |
| 322 | return nil, errors.NewNotFound(p.resource.GroupResource(), p.name) |
| 323 | } |
| 324 | |
| 325 | objToUpdate, err := p.mechanism.applyPatchToCurrentObject(currentObject) |
| 326 | if err != nil { |
| 327 | return nil, err |
| 328 | } |
| 329 | if err := checkName(objToUpdate, p.name, p.namespace, p.namer); err != nil { |
| 330 | return nil, err |
| 331 | } |
| 332 | return objToUpdate, nil |
| 333 | } |
| 334 | |
| 335 | // applyAdmission is called every time GuaranteedUpdate asks for the updated object, |
| 336 | // and is given the currently persisted object and the patched object as input. |
| 337 | func (p *patcher) applyAdmission(ctx context.Context, patchedObject runtime.Object, currentObject runtime.Object) (runtime.Object, error) { |
| 338 | p.trace.Step("About to check admission control") |
| 339 | return patchedObject, p.admissionCheck(patchedObject, currentObject) |
| 340 | } |
| 341 | |
| 342 | // patchResource divides PatchResource for easier unit testing |
| 343 | func (p *patcher) patchResource(ctx context.Context) (runtime.Object, error) { |
| 344 | p.namespace = request.NamespaceValue(ctx) |
| 345 | switch p.patchType { |
| 346 | case types.JSONPatchType, types.MergePatchType: |
| 347 | p.mechanism = &jsonPatcher{patcher: p} |
| 348 | case types.StrategicMergePatchType: |
| 349 | schemaReferenceObj, err := p.unsafeConvertor.ConvertToVersion(p.restPatcher.New(), p.kind.GroupVersion()) |
| 350 | if err != nil { |
| 351 | return nil, err |
| 352 | } |
| 353 | p.mechanism = &smpPatcher{patcher: p, schemaReferenceObj: schemaReferenceObj} |
| 354 | default: |
| 355 | return nil, fmt.Errorf("%v: unimplemented patch type", p.patchType) |
| 356 | } |
| 357 | p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission) |
| 358 | return finishRequest(p.timeout, func() (runtime.Object, error) { |
| 359 | updateObject, _, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation) |
| 360 | return updateObject, updateErr |
| 361 | }) |
| 362 | } |
| 363 | |
| 364 | // applyPatchToObject applies a strategic merge patch of <patchMap> to |
| 365 | // <originalMap> and stores the result in <objToUpdate>. |
| 366 | // NOTE: <objToUpdate> must be a versioned object. |
| 367 | func applyPatchToObject( |
| 368 | codec runtime.Codec, |
| 369 | defaulter runtime.ObjectDefaulter, |
| 370 | originalMap map[string]interface{}, |
| 371 | patchMap map[string]interface{}, |
| 372 | objToUpdate runtime.Object, |
| 373 | schemaReferenceObj runtime.Object, |
| 374 | ) error { |
| 375 | patchedObjMap, err := strategicpatch.StrategicMergeMapPatch(originalMap, patchMap, schemaReferenceObj) |
| 376 | if err != nil { |
| 377 | return interpretPatchError(err) |
| 378 | } |
| 379 | |
| 380 | // Rather than serialize the patched map to JSON, then decode it to an object, we go directly from a map to an object |
| 381 | if err := runtime.DefaultUnstructuredConverter.FromUnstructured(patchedObjMap, objToUpdate); err != nil { |
| 382 | return err |
| 383 | } |
| 384 | // Decoding from JSON to a versioned object would apply defaults, so we do the same here |
| 385 | defaulter.Default(objToUpdate) |
| 386 | |
| 387 | return nil |
| 388 | } |
| 389 | |
| 390 | // interpretPatchError interprets the error type and returns an error with appropriate HTTP code. |
| 391 | func interpretPatchError(err error) error { |
| 392 | switch err { |
| 393 | case mergepatch.ErrBadJSONDoc, mergepatch.ErrBadPatchFormatForPrimitiveList, mergepatch.ErrBadPatchFormatForRetainKeys, mergepatch.ErrBadPatchFormatForSetElementOrderList, mergepatch.ErrUnsupportedStrategicMergePatchFormat: |
| 394 | return errors.NewBadRequest(err.Error()) |
| 395 | case mergepatch.ErrNoListOfLists, mergepatch.ErrPatchContentNotMatchRetainKeys: |
| 396 | return errors.NewGenericServerResponse(http.StatusUnprocessableEntity, "", schema.GroupResource{}, "", err.Error(), 0, false) |
| 397 | default: |
| 398 | return err |
| 399 | } |
| 400 | } |