Skip to content

Commit f268126

Browse files
authored
Send GOAWAY to server on Client Transport Shutdown (#7015)
1 parent 431436d commit f268126

File tree

5 files changed

+196
-22
lines changed

5 files changed

+196
-22
lines changed

internal/transport/controlbuf.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ type goAway struct {
193193
code http2.ErrCode
194194
debugData []byte
195195
headsUp bool
196-
closeConn error // if set, loopyWriter will exit, resulting in conn closure
196+
closeConn error // if set, loopyWriter will exit with this error
197197
}
198198

199199
func (*goAway) isTransportResponseFrame() bool { return false }
@@ -495,21 +495,22 @@ type loopyWriter struct {
495495
ssGoAwayHandler func(*goAway) (bool, error)
496496
}
497497

498-
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
498+
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter {
499499
var buf bytes.Buffer
500500
l := &loopyWriter{
501-
side: s,
502-
cbuf: cbuf,
503-
sendQuota: defaultWindowSize,
504-
oiws: defaultWindowSize,
505-
estdStreams: make(map[uint32]*outStream),
506-
activeStreams: newOutStreamList(),
507-
framer: fr,
508-
hBuf: &buf,
509-
hEnc: hpack.NewEncoder(&buf),
510-
bdpEst: bdpEst,
511-
conn: conn,
512-
logger: logger,
501+
side: s,
502+
cbuf: cbuf,
503+
sendQuota: defaultWindowSize,
504+
oiws: defaultWindowSize,
505+
estdStreams: make(map[uint32]*outStream),
506+
activeStreams: newOutStreamList(),
507+
framer: fr,
508+
hBuf: &buf,
509+
hEnc: hpack.NewEncoder(&buf),
510+
bdpEst: bdpEst,
511+
conn: conn,
512+
logger: logger,
513+
ssGoAwayHandler: goAwayHandler,
513514
}
514515
return l
515516
}

internal/transport/http2_client.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
408408
readerErrCh := make(chan error, 1)
409409
go t.reader(readerErrCh)
410410
defer func() {
411-
if err == nil {
412-
err = <-readerErrCh
413-
}
414411
if err != nil {
412+
// writerDone should be closed since the loopy goroutine
413+
// wouldn't have started in the case this function returns an error.
414+
close(t.writerDone)
415415
t.Close(err)
416416
}
417417
}()
@@ -458,8 +458,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
458458
if err := t.framer.writer.Flush(); err != nil {
459459
return nil, err
460460
}
461+
// Block until the server preface is received successfully or an error occurs.
462+
if err = <-readerErrCh; err != nil {
463+
return nil, err
464+
}
461465
go func() {
462-
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
466+
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
463467
if err := t.loopy.run(); !isIOError(err) {
464468
// Immediately close the connection, as the loopy writer returns
465469
// when there are no more active streams and we were draining (the
@@ -517,6 +521,17 @@ func (t *http2Client) getPeer() *peer.Peer {
517521
}
518522
}
519523

524+
// OutgoingGoAwayHandler writes a GOAWAY to the connection. Always returns (false, err) as we want the GoAway
525+
// to be the last frame loopy writes to the transport.
526+
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
527+
t.mu.Lock()
528+
defer t.mu.Unlock()
529+
if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil {
530+
return false, err
531+
}
532+
return false, g.closeConn
533+
}
534+
520535
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
521536
aud := t.createAudience(callHdr)
522537
ri := credentials.RequestInfo{
@@ -966,7 +981,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
966981

967982
// Close kicks off the shutdown process of the transport. This should be called
968983
// only once on a transport. Once it is called, the transport should not be
969-
// accessed any more.
984+
// accessed anymore.
970985
func (t *http2Client) Close(err error) {
971986
t.mu.Lock()
972987
// Make sure we only close once.
@@ -991,7 +1006,10 @@ func (t *http2Client) Close(err error) {
9911006
t.kpDormancyCond.Signal()
9921007
}
9931008
t.mu.Unlock()
994-
t.controlBuf.finish()
1009+
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
1010+
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY.
1011+
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
1012+
<-t.writerDone
9951013
t.cancel()
9961014
t.conn.Close()
9971015
channelz.RemoveEntry(t.channelz.ID)

internal/transport/http2_server.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
330330
t.handleSettings(sf)
331331

332332
go func() {
333-
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
334-
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
333+
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
335334
err := t.loopy.run()
336335
close(t.loopyWriterDone)
337336
if !isIOError(err) {

internal/transport/transport_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2659,3 +2659,94 @@ func TestConnectionError_Unwrap(t *testing.T) {
26592659
t.Error("ConnectionError does not unwrap")
26602660
}
26612661
}
2662+
2663+
// Test that in the event of a graceful client transport shutdown, i.e.,
2664+
// clientTransport.Close(), client sends a goaway to the server with the correct
2665+
// error code and debug data.
2666+
func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
2667+
// Create a server.
2668+
lis, err := net.Listen("tcp", "localhost:0")
2669+
if err != nil {
2670+
t.Fatalf("Error while listening: %v", err)
2671+
}
2672+
defer lis.Close()
2673+
// greetDone is used to notify when server is done greeting the client.
2674+
greetDone := make(chan struct{})
2675+
// errorCh verifies that desired GOAWAY not received by server
2676+
errorCh := make(chan error)
2677+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2678+
defer cancel()
2679+
// Launch the server.
2680+
go func() {
2681+
sconn, err := lis.Accept()
2682+
if err != nil {
2683+
t.Errorf("Error while accepting: %v", err)
2684+
}
2685+
defer sconn.Close()
2686+
if _, err := io.ReadFull(sconn, make([]byte, len(clientPreface))); err != nil {
2687+
t.Errorf("Error while writing settings ack: %v", err)
2688+
return
2689+
}
2690+
sfr := http2.NewFramer(sconn, sconn)
2691+
if err := sfr.WriteSettings(); err != nil {
2692+
t.Errorf("Error while writing settings %v", err)
2693+
return
2694+
}
2695+
fr, _ := sfr.ReadFrame()
2696+
if _, ok := fr.(*http2.SettingsFrame); !ok {
2697+
t.Errorf("Expected settings frame, got %v", fr)
2698+
}
2699+
fr, _ = sfr.ReadFrame()
2700+
if fr, ok := fr.(*http2.SettingsFrame); !ok && fr.IsAck() {
2701+
t.Errorf("Expected settings ACK frame, got %v", fr)
2702+
}
2703+
fr, _ = sfr.ReadFrame()
2704+
if fr, ok := fr.(*http2.HeadersFrame); !ok && fr.Flags.Has(http2.FlagHeadersEndStream) {
2705+
t.Errorf("Expected Headers frame with END_HEADERS frame, got %v", fr)
2706+
}
2707+
close(greetDone)
2708+
2709+
frame, err := sfr.ReadFrame()
2710+
if err != nil {
2711+
return
2712+
}
2713+
switch fr := frame.(type) {
2714+
case *http2.GoAwayFrame:
2715+
// Records that the server successfully received a GOAWAY frame.
2716+
goAwayFrame := fr
2717+
if goAwayFrame.ErrCode == http2.ErrCodeNo {
2718+
t.Logf("Received goAway frame from client")
2719+
close(errorCh)
2720+
} else {
2721+
errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err)
2722+
close(errorCh)
2723+
}
2724+
return
2725+
default:
2726+
errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err)
2727+
close(errorCh)
2728+
return
2729+
}
2730+
}()
2731+
2732+
ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {})
2733+
if err != nil {
2734+
t.Fatalf("Error while creating client transport: %v", err)
2735+
}
2736+
_, err = ct.NewStream(ctx, &CallHdr{})
2737+
if err != nil {
2738+
t.Fatalf("failed to open stream: %v", err)
2739+
}
2740+
// Wait until server receives the headers and settings frame as part of greet.
2741+
<-greetDone
2742+
ct.Close(errors.New("manually closed by client"))
2743+
t.Logf("Closed the client connection")
2744+
select {
2745+
case err := <-errorCh:
2746+
if err != nil {
2747+
t.Errorf("Error receiving the GOAWAY frame: %v", err)
2748+
}
2749+
case <-ctx.Done():
2750+
t.Errorf("Context timed out")
2751+
}
2752+
}

test/goaway_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package test
2020

2121
import (
2222
"context"
23+
"fmt"
2324
"io"
2425
"net"
2526
"strings"
@@ -761,3 +762,67 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) {
761762
t.Fatalf("Error waiting for graceful shutdown of the server: %v", err)
762763
}
763764
}
765+
766+
// TestClientSendsAGoAway tests the scenario where you get a go away ping
767+
// frames from the client during graceful shutdown.
768+
func (s) TestClientSendsAGoAway(t *testing.T) {
769+
lis, err := net.Listen("tcp", "localhost:0")
770+
if err != nil {
771+
t.Fatalf("error listening: %v", err)
772+
}
773+
ctCh := testutils.NewChannel()
774+
go func() {
775+
conn, err := lis.Accept()
776+
if err != nil {
777+
t.Errorf("error in lis.Accept(): %v", err)
778+
}
779+
ct := newClientTester(t, conn)
780+
ctCh.Send(ct)
781+
}()
782+
defer lis.Close()
783+
784+
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
785+
if err != nil {
786+
t.Fatalf("error dialing: %v", err)
787+
}
788+
789+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
790+
defer cancel()
791+
792+
val, err := ctCh.Receive(ctx)
793+
if err != nil {
794+
t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
795+
}
796+
ct := val.(*clientTester)
797+
goAwayReceived := make(chan struct{})
798+
errCh := make(chan error)
799+
go func() {
800+
for {
801+
f, err := ct.fr.ReadFrame()
802+
if err != nil {
803+
return
804+
}
805+
switch fr := f.(type) {
806+
case *http2.GoAwayFrame:
807+
fr = f.(*http2.GoAwayFrame)
808+
if fr.ErrCode == http2.ErrCodeNo {
809+
t.Logf("GoAway received from client")
810+
close(goAwayReceived)
811+
}
812+
default:
813+
t.Errorf("server tester received unexpected frame type %T", f)
814+
errCh <- fmt.Errorf("server tester received unexpected frame type %T", f)
815+
close(errCh)
816+
}
817+
}
818+
}()
819+
cc.Close()
820+
defer ct.conn.Close()
821+
select {
822+
case <-goAwayReceived:
823+
case err := <-errCh:
824+
t.Errorf("Error receiving the goAway: %v", err)
825+
case <-ctx.Done():
826+
t.Errorf("Context timed out")
827+
}
828+
}

0 commit comments

Comments
 (0)