blob: 2fe2bbada2f64e43a3fd385b25920dd6c0671cf3 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package etcd
18
19import (
20 "context"
21 "errors"
22 "fmt"
23 "path"
24 "reflect"
25 "time"
26
27 etcd "github.com/coreos/etcd/client"
28 "github.com/golang/glog"
29
30 "k8s.io/apimachinery/pkg/api/meta"
31 "k8s.io/apimachinery/pkg/conversion"
32 "k8s.io/apimachinery/pkg/runtime"
33 utilcache "k8s.io/apimachinery/pkg/util/cache"
34 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
35 "k8s.io/apimachinery/pkg/watch"
36 "k8s.io/apiserver/pkg/storage"
37 "k8s.io/apiserver/pkg/storage/etcd/metrics"
38 etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
39 utiltrace "k8s.io/apiserver/pkg/util/trace"
40)
41
42// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
43// must be able to undo the transformation caused by the other.
44type ValueTransformer interface {
45 // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
46 // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
47 // have not changed.
48 TransformStringFromStorage(string) (value string, stale bool, err error)
49 // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
50 TransformStringToStorage(string) (value string, err error)
51}
52
53type identityTransformer struct{}
54
55func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) {
56 return s, false, nil
57}
58func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
59
60// IdentityTransformer performs no transformation on the provided values.
61var IdentityTransformer ValueTransformer = identityTransformer{}
62
63// Creates a new storage interface from the client
64// TODO: deprecate in favor of storage.Config abstraction over time
65func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, transformer ValueTransformer) storage.Interface {
66 return &etcdHelper{
67 etcdMembersAPI: etcd.NewMembersAPI(client),
68 etcdKeysAPI: etcd.NewKeysAPI(client),
69 codec: codec,
70 versioner: APIObjectVersioner{},
71 transformer: transformer,
72 pathPrefix: path.Join("/", prefix),
73 quorum: quorum,
74 cache: utilcache.NewCache(cacheSize),
75 }
76}
77
78// etcdHelper is the reference implementation of storage.Interface.
79type etcdHelper struct {
80 etcdMembersAPI etcd.MembersAPI
81 etcdKeysAPI etcd.KeysAPI
82 codec runtime.Codec
83 transformer ValueTransformer
84 // Note that versioner is required for etcdHelper to work correctly.
85 // The public constructors (NewStorage & NewEtcdStorage) are setting it
86 // correctly, so be careful when manipulating with it manually.
87 // optional, has to be set to perform any atomic operations
88 versioner storage.Versioner
89 // prefix for all etcd keys
90 pathPrefix string
91 // if true, perform quorum read
92 quorum bool
93
94 // We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
95 // to resourceVersion.
96 // This depends on etcd's indexes being globally unique across all objects/types. This will
97 // have to revisited if we decide to do things like multiple etcd clusters, or etcd will
98 // support multi-object transaction that will result in many objects with the same index.
99 // Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
100 // TODO: Measure how much this cache helps after the conversion code is optimized.
101 cache utilcache.Cache
102}
103
104// Implements storage.Interface.
105func (h *etcdHelper) Versioner() storage.Versioner {
106 return h.versioner
107}
108
109// Implements storage.Interface.
110func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
111 trace := utiltrace.New("etcdHelper::Create " + getTypeName(obj))
112 defer trace.LogIfLong(250 * time.Millisecond)
113 if ctx == nil {
114 glog.Errorf("Context is nil")
115 }
116 key = path.Join(h.pathPrefix, key)
117 data, err := runtime.Encode(h.codec, obj)
118 trace.Step("Object encoded")
119 if err != nil {
120 return err
121 }
122 if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
123 return errors.New("resourceVersion may not be set on objects to be created")
124 }
125 if err := h.versioner.PrepareObjectForStorage(obj); err != nil {
126 return fmt.Errorf("PrepareObjectForStorage returned an error: %v", err)
127 }
128 trace.Step("Version checked")
129
130 startTime := time.Now()
131 opts := etcd.SetOptions{
132 TTL: time.Duration(ttl) * time.Second,
133 PrevExist: etcd.PrevNoExist,
134 }
135
136 newBody, err := h.transformer.TransformStringToStorage(string(data))
137 if err != nil {
138 return storage.NewInternalError(err.Error())
139 }
140
141 response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts)
142 trace.Step("Object created")
143 metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
144 if err != nil {
145 return toStorageErr(err, key, 0)
146 }
147 if out != nil {
148 if _, err := conversion.EnforcePtr(out); err != nil {
149 panic("unable to convert output object to pointer")
150 }
151 _, _, _, err = h.extractObj(response, err, out, false, false)
152 }
153 return err
154}
155
156func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
157 if preconditions == nil {
158 return nil
159 }
160 objMeta, err := meta.Accessor(out)
161 if err != nil {
162 return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
163 }
164 if preconditions.UID != nil && *preconditions.UID != objMeta.GetUID() {
165 errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.GetUID())
166 return storage.NewInvalidObjError(key, errMsg)
167 }
168 return nil
169}
170
171// Implements storage.Interface.
172func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
173 if ctx == nil {
174 glog.Errorf("Context is nil")
175 }
176 key = path.Join(h.pathPrefix, key)
177 v, err := conversion.EnforcePtr(out)
178 if err != nil {
179 panic("unable to convert output object to pointer")
180 }
181
182 if preconditions == nil {
183 startTime := time.Now()
184 response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
185 metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
186 if !etcdutil.IsEtcdNotFound(err) {
187 // if the object that existed prior to the delete is returned by etcd, update the out object.
188 if err != nil || response.PrevNode != nil {
189 _, _, _, err = h.extractObj(response, err, out, false, true)
190 }
191 }
192 return toStorageErr(err, key, 0)
193 }
194
195 // Check the preconditions match.
196 obj := reflect.New(v.Type()).Interface().(runtime.Object)
197 for {
198 _, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false)
199 if err != nil {
200 return toStorageErr(err, key, 0)
201 }
202 if err := checkPreconditions(key, preconditions, obj); err != nil {
203 return toStorageErr(err, key, 0)
204 }
205 index := uint64(0)
206 if node != nil {
207 index = node.ModifiedIndex
208 } else if res != nil {
209 index = res.Index
210 }
211 opt := etcd.DeleteOptions{PrevIndex: index}
212 startTime := time.Now()
213 response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
214 metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
215 if !etcdutil.IsEtcdTestFailed(err) {
216 if !etcdutil.IsEtcdNotFound(err) {
217 // if the object that existed prior to the delete is returned by etcd, update the out object.
218 if err != nil || response.PrevNode != nil {
219 _, _, _, err = h.extractObj(response, err, out, false, true)
220 }
221 }
222 return toStorageErr(err, key, 0)
223 }
224
225 glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
226 }
227}
228
229// Implements storage.Interface.
230func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
231 if ctx == nil {
232 glog.Errorf("Context is nil")
233 }
234 watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
235 if err != nil {
236 return nil, err
237 }
238 key = path.Join(h.pathPrefix, key)
239 w := newEtcdWatcher(false, h.quorum, nil, pred, h.codec, h.versioner, nil, h.transformer, h)
240 go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
241 return w, nil
242}
243
244// Implements storage.Interface.
245func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
246 if ctx == nil {
247 glog.Errorf("Context is nil")
248 }
249 watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
250 if err != nil {
251 return nil, err
252 }
253 key = path.Join(h.pathPrefix, key)
254 w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred, h.codec, h.versioner, nil, h.transformer, h)
255 go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
256 return w, nil
257}
258
259// Implements storage.Interface.
260func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
261 if ctx == nil {
262 glog.Errorf("Context is nil")
263 }
264 key = path.Join(h.pathPrefix, key)
265 _, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
266 return err
267}
268
269// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
270// about the response, like the current etcd index and the ttl.
271func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) {
272 if ctx == nil {
273 glog.Errorf("Context is nil")
274 }
275 startTime := time.Now()
276
277 opts := &etcd.GetOptions{
278 Quorum: h.quorum,
279 }
280
281 response, err := h.etcdKeysAPI.Get(ctx, key, opts)
282 metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
283 if err != nil && !etcdutil.IsEtcdNotFound(err) {
284 return "", nil, nil, false, toStorageErr(err, key, 0)
285 }
286 body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
287 return body, node, response, stale, toStorageErr(err, key, 0)
288}
289
290func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) {
291 if response != nil {
292 if prevNode {
293 node = response.PrevNode
294 } else {
295 node = response.Node
296 }
297 }
298 if inErr != nil || node == nil || len(node.Value) == 0 {
299 if ignoreNotFound {
300 v, err := conversion.EnforcePtr(objPtr)
301 if err != nil {
302 return "", nil, false, err
303 }
304 v.Set(reflect.Zero(v.Type()))
305 return "", nil, false, nil
306 } else if inErr != nil {
307 return "", nil, false, inErr
308 }
309 return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response)
310 }
311
312 body, stale, err = h.transformer.TransformStringFromStorage(node.Value)
313 if err != nil {
314 return body, nil, stale, storage.NewInternalError(err.Error())
315 }
316 out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
317 if err != nil {
318 return body, nil, stale, err
319 }
320 if out != objPtr {
321 return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
322 }
323 // being unable to set the version does not prevent the object from being extracted
324 _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
325 return body, node, stale, err
326}
327
328// Implements storage.Interface.
329func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
330 if ctx == nil {
331 glog.Errorf("Context is nil")
332 }
333 trace := utiltrace.New("GetToList " + getTypeName(listObj))
334 listPtr, err := meta.GetItemsPtr(listObj)
335 if err != nil {
336 return err
337 }
338 key = path.Join(h.pathPrefix, key)
339 startTime := time.Now()
340 trace.Step("About to read etcd node")
341
342 opts := &etcd.GetOptions{
343 Quorum: h.quorum,
344 }
345 response, err := h.etcdKeysAPI.Get(ctx, key, opts)
346 trace.Step("Etcd node read")
347 metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
348 if err != nil {
349 if etcdutil.IsEtcdNotFound(err) {
350 if etcdErr, ok := err.(etcd.Error); ok {
351 return h.versioner.UpdateList(listObj, etcdErr.Index, "")
352 }
353 return fmt.Errorf("unexpected error from storage: %#v", err)
354 }
355 return toStorageErr(err, key, 0)
356 }
357
358 nodes := make([]*etcd.Node, 0)
359 nodes = append(nodes, response.Node)
360
361 if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
362 return err
363 }
364 trace.Step("Object decoded")
365 if err := h.versioner.UpdateList(listObj, response.Index, ""); err != nil {
366 return err
367 }
368 return nil
369}
370
371// decodeNodeList walks the tree of each node in the list and decodes into the specified object
372func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, pred storage.SelectionPredicate, slicePtr interface{}) error {
373 trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr))
374 defer trace.LogIfLong(400 * time.Millisecond)
375 v, err := conversion.EnforcePtr(slicePtr)
376 if err != nil || v.Kind() != reflect.Slice {
377 // This should not happen at runtime.
378 panic("need ptr to slice")
379 }
380 for _, node := range nodes {
381 if node.Dir {
382 // IMPORTANT: do not log each key as a discrete step in the trace log
383 // as it produces an immense amount of log spam when there is a large
384 // amount of content in the list.
385 if err := h.decodeNodeList(node.Nodes, pred, slicePtr); err != nil {
386 return err
387 }
388 continue
389 }
390 if obj, found := h.getFromCache(node.ModifiedIndex, pred); found {
391 // obj != nil iff it matches the pred function.
392 if obj != nil {
393 v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
394 }
395 } else {
396 body, _, err := h.transformer.TransformStringFromStorage(node.Value)
397 if err != nil {
398 // omit items from lists and watches that cannot be transformed, but log the error
399 utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err))
400 continue
401 }
402
403 obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
404 if err != nil {
405 return err
406 }
407 // being unable to set the version does not prevent the object from being extracted
408 _ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
409 if matched, err := pred.Matches(obj); err == nil && matched {
410 v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
411 }
412 if node.ModifiedIndex != 0 {
413 h.addToCache(node.ModifiedIndex, obj)
414 }
415 }
416 }
417 trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
418 return nil
419}
420
421// Implements storage.Interface.
422func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
423 if ctx == nil {
424 glog.Errorf("Context is nil")
425 }
426 trace := utiltrace.New("List " + getTypeName(listObj))
427 defer trace.LogIfLong(400 * time.Millisecond)
428 listPtr, err := meta.GetItemsPtr(listObj)
429 if err != nil {
430 return err
431 }
432 key = path.Join(h.pathPrefix, key)
433 startTime := time.Now()
434 trace.Step("About to list etcd node")
435 nodes, index, err := h.listEtcdNode(ctx, key)
436 trace.Step("Etcd node listed")
437 metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
438 if err != nil {
439 return err
440 }
441 if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
442 return err
443 }
444 trace.Step("Node list decoded")
445 if err := h.versioner.UpdateList(listObj, index, ""); err != nil {
446 return err
447 }
448 return nil
449}
450
451func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
452 if ctx == nil {
453 glog.Errorf("Context is nil")
454 }
455 opts := etcd.GetOptions{
456 Recursive: true,
457 Sort: true,
458 Quorum: h.quorum,
459 }
460 result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
461 if err != nil {
462 var index uint64
463 if etcdError, ok := err.(etcd.Error); ok {
464 index = etcdError.Index
465 }
466 nodes := make([]*etcd.Node, 0)
467 if etcdutil.IsEtcdNotFound(err) {
468 return nodes, index, nil
469 } else {
470 return nodes, index, toStorageErr(err, key, 0)
471 }
472 }
473 return result.Node.Nodes, result.Index, nil
474}
475
476// Implements storage.Interface.
477func (h *etcdHelper) GuaranteedUpdate(
478 ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
479 preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
480 // Ignore the suggestion about current object.
481 if ctx == nil {
482 glog.Errorf("Context is nil")
483 }
484 v, err := conversion.EnforcePtr(ptrToType)
485 if err != nil {
486 // Panic is appropriate, because this is a programming error.
487 panic("need ptr to type")
488 }
489 key = path.Join(h.pathPrefix, key)
490 for {
491 obj := reflect.New(v.Type()).Interface().(runtime.Object)
492 origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
493 if err != nil {
494 return toStorageErr(err, key, 0)
495 }
496 if err := checkPreconditions(key, preconditions, obj); err != nil {
497 return toStorageErr(err, key, 0)
498 }
499 meta := storage.ResponseMeta{}
500 if node != nil {
501 meta.TTL = node.TTL
502 meta.ResourceVersion = node.ModifiedIndex
503 }
504 // Get the object to be written by calling tryUpdate.
505 ret, newTTL, err := tryUpdate(obj, meta)
506 if err != nil {
507 return toStorageErr(err, key, 0)
508 }
509
510 index := uint64(0)
511 ttl := uint64(0)
512 if node != nil {
513 index = node.ModifiedIndex
514 if node.TTL != 0 {
515 ttl = uint64(node.TTL)
516 }
517 if node.Expiration != nil && ttl == 0 {
518 ttl = 1
519 }
520 } else if res != nil {
521 index = res.Index
522 }
523
524 if newTTL != nil {
525 if ttl != 0 && *newTTL == 0 {
526 // TODO: remove this after we have verified this is no longer an issue
527 glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key)
528 }
529 ttl = *newTTL
530 }
531
532 // Since update object may have a resourceVersion set, we need to clear it here.
533 if err := h.versioner.PrepareObjectForStorage(ret); err != nil {
534 return errors.New("resourceVersion cannot be set on objects store in etcd")
535 }
536
537 newBodyData, err := runtime.Encode(h.codec, ret)
538 if err != nil {
539 return err
540 }
541 newBody := string(newBodyData)
542 data, err := h.transformer.TransformStringToStorage(newBody)
543 if err != nil {
544 return storage.NewInternalError(err.Error())
545 }
546
547 // First time this key has been used, try creating new value.
548 if index == 0 {
549 startTime := time.Now()
550 opts := etcd.SetOptions{
551 TTL: time.Duration(ttl) * time.Second,
552 PrevExist: etcd.PrevNoExist,
553 }
554 response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
555 metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
556 if etcdutil.IsEtcdNodeExist(err) {
557 continue
558 }
559 _, _, _, err = h.extractObj(response, err, ptrToType, false, false)
560 return toStorageErr(err, key, 0)
561 }
562
563 // If we don't send an update, we simply return the currently existing
564 // version of the object. However, the value transformer may indicate that
565 // the on disk representation has changed and that we must commit an update.
566 if newBody == origBody && !stale {
567 _, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
568 return err
569 }
570
571 startTime := time.Now()
572 // Swap origBody with data, if origBody is the latest etcd data.
573 opts := etcd.SetOptions{
574 PrevIndex: index,
575 TTL: time.Duration(ttl) * time.Second,
576 }
577 response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
578 metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
579 if etcdutil.IsEtcdTestFailed(err) {
580 // Try again.
581 continue
582 }
583 _, _, _, err = h.extractObj(response, err, ptrToType, false, false)
584 return toStorageErr(err, key, int64(index))
585 }
586}
587
588func (*etcdHelper) Count(pathPerfix string) (int64, error) {
589 return 0, fmt.Errorf("Count is unimplemented for etcd2!")
590}
591
592// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
593// their Node.ModifiedIndex, which is unique across all types.
594// All implementations must be thread-safe.
595type etcdCache interface {
596 getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool)
597 addToCache(index uint64, obj runtime.Object)
598}
599
600func getTypeName(obj interface{}) string {
601 return reflect.TypeOf(obj).String()
602}
603
604func (h *etcdHelper) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
605 startTime := time.Now()
606 defer func() {
607 metrics.ObserveGetCache(startTime)
608 }()
609 obj, found := h.cache.Get(index)
610 if found {
611 if matched, err := pred.Matches(obj.(runtime.Object)); err != nil || !matched {
612 return nil, true
613 }
614 // We should not return the object itself to avoid polluting the cache if someone
615 // modifies returned values.
616 objCopy := obj.(runtime.Object).DeepCopyObject()
617 metrics.ObserveCacheHit()
618 return objCopy.(runtime.Object), true
619 }
620 metrics.ObserveCacheMiss()
621 return nil, false
622}
623
624func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
625 startTime := time.Now()
626 defer func() {
627 metrics.ObserveAddCache(startTime)
628 }()
629 objCopy := obj.DeepCopyObject()
630 isOverwrite := h.cache.Add(index, objCopy)
631 if !isOverwrite {
632 metrics.ObserveNewEntry()
633 }
634}
635
636func toStorageErr(err error, key string, rv int64) error {
637 if err == nil {
638 return nil
639 }
640 switch {
641 case etcdutil.IsEtcdNotFound(err):
642 return storage.NewKeyNotFoundError(key, rv)
643 case etcdutil.IsEtcdNodeExist(err):
644 return storage.NewKeyExistsError(key, rv)
645 case etcdutil.IsEtcdTestFailed(err):
646 return storage.NewResourceVersionConflictsError(key, rv)
647 case etcdutil.IsEtcdUnreachable(err):
648 return storage.NewUnreachableError(key, rv)
649 default:
650 return err
651 }
652}