blob: 152d9eccd62d37277a6e82b31201be790da23f41 [file] [log] [blame]
Matthias Andreas Benkard832a54e2019-01-29 09:27:38 +01001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "errors"
23 "io"
24 "sync"
25 "time"
26
27 "golang.org/x/net/context"
28 "golang.org/x/net/trace"
29 "google.golang.org/grpc/balancer"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/encoding"
32 "google.golang.org/grpc/internal/channelz"
33 "google.golang.org/grpc/metadata"
34 "google.golang.org/grpc/stats"
35 "google.golang.org/grpc/status"
36 "google.golang.org/grpc/transport"
37)
38
39// StreamHandler defines the handler called by gRPC server to complete the
40// execution of a streaming RPC. If a StreamHandler returns an error, it
41// should be produced by the status package, or else gRPC will use
42// codes.Unknown as the status code and err.Error() as the status message
43// of the RPC.
44type StreamHandler func(srv interface{}, stream ServerStream) error
45
46// StreamDesc represents a streaming RPC service's method specification.
47type StreamDesc struct {
48 StreamName string
49 Handler StreamHandler
50
51 // At least one of these is true.
52 ServerStreams bool
53 ClientStreams bool
54}
55
56// Stream defines the common interface a client or server stream has to satisfy.
57//
58// All errors returned from Stream are compatible with the status package.
59type Stream interface {
60 // Context returns the context for this stream.
61 Context() context.Context
62 // SendMsg blocks until it sends m, the stream is done or the stream
63 // breaks.
64 // On error, it aborts the stream and returns an RPC status on client
65 // side. On server side, it simply returns the error to the caller.
66 // SendMsg is called by generated code. Also Users can call SendMsg
67 // directly when it is really needed in their use cases.
68 // It's safe to have a goroutine calling SendMsg and another goroutine calling
69 // recvMsg on the same stream at the same time.
70 // But it is not safe to call SendMsg on the same stream in different goroutines.
71 SendMsg(m interface{}) error
72 // RecvMsg blocks until it receives a message or the stream is
73 // done. On client side, it returns io.EOF when the stream is done. On
74 // any other error, it aborts the stream and returns an RPC status. On
75 // server side, it simply returns the error to the caller.
76 // It's safe to have a goroutine calling SendMsg and another goroutine calling
77 // recvMsg on the same stream at the same time.
78 // But it is not safe to call RecvMsg on the same stream in different goroutines.
79 RecvMsg(m interface{}) error
80}
81
82// ClientStream defines the interface a client stream has to satisfy.
83type ClientStream interface {
84 // Header returns the header metadata received from the server if there
85 // is any. It blocks if the metadata is not ready to read.
86 Header() (metadata.MD, error)
87 // Trailer returns the trailer metadata from the server, if there is any.
88 // It must only be called after stream.CloseAndRecv has returned, or
89 // stream.Recv has returned a non-nil error (including io.EOF).
90 Trailer() metadata.MD
91 // CloseSend closes the send direction of the stream. It closes the stream
92 // when non-nil error is met.
93 CloseSend() error
94 // Stream.SendMsg() may return a non-nil error when something wrong happens sending
95 // the request. The returned error indicates the status of this sending, not the final
96 // status of the RPC.
97 //
98 // Always call Stream.RecvMsg() to drain the stream and get the final
99 // status, otherwise there could be leaked resources.
100 Stream
101}
102
103// NewStream creates a new Stream for the client side. This is typically
104// called by generated code. ctx is used for the lifetime of the stream.
105//
106// To ensure resources are not leaked due to the stream returned, one of the following
107// actions must be performed:
108//
109// 1. Call Close on the ClientConn.
110// 2. Cancel the context provided.
111// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
112// client-streaming RPC, for instance, might use the helper function
113// CloseAndRecv (note that CloseSend does not Recv, therefore is not
114// guaranteed to release all resources).
115// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
116//
117// If none of the above happen, a goroutine and a context will be leaked, and grpc
118// will not call the optionally-configured stats handler with a stats.End message.
119func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
120 // allow interceptor to see all applicable call options, which means those
121 // configured as defaults from dial option as well as per-call options
122 opts = combine(cc.dopts.callOptions, opts)
123
124 if cc.dopts.streamInt != nil {
125 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
126 }
127 return newClientStream(ctx, desc, cc, method, opts...)
128}
129
130// NewClientStream is a wrapper for ClientConn.NewStream.
131//
132// DEPRECATED: Use ClientConn.NewStream instead.
133func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
134 return cc.NewStream(ctx, desc, method, opts...)
135}
136
137func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
138 if channelz.IsOn() {
139 cc.incrCallsStarted()
140 defer func() {
141 if err != nil {
142 cc.incrCallsFailed()
143 }
144 }()
145 }
146 c := defaultCallInfo()
147 mc := cc.GetMethodConfig(method)
148 if mc.WaitForReady != nil {
149 c.failFast = !*mc.WaitForReady
150 }
151
152 // Possible context leak:
153 // The cancel function for the child context we create will only be called
154 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
155 // an error is generated by SendMsg.
156 // https://github.com/grpc/grpc-go/issues/1818.
157 var cancel context.CancelFunc
158 if mc.Timeout != nil && *mc.Timeout >= 0 {
159 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
160 } else {
161 ctx, cancel = context.WithCancel(ctx)
162 }
163 defer func() {
164 if err != nil {
165 cancel()
166 }
167 }()
168
169 for _, o := range opts {
170 if err := o.before(c); err != nil {
171 return nil, toRPCErr(err)
172 }
173 }
174 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
175 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
176 if err := setCallInfoCodec(c); err != nil {
177 return nil, err
178 }
179
180 callHdr := &transport.CallHdr{
181 Host: cc.authority,
182 Method: method,
183 // If it's not client streaming, we should already have the request to be sent,
184 // so we don't flush the header.
185 // If it's client streaming, the user may never send a request or send it any
186 // time soon, so we ask the transport to flush the header.
187 Flush: desc.ClientStreams,
188 ContentSubtype: c.contentSubtype,
189 }
190
191 // Set our outgoing compression according to the UseCompressor CallOption, if
192 // set. In that case, also find the compressor from the encoding package.
193 // Otherwise, use the compressor configured by the WithCompressor DialOption,
194 // if set.
195 var cp Compressor
196 var comp encoding.Compressor
197 if ct := c.compressorType; ct != "" {
198 callHdr.SendCompress = ct
199 if ct != encoding.Identity {
200 comp = encoding.GetCompressor(ct)
201 if comp == nil {
202 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
203 }
204 }
205 } else if cc.dopts.cp != nil {
206 callHdr.SendCompress = cc.dopts.cp.Type()
207 cp = cc.dopts.cp
208 }
209 if c.creds != nil {
210 callHdr.Creds = c.creds
211 }
212 var trInfo traceInfo
213 if EnableTracing {
214 trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
215 trInfo.firstLine.client = true
216 if deadline, ok := ctx.Deadline(); ok {
217 trInfo.firstLine.deadline = deadline.Sub(time.Now())
218 }
219 trInfo.tr.LazyLog(&trInfo.firstLine, false)
220 ctx = trace.NewContext(ctx, trInfo.tr)
221 defer func() {
222 if err != nil {
223 // Need to call tr.finish() if error is returned.
224 // Because tr will not be returned to caller.
225 trInfo.tr.LazyPrintf("RPC: [%v]", err)
226 trInfo.tr.SetError()
227 trInfo.tr.Finish()
228 }
229 }()
230 }
231 ctx = newContextWithRPCInfo(ctx, c.failFast)
232 sh := cc.dopts.copts.StatsHandler
233 var beginTime time.Time
234 if sh != nil {
235 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
236 beginTime = time.Now()
237 begin := &stats.Begin{
238 Client: true,
239 BeginTime: beginTime,
240 FailFast: c.failFast,
241 }
242 sh.HandleRPC(ctx, begin)
243 defer func() {
244 if err != nil {
245 // Only handle end stats if err != nil.
246 end := &stats.End{
247 Client: true,
248 Error: err,
249 BeginTime: beginTime,
250 EndTime: time.Now(),
251 }
252 sh.HandleRPC(ctx, end)
253 }
254 }()
255 }
256
257 var (
258 t transport.ClientTransport
259 s *transport.Stream
260 done func(balancer.DoneInfo)
261 )
262 for {
263 // Check to make sure the context has expired. This will prevent us from
264 // looping forever if an error occurs for wait-for-ready RPCs where no data
265 // is sent on the wire.
266 select {
267 case <-ctx.Done():
268 return nil, toRPCErr(ctx.Err())
269 default:
270 }
271
272 t, done, err = cc.getTransport(ctx, c.failFast)
273 if err != nil {
274 return nil, err
275 }
276
277 s, err = t.NewStream(ctx, callHdr)
278 if err != nil {
279 if done != nil {
280 done(balancer.DoneInfo{Err: err})
281 done = nil
282 }
283 // In the event of any error from NewStream, we never attempted to write
284 // anything to the wire, so we can retry indefinitely for non-fail-fast
285 // RPCs.
286 if !c.failFast {
287 continue
288 }
289 return nil, toRPCErr(err)
290 }
291 break
292 }
293
294 cs := &clientStream{
295 opts: opts,
296 c: c,
297 cc: cc,
298 desc: desc,
299 codec: c.codec,
300 cp: cp,
301 comp: comp,
302 cancel: cancel,
303 attempt: &csAttempt{
304 t: t,
305 s: s,
306 p: &parser{r: s},
307 done: done,
308 dc: cc.dopts.dc,
309 ctx: ctx,
310 trInfo: trInfo,
311 statsHandler: sh,
312 beginTime: beginTime,
313 },
314 }
315 cs.c.stream = cs
316 cs.attempt.cs = cs
317 if desc != unaryStreamDesc {
318 // Listen on cc and stream contexts to cleanup when the user closes the
319 // ClientConn or cancels the stream context. In all other cases, an error
320 // should already be injected into the recv buffer by the transport, which
321 // the client will eventually receive, and then we will cancel the stream's
322 // context in clientStream.finish.
323 go func() {
324 select {
325 case <-cc.ctx.Done():
326 cs.finish(ErrClientConnClosing)
327 case <-ctx.Done():
328 cs.finish(toRPCErr(ctx.Err()))
329 }
330 }()
331 }
332 return cs, nil
333}
334
335// clientStream implements a client side Stream.
336type clientStream struct {
337 opts []CallOption
338 c *callInfo
339 cc *ClientConn
340 desc *StreamDesc
341
342 codec baseCodec
343 cp Compressor
344 comp encoding.Compressor
345
346 cancel context.CancelFunc // cancels all attempts
347
348 sentLast bool // sent an end stream
349
350 mu sync.Mutex // guards finished
351 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
352
353 attempt *csAttempt // the active client stream attempt
354 // TODO(hedging): hedging will have multiple attempts simultaneously.
355}
356
357// csAttempt implements a single transport stream attempt within a
358// clientStream.
359type csAttempt struct {
360 cs *clientStream
361 t transport.ClientTransport
362 s *transport.Stream
363 p *parser
364 done func(balancer.DoneInfo)
365
366 dc Decompressor
367 decomp encoding.Compressor
368 decompSet bool
369
370 ctx context.Context // the application's context, wrapped by stats/tracing
371
372 mu sync.Mutex // guards trInfo.tr
373 // trInfo.tr is set when created (if EnableTracing is true),
374 // and cleared when the finish method is called.
375 trInfo traceInfo
376
377 statsHandler stats.Handler
378 beginTime time.Time
379}
380
381func (cs *clientStream) Context() context.Context {
382 // TODO(retry): commit the current attempt (the context has peer-aware data).
383 return cs.attempt.context()
384}
385
386func (cs *clientStream) Header() (metadata.MD, error) {
387 m, err := cs.attempt.header()
388 if err != nil {
389 // TODO(retry): maybe retry on error or commit attempt on success.
390 err = toRPCErr(err)
391 cs.finish(err)
392 }
393 return m, err
394}
395
396func (cs *clientStream) Trailer() metadata.MD {
397 // TODO(retry): on error, maybe retry (trailers-only).
398 return cs.attempt.trailer()
399}
400
401func (cs *clientStream) SendMsg(m interface{}) (err error) {
402 // TODO(retry): buffer message for replaying if not committed.
403 return cs.attempt.sendMsg(m)
404}
405
406func (cs *clientStream) RecvMsg(m interface{}) (err error) {
407 // TODO(retry): maybe retry on error or commit attempt on success.
408 return cs.attempt.recvMsg(m)
409}
410
411func (cs *clientStream) CloseSend() error {
412 cs.attempt.closeSend()
413 return nil
414}
415
416func (cs *clientStream) finish(err error) {
417 if err == io.EOF {
418 // Ending a stream with EOF indicates a success.
419 err = nil
420 }
421 cs.mu.Lock()
422 if cs.finished {
423 cs.mu.Unlock()
424 return
425 }
426 cs.finished = true
427 cs.mu.Unlock()
428 if channelz.IsOn() {
429 if err != nil {
430 cs.cc.incrCallsFailed()
431 } else {
432 cs.cc.incrCallsSucceeded()
433 }
434 }
435 // TODO(retry): commit current attempt if necessary.
436 cs.attempt.finish(err)
437 for _, o := range cs.opts {
438 o.after(cs.c)
439 }
440 cs.cancel()
441}
442
443func (a *csAttempt) context() context.Context {
444 return a.s.Context()
445}
446
447func (a *csAttempt) header() (metadata.MD, error) {
448 return a.s.Header()
449}
450
451func (a *csAttempt) trailer() metadata.MD {
452 return a.s.Trailer()
453}
454
455func (a *csAttempt) sendMsg(m interface{}) (err error) {
456 // TODO Investigate how to signal the stats handling party.
457 // generate error stats if err != nil && err != io.EOF?
458 cs := a.cs
459 defer func() {
460 // For non-client-streaming RPCs, we return nil instead of EOF on success
461 // because the generated code requires it. finish is not called; RecvMsg()
462 // will call it with the stream's status independently.
463 if err == io.EOF && !cs.desc.ClientStreams {
464 err = nil
465 }
466 if err != nil && err != io.EOF {
467 // Call finish on the client stream for errors generated by this SendMsg
468 // call, as these indicate problems created by this client. (Transport
469 // errors are converted to an io.EOF error below; the real error will be
470 // returned from RecvMsg eventually in that case, or be retried.)
471 cs.finish(err)
472 }
473 }()
474 // TODO: Check cs.sentLast and error if we already ended the stream.
475 if EnableTracing {
476 a.mu.Lock()
477 if a.trInfo.tr != nil {
478 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
479 }
480 a.mu.Unlock()
481 }
482 data, err := encode(cs.codec, m)
483 if err != nil {
484 return err
485 }
486 compData, err := compress(data, cs.cp, cs.comp)
487 if err != nil {
488 return err
489 }
490 hdr, payload := msgHeader(data, compData)
491 // TODO(dfawley): should we be checking len(data) instead?
492 if len(payload) > *cs.c.maxSendMessageSize {
493 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.c.maxSendMessageSize)
494 }
495
496 if !cs.desc.ClientStreams {
497 cs.sentLast = true
498 }
499 err = a.t.Write(a.s, hdr, payload, &transport.Options{Last: !cs.desc.ClientStreams})
500 if err == nil {
501 if a.statsHandler != nil {
502 a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payload, time.Now()))
503 }
504 if channelz.IsOn() {
505 a.t.IncrMsgSent()
506 }
507 return nil
508 }
509 return io.EOF
510}
511
512func (a *csAttempt) recvMsg(m interface{}) (err error) {
513 cs := a.cs
514 defer func() {
515 if err != nil || !cs.desc.ServerStreams {
516 // err != nil or non-server-streaming indicates end of stream.
517 cs.finish(err)
518 }
519 }()
520 var inPayload *stats.InPayload
521 if a.statsHandler != nil {
522 inPayload = &stats.InPayload{
523 Client: true,
524 }
525 }
526 if !a.decompSet {
527 // Block until we receive headers containing received message encoding.
528 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
529 if a.dc == nil || a.dc.Type() != ct {
530 // No configured decompressor, or it does not match the incoming
531 // message encoding; attempt to find a registered compressor that does.
532 a.dc = nil
533 a.decomp = encoding.GetCompressor(ct)
534 }
535 } else {
536 // No compression is used; disable our decompressor.
537 a.dc = nil
538 }
539 // Only initialize this state once per stream.
540 a.decompSet = true
541 }
542 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, inPayload, a.decomp)
543 if err != nil {
544 if err == io.EOF {
545 if statusErr := a.s.Status().Err(); statusErr != nil {
546 return statusErr
547 }
548 return io.EOF // indicates successful end of stream.
549 }
550 return toRPCErr(err)
551 }
552 if EnableTracing {
553 a.mu.Lock()
554 if a.trInfo.tr != nil {
555 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
556 }
557 a.mu.Unlock()
558 }
559 if inPayload != nil {
560 a.statsHandler.HandleRPC(a.ctx, inPayload)
561 }
562 if channelz.IsOn() {
563 a.t.IncrMsgRecv()
564 }
565 if cs.desc.ServerStreams {
566 // Subsequent messages should be received by subsequent RecvMsg calls.
567 return nil
568 }
569
570 // Special handling for non-server-stream rpcs.
571 // This recv expects EOF or errors, so we don't collect inPayload.
572 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, nil, a.decomp)
573 if err == nil {
574 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
575 }
576 if err == io.EOF {
577 return a.s.Status().Err() // non-server streaming Recv returns nil on success
578 }
579 return toRPCErr(err)
580}
581
582func (a *csAttempt) closeSend() {
583 cs := a.cs
584 if cs.sentLast {
585 return
586 }
587 cs.sentLast = true
588 cs.attempt.t.Write(cs.attempt.s, nil, nil, &transport.Options{Last: true})
589 // We ignore errors from Write. Any error it would return would also be
590 // returned by a subsequent RecvMsg call, and the user is supposed to always
591 // finish the stream by calling RecvMsg until it returns err != nil.
592}
593
594func (a *csAttempt) finish(err error) {
595 a.mu.Lock()
596 a.t.CloseStream(a.s, err)
597
598 if a.done != nil {
599 a.done(balancer.DoneInfo{
600 Err: err,
601 BytesSent: true,
602 BytesReceived: a.s.BytesReceived(),
603 })
604 }
605 if a.statsHandler != nil {
606 end := &stats.End{
607 Client: true,
608 BeginTime: a.beginTime,
609 EndTime: time.Now(),
610 Error: err,
611 }
612 a.statsHandler.HandleRPC(a.ctx, end)
613 }
614 if a.trInfo.tr != nil {
615 if err == nil {
616 a.trInfo.tr.LazyPrintf("RPC: [OK]")
617 } else {
618 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
619 a.trInfo.tr.SetError()
620 }
621 a.trInfo.tr.Finish()
622 a.trInfo.tr = nil
623 }
624 a.mu.Unlock()
625}
626
627// ServerStream defines the interface a server stream has to satisfy.
628type ServerStream interface {
629 // SetHeader sets the header metadata. It may be called multiple times.
630 // When call multiple times, all the provided metadata will be merged.
631 // All the metadata will be sent out when one of the following happens:
632 // - ServerStream.SendHeader() is called;
633 // - The first response is sent out;
634 // - An RPC status is sent out (error or success).
635 SetHeader(metadata.MD) error
636 // SendHeader sends the header metadata.
637 // The provided md and headers set by SetHeader() will be sent.
638 // It fails if called multiple times.
639 SendHeader(metadata.MD) error
640 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
641 // When called more than once, all the provided metadata will be merged.
642 SetTrailer(metadata.MD)
643 Stream
644}
645
646// serverStream implements a server side Stream.
647type serverStream struct {
648 ctx context.Context
649 t transport.ServerTransport
650 s *transport.Stream
651 p *parser
652 codec baseCodec
653
654 cp Compressor
655 dc Decompressor
656 comp encoding.Compressor
657 decomp encoding.Compressor
658
659 maxReceiveMessageSize int
660 maxSendMessageSize int
661 trInfo *traceInfo
662
663 statsHandler stats.Handler
664
665 mu sync.Mutex // protects trInfo.tr after the service handler runs.
666}
667
668func (ss *serverStream) Context() context.Context {
669 return ss.ctx
670}
671
672func (ss *serverStream) SetHeader(md metadata.MD) error {
673 if md.Len() == 0 {
674 return nil
675 }
676 return ss.s.SetHeader(md)
677}
678
679func (ss *serverStream) SendHeader(md metadata.MD) error {
680 return ss.t.WriteHeader(ss.s, md)
681}
682
683func (ss *serverStream) SetTrailer(md metadata.MD) {
684 if md.Len() == 0 {
685 return
686 }
687 ss.s.SetTrailer(md)
688}
689
690func (ss *serverStream) SendMsg(m interface{}) (err error) {
691 defer func() {
692 if ss.trInfo != nil {
693 ss.mu.Lock()
694 if ss.trInfo.tr != nil {
695 if err == nil {
696 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
697 } else {
698 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
699 ss.trInfo.tr.SetError()
700 }
701 }
702 ss.mu.Unlock()
703 }
704 if err != nil && err != io.EOF {
705 st, _ := status.FromError(toRPCErr(err))
706 ss.t.WriteStatus(ss.s, st)
707 }
708 if channelz.IsOn() && err == nil {
709 ss.t.IncrMsgSent()
710 }
711 }()
712 data, err := encode(ss.codec, m)
713 if err != nil {
714 return err
715 }
716 compData, err := compress(data, ss.cp, ss.comp)
717 if err != nil {
718 return err
719 }
720 hdr, payload := msgHeader(data, compData)
721 // TODO(dfawley): should we be checking len(data) instead?
722 if len(payload) > ss.maxSendMessageSize {
723 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
724 }
725 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
726 return toRPCErr(err)
727 }
728 if ss.statsHandler != nil {
729 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
730 }
731 return nil
732}
733
734func (ss *serverStream) RecvMsg(m interface{}) (err error) {
735 defer func() {
736 if ss.trInfo != nil {
737 ss.mu.Lock()
738 if ss.trInfo.tr != nil {
739 if err == nil {
740 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
741 } else if err != io.EOF {
742 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
743 ss.trInfo.tr.SetError()
744 }
745 }
746 ss.mu.Unlock()
747 }
748 if err != nil && err != io.EOF {
749 st, _ := status.FromError(toRPCErr(err))
750 ss.t.WriteStatus(ss.s, st)
751 }
752 if channelz.IsOn() && err == nil {
753 ss.t.IncrMsgRecv()
754 }
755 }()
756 var inPayload *stats.InPayload
757 if ss.statsHandler != nil {
758 inPayload = &stats.InPayload{}
759 }
760 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload, ss.decomp); err != nil {
761 if err == io.EOF {
762 return err
763 }
764 if err == io.ErrUnexpectedEOF {
765 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
766 }
767 return toRPCErr(err)
768 }
769 if inPayload != nil {
770 ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
771 }
772 return nil
773}
774
775// MethodFromServerStream returns the method string for the input stream.
776// The returned string is in the format of "/service/method".
777func MethodFromServerStream(stream ServerStream) (string, bool) {
778 return Method(stream.Context())
779}