blob: 4097b3afa2add1ecb5977a9df7da1b3c569a4c99 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "sync"
20 "time"
21
22 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
23 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
24
25 "google.golang.org/grpc"
26 "google.golang.org/grpc/metadata"
27)
28
29type (
30 LeaseRevokeResponse pb.LeaseRevokeResponse
31 LeaseID int64
32)
33
34// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
35type LeaseGrantResponse struct {
36 *pb.ResponseHeader
37 ID LeaseID
38 TTL int64
39 Error string
40}
41
42// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
43type LeaseKeepAliveResponse struct {
44 *pb.ResponseHeader
45 ID LeaseID
46 TTL int64
47}
48
49// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
50type LeaseTimeToLiveResponse struct {
51 *pb.ResponseHeader
52 ID LeaseID `json:"id"`
53
54 // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. Expired lease will return -1.
55 TTL int64 `json:"ttl"`
56
57 // GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
58 GrantedTTL int64 `json:"granted-ttl"`
59
60 // Keys is the list of keys attached to this lease.
61 Keys [][]byte `json:"keys"`
62}
63
64// LeaseStatus represents a lease status.
65type LeaseStatus struct {
66 ID LeaseID `json:"id"`
67 // TODO: TTL int64
68}
69
70// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
71type LeaseLeasesResponse struct {
72 *pb.ResponseHeader
73 Leases []LeaseStatus `json:"leases"`
74}
75
76const (
77 // defaultTTL is the assumed lease TTL used for the first keepalive
78 // deadline before the actual TTL is known to the client.
79 defaultTTL = 5 * time.Second
80 // a small buffer to store unsent lease responses.
81 leaseResponseChSize = 16
82 // NoLease is a lease ID for the absence of a lease.
83 NoLease LeaseID = 0
84
85 // retryConnWait is how long to wait before retrying request due to an error
86 retryConnWait = 500 * time.Millisecond
87)
88
// ErrKeepAliveHalted is returned if the client keep alive loop halts with an
// unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken,
// but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
	// Reason is the error that stopped the keep alive loop; it may be nil.
	Reason error
}

// Error implements the error interface. The message is a fixed prefix,
// followed by ": " and the text of Reason when Reason is non-nil.
func (e ErrKeepAliveHalted) Error() string {
	const msg = "etcdclient: leases keep alive halted"
	if e.Reason == nil {
		return msg
	}
	return msg + ": " + e.Reason.Error()
}
103
104type Lease interface {
105 // Grant creates a new lease.
106 Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
107
108 // Revoke revokes the given lease.
109 Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
110
111 // TimeToLive retrieves the lease information of the given lease ID.
112 TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
113
114 // Leases retrieves all leases.
115 Leases(ctx context.Context) (*LeaseLeasesResponse, error)
116
117 // KeepAlive keeps the given lease alive forever. If the keepalive response
118 // posted to the channel is not consumed immediately, the lease client will
119 // continue sending keep alive requests to the etcd server at least every
120 // second until latest response is consumed.
121 //
122 // The returned "LeaseKeepAliveResponse" channel closes if underlying keep
123 // alive stream is interrupted in some way the client cannot handle itself;
124 // given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
125 // from this closed channel is nil.
126 //
127 // If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
128 // no leader") or canceled by the caller (e.g. context.Canceled), the error
129 // is returned. Otherwise, it retries.
130 //
131 // TODO(v4.0): post errors to last keep alive message before closing
132 // (see https://github.com/coreos/etcd/pull/7866)
133 KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
134
135 // KeepAliveOnce renews the lease once. The response corresponds to the
136 // first message from calling KeepAlive. If the response has a recoverable
137 // error, KeepAliveOnce will retry the RPC with a new keep alive message.
138 //
139 // In most of the cases, Keepalive should be used instead of KeepAliveOnce.
140 KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
141
142 // Close releases all resources Lease keeps for efficient communication
143 // with the etcd server.
144 Close() error
145}
146
147type lessor struct {
148 mu sync.Mutex // guards all fields
149
150 // donec is closed and loopErr is set when recvKeepAliveLoop stops
151 donec chan struct{}
152 loopErr error
153
154 remote pb.LeaseClient
155
156 stream pb.Lease_LeaseKeepAliveClient
157 streamCancel context.CancelFunc
158
159 stopCtx context.Context
160 stopCancel context.CancelFunc
161
162 keepAlives map[LeaseID]*keepAlive
163
164 // firstKeepAliveTimeout is the timeout for the first keepalive request
165 // before the actual TTL is known to the lease client
166 firstKeepAliveTimeout time.Duration
167
168 // firstKeepAliveOnce ensures stream starts after first KeepAlive call.
169 firstKeepAliveOnce sync.Once
170
171 callOpts []grpc.CallOption
172}
173
174// keepAlive multiplexes a keepalive for a lease over multiple channels
175type keepAlive struct {
176 chs []chan<- *LeaseKeepAliveResponse
177 ctxs []context.Context
178 // deadline is the time the keep alive channels close if no response
179 deadline time.Time
180 // nextKeepAlive is when to send the next keep alive message
181 nextKeepAlive time.Time
182 // donec is closed on lease revoke, expiration, or cancel.
183 donec chan struct{}
184}
185
186func NewLease(c *Client) Lease {
187 return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
188}
189
190func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
191 l := &lessor{
192 donec: make(chan struct{}),
193 keepAlives: make(map[LeaseID]*keepAlive),
194 remote: remote,
195 firstKeepAliveTimeout: keepAliveTimeout,
196 }
197 if l.firstKeepAliveTimeout == time.Second {
198 l.firstKeepAliveTimeout = defaultTTL
199 }
200 if c != nil {
201 l.callOpts = c.callOpts
202 }
203 reqLeaderCtx := WithRequireLeader(context.Background())
204 l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
205 return l
206}
207
208func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
209 r := &pb.LeaseGrantRequest{TTL: ttl}
210 resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
211 if err == nil {
212 gresp := &LeaseGrantResponse{
213 ResponseHeader: resp.GetHeader(),
214 ID: LeaseID(resp.ID),
215 TTL: resp.TTL,
216 Error: resp.Error,
217 }
218 return gresp, nil
219 }
220 return nil, toErr(ctx, err)
221}
222
223func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
224 r := &pb.LeaseRevokeRequest{ID: int64(id)}
225 resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
226 if err == nil {
227 return (*LeaseRevokeResponse)(resp), nil
228 }
229 return nil, toErr(ctx, err)
230}
231
232func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
233 r := toLeaseTimeToLiveRequest(id, opts...)
234 resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
235 if err == nil {
236 gresp := &LeaseTimeToLiveResponse{
237 ResponseHeader: resp.GetHeader(),
238 ID: LeaseID(resp.ID),
239 TTL: resp.TTL,
240 GrantedTTL: resp.GrantedTTL,
241 Keys: resp.Keys,
242 }
243 return gresp, nil
244 }
245 return nil, toErr(ctx, err)
246}
247
248func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
249 resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
250 if err == nil {
251 leases := make([]LeaseStatus, len(resp.Leases))
252 for i := range resp.Leases {
253 leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
254 }
255 return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
256 }
257 return nil, toErr(ctx, err)
258}
259
260func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
261 ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
262
263 l.mu.Lock()
264 // ensure that recvKeepAliveLoop is still running
265 select {
266 case <-l.donec:
267 err := l.loopErr
268 l.mu.Unlock()
269 close(ch)
270 return ch, ErrKeepAliveHalted{Reason: err}
271 default:
272 }
273 ka, ok := l.keepAlives[id]
274 if !ok {
275 // create fresh keep alive
276 ka = &keepAlive{
277 chs: []chan<- *LeaseKeepAliveResponse{ch},
278 ctxs: []context.Context{ctx},
279 deadline: time.Now().Add(l.firstKeepAliveTimeout),
280 nextKeepAlive: time.Now(),
281 donec: make(chan struct{}),
282 }
283 l.keepAlives[id] = ka
284 } else {
285 // add channel and context to existing keep alive
286 ka.ctxs = append(ka.ctxs, ctx)
287 ka.chs = append(ka.chs, ch)
288 }
289 l.mu.Unlock()
290
291 go l.keepAliveCtxCloser(id, ctx, ka.donec)
292 l.firstKeepAliveOnce.Do(func() {
293 go l.recvKeepAliveLoop()
294 go l.deadlineLoop()
295 })
296
297 return ch, nil
298}
299
300func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
301 for {
302 resp, err := l.keepAliveOnce(ctx, id)
303 if err == nil {
304 if resp.TTL <= 0 {
305 err = rpctypes.ErrLeaseNotFound
306 }
307 return resp, err
308 }
309 if isHaltErr(ctx, err) {
310 return nil, toErr(ctx, err)
311 }
312 }
313}
314
315func (l *lessor) Close() error {
316 l.stopCancel()
317 // close for synchronous teardown if stream goroutines never launched
318 l.firstKeepAliveOnce.Do(func() { close(l.donec) })
319 <-l.donec
320 return nil
321}
322
323func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
324 select {
325 case <-donec:
326 return
327 case <-l.donec:
328 return
329 case <-ctx.Done():
330 }
331
332 l.mu.Lock()
333 defer l.mu.Unlock()
334
335 ka, ok := l.keepAlives[id]
336 if !ok {
337 return
338 }
339
340 // close channel and remove context if still associated with keep alive
341 for i, c := range ka.ctxs {
342 if c == ctx {
343 close(ka.chs[i])
344 ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
345 ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
346 break
347 }
348 }
349 // remove if no one more listeners
350 if len(ka.chs) == 0 {
351 delete(l.keepAlives, id)
352 }
353}
354
355// closeRequireLeader scans keepAlives for ctxs that have require leader
356// and closes the associated channels.
357func (l *lessor) closeRequireLeader() {
358 l.mu.Lock()
359 defer l.mu.Unlock()
360 for _, ka := range l.keepAlives {
361 reqIdxs := 0
362 // find all required leader channels, close, mark as nil
363 for i, ctx := range ka.ctxs {
364 md, ok := metadata.FromOutgoingContext(ctx)
365 if !ok {
366 continue
367 }
368 ks := md[rpctypes.MetadataRequireLeaderKey]
369 if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
370 continue
371 }
372 close(ka.chs[i])
373 ka.chs[i] = nil
374 reqIdxs++
375 }
376 if reqIdxs == 0 {
377 continue
378 }
379 // remove all channels that required a leader from keepalive
380 newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
381 newCtxs := make([]context.Context, len(newChs))
382 newIdx := 0
383 for i := range ka.chs {
384 if ka.chs[i] == nil {
385 continue
386 }
387 newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx]
388 newIdx++
389 }
390 ka.chs, ka.ctxs = newChs, newCtxs
391 }
392}
393
394func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
395 cctx, cancel := context.WithCancel(ctx)
396 defer cancel()
397
398 stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
399 if err != nil {
400 return nil, toErr(ctx, err)
401 }
402
403 err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
404 if err != nil {
405 return nil, toErr(ctx, err)
406 }
407
408 resp, rerr := stream.Recv()
409 if rerr != nil {
410 return nil, toErr(ctx, rerr)
411 }
412
413 karesp := &LeaseKeepAliveResponse{
414 ResponseHeader: resp.GetHeader(),
415 ID: LeaseID(resp.ID),
416 TTL: resp.TTL,
417 }
418 return karesp, nil
419}
420
421func (l *lessor) recvKeepAliveLoop() (gerr error) {
422 defer func() {
423 l.mu.Lock()
424 close(l.donec)
425 l.loopErr = gerr
426 for _, ka := range l.keepAlives {
427 ka.close()
428 }
429 l.keepAlives = make(map[LeaseID]*keepAlive)
430 l.mu.Unlock()
431 }()
432
433 for {
434 stream, err := l.resetRecv()
435 if err != nil {
436 if canceledByCaller(l.stopCtx, err) {
437 return err
438 }
439 } else {
440 for {
441 resp, err := stream.Recv()
442 if err != nil {
443 if canceledByCaller(l.stopCtx, err) {
444 return err
445 }
446
447 if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
448 l.closeRequireLeader()
449 }
450 break
451 }
452
453 l.recvKeepAlive(resp)
454 }
455 }
456
457 select {
458 case <-time.After(retryConnWait):
459 continue
460 case <-l.stopCtx.Done():
461 return l.stopCtx.Err()
462 }
463 }
464}
465
466// resetRecv opens a new lease stream and starts sending keep alive requests.
467func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
468 sctx, cancel := context.WithCancel(l.stopCtx)
469 stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
470 if err != nil {
471 cancel()
472 return nil, err
473 }
474
475 l.mu.Lock()
476 defer l.mu.Unlock()
477 if l.stream != nil && l.streamCancel != nil {
478 l.streamCancel()
479 }
480
481 l.streamCancel = cancel
482 l.stream = stream
483
484 go l.sendKeepAliveLoop(stream)
485 return stream, nil
486}
487
488// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
489func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
490 karesp := &LeaseKeepAliveResponse{
491 ResponseHeader: resp.GetHeader(),
492 ID: LeaseID(resp.ID),
493 TTL: resp.TTL,
494 }
495
496 l.mu.Lock()
497 defer l.mu.Unlock()
498
499 ka, ok := l.keepAlives[karesp.ID]
500 if !ok {
501 return
502 }
503
504 if karesp.TTL <= 0 {
505 // lease expired; close all keep alive channels
506 delete(l.keepAlives, karesp.ID)
507 ka.close()
508 return
509 }
510
511 // send update to all channels
512 nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
513 ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
514 for _, ch := range ka.chs {
515 select {
516 case ch <- karesp:
517 ka.nextKeepAlive = nextKeepAlive
518 default:
519 }
520 }
521}
522
523// deadlineLoop reaps any keep alive channels that have not received a response
524// within the lease TTL
525func (l *lessor) deadlineLoop() {
526 for {
527 select {
528 case <-time.After(time.Second):
529 case <-l.donec:
530 return
531 }
532 now := time.Now()
533 l.mu.Lock()
534 for id, ka := range l.keepAlives {
535 if ka.deadline.Before(now) {
536 // waited too long for response; lease may be expired
537 ka.close()
538 delete(l.keepAlives, id)
539 }
540 }
541 l.mu.Unlock()
542 }
543}
544
545// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
546func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
547 for {
548 var tosend []LeaseID
549
550 now := time.Now()
551 l.mu.Lock()
552 for id, ka := range l.keepAlives {
553 if ka.nextKeepAlive.Before(now) {
554 tosend = append(tosend, id)
555 }
556 }
557 l.mu.Unlock()
558
559 for _, id := range tosend {
560 r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
561 if err := stream.Send(r); err != nil {
562 // TODO do something with this error?
563 return
564 }
565 }
566
567 select {
568 case <-time.After(500 * time.Millisecond):
569 case <-stream.Context().Done():
570 return
571 case <-l.donec:
572 return
573 case <-l.stopCtx.Done():
574 return
575 }
576 }
577}
578
579func (ka *keepAlive) close() {
580 close(ka.donec)
581 for _, ch := range ka.chs {
582 close(ch)
583 }
584}