diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index afc6e038e32..3abb2fd86ed 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -3977,10 +3977,9 @@ namespace Akka.IO protected override void PostStop() { } protected override bool Receive(object message) { } } - public class Tcp : Akka.Actor.ExtensionIdProvider + public sealed class Tcp : Akka.Actor.ExtensionIdProvider { public static readonly Akka.Actor.SupervisorStrategy ConnectionSupervisorStrategy; - public static readonly Akka.IO.Tcp Instance; public Tcp() { } public override Akka.IO.TcpExt CreateExtension(Akka.Actor.ExtendedActorSystem system) { } public static Akka.Actor.IActorRef Manager(Akka.Actor.ActorSystem system) { } @@ -3989,7 +3988,7 @@ namespace Akka.IO public static readonly Akka.IO.Tcp.Abort Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class Aborted : Akka.IO.Tcp.ConnectionClosed + public sealed class Aborted : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.Aborted Instance; public override bool IsAborted { get; } @@ -4004,13 +4003,13 @@ namespace Akka.IO public bool PullMode { get; } public override string ToString() { } } - public class Bound : Akka.IO.Tcp.Event + public sealed class Bound : Akka.IO.Tcp.Event { public Bound(System.Net.EndPoint localAddress) { } public System.Net.EndPoint LocalAddress { get; } public override string ToString() { } } - public class Close : Akka.IO.Tcp.CloseCommand + public sealed class Close : Akka.IO.Tcp.CloseCommand { public static readonly Akka.IO.Tcp.Close Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } @@ -4020,7 +4019,7 @@ namespace Akka.IO protected CloseCommand() { } public abstract Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class Closed : Akka.IO.Tcp.ConnectionClosed + public sealed class Closed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.Closed Instance; } @@ -4031,6 +4030,7 @@ namespace Akka.IO } public sealed class CommandFailed : Akka.IO.Tcp.Event { + public CommandFailed(Akka.IO.Tcp.Command cmd, Akka.Util.Option ex) { } public CommandFailed(Akka.IO.Tcp.Command cmd) { } public Akka.Util.Option Cause { get; } [Akka.Annotations.InternalApiAttribute()] @@ -4040,7 +4040,7 @@ namespace Akka.IO [Akka.Annotations.InternalApiAttribute()] public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { } } - public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable + public sealed class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable { public CompoundWrite(Akka.IO.Tcp.SimpleWriteCommand head, Akka.IO.Tcp.WriteCommand tailCommand) { } public Akka.IO.Tcp.SimpleWriteCommand Head { get; } @@ -4048,17 +4048,17 @@ namespace Akka.IO public System.Collections.Generic.IEnumerator GetEnumerator() { } public override string ToString() { } } - public class ConfirmedClose : Akka.IO.Tcp.CloseCommand + public sealed class ConfirmedClose : Akka.IO.Tcp.CloseCommand { public static readonly Akka.IO.Tcp.ConfirmedClose Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class ConfirmedClosed : Akka.IO.Tcp.ConnectionClosed + public sealed class ConfirmedClosed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.ConfirmedClosed Instance; public override bool IsConfirmed { get; } } - public class Connect : Akka.IO.Tcp.Command + public sealed class Connect : Akka.IO.Tcp.Command { public Connect(System.Net.EndPoint remoteAddress, System.Net.EndPoint localAddress = null, System.Collections.Generic.IEnumerable options = null, System.Nullable timeout = null, bool pullMode = False) { } public System.Net.EndPoint LocalAddress { get; } @@ -4078,12 +4078,14 @@ namespace Akka.IO public class ConnectionClosed : Akka.IO.Tcp.Event, Akka.Event.IDeadLetterSuppression { public ConnectionClosed() { } + [System.Runtime.CompilerServices.NullableAttribute(2)] public virtual string Cause { get; } public virtual bool IsAborted { get; } public virtual bool IsConfirmed { get; } public virtual bool IsErrorClosed { get; } public virtual bool IsPeerClosed { get; } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ErrorClosed : Akka.IO.Tcp.ConnectionClosed { public ErrorClosed(string cause) { } @@ -4106,7 +4108,7 @@ namespace Akka.IO public object Token { get; } public override string ToString() { } } - public class PeerClosed : Akka.IO.Tcp.ConnectionClosed + public sealed class PeerClosed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.PeerClosed Instance; public override bool IsPeerClosed { get; } @@ -4125,17 +4127,17 @@ namespace Akka.IO public bool UseResumeWriting { get; } public override string ToString() { } } - public class ResumeAccepting : Akka.IO.Tcp.Command + public sealed class ResumeAccepting : Akka.IO.Tcp.Command { public ResumeAccepting(int batchSize) { } public int BatchSize { get; } public override string ToString() { } } - public class ResumeReading : Akka.IO.Tcp.Command + public sealed class ResumeReading : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.ResumeReading Instance; } - public class ResumeWriting : Akka.IO.Tcp.Command + public sealed class ResumeWriting : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.ResumeWriting Instance; } @@ -4146,7 +4148,7 @@ namespace Akka.IO public bool WantsAck { get; } public Akka.IO.Tcp.CompoundWrite Append(Akka.IO.Tcp.WriteCommand that) { } } - public class SuspendReading : Akka.IO.Tcp.Command + public sealed class SuspendReading : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.SuspendReading Instance; } @@ -4154,11 +4156,11 @@ namespace Akka.IO { public static readonly Akka.IO.Tcp.Unbind Instance; } - public class Unbound : Akka.IO.Tcp.Event + public sealed class Unbound : Akka.IO.Tcp.Event { public static readonly Akka.IO.Tcp.Unbound Instance; } - public class Write : Akka.IO.Tcp.SimpleWriteCommand + public sealed class Write : Akka.IO.Tcp.SimpleWriteCommand { public static readonly Akka.IO.Tcp.Write Empty; public override Akka.IO.Tcp.Event Ack { get; } @@ -4175,7 +4177,7 @@ namespace Akka.IO public Akka.IO.Tcp.CompoundWrite Prepend(Akka.IO.Tcp.SimpleWriteCommand other) { } public Akka.IO.Tcp.WriteCommand Prepend(System.Collections.Generic.IEnumerable writes) { } } - public class WritingResumed : Akka.IO.Tcp.Event + public sealed class WritingResumed : Akka.IO.Tcp.Event { public static readonly Akka.IO.Tcp.WritingResumed Instance; } @@ -4191,15 +4193,14 @@ namespace Akka.IO { public static Akka.Actor.IActorRef Tcp(this Akka.Actor.ActorSystem system) { } } - public class TcpMessage + public class static TcpMessage { - public TcpMessage() { } public static Akka.IO.Tcp.Command Abort() { } public static Akka.IO.Tcp.Command Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint endpoint, int backlog, System.Collections.Generic.IEnumerable options, bool pullMode) { } public static Akka.IO.Tcp.Command Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint endpoint, int backlog) { } public static Akka.IO.Tcp.Command Close() { } public static Akka.IO.Tcp.Command ConfirmedClose() { } - public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress, System.Net.EndPoint localAddress, System.Collections.Generic.IEnumerable options, System.Nullable timeout, bool pullMode) { } + public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress, [System.Runtime.CompilerServices.NullableAttribute(2)] System.Net.EndPoint localAddress, System.Collections.Generic.IEnumerable options, System.Nullable timeout, bool pullMode) { } public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress) { } public static Akka.IO.Tcp.NoAck NoAck(object token = null) { } public static Akka.IO.Tcp.Command Register(Akka.Actor.IActorRef handler, bool keepOpenOnPeerClosed = False, bool useResumeWriting = True) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 895d2bd5d76..284afaaade0 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -3967,10 +3967,9 @@ namespace Akka.IO protected override void PostStop() { } protected override bool Receive(object message) { } } - public class Tcp : Akka.Actor.ExtensionIdProvider + public sealed class Tcp : Akka.Actor.ExtensionIdProvider { public static readonly Akka.Actor.SupervisorStrategy ConnectionSupervisorStrategy; - public static readonly Akka.IO.Tcp Instance; public Tcp() { } public override Akka.IO.TcpExt CreateExtension(Akka.Actor.ExtendedActorSystem system) { } public static Akka.Actor.IActorRef Manager(Akka.Actor.ActorSystem system) { } @@ -3979,7 +3978,7 @@ namespace Akka.IO public static readonly Akka.IO.Tcp.Abort Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class Aborted : Akka.IO.Tcp.ConnectionClosed + public sealed class Aborted : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.Aborted Instance; public override bool IsAborted { get; } @@ -3994,13 +3993,13 @@ namespace Akka.IO public bool PullMode { get; } public override string ToString() { } } - public class Bound : Akka.IO.Tcp.Event + public sealed class Bound : Akka.IO.Tcp.Event { public Bound(System.Net.EndPoint localAddress) { } public System.Net.EndPoint LocalAddress { get; } public override string ToString() { } } - public class Close : Akka.IO.Tcp.CloseCommand + public sealed class Close : Akka.IO.Tcp.CloseCommand { public static readonly Akka.IO.Tcp.Close Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } @@ -4010,7 +4009,7 @@ namespace Akka.IO protected CloseCommand() { } public abstract Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class Closed : Akka.IO.Tcp.ConnectionClosed + public sealed class Closed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.Closed Instance; } @@ -4021,6 +4020,7 @@ namespace Akka.IO } public sealed class CommandFailed : Akka.IO.Tcp.Event { + public CommandFailed(Akka.IO.Tcp.Command cmd, Akka.Util.Option ex) { } public CommandFailed(Akka.IO.Tcp.Command cmd) { } public Akka.Util.Option Cause { get; } [Akka.Annotations.InternalApiAttribute()] @@ -4030,7 +4030,7 @@ namespace Akka.IO [Akka.Annotations.InternalApiAttribute()] public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { } } - public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable + public sealed class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable { public CompoundWrite(Akka.IO.Tcp.SimpleWriteCommand head, Akka.IO.Tcp.WriteCommand tailCommand) { } public Akka.IO.Tcp.SimpleWriteCommand Head { get; } @@ -4038,17 +4038,17 @@ namespace Akka.IO public System.Collections.Generic.IEnumerator GetEnumerator() { } public override string ToString() { } } - public class ConfirmedClose : Akka.IO.Tcp.CloseCommand + public sealed class ConfirmedClose : Akka.IO.Tcp.CloseCommand { public static readonly Akka.IO.Tcp.ConfirmedClose Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class ConfirmedClosed : Akka.IO.Tcp.ConnectionClosed + public sealed class ConfirmedClosed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.ConfirmedClosed Instance; public override bool IsConfirmed { get; } } - public class Connect : Akka.IO.Tcp.Command + public sealed class Connect : Akka.IO.Tcp.Command { public Connect(System.Net.EndPoint remoteAddress, System.Net.EndPoint localAddress = null, System.Collections.Generic.IEnumerable options = null, System.Nullable timeout = null, bool pullMode = False) { } public System.Net.EndPoint LocalAddress { get; } @@ -4068,12 +4068,14 @@ namespace Akka.IO public class ConnectionClosed : Akka.IO.Tcp.Event, Akka.Event.IDeadLetterSuppression { public ConnectionClosed() { } + [System.Runtime.CompilerServices.NullableAttribute(2)] public virtual string Cause { get; } public virtual bool IsAborted { get; } public virtual bool IsConfirmed { get; } public virtual bool IsErrorClosed { get; } public virtual bool IsPeerClosed { get; } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ErrorClosed : Akka.IO.Tcp.ConnectionClosed { public ErrorClosed(string cause) { } @@ -4096,7 +4098,7 @@ namespace Akka.IO public object Token { get; } public override string ToString() { } } - public class PeerClosed : Akka.IO.Tcp.ConnectionClosed + public sealed class PeerClosed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.PeerClosed Instance; public override bool IsPeerClosed { get; } @@ -4115,17 +4117,17 @@ namespace Akka.IO public bool UseResumeWriting { get; } public override string ToString() { } } - public class ResumeAccepting : Akka.IO.Tcp.Command + public sealed class ResumeAccepting : Akka.IO.Tcp.Command { public ResumeAccepting(int batchSize) { } public int BatchSize { get; } public override string ToString() { } } - public class ResumeReading : Akka.IO.Tcp.Command + public sealed class ResumeReading : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.ResumeReading Instance; } - public class ResumeWriting : Akka.IO.Tcp.Command + public sealed class ResumeWriting : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.ResumeWriting Instance; } @@ -4136,7 +4138,7 @@ namespace Akka.IO public bool WantsAck { get; } public Akka.IO.Tcp.CompoundWrite Append(Akka.IO.Tcp.WriteCommand that) { } } - public class SuspendReading : Akka.IO.Tcp.Command + public sealed class SuspendReading : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.SuspendReading Instance; } @@ -4144,11 +4146,11 @@ namespace Akka.IO { public static readonly Akka.IO.Tcp.Unbind Instance; } - public class Unbound : Akka.IO.Tcp.Event + public sealed class Unbound : Akka.IO.Tcp.Event { public static readonly Akka.IO.Tcp.Unbound Instance; } - public class Write : Akka.IO.Tcp.SimpleWriteCommand + public sealed class Write : Akka.IO.Tcp.SimpleWriteCommand { public static readonly Akka.IO.Tcp.Write Empty; public override Akka.IO.Tcp.Event Ack { get; } @@ -4165,7 +4167,7 @@ namespace Akka.IO public Akka.IO.Tcp.CompoundWrite Prepend(Akka.IO.Tcp.SimpleWriteCommand other) { } public Akka.IO.Tcp.WriteCommand Prepend(System.Collections.Generic.IEnumerable writes) { } } - public class WritingResumed : Akka.IO.Tcp.Event + public sealed class WritingResumed : Akka.IO.Tcp.Event { public static readonly Akka.IO.Tcp.WritingResumed Instance; } @@ -4181,15 +4183,14 @@ namespace Akka.IO { public static Akka.Actor.IActorRef Tcp(this Akka.Actor.ActorSystem system) { } } - public class TcpMessage + public class static TcpMessage { - public TcpMessage() { } public static Akka.IO.Tcp.Command Abort() { } public static Akka.IO.Tcp.Command Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint endpoint, int backlog, System.Collections.Generic.IEnumerable options, bool pullMode) { } public static Akka.IO.Tcp.Command Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint endpoint, int backlog) { } public static Akka.IO.Tcp.Command Close() { } public static Akka.IO.Tcp.Command ConfirmedClose() { } - public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress, System.Net.EndPoint localAddress, System.Collections.Generic.IEnumerable options, System.Nullable timeout, bool pullMode) { } + public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress, [System.Runtime.CompilerServices.NullableAttribute(2)] System.Net.EndPoint localAddress, System.Collections.Generic.IEnumerable options, System.Nullable timeout, bool pullMode) { } public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress) { } public static Akka.IO.Tcp.NoAck NoAck(object token = null) { } public static Akka.IO.Tcp.Command Register(Akka.Actor.IActorRef handler, bool keepOpenOnPeerClosed = False, bool useResumeWriting = True) { } diff --git a/src/core/Akka.Tests/IO/TcpListenerSpec.cs b/src/core/Akka.Tests/IO/TcpListenerSpec.cs index db15a98bbf7..a6b845ad47d 100644 --- a/src/core/Akka.Tests/IO/TcpListenerSpec.cs +++ b/src/core/Akka.Tests/IO/TcpListenerSpec.cs @@ -143,7 +143,7 @@ public ListenerParent(TestSetup test, bool pullMode) _listener = Context.ActorOf(Props.Create(() => new TcpListener( - Tcp.Instance.Apply(Context.System), + Tcp.For(Context.System), test._bindCommander.Ref, new Tcp.Bind( _test._handler.Ref, diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index a5e2cac3e13..94cbc3c5831 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -23,28 +23,24 @@ namespace Akka.IO /// /// The set of TCP capabilities for Akka.IO are exposed via this extension. /// - public class Tcp : ExtensionIdProvider + public sealed class Tcp : ExtensionIdProvider { + // TODO: refactor this in v1.6 to use a `.For` method with the correct ExtensionId provider setup + private static readonly Tcp PluginInstance = new(); + /// - /// TBD - /// - public static readonly Tcp Instance = new(); - - /// - /// TBD + /// Fetches the TCP manager actor for the given actor system. /// - /// TBD - /// TBD public static IActorRef Manager(ActorSystem system) { - return Instance.Apply(system).Manager; + return PluginInstance.Apply(system).Manager; } - - /// - /// TBD - /// - /// TBD - /// TBD + + internal static TcpExt For(ActorSystem system) + { + return PluginInstance.Apply(system); + } + public override TcpExt CreateExtension(ExtendedActorSystem system) { return new TcpExt(system); @@ -82,32 +78,24 @@ private SocketConnected() { } #endregion /// - /// TBD + /// Akka.IO Tcp messages are all derived from this class. /// public class Message : INoSerializationVerificationNeeded { } #region user commands // COMMANDS + /// - /// TBD + /// All Akka.IO.Tcp commands inherit from this class. /// public abstract class Command : Message { - private readonly CommandFailed _failureMessage; - - /// - /// TBD - /// - protected Command() - { - _failureMessage = new CommandFailed(this); - } - /// - /// TBD + /// A predefined failure message which can be used to indicate that a command + /// failed during processing. /// - public CommandFailed FailureMessage => _failureMessage; + public CommandFailed FailureMessage => new CommandFailed(this); } /// @@ -116,16 +104,17 @@ protected Command() /// or the actor handling the new connection replies with a /// message. /// - public class Connect : Command + public sealed class Connect : Command { + /// - /// TBD + /// Connect to a remote TCP endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + /// The remote endpoint + /// An optional local endpoint address to bind to. Most users don't specify this. + /// A set of socket options. + /// An optional connect timeout. Will result in a message being returned if we exceed this value. + /// Specifies whether we're running in "pull mode" or not. public Connect(EndPoint remoteAddress, EndPoint localAddress = null, IEnumerable options = null, @@ -134,30 +123,18 @@ public Connect(EndPoint remoteAddress, { RemoteAddress = remoteAddress; LocalAddress = localAddress; - Options = options ?? Enumerable.Empty(); + Options = options ?? []; Timeout = timeout; PullMode = pullMode; } - - /// - /// TBD - /// + public EndPoint RemoteAddress { get; } - /// - /// TBD - /// + public EndPoint LocalAddress { get; } - /// - /// TBD - /// + public IEnumerable Options { get; } - /// - /// TBD - /// + public TimeSpan? Timeout { get; } - /// - /// TBD - /// public bool PullMode { get; } public override string ToString() => @@ -175,13 +152,13 @@ public override string ToString() => public class Bind : Command { /// - /// TBD + /// Bind a TCP listener to a local endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP listener. + /// The local endpoint we are binding to. + /// TCP backlog - the number of pending connections that the queue will hold. + /// A set of socket options. + /// Specifies whether we're running in "pull mode" or not. public Bind(IActorRef handler, EndPoint localAddress, int backlog = 100, @@ -191,29 +168,18 @@ public Bind(IActorRef handler, Handler = handler; LocalAddress = localAddress; Backlog = backlog; - Options = options ?? Enumerable.Empty(); + Options = options ?? []; PullMode = pullMode; } - /// - /// TBD - /// public IActorRef Handler { get; } - /// - /// TBD - /// + public EndPoint LocalAddress { get; } - /// - /// TBD - /// + public int Backlog { get; } - /// - /// TBD - /// + public IEnumerable Options { get; } - /// - /// TBD - /// + public bool PullMode { get; } public override string ToString() => @@ -229,11 +195,11 @@ public override string ToString() => public class Register : Command { /// - /// TBD + /// Registers an actor to handle an outgoing or incoming TCP connection that has been established. /// - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP communication. + /// Keep the connection open if the peer is closed + /// Use resume / pause writing semantics once buffer gets full public Register(IActorRef handler, bool keepOpenOnPeerClosed = false, bool useResumeWriting = true) { Handler = handler; @@ -241,17 +207,11 @@ public Register(IActorRef handler, bool keepOpenOnPeerClosed = false, bool useRe UseResumeWriting = useResumeWriting; } - /// - /// TBD - /// + public IActorRef Handler { get; } - /// - /// TBD - /// + public bool KeepOpenOnPeerClosed { get; } - /// - /// TBD - /// + public bool UseResumeWriting { get; } public override string ToString() => @@ -265,9 +225,6 @@ public override string ToString() => /// public class Unbind : Command { - /// - /// TBD - /// public static readonly Unbind Instance = new(); private Unbind() @@ -280,7 +237,7 @@ private Unbind() public abstract class CloseCommand : Command, IDeadLetterSuppression { /// - /// TBD + /// The event to return in response to this command /// public abstract ConnectionClosed Event { get; } } @@ -291,20 +248,14 @@ public abstract class CloseCommand : Command, IDeadLetterSuppression /// data will both be notified once the socket is closed using a /// message. /// - public class Close : CloseCommand + public sealed class Close : CloseCommand { - /// - /// TBD - /// public static readonly Close Instance = new(); private Close() { } - - /// - /// TBD - /// + public override ConnectionClosed Event => Closed.Instance; } @@ -314,20 +265,14 @@ private Close() /// command and the registered handler for incoming data will both be notified /// once the socket is closed using a message. /// - public class ConfirmedClose : CloseCommand + public sealed class ConfirmedClose : CloseCommand { - /// - /// TBD - /// public static readonly ConfirmedClose Instance = new(); private ConfirmedClose() { } - - /// - /// TBD - /// + public override ConnectionClosed Event => ConfirmedClosed.Instance; } @@ -340,18 +285,12 @@ private ConfirmedClose() /// public class Abort : CloseCommand { - /// - /// TBD - /// public static readonly Abort Instance = new(); private Abort() { } - - /// - /// TBD - /// + public override ConnectionClosed Event => Aborted.Instance; } @@ -363,22 +302,15 @@ private Abort() /// public class NoAck : Event { - /// - /// TBD - /// public static readonly NoAck Instance = new(null); - - /// - /// TBD - /// - /// TBD + public NoAck(object token) { Token = token; } /// - /// TBD + /// A correlation id which can be used to identify a specific write operation. /// public object Token { get; } @@ -387,56 +319,44 @@ public override string ToString() => } /// - /// TBD + /// All write commands inherit from this class. /// public abstract class WriteCommand : Command { /// - /// TBD + /// Prepend another write before this one. /// - /// TBD - /// TBD + /// The other write to prepend + /// A compound write consisting of multiple byte buffers of non-contiguous memory public CompoundWrite Prepend(SimpleWriteCommand other) { return new CompoundWrite(other, this); } /// - /// TBD + /// Prepend a group of writes before this one. /// - /// TBD - /// TBD + /// The set of writes that will preceed this one. + /// A compound write consisting of multiple byte buffers of non-contiguous memory public WriteCommand Prepend(IEnumerable writes) { return writes.Reverse().Aggregate(this, (b, a) => { - var simple = a as SimpleWriteCommand; - if (simple != null) - return b.Prepend(simple); - - var compound = a as CompoundWrite; - if (compound != null) - return b.Prepend(compound); - - throw new ArgumentException("The supplied WriteCommand is invalid. Only SimpleWriteCommand and CompoundWrite WriteCommands are supported."); + return a switch + { + SimpleWriteCommand simple => b.Prepend(simple), + CompoundWrite compound => b.Prepend(compound), + _ => throw new ArgumentException( + "The supplied WriteCommand is invalid. Only SimpleWriteCommand and CompoundWrite WriteCommands are supported.") + }; }); } - - /// - /// TBD - /// - /// TBD - /// TBD + public static WriteCommand Create(IEnumerable writes) { return Write.Empty.Prepend(writes); } - - /// - /// TBD - /// - /// TBD - /// TBD + public static WriteCommand Create(params WriteCommand[] writes) { return Create((IEnumerable)writes); @@ -444,25 +364,25 @@ public static WriteCommand Create(params WriteCommand[] writes) } /// - /// TBD + /// A non-compounded write /// public abstract class SimpleWriteCommand : WriteCommand { /// - /// TBD + /// An optional acknowledgment event which will be sent to the sender of this command /// public abstract Event Ack { get; } /// - /// TBD + /// Indicates whether this message needs to be ACK'd to the handler. /// - public bool WantsAck => !(Ack is NoAck); + public bool WantsAck => Ack is not NoAck; /// - /// TBD + /// Appends a write after this one. /// - /// TBD - /// TBD + /// The next write to append. + /// A compound write of non-contiguous memory. public CompoundWrite Append(WriteCommand that) { return that.Prepend(this); @@ -479,20 +399,20 @@ public CompoundWrite Append(WriteCommand that) /// or have been sent! Unfortunately there is no way to determine whether /// a particular write has been sent by the O/S. /// - public class Write : SimpleWriteCommand + public sealed class Write : SimpleWriteCommand { /// - /// TBD + /// Write with no data and /// public static readonly Write Empty = new(ByteString.Empty, NoAck.Instance); /// - /// TBD + /// The data we are going to write. /// public ByteString Data { get; } /// - /// TBD + /// The optional acknowledgment event which will be sent to the sender of this command. /// public override Event Ack { get; } @@ -506,83 +426,25 @@ public override string ToString() => $"Write(bytes: {Data.Count}, ack: {Ack})"; /// - /// TBD + /// Creates a write from a /// - /// TBD - /// TBD + /// The data to return. public static Write Create(ByteString data) { return data.IsEmpty ? Empty : new Write(data, NoAck.Instance); } /// - /// TBD + /// Creates a write from a /// - /// TBD - /// TbD - /// TBD + /// The data to return. + /// The acknowledgement message we're receive once this write is complete. public static Write Create(ByteString data, Event ack) { return new Write(data, ack); } } - - /* - /// - /// Write `count` bytes starting at `position` from file at `filePath` to the connection. - /// The count must be > 0. The connection actor will reply with a - /// message if the write could not be enqueued. If - /// returns true, the connection actor will reply with the supplied - /// token once the write has been successfully enqueued to the O/S kernel. - /// Note that this does not in any way guarantee that the data will be - /// or have been sent! Unfortunately there is no way to determine whether - /// a particular write has been sent by the O/S. - /// - public class WriteFile : SimpleWriteCommand - { - private readonly Event _ack; - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - public WriteFile(string filePath, long position, long count, Event ack) - { - if (position < 0) throw new ArgumentException("WriteFile.position must be >= 0", nameof(position)); - if (count <= 0) throw new ArgumentException("WriteFile.count must be > 0", nameof(count)); - - _ack = ack; - FilePath = filePath; - Position = position; - Count = count; - } - - /// - /// TBD - /// - public string FilePath { get; } - /// - /// TBD - /// - public long Position { get; } - /// - /// TBD - /// - public long Count { get; } - - /// - /// TBD - /// - public override Event Ack => _ack; - - public override string ToString() => - $"WriteFile(path: {FilePath}, position: {Position}, count: {Count}, ack: {Ack})"; - } - */ + /// /// A write command which aggregates two other write commands. Using this construct /// you can chain a number of commands together in a way @@ -591,26 +453,14 @@ public override string ToString() => /// If the sub commands contain `ack` requests they will be honored as soon as the /// respective write has been written completely. /// - public class CompoundWrite : WriteCommand, IEnumerable + public sealed class CompoundWrite : WriteCommand, IEnumerable { - private readonly SimpleWriteCommand _head; - private readonly WriteCommand _tailCommand; - - /// - /// TBD - /// - /// TBD - /// TBD public CompoundWrite(SimpleWriteCommand head, WriteCommand tailCommand) { - _head = head; - _tailCommand = tailCommand; + Head = head; + TailCommand = tailCommand; } - - /// - /// TBD - /// - /// TBD + public IEnumerator GetEnumerator() { return Enumerable().GetEnumerator(); @@ -626,31 +476,21 @@ private IEnumerable Enumerable() WriteCommand current = this; while (current != null) { - var compound = current as CompoundWrite; - if (compound != null) + if (current is CompoundWrite compound) { current = compound.TailCommand; yield return compound.Head; } - var simple = current as SimpleWriteCommand; - if (simple != null) - { - current = null; - yield return simple; - } + if (current is not SimpleWriteCommand simple) continue; + current = null; + yield return simple; } } + + public SimpleWriteCommand Head { get; } - /// - /// TBD - /// - public SimpleWriteCommand Head => _head; - - /// - /// TBD - /// - public WriteCommand TailCommand => _tailCommand; + public WriteCommand TailCommand { get; } public override string ToString() => $"CompoundWrite({Head}, {TailCommand})"; @@ -663,11 +503,8 @@ public override string ToString() => /// connection actor between the first and subsequent reception of /// this message will also be rejected with . /// - public class ResumeWriting : Command + public sealed class ResumeWriting : Command { - /// - /// TBD - /// public static readonly ResumeWriting Instance = new(); private ResumeWriting() @@ -680,11 +517,8 @@ private ResumeWriting() /// socket. TCP flow-control will then propagate backpressure to the sender side /// as buffers fill up on either end. To re-enable reading send . /// - public class SuspendReading : Command + public sealed class SuspendReading : Command { - /// - /// TBD - /// public static readonly SuspendReading Instance = new(); private SuspendReading() @@ -696,11 +530,8 @@ private SuspendReading() /// This command needs to be sent to the connection actor after a /// command in order to resume reading from the socket. /// - public class ResumeReading : Command + public sealed class ResumeReading : Command { - /// - /// TBD - /// public static readonly ResumeReading Instance = new(); private ResumeReading() @@ -712,24 +543,20 @@ private ResumeReading() /// This message enables the accepting of the next connection if read throttling is enabled /// for connection actors. /// - public class ResumeAccepting : Command + public sealed class ResumeAccepting : Command { /// - /// TBD + /// The number of connections to accept before resuming read throttling. /// public int BatchSize { get; } - - /// - /// TBD - /// - /// TBD + public ResumeAccepting(int batchSize) { BatchSize = batchSize; } public override string ToString() => - $"ResumeAccepting(batchSize: {BatchSize})"; + $"ResumeAccepting(BatchSize: {BatchSize})"; } #endregion @@ -750,18 +577,11 @@ public class Event : Message /// public sealed class Received : Event { - /// - /// TBD - /// - /// TBD public Received(ByteString data) { Data = data; } - - /// - /// TBD - /// + public ByteString Data { get; } public override string ToString() => @@ -776,11 +596,6 @@ public override string ToString() => /// public sealed class Connected : Event { - /// - /// TBD - /// - /// TBD - /// TBD public Connected(EndPoint remoteAddress, EndPoint localAddress) { RemoteAddress = remoteAddress; @@ -788,11 +603,12 @@ public Connected(EndPoint remoteAddress, EndPoint localAddress) } /// - /// TBD + /// The remote endpoint of the connection. /// public EndPoint RemoteAddress { get; } + /// - /// TBD + /// The local endpoint of the connection. /// public EndPoint LocalAddress { get; } @@ -806,21 +622,25 @@ public override string ToString() => /// public sealed class CommandFailed : Event { - /// - /// TBD - /// - /// TBD - public CommandFailed(Command cmd) => Cmd = cmd; + public CommandFailed(Command cmd, Option ex) + { + Cmd = cmd; + Cause = ex; + } + + public CommandFailed(Command cmd) : this(cmd, Option.None) + { + } /// - /// TBD + /// The original command which failed. /// public Command Cmd { get; } /// /// Optionally contains the cause why the command failed. /// - public Option Cause { get; private set; } = Option.None; + public Option Cause { get; } /// /// Creates a copy of this object with a new cause set. @@ -829,11 +649,11 @@ public sealed class CommandFailed : Event public CommandFailed WithCause(Exception cause) { // Needs to be added with a mutable property for compatibility reasons - return new CommandFailed(Cmd) { Cause = cause }; + return new CommandFailed(Cmd, cause); } [InternalApi] - public string CauseString => Cause.HasValue ? $" because of {Cause.Value.Message}" : ""; + public string CauseString => Cause.HasValue ? $" because of {Cause.Value.Message}" : string.Empty; public override string ToString() => $"CommandFailed({Cmd}){CauseString}"; } @@ -845,11 +665,8 @@ public CommandFailed WithCause(Exception cause) /// the first message have been enqueued to the O/S kernel at this /// point. /// - public class WritingResumed : Event + public sealed class WritingResumed : Event { - /// - /// TBD - /// public static readonly WritingResumed Instance = new(); private WritingResumed() @@ -862,7 +679,7 @@ private WritingResumed() /// in this form. If the bind address indicated a 0 port number, then the contained /// `localAddress` can be used to find out which port was automatically assigned. /// - public class Bound : Event + public sealed class Bound : Event { /// /// The local listening endpoint of the bound socket. @@ -886,7 +703,7 @@ public override string ToString() => /// The sender of an command will receive confirmation through this /// message once the listening socket has been closed. /// - public class Unbound : Event + public sealed class Unbound : Event { /// /// Singleton instance @@ -905,39 +722,36 @@ private Unbound() public class ConnectionClosed : Event, IDeadLetterSuppression { /// - /// TBD + /// Was the connection closed normally? /// public virtual bool IsAborted => false; /// - /// TBD + /// Can we confirm that the connection was open in the first place? /// public virtual bool IsConfirmed => false; /// - /// TBD + /// Is our remote peer closed too? /// public virtual bool IsPeerClosed => false; /// - /// TBD + /// Did the connection close due to an IO error? /// public virtual bool IsErrorClosed => false; /// - /// TBD + /// Was there a given cause for why the connection was closed? /// - public virtual string Cause => null; + public virtual string? Cause => null; } /// /// The connection has been closed normally in response to a command. /// - public class Closed : ConnectionClosed + public sealed class Closed : ConnectionClosed { - /// - /// TBD - /// public static readonly Closed Instance = new(); private Closed() @@ -948,20 +762,14 @@ private Closed() /// /// The connection has been aborted in response to an command. /// - public class Aborted : ConnectionClosed + public sealed class Aborted : ConnectionClosed { - /// - /// TBD - /// public static readonly Aborted Instance = new(); private Aborted() { } - - /// - /// TBD - /// + public override bool IsAborted => true; } @@ -969,40 +777,28 @@ private Aborted() /// The connection has been half-closed by us and then half-close by the peer /// in response to a command. /// - public class ConfirmedClosed : ConnectionClosed + public sealed class ConfirmedClosed : ConnectionClosed { - /// - /// TBD - /// public static readonly ConfirmedClosed Instance = new(); private ConfirmedClosed() { } - - /// - /// TBD - /// + public override bool IsConfirmed => true; } /// /// The peer has closed its writing half of the connection. /// - public class PeerClosed : ConnectionClosed + public sealed class PeerClosed : ConnectionClosed { - /// - /// TBD - /// public static readonly PeerClosed Instance = new(); private PeerClosed() { } - - /// - /// TBD - /// + public override bool IsPeerClosed => true; } @@ -1011,25 +807,14 @@ private PeerClosed() /// public sealed class ErrorClosed : ConnectionClosed { - /// - /// TBD - /// - /// TBD - public ErrorClosed(string cause) + public ErrorClosed(string? cause) { Cause = cause; } - /// - /// TBD - /// public override bool IsErrorClosed => true; - - /// - /// TBD - /// - /// TBD - public override string Cause { get; } + + public override string? Cause { get; } public override string ToString() => $"ErrorClosed('{Cause}')"; @@ -1037,7 +822,7 @@ public override string ToString() => #endregion - private class ConnectionSupervisorStrategyImp : OneForOneStrategy + private sealed class ConnectionSupervisorStrategyImp : OneForOneStrategy { public ConnectionSupervisorStrategyImp() : base(StoppingStrategy.Decider) @@ -1061,7 +846,7 @@ protected override void LogFailure(IActorContext context, IActorRef child, Excep } /// - /// TBD + /// Akka.IO TCP extension - provides an actor-based API for TCP socket communication. /// public sealed class TcpExt : IOExtension { @@ -1093,7 +878,7 @@ internal TcpExt(ExtendedActorSystem system, TcpSettings settings) public IBufferPool BufferPool { get; } /// - /// TBD + /// The settings used by this extension. /// public TcpSettings Settings { get; } @@ -1126,21 +911,20 @@ private IBufferPool CreateBufferPool(ExtendedActorSystem system, Config config) } /// - /// TBD + /// Helpers for generating TCP messages. /// - public class TcpMessage + public static class TcpMessage { /// - /// TBD + /// Connect to a remote TCP endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TDB - /// TBD + /// The remote endpoint + /// An optional local endpoint address to bind to. Most users don't specify this. + /// A set of socket options. + /// An optional connect timeout. Will result in a message being returned if we exceed this value. + /// Specifies whether we're running in "pull mode" or not. public static Tcp.Command Connect(EndPoint remoteAddress, - EndPoint localAddress, + EndPoint? localAddress, IEnumerable options, TimeSpan? timeout, bool pullMode) @@ -1149,24 +933,22 @@ public static Tcp.Command Connect(EndPoint remoteAddress, } /// - /// TBD + /// Connect to a remote TCP endpoint. /// - /// TBD - /// TBD + /// The remote endpoint public static Tcp.Command Connect(EndPoint remoteAddress) { - return Connect(remoteAddress, null, null, null, false); + return Connect(remoteAddress, null, [], null, false); } /// - /// TBD + /// Bind a TCP listener to a local endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP listener. + /// The local endpoint we are binding to. + /// TCP backlog - the number of pending connections that the queue will hold. + /// A set of socket options. + /// Specifies whether we're running in "pull mode" or not for all subsequent client connections. public static Tcp.Command Bind(IActorRef handler, EndPoint endpoint, int backlog, @@ -1177,24 +959,22 @@ public static Tcp.Command Bind(IActorRef handler, } /// - /// TBD + /// Bind a TCP listener to a local endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP listener. + /// The local endpoint we are binding to. + /// TCP backlog - the number of pending connections that the queue will hold. public static Tcp.Command Bind(IActorRef handler, EndPoint endpoint, int backlog) { return new Tcp.Bind(handler, endpoint, backlog); } /// - /// TBD + /// Registers an actor to handle an outgoing or incoming TCP connection that has been established. /// - /// TBD - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP communication. + /// Keep the connection open if the peer is closed + /// Use resume / pause writing semantics once buffer gets full public static Tcp.Command Register(IActorRef handler, bool keepOpenOnPeerClosed = false, bool useResumeWriting = true) { @@ -1202,36 +982,32 @@ public static Tcp.Command Register(IActorRef handler, bool keepOpenOnPeerClosed } /// - /// TBD + /// Unbinds a previously bound TCP listener. /// - /// TBD public static Tcp.Command Unbind() { return Tcp.Unbind.Instance; } /// - /// TBD + /// Closes an open TCP connection. /// - /// TBD public static Tcp.Command Close() { return Tcp.Close.Instance; } /// - /// TBD + /// Closes a confirmed-to-have-been-previously-running TCP connection. /// - /// TBD public static Tcp.Command ConfirmedClose() { return Tcp.ConfirmedClose.Instance; } /// - /// TBD + /// Aborts a TCP connection without flushing pending writes. /// - /// TBD public static Tcp.Command Abort() { return Tcp.Abort.Instance; @@ -1297,15 +1073,14 @@ public static Tcp.Command ResumeAccepting(int batchSize) } /// - /// TBD + /// Convenience methods for using the Akka.IO.Tcp extension. /// public static class TcpExtensions { /// - /// TBD + /// Returns the -specific instance for TCP connectivity. /// - /// TBD - /// TBD + /// The current actor system. public static IActorRef Tcp(this ActorSystem system) { return IO.Tcp.Manager(system); diff --git a/src/core/Akka/IO/TcpManager.cs b/src/core/Akka/IO/TcpManager.cs index b590d8618f8..efbb6ad3604 100644 --- a/src/core/Akka/IO/TcpManager.cs +++ b/src/core/Akka/IO/TcpManager.cs @@ -53,52 +53,42 @@ namespace Akka.IO internal sealed class TcpManager : ActorBase { private readonly TcpExt _tcp; - - /// - /// TBD - /// - /// TBD + public TcpManager(TcpExt tcp) { _tcp = tcp; Context.System.EventStream.Subscribe(Self, typeof(DeadLetter)); } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD protected override bool Receive(object message) { - var c = message as Connect; - if (c != null) - { - var commander = Sender; - Context.ActorOf(Props.Create(_tcp, commander, c).WithDeploy(Deploy.Local)); - return true; - } - var b = message as Bind; - if (b != null) - { - var commander = Sender; - Context.ActorOf(Props.Create(_tcp, commander, b).WithDeploy(Deploy.Local)); - return true; - } - var dl = message as DeadLetter; - if (dl != null) + switch (message) { - var completed = dl.Message as SocketCompleted; - if (completed != null) + case Connect c: + { + var commander = Sender; + Context.ActorOf(Props.Create(_tcp, commander, c).WithDeploy(Deploy.Local)); + return true; + } + case Bind b: + { + var commander = Sender; + Context.ActorOf(Props.Create(_tcp, commander, b).WithDeploy(Deploy.Local)); + return true; + } + case DeadLetter dl: { - //TODO: release resources? + if (dl.Message is SocketCompleted completed) + { + //TODO: release resources? + } + return true; } - return true; + default: + throw new ArgumentException($"The supplied message of type {message.GetType().Name} is invalid. Only Connect and Bind messages are supported. " + + $"If you are going to manage your connection state, you need to communicate with Tcp.Connected sender actor. " + + $"See more here: https://getakka.net/articles/networking/io.html", nameof(message)); } - throw new ArgumentException($"The supplied message of type {message.GetType().Name} is invalid. Only Connect and Bind messages are supported. " + - $"If you are going to manage your connection state, you need to communicate with Tcp.Connected sender actor. " + - $"See more here: https://getakka.net/articles/networking/io.html", nameof(message)); } } }