blob: 1c201efb62923d82479be5e38b5218cfc72d4d52 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20 "fmt"
21 "sync"
22
23 "k8s.io/apimachinery/pkg/util/sets"
24)
25
26// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
27// TL;DR caveats: you must not modify anything returned by Get or List as it will break
28// the indexing feature in addition to not being thread safe.
29//
30// The guarantees of thread safety provided by List/Get are only valid if the caller
31// treats returned items as read-only. For example, a pointer inserted in the store
32// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
33// on the same key and modify the pointer in a non-thread-safe way. Also note that
34// modifying objects stored by the indexers (if any) will *not* automatically lead
35// to a re-index. So it's not a good idea to directly modify the objects returned by
36// Get/List, in general.
37type ThreadSafeStore interface {
38 Add(key string, obj interface{})
39 Update(key string, obj interface{})
40 Delete(key string)
41 Get(key string) (item interface{}, exists bool)
42 List() []interface{}
43 ListKeys() []string
44 Replace(map[string]interface{}, string)
45 Index(indexName string, obj interface{}) ([]interface{}, error)
46 IndexKeys(indexName, indexKey string) ([]string, error)
47 ListIndexFuncValues(name string) []string
48 ByIndex(indexName, indexKey string) ([]interface{}, error)
49 GetIndexers() Indexers
50
51 // AddIndexers adds more indexers to this store. If you call this after you already have data
52 // in the store, the results are undefined.
53 AddIndexers(newIndexers Indexers) error
54 Resync() error
55}
56
57// threadSafeMap implements ThreadSafeStore
58type threadSafeMap struct {
59 lock sync.RWMutex
60 items map[string]interface{}
61
62 // indexers maps a name to an IndexFunc
63 indexers Indexers
64 // indices maps a name to an Index
65 indices Indices
66}
67
68func (c *threadSafeMap) Add(key string, obj interface{}) {
69 c.lock.Lock()
70 defer c.lock.Unlock()
71 oldObject := c.items[key]
72 c.items[key] = obj
73 c.updateIndices(oldObject, obj, key)
74}
75
76func (c *threadSafeMap) Update(key string, obj interface{}) {
77 c.lock.Lock()
78 defer c.lock.Unlock()
79 oldObject := c.items[key]
80 c.items[key] = obj
81 c.updateIndices(oldObject, obj, key)
82}
83
84func (c *threadSafeMap) Delete(key string) {
85 c.lock.Lock()
86 defer c.lock.Unlock()
87 if obj, exists := c.items[key]; exists {
88 c.deleteFromIndices(obj, key)
89 delete(c.items, key)
90 }
91}
92
93func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
94 c.lock.RLock()
95 defer c.lock.RUnlock()
96 item, exists = c.items[key]
97 return item, exists
98}
99
100func (c *threadSafeMap) List() []interface{} {
101 c.lock.RLock()
102 defer c.lock.RUnlock()
103 list := make([]interface{}, 0, len(c.items))
104 for _, item := range c.items {
105 list = append(list, item)
106 }
107 return list
108}
109
110// ListKeys returns a list of all the keys of the objects currently
111// in the threadSafeMap.
112func (c *threadSafeMap) ListKeys() []string {
113 c.lock.RLock()
114 defer c.lock.RUnlock()
115 list := make([]string, 0, len(c.items))
116 for key := range c.items {
117 list = append(list, key)
118 }
119 return list
120}
121
122func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
123 c.lock.Lock()
124 defer c.lock.Unlock()
125 c.items = items
126
127 // rebuild any index
128 c.indices = Indices{}
129 for key, item := range c.items {
130 c.updateIndices(nil, item, key)
131 }
132}
133
134// Index returns a list of items that match on the index function
135// Index is thread-safe so long as you treat all items as immutable
136func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
137 c.lock.RLock()
138 defer c.lock.RUnlock()
139
140 indexFunc := c.indexers[indexName]
141 if indexFunc == nil {
142 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
143 }
144
145 indexKeys, err := indexFunc(obj)
146 if err != nil {
147 return nil, err
148 }
149 index := c.indices[indexName]
150
151 // need to de-dupe the return list. Since multiple keys are allowed, this can happen.
152 returnKeySet := sets.String{}
153 for _, indexKey := range indexKeys {
154 set := index[indexKey]
155 for _, key := range set.UnsortedList() {
156 returnKeySet.Insert(key)
157 }
158 }
159
160 list := make([]interface{}, 0, returnKeySet.Len())
161 for absoluteKey := range returnKeySet {
162 list = append(list, c.items[absoluteKey])
163 }
164 return list, nil
165}
166
167// ByIndex returns a list of items that match an exact value on the index function
168func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
169 c.lock.RLock()
170 defer c.lock.RUnlock()
171
172 indexFunc := c.indexers[indexName]
173 if indexFunc == nil {
174 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
175 }
176
177 index := c.indices[indexName]
178
179 set := index[indexKey]
180 list := make([]interface{}, 0, set.Len())
181 for _, key := range set.List() {
182 list = append(list, c.items[key])
183 }
184
185 return list, nil
186}
187
188// IndexKeys returns a list of keys that match on the index function.
189// IndexKeys is thread-safe so long as you treat all items as immutable.
190func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
191 c.lock.RLock()
192 defer c.lock.RUnlock()
193
194 indexFunc := c.indexers[indexName]
195 if indexFunc == nil {
196 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
197 }
198
199 index := c.indices[indexName]
200
201 set := index[indexKey]
202 return set.List(), nil
203}
204
205func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
206 c.lock.RLock()
207 defer c.lock.RUnlock()
208
209 index := c.indices[indexName]
210 names := make([]string, 0, len(index))
211 for key := range index {
212 names = append(names, key)
213 }
214 return names
215}
216
217func (c *threadSafeMap) GetIndexers() Indexers {
218 return c.indexers
219}
220
221func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
222 c.lock.Lock()
223 defer c.lock.Unlock()
224
225 if len(c.items) > 0 {
226 return fmt.Errorf("cannot add indexers to running index")
227 }
228
229 oldKeys := sets.StringKeySet(c.indexers)
230 newKeys := sets.StringKeySet(newIndexers)
231
232 if oldKeys.HasAny(newKeys.List()...) {
233 return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
234 }
235
236 for k, v := range newIndexers {
237 c.indexers[k] = v
238 }
239 return nil
240}
241
242// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
243// updateIndices must be called from a function that already has a lock on the cache
244func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
245 // if we got an old object, we need to remove it before we add it again
246 if oldObj != nil {
247 c.deleteFromIndices(oldObj, key)
248 }
249 for name, indexFunc := range c.indexers {
250 indexValues, err := indexFunc(newObj)
251 if err != nil {
252 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
253 }
254 index := c.indices[name]
255 if index == nil {
256 index = Index{}
257 c.indices[name] = index
258 }
259
260 for _, indexValue := range indexValues {
261 set := index[indexValue]
262 if set == nil {
263 set = sets.String{}
264 index[indexValue] = set
265 }
266 set.Insert(key)
267 }
268 }
269}
270
271// deleteFromIndices removes the object from each of the managed indexes
272// it is intended to be called from a function that already has a lock on the cache
273func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
274 for name, indexFunc := range c.indexers {
275 indexValues, err := indexFunc(obj)
276 if err != nil {
277 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
278 }
279
280 index := c.indices[name]
281 if index == nil {
282 continue
283 }
284 for _, indexValue := range indexValues {
285 set := index[indexValue]
286 if set != nil {
287 set.Delete(key)
288 }
289 }
290 }
291}
292
293func (c *threadSafeMap) Resync() error {
294 // Nothing to do
295 return nil
296}
297
298func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
299 return &threadSafeMap{
300 items: map[string]interface{}{},
301 indexers: indexers,
302 indices: indices,
303 }
304}