blob: 19acedb2b0208f66564eb08be3551f39ba2dc5a5 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "strconv"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "github.com/golang/protobuf/proto"
34 "golang.org/x/net/context"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/grpclog"
41 "google.golang.org/grpc/internal/channelz"
42 "google.golang.org/grpc/internal/grpcrand"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/metadata"
45 "google.golang.org/grpc/peer"
46 "google.golang.org/grpc/stats"
47 "google.golang.org/grpc/status"
48 "google.golang.org/grpc/tap"
49)
50
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state. WriteHeader returns it when the header was already
// marked as sent or when the stream is already done.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
54
// http2Server implements the ServerTransport interface with HTTP2.
//
// One http2Server wraps a single net.Conn. Frames are read by HandleStreams
// and written by the loopyWriter goroutine started in newHTTP2Server; the two
// sides communicate through controlBuf.
type http2Server struct {
	// ctx is the transport-wide context; every stream context derives from
	// it and Close cancels it.
	ctx     context.Context
	ctxDone <-chan struct{} // Cache the context.Done() chan
	cancel  context.CancelFunc
	conn    net.Conn
	// loopy is assigned inside the writer goroutine started by
	// newHTTP2Server.
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing; closed when HandleStreams returns.
	writerDone  chan struct{} // sync point to enable testing; closed when the writer goroutine exits.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	// inTapHandle, when non-nil, is invoked for every new stream before it
	// is registered; an error from it causes the stream to be refused.
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams. Set to math.MaxUint32 when the
	// configuration leaves it at 0.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level inbound flow control.
	fc *trInFlow
	// stats, when non-nil, receives connection and RPC stats events.
	stats stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes uint32 // Accessed atomically.
	// initialWindowSize is the inbound window given to each new stream. It
	// is rewritten by updateFlowControl while holding mu.
	initialWindowSize int32
	// bdpEst is non-nil only when neither window size was configured
	// explicitly, i.e. when the windows are sized dynamically.
	bdpEst *bdpEstimator

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan chan struct{}
	state     transportState
	// activeStreams maps stream IDs to live streams. Close sets it to nil.
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is the zero time.Time.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czmu       sync.RWMutex
	// kpCount is the number of keepalive pings sent by this transport.
	kpCount int64
	// The number of streams that have started, including already finished ones.
	streamsStarted int64
	// The number of streams that have ended successfully by sending frame with
	// EoS bit set.
	streamsSucceeded int64
	// streamsFailed counts streams that were cleaned up without that
	// successful end.
	streamsFailed     int64
	lastStreamCreated time.Time
	msgSent           int64
	msgRecv           int64
	lastMsgSent       time.Time
	lastMsgRecv       time.Time
}
129
130// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
131// returned if something goes wrong.
132func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
133 writeBufSize := defaultWriteBufSize
134 if config.WriteBufferSize > 0 {
135 writeBufSize = config.WriteBufferSize
136 }
137 readBufSize := defaultReadBufSize
138 if config.ReadBufferSize > 0 {
139 readBufSize = config.ReadBufferSize
140 }
141 framer := newFramer(conn, writeBufSize, readBufSize)
142 // Send initial settings as connection preface to client.
143 var isettings []http2.Setting
144 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
145 // permitted in the HTTP2 spec.
146 maxStreams := config.MaxStreams
147 if maxStreams == 0 {
148 maxStreams = math.MaxUint32
149 } else {
150 isettings = append(isettings, http2.Setting{
151 ID: http2.SettingMaxConcurrentStreams,
152 Val: maxStreams,
153 })
154 }
155 dynamicWindow := true
156 iwz := int32(initialWindowSize)
157 if config.InitialWindowSize >= defaultWindowSize {
158 iwz = config.InitialWindowSize
159 dynamicWindow = false
160 }
161 icwz := int32(initialWindowSize)
162 if config.InitialConnWindowSize >= defaultWindowSize {
163 icwz = config.InitialConnWindowSize
164 dynamicWindow = false
165 }
166 if iwz != defaultWindowSize {
167 isettings = append(isettings, http2.Setting{
168 ID: http2.SettingInitialWindowSize,
169 Val: uint32(iwz)})
170 }
171 if err := framer.fr.WriteSettings(isettings...); err != nil {
172 return nil, connectionErrorf(false, err, "transport: %v", err)
173 }
174 // Adjust the connection flow control window if needed.
175 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
176 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
177 return nil, connectionErrorf(false, err, "transport: %v", err)
178 }
179 }
180 kp := config.KeepaliveParams
181 if kp.MaxConnectionIdle == 0 {
182 kp.MaxConnectionIdle = defaultMaxConnectionIdle
183 }
184 if kp.MaxConnectionAge == 0 {
185 kp.MaxConnectionAge = defaultMaxConnectionAge
186 }
187 // Add a jitter to MaxConnectionAge.
188 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
189 if kp.MaxConnectionAgeGrace == 0 {
190 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
191 }
192 if kp.Time == 0 {
193 kp.Time = defaultServerKeepaliveTime
194 }
195 if kp.Timeout == 0 {
196 kp.Timeout = defaultServerKeepaliveTimeout
197 }
198 kep := config.KeepalivePolicy
199 if kep.MinTime == 0 {
200 kep.MinTime = defaultKeepalivePolicyMinTime
201 }
202 ctx, cancel := context.WithCancel(context.Background())
203 t := &http2Server{
204 ctx: ctx,
205 cancel: cancel,
206 ctxDone: ctx.Done(),
207 conn: conn,
208 remoteAddr: conn.RemoteAddr(),
209 localAddr: conn.LocalAddr(),
210 authInfo: config.AuthInfo,
211 framer: framer,
212 readerDone: make(chan struct{}),
213 writerDone: make(chan struct{}),
214 maxStreams: maxStreams,
215 inTapHandle: config.InTapHandle,
216 fc: &trInFlow{limit: uint32(icwz)},
217 state: reachable,
218 activeStreams: make(map[uint32]*Stream),
219 stats: config.StatsHandler,
220 kp: kp,
221 idle: time.Now(),
222 kep: kep,
223 initialWindowSize: iwz,
224 }
225 t.controlBuf = newControlBuffer(t.ctxDone)
226 if dynamicWindow {
227 t.bdpEst = &bdpEstimator{
228 bdp: initialWindowSize,
229 updateFlowControl: t.updateFlowControl,
230 }
231 }
232 if t.stats != nil {
233 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
234 RemoteAddr: t.remoteAddr,
235 LocalAddr: t.localAddr,
236 })
237 connBegin := &stats.ConnBegin{}
238 t.stats.HandleConn(t.ctx, connBegin)
239 }
240 if channelz.IsOn() {
241 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "")
242 }
243 t.framer.writer.Flush()
244
245 defer func() {
246 if err != nil {
247 t.Close()
248 }
249 }()
250
251 // Check the validity of client preface.
252 preface := make([]byte, len(clientPreface))
253 if _, err := io.ReadFull(t.conn, preface); err != nil {
254 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
255 }
256 if !bytes.Equal(preface, clientPreface) {
257 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
258 }
259
260 frame, err := t.framer.fr.ReadFrame()
261 if err == io.EOF || err == io.ErrUnexpectedEOF {
262 return nil, err
263 }
264 if err != nil {
265 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
266 }
267 atomic.StoreUint32(&t.activity, 1)
268 sf, ok := frame.(*http2.SettingsFrame)
269 if !ok {
270 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
271 }
272 t.handleSettings(sf)
273
274 go func() {
275 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
276 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
277 if err := t.loopy.run(); err != nil {
278 errorf("transport: loopyWriter.run returning. Err: %v", err)
279 }
280 t.conn.Close()
281 close(t.writerDone)
282 }()
283 go t.keepalive()
284 return t, nil
285}
286
287// operateHeader takes action on the decoded headers.
288func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
289 streamID := frame.Header().StreamID
290 var state decodeState
291 for _, hf := range frame.Fields {
292 if err := state.processHeaderField(hf); err != nil {
293 if se, ok := err.(StreamError); ok {
294 t.controlBuf.put(&cleanupStream{
295 streamID: streamID,
296 rst: true,
297 rstCode: statusCodeConvTab[se.Code],
298 onWrite: func() {},
299 })
300 }
301 return
302 }
303 }
304
305 buf := newRecvBuffer()
306 s := &Stream{
307 id: streamID,
308 st: t,
309 buf: buf,
310 fc: &inFlow{limit: uint32(t.initialWindowSize)},
311 recvCompress: state.encoding,
312 method: state.method,
313 contentSubtype: state.contentSubtype,
314 }
315 if frame.StreamEnded() {
316 // s is just created by the caller. No lock needed.
317 s.state = streamReadDone
318 }
319 if state.timeoutSet {
320 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
321 } else {
322 s.ctx, s.cancel = context.WithCancel(t.ctx)
323 }
324 pr := &peer.Peer{
325 Addr: t.remoteAddr,
326 }
327 // Attach Auth info if there is any.
328 if t.authInfo != nil {
329 pr.AuthInfo = t.authInfo
330 }
331 s.ctx = peer.NewContext(s.ctx, pr)
332 // Attach the received metadata to the context.
333 if len(state.mdata) > 0 {
334 s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
335 }
336 if state.statsTags != nil {
337 s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
338 }
339 if state.statsTrace != nil {
340 s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
341 }
342 if t.inTapHandle != nil {
343 var err error
344 info := &tap.Info{
345 FullMethodName: state.method,
346 }
347 s.ctx, err = t.inTapHandle(s.ctx, info)
348 if err != nil {
349 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
350 t.controlBuf.put(&cleanupStream{
351 streamID: s.id,
352 rst: true,
353 rstCode: http2.ErrCodeRefusedStream,
354 onWrite: func() {},
355 })
356 return
357 }
358 }
359 t.mu.Lock()
360 if t.state != reachable {
361 t.mu.Unlock()
362 return
363 }
364 if uint32(len(t.activeStreams)) >= t.maxStreams {
365 t.mu.Unlock()
366 t.controlBuf.put(&cleanupStream{
367 streamID: streamID,
368 rst: true,
369 rstCode: http2.ErrCodeRefusedStream,
370 onWrite: func() {},
371 })
372 return
373 }
374 if streamID%2 != 1 || streamID <= t.maxStreamID {
375 t.mu.Unlock()
376 // illegal gRPC stream id.
377 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
378 return true
379 }
380 t.maxStreamID = streamID
381 t.activeStreams[streamID] = s
382 if len(t.activeStreams) == 1 {
383 t.idle = time.Time{}
384 }
385 t.mu.Unlock()
386 if channelz.IsOn() {
387 t.czmu.Lock()
388 t.streamsStarted++
389 t.lastStreamCreated = time.Now()
390 t.czmu.Unlock()
391 }
392 s.requestRead = func(n int) {
393 t.adjustWindow(s, uint32(n))
394 }
395 s.ctx = traceCtx(s.ctx, s.method)
396 if t.stats != nil {
397 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
398 inHeader := &stats.InHeader{
399 FullMethod: s.method,
400 RemoteAddr: t.remoteAddr,
401 LocalAddr: t.localAddr,
402 Compression: s.recvCompress,
403 WireLength: int(frame.Header().Length),
404 }
405 t.stats.HandleRPC(s.ctx, inHeader)
406 }
407 s.ctxDone = s.ctx.Done()
408 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
409 s.trReader = &transportReader{
410 reader: &recvBufferReader{
411 ctx: s.ctx,
412 ctxDone: s.ctxDone,
413 recv: s.buf,
414 },
415 windowHandler: func(n int) {
416 t.updateWindow(s, uint32(n))
417 },
418 }
419 // Register the stream with loopy.
420 t.controlBuf.put(&registerStream{
421 streamID: s.id,
422 wq: s.wq,
423 })
424 handle(s)
425 return
426}
427
428// HandleStreams receives incoming streams using the given handler. This is
429// typically run in a separate goroutine.
430// traceCtx attaches trace to ctx and returns the new context.
431func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
432 defer close(t.readerDone)
433 for {
434 frame, err := t.framer.fr.ReadFrame()
435 atomic.StoreUint32(&t.activity, 1)
436 if err != nil {
437 if se, ok := err.(http2.StreamError); ok {
438 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
439 t.mu.Lock()
440 s := t.activeStreams[se.StreamID]
441 t.mu.Unlock()
442 if s != nil {
443 t.closeStream(s, true, se.Code, nil, false)
444 } else {
445 t.controlBuf.put(&cleanupStream{
446 streamID: se.StreamID,
447 rst: true,
448 rstCode: se.Code,
449 onWrite: func() {},
450 })
451 }
452 continue
453 }
454 if err == io.EOF || err == io.ErrUnexpectedEOF {
455 t.Close()
456 return
457 }
458 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
459 t.Close()
460 return
461 }
462 switch frame := frame.(type) {
463 case *http2.MetaHeadersFrame:
464 if t.operateHeaders(frame, handle, traceCtx) {
465 t.Close()
466 break
467 }
468 case *http2.DataFrame:
469 t.handleData(frame)
470 case *http2.RSTStreamFrame:
471 t.handleRSTStream(frame)
472 case *http2.SettingsFrame:
473 t.handleSettings(frame)
474 case *http2.PingFrame:
475 t.handlePing(frame)
476 case *http2.WindowUpdateFrame:
477 t.handleWindowUpdate(frame)
478 case *http2.GoAwayFrame:
479 // TODO: Handle GoAway from the client appropriately.
480 default:
481 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
482 }
483 }
484}
485
486func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
487 t.mu.Lock()
488 defer t.mu.Unlock()
489 if t.activeStreams == nil {
490 // The transport is closing.
491 return nil, false
492 }
493 s, ok := t.activeStreams[f.Header().StreamID]
494 if !ok {
495 // The stream is already done.
496 return nil, false
497 }
498 return s, true
499}
500
501// adjustWindow sends out extra window update over the initial window size
502// of stream if the application is requesting data larger in size than
503// the window.
504func (t *http2Server) adjustWindow(s *Stream, n uint32) {
505 if w := s.fc.maybeAdjust(n); w > 0 {
506 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
507 }
508
509}
510
511// updateWindow adjusts the inbound quota for the stream and the transport.
512// Window updates will deliver to the controller for sending when
513// the cumulative quota exceeds the corresponding threshold.
514func (t *http2Server) updateWindow(s *Stream, n uint32) {
515 if w := s.fc.onRead(n); w > 0 {
516 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
517 increment: w,
518 })
519 }
520}
521
522// updateFlowControl updates the incoming flow control windows
523// for the transport and the stream based on the current bdp
524// estimation.
525func (t *http2Server) updateFlowControl(n uint32) {
526 t.mu.Lock()
527 for _, s := range t.activeStreams {
528 s.fc.newLimit(n)
529 }
530 t.initialWindowSize = int32(n)
531 t.mu.Unlock()
532 t.controlBuf.put(&outgoingWindowUpdate{
533 streamID: 0,
534 increment: t.fc.newLimit(n),
535 })
536 t.controlBuf.put(&outgoingSettings{
537 ss: []http2.Setting{
538 {
539 ID: http2.SettingInitialWindowSize,
540 Val: n,
541 },
542 },
543 })
544
545}
546
547func (t *http2Server) handleData(f *http2.DataFrame) {
548 size := f.Header().Length
549 var sendBDPPing bool
550 if t.bdpEst != nil {
551 sendBDPPing = t.bdpEst.add(size)
552 }
553 // Decouple connection's flow control from application's read.
554 // An update on connection's flow control should not depend on
555 // whether user application has read the data or not. Such a
556 // restriction is already imposed on the stream's flow control,
557 // and therefore the sender will be blocked anyways.
558 // Decoupling the connection flow control will prevent other
559 // active(fast) streams from starving in presence of slow or
560 // inactive streams.
561 if w := t.fc.onData(size); w > 0 {
562 t.controlBuf.put(&outgoingWindowUpdate{
563 streamID: 0,
564 increment: w,
565 })
566 }
567 if sendBDPPing {
568 // Avoid excessive ping detection (e.g. in an L7 proxy)
569 // by sending a window update prior to the BDP ping.
570 if w := t.fc.reset(); w > 0 {
571 t.controlBuf.put(&outgoingWindowUpdate{
572 streamID: 0,
573 increment: w,
574 })
575 }
576 t.controlBuf.put(bdpPing)
577 }
578 // Select the right stream to dispatch.
579 s, ok := t.getStream(f)
580 if !ok {
581 return
582 }
583 if size > 0 {
584 if err := s.fc.onData(size); err != nil {
585 t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
586 return
587 }
588 if f.Header().Flags.Has(http2.FlagDataPadded) {
589 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
590 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
591 }
592 }
593 // TODO(bradfitz, zhaoq): A copy is required here because there is no
594 // guarantee f.Data() is consumed before the arrival of next frame.
595 // Can this copy be eliminated?
596 if len(f.Data()) > 0 {
597 data := make([]byte, len(f.Data()))
598 copy(data, f.Data())
599 s.write(recvMsg{data: data})
600 }
601 }
602 if f.Header().Flags.Has(http2.FlagDataEndStream) {
603 // Received the end of stream from the client.
604 s.compareAndSwapState(streamActive, streamReadDone)
605 s.write(recvMsg{err: io.EOF})
606 }
607}
608
609func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
610 s, ok := t.getStream(f)
611 if !ok {
612 return
613 }
614 t.closeStream(s, false, 0, nil, false)
615}
616
617func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
618 if f.IsAck() {
619 return
620 }
621 var ss []http2.Setting
622 f.ForeachSetting(func(s http2.Setting) error {
623 ss = append(ss, s)
624 return nil
625 })
626 t.controlBuf.put(&incomingSettings{
627 ss: ss,
628 })
629}
630
const (
	// maxPingStrikes is the number of keepalive-policy violations tolerated
	// from a client; handlePing sends a GoAway and closes the connection once
	// pingStrikes exceeds it.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing handlePing expects between
	// client pings when there are no active streams and the enforcement
	// policy does not permit pings without streams.
	defaultPingTimeout = 2 * time.Hour
)
635
636func (t *http2Server) handlePing(f *http2.PingFrame) {
637 if f.IsAck() {
638 if f.Data == goAwayPing.data && t.drainChan != nil {
639 close(t.drainChan)
640 return
641 }
642 // Maybe it's a BDP ping.
643 if t.bdpEst != nil {
644 t.bdpEst.calculate(f.Data)
645 }
646 return
647 }
648 pingAck := &ping{ack: true}
649 copy(pingAck.data[:], f.Data[:])
650 t.controlBuf.put(pingAck)
651
652 now := time.Now()
653 defer func() {
654 t.lastPingAt = now
655 }()
656 // A reset ping strikes means that we don't need to check for policy
657 // violation for this ping and the pingStrikes counter should be set
658 // to 0.
659 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
660 t.pingStrikes = 0
661 return
662 }
663 t.mu.Lock()
664 ns := len(t.activeStreams)
665 t.mu.Unlock()
666 if ns < 1 && !t.kep.PermitWithoutStream {
667 // Keepalive shouldn't be active thus, this new ping should
668 // have come after at least defaultPingTimeout.
669 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
670 t.pingStrikes++
671 }
672 } else {
673 // Check if keepalive policy is respected.
674 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
675 t.pingStrikes++
676 }
677 }
678
679 if t.pingStrikes > maxPingStrikes {
680 // Send goaway and close the connection.
681 errorf("transport: Got too many pings from the client, closing the connection.")
682 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
683 }
684}
685
686func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
687 t.controlBuf.put(&incomingWindowUpdate{
688 streamID: f.Header().StreamID,
689 increment: f.Increment,
690 })
691}
692
693func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
694 for k, vv := range md {
695 if isReservedHeader(k) {
696 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
697 continue
698 }
699 for _, v := range vv {
700 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
701 }
702 }
703 return headerFields
704}
705
706// WriteHeader sends the header metedata md back to the client.
707func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
708 if s.updateHeaderSent() || s.getState() == streamDone {
709 return ErrIllegalHeaderWrite
710 }
711 s.hdrMu.Lock()
712 if md.Len() > 0 {
713 if s.header.Len() > 0 {
714 s.header = metadata.Join(s.header, md)
715 } else {
716 s.header = md
717 }
718 }
719 t.writeHeaderLocked(s)
720 s.hdrMu.Unlock()
721 return nil
722}
723
724func (t *http2Server) writeHeaderLocked(s *Stream) {
725 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
726 // first and create a slice of that exact size.
727 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
728 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
729 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
730 if s.sendCompress != "" {
731 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
732 }
733 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
734 t.controlBuf.put(&headerFrame{
735 streamID: s.id,
736 hf: headerFields,
737 endStream: false,
738 onWrite: func() {
739 atomic.StoreUint32(&t.resetPingStrikes, 1)
740 },
741 })
742 if t.stats != nil {
743 // Note: WireLength is not set in outHeader.
744 // TODO(mmukhi): Revisit this later, if needed.
745 outHeader := &stats.OutHeader{}
746 t.stats.HandleRPC(s.Context(), outHeader)
747 }
748}
749
750// WriteStatus sends stream status to the client and terminates the stream.
751// There is no further I/O operations being able to perform on this stream.
752// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
753// OK is adopted.
754func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
755 if s.getState() == streamDone {
756 return nil
757 }
758 s.hdrMu.Lock()
759 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
760 // first and create a slice of that exact size.
761 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
762 if !s.updateHeaderSent() { // No headers have been sent.
763 if len(s.header) > 0 { // Send a separate header frame.
764 t.writeHeaderLocked(s)
765 } else { // Send a trailer only response.
766 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
767 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
768 }
769 }
770 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
771 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
772
773 if p := st.Proto(); p != nil && len(p.Details) > 0 {
774 stBytes, err := proto.Marshal(p)
775 if err != nil {
776 // TODO: return error instead, when callers are able to handle it.
777 grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
778 } else {
779 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
780 }
781 }
782
783 // Attach the trailer metadata.
784 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
785 trailingHeader := &headerFrame{
786 streamID: s.id,
787 hf: headerFields,
788 endStream: true,
789 onWrite: func() {
790 atomic.StoreUint32(&t.resetPingStrikes, 1)
791 },
792 }
793 s.hdrMu.Unlock()
794 t.closeStream(s, false, 0, trailingHeader, true)
795 if t.stats != nil {
796 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
797 }
798 return nil
799}
800
801// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
802// is returns if it fails (e.g., framing error, transport error).
803func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
804 if !s.isHeaderSent() { // Headers haven't been written yet.
805 if err := t.WriteHeader(s, nil); err != nil {
806 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
807 return streamErrorf(codes.Internal, "transport: %v", err)
808 }
809 } else {
810 // Writing headers checks for this condition.
811 if s.getState() == streamDone {
812 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
813 s.cancel()
814 select {
815 case <-t.ctx.Done():
816 return ErrConnClosing
817 default:
818 }
819 return ContextErr(s.ctx.Err())
820 }
821 }
822 // Add some data to header frame so that we can equally distribute bytes across frames.
823 emptyLen := http2MaxFrameLen - len(hdr)
824 if emptyLen > len(data) {
825 emptyLen = len(data)
826 }
827 hdr = append(hdr, data[:emptyLen]...)
828 data = data[emptyLen:]
829 df := &dataFrame{
830 streamID: s.id,
831 h: hdr,
832 d: data,
833 onEachWrite: func() {
834 atomic.StoreUint32(&t.resetPingStrikes, 1)
835 },
836 }
837 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
838 select {
839 case <-t.ctx.Done():
840 return ErrConnClosing
841 default:
842 }
843 return ContextErr(s.ctx.Err())
844 }
845 return t.controlBuf.put(df)
846}
847
848// keepalive running in a separate goroutine does the following:
849// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
850// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
851// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
852// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
853// after an additional duration of keepalive.Timeout.
854func (t *http2Server) keepalive() {
855 p := &ping{}
856 var pingSent bool
857 maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
858 maxAge := time.NewTimer(t.kp.MaxConnectionAge)
859 keepalive := time.NewTimer(t.kp.Time)
860 // NOTE: All exit paths of this function should reset their
861 // respective timers. A failure to do so will cause the
862 // following clean-up to deadlock and eventually leak.
863 defer func() {
864 if !maxIdle.Stop() {
865 <-maxIdle.C
866 }
867 if !maxAge.Stop() {
868 <-maxAge.C
869 }
870 if !keepalive.Stop() {
871 <-keepalive.C
872 }
873 }()
874 for {
875 select {
876 case <-maxIdle.C:
877 t.mu.Lock()
878 idle := t.idle
879 if idle.IsZero() { // The connection is non-idle.
880 t.mu.Unlock()
881 maxIdle.Reset(t.kp.MaxConnectionIdle)
882 continue
883 }
884 val := t.kp.MaxConnectionIdle - time.Since(idle)
885 t.mu.Unlock()
886 if val <= 0 {
887 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
888 // Gracefully close the connection.
889 t.drain(http2.ErrCodeNo, []byte{})
890 // Resetting the timer so that the clean-up doesn't deadlock.
891 maxIdle.Reset(infinity)
892 return
893 }
894 maxIdle.Reset(val)
895 case <-maxAge.C:
896 t.drain(http2.ErrCodeNo, []byte{})
897 maxAge.Reset(t.kp.MaxConnectionAgeGrace)
898 select {
899 case <-maxAge.C:
900 // Close the connection after grace period.
901 t.Close()
902 // Resetting the timer so that the clean-up doesn't deadlock.
903 maxAge.Reset(infinity)
904 case <-t.ctx.Done():
905 }
906 return
907 case <-keepalive.C:
908 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
909 pingSent = false
910 keepalive.Reset(t.kp.Time)
911 continue
912 }
913 if pingSent {
914 t.Close()
915 // Resetting the timer so that the clean-up doesn't deadlock.
916 keepalive.Reset(infinity)
917 return
918 }
919 pingSent = true
920 if channelz.IsOn() {
921 t.czmu.Lock()
922 t.kpCount++
923 t.czmu.Unlock()
924 }
925 t.controlBuf.put(p)
926 keepalive.Reset(t.kp.Timeout)
927 case <-t.ctx.Done():
928 return
929 }
930 }
931}
932
933// Close starts shutting down the http2Server transport.
934// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
935// could cause some resource issue. Revisit this later.
936func (t *http2Server) Close() error {
937 t.mu.Lock()
938 if t.state == closing {
939 t.mu.Unlock()
940 return errors.New("transport: Close() was already called")
941 }
942 t.state = closing
943 streams := t.activeStreams
944 t.activeStreams = nil
945 t.mu.Unlock()
946 t.controlBuf.finish()
947 t.cancel()
948 err := t.conn.Close()
949 if channelz.IsOn() {
950 channelz.RemoveEntry(t.channelzID)
951 }
952 // Cancel all active streams.
953 for _, s := range streams {
954 s.cancel()
955 }
956 if t.stats != nil {
957 connEnd := &stats.ConnEnd{}
958 t.stats.HandleConn(t.ctx, connEnd)
959 }
960 return err
961}
962
963// closeStream clears the footprint of a stream when the stream is not needed
964// any more.
965func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
966 if s.swapState(streamDone) == streamDone {
967 // If the stream was already done, return.
968 return
969 }
970 // In case stream sending and receiving are invoked in separate
971 // goroutines (e.g., bi-directional streaming), cancel needs to be
972 // called to interrupt the potential blocking on other goroutines.
973 s.cancel()
974 cleanup := &cleanupStream{
975 streamID: s.id,
976 rst: rst,
977 rstCode: rstCode,
978 onWrite: func() {
979 t.mu.Lock()
980 if t.activeStreams != nil {
981 delete(t.activeStreams, s.id)
982 if len(t.activeStreams) == 0 {
983 t.idle = time.Now()
984 }
985 }
986 t.mu.Unlock()
987 if channelz.IsOn() {
988 t.czmu.Lock()
989 if eosReceived {
990 t.streamsSucceeded++
991 } else {
992 t.streamsFailed++
993 }
994 t.czmu.Unlock()
995 }
996 },
997 }
998 if hdr != nil {
999 hdr.cleanup = cleanup
1000 t.controlBuf.put(hdr)
1001 } else {
1002 t.controlBuf.put(cleanup)
1003 }
1004}
1005
// RemoteAddr returns the remote network address of the connection, as
// captured from conn when the transport was created.
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}
1009
// Drain starts a graceful shutdown of the transport by queueing a GoAway with
// no error code and empty debug data. It has no effect when a drain is
// already underway.
func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}
1013
1014func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1015 t.mu.Lock()
1016 defer t.mu.Unlock()
1017 if t.drainChan != nil {
1018 return
1019 }
1020 t.drainChan = make(chan struct{})
1021 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1022}
1023
// goAwayPing is the ping written right after the first GoAway of a graceful
// drain. handlePing recognizes its payload on the ack and closes drainChan,
// which lets the second GoAway be sent.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1025
1026// Handles outgoing GoAway and returns true if loopy needs to put itself
1027// in draining mode.
1028func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1029 t.mu.Lock()
1030 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1031 t.mu.Unlock()
1032 // The transport is closing.
1033 return false, ErrConnClosing
1034 }
1035 sid := t.maxStreamID
1036 if !g.headsUp {
1037 // Stop accepting more streams now.
1038 t.state = draining
1039 if len(t.activeStreams) == 0 {
1040 g.closeConn = true
1041 }
1042 t.mu.Unlock()
1043 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1044 return false, err
1045 }
1046 if g.closeConn {
1047 // Abruptly close the connection following the GoAway (via
1048 // loopywriter). But flush out what's inside the buffer first.
1049 t.framer.writer.Flush()
1050 return false, fmt.Errorf("transport: Connection closing")
1051 }
1052 return true, nil
1053 }
1054 t.mu.Unlock()
1055 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1056 // Follow that with a ping and wait for the ack to come back or a timer
1057 // to expire. During this time accept new streams since they might have
1058 // originated before the GoAway reaches the client.
1059 // After getting the ack or timer expiration send out another GoAway this
1060 // time with an ID of the max stream server intends to process.
1061 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1062 return false, err
1063 }
1064 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1065 return false, err
1066 }
1067 go func() {
1068 timer := time.NewTimer(time.Minute)
1069 defer timer.Stop()
1070 select {
1071 case <-t.drainChan:
1072 case <-timer.C:
1073 case <-t.ctx.Done():
1074 return
1075 }
1076 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1077 }()
1078 return false, nil
1079}
1080
1081func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1082 t.czmu.RLock()
1083 s := channelz.SocketInternalMetric{
1084 StreamsStarted: t.streamsStarted,
1085 StreamsSucceeded: t.streamsSucceeded,
1086 StreamsFailed: t.streamsFailed,
1087 MessagesSent: t.msgSent,
1088 MessagesReceived: t.msgRecv,
1089 KeepAlivesSent: t.kpCount,
1090 LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
1091 LastMessageSentTimestamp: t.lastMsgSent,
1092 LastMessageReceivedTimestamp: t.lastMsgRecv,
1093 LocalFlowControlWindow: int64(t.fc.getSize()),
1094 //socket options
1095 LocalAddr: t.localAddr,
1096 RemoteAddr: t.remoteAddr,
1097 // Security
1098 // RemoteName :
1099 }
1100 t.czmu.RUnlock()
1101 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1102 return &s
1103}
1104
1105func (t *http2Server) IncrMsgSent() {
1106 t.czmu.Lock()
1107 t.msgSent++
1108 t.lastMsgSent = time.Now()
1109 t.czmu.Unlock()
1110}
1111
1112func (t *http2Server) IncrMsgRecv() {
1113 t.czmu.Lock()
1114 t.msgRecv++
1115 t.lastMsgRecv = time.Now()
1116 t.czmu.Unlock()
1117}
1118
1119func (t *http2Server) getOutFlowWindow() int64 {
1120 resp := make(chan uint32)
1121 timer := time.NewTimer(time.Second)
1122 defer timer.Stop()
1123 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1124 select {
1125 case sz := <-resp:
1126 return int64(sz)
1127 case <-t.ctxDone:
1128 return -1
1129 case <-timer.C:
1130 return -2
1131 }
1132}
1133
1134func getJitter(v time.Duration) time.Duration {
1135 if v == infinity {
1136 return 0
1137 }
1138 // Generate a jitter between +/- 10% of the value.
1139 r := int64(v / 10)
1140 j := grpcrand.Int63n(2*r) - r
1141 return time.Duration(j)
1142}