Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void EscalateFailure(Exception reason, object? message)
Parent.SendSystemMessage(System, failure);
}

public ValueTask InvokeSystemMessageAsync(object msg)
public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
{
try
{
Expand All @@ -305,6 +305,7 @@ public ValueTask InvokeSystemMessageAsync(object msg)
SuspendMailbox or ResumeMailbox => default,
Continuation cont => HandleContinuation(cont),
ProcessDiagnosticsRequest pdr => HandleProcessDiagnosticsRequest(pdr),
ReceiveTimeout _ => HandleReceiveTimeout(),
_ => HandleUnknownSystemMessage(msg)
};
}
Expand All @@ -315,6 +316,12 @@ public ValueTask InvokeSystemMessageAsync(object msg)
}
}

private async ValueTask HandleReceiveTimeout()
{
_messageOrEnvelope = Proto.ReceiveTimeout.Instance;
await DefaultReceive();
}

public ValueTask InvokeUserMessageAsync(object msg)
{
if (!System.Metrics.Enabled) return InternalInvokeUserMessageAsync(msg);
Expand Down Expand Up @@ -476,20 +483,22 @@ private async ValueTask HandleContinuation(Continuation cont)
// Don't execute the continuation if the actor instance changed.
// Without this, Continuation's Action closure would execute with
// an older Actor instance.
if (cont.Actor == Actor ||
cont.Actor == null)
{
_messageOrEnvelope = cont.Message;
await cont.Action();
}
else
if (cont.Actor != Actor && cont is not {Actor: null})
{
if (Logger.IsEnabled(LogLevel.Warning))
{
Logger.LogWarning(
"{Self} Dropping Continuation (ReenterAfter) of {Message}",
Self,
MessageEnvelope.UnwrapMessage(cont.Message));
MessageEnvelope.UnwrapMessage(cont.Message)
);
}

return;
}

_messageOrEnvelope = cont.Message;
await cont.Action();
}

private ActorContextExtras EnsureExtras()
Expand All @@ -508,7 +517,7 @@ private Task DefaultReceive() =>
{
PoisonPill => HandlePoisonPill(),
IAutoRespond autoRespond => HandleAutoRespond(autoRespond),
_ => Actor.ReceiveAsync(_props.ContextDecoratorChain is not null ? EnsureExtras().Context : this)
_ => Actor.ReceiveAsync(_props.ContextDecoratorChain != null ? EnsureExtras().Context : this)
};

private Task HandleAutoRespond(IAutoRespond autoRespond)
Expand Down Expand Up @@ -723,7 +732,8 @@ private void ReceiveTimeoutCallback(object? state)
{
if (_extras?.ReceiveTimeoutTimer is null) return;

SendUserMessage(Self, Proto.ReceiveTimeout.Instance);

Self.SendSystemMessage(System, Proto.ReceiveTimeout.Instance);
}

public CapturedContext Capture() => new(MessageEnvelope.Wrap(_messageOrEnvelope!), this);
Expand Down
5 changes: 2 additions & 3 deletions src/Proto.Actor/Mailbox/BatchingMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ private async Task RunAsync()
try
{
var batch = new List<object>(_batchSize);
var sys = _systemMessages.Pop();
var msg = _systemMessages.Pop();

if (sys is not null)
if (msg is SystemMessage sys)
{
_suspended = sys switch
{
Expand All @@ -74,7 +74,6 @@ private async Task RunAsync()
if (!_suspended)
{
batch.Clear();
object? msg;

while ((msg = _userMessages.Pop()) is not null ||
batch.Count >= _batchSize)
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/Mailbox/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IMessageInvoker
{
CancellationTokenSource? CancellationTokenSource { get; }

ValueTask InvokeSystemMessageAsync(object msg);
ValueTask InvokeSystemMessageAsync(SystemMessage msg);

ValueTask InvokeUserMessageAsync(object msg);

Expand Down Expand Up @@ -90,7 +90,7 @@ class NoopInvoker : IMessageInvoker

public CancellationTokenSource CancellationTokenSource => throw new NotImplementedException();

public ValueTask InvokeSystemMessageAsync(object msg) => throw new NotImplementedException();
public ValueTask InvokeSystemMessageAsync(SystemMessage msg) => throw new NotImplementedException();

public ValueTask InvokeUserMessageAsync(object msg) => throw new NotImplementedException();

Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/Mailbox/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private ValueTask ProcessMessages()
{
msg = _systemMessages.Pop();

if (msg is not null)
if (msg is SystemMessage sys)
{
_suspended = msg switch
{
Expand All @@ -222,7 +222,7 @@ private ValueTask ProcessMessages()
_ => _suspended
};

var t = _invoker.InvokeSystemMessageAsync(msg);
var t = _invoker.InvokeSystemMessageAsync(sys);

if (!t.IsCompletedSuccessfully)
{
Expand Down
2 changes: 1 addition & 1 deletion tests/Proto.TestFixtures/TestMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void PostSystemMessage(object msg)
if (msg is Stop)
_invoker?.CancellationTokenSource?.Cancel();
SystemMessages.Add(msg);
_invoker?.InvokeSystemMessageAsync(msg).GetAwaiter().GetResult();
_invoker?.InvokeSystemMessageAsync((SystemMessage)msg).GetAwaiter().GetResult();
}

public void RegisterHandlers(IMessageInvoker invoker, IDispatcher dispatcher) => _invoker = invoker;
Expand Down
3 changes: 2 additions & 1 deletion tests/Proto.TestFixtures/TestMailboxHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public async void Schedule(Func<Task> runner)
if (waitingTaskExists) onScheduleCompleted.SetResult(0);
}

public async ValueTask InvokeSystemMessageAsync(object msg) => await ((TestMessageWithTaskCompletionSource) msg).TaskCompletionSource.Task;
// ReSharper disable once SuspiciousTypeConversion.Global
public async ValueTask InvokeSystemMessageAsync(SystemMessage msg) => await ((TestMessageWithTaskCompletionSource) msg).TaskCompletionSource.Task;

public async ValueTask InvokeUserMessageAsync(object msg) => await ((TestMessageWithTaskCompletionSource) msg).TaskCompletionSource.Task;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using System.Threading.Tasks;
using Proto.Mailbox;

namespace Proto.TestFixtures;

public class TestMessageWithTaskCompletionSource
public class TestMessageWithTaskCompletionSource : SystemMessage
{
public TaskCompletionSource<int> TaskCompletionSource { get; set; } = new();
public string Message { get; set; }
Expand Down