blob: 8d4ea7118099e37298eff0c5bd791c9d35f462c1 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package protobuf
18
19import (
20 "bytes"
21 "fmt"
22 "io"
23 "reflect"
24
25 "github.com/gogo/protobuf/proto"
26
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/apimachinery/pkg/runtime/schema"
29 "k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
30 "k8s.io/apimachinery/pkg/util/framer"
31)
32
// protoEncodingPrefix is the magic number written in front of every message produced by Serializer
// and required at the start of every message it decodes. The first three bytes spell "k8s"
// (0x6b 0x38 0x73); the fourth byte is reserved for the encoding style. The only style defined is
// 0x00, meaning the remainder of the byte stream is a runtime.Unknown message (proto2).
//
// See k8s.io/apimachinery/pkg/runtime/generated.proto for details of the runtime.Unknown message.
//
// This encoding scheme is experimental, and is subject to change at any time.
var protoEncodingPrefix = []byte{'k', '8', 's', 0x00}
44
// errNotMarshalable is the error returned when an object cannot be handled as a protobuf message.
// It records the dynamic type of the offending object.
type errNotMarshalable struct {
	t reflect.Type
}

// Error implements the error interface and names the type that could not be encoded.
func (e errNotMarshalable) Error() string {
	const format = "object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message"
	return fmt.Sprintf(format, e.t)
}

// IsNotMarshalable reports whether err is an errNotMarshalable value. A nil error, and any other
// error type (including a pointer to errNotMarshalable), yields false.
func IsNotMarshalable(err error) bool {
	switch err.(type) {
	case errNotMarshalable:
		return true
	default:
		return false
	}
}
57
58// NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
59// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
60// as-is (any type info passed with the object will be used).
61//
62// This encoding scheme is experimental, and is subject to change at any time.
63func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *Serializer {
64 return &Serializer{
65 prefix: protoEncodingPrefix,
66 creater: creater,
67 typer: typer,
68 contentType: defaultContentType,
69 }
70}
71
72type Serializer struct {
73 prefix []byte
74 creater runtime.ObjectCreater
75 typer runtime.ObjectTyper
76 contentType string
77}
78
79var _ runtime.Serializer = &Serializer{}
80var _ recognizer.RecognizingDecoder = &Serializer{}
81
82// Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
83// gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
84// the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
85// be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
86// not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
87// errors, the method will return the calculated schema kind.
88func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
89 if versioned, ok := into.(*runtime.VersionedObjects); ok {
90 into = versioned.Last()
91 obj, actual, err := s.Decode(originalData, gvk, into)
92 if err != nil {
93 return nil, actual, err
94 }
95 // the last item in versioned becomes into, so if versioned was not originally empty we reset the object
96 // array so the first position is the decoded object and the second position is the outermost object.
97 // if there were no objects in the versioned list passed to us, only add ourselves.
98 if into != nil && into != obj {
99 versioned.Objects = []runtime.Object{obj, into}
100 } else {
101 versioned.Objects = []runtime.Object{obj}
102 }
103 return versioned, actual, err
104 }
105
106 prefixLen := len(s.prefix)
107 switch {
108 case len(originalData) == 0:
109 // TODO: treat like decoding {} from JSON with defaulting
110 return nil, nil, fmt.Errorf("empty data")
111 case len(originalData) < prefixLen || !bytes.Equal(s.prefix, originalData[:prefixLen]):
112 return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix)
113 case len(originalData) == prefixLen:
114 // TODO: treat like decoding {} from JSON with defaulting
115 return nil, nil, fmt.Errorf("empty body")
116 }
117
118 data := originalData[prefixLen:]
119 unk := runtime.Unknown{}
120 if err := unk.Unmarshal(data); err != nil {
121 return nil, nil, err
122 }
123
124 actual := unk.GroupVersionKind()
125 copyKindDefaults(&actual, gvk)
126
127 if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
128 *intoUnknown = unk
129 if ok, _, _ := s.RecognizesData(bytes.NewBuffer(unk.Raw)); ok {
130 intoUnknown.ContentType = s.contentType
131 }
132 return intoUnknown, &actual, nil
133 }
134
135 if into != nil {
136 types, _, err := s.typer.ObjectKinds(into)
137 switch {
138 case runtime.IsNotRegisteredError(err):
139 pb, ok := into.(proto.Message)
140 if !ok {
141 return nil, &actual, errNotMarshalable{reflect.TypeOf(into)}
142 }
143 if err := proto.Unmarshal(unk.Raw, pb); err != nil {
144 return nil, &actual, err
145 }
146 return into, &actual, nil
147 case err != nil:
148 return nil, &actual, err
149 default:
150 copyKindDefaults(&actual, &types[0])
151 // if the result of defaulting did not set a version or group, ensure that at least group is set
152 // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
153 // of into is set if there is no better information from the caller or object.
154 if len(actual.Version) == 0 && len(actual.Group) == 0 {
155 actual.Group = types[0].Group
156 }
157 }
158 }
159
160 if len(actual.Kind) == 0 {
161 return nil, &actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta))
162 }
163 if len(actual.Version) == 0 {
164 return nil, &actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
165 }
166
167 return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
168}
169
170// Encode serializes the provided object to the given writer.
171func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
172 prefixSize := uint64(len(s.prefix))
173
174 var unk runtime.Unknown
175 switch t := obj.(type) {
176 case *runtime.Unknown:
177 estimatedSize := prefixSize + uint64(t.Size())
178 data := make([]byte, estimatedSize)
179 i, err := t.MarshalTo(data[prefixSize:])
180 if err != nil {
181 return err
182 }
183 copy(data, s.prefix)
184 _, err = w.Write(data[:prefixSize+uint64(i)])
185 return err
186 default:
187 kind := obj.GetObjectKind().GroupVersionKind()
188 unk = runtime.Unknown{
189 TypeMeta: runtime.TypeMeta{
190 Kind: kind.Kind,
191 APIVersion: kind.GroupVersion().String(),
192 },
193 }
194 }
195
196 switch t := obj.(type) {
197 case bufferedMarshaller:
198 // this path performs a single allocation during write but requires the caller to implement
199 // the more efficient Size and MarshalTo methods
200 encodedSize := uint64(t.Size())
201 estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
202 data := make([]byte, estimatedSize)
203
204 i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
205 if err != nil {
206 return err
207 }
208
209 copy(data, s.prefix)
210
211 _, err = w.Write(data[:prefixSize+uint64(i)])
212 return err
213
214 case proto.Marshaler:
215 // this path performs extra allocations
216 data, err := t.Marshal()
217 if err != nil {
218 return err
219 }
220 unk.Raw = data
221
222 estimatedSize := prefixSize + uint64(unk.Size())
223 data = make([]byte, estimatedSize)
224
225 i, err := unk.MarshalTo(data[prefixSize:])
226 if err != nil {
227 return err
228 }
229
230 copy(data, s.prefix)
231
232 _, err = w.Write(data[:prefixSize+uint64(i)])
233 return err
234
235 default:
236 // TODO: marshal with a different content type and serializer (JSON for third party objects)
237 return errNotMarshalable{reflect.TypeOf(obj)}
238 }
239}
240
241// RecognizesData implements the RecognizingDecoder interface.
242func (s *Serializer) RecognizesData(peek io.Reader) (bool, bool, error) {
243 prefix := make([]byte, 4)
244 n, err := peek.Read(prefix)
245 if err != nil {
246 if err == io.EOF {
247 return false, false, nil
248 }
249 return false, false, err
250 }
251 if n != 4 {
252 return false, false, nil
253 }
254 return bytes.Equal(s.prefix, prefix), false, nil
255}
256
257// copyKindDefaults defaults dst to the value in src if dst does not have a value set.
258func copyKindDefaults(dst, src *schema.GroupVersionKind) {
259 if src == nil {
260 return
261 }
262 // apply kind and version defaulting from provided default
263 if len(dst.Kind) == 0 {
264 dst.Kind = src.Kind
265 }
266 if len(dst.Version) == 0 && len(src.Version) > 0 {
267 dst.Group = src.Group
268 dst.Version = src.Version
269 }
270}
271
// bufferedMarshaller describes a more efficient marshalling interface that can avoid allocating multiple
// byte buffers by pre-calculating the size of the final buffer needed. It is the combination of
// proto.Sizer and runtime.ProtobufMarshaller; the Encode methods in this file check for it before
// falling back to proto.Marshaler.
type bufferedMarshaller interface {
	proto.Sizer
	runtime.ProtobufMarshaller
}
278
279// estimateUnknownSize returns the expected bytes consumed by a given runtime.Unknown
280// object with a nil RawJSON struct and the expected size of the provided buffer. The
281// returned size will not be correct if RawJSOn is set on unk.
282func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 {
283 size := uint64(unk.Size())
284 // protobuf uses 1 byte for the tag, a varint for the length of the array (at most 8 bytes - uint64 - here),
285 // and the size of the array.
286 size += 1 + 8 + byteSize
287 return size
288}
289
290// NewRawSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If typer
291// is not nil, the object has the group, version, and kind fields set. This serializer does not provide type information for the
292// encoded object, and thus is not self describing (callers must know what type is being described in order to decode).
293//
294// This encoding scheme is experimental, and is subject to change at any time.
295func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *RawSerializer {
296 return &RawSerializer{
297 creater: creater,
298 typer: typer,
299 contentType: defaultContentType,
300 }
301}
302
303// RawSerializer encodes and decodes objects without adding a runtime.Unknown wrapper (objects are encoded without identifying
304// type).
305type RawSerializer struct {
306 creater runtime.ObjectCreater
307 typer runtime.ObjectTyper
308 contentType string
309}
310
311var _ runtime.Serializer = &RawSerializer{}
312
313// Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
314// gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
315// the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
316// be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
317// not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
318// errors, the method will return the calculated schema kind.
319func (s *RawSerializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
320 if into == nil {
321 return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s)
322 }
323
324 if versioned, ok := into.(*runtime.VersionedObjects); ok {
325 into = versioned.Last()
326 obj, actual, err := s.Decode(originalData, gvk, into)
327 if err != nil {
328 return nil, actual, err
329 }
330 if into != nil && into != obj {
331 versioned.Objects = []runtime.Object{obj, into}
332 } else {
333 versioned.Objects = []runtime.Object{obj}
334 }
335 return versioned, actual, err
336 }
337
338 if len(originalData) == 0 {
339 // TODO: treat like decoding {} from JSON with defaulting
340 return nil, nil, fmt.Errorf("empty data")
341 }
342 data := originalData
343
344 actual := &schema.GroupVersionKind{}
345 copyKindDefaults(actual, gvk)
346
347 if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
348 intoUnknown.Raw = data
349 intoUnknown.ContentEncoding = ""
350 intoUnknown.ContentType = s.contentType
351 intoUnknown.SetGroupVersionKind(*actual)
352 return intoUnknown, actual, nil
353 }
354
355 types, _, err := s.typer.ObjectKinds(into)
356 switch {
357 case runtime.IsNotRegisteredError(err):
358 pb, ok := into.(proto.Message)
359 if !ok {
360 return nil, actual, errNotMarshalable{reflect.TypeOf(into)}
361 }
362 if err := proto.Unmarshal(data, pb); err != nil {
363 return nil, actual, err
364 }
365 return into, actual, nil
366 case err != nil:
367 return nil, actual, err
368 default:
369 copyKindDefaults(actual, &types[0])
370 // if the result of defaulting did not set a version or group, ensure that at least group is set
371 // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
372 // of into is set if there is no better information from the caller or object.
373 if len(actual.Version) == 0 && len(actual.Group) == 0 {
374 actual.Group = types[0].Group
375 }
376 }
377
378 if len(actual.Kind) == 0 {
379 return nil, actual, runtime.NewMissingKindErr("<protobuf encoded body - must provide default type>")
380 }
381 if len(actual.Version) == 0 {
382 return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
383 }
384
385 return unmarshalToObject(s.typer, s.creater, actual, into, data)
386}
387
388// unmarshalToObject is the common code between decode in the raw and normal serializer.
389func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, actual *schema.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *schema.GroupVersionKind, error) {
390 // use the target if necessary
391 obj, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
392 if err != nil {
393 return nil, actual, err
394 }
395
396 pb, ok := obj.(proto.Message)
397 if !ok {
398 return nil, actual, errNotMarshalable{reflect.TypeOf(obj)}
399 }
400 if err := proto.Unmarshal(data, pb); err != nil {
401 return nil, actual, err
402 }
403 return obj, actual, nil
404}
405
406// Encode serializes the provided object to the given writer. Overrides is ignored.
407func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
408 switch t := obj.(type) {
409 case bufferedMarshaller:
410 // this path performs a single allocation during write but requires the caller to implement
411 // the more efficient Size and MarshalTo methods
412 encodedSize := uint64(t.Size())
413 data := make([]byte, encodedSize)
414
415 n, err := t.MarshalTo(data)
416 if err != nil {
417 return err
418 }
419 _, err = w.Write(data[:n])
420 return err
421
422 case proto.Marshaler:
423 // this path performs extra allocations
424 data, err := t.Marshal()
425 if err != nil {
426 return err
427 }
428 _, err = w.Write(data)
429 return err
430
431 default:
432 return errNotMarshalable{reflect.TypeOf(obj)}
433 }
434}
435
// LengthDelimitedFramer is the package's ready-to-use lengthDelimitedFramer value. The type has no
// fields, so this single value can be shared.
var LengthDelimitedFramer = lengthDelimitedFramer{}

// lengthDelimitedFramer provides stream framing by delegating to the length-delimited frame
// reader and writer in k8s.io/apimachinery/pkg/util/framer.
type lengthDelimitedFramer struct{}

// NewFrameWriter implements stream framing for this serializer by wrapping w with
// framer.NewLengthDelimitedFrameWriter.
func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
	return framer.NewLengthDelimitedFrameWriter(w)
}

// NewFrameReader implements stream framing for this serializer by wrapping r with
// framer.NewLengthDelimitedFrameReader.
func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
	return framer.NewLengthDelimitedFrameReader(r)
}