/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package options
18
19import (
20 "fmt"
21 "net/http"
22 "strconv"
23 "strings"
24 "time"
25
26 "github.com/spf13/pflag"
27
28 "k8s.io/apimachinery/pkg/runtime/schema"
29 "k8s.io/apimachinery/pkg/util/sets"
30 "k8s.io/apiserver/pkg/registry/generic"
31 genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
32 "k8s.io/apiserver/pkg/server"
33 "k8s.io/apiserver/pkg/server/healthz"
34 serverstorage "k8s.io/apiserver/pkg/server/storage"
35 "k8s.io/apiserver/pkg/storage/etcd3/preflight"
36 "k8s.io/apiserver/pkg/storage/storagebackend"
37)
38
39type EtcdOptions struct {
40 // The value of Paging on StorageConfig will be overridden by the
41 // calculated feature gate value.
42 StorageConfig storagebackend.Config
43 EncryptionProviderConfigFilepath string
44
45 EtcdServersOverrides []string
46
47 // To enable protobuf as storage format, it is enough
48 // to set it to "application/vnd.kubernetes.protobuf".
49 DefaultStorageMediaType string
50 DeleteCollectionWorkers int
51 EnableGarbageCollection bool
52
53 // Set EnableWatchCache to false to disable all watch caches
54 EnableWatchCache bool
55 // Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set
56 DefaultWatchCacheSize int
57 // WatchCacheSizes represents override to a given resource
58 WatchCacheSizes []string
59}
60
61var storageTypes = sets.NewString(
62 storagebackend.StorageTypeUnset,
63 storagebackend.StorageTypeETCD2,
64 storagebackend.StorageTypeETCD3,
65)
66
67func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
68 options := &EtcdOptions{
69 StorageConfig: *backendConfig,
70 DefaultStorageMediaType: "application/json",
71 DeleteCollectionWorkers: 1,
72 EnableGarbageCollection: true,
73 EnableWatchCache: true,
74 DefaultWatchCacheSize: 100,
75 }
76 options.StorageConfig.CountMetricPollPeriod = time.Minute
77 return options
78}
79
80func (s *EtcdOptions) Validate() []error {
81 if s == nil {
82 return nil
83 }
84
85 allErrors := []error{}
86 if len(s.StorageConfig.ServerList) == 0 {
87 allErrors = append(allErrors, fmt.Errorf("--etcd-servers must be specified"))
88 }
89
90 if !storageTypes.Has(s.StorageConfig.Type) {
91 allErrors = append(allErrors, fmt.Errorf("--storage-backend invalid, must be 'etcd3' or 'etcd2'. If not specified, it will default to 'etcd3'"))
92 }
93
94 for _, override := range s.EtcdServersOverrides {
95 tokens := strings.Split(override, "#")
96 if len(tokens) != 2 {
97 allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated"))
98 continue
99 }
100
101 apiresource := strings.Split(tokens[0], "/")
102 if len(apiresource) != 2 {
103 allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated"))
104 continue
105 }
106
107 }
108
109 return allErrors
110}
111
112// AddEtcdFlags adds flags related to etcd storage for a specific APIServer to the specified FlagSet
113func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
114 if s == nil {
115 return
116 }
117
118 fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, ""+
119 "Per-resource etcd servers overrides, comma separated. The individual override "+
120 "format: group/resource#servers, where servers are URLs, semicolon separated.")
121
122 fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, ""+
123 "The media type to use to store objects in storage. "+
124 "Some resources or storage backends may only support a specific media type and will ignore this setting.")
125 fs.IntVar(&s.DeleteCollectionWorkers, "delete-collection-workers", s.DeleteCollectionWorkers,
126 "Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.")
127
128 fs.BoolVar(&s.EnableGarbageCollection, "enable-garbage-collector", s.EnableGarbageCollection, ""+
129 "Enables the generic garbage collector. MUST be synced with the corresponding flag "+
130 "of the kube-controller-manager.")
131
132 fs.BoolVar(&s.EnableWatchCache, "watch-cache", s.EnableWatchCache,
133 "Enable watch caching in the apiserver")
134
135 fs.IntVar(&s.DefaultWatchCacheSize, "default-watch-cache-size", s.DefaultWatchCacheSize,
136 "Default watch cache size. If zero, watch cache will be disabled for resources that do not have a default watch size set.")
137
138 fs.StringSliceVar(&s.WatchCacheSizes, "watch-cache-sizes", s.WatchCacheSizes, ""+
139 "List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. "+
140 "The individual override format: resource[.group]#size, where resource is lowercase plural (no version), "+
141 "group is optional, and size is a number. It takes effect when watch-cache is enabled. "+
142 "Some resources (replicationcontrollers, endpoints, nodes, pods, services, apiservices.apiregistration.k8s.io) "+
143 "have system defaults set by heuristics, others default to default-watch-cache-size")
144
145 fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type,
146 "The storage backend for persistence. Options: 'etcd3' (default), 'etcd2'.")
147
148 fs.IntVar(&s.StorageConfig.DeserializationCacheSize, "deserialization-cache-size", s.StorageConfig.DeserializationCacheSize,
149 "Number of deserialized json objects to cache in memory.")
150
151 fs.StringSliceVar(&s.StorageConfig.ServerList, "etcd-servers", s.StorageConfig.ServerList,
152 "List of etcd servers to connect with (scheme://ip:port), comma separated.")
153
154 fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix,
155 "The prefix to prepend to all resource paths in etcd.")
156
157 fs.StringVar(&s.StorageConfig.KeyFile, "etcd-keyfile", s.StorageConfig.KeyFile,
158 "SSL key file used to secure etcd communication.")
159
160 fs.StringVar(&s.StorageConfig.CertFile, "etcd-certfile", s.StorageConfig.CertFile,
161 "SSL certification file used to secure etcd communication.")
162
163 fs.StringVar(&s.StorageConfig.CAFile, "etcd-cafile", s.StorageConfig.CAFile,
164 "SSL Certificate Authority file used to secure etcd communication.")
165
166 fs.BoolVar(&s.StorageConfig.Quorum, "etcd-quorum-read", s.StorageConfig.Quorum,
167 "If true, enable quorum read. It defaults to true and is strongly recommended not setting to false.")
168 fs.MarkDeprecated("etcd-quorum-read", "This flag is deprecated and the ability to switch off quorum read will be removed in a future release.")
169
170 fs.StringVar(&s.EncryptionProviderConfigFilepath, "experimental-encryption-provider-config", s.EncryptionProviderConfigFilepath,
171 "The file containing configuration for encryption providers to be used for storing secrets in etcd")
172
173 fs.DurationVar(&s.StorageConfig.CompactionInterval, "etcd-compaction-interval", s.StorageConfig.CompactionInterval,
174 "The interval of compaction requests. If 0, the compaction request from apiserver is disabled.")
175
176 fs.DurationVar(&s.StorageConfig.CountMetricPollPeriod, "etcd-count-metric-poll-period", s.StorageConfig.CountMetricPollPeriod, ""+
177 "Frequency of polling etcd for number of resources per type. 0 disables the metric collection.")
178}
179
180func (s *EtcdOptions) ApplyTo(c *server.Config) error {
181 if s == nil {
182 return nil
183 }
184
185 s.addEtcdHealthEndpoint(c)
186 c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
187 return nil
188}
189
190func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
191 s.addEtcdHealthEndpoint(c)
192 c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
193 return nil
194}
195
196func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) {
197 c.HealthzChecks = append(c.HealthzChecks, healthz.NamedCheck("etcd", func(r *http.Request) error {
198 done, err := preflight.EtcdConnection{ServerList: s.StorageConfig.ServerList}.CheckEtcdServers()
199 if !done {
200 return fmt.Errorf("etcd failed")
201 }
202 if err != nil {
203 return err
204 }
205 return nil
206 }))
207}
208
209type SimpleRestOptionsFactory struct {
210 Options EtcdOptions
211}
212
213func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
214 ret := generic.RESTOptions{
215 StorageConfig: &f.Options.StorageConfig,
216 Decorator: generic.UndecoratedStorage,
217 EnableGarbageCollection: f.Options.EnableGarbageCollection,
218 DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
219 ResourcePrefix: resource.Group + "/" + resource.Resource,
220 CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
221 }
222 if f.Options.EnableWatchCache {
223 sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
224 if err != nil {
225 return generic.RESTOptions{}, err
226 }
227 cacheSize, ok := sizes[resource]
228 if !ok {
229 cacheSize = f.Options.DefaultWatchCacheSize
230 }
231 ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
232 }
233 return ret, nil
234}
235
236type storageFactoryRestOptionsFactory struct {
237 Options EtcdOptions
238 StorageFactory serverstorage.StorageFactory
239}
240
241func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
242 storageConfig, err := f.StorageFactory.NewConfig(resource)
243 if err != nil {
244 return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
245 }
246
247 ret := generic.RESTOptions{
248 StorageConfig: storageConfig,
249 Decorator: generic.UndecoratedStorage,
250 DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
251 EnableGarbageCollection: f.Options.EnableGarbageCollection,
252 ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
253 CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
254 }
255 if f.Options.EnableWatchCache {
256 sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
257 if err != nil {
258 return generic.RESTOptions{}, err
259 }
260 cacheSize, ok := sizes[resource]
261 if !ok {
262 cacheSize = f.Options.DefaultWatchCacheSize
263 }
264 ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
265 }
266
267 return ret, nil
268}
269
270// ParseWatchCacheSizes turns a list of cache size values into a map of group resources
271// to requested sizes.
272func ParseWatchCacheSizes(cacheSizes []string) (map[schema.GroupResource]int, error) {
273 watchCacheSizes := make(map[schema.GroupResource]int)
274 for _, c := range cacheSizes {
275 tokens := strings.Split(c, "#")
276 if len(tokens) != 2 {
277 return nil, fmt.Errorf("invalid value of watch cache size: %s", c)
278 }
279
280 size, err := strconv.Atoi(tokens[1])
281 if err != nil {
282 return nil, fmt.Errorf("invalid size of watch cache size: %s", c)
283 }
284 if size < 0 {
285 return nil, fmt.Errorf("watch cache size cannot be negative: %s", c)
286 }
287
288 watchCacheSizes[schema.ParseGroupResource(tokens[0])] = size
289 }
290 return watchCacheSizes, nil
291}
292
293// WriteWatchCacheSizes turns a map of cache size values into a list of string specifications.
294func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]string, error) {
295 var cacheSizes []string
296
297 for resource, size := range watchCacheSizes {
298 if size < 0 {
299 return nil, fmt.Errorf("watch cache size cannot be negative for resource %s", resource)
300 }
301 cacheSizes = append(cacheSizes, fmt.Sprintf("%s#%d", resource.String(), size))
302 }
303 return cacheSizes, nil
304}