@@ -146,10 +146,11 @@ type earlyAbortStream struct {
146146func (* earlyAbortStream ) isTransportResponseFrame () bool { return false }
147147
148148type dataFrame struct {
149- streamID uint32
150- endStream bool
151- h []byte
152- reader mem.Reader
149+ streamID uint32
150+ endStream bool
151+ h []byte
152+ data mem.BufferSlice
153+ processing bool
153154 // onEachWrite is called every time
154155 // a part of data is written out.
155156 onEachWrite func ()
@@ -234,6 +235,7 @@ type outStream struct {
234235 itl * itemList
235236 bytesOutStanding int
236237 wq * writeQuota
238+ reader mem.Reader
237239
238240 next * outStream
239241 prev * outStream
@@ -461,7 +463,9 @@ func (c *controlBuffer) finish() {
461463 v .onOrphaned (ErrConnClosing )
462464 }
463465 case * dataFrame :
464- _ = v .reader .Close ()
466+ if ! v .processing {
467+ v .data .Free ()
468+ }
465469 }
466470 }
467471
@@ -650,10 +654,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
650654
651655func (l * loopyWriter ) registerStreamHandler (h * registerStream ) {
652656 str := & outStream {
653- id : h .streamID ,
654- state : empty ,
655- itl : & itemList {},
656- wq : h .wq ,
657+ id : h .streamID ,
658+ state : empty ,
659+ itl : & itemList {},
660+ wq : h .wq ,
661+ reader : mem.BufferSlice {}.Reader (),
657662 }
658663 l .estdStreams [h .streamID ] = str
659664}
@@ -685,10 +690,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
685690 }
686691 // Case 2: Client wants to originate stream.
687692 str := & outStream {
688- id : h .streamID ,
689- state : empty ,
690- itl : & itemList {},
691- wq : h .wq ,
693+ id : h .streamID ,
694+ state : empty ,
695+ itl : & itemList {},
696+ wq : h .wq ,
697+ reader : mem.BufferSlice {}.Reader (),
692698 }
693699 return l .originateStream (str , h )
694700}
@@ -790,10 +796,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
790796 // a RST_STREAM before stream initialization thus the stream might
791797 // not be established yet.
792798 delete (l .estdStreams , c .streamID )
799+ str .reader .Close ()
793800 str .deleteSelf ()
794801 for head := str .itl .dequeueAll (); head != nil ; head = head .next {
795802 if df , ok := head .it .(* dataFrame ); ok {
796- _ = df .reader .Close ()
803+ if ! df .processing {
804+ df .data .Free ()
805+ }
797806 }
798807 }
799808 }
@@ -928,21 +937,27 @@ func (l *loopyWriter) processData() (bool, error) {
928937 if str == nil {
929938 return true , nil
930939 }
940+ reader := str .reader
931941 dataItem := str .itl .peek ().(* dataFrame ) // Peek at the first data item this stream.
942+ if ! dataItem .processing {
943+ dataItem .processing = true
944+ str .reader .Reset (dataItem .data )
945+ dataItem .data .Free ()
946+ }
932947 // A data item is represented by a dataFrame, since it later translates into
933948 // multiple HTTP2 data frames.
934949 // Every dataFrame has two buffers; h that keeps grpc-message header and data
935950 // that is the actual message. As an optimization to keep wire traffic low, data
936951 // from data is copied to h to make as big as the maximum possible HTTP2 frame
937952 // size.
938953
939- if len (dataItem .h ) == 0 && dataItem . reader .Remaining () == 0 { // Empty data frame
954+ if len (dataItem .h ) == 0 && reader .Remaining () == 0 { // Empty data frame
940955 // Client sends out empty data frame with endStream = true
941956 if err := l .framer .fr .WriteData (dataItem .streamID , dataItem .endStream , nil ); err != nil {
942957 return false , err
943958 }
944959 str .itl .dequeue () // remove the empty data item from stream
945- _ = dataItem . reader .Close ()
960+ _ = reader .Close ()
946961 if str .itl .isEmpty () {
947962 str .state = empty
948963 } else if trailer , ok := str .itl .peek ().(* headerFrame ); ok { // the next item is trailers.
@@ -971,8 +986,8 @@ func (l *loopyWriter) processData() (bool, error) {
971986 }
972987 // Compute how much of the header and data we can send within quota and max frame length
973988 hSize := min (maxSize , len (dataItem .h ))
974- dSize := min (maxSize - hSize , dataItem . reader .Remaining ())
975- remainingBytes := len (dataItem .h ) + dataItem . reader .Remaining () - hSize - dSize
989+ dSize := min (maxSize - hSize , reader .Remaining ())
990+ remainingBytes := len (dataItem .h ) + reader .Remaining () - hSize - dSize
976991 size := hSize + dSize
977992
978993 var buf * []byte
@@ -993,7 +1008,7 @@ func (l *loopyWriter) processData() (bool, error) {
9931008 defer pool .Put (buf )
9941009
9951010 copy ((* buf )[:hSize ], dataItem .h )
996- _ , _ = dataItem . reader .Read ((* buf )[hSize :])
1011+ _ , _ = reader .Read ((* buf )[hSize :])
9971012 }
9981013
9991014 // Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1014,7 +1029,7 @@ func (l *loopyWriter) processData() (bool, error) {
10141029 dataItem .h = dataItem .h [hSize :]
10151030
10161031 if remainingBytes == 0 { // All the data from that message was written out.
1017- _ = dataItem . reader .Close ()
1032+ _ = reader .Close ()
10181033 str .itl .dequeue ()
10191034 }
10201035 if str .itl .isEmpty () {
0 commit comments