/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/net/http2"
	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
	"google.golang.org/grpc/transport"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32
)

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}

// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc
	sd     map[string]*StreamDesc
	mdata  interface{}
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts options

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool
	conns  map[io.Closer]bool
	serve  bool
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	events trace.EventLog

	quit               chan struct{}
	done               chan struct{}
	quitOnce           sync.Once
	doneOnce           sync.Once
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID          int64 // channelz unique identification number
	czmu                sync.RWMutex
	callsStarted        int64
	callsFailed         int64
	callsSucceeded      int64
	lastCallStartedTime time.Time
}

type options struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	useHandlerImpl        bool // use http.Handler-based server
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
}

var defaultServerOptions = options{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)

// WriteBufferSize returns a ServerOption that sets the size of the write buffer.
// This determines how much data can be batched before a write on the wire.
func WriteBufferSize(s int) ServerOption {
	return func(o *options) {
		o.writeBufferSize = s
	}
}

// ReadBufferSize returns a ServerOption that sets the size of the read buffer.
// This determines how much data can be read at most for one read syscall.
func ReadBufferSize(s int) ServerOption {
	return func(o *options) {
		o.readBufferSize = s
	}
}

// InitialWindowSize returns a ServerOption that sets the window size for a stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return func(o *options) {
		o.initialWindowSize = s
	}
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return func(o *options) {
		o.initialConnWindowSize = s
	}
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	return func(o *options) {
		o.keepaliveParams = kp
	}
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets the keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return func(o *options) {
		o.keepalivePolicy = kep
	}
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
func CustomCodec(codec Codec) ServerOption {
	return func(o *options) {
		o.codec = codec
	}
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
	return func(o *options) {
		o.cp = cp
	}
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages. It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
	return func(o *options) {
		o.dc = dc
	}
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxReceiveMessageSize = m
	}
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default math.MaxInt32.
func MaxSendMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxSendMessageSize = m
	}
}

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	return func(o *options) {
		o.maxConcurrentStreams = n
	}
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return func(o *options) {
		o.creds = c
	}
}
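
// The sketch below is not part of the original source; it shows the intended
// use of Creds with TLS transport credentials. "server.crt" and "server.key"
// are placeholder file names.
//
//	creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
//	if err != nil {
//		log.Fatalf("failed to load TLS credentials: %v", err)
//	}
//	s := grpc.NewServer(grpc.Creds(creds))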

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return func(o *options) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	}
}
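
// A minimal logging interceptor, shown here as a usage sketch rather than as
// part of the original source; the method name and outcome are logged via
// grpclog, which this package already imports.
//
//	logging := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
//		handler grpc.UnaryHandler) (interface{}, error) {
//		start := time.Now()
//		resp, err := handler(ctx, req)
//		grpclog.Infof("method=%s duration=%s err=%v", info.FullMethod, time.Since(start), err)
//		return resp, err
//	}
//	s := grpc.NewServer(grpc.UnaryInterceptor(logging))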

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return func(o *options) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	}
}

// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transports to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return func(o *options) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	}
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return func(o *options) {
		o.statsHandler = h
	}
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function has full access to the Context of the request and the
// stream, and the invocation bypasses interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return func(o *options) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	}
}
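
// A usage sketch (not in the original source): a catch-all handler that
// reports the requested method back to the client. grpc.Method and
// grpc.ServerStream are part of this package; the error text is illustrative.
//
//	fallback := func(srv interface{}, stream grpc.ServerStream) error {
//		method, _ := grpc.Method(stream.Context())
//		return status.Errorf(codes.Unimplemented, "no handler registered for %q", method)
//	}
//	s := grpc.NewServer(grpc.UnknownServiceHandler(fallback))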

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout.
//
// This API is EXPERIMENTAL.
func ConnectionTimeout(d time.Duration) ServerOption {
	return func(o *options) {
		o.connectionTimeout = d
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o(&opts)
	}
	s := &Server{
		lis:   make(map[net.Listener]bool),
		opts:  opts,
		conns: make(map[io.Closer]bool),
		m:     make(map[string]*service),
		quit:  make(chan struct{}),
		done:  make(chan struct{}),
	}
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(s, "")
	}
	return s
}
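
// A typical bootstrap, included here as a sketch rather than as part of the
// original source; pb.RegisterGreeterServer and greeterServer stand in for
// generated registration code and a user-provided implementation.
//
//	lis, err := net.Listen("tcp", ":50051")
//	if err != nil {
//		log.Fatalf("failed to listen: %v", err)
//	}
//	s := grpc.NewServer(
//		grpc.MaxRecvMsgSize(8*1024*1024),
//		grpc.ConnectionTimeout(30*time.Second),
//	)
//	pb.RegisterGreeterServer(s, &greeterServer{})
//	if err := s.Serve(lis); err != nil {
//		log.Fatalf("Serve: %v", err)
//	}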

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	ht := reflect.TypeOf(sd.HandlerType).Elem()
	st := reflect.TypeOf(ss)
	if !st.Implements(ht) {
		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
	}
	s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.m[sd.ServiceName]; ok {
		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	srv := &service{
		server: ss,
		md:     make(map[string]*MethodDesc),
		sd:     make(map[string]*StreamDesc),
		mdata:  sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		srv.md[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		srv.sd[d.StreamName] = d
	}
	s.m[sd.ServiceName] = srv
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.m {
		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
		for m := range srv.md {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.sd {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}
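
// A small sketch (not part of the original source) of inspecting the
// registered services after registration, e.g. for a reflection-style listing:
//
//	for name, info := range s.GetServiceInfo() {
//		for _, m := range info.Methods {
//			grpclog.Infof("service %s method %s (client-stream=%v server-stream=%v)",
//				name, m.Name, m.IsClientStream, m.IsServerStream)
//		}
//	}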

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}

type listenSocket struct {
	net.Listener
	channelzID int64
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		LocalAddr: l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(l.channelzID)
	}
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "")
	}
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		s.mu.Lock()
		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		s.mu.Unlock()
		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
		if err != credentials.ErrConnDispatched {
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		conn.Close()
		return
	}
	s.mu.Unlock()

	var serve func()
	c := conn.(io.Closer)
	if s.opts.useHandlerImpl {
		serve = func() { s.serveUsingHandler(conn) }
	} else {
		// Finish handshaking (HTTP2)
		st := s.newHTTP2Transport(conn, authInfo)
		if st == nil {
			return
		}
		c = st
		serve = func() { s.serveStreams(st) }
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(c) {
		return
	}
	go func() {
		serve()
		s.removeConn(c)
	}()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
		return nil
	}

	return st
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// serveUsingHandler is called from handleRawConn when s is configured
// to handle requests via the http.Handler interface. It sets up a
// net/http.Server to handle the just-accepted conn. The http.Server
// is configured to route all incoming requests (all HTTP/2 streams)
// to ServeHTTP, which creates a new ServerTransport for each stream.
// serveUsingHandler blocks until conn closes.
//
// This codepath is only used when Server.TestingUseHandlerImpl has
// been configured. This lets the end2end tests exercise the ServeHTTP
// method as one of the environment types.
//
// conn is the *tls.Conn that's already been authenticated.
func (s *Server) serveUsingHandler(conn net.Conn) {
	h2s := &http2.Server{
		MaxConcurrentStreams: s.opts.maxConcurrentStreams,
	}
	h2s.ServeConn(conn, &http2.ServeConnOpts{
		Handler: s,
	})
}

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//	if r.ProtoMajor == 2 && strings.HasPrefix(
//		r.Header.Get("Content-Type"), "application/grpc") {
//		grpcServer.ServeHTTP(w, r)
//	} else {
//		yourMux.ServeHTTP(w, r)
//	}
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if !s.addConn(st) {
		return
	}
	defer s.removeConn(st)
	s.serveStreams(st)
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
	}
	trInfo.firstLine.client = false
	trInfo.firstLine.remoteAddr = st.RemoteAddr()

	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = dl.Sub(time.Now())
	}
	return trInfo
}

func (s *Server) addConn(c io.Closer) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		c.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		c.(transport.ServerTransport).Drain()
	}
	s.conns[c] = true
	return true
}

func (s *Server) removeConn(c io.Closer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns != nil {
		delete(s.conns, c)
		s.cv.Broadcast()
	}
}

// ChannelzMetric returns ServerInternalMetric of current server.
// This is an EXPERIMENTAL API.
func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric {
	s.czmu.RLock()
	defer s.czmu.RUnlock()
	return &channelz.ServerInternalMetric{
		CallsStarted:             s.callsStarted,
		CallsSucceeded:           s.callsSucceeded,
		CallsFailed:              s.callsFailed,
		LastCallStartedTimestamp: s.lastCallStartedTime,
	}
}

func (s *Server) incrCallsStarted() {
	s.czmu.Lock()
	s.callsStarted++
	s.lastCallStartedTime = time.Now()
	s.czmu.Unlock()
}

func (s *Server) incrCallsSucceeded() {
	s.czmu.Lock()
	s.callsSucceeded++
	s.czmu.Unlock()
}

func (s *Server) incrCallsFailed() {
	s.czmu.Lock()
	s.callsFailed++
	s.czmu.Unlock()
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		grpclog.Errorln("grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		grpclog.Errorln("grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	if trInfo != nil {
		defer trInfo.tr.Finish()
		trInfo.firstLine.client = false
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		defer func() {
			if err != nil && err != io.EOF {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
		}()
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	p := &parser{r: stream}
	pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
	if err == io.EOF {
		// The entire stream is done (for unary RPC only).
		return err
	}
	if err == io.ErrUnexpectedEOF {
		err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
	}
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			case transport.StreamError:
				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
				}
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
		if e := t.WriteStatus(stream, st); e != nil {
			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
		}
		return st.Err()
	}
	var inPayload *stats.InPayload
	if sh != nil {
		inPayload = &stats.InPayload{
			RecvTime: time.Now(),
		}
	}
	df := func(v interface{}) error {
		if inPayload != nil {
			inPayload.WireLength = len(req)
		}
		if pf == compressionMade {
			var err error
			if dc != nil {
				req, err = dc.Do(bytes.NewReader(req))
				if err != nil {
					return status.Errorf(codes.Internal, err.Error())
				}
			} else {
				tmp, _ := decomp.Decompress(bytes.NewReader(req))
				req, err = ioutil.ReadAll(tmp)
				if err != nil {
					return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
				}
			}
		}
		if len(req) > s.opts.maxReceiveMessageSize {
			// TODO: Revisit the error code. Currently keep it consistent with
			// java implementation.
			return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
		}
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if inPayload != nil {
			inPayload.Payload = v
			inPayload.Data = req
			inPayload.Length = len(req)
			sh.HandleRPC(stream.Context(), inPayload)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{
		Last:  true,
		Delay: false,
	}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if s, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, s); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			case transport.StreamError:
				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
				}
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	return t.WriteStatus(stream, status.New(codes.OK, ""))
}

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:   ctx,
		t:     t,
		s:     stream,
		p:     &parser{r: stream},
		codec: s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		defer func() {
			ss.mu.Lock()
			if err != nil && err != io.EOF {
				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				ss.trInfo.tr.SetError()
			}
			ss.trInfo.tr.Finish()
			ss.trInfo.tr = nil
			ss.mu.Unlock()
		}()
	}
	var appErr error
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			switch err := appErr.(type) {
			case transport.StreamError:
				appStatus = status.New(err.Code, err.Desc)
			default:
				appStatus = status.New(codes.Unknown, appErr.Error())
			}
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	return t.WriteStatus(ss.s, status.New(codes.OK, ""))
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]
	srv, ok := s.m[service]
	if !ok {
		if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
			s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
			return
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("unknown service %v", service)
		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	// Unary RPC or Streaming RPC?
	if md, ok := srv.md[method]; ok {
		s.processUnaryRPC(t, stream, srv, md, trInfo)
		return
	}
	if sd, ok := srv.sd[method]; ok {
		s.processStreamingRPC(t, stream, srv, sd, trInfo)
		return
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
		trInfo.tr.SetError()
	}
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	errDesc := fmt.Sprintf("unknown method %v", method)
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// The key to save ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// This API is EXPERIMENTAL.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}
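
// A test-oriented sketch (not part of the original source): a stub that
// satisfies ServerTransportStream so handler code calling grpc.SetHeader can
// be exercised without a real transport. The type name fakeStream is
// illustrative.
//
//	type fakeStream struct{ header metadata.MD }
//
//	func (f *fakeStream) Method() string                  { return "/test.Service/Method" }
//	func (f *fakeStream) SetHeader(md metadata.MD) error  { f.header = metadata.Join(f.header, md); return nil }
//	func (f *fakeStream) SendHeader(md metadata.MD) error { return nil }
//	func (f *fakeStream) SetTrailer(md metadata.MD) error { return nil }
//
//	ctx := grpc.NewContextWithServerTransportStream(context.Background(), &fakeStream{})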

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	defer func() {
		s.serveWG.Wait()
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	defer func() {
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for c := range s.conns {
			c.(transport.ServerTransport).Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit. Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
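
// A shutdown sketch (not part of the original source) that drains on SIGTERM
// and falls back to a hard Stop after a grace period; the 10-second deadline
// is an arbitrary example value.
//
//	sig := make(chan os.Signal, 1)
//	signal.Notify(sig, syscall.SIGTERM)
//	<-sig
//	done := make(chan struct{})
//	go func() { s.GracefulStop(); close(done) }()
//	select {
//	case <-done:
//	case <-time.After(10 * time.Second):
//		s.Stop()
//	}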

func init() {
	internal.TestingUseHandlerImpl = func(arg interface{}) {
		arg.(*Server).opts.useHandlerImpl = true
	}
}

// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
//  - grpc.SendHeader() is called;
//  - The first response is sent out;
//  - An RPC status is sent out (error or success).
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once.
// The provided md and headers set by SetHeader() will be sent.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	if err := stream.SendHeader(md); err != nil {
		return toRPCErr(err)
	}
	return nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}
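
// A handler-side sketch (not part of the original source) of attaching header
// and trailer metadata from within a unary handler; the metadata keys and the
// pb.Request/pb.Reply types are illustrative.
//
//	func (s *server) DoWork(ctx context.Context, req *pb.Request) (*pb.Reply, error) {
//		grpc.SetHeader(ctx, metadata.Pairs("server-version", "1.0"))
//		defer grpc.SetTrailer(ctx, metadata.Pairs("work-done", "true"))
//		// ... build and return the reply ...
//		return &pb.Reply{}, nil
//	}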

// Method returns the method string for the server context. The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
	s := ServerTransportStreamFromContext(ctx)
	if s == nil {
		return "", false
	}
	return s.Method(), true
}