Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions src/core/Akka.Tests/IO/TcpConnectionBatchingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,76 @@ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer,
}
}

private sealed class TrackingDisposeStream : Stream
{
private readonly Stream _inner;
private int _disposeCount;
private int _disposeAsyncCount;

public TrackingDisposeStream(Stream inner)
{
_inner = inner;
}

public int DisposeCount => Volatile.Read(ref _disposeCount);
public int DisposeAsyncCount => Volatile.Read(ref _disposeAsyncCount);

public override bool CanRead => _inner.CanRead;
public override bool CanSeek => _inner.CanSeek;
public override bool CanWrite => _inner.CanWrite;
public override long Length => _inner.Length;

public override long Position
{
get => _inner.Position;
set => _inner.Position = value;
}

public override void Flush() => _inner.Flush();

public override Task FlushAsync(CancellationToken cancellationToken) =>
_inner.FlushAsync(cancellationToken);

public override int Read(byte[] buffer, int offset, int count) =>
_inner.Read(buffer, offset, count);

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
_inner.ReadAsync(buffer, offset, count, cancellationToken);

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) =>
_inner.ReadAsync(buffer, cancellationToken);

public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin);

public override void SetLength(long value) => _inner.SetLength(value);

public override void Write(byte[] buffer, int offset, int count) =>
_inner.Write(buffer, offset, count);

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
_inner.WriteAsync(buffer, offset, count, cancellationToken);

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) =>
_inner.WriteAsync(buffer, cancellationToken);

protected override void Dispose(bool disposing)
{
if (disposing)
{
Interlocked.Increment(ref _disposeCount);
_inner.Dispose();
}

base.Dispose(disposing);
}

public override async ValueTask DisposeAsync()
{
Interlocked.Increment(ref _disposeAsyncCount);
await _inner.DisposeAsync().ConfigureAwait(false);
}
}

public TcpConnectionBatchingSpec(ITestOutputHelper output)
: base(@"akka.loglevel = DEBUG
akka.io.tcp.trace-logging = true", output: output)
Expand Down Expand Up @@ -190,6 +260,97 @@ await AwaitAssertAsync(() =>
await ExpectTerminatedAsync(connection);
}

[Fact]
public async Task TcpConnection_should_finalize_transport_via_DisposeAsync_after_confirmed_close()
{
using var socketPair = await ConnectedSocketPair.CreateAsync();
var trackingStream = new TrackingDisposeStream(new NetworkStream(socketPair.Server, ownsSocket: false));
var bindHandler = CreateTestProbe();
var handler = CreateTestProbe();
var settings = TcpSettings.Create(Sys);

var connection = Sys.ActorOf(Props.Create(() => new TcpIncomingConnection(
settings,
socketPair.Server,
bindHandler.Ref,
Array.Empty<Inet.SocketOption>(),
false,
trackingStream)));

await bindHandler.ExpectMsgAsync<Tcp.Connected>();
bindHandler.Send(connection, new Tcp.Register(handler.Ref));
await WatchAsync(connection);

handler.Send(connection, Tcp.ConfirmedClose.Instance);
socketPair.Client.Shutdown(SocketShutdown.Send);

await handler.ExpectMsgAsync<Tcp.ConfirmedClosed>();
await ExpectTerminatedAsync(connection);

trackingStream.DisposeAsyncCount.Should().Be(1);
trackingStream.DisposeCount.Should().Be(0);
}

[Fact]
public async Task TcpConnection_should_finalize_transport_during_graceful_close()
{
using var socketPair = await ConnectedSocketPair.CreateAsync();
var trackingStream = new TrackingDisposeStream(new NetworkStream(socketPair.Server, ownsSocket: false));
var bindHandler = CreateTestProbe();
var handler = CreateTestProbe();
var settings = TcpSettings.Create(Sys);

var connection = Sys.ActorOf(Props.Create(() => new TcpIncomingConnection(
settings,
socketPair.Server,
bindHandler.Ref,
Array.Empty<Inet.SocketOption>(),
false,
trackingStream)));

await bindHandler.ExpectMsgAsync<Tcp.Connected>();
bindHandler.Send(connection, new Tcp.Register(handler.Ref));
await WatchAsync(connection);

handler.Send(connection, Tcp.Close.Instance);

await handler.ExpectMsgAsync<Tcp.Closed>();
await ExpectTerminatedAsync(connection);

trackingStream.DisposeAsyncCount.Should().Be(1);
trackingStream.DisposeCount.Should().Be(0);
}

[Fact]
public async Task TcpConnection_should_fall_back_to_abort_cleanup_when_stopped_unexpectedly()
{
using var socketPair = await ConnectedSocketPair.CreateAsync();
var trackingStream = new TrackingDisposeStream(new NetworkStream(socketPair.Server, ownsSocket: false));
var bindHandler = CreateTestProbe();
var handler = CreateTestProbe();
var settings = TcpSettings.Create(Sys);

var connection = Sys.ActorOf(Props.Create(() => new TcpIncomingConnection(
settings,
socketPair.Server,
bindHandler.Ref,
Array.Empty<Inet.SocketOption>(),
false,
trackingStream)));

await bindHandler.ExpectMsgAsync<Tcp.Connected>();
bindHandler.Send(connection, new Tcp.Register(handler.Ref));
await WatchAsync(connection);

handler.Send(connection, PoisonPill.Instance);

await handler.ExpectMsgAsync<Tcp.Aborted>();
await ExpectTerminatedAsync(connection);

trackingStream.DisposeAsyncCount.Should().Be(0);
trackingStream.DisposeCount.Should().BeGreaterThan(0);
}

private sealed class ConnectedSocketPair : IDisposable
{
private ConnectedSocketPair(Socket client, Socket server)
Expand Down
90 changes: 80 additions & 10 deletions src/core/Akka/IO/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,24 @@ private sealed class TransportOperationFailed : INoSerializationVerificationNeed
public TransportOperationFailed(Exception cause) { Cause = cause; }
}

/// <summary>
/// Self-tell: transport finalization completed successfully.
/// </summary>
private sealed class TransportFinalized : INoSerializationVerificationNeeded
{
public static readonly TransportFinalized Instance = new();
private TransportFinalized() { }
}

/// <summary>
/// Self-tell: transport finalization failed.
/// </summary>
private sealed class TransportFinalizeFailed : INoSerializationVerificationNeeded
{
public Exception Cause { get; }
public TransportFinalizeFailed(Exception cause) { Cause = cause; }
}

private sealed class CommanderDied : INoSerializationVerificationNeeded, IDeadLetterSuppression
{
public static readonly CommanderDied Instance = new();
Expand Down Expand Up @@ -231,6 +249,13 @@ private HandlerDied() { }
// this as Tcp.ErrorClosed instead of treating it as a clean EOF.
private bool _readPumpHasError;
private Exception? _readPumpError;

// True once the transport has been fully cleaned up via CloseAsync or DisposeAsync.
// When set, PostStop skips the abort fallback because there's nothing left to tear down.
private bool _transportFinalized;

// True while an async finalization task is in flight (used by ConfirmedClose).
private bool _transportFinalizing;
#endregion

private static readonly IOException DroppingWriteBecauseClosingException =
Expand Down Expand Up @@ -265,11 +290,13 @@ protected override void PostStop()

if (_transport != null)
{
// Abort cancels the CTS, sets linger=0, closes the socket.
// This unblocks any pending stream.ReadAsync/WriteAsync in the pump tasks.
// The pump tasks will exit with OperationCanceledException or IOException.
try { _transport.Abort(); }
catch (ObjectDisposedException) { } // slopwatch-ignore: SW003 transport may already be disposed
if (!_transportFinalized)
{
// Abort is the emergency fallback for unexpected actor stops.
// Clean Close / ConfirmedClose paths finalize the transport before stopping.
try { _transport.Abort(); }
catch (ObjectDisposedException) { } // slopwatch-ignore: SW003 transport may already be disposed
}
}
else
{
Expand Down Expand Up @@ -486,8 +513,8 @@ private void ClosingBehaviour(IActorRef closeSender, ConnectionClosed closeEvent
if (closeEvent is ConfirmedClosed)
{
if (_traceLogging)
Log.Debug("Peer FIN received during ConfirmedClose - connection fully closed");
DoCloseConnection(closeSender, ConfirmedClosed.Instance);
Log.Debug("Peer FIN received during ConfirmedClose - waiting for transport finalization");
TryFinishClose(closeSender, closeEvent);
return;
}

Expand All @@ -511,11 +538,14 @@ private void ClosingBehaviour(IActorRef closeSender, ConnectionClosed closeEvent

if (_traceLogging)
Log.Debug("ConfirmedClose: FIN sent, waiting for peer FIN");

TryFinishClose(closeSender, closeEvent);
}
else
{
// For regular Close, transport is fully closed
// For regular Close, transport is fully closed and finalized.
_outputShutdown = true;
_transportFinalized = true;
TryFinishClose(closeSender, closeEvent);
}
});
Expand All @@ -531,6 +561,25 @@ private void ClosingBehaviour(IActorRef closeSender, ConnectionClosed closeEvent
Log.Debug("I/O task failed during close: {0}", msg.Cause.Message);
DoCloseConnection(closeSender, closeEvent);
});
Receive<TransportFinalized>(_ =>
{
_transportFinalizing = false;
_transportFinalized = true;

if (_traceLogging)
Log.Debug("Transport finalization completed");

DoCloseConnection(closeSender, closeEvent);
});
Receive<TransportFinalizeFailed>(msg =>
{
_transportFinalizing = false;

if (_traceLogging)
Log.Debug("Transport finalization failed: {0}", msg.Cause.Message);

DoCloseConnection(closeSender, closeEvent);
});
SuspendResumeHandlers();
Receive<HandlerDied>(_ =>
{
Expand All @@ -551,8 +600,11 @@ private void TryFinishClose(IActorRef closeSender, ConnectionClosed closeEvent)
{
if (closeEvent is ConfirmedClosed)
{
// For ConfirmedClose, we need to wait for peer FIN (StreamEof).
// The StreamEof handler calls DoCloseConnection directly.
// For ConfirmedClose, wait until our FIN has been sent and the peer's FIN has arrived.
// Once both sides are closed, finalize the transport asynchronously before stopping.
if (_outputShutdown && _peerClosed)
StartTransportFinalization();

return;
}

Expand All @@ -561,6 +613,24 @@ private void TryFinishClose(IActorRef closeSender, ConnectionClosed closeEvent)
DoCloseConnection(closeSender, closeEvent);
}

private void StartTransportFinalization()
{
if (_transport is null || _transportFinalized || _transportFinalizing)
return;

_transportFinalizing = true;
_transport.DisposeAsync().AsTask().ContinueWith(t =>
{
if (t.IsFaulted)
return (object)new TransportFinalizeFailed(t.Exception!.InnerException ?? t.Exception);

if (t.IsCanceled)
return (object)new TransportFinalizeFailed(new TaskCanceledException("Transport finalization was canceled"));

return TransportFinalized.Instance;
}, TaskContinuationOptions.ExecuteSynchronously).PipeTo(Self);
}

private void SuspendResumeHandlers()
{
Receive<ResumeReading>(_ =>
Expand Down
Loading
Loading