Skip to content

Commit ed6efb4

Browse files
author
Can Guler
committed
transport: fix a race that could lead to memory leaks (#2765)
* When a RST_STREAM is received by the server transport, a cleanupStream item is placed into controlbuf no matter what. * Updates comments. * Replaces getCleanupStream with inline struct initialization.
1 parent 68f52d7 commit ed6efb4

File tree

1 file changed

+52
-54
lines changed

1 file changed

+52
-54
lines changed

internal/transport/http2_server.go

Lines changed: 52 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
437437
s := t.activeStreams[se.StreamID]
438438
t.mu.Unlock()
439439
if s != nil {
440-
t.closeStream(s, true, se.Code, nil, false)
440+
t.closeStream(s, true, se.Code, false)
441441
} else {
442442
t.controlBuf.put(&cleanupStream{
443443
streamID: se.StreamID,
@@ -579,7 +579,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
579579
}
580580
if size > 0 {
581581
if err := s.fc.onData(size); err != nil {
582-
t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
582+
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
583583
return
584584
}
585585
if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -604,11 +604,18 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
604604
}
605605

606606
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
607-
s, ok := t.getStream(f)
608-
if !ok {
607+
// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
608+
if s, ok := t.getStream(f); ok {
609+
t.closeStream(s, false, 0, false)
609610
return
610611
}
611-
t.closeStream(s, false, 0, nil, false)
612+
// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
613+
t.controlBuf.put(&cleanupStream{
614+
streamID: f.Header().StreamID,
615+
rst: false,
616+
rstCode: 0,
617+
onWrite: func() {},
618+
})
612619
}
613620

614621
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -772,7 +779,7 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
772779
if err != nil {
773780
return err
774781
}
775-
t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
782+
t.closeStream(s, true, http2.ErrCodeInternal, false)
776783
return ErrHeaderListSizeLimitViolation
777784
}
778785
if t.stats != nil {
@@ -836,12 +843,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
836843
if err != nil {
837844
return err
838845
}
839-
t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
846+
t.closeStream(s, true, http2.ErrCodeInternal, false)
840847
return ErrHeaderListSizeLimitViolation
841848
}
842849
// Send a RST_STREAM after the trailers if the client has not already half-closed.
843850
rst := s.getState() == streamActive
844-
t.closeStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
851+
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
845852
if t.stats != nil {
846853
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
847854
}
@@ -1012,16 +1019,24 @@ func (t *http2Server) Close() error {
10121019
}
10131020

10141021
// deleteStream deletes the stream s from transport's active streams.
1015-
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1016-
t.mu.Lock()
1017-
if _, ok := t.activeStreams[s.id]; !ok {
1018-
t.mu.Unlock()
1019-
return
1022+
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState streamState) {
1023+
oldState = s.swapState(streamDone)
1024+
if oldState == streamDone {
1025+
// If the stream was already done, return.
1026+
return oldState
10201027
}
10211028

1022-
delete(t.activeStreams, s.id)
1023-
if len(t.activeStreams) == 0 {
1024-
t.idle = time.Now()
1029+
// In case stream sending and receiving are invoked in separate
1030+
// goroutines (e.g., bi-directional streaming), cancel needs to be
1031+
// called to interrupt the potential blocking on other goroutines.
1032+
s.cancel()
1033+
1034+
t.mu.Lock()
1035+
if _, ok := t.activeStreams[s.id]; ok {
1036+
delete(t.activeStreams, s.id)
1037+
if len(t.activeStreams) == 0 {
1038+
t.idle = time.Now()
1039+
}
10251040
}
10261041
t.mu.Unlock()
10271042

@@ -1032,55 +1047,38 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
10321047
atomic.AddInt64(&t.czData.streamsFailed, 1)
10331048
}
10341049
}
1035-
}
10361050

1037-
// closeStream clears the footprint of a stream when the stream is not needed
1038-
// any more.
1039-
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1040-
// Mark the stream as done
1041-
oldState := s.swapState(streamDone)
1042-
1043-
// In case stream sending and receiving are invoked in separate
1044-
// goroutines (e.g., bi-directional streaming), cancel needs to be
1045-
// called to interrupt the potential blocking on other goroutines.
1046-
s.cancel()
1051+
return oldState
1052+
}
10471053

1048-
// Deletes the stream from active streams
1049-
t.deleteStream(s, eosReceived)
1054+
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1055+
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1056+
oldState := t.deleteStream(s, eosReceived)
1057+
// If the stream is already closed, then don't put trailing header to controlbuf.
1058+
if oldState == streamDone {
1059+
return
1060+
}
10501061

1051-
cleanup := &cleanupStream{
1062+
hdr.cleanup = &cleanupStream{
10521063
streamID: s.id,
10531064
rst: rst,
10541065
rstCode: rstCode,
10551066
onWrite: func() {},
10561067
}
1057-
1058-
// No trailer. Puts cleanupFrame into transport's control buffer.
1059-
if hdr == nil {
1060-
t.controlBuf.put(cleanup)
1061-
return
1062-
}
1063-
1064-
// We do the check here, because of the following scenario:
1065-
// 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item
1066-
// is put to control buffer.
1067-
// 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at
1068-
// some point. So loopy can't act on trailer
1069-
// 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as
1070-
// the result of the received RST_STREAM.
1071-
// If we do this check at the beginning of the closeStream, then we won't put a cleanup item in
1072-
// response to received RST_STREAM into the control buffer and outStream in loopy writer will
1073-
// never get cleaned up.
1074-
1075-
// If the stream is already done, don't send the trailer.
1076-
if oldState == streamDone {
1077-
return
1078-
}
1079-
1080-
hdr.cleanup = cleanup
10811068
t.controlBuf.put(hdr)
10821069
}
10831070

1071+
// closeStream clears the footprint of a stream when the stream is not needed any more.
1072+
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1073+
t.deleteStream(s, eosReceived)
1074+
t.controlBuf.put(&cleanupStream{
1075+
streamID: s.id,
1076+
rst: rst,
1077+
rstCode: rstCode,
1078+
onWrite: func() {},
1079+
})
1080+
}
1081+
10841082
func (t *http2Server) RemoteAddr() net.Addr {
10851083
return t.remoteAddr
10861084
}

0 commit comments

Comments
 (0)