diff --git a/src/core/Akka.Tests/IO/TcpConnectionBatchingSpec.cs b/src/core/Akka.Tests/IO/TcpConnectionBatchingSpec.cs index 672372e76b8..50744d861ea 100644 --- a/src/core/Akka.Tests/IO/TcpConnectionBatchingSpec.cs +++ b/src/core/Akka.Tests/IO/TcpConnectionBatchingSpec.cs @@ -133,6 +133,76 @@ public override async ValueTask WriteAsync(ReadOnlyMemory buffer, } } + private sealed class TrackingDisposeStream : Stream + { + private readonly Stream _inner; + private int _disposeCount; + private int _disposeAsyncCount; + + public TrackingDisposeStream(Stream inner) + { + _inner = inner; + } + + public int DisposeCount => Volatile.Read(ref _disposeCount); + public int DisposeAsyncCount => Volatile.Read(ref _disposeAsyncCount); + + public override bool CanRead => _inner.CanRead; + public override bool CanSeek => _inner.CanSeek; + public override bool CanWrite => _inner.CanWrite; + public override long Length => _inner.Length; + + public override long Position + { + get => _inner.Position; + set => _inner.Position = value; + } + + public override void Flush() => _inner.Flush(); + + public override Task FlushAsync(CancellationToken cancellationToken) => + _inner.FlushAsync(cancellationToken); + + public override int Read(byte[] buffer, int offset, int count) => + _inner.Read(buffer, offset, count); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + _inner.ReadAsync(buffer, offset, count, cancellationToken); + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + _inner.ReadAsync(buffer, cancellationToken); + + public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin); + + public override void SetLength(long value) => _inner.SetLength(value); + + public override void Write(byte[] buffer, int offset, int count) => + _inner.Write(buffer, offset, count); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + _inner.WriteAsync(buffer, offset, count, cancellationToken); + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => + _inner.WriteAsync(buffer, cancellationToken); + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Interlocked.Increment(ref _disposeCount); + _inner.Dispose(); + } + + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + Interlocked.Increment(ref _disposeAsyncCount); + await _inner.DisposeAsync().ConfigureAwait(false); + } + } + public TcpConnectionBatchingSpec(ITestOutputHelper output) : base(@"akka.loglevel = DEBUG akka.io.tcp.trace-logging = true", output: output) @@ -190,6 +260,97 @@ await AwaitAssertAsync(() => await ExpectTerminatedAsync(connection); } + [Fact] + public async Task TcpConnection_should_finalize_transport_via_DisposeAsync_after_confirmed_close() + { + using var socketPair = await ConnectedSocketPair.CreateAsync(); + var trackingStream = new TrackingDisposeStream(new NetworkStream(socketPair.Server, ownsSocket: false)); + var bindHandler = CreateTestProbe(); + var handler = CreateTestProbe(); + var settings = TcpSettings.Create(Sys); + + var connection = Sys.ActorOf(Props.Create(() => new TcpIncomingConnection( + settings, + socketPair.Server, + bindHandler.Ref, + Array.Empty(), + false, + trackingStream))); + + await bindHandler.ExpectMsgAsync(); + bindHandler.Send(connection, new Tcp.Register(handler.Ref)); + await WatchAsync(connection); + + handler.Send(connection, Tcp.ConfirmedClose.Instance); + socketPair.Client.Shutdown(SocketShutdown.Send); + + await handler.ExpectMsgAsync(); + await ExpectTerminatedAsync(connection); + + trackingStream.DisposeAsyncCount.Should().Be(1); + trackingStream.DisposeCount.Should().Be(0); + } + + [Fact] + public async Task TcpConnection_should_finalize_transport_during_graceful_close() + { + using var socketPair = await ConnectedSocketPair.CreateAsync(); + var trackingStream = new TrackingDisposeStream(new NetworkStream(socketPair.Server, ownsSocket: false)); + var bindHandler = CreateTestProbe(); + var handler = CreateTestProbe(); + var settings = TcpSettings.Create(Sys); + + var connection = Sys.ActorOf(Props.Create(() => new TcpIncomingConnection( + settings, + socketPair.Server, + bindHandler.Ref, + Array.Empty(), + false, + trackingStream))); + + await bindHandler.ExpectMsgAsync(); + bindHandler.Send(connection, new Tcp.Register(handler.Ref)); + await WatchAsync(connection); + + handler.Send(connection, Tcp.Close.Instance); + + await handler.ExpectMsgAsync(); + await ExpectTerminatedAsync(connection); + + trackingStream.DisposeAsyncCount.Should().Be(1); + trackingStream.DisposeCount.Should().Be(0); + } + + [Fact] + public async Task TcpConnection_should_fall_back_to_abort_cleanup_when_stopped_unexpectedly() + { + using var socketPair = await ConnectedSocketPair.CreateAsync(); + var trackingStream = new TrackingDisposeStream(new NetworkStream(socketPair.Server, ownsSocket: false)); + var bindHandler = CreateTestProbe(); + var handler = CreateTestProbe(); + var settings = TcpSettings.Create(Sys); + + var connection = Sys.ActorOf(Props.Create(() => new TcpIncomingConnection( + settings, + socketPair.Server, + bindHandler.Ref, + Array.Empty(), + false, + trackingStream))); + + await bindHandler.ExpectMsgAsync(); + bindHandler.Send(connection, new Tcp.Register(handler.Ref)); + await WatchAsync(connection); + + handler.Send(connection, PoisonPill.Instance); + + await handler.ExpectMsgAsync(); + await ExpectTerminatedAsync(connection); + + trackingStream.DisposeAsyncCount.Should().Be(0); + trackingStream.DisposeCount.Should().BeGreaterThan(0); + } + private sealed class ConnectedSocketPair : IDisposable { private ConnectedSocketPair(Socket client, Socket server) diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index 3a0a32f9c49..ea666ed73ca 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -143,6 +143,24 @@ private sealed class TransportOperationFailed : INoSerializationVerificationNeed public TransportOperationFailed(Exception cause) { Cause = cause; } } + /// + /// Self-tell: transport finalization completed successfully. + /// + private sealed class TransportFinalized : INoSerializationVerificationNeeded + { + public static readonly TransportFinalized Instance = new(); + private TransportFinalized() { } + } + + /// + /// Self-tell: transport finalization failed. + /// + private sealed class TransportFinalizeFailed : INoSerializationVerificationNeeded + { + public Exception Cause { get; } + public TransportFinalizeFailed(Exception cause) { Cause = cause; } + } + private sealed class CommanderDied : INoSerializationVerificationNeeded, IDeadLetterSuppression { public static readonly CommanderDied Instance = new(); @@ -231,6 +249,13 @@ private HandlerDied() { } // this as Tcp.ErrorClosed instead of treating it as a clean EOF. private bool _readPumpHasError; private Exception? _readPumpError; + + // True once the transport has been fully cleaned up via CloseAsync or DisposeAsync. + // When set, PostStop skips the abort fallback because there's nothing left to tear down. + private bool _transportFinalized; + + // True while an async finalization task is in flight (used by ConfirmedClose). + private bool _transportFinalizing; #endregion private static readonly IOException DroppingWriteBecauseClosingException = @@ -265,11 +290,13 @@ protected override void PostStop() if (_transport != null) { - // Abort cancels the CTS, sets linger=0, closes the socket. - // This unblocks any pending stream.ReadAsync/WriteAsync in the pump tasks. - // The pump tasks will exit with OperationCanceledException or IOException. - try { _transport.Abort(); } - catch (ObjectDisposedException) { } // slopwatch-ignore: SW003 transport may already be disposed + if (!_transportFinalized) + { + // Abort is the emergency fallback for unexpected actor stops. + // Clean Close / ConfirmedClose paths finalize the transport before stopping. + try { _transport.Abort(); } + catch (ObjectDisposedException) { } // slopwatch-ignore: SW003 transport may already be disposed + } } else { @@ -486,8 +513,8 @@ private void ClosingBehaviour(IActorRef closeSender, ConnectionClosed closeEvent if (closeEvent is ConfirmedClosed) { if (_traceLogging) - Log.Debug("Peer FIN received during ConfirmedClose - connection fully closed"); - DoCloseConnection(closeSender, ConfirmedClosed.Instance); + Log.Debug("Peer FIN received during ConfirmedClose - waiting for transport finalization"); + TryFinishClose(closeSender, closeEvent); return; } @@ -511,11 +538,14 @@ private void ClosingBehaviour(IActorRef closeSender, ConnectionClosed closeEvent if (_traceLogging) Log.Debug("ConfirmedClose: FIN sent, waiting for peer FIN"); + + TryFinishClose(closeSender, closeEvent); } else { - // For regular Close, transport is fully closed + // For regular Close, transport is fully closed and finalized. _outputShutdown = true; + _transportFinalized = true; TryFinishClose(closeSender, closeEvent); } }); @@ -531,6 +561,25 @@ private void ClosingBehaviour(IActorRef closeSender, ConnectionClosed closeEvent Log.Debug("I/O task failed during close: {0}", msg.Cause.Message); DoCloseConnection(closeSender, closeEvent); }); + Receive(_ => + { + _transportFinalizing = false; + _transportFinalized = true; + + if (_traceLogging) + Log.Debug("Transport finalization completed"); + + DoCloseConnection(closeSender, closeEvent); + }); + Receive(msg => + { + _transportFinalizing = false; + + if (_traceLogging) + Log.Debug("Transport finalization failed: {0}", msg.Cause.Message); + + DoCloseConnection(closeSender, closeEvent); + }); SuspendResumeHandlers(); Receive(_ => { @@ -551,8 +600,11 @@ private void TryFinishClose(IActorRef closeSender, ConnectionClosed closeEvent) { if (closeEvent is ConfirmedClosed) { - // For ConfirmedClose, we need to wait for peer FIN (StreamEof). - // The StreamEof handler calls DoCloseConnection directly. + // For ConfirmedClose, wait until our FIN has been sent and the peer's FIN has arrived. + // Once both sides are closed, finalize the transport asynchronously before stopping. + if (_outputShutdown && _peerClosed) + StartTransportFinalization(); + return; } @@ -561,6 +613,24 @@ private void TryFinishClose(IActorRef closeSender, ConnectionClosed closeEvent) DoCloseConnection(closeSender, closeEvent); } + private void StartTransportFinalization() + { + if (_transport is null || _transportFinalized || _transportFinalizing) + return; + + _transportFinalizing = true; + _transport.DisposeAsync().AsTask().ContinueWith(t => + { + if (t.IsFaulted) + return (object)new TransportFinalizeFailed(t.Exception!.InnerException ?? t.Exception); + + if (t.IsCanceled) + return (object)new TransportFinalizeFailed(new TaskCanceledException("Transport finalization was canceled")); + + return TransportFinalized.Instance; + }, TaskContinuationOptions.ExecuteSynchronously).PipeTo(Self); + } + private void SuspendResumeHandlers() { Receive(_ => diff --git a/src/core/Akka/IO/TcpTransportConnection.cs b/src/core/Akka/IO/TcpTransportConnection.cs index a0dabe56b7d..3e6a21c3a2d 100644 --- a/src/core/Akka/IO/TcpTransportConnection.cs +++ b/src/core/Akka/IO/TcpTransportConnection.cs @@ -28,6 +28,13 @@ public sealed class TcpTransportConnection : ITransportConnection private readonly Pipe _inputPipe; private readonly Pipe _outputPipe; private readonly CancellationTokenSource _cts = new(); + private Task? _disposeTask; + private int _transportCancellationRequested; + private int _outputWriterCompleted; + private int _sendShutdownApplied; + private int _streamDisposed; + private int _socketClosed; + private int _ctsDisposed; /// /// Creates a transport connection from an already-connected socket. @@ -105,80 +112,197 @@ public ValueTask FlushAsync(CancellationToken ct = default) public async Task ShutdownAsync() { // Complete the output pipe — write pump will drain and exit - await _outputPipe.Writer.CompleteAsync().ConfigureAwait(false); + await CompleteOutputWriterAsync().ConfigureAwait(false); // Wait for write pump to finish flushing await WriteCompleted.ConfigureAwait(false); // Half-close the socket (send FIN). // SocketException is expected if the peer already reset the connection. - try - { - _socket.Shutdown(SocketShutdown.Send); - } - catch (SocketException) { } // slopwatch-ignore: SW003 socket may already be closed by peer or abort + TryShutdownSend(); } public async Task CloseAsync() { // Complete the output pipe — write pump will drain and exit - await _outputPipe.Writer.CompleteAsync().ConfigureAwait(false); + await CompleteOutputWriterAsync().ConfigureAwait(false); // Wait for write pump to finish flushing await WriteCompleted.ConfigureAwait(false); // Cancel to unblock the read pump (which may be blocked on stream.ReadAsync) - _cts.Cancel(); + CancelTransport(); // Wait for read pump to exit — it may throw OperationCanceledException (from CTS cancel) // or IOException/SocketException (from stream close). Both are expected during shutdown. try { await ReadCompleted.ConfigureAwait(false); } - catch (Exception) when (_cts.IsCancellationRequested) { } // slopwatch-ignore: SW003 expected cancellation or I/O error during shutdown + catch (Exception) when (IsTransportCancellationRequested) { } // slopwatch-ignore: SW003 expected cancellation or I/O error during shutdown - // Close the stream and socket - await _stream.DisposeAsync().ConfigureAwait(false); - _socket.Close(); + await DisposeAsync().ConfigureAwait(false); } public void Abort() { // Cancel pumps immediately - _cts.Cancel(); + CancelTransport(); // Complete pipes to unblock any pending reads/writes on them. // InvalidOperationException if already completed — safe to ignore. - try { _outputPipe.Writer.Complete(); } catch (InvalidOperationException) { } // slopwatch-ignore: SW003 pipe may already be completed - try { _inputPipe.Writer.Complete(); } catch (InvalidOperationException) { } // slopwatch-ignore: SW003 pipe may already be completed + TryCompleteOutputWriter(); // RST the socket — SocketException/ObjectDisposedException if already closed. - try - { - _socket.LingerState = new LingerOption(true, 0); - _socket.Close(); - } - catch (ObjectDisposedException) { } // slopwatch-ignore: SW003 socket may already be disposed - catch (SocketException) { } // slopwatch-ignore: SW003 socket may already be closed + CloseSocket(abortive: true); // Dispose the stream — ObjectDisposedException if already disposed. - try { _stream.Dispose(); } catch (ObjectDisposedException) { } // slopwatch-ignore: SW003 stream may already be disposed + DisposeStream(); } public async ValueTask DisposeAsync() { - _cts.Cancel(); + var disposeTask = Volatile.Read(ref _disposeTask); + if (disposeTask is null) + { + var created = DisposeCoreAsync(); + disposeTask = Interlocked.CompareExchange(ref _disposeTask, created, null) ?? created; + } + + await disposeTask.ConfigureAwait(false); + } + + private async Task DisposeCoreAsync() + { + CancelTransport(); - await _outputPipe.Writer.CompleteAsync().ConfigureAwait(false); - await _inputPipe.Writer.CompleteAsync().ConfigureAwait(false); + await CompleteOutputWriterAsync().ConfigureAwait(false); // Wait for pump tasks — they may throw OperationCanceledException or I/O errors during shutdown. try { await Task.WhenAll(ReadCompleted, WriteCompleted).ConfigureAwait(false); } - catch (Exception) when (_cts.IsCancellationRequested) { } // slopwatch-ignore: SW003 expected errors during disposal + catch (Exception) when (IsTransportCancellationRequested) { } // slopwatch-ignore: SW003 expected errors during disposal + + await DisposeStreamAsync().ConfigureAwait(false); + CloseSocket(); + DisposeCancellationSource(); + } + + private bool IsTransportCancellationRequested => Volatile.Read(ref _transportCancellationRequested) == 1; + + private void CancelTransport() + { + if (Interlocked.Exchange(ref _transportCancellationRequested, 1) != 0) + return; + + try + { + _cts.Cancel(); + } + catch (ObjectDisposedException) + { + // The CTS was already disposed by another shutdown path. + return; + } + } + + private async Task CompleteOutputWriterAsync() + { + if (Interlocked.CompareExchange(ref _outputWriterCompleted, 1, 0) != 0) + return; + + try + { + await _outputPipe.Writer.CompleteAsync().ConfigureAwait(false); + } + catch (InvalidOperationException) + { + // Another shutdown path already completed the writer. + return; + } + } + + private void TryCompleteOutputWriter() + { + if (Interlocked.CompareExchange(ref _outputWriterCompleted, 1, 0) != 0) + return; + + try + { + _outputPipe.Writer.Complete(); + } + catch (InvalidOperationException) + { + // Another shutdown path already completed the writer. + return; + } + } + + private void TryShutdownSend() + { + if (Interlocked.CompareExchange(ref _sendShutdownApplied, 1, 0) != 0) + return; + + try + { + _socket.Shutdown(SocketShutdown.Send); + } + catch (ObjectDisposedException) { } // slopwatch-ignore: SW003 socket may already be disposed + catch (SocketException) { } // slopwatch-ignore: SW003 socket may already be closed by peer or abort + } + + private async Task DisposeStreamAsync() + { + if (Interlocked.CompareExchange(ref _streamDisposed, 1, 0) != 0) + return; + + try + { + await _stream.DisposeAsync().ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + // Another shutdown path already disposed the stream. + return; + } + } + + private void DisposeStream() + { + if (Interlocked.CompareExchange(ref _streamDisposed, 1, 0) != 0) + return; + + try + { + _stream.Dispose(); + } + catch (ObjectDisposedException) + { + // Another shutdown path already disposed the stream. + return; + } + } + + private void CloseSocket(bool abortive = false) + { + if (Interlocked.CompareExchange(ref _socketClosed, 1, 0) != 0) + return; + + try + { + if (abortive) + _socket.LingerState = new LingerOption(true, 0); + + _socket.Close(); + } + catch (ObjectDisposedException) { } // slopwatch-ignore: SW003 socket may already be disposed + catch (SocketException) { } // slopwatch-ignore: SW003 socket may already be closed + } + + private void DisposeCancellationSource() + { + if (Interlocked.CompareExchange(ref _ctsDisposed, 1, 0) != 0) + return; - await _stream.DisposeAsync().ConfigureAwait(false); - _socket.Dispose(); _cts.Dispose(); }