blob: 019e658004ebf34f8225670b81600fad5943c5d7 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "io"
23 "sync"
24 "sync/atomic"
25
26 "golang.org/x/net/context"
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/internal/channelz"
31 "google.golang.org/grpc/metadata"
32 "google.golang.org/grpc/resolver"
33 "google.golang.org/grpc/status"
34 "google.golang.org/grpc/transport"
35)
36
37// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
38// actions and unblock when there's a picker update.
39type pickerWrapper struct {
40 mu sync.Mutex
41 done bool
42 blockingCh chan struct{}
43 picker balancer.Picker
44
45 // The latest connection happened.
46 connErrMu sync.Mutex
47 connErr error
48
49 stickinessMDKey atomic.Value
50 stickiness *stickyStore
51}
52
53func newPickerWrapper() *pickerWrapper {
54 bp := &pickerWrapper{
55 blockingCh: make(chan struct{}),
56 stickiness: newStickyStore(),
57 }
58 return bp
59}
60
61func (bp *pickerWrapper) updateConnectionError(err error) {
62 bp.connErrMu.Lock()
63 bp.connErr = err
64 bp.connErrMu.Unlock()
65}
66
67func (bp *pickerWrapper) connectionError() error {
68 bp.connErrMu.Lock()
69 err := bp.connErr
70 bp.connErrMu.Unlock()
71 return err
72}
73
74func (bp *pickerWrapper) updateStickinessMDKey(newKey string) {
75 // No need to check ok because mdKey == "" if ok == false.
76 if oldKey, _ := bp.stickinessMDKey.Load().(string); oldKey != newKey {
77 bp.stickinessMDKey.Store(newKey)
78 bp.stickiness.reset(newKey)
79 }
80}
81
82func (bp *pickerWrapper) getStickinessMDKey() string {
83 // No need to check ok because mdKey == "" if ok == false.
84 mdKey, _ := bp.stickinessMDKey.Load().(string)
85 return mdKey
86}
87
88func (bp *pickerWrapper) clearStickinessState() {
89 if oldKey := bp.getStickinessMDKey(); oldKey != "" {
90 // There's no need to reset store if mdKey was "".
91 bp.stickiness.reset(oldKey)
92 }
93}
94
95// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
96func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
97 bp.mu.Lock()
98 if bp.done {
99 bp.mu.Unlock()
100 return
101 }
102 bp.picker = p
103 // bp.blockingCh should never be nil.
104 close(bp.blockingCh)
105 bp.blockingCh = make(chan struct{})
106 bp.mu.Unlock()
107}
108
109func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
110 acw.mu.Lock()
111 ac := acw.ac
112 acw.mu.Unlock()
113 ac.incrCallsStarted()
114 return func(b balancer.DoneInfo) {
115 if b.Err != nil && b.Err != io.EOF {
116 ac.incrCallsFailed()
117 } else {
118 ac.incrCallsSucceeded()
119 }
120 if done != nil {
121 done(b)
122 }
123 }
124}
125
126// pick returns the transport that will be used for the RPC.
127// It may block in the following cases:
128// - there's no picker
129// - the current picker returns ErrNoSubConnAvailable
130// - the current picker returns other errors and failfast is false.
131// - the subConn returned by the current picker is not READY
132// When one of these situations happens, pick blocks until the picker gets updated.
133func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
134
135 mdKey := bp.getStickinessMDKey()
136 stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey)
137
138 // Potential race here: if stickinessMDKey is updated after the above two
139 // lines, and this pick is a sticky pick, the following put could add an
140 // entry to sticky store with an outdated sticky key.
141 //
142 // The solution: keep the current md key in sticky store, and at the
143 // beginning of each get/put, check the mdkey against store.curMDKey.
144 // - Cons: one more string comparing for each get/put.
145 // - Pros: the string matching happens inside get/put, so the overhead for
146 // non-sticky RPCs will be minimal.
147
148 if isSticky {
149 if t, ok := bp.stickiness.get(mdKey, stickyKey); ok {
150 // Done function returned is always nil.
151 return t, nil, nil
152 }
153 }
154
155 var (
156 p balancer.Picker
157 ch chan struct{}
158 )
159
160 for {
161 bp.mu.Lock()
162 if bp.done {
163 bp.mu.Unlock()
164 return nil, nil, ErrClientConnClosing
165 }
166
167 if bp.picker == nil {
168 ch = bp.blockingCh
169 }
170 if ch == bp.blockingCh {
171 // This could happen when either:
172 // - bp.picker is nil (the previous if condition), or
173 // - has called pick on the current picker.
174 bp.mu.Unlock()
175 select {
176 case <-ctx.Done():
177 return nil, nil, ctx.Err()
178 case <-ch:
179 }
180 continue
181 }
182
183 ch = bp.blockingCh
184 p = bp.picker
185 bp.mu.Unlock()
186
187 subConn, done, err := p.Pick(ctx, opts)
188
189 if err != nil {
190 switch err {
191 case balancer.ErrNoSubConnAvailable:
192 continue
193 case balancer.ErrTransientFailure:
194 if !failfast {
195 continue
196 }
197 return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
198 default:
199 // err is some other error.
200 return nil, nil, toRPCErr(err)
201 }
202 }
203
204 acw, ok := subConn.(*acBalancerWrapper)
205 if !ok {
206 grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
207 continue
208 }
209 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
210 if isSticky {
211 bp.stickiness.put(mdKey, stickyKey, acw)
212 }
213 if channelz.IsOn() {
214 return t, doneChannelzWrapper(acw, done), nil
215 }
216 return t, done, nil
217 }
218 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
219 // If ok == false, ac.state is not READY.
220 // A valid picker always returns READY subConn. This means the state of ac
221 // just changed, and picker will be updated shortly.
222 // continue back to the beginning of the for loop to repick.
223 }
224}
225
226func (bp *pickerWrapper) close() {
227 bp.mu.Lock()
228 defer bp.mu.Unlock()
229 if bp.done {
230 return
231 }
232 bp.done = true
233 close(bp.blockingCh)
234}
235
// stickinessKeyCountLimit is the number of entries a stickyStore may hold
// before put starts evicting via removeOldest.
const stickinessKeyCountLimit = 1000
237
238type stickyStoreEntry struct {
239 acw *acBalancerWrapper
240 addr resolver.Address
241}
242
243type stickyStore struct {
244 mu sync.Mutex
245 // curMDKey is check before every get/put to avoid races. The operation will
246 // abort immediately when the given mdKey is different from the curMDKey.
247 curMDKey string
248 store *linkedMap
249}
250
251func newStickyStore() *stickyStore {
252 return &stickyStore{
253 store: newLinkedMap(),
254 }
255}
256
257// reset clears the map in stickyStore, and set the currentMDKey to newMDKey.
258func (ss *stickyStore) reset(newMDKey string) {
259 ss.mu.Lock()
260 ss.curMDKey = newMDKey
261 ss.store.clear()
262 ss.mu.Unlock()
263}
264
265// stickyKey is the key to look up in store. mdKey will be checked against
266// curMDKey to avoid races.
267func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) {
268 ss.mu.Lock()
269 defer ss.mu.Unlock()
270 if mdKey != ss.curMDKey {
271 return
272 }
273 // TODO(stickiness): limit the total number of entries.
274 ss.store.put(stickyKey, &stickyStoreEntry{
275 acw: acw,
276 addr: acw.getAddrConn().getCurAddr(),
277 })
278 if ss.store.len() > stickinessKeyCountLimit {
279 ss.store.removeOldest()
280 }
281}
282
283// stickyKey is the key to look up in store. mdKey will be checked against
284// curMDKey to avoid races.
285func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) {
286 ss.mu.Lock()
287 defer ss.mu.Unlock()
288 if mdKey != ss.curMDKey {
289 return nil, false
290 }
291 entry, ok := ss.store.get(stickyKey)
292 if !ok {
293 return nil, false
294 }
295 ac := entry.acw.getAddrConn()
296 if ac.getCurAddr() != entry.addr {
297 ss.store.remove(stickyKey)
298 return nil, false
299 }
300 t, ok := ac.getReadyTransport()
301 if !ok {
302 ss.store.remove(stickyKey)
303 return nil, false
304 }
305 return t, true
306}
307
308// Get one value from metadata in ctx with key stickinessMDKey.
309//
310// It returns "", false if stickinessMDKey is an empty string.
311func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) {
312 if stickinessMDKey == "" {
313 return "", false
314 }
315
316 md, added, ok := metadata.FromOutgoingContextRaw(ctx)
317 if !ok {
318 return "", false
319 }
320
321 if vv, ok := md[stickinessMDKey]; ok {
322 if len(vv) > 0 {
323 return vv[0], true
324 }
325 }
326
327 for _, ss := range added {
328 for i := 0; i < len(ss)-1; i += 2 {
329 if ss[i] == stickinessMDKey {
330 return ss[i+1], true
331 }
332 }
333 }
334
335 return "", false
336}