blob: c1ef906f66d821430df2aae2282fd32db712f93d [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package registry
18
19import (
20 "sync"
21
22 "github.com/golang/glog"
23
24 "k8s.io/apimachinery/pkg/runtime"
25 "k8s.io/apiserver/pkg/registry/generic"
26 "k8s.io/apiserver/pkg/storage"
27 etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
28 "k8s.io/apiserver/pkg/storage/storagebackend"
29 "k8s.io/apiserver/pkg/storage/storagebackend/factory"
30)
31
32// Creates a cacher based given storageConfig.
33func StorageWithCacher(capacity int) generic.StorageDecorator {
34 return func(
35 storageConfig *storagebackend.Config,
36 objectType runtime.Object,
37 resourcePrefix string,
38 keyFunc func(obj runtime.Object) (string, error),
39 newListFunc func() runtime.Object,
40 getAttrsFunc storage.AttrFunc,
41 triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
42
43 s, d := generic.NewRawStorage(storageConfig)
44 if capacity == 0 {
45 glog.V(5).Infof("Storage caching is disabled for %T", objectType)
46 return s, d
47 }
48 glog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)
49
50 // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
51 // Currently it has two layers of same storage interface -- cacher and low level kv.
52 cacherConfig := storage.CacherConfig{
53 CacheCapacity: capacity,
54 Storage: s,
55 Versioner: etcdstorage.APIObjectVersioner{},
56 Type: objectType,
57 ResourcePrefix: resourcePrefix,
58 KeyFunc: keyFunc,
59 NewListFunc: newListFunc,
60 GetAttrsFunc: getAttrsFunc,
61 TriggerPublisherFunc: triggerFunc,
62 Codec: storageConfig.Codec,
63 }
64 cacher := storage.NewCacherFromConfig(cacherConfig)
65 destroyFunc := func() {
66 cacher.Stop()
67 d()
68 }
69
70 // TODO : Remove RegisterStorageCleanup below when PR
71 // https://github.com/kubernetes/kubernetes/pull/50690
72 // merges as that shuts down storage properly
73 RegisterStorageCleanup(destroyFunc)
74
75 return cacher, destroyFunc
76 }
77}
78
79// TODO : Remove all the code below when PR
80// https://github.com/kubernetes/kubernetes/pull/50690
81// merges as that shuts down storage properly
82// HACK ALERT : Track the destroy methods to call them
83// from the test harness. TrackStorageCleanup will be called
84// only from the test harness, so Register/Cleanup will be
85// no-op at runtime.
86
87var cleanupLock sync.Mutex
88var cleanup []func() = nil
89
90func TrackStorageCleanup() {
91 cleanupLock.Lock()
92 defer cleanupLock.Unlock()
93
94 if cleanup != nil {
95 panic("Conflicting storage tracking")
96 }
97 cleanup = make([]func(), 0)
98}
99
100func RegisterStorageCleanup(fn func()) {
101 cleanupLock.Lock()
102 defer cleanupLock.Unlock()
103
104 if cleanup == nil {
105 return
106 }
107 cleanup = append(cleanup, fn)
108}
109
110func CleanupStorage() {
111 cleanupLock.Lock()
112 old := cleanup
113 cleanup = nil
114 cleanupLock.Unlock()
115
116 for _, d := range old {
117 d()
118 }
119}