From 878282e61049cf4ae0be19598efa04941b419a0b Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 24 Sep 2024 01:10:54 +0530 Subject: [PATCH 01/16] waiting for all go routine to close before closing transport --- clientconn.go | 9 +++-- internal/transport/http2_client.go | 56 ++++++++++++++++++++++-------- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/clientconn.go b/clientconn.go index a680fefc1385..5b6aac02d0a3 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1141,10 +1141,15 @@ func (cc *ClientConn) Close() error { <-cc.resolverWrapper.serializer.Done() <-cc.balancerWrapper.serializer.Done() - + var wg sync.WaitGroup for ac := range conns { - ac.tearDown(ErrClientConnClosing) + wg.Add(1) + go func(ac *addrConn) { + defer wg.Done() + ac.tearDown(ErrClientConnClosing) + }(ac) } + wg.Wait() cc.addTraceEvent("deleted") // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add // trace reference to the entity being deleted, and thus prevent it from being diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index c769deab53c7..eb286f4bf6dd 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -87,8 +87,9 @@ type http2Client struct { // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. goAway chan struct{} - - framer *framer + //keepAliveDone channel is closed whn the keepAlive goroutine exits + keepAliveDone chan struct{} + framer *framer // controlBuf delivers all the control related tasks (e.g., window // updates, reset streams, and various settings) to the controller. // Do not access controlBuf with mu held. 
@@ -335,6 +336,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts readerDone: make(chan struct{}), writerDone: make(chan struct{}), goAway: make(chan struct{}), + keepAliveDone: make(chan struct{}), framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize), fc: &trInFlow{limit: uint32(icwz)}, scheme: scheme, @@ -1025,6 +1027,14 @@ func (t *http2Client) Close(err error) { } t.cancel() t.conn.Close() + // Wait for the reader goroutine to exit to ensure all resources are cleaned + // up before Close can return. + <-t.readerDone + if t.keepaliveEnabled { + // Wait for the keepAlive goroutine to exit to ensure all resources are cleaned + // up before Close can return. + <-t.keepAliveDone + } channelz.RemoveEntry(t.channelz.ID) // Append info about previous goaways if there were any, since this may be important // for understanding the root cause for this connection to be closed. @@ -1316,11 +1326,11 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { t.controlBuf.put(pingAck) } -func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { +func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error { t.mu.Lock() if t.state == closing { t.mu.Unlock() - return + return nil } if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" { // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug @@ -1332,8 +1342,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { id := f.LastStreamID if id > 0 && id%2 == 0 { t.mu.Unlock() - t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id)) - return + return connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id) } // A client can receive multiple GoAways from the server (see // https://github.com/grpc/grpc-go/issues/1387). 
The idea is that the first @@ -1350,8 +1359,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { // If there are multiple GoAways the first one should always have an ID greater than the following ones. if id > t.prevGoAwayID { t.mu.Unlock() - t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)) - return + return connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID) } default: t.setGoAwayReason(f) @@ -1375,8 +1383,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { t.prevGoAwayID = id if len(t.activeStreams) == 0 { t.mu.Unlock() - t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) - return + return connectionErrorf(true, nil, "received goaway and there are no active streams") } streamsToClose := make([]*Stream, 0) @@ -1393,6 +1400,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { for _, stream := range streamsToClose { t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) } + return nil } // setGoAwayReason sets the value of t.goAwayReason based @@ -1628,10 +1636,17 @@ func (t *http2Client) readServerPreface() error { // network connection. If the server preface is not read successfully, an // error is pushed to errCh; otherwise errCh is closed with no error. func (t *http2Client) reader(errCh chan<- error) { - defer close(t.readerDone) + var errClose error + defer func() { + close(t.readerDone) + if errClose != nil { + t.Close(errClose) + } + }() if err := t.readServerPreface(); err != nil { errCh <- err + errClose = nil return } close(errCh) @@ -1669,7 +1684,7 @@ func (t *http2Client) reader(errCh chan<- error) { continue } // Transport error. 
- t.Close(connectionErrorf(true, err, "error reading from server: %v", err)) + errClose = connectionErrorf(true, err, "error reading from server: %v", err) return } switch frame := frame.(type) { @@ -1684,7 +1699,11 @@ func (t *http2Client) reader(errCh chan<- error) { case *http2.PingFrame: t.handlePing(frame) case *http2.GoAwayFrame: - t.handleGoAway(frame) + { + if err := t.handleGoAway(frame); err != nil { + t.Close(err) + } + } case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) default: @@ -1697,6 +1716,13 @@ func (t *http2Client) reader(errCh chan<- error) { // keepalive running in a separate goroutine makes sure the connection is alive by sending pings. func (t *http2Client) keepalive() { + var err error + defer func() { + close(t.readerDone) + if err != nil { + t.Close(err) + } + }() p := &ping{data: [8]byte{}} // True iff a ping has been sent, and no data has been received since then. outstandingPing := false @@ -1720,7 +1746,7 @@ func (t *http2Client) keepalive() { continue } if outstandingPing && timeoutLeft <= 0 { - t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")) + err = connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout") return } t.mu.Lock() @@ -1732,6 +1758,7 @@ func (t *http2Client) keepalive() { // blocking on the condition variable which will never be // signalled again. 
t.mu.Unlock() + err = nil return } if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { @@ -1769,6 +1796,7 @@ func (t *http2Client) keepalive() { if !timer.Stop() { <-timer.C } + err = nil return } } From cff0d26d6c06a106dddf2ea4242be8e63e60dcc3 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 24 Sep 2024 10:03:06 +0530 Subject: [PATCH 02/16] correcting error --- internal/transport/http2_client.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index eb286f4bf6dd..78822cf6112c 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1699,11 +1699,7 @@ func (t *http2Client) reader(errCh chan<- error) { case *http2.PingFrame: t.handlePing(frame) case *http2.GoAwayFrame: - { - if err := t.handleGoAway(frame); err != nil { - t.Close(err) - } - } + errClose = t.handleGoAway(frame) case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) default: From 56731619a6f8c203a75e429e8d2b55078390aada Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 24 Sep 2024 10:24:32 +0530 Subject: [PATCH 03/16] correcting error --- internal/transport/http2_client.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 78822cf6112c..3800be0061f0 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1646,7 +1646,6 @@ func (t *http2Client) reader(errCh chan<- error) { if err := t.readServerPreface(); err != nil { errCh <- err - errClose = nil return } close(errCh) @@ -1714,7 +1713,7 @@ func (t *http2Client) reader(errCh chan<- error) { func (t *http2Client) keepalive() { var err error defer func() { - close(t.readerDone) + close(t.keepAliveDone) if err != nil { t.Close(err) } @@ -1754,7 +1753,6 @@ func (t *http2Client) keepalive() { // blocking on the condition variable which will never be // signalled again. 
t.mu.Unlock() - err = nil return } if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { @@ -1792,7 +1790,6 @@ func (t *http2Client) keepalive() { if !timer.Stop() { <-timer.C } - err = nil return } } From dcc80accf66e5ac5bade26059dcdfc45093eef74 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 24 Sep 2024 10:45:08 +0530 Subject: [PATCH 04/16] correcting error --- internal/transport/http2_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3800be0061f0..eb070e00f2fc 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -87,7 +87,7 @@ type http2Client struct { // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. goAway chan struct{} - //keepAliveDone channel is closed whn the keepAlive goroutine exits + //keepAliveDone channel is closed when the keepAlive goroutine exits keepAliveDone chan struct{} framer *framer // controlBuf delivers all the control related tasks (e.g., window From 59ae8aa1dae782b8f83f4a61cafebba90ebfcba7 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 24 Sep 2024 11:55:21 +0530 Subject: [PATCH 05/16] correcting error --- internal/transport/http2_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index eb070e00f2fc..4ebba75f4aba 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -87,7 +87,7 @@ type http2Client struct { // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. 
goAway chan struct{} - //keepAliveDone channel is closed when the keepAlive goroutine exits + //keepAliveDone channel is closed when the keepAlive go routine exits keepAliveDone chan struct{} framer *framer // controlBuf delivers all the control related tasks (e.g., window From b24f606a454b49648ebf44e5ccb3e09df877abf1 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Wed, 25 Sep 2024 08:06:17 +0530 Subject: [PATCH 06/16] correcting comments --- internal/transport/http2_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 4ebba75f4aba..cf1d04e092a0 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -87,7 +87,7 @@ type http2Client struct { // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. goAway chan struct{} - //keepAliveDone channel is closed when the keepAlive go routine exits + // This channel is closed when the keepAlive goroutine exits. 
keepAliveDone chan struct{} framer *framer // controlBuf delivers all the control related tasks (e.g., window From 5c704c7f1425bcc07ff70ab95b6fcca72bb59764 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Mon, 30 Sep 2024 19:53:57 +0530 Subject: [PATCH 07/16] Adding test for client to wait for reader goroutine --- internal/transport/transport_test.go | 71 ++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 05f0b0b2e35f..c7608a27fbdd 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2845,3 +2845,74 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { isGreetingDone.Store(true) ct.Close(errors.New("manually closed by client")) } + +type readHangingConn struct { + net.Conn + hangConn chan struct{} + // variable needed to only make read hang when conn is closed + closed *atomic.Bool +} + +func (hc *readHangingConn) Read(b []byte) (n int, err error) { + n, err = hc.Conn.Read(b) + if hc.closed.Load() { + <-hc.hangConn // hang the read till we want + } + return n, err +} + +func (hc *readHangingConn) Close() error { + hc.closed.Store(true) + err := hc.Conn.Close() + return err +} + +// Tests that client does not close untine the reader goroutine exits and closes +// once reader goroutine returns. 
+func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { + connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + server := setUpServerOnly(t, 0, &ServerConfig{}, normal) + defer server.stop() + addr := resolver.Address{Addr: "localhost:" + server.port} + isReaderHanging := &atomic.Bool{} + hangConn := make(chan struct{}) + dialer := func(_ context.Context, addr string) (net.Conn, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + return &readHangingConn{Conn: conn, hangConn: hangConn, closed: isReaderHanging}, nil + } + copts := ConnectOptions{Dialer: dialer} + copts.ChannelzParent = channelzSubChannel(t) + // Create client transport with custom dialer + ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {}) + if connErr != nil { + t.Fatalf("failed to create transport: %v", connErr) + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := ct.NewStream(ctx, &CallHdr{}); err != nil { + t.Fatalf("Failed to open stream: %v", err) + } + + transportClosedchan := make(chan struct{}) + go func() { + ct.Close(errors.New("manually closed by client")) + close(transportClosedchan) + }() + + select { + case <-transportClosedchan: + t.Fatal("Transport closed before reader completed") + case <-time.After(defaultTestTimeout): + } + close(hangConn) + select { + case <-transportClosedchan: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout when waiting for transport to close") + } +} From 7c26e73218afb1cc8d21296dc771aef160ad7f76 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Mon, 7 Oct 2024 11:15:08 +0530 Subject: [PATCH 08/16] Adding short timeout --- internal/transport/keepalive_test.go | 1 + internal/transport/transport_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/transport/keepalive_test.go 
b/internal/transport/keepalive_test.go index 393a4540396f..ad377e6b241b 100644 --- a/internal/transport/keepalive_test.go +++ b/internal/transport/keepalive_test.go @@ -44,6 +44,7 @@ import ( ) const defaultTestTimeout = 10 * time.Second +const defaultTestShortTimeout = 10 * time.Millisecond // TestMaxConnectionIdle tests that a server will send GoAway to an idle // client. An idle client is one who doesn't make any RPC calls for a duration diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index c7608a27fbdd..505030b685c2 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2867,7 +2867,7 @@ func (hc *readHangingConn) Close() error { return err } -// Tests that client does not close untine the reader goroutine exits and closes +// Tests that client does not close until the reader goroutine exits and closes // once reader goroutine returns. func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -2898,20 +2898,20 @@ func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { t.Fatalf("Failed to open stream: %v", err) } - transportClosedchan := make(chan struct{}) + transportClosed := make(chan struct{}) go func() { ct.Close(errors.New("manually closed by client")) - close(transportClosedchan) + close(transportClosed) }() select { - case <-transportClosedchan: + case <-transportClosed: t.Fatal("Transport closed before reader completed") - case <-time.After(defaultTestTimeout): + case <-time.After(defaultTestShortTimeout): } close(hangConn) select { - case <-transportClosedchan: + case <-transportClosed: case <-time.After(defaultTestTimeout): t.Fatal("Timeout when waiting for transport to close") } From 18941b77b1f93a3f5807a9c2ca06a3d439fb5bc0 Mon Sep 17 00:00:00 2001 From: eshitachandwani <59800922+eshitachandwani@users.noreply.github.com> Date: Mon, 7 Oct 2024 11:34:05 +0530 
Subject: [PATCH 09/16] Update transport_test.go --- internal/transport/transport_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 1fd448189e84..de5a8af85195 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2914,6 +2914,9 @@ func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { case <-transportClosed: case <-time.After(defaultTestTimeout): t.Fatal("Timeout when waiting for transport to close") + } +} + // TestReadHeaderMultipleBuffers tests the stream when the gRPC headers are // split across multiple buffers. It verifies that the reporting of the // number of bytes read for flow control is correct. From 019de33fbc2158951419e330cba0b1ea7e2a9cc4 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 8 Oct 2024 17:22:51 +0530 Subject: [PATCH 10/16] Addressing comments --- internal/transport/http2_client.go | 17 ++++----- internal/transport/transport_test.go | 55 +++++++++++++++++----------- 2 files changed, 41 insertions(+), 31 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index cf1d04e092a0..22e7abfdceb9 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -86,9 +86,8 @@ type http2Client struct { writerDone chan struct{} // sync point to enable testing. // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. - goAway chan struct{} - // This channel is closed when the keepAlive goroutine exits. - keepAliveDone chan struct{} + goAway chan struct{} + keepaliveDone chan struct{} // Closed when the keepalive goroutine exits. framer *framer // controlBuf delivers all the control related tasks (e.g., window // updates, reset streams, and various settings) to the controller. 
@@ -336,7 +335,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts readerDone: make(chan struct{}), writerDone: make(chan struct{}), goAway: make(chan struct{}), - keepAliveDone: make(chan struct{}), + keepaliveDone: make(chan struct{}), framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize), fc: &trInFlow{limit: uint32(icwz)}, scheme: scheme, @@ -1027,13 +1026,11 @@ func (t *http2Client) Close(err error) { } t.cancel() t.conn.Close() - // Wait for the reader goroutine to exit to ensure all resources are cleaned - // up before Close can return. + // Wait for the reader and keepalive goroutines to exit before returning to + // ensure all resources are cleaned up before Close can return. <-t.readerDone if t.keepaliveEnabled { - // Wait for the keepAlive goroutine to exit to ensure all resources are cleaned - // up before Close can return. - <-t.keepAliveDone + <-t.keepaliveDone } channelz.RemoveEntry(t.channelz.ID) // Append info about previous goaways if there were any, since this may be important @@ -1713,7 +1710,7 @@ func (t *http2Client) reader(errCh chan<- error) { func (t *http2Client) keepalive() { var err error defer func() { - close(t.keepAliveDone) + close(t.keepaliveDone) if err != nil { t.Close(err) } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index de5a8af85195..96b93f4ed441 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2846,11 +2846,12 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { ct.Close(errors.New("manually closed by client")) } +// readHangingConn is a wrapper around net.Conn that makes the Read() hang when +// Close() is called. 
type readHangingConn struct { net.Conn - hangConn chan struct{} - // variable needed to only make read hang when conn is closed - closed *atomic.Bool + hangConn chan struct{} // Read() hangs until this channel is closed by Close(). + closed *atomic.Bool // Set to true when Close() is called. } func (hc *readHangingConn) Read(b []byte) (n int, err error) { @@ -2867,51 +2868,63 @@ func (hc *readHangingConn) Close() error { return err } -// Tests that client does not close until the reader goroutine exits and closes -// once reader goroutine returns. +// Tests that closing a client transport does not return until the reader +// goroutine exits. func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { - connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + server := setUpServerOnly(t, 0, &ServerConfig{}, normal) defer server.stop() addr := resolver.Address{Addr: "localhost:" + server.port} + isReaderHanging := &atomic.Bool{} hangConn := make(chan struct{}) - dialer := func(_ context.Context, addr string) (net.Conn, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - return &readHangingConn{Conn: conn, hangConn: hangConn, closed: isReaderHanging}, nil + copts := ConnectOptions{ + Dialer: func(_ context.Context, addr string) (net.Conn, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + return &readHangingConn{Conn: conn, hangConn: hangConn, closed: isReaderHanging}, nil + }, + ChannelzParent: channelzSubChannel(t), } - copts := ConnectOptions{Dialer: dialer} - copts.ChannelzParent = channelzSubChannel(t) - // Create client transport with custom dialer - ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {}) - if connErr != nil { - t.Fatalf("failed to create transport: %v", connErr) + + // Create a client transport 
with a custom dialer that hangs the Read() + // after Close(). + ct, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {}) + if err != nil { + t.Fatalf("Failed to create transport: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() if _, err := ct.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("Failed to open stream: %v", err) } + // Closing the client transport will result in the underlying net.Conn being + // closed, which will result in readHangingConn.Read() to hang. This will + // stall the exit of the reader goroutine, and will stall client + // transport's Close from returning. transportClosed := make(chan struct{}) go func() { ct.Close(errors.New("manually closed by client")) close(transportClosed) }() + // Wait for a short duration and ensure that the client transport's Close() + // does not return. select { case <-transportClosed: t.Fatal("Transport closed before reader completed") case <-time.After(defaultTestShortTimeout): } + + // Closing the channel will unblock the reader goroutine and will ensure + // that the client transport's Close() returns. 
close(hangConn) select { - case <-transportClosed: + case <-ctx.Done(): case <-time.After(defaultTestTimeout): t.Fatal("Timeout when waiting for transport to close") } From ffbcc654d7f196d185bc78ea534dd39ee7d06fd0 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 8 Oct 2024 18:06:46 +0530 Subject: [PATCH 11/16] Addressing comments --- internal/transport/http2_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 22e7abfdceb9..6f2dd656924b 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1026,7 +1026,7 @@ func (t *http2Client) Close(err error) { } t.cancel() t.conn.Close() - // Wait for the reader and keepalive goroutines to exit before returning to + // Waiting for the reader and keepalive goroutines to exit before returning to // ensure all resources are cleaned up before Close can return. <-t.readerDone if t.keepaliveEnabled { From 8cc025ecef8c3015b86f4f9ca5266d1b98bbe577 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 8 Oct 2024 18:19:52 +0530 Subject: [PATCH 12/16] Changing position --- internal/transport/transport_test.go | 130 +++++++++++++-------------- 1 file changed, 65 insertions(+), 65 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 96b93f4ed441..56805b7210d5 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2781,71 +2781,6 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { } } -// hangingConn is a net.Conn wrapper for testing, simulating hanging connections -// after a GOAWAY frame is sent, of which Write operations pause until explicitly -// signaled or a timeout occurs. 
-type hangingConn struct { - net.Conn - hangConn chan struct{} - startHanging *atomic.Bool -} - -func (hc *hangingConn) Write(b []byte) (n int, err error) { - n, err = hc.Conn.Write(b) - if hc.startHanging.Load() { - <-hc.hangConn - } - return n, err -} - -// Tests the scenario where a client transport is closed and writing of the -// GOAWAY frame as part of the close does not complete because of a network -// hang. The test verifies that the client transport is closed without waiting -// for too long. -func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { - // Override timer for writing GOAWAY to 0 so that the connection write - // always times out. It is equivalent of real network hang when conn - // write for goaway doesn't finish in specified deadline - origGoAwayLoopyTimeout := goAwayLoopyWriterTimeout - goAwayLoopyWriterTimeout = time.Millisecond - defer func() { - goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout - }() - - // Create the server set up. - connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - server := setUpServerOnly(t, 0, &ServerConfig{}, normal) - defer server.stop() - addr := resolver.Address{Addr: "localhost:" + server.port} - isGreetingDone := &atomic.Bool{} - hangConn := make(chan struct{}) - defer close(hangConn) - dialer := func(_ context.Context, addr string) (net.Conn, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - return &hangingConn{Conn: conn, hangConn: hangConn, startHanging: isGreetingDone}, nil - } - copts := ConnectOptions{Dialer: dialer} - copts.ChannelzParent = channelzSubChannel(t) - // Create client transport with custom dialer - ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {}) - if connErr != nil { - t.Fatalf("failed to create transport: %v", connErr) - } - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if _, 
err := ct.NewStream(ctx, &CallHdr{}); err != nil { - t.Fatalf("Failed to open stream: %v", err) - } - - isGreetingDone.Store(true) - ct.Close(errors.New("manually closed by client")) -} - // readHangingConn is a wrapper around net.Conn that makes the Read() hang when // Close() is called. type readHangingConn struct { @@ -2930,6 +2865,71 @@ func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { } } +// hangingConn is a net.Conn wrapper for testing, simulating hanging connections +// after a GOAWAY frame is sent, of which Write operations pause until explicitly +// signaled or a timeout occurs. +type hangingConn struct { + net.Conn + hangConn chan struct{} + startHanging *atomic.Bool +} + +func (hc *hangingConn) Write(b []byte) (n int, err error) { + n, err = hc.Conn.Write(b) + if hc.startHanging.Load() { + <-hc.hangConn + } + return n, err +} + +// Tests the scenario where a client transport is closed and writing of the +// GOAWAY frame as part of the close does not complete because of a network +// hang. The test verifies that the client transport is closed without waiting +// for too long. +func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { + // Override timer for writing GOAWAY to 0 so that the connection write + // always times out. It is equivalent of real network hang when conn + // write for goaway doesn't finish in specified deadline + origGoAwayLoopyTimeout := goAwayLoopyWriterTimeout + goAwayLoopyWriterTimeout = time.Millisecond + defer func() { + goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout + }() + + // Create the server set up. 
+ connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + server := setUpServerOnly(t, 0, &ServerConfig{}, normal) + defer server.stop() + addr := resolver.Address{Addr: "localhost:" + server.port} + isGreetingDone := &atomic.Bool{} + hangConn := make(chan struct{}) + defer close(hangConn) + dialer := func(_ context.Context, addr string) (net.Conn, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + return &hangingConn{Conn: conn, hangConn: hangConn, startHanging: isGreetingDone}, nil + } + copts := ConnectOptions{Dialer: dialer} + copts.ChannelzParent = channelzSubChannel(t) + // Create client transport with custom dialer + ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {}) + if connErr != nil { + t.Fatalf("failed to create transport: %v", connErr) + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := ct.NewStream(ctx, &CallHdr{}); err != nil { + t.Fatalf("Failed to open stream: %v", err) + } + + isGreetingDone.Store(true) + ct.Close(errors.New("manually closed by client")) +} + // TestReadHeaderMultipleBuffers tests the stream when the gRPC headers are // split across multiple buffers. It verifies that the reporting of the // number of bytes read for flow control is correct. From 768823a554c82497de262738c81cf56c09a53b50 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 8 Oct 2024 19:20:46 +0530 Subject: [PATCH 13/16] Changing variable --- internal/transport/transport_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 56805b7210d5..32a60043dbf3 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2785,14 +2785,14 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { // Close() is called. 
type readHangingConn struct { net.Conn - hangConn chan struct{} // Read() hangs until this channel is closed by Close(). - closed *atomic.Bool // Set to true when Close() is called. + readHangConn chan struct{} // Read() hangs until this channel is closed by Close(). + closed *atomic.Bool // Set to true when Close() is called. } func (hc *readHangingConn) Read(b []byte) (n int, err error) { n, err = hc.Conn.Read(b) if hc.closed.Load() { - <-hc.hangConn // hang the read till we want + <-hc.readHangConn // hang the read till we want } return n, err } @@ -2814,14 +2814,14 @@ func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { addr := resolver.Address{Addr: "localhost:" + server.port} isReaderHanging := &atomic.Bool{} - hangConn := make(chan struct{}) + readHangConn := make(chan struct{}) copts := ConnectOptions{ Dialer: func(_ context.Context, addr string) (net.Conn, error) { conn, err := net.Dial("tcp", addr) if err != nil { return nil, err } - return &readHangingConn{Conn: conn, hangConn: hangConn, closed: isReaderHanging}, nil + return &readHangingConn{Conn: conn, readHangConn: readHangConn, closed: isReaderHanging}, nil }, ChannelzParent: channelzSubChannel(t), } @@ -2857,7 +2857,7 @@ func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { // Closing the channel will unblock the reader goroutine and will ensure // that the client transport's Close() returns. 
- close(hangConn) + close(readHangConn) select { case <-ctx.Done(): case <-time.After(defaultTestTimeout): From 60cd4d0e03bf52b41d5967eb51f86ca8530f0ca9 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 8 Oct 2024 20:23:26 +0530 Subject: [PATCH 14/16] Changing variable --- internal/transport/transport_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 32a60043dbf3..9f859b3dd52a 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2859,7 +2859,7 @@ func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { // that the client transport's Close() returns. close(readHangConn) select { - case <-ctx.Done(): + case <-transportClosed: case <-time.After(defaultTestTimeout): t.Fatal("Timeout when waiting for transport to close") } From a09b7aad44efbf17e7877811cf9b7b83f02e3b31 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 8 Oct 2024 20:44:41 +0530 Subject: [PATCH 15/16] Changing variable --- internal/transport/http2_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 6f2dd656924b..fc48996408ab 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1026,7 +1026,7 @@ func (t *http2Client) Close(err error) { } t.cancel() t.conn.Close() - // Waiting for the reader and keepalive goroutines to exit before returning to + // Waits for the reader and keepalive goroutines to exit before returning to // ensure all resources are cleaned up before Close can return. 
<-t.readerDone if t.keepaliveEnabled { From 4eea9aaa1b253ea029a195f3a4766a7dac76811a Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 10 Oct 2024 10:41:04 +0530 Subject: [PATCH 16/16] style change --- internal/transport/transport_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 9f859b3dd52a..4752c785b59d 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2799,8 +2799,7 @@ func (hc *readHangingConn) Read(b []byte) (n int, err error) { func (hc *readHangingConn) Close() error { hc.closed.Store(true) - err := hc.Conn.Close() - return err + return hc.Conn.Close() } // Tests that closing a client transport does not return until the reader