Skip to content

Commit 36632f6

Browse files
committed
Fixed leaked goroutine
1 parent cb9af76 commit 36632f6

File tree

4 files changed

+93
-19
lines changed

4 files changed

+93
-19
lines changed

internal/transport/controlbuf.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ type goAway struct {
193193
code http2.ErrCode
194194
debugData []byte
195195
headsUp bool
196-
closeConnErr error // if set, loopyWriter will exit, resulting in conn closure
196+
closeConnErr error // if set, loopyWriter will exit with this error
197197
}
198198

199199
func (*goAway) isTransportResponseFrame() bool { return false }

internal/transport/http2_client.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
407407
// returning from this function.
408408
readerErrCh := make(chan error, 1)
409409
go t.reader(readerErrCh)
410+
defer func() {
411+
if err != nil {
412+
close(t.writerDone)
413+
t.Close(err)
414+
}
415+
}()
410416

411417
// Send connection preface to server.
412418
n, err := t.conn.Write(clientPreface)
@@ -450,6 +456,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
450456
if err := t.framer.writer.Flush(); err != nil {
451457
return nil, err
452458
}
459+
if err = <-readerErrCh; err != nil {
460+
return nil, err
461+
}
453462
go func() {
454463
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
455464
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
@@ -460,17 +469,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
460469
// after draining any remaining incoming data.
461470
t.conn.Close()
462471
}
463-
t.logger.Infof("Closing writerDone channel")
464472
close(t.writerDone)
465473
}()
466-
defer func() {
467-
if err == nil {
468-
err = <-readerErrCh
469-
}
470-
if err != nil {
471-
t.Close(err)
472-
}
473-
}()
474+
474475
return t, nil
475476
}
476477

@@ -977,7 +978,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
977978

978979
// Close kicks off the shutdown process of the transport. This should be called
979980
// only once on a transport. Once it is called, the transport should not be
980-
// accessed any more.
981+
// accessed anymore.
981982
func (t *http2Client) Close(err error) {
982983
t.mu.Lock()
983984
// Make sure we only close once.
@@ -1006,7 +1007,9 @@ func (t *http2Client) Close(err error) {
10061007
// ever starts to take in an HTTP/2 error code the peer will be able to get more information about the reason
10071008
// behind the connection close.
10081009
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(fmt.Sprintf("client shutdown with: %v", err)), closeConnErr: err})
1009-
<-t.writerDone
1010+
if t.writerDone != nil {
1011+
<-t.writerDone
1012+
}
10101013
t.cancel()
10111014
t.conn.Close()
10121015
channelz.RemoveEntry(t.channelz.ID)

internal/transport/transport_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2670,7 +2670,7 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
26702670
t.Fatalf("Error while listening: %v", err)
26712671
}
26722672
defer lis.Close()
2673-
// greetDone channel verifies that greet is done with the connection
2673+
// greetDone is used to notify when server is done greeting the client.
26742674
greetDone := make(chan struct{})
26752675
// successCh verifies that GOAWAY is received at server side
26762676
successCh := make(chan struct{})
@@ -2694,10 +2694,18 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
26942694
t.Errorf("Error while writing settings %v", err)
26952695
return
26962696
}
2697-
2698-
sfr.ReadFrame()
2699-
sfr.ReadFrame()
2700-
sfr.ReadFrame()
2697+
fr, _ := sfr.ReadFrame()
2698+
if _, ok := fr.(*http2.SettingsFrame); !ok {
2699+
t.Errorf("Expected settings frame, got %v", fr)
2700+
}
2701+
fr, _ = sfr.ReadFrame()
2702+
if fr, ok := fr.(*http2.SettingsFrame); !ok && fr.IsAck() {
2703+
t.Errorf("Expected settings ACK frame, got %v", fr)
2704+
}
2705+
fr, _ = sfr.ReadFrame()
2706+
if fr, ok := fr.(*http2.HeadersFrame); !ok && fr.Flags.Has(http2.FlagHeadersEndStream) {
2707+
t.Errorf("Expected Headers frame with END_HEADERS frame, got %v", fr)
2708+
}
27012709
close(greetDone)
27022710

27032711
frame, err := sfr.ReadFrame()
@@ -2712,11 +2720,11 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
27122720
t.Logf("Received goAway frame from client")
27132721
close(successCh)
27142722
} else {
2723+
t.Logf("Received unexpected goAway frame from client")
27152724
close(errorCh)
27162725
}
27172726
return
27182727
default:
2719-
// The client should have sent any frame other than GOAWAY
27202728
t.Logf("The server received a frame other than GOAWAY")
27212729
close(errorCh)
27222730
return
@@ -2731,7 +2739,7 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
27312739
if err != nil {
27322740
t.Fatalf("failed to open stream: %v", err)
27332741
}
2734-
// Wait until server receives the headers and settings frame as part of greet
2742+
// Wait until server receives the headers and settings frame as part of greet.
27352743
<-greetDone
27362744
ct.Close(errors.New("manually closed by client"))
27372745
t.Logf("Closed the client connection")

test/goaway_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,3 +761,66 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) {
761761
t.Fatalf("Error waiting for graceful shutdown of the server: %v", err)
762762
}
763763
}
764+
765+
// TestClientSendsAGoAway tests the scenario where you get a go away ping
766+
// frames from the client during graceful shutdown.
767+
func (s) TestClientSendsAGoAway(t *testing.T) {
768+
lis, err := net.Listen("tcp", "localhost:0")
769+
if err != nil {
770+
t.Fatalf("error listening: %v", err)
771+
}
772+
ctCh := testutils.NewChannel()
773+
go func() {
774+
conn, err := lis.Accept()
775+
if err != nil {
776+
t.Errorf("error in lis.Accept(): %v", err)
777+
}
778+
ct := newClientTester(t, conn)
779+
ctCh.Send(ct)
780+
}()
781+
defer lis.Close()
782+
783+
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
784+
if err != nil {
785+
t.Fatalf("error dialing: %v", err)
786+
}
787+
788+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
789+
defer cancel()
790+
791+
val, err := ctCh.Receive(ctx)
792+
if err != nil {
793+
t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
794+
}
795+
ct := val.(*clientTester)
796+
goAwayReceived := make(chan struct{})
797+
errCh := make(chan struct{})
798+
go func() {
799+
for {
800+
f, err := ct.fr.ReadFrame()
801+
if err != nil {
802+
return
803+
}
804+
switch fr := f.(type) {
805+
case *http2.GoAwayFrame:
806+
fr = f.(*http2.GoAwayFrame)
807+
if fr.ErrCode == http2.ErrCodeNo {
808+
t.Logf("GoAway received from client")
809+
close(goAwayReceived)
810+
}
811+
default:
812+
t.Errorf("server tester received unexpected frame type %T", f)
813+
close(errCh)
814+
}
815+
}
816+
}()
817+
cc.Close()
818+
defer ct.conn.Close()
819+
select {
820+
case <-goAwayReceived:
821+
case <-errCh:
822+
t.Errorf("Error receiving the goAway: %v", err)
823+
case <-ctx.Done():
824+
t.Errorf("Context timed out")
825+
}
826+
}

0 commit comments

Comments
 (0)