blob: bdcd5bcb60db49a2143a167796e06db61576c833 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package etcd3
18
19import (
20 "context"
21 "strconv"
22 "sync"
23 "time"
24
25 "github.com/coreos/etcd/clientv3"
26 "github.com/golang/glog"
27)
28
// compactRevKey is the etcd key the compactors coordinate through: its value
// holds the last compacted revision and its version acts as the logical
// "compact time" compared by the CAS in compact.
const compactRevKey = "compact_rev_key"
32
// endpointsMap holds every etcd endpoint for which StartCompactor has already
// registered a compactor in this process. endpointsMapMu must be held while
// reading or writing it.
var (
	endpointsMapMu sync.Mutex
	endpointsMap   = map[string]struct{}{}
)
41
42// StartCompactor starts a compactor in the background to compact old version of keys that's not needed.
43// By default, we save the most recent 10 minutes data and compact versions > 10minutes ago.
44// It should be enough for slow watchers and to tolerate burst.
45// TODO: We might keep a longer history (12h) in the future once storage API can take advantage of past version of keys.
46func StartCompactor(ctx context.Context, client *clientv3.Client, compactInterval time.Duration) {
47 endpointsMapMu.Lock()
48 defer endpointsMapMu.Unlock()
49
50 // In one process, we can have only one compactor for one cluster.
51 // Currently we rely on endpoints to differentiate clusters.
52 for _, ep := range client.Endpoints() {
53 if _, ok := endpointsMap[ep]; ok {
54 glog.V(4).Infof("compactor already exists for endpoints %v", client.Endpoints())
55 return
56 }
57 }
58 for _, ep := range client.Endpoints() {
59 endpointsMap[ep] = struct{}{}
60 }
61
62 if compactInterval != 0 {
63 go compactor(ctx, client, compactInterval)
64 }
65}
66
67// compactor periodically compacts historical versions of keys in etcd.
68// It will compact keys with versions older than given interval.
69// In other words, after compaction, it will only contain keys set during last interval.
70// Any API call for the older versions of keys will return error.
71// Interval is the time interval between each compaction. The first compaction happens after "interval".
72func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
73 // Technical definitions:
74 // We have a special key in etcd defined as *compactRevKey*.
75 // compactRevKey's value will be set to the string of last compacted revision.
76 // compactRevKey's version will be used as logical time for comparison. THe version is referred as compact time.
77 // Initially, because the key doesn't exist, the compact time (version) is 0.
78 //
79 // Algorithm:
80 // - Compare to see if (local compact_time) = (remote compact_time).
81 // - If yes, increment both local and remote compact_time, and do a compaction.
82 // - If not, set local to remote compact_time.
83 //
84 // Technical details/insights:
85 //
86 // The protocol here is lease based. If one compactor CAS successfully, the others would know it when they fail in
87 // CAS later and would try again in 10 minutes. If an APIServer crashed, another one would "take over" the lease.
88 //
89 // For example, in the following diagram, we have a compactor C1 doing compaction in t1, t2. Another compactor C2
90 // at t1' (t1 < t1' < t2) would CAS fail, set its known oldRev to rev at t1', and try again in t2' (t2' > t2).
91 // If C1 crashed and wouldn't compact at t2, C2 would CAS successfully at t2'.
92 //
93 // oldRev(t2) curRev(t2)
94 // +
95 // oldRev curRev |
96 // + + |
97 // | | |
98 // | | t1' | t2'
99 // +---v-------------v----^---------v------^---->
100 // t0 t1 t2
101 //
102 // We have the guarantees:
103 // - in normal cases, the interval is 10 minutes.
104 // - in failover, the interval is >10m and <20m
105 //
106 // FAQ:
107 // - What if time is not accurate? We don't care as long as someone did the compaction. Atomicity is ensured using
108 // etcd API.
109 // - What happened under heavy load scenarios? Initially, each apiserver will do only one compaction
110 // every 10 minutes. This is very unlikely affecting or affected w.r.t. server load.
111
112 var compactTime int64
113 var rev int64
114 var err error
115 for {
116 select {
117 case <-time.After(interval):
118 case <-ctx.Done():
119 return
120 }
121
122 compactTime, rev, err = compact(ctx, client, compactTime, rev)
123 if err != nil {
124 glog.Errorf("etcd: endpoint (%v) compact failed: %v", client.Endpoints(), err)
125 continue
126 }
127 }
128}
129
130// compact compacts etcd store and returns current rev.
131// It will return the current compact time and global revision if no error occurred.
132// Note that CAS fail will not incur any error.
133func compact(ctx context.Context, client *clientv3.Client, t, rev int64) (int64, int64, error) {
134 resp, err := client.KV.Txn(ctx).If(
135 clientv3.Compare(clientv3.Version(compactRevKey), "=", t),
136 ).Then(
137 clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10)), // Expect side effect: increment Version
138 ).Else(
139 clientv3.OpGet(compactRevKey),
140 ).Commit()
141 if err != nil {
142 return t, rev, err
143 }
144
145 curRev := resp.Header.Revision
146
147 if !resp.Succeeded {
148 curTime := resp.Responses[0].GetResponseRange().Kvs[0].Version
149 return curTime, curRev, nil
150 }
151 curTime := t + 1
152
153 if rev == 0 {
154 // We don't compact on bootstrap.
155 return curTime, curRev, nil
156 }
157 if _, err = client.Compact(ctx, rev); err != nil {
158 return curTime, curRev, err
159 }
160 glog.V(4).Infof("etcd: compacted rev (%d), endpoints (%v)", rev, client.Endpoints())
161 return curTime, curRev, nil
162}