Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void EscalateFailure(Exception reason, object? message)
Parent.SendSystemMessage(System, failure);
}

public ValueTask InvokeSystemMessageAsync(object msg)
public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
{
try
{
Expand All @@ -305,6 +305,7 @@ public ValueTask InvokeSystemMessageAsync(object msg)
SuspendMailbox or ResumeMailbox => default,
Continuation cont => HandleContinuation(cont),
ProcessDiagnosticsRequest pdr => HandleProcessDiagnosticsRequest(pdr),
ReceiveTimeout _ => HandleReceiveTimeout(),
_ => HandleUnknownSystemMessage(msg)
};
}
Expand All @@ -315,6 +316,12 @@ public ValueTask InvokeSystemMessageAsync(object msg)
}
}

private async ValueTask HandleReceiveTimeout()
{
_messageOrEnvelope = Proto.ReceiveTimeout.Instance;
await DefaultReceive();
}

public ValueTask InvokeUserMessageAsync(object msg)
{
if (!System.Metrics.Enabled) return InternalInvokeUserMessageAsync(msg);
Expand Down Expand Up @@ -476,20 +483,22 @@ private async ValueTask HandleContinuation(Continuation cont)
// Don't execute the continuation if the actor instance changed.
// Without this, Continuation's Action closure would execute with
// an older Actor instance.
if (cont.Actor == Actor ||
cont.Actor == null)
{
_messageOrEnvelope = cont.Message;
await cont.Action();
}
else
if (cont.Actor != Actor && cont is not {Actor: null})
{
if (Logger.IsEnabled(LogLevel.Warning))
{
Logger.LogWarning(
"{Self} Dropping Continuation (ReenterAfter) of {Message}",
Self,
MessageEnvelope.UnwrapMessage(cont.Message));
MessageEnvelope.UnwrapMessage(cont.Message)
);
}

return;
}

_messageOrEnvelope = cont.Message;
await cont.Action();
}

private ActorContextExtras EnsureExtras()
Expand All @@ -508,7 +517,7 @@ private Task DefaultReceive() =>
{
PoisonPill => HandlePoisonPill(),
IAutoRespond autoRespond => HandleAutoRespond(autoRespond),
_ => Actor.ReceiveAsync(_props.ContextDecoratorChain is not null ? EnsureExtras().Context : this)
_ => Actor.ReceiveAsync(_props.ContextDecoratorChain != null ? EnsureExtras().Context : this)
};

private Task HandleAutoRespond(IAutoRespond autoRespond)
Expand Down Expand Up @@ -723,7 +732,8 @@ private void ReceiveTimeoutCallback(object? state)
{
if (_extras?.ReceiveTimeoutTimer is null) return;

SendUserMessage(Self, Proto.ReceiveTimeout.Instance);

Self.SendSystemMessage(System, Proto.ReceiveTimeout.Instance);
}

public CapturedContext Capture() => new(MessageEnvelope.Wrap(_messageOrEnvelope!), this);
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/EventStream/DeadLetter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

using System;
using JetBrains.Annotations;
using Proto.Mailbox;
using Proto.Metrics;

// ReSharper disable once CheckNamespace
Expand Down Expand Up @@ -59,7 +60,7 @@ protected internal override void SendUserMessage(PID pid, object message)
System.Root.Send(sender,new DeadLetterResponse {Target = pid});
}

protected internal override void SendSystemMessage(PID pid, object message)
protected internal override void SendSystemMessage(PID pid, SystemMessage message)
{
if (System.Metrics.Enabled)
ActorMetrics.DeadletterCount.Add(1, new("id", System.Id), new("address", System.Address), new("messagetype", message.GetType().Name));
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/EventStream/EventStreamProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// ReSharper disable once CheckNamespace

using JetBrains.Annotations;
using Proto.Mailbox;

// ReSharper disable once CheckNamespace
namespace Proto;
Expand All @@ -24,7 +25,7 @@ protected internal override void SendUserMessage(PID pid, object message)
System.EventStream.Publish(msg);
}

protected internal override void SendSystemMessage(PID pid, object message)
protected internal override void SendSystemMessage(PID pid, SystemMessage message)
{
//pass
}
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/Future/FutureBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Proto.Mailbox;
using Proto.Metrics;

namespace Proto.Future;
Expand Down Expand Up @@ -97,7 +98,7 @@ protected internal override void SendUserMessage(PID pid, object message)
}
}

protected internal override void SendSystemMessage(PID pid, object message)
protected internal override void SendSystemMessage(PID pid, SystemMessage message)
{
if (message is Stop)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/Future/Futures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Proto.Mailbox;
using Proto.Metrics;

namespace Proto.Future;
Expand Down Expand Up @@ -102,7 +103,7 @@ protected internal override void SendUserMessage(PID pid, object message)
}
}

protected internal override void SendSystemMessage(PID pid, object message)
protected internal override void SendSystemMessage(PID pid, SystemMessage message)
{
if (message is Stop)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/Future/SharedFuture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Proto.Mailbox;
using Proto.Metrics;

namespace Proto.Future;
Expand Down Expand Up @@ -104,7 +105,7 @@ protected internal override void SendUserMessage(PID pid, object message)
}
}

protected internal override void SendSystemMessage(PID pid, object message)
protected internal override void SendSystemMessage(PID pid, SystemMessage message)
{
if (message is Stop)
{
Expand Down
5 changes: 2 additions & 3 deletions src/Proto.Actor/Mailbox/BatchingMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ private async Task RunAsync()
try
{
var batch = new List<object>(_batchSize);
var sys = _systemMessages.Pop();
var msg = _systemMessages.Pop();

if (sys is not null)
if (msg is SystemMessage sys)
{
_suspended = sys switch
{
Expand All @@ -74,7 +74,6 @@ private async Task RunAsync()
if (!_suspended)
{
batch.Clear();
object? msg;

while ((msg = _userMessages.Pop()) is not null ||
batch.Count >= _batchSize)
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/Mailbox/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IMessageInvoker
{
CancellationTokenSource? CancellationTokenSource { get; }

ValueTask InvokeSystemMessageAsync(object msg);
ValueTask InvokeSystemMessageAsync(SystemMessage msg);

ValueTask InvokeUserMessageAsync(object msg);

Expand Down Expand Up @@ -90,7 +90,7 @@ class NoopInvoker : IMessageInvoker

public CancellationTokenSource CancellationTokenSource => throw new NotImplementedException();

public ValueTask InvokeSystemMessageAsync(object msg) => throw new NotImplementedException();
public ValueTask InvokeSystemMessageAsync(SystemMessage msg) => throw new NotImplementedException();

public ValueTask InvokeUserMessageAsync(object msg) => throw new NotImplementedException();

Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/Mailbox/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private ValueTask ProcessMessages()
{
msg = _systemMessages.Pop();

if (msg is not null)
if (msg is SystemMessage sys)
{
_suspended = msg switch
{
Expand All @@ -222,7 +222,7 @@ private ValueTask ProcessMessages()
_ => _suspended
};

var t = _invoker.InvokeSystemMessageAsync(msg);
var t = _invoker.InvokeSystemMessageAsync(sys);

if (!t.IsCompletedSuccessfully)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/PID.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------
using Google.Protobuf;
using Proto.Mailbox;

namespace Proto;

Expand Down Expand Up @@ -47,7 +48,7 @@ internal void SendUserMessage(ActorSystem system, object message)
reff.SendUserMessage(this, message);
}

public void SendSystemMessage(ActorSystem system, object sys)
public void SendSystemMessage(ActorSystem system, SystemMessage sys)
{
var reff = Ref(system) ?? system.ProcessRegistry.Get(this);
reff.SendSystemMessage(this, sys);
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Actor/Process/ActorProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal bool IsDead {
protected internal override void SendUserMessage(PID pid, object message) =>
Mailbox.PostUserMessage(message);

protected internal override void SendSystemMessage(PID pid, object message) =>
protected internal override void SendSystemMessage(PID pid, SystemMessage message) =>
Mailbox.PostSystemMessage(message);

public override void Stop(PID pid)
Expand Down
4 changes: 3 additions & 1 deletion src/Proto.Actor/Process/Process.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

// ReSharper disable once CheckNamespace

using Proto.Mailbox;

namespace Proto;

public abstract class Process
Expand All @@ -16,7 +18,7 @@ public abstract class Process

protected internal abstract void SendUserMessage(PID pid, object message);

protected internal abstract void SendSystemMessage(PID pid, object message);
protected internal abstract void SendSystemMessage(PID pid, SystemMessage message);

public virtual void Stop(PID pid) => SendSystemMessage(pid, Proto.Stop.Instance);
}
2 changes: 1 addition & 1 deletion src/Proto.Actor/Supervision/Guardians.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void StopChildren(params PID[] pids)
protected internal override void SendUserMessage(PID pid, object message)
=> throw new InvalidOperationException("Guardian actor cannot receive any user messages.");

protected internal override void SendSystemMessage(PID pid, object message)
protected internal override void SendSystemMessage(PID pid, SystemMessage message)
{
if (message is Failure msg)
_supervisorStrategy.HandleFailure(this, msg.Who, msg.RestartStatistics, msg.Reason, msg.Message);
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Remote/RemoteProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

using System;
using System.Diagnostics;

using Proto.Mailbox;

namespace Proto.Remote;

Expand All @@ -27,7 +27,7 @@ public RemoteProcess(ActorSystem system, EndpointManager endpointManager, PID pi

protected internal override void SendUserMessage(PID _, object message) => Send(message);

protected internal override void SendSystemMessage(PID _, object message) => Send(message);
protected internal override void SendSystemMessage(PID _, SystemMessage message) => Send(message);

private void Send(object msg)
{
Expand Down
2 changes: 1 addition & 1 deletion tests/Proto.TestFixtures/TestMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void PostSystemMessage(object msg)
if (msg is Stop)
_invoker?.CancellationTokenSource?.Cancel();
SystemMessages.Add(msg);
_invoker?.InvokeSystemMessageAsync(msg).GetAwaiter().GetResult();
_invoker?.InvokeSystemMessageAsync((SystemMessage)msg).GetAwaiter().GetResult();
}

public void RegisterHandlers(IMessageInvoker invoker, IDispatcher dispatcher) => _invoker = invoker;
Expand Down
3 changes: 2 additions & 1 deletion tests/Proto.TestFixtures/TestMailboxHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public async void Schedule(Func<Task> runner)
if (waitingTaskExists) onScheduleCompleted.SetResult(0);
}

public async ValueTask InvokeSystemMessageAsync(object msg) => await ((TestMessageWithTaskCompletionSource) msg).TaskCompletionSource.Task;
// ReSharper disable once SuspiciousTypeConversion.Global
public async ValueTask InvokeSystemMessageAsync(SystemMessage msg) => await ((TestMessageWithTaskCompletionSource) msg).TaskCompletionSource.Task;

public async ValueTask InvokeUserMessageAsync(object msg) => await ((TestMessageWithTaskCompletionSource) msg).TaskCompletionSource.Task;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using System.Threading.Tasks;
using Proto.Mailbox;

namespace Proto.TestFixtures;

public class TestMessageWithTaskCompletionSource
public class TestMessageWithTaskCompletionSource : SystemMessage
{
public TaskCompletionSource<int> TaskCompletionSource { get; set; } = new();
public string Message { get; set; }
Expand Down
6 changes: 4 additions & 2 deletions tests/Proto.TestFixtures/TestProcess.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Proto.TestFixtures;
using Proto.Mailbox;

namespace Proto.TestFixtures;

public class TestProcess : Process
{
Expand All @@ -10,7 +12,7 @@ protected override void SendUserMessage(PID pid, object message)
{
}

protected override void SendSystemMessage(PID pid, object message)
protected override void SendSystemMessage(PID pid, SystemMessage message)
{
}
}