diff --git a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs index 555a54895e7e6f..ca53bbe2068257 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs @@ -189,13 +189,36 @@ public override async Task HandleRequestAsync(HttpStatusCode st await stream.SendResponseAsync(statusCode, headers, content).ConfigureAwait(false); - // closing the connection here causes bytes written to streams to go missing. - // Regardless, we told the client we are closing so it shouldn't matter -- they should not use this connection anymore. - //await CloseAsync(H3_NO_ERROR).ConfigureAwait(false); + await WaitForClientDisconnectAsync(); return request; } + // Wait for the client to close the connection, e.g. after we send a GOAWAY, or after the HttpClient is disposed. + public async Task WaitForClientDisconnectAsync() + { + while (true) + { + Http3LoopbackStream stream; + + try + { + stream = await AcceptRequestStreamAsync().ConfigureAwait(false); + } + catch (QuicConnectionAbortedException abortException) when (abortException.ErrorCode == H3_NO_ERROR) + { + break; + } + + using (stream) + { + await stream.AbortAndWaitForShutdownAsync(H3_REQUEST_REJECTED); + } + } + + await CloseAsync(H3_NO_ERROR); + } + public override async Task WaitForCancellationAsync(bool ignoreIncomingData = true, int requestId = 0) { await GetOpenRequest(requestId).WaitForCancellationAsync(ignoreIncomingData).ConfigureAwait(false); diff --git a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackStream.cs b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackStream.cs index b93807a0321466..a3d2c9a4215c2b 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackStream.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackStream.cs @@ -359,6 +359,13 @@ public async Task WaitForCancellationAsync(bool ignoreIncomingData = true) } } + public async Task AbortAndWaitForShutdownAsync(long errorCode) + { + _stream.AbortRead(errorCode); + _stream.AbortWrite(errorCode); + await _stream.ShutdownCompleted(); + } + public async Task<(long? frameType, byte[] payload)> ReadFrameAsync() { long? frameType = await ReadIntegerAsync().ConfigureAwait(false); diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index d3e6a90c4305b5..6b2012d6df384c 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -283,7 +283,7 @@ private void OnServerGoAway(long lastProcessedStreamId) lock (SyncObj) { - if (lastProcessedStreamId > _lastProcessedStreamId) + if (_lastProcessedStreamId != -1 && lastProcessedStreamId > _lastProcessedStreamId) { // Server can send multiple GOAWAY frames. // Spec says a server MUST NOT increase the stream ID in subsequent GOAWAYs, diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs index 3c1f232ed7e7e3..1e8fd20f066e5d 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs @@ -219,6 +219,8 @@ internal override QuicStreamProvider OpenBidirectionalStream() internal MockStream OpenStream(long streamId, bool bidirectional) { + CheckDisposed(); + ConnectionState? state = _state; if (state is null) { @@ -274,12 +276,15 @@ internal override async ValueTask AcceptStreamAsync(Cancella catch (ChannelClosedException) { long errorCode = _isClient ? state._serverErrorCode : state._clientErrorCode; - throw new QuicConnectionAbortedException(errorCode); + throw (errorCode == -1) ? new QuicOperationAbortedException() : new QuicConnectionAbortedException(errorCode); } } internal override ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken = default) { + // TODO: We should abort local streams (and signal the peer to do likewise) + // Currently, we are not tracking the streams associated with this connection. + ConnectionState? state = _state; if (state is not null) { @@ -292,10 +297,12 @@ internal override ValueTask CloseAsync(long errorCode, CancellationToken cancell if (_isClient) { state._clientErrorCode = errorCode; + DrainAcceptQueue(-1, errorCode); } else { state._serverErrorCode = errorCode; + DrainAcceptQueue(errorCode, -1); } } @@ -312,19 +319,37 @@ private void CheckDisposed() } } + private void DrainAcceptQueue(long outboundErrorCode, long inboundErrorCode) + { + ConnectionState? state = _state; + if (state is not null) + { + // TODO: We really only need to do the complete and drain once, but it doesn't really hurt to do it twice. + state._clientInitiatedStreamChannel.Writer.TryComplete(); + while (state._clientInitiatedStreamChannel.Reader.TryRead(out MockStream.StreamState? streamState)) + { + streamState._outboundReadErrorCode = streamState._outboundWriteErrorCode = outboundErrorCode; + streamState._inboundStreamBuffer?.AbortRead(); + streamState._outboundStreamBuffer?.EndWrite(); + } + + state._serverInitiatedStreamChannel.Writer.TryComplete(); + while (state._serverInitiatedStreamChannel.Reader.TryRead(out MockStream.StreamState? streamState)) + { + streamState._inboundReadErrorCode = streamState._inboundWriteErrorCode = inboundErrorCode; + streamState._outboundStreamBuffer?.AbortRead(); + streamState._inboundStreamBuffer?.EndWrite(); + } + } + } + private void Dispose(bool disposing) { if (!_disposed) { if (disposing) { - ConnectionState? state = _state; - if (state is not null) - { - Channel streamChannel = _isClient ? state._clientInitiatedStreamChannel : state._serverInitiatedStreamChannel; - streamChannel.Writer.Complete(); - } - + DrainAcceptQueue(-1, -1); PeerStreamLimit? streamLimit = LocalStreamLimit; if (streamLimit is not null) @@ -448,6 +473,7 @@ public ConnectionState(SslApplicationProtocol applicationProtocol) _applicationProtocol = applicationProtocol; _clientInitiatedStreamChannel = Channel.CreateUnbounded(); _serverInitiatedStreamChannel = Channel.CreateUnbounded(); + _clientErrorCode = _serverErrorCode = -1; } } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockStream.cs index 313fc1eb6b6915..fde0eab97d197b 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockStream.cs @@ -73,7 +73,7 @@ internal override async ValueTask ReadAsync(Memory buffer, Cancellati long errorCode = _isInitiator ? _streamState._inboundReadErrorCode : _streamState._outboundReadErrorCode; if (errorCode != 0) { - throw new QuicStreamAbortedException(errorCode); + throw (errorCode == -1) ? new QuicOperationAbortedException() : new QuicStreamAbortedException(errorCode); } } diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index 9988961e81e5af..b8c91f43395504 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -37,23 +37,146 @@ public async Task TestConnect() Assert.Equal(ApplicationProtocol.ToString(), serverConnection.NegotiatedApplicationProtocol.ToString()); } + private static async Task OpenAndUseStreamAsync(QuicConnection c) + { + QuicStream s = c.OpenBidirectionalStream(); + + // This will pend + await s.ReadAsync(new byte[1]); + + return s; + } + [Fact] [ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)] - public async Task AcceptStream_ConnectionAborted_ByClient_Throws() + public async Task CloseAsync_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException() { using var sync = new SemaphoreSlim(0); await RunClientServer( async clientConnection => { - await clientConnection.CloseAsync(ExpectedErrorCode); + await sync.WaitAsync(); + }, + async serverConnection => + { + // Pend operations before the client closes. + Task acceptTask = serverConnection.AcceptStreamAsync().AsTask(); + Assert.False(acceptTask.IsCompleted); + Task connectTask = OpenAndUseStreamAsync(serverConnection); + Assert.False(connectTask.IsCompleted); + + await serverConnection.CloseAsync(ExpectedErrorCode); + sync.Release(); + + // Pending ops should fail + await Assert.ThrowsAsync(() => acceptTask); + await Assert.ThrowsAsync(() => connectTask); + + // Subsequent attempts should fail + // TODO: Which exception is correct? + if (IsMockProvider) + { + await Assert.ThrowsAsync(async () => await serverConnection.AcceptStreamAsync()); + await Assert.ThrowsAsync(async () => await OpenAndUseStreamAsync(serverConnection)); + } + else + { + await Assert.ThrowsAsync(async () => await serverConnection.AcceptStreamAsync()); + + // TODO: ActiveIssue https://github.com/dotnet/runtime/issues/56133 + // MsQuic fails with System.Net.Quic.QuicException: Failed to open stream to peer. Error Code: INVALID_STATE + //await Assert.ThrowsAsync(async () => await OpenAndUseStreamAsync(serverConnection)); + await Assert.ThrowsAsync(() => OpenAndUseStreamAsync(serverConnection)); + } + }); + } + + [Fact] + [ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)] + public async Task Dispose_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException() + { + using var sync = new SemaphoreSlim(0); + + await RunClientServer( + async clientConnection => + { + await sync.WaitAsync(); }, async serverConnection => + { + // Pend operations before the client closes. + Task acceptTask = serverConnection.AcceptStreamAsync().AsTask(); + Assert.False(acceptTask.IsCompleted); + Task connectTask = OpenAndUseStreamAsync(serverConnection); + Assert.False(connectTask.IsCompleted); + + serverConnection.Dispose(); + + sync.Release(); + + // Pending ops should fail + await Assert.ThrowsAsync(() => acceptTask); + await Assert.ThrowsAsync(() => connectTask); + + // Subsequent attempts should fail + // TODO: Should these be QuicOperationAbortedException, to match above? Or vice-versa? + await Assert.ThrowsAsync(async () => await serverConnection.AcceptStreamAsync()); + await Assert.ThrowsAsync(async () => await OpenAndUseStreamAsync(serverConnection)); + }); + } + + [Fact] + [ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)] + public async Task ConnectionClosedByPeer_WithPendingAcceptAndConnect_PendingAndSubsequentThrowConnectionAbortedException() + { + if (IsMockProvider) + { + return; + } + + using var sync = new SemaphoreSlim(0); + + await RunClientServer( + async clientConnection => { await sync.WaitAsync(); - QuicConnectionAbortedException ex = await Assert.ThrowsAsync(() => serverConnection.AcceptStreamAsync().AsTask()); + + await clientConnection.CloseAsync(ExpectedErrorCode); + }, + async serverConnection => + { + // Pend operations before the client closes. + Task acceptTask = serverConnection.AcceptStreamAsync().AsTask(); + Assert.False(acceptTask.IsCompleted); + Task connectTask = OpenAndUseStreamAsync(serverConnection); + Assert.False(connectTask.IsCompleted); + + sync.Release(); + + // Pending ops should fail + QuicConnectionAbortedException ex; + + ex = await Assert.ThrowsAsync(() => acceptTask); + Assert.Equal(ExpectedErrorCode, ex.ErrorCode); + ex = await Assert.ThrowsAsync(() => connectTask); + Assert.Equal(ExpectedErrorCode, ex.ErrorCode); + + // Subsequent attempts should fail + ex = await Assert.ThrowsAsync(() => serverConnection.AcceptStreamAsync().AsTask()); Assert.Equal(ExpectedErrorCode, ex.ErrorCode); + // TODO: ActiveIssue https://github.com/dotnet/runtime/issues/56133 + // MsQuic fails with System.Net.Quic.QuicException: Failed to open stream to peer. Error Code: INVALID_STATE + if (IsMsQuicProvider) + { + await Assert.ThrowsAsync(() => OpenAndUseStreamAsync(serverConnection)); + } + else + { + ex = await Assert.ThrowsAsync(() => OpenAndUseStreamAsync(serverConnection)); + Assert.Equal(ExpectedErrorCode, ex.ErrorCode); + } }); } @@ -79,7 +202,7 @@ private static async Task DoReads(QuicStream reader, int readCount) [InlineData(10)] public async Task CloseAsync_WithOpenStream_LocalAndPeerStreamsFailWithQuicOperationAbortedException(int writesBeforeClose) { - if (typeof(T) == typeof(MockProviderFactory)) + if (IsMockProvider) { return; } @@ -122,7 +245,7 @@ await RunClientServer( [InlineData(10)] public async Task Dispose_WithOpenLocalStream_LocalStreamFailsWithQuicOperationAbortedException(int writesBeforeClose) { - if (typeof(T) == typeof(MockProviderFactory)) + if (IsMockProvider) { return; } diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs index 9a6e43a863e5b1..24a653ceed38bd 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs @@ -23,6 +23,9 @@ public abstract class QuicTestBase public static QuicImplementationProvider ImplementationProvider { get; } = s_factory.GetProvider(); public static bool IsSupported => ImplementationProvider.IsSupported; + public static bool IsMockProvider => typeof(T) == typeof(MockProviderFactory); + public static bool IsMsQuicProvider => typeof(T) == typeof(MsQuicProviderFactory); + public static SslApplicationProtocol ApplicationProtocol { get; } = new SslApplicationProtocol("quictest"); public X509Certificate2 ServerCertificate = System.Net.Test.Common.Configuration.Certificates.GetServerCertificate();