Skip to content

Commit b2fda24

Browse files
committed
rpc_util: reuse memory buffer for receiving message
1 parent 0e5421c commit b2fda24

File tree

1 file changed

+18
-13
lines changed

1 file changed

+18
-13
lines changed

rpc_util.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,9 @@ type parser struct {
540540
// The header of a gRPC message. Find more detail at
541541
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
542542
header [5]byte
543+
544+
// buffer is recycled memory used to read the message.
545+
buffer []byte
543546
}
544547

545548
// recvMsg reads a complete gRPC message from the stream.
@@ -573,9 +576,10 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
573576
if int(length) > maxReceiveMessageSize {
574577
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
575578
}
576-
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
577-
// of making it for each message:
578-
msg = make([]byte, int(length))
579+
if uint32(cap(p.buffer)) < length {
580+
p.buffer = make([]byte, int(length))
581+
}
582+
msg = p.buffer[:length]
579583
if _, err := p.r.Read(msg); err != nil {
580584
if err == io.EOF {
581585
err = io.ErrUnexpectedEOF
@@ -688,12 +692,12 @@ type payloadInfo struct {
688692
}
689693

690694
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
691-
pf, d, err := p.recvMsg(maxReceiveMessageSize)
695+
pf, buf, err := p.recvMsg(maxReceiveMessageSize)
692696
if err != nil {
693697
return nil, err
694698
}
695699
if payInfo != nil {
696-
payInfo.wireLength = len(d)
700+
payInfo.wireLength = len(buf)
697701
}
698702

699703
if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
@@ -705,10 +709,10 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
705709
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
706710
// use this decompressor as the default.
707711
if dc != nil {
708-
d, err = dc.Do(bytes.NewReader(d))
709-
size = len(d)
712+
buf, err = dc.Do(bytes.NewReader(buf))
713+
size = len(buf)
710714
} else {
711-
d, size, err = decompress(compressor, d, maxReceiveMessageSize)
715+
buf, size, err = decompress(compressor, buf, maxReceiveMessageSize)
712716
}
713717
if err != nil {
714718
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
@@ -719,7 +723,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
719723
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
720724
}
721725
}
722-
return d, nil
726+
return buf, nil
723727
}
724728

725729
// Using compressor, decompress d, returning data and size.
@@ -754,15 +758,16 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize
754758
// dc takes precedence over compressor.
755759
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
756760
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
757-
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
761+
buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
758762
if err != nil {
759763
return err
760764
}
761-
if err := c.Unmarshal(d, m); err != nil {
765+
if err := c.Unmarshal(buf, m); err != nil {
762766
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
763767
}
764-
if payInfo != nil {
765-
payInfo.uncompressedBytes = d
768+
if payInfo != nil && buf != nil {
769+
payInfo.uncompressedBytes = make([]byte, len(buf))
770+
copy(payInfo.uncompressedBytes, buf)
766771
}
767772
return nil
768773
}

0 commit comments

Comments
 (0)