Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/Proto.Actor/Router/RouterActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Proto.Router.Messages;
using Proto.Router.Routers;
Expand All @@ -15,21 +15,28 @@ public class RouterActor : IActor
{
private readonly RouterConfig _config;
private readonly RouterState _routerState;
private readonly AutoResetEvent _wg;
private readonly RouterStartNotification _startNotification;

public RouterActor(RouterConfig config, RouterState routerState, AutoResetEvent wg)
public RouterActor(RouterConfig config, RouterState routerState, RouterStartNotification startNotification)
{
_config = config;
_routerState = routerState;
_wg = wg;
_startNotification = startNotification;
}

public Task ReceiveAsync(IContext context)
{
if (context.Message is Started)
{
_config.OnStarted(context, _routerState);
_wg.Set();
try
{
_config.OnStarted(context, _routerState);
_startNotification.NotifyStarted();
}
catch (Exception e)
{
_startNotification.NotifyFailed(e);
}
return Task.CompletedTask;
}

Expand Down
41 changes: 38 additions & 3 deletions src/Proto.Actor/Router/Routers/RouterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public abstract record RouterConfig
private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID? parent, Action<IContext>? callback)
{
var routerState = CreateRouterState();
var wg = new AutoResetEvent(false);
var p = props.WithProducer(() => new RouterActor(this, routerState, wg));
var notifyStarted = new RouterStartNotification();
var p = props.WithProducer(() => new RouterActor(this, routerState, notifyStarted));

var mailbox = props.MailboxProducer();
var dispatcher = props.Dispatcher;
Expand All @@ -35,7 +35,42 @@ private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID
mailbox.RegisterHandlers(ctx, dispatcher);
mailbox.PostSystemMessage(Started.Instance);
mailbox.Start();
wg.WaitOne();

var (startSuccess, startException) = notifyStarted.Wait();

if (!startSuccess)
{
system.Root.Stop(self);
throw new RouterStartFailedException(startException!);
}

return self;
}
}

public class RouterStartNotification
{
private readonly ManualResetEvent _wg = new(false);
private Exception? _exception;

public void NotifyStarted() => _wg.Set();

public void NotifyFailed(Exception exception)
{
_exception = exception;
_wg.Set();
}

public (bool StartSuccess, Exception? Exception) Wait()
{
_wg.WaitOne();
return (_exception is null, _exception);
}
}

public class RouterStartFailedException : Exception
{
public RouterStartFailedException(Exception inner) : base("Router failed to start", inner)
{
}
}
14 changes: 14 additions & 0 deletions tests/Proto.Actor.Tests/Router/PoolRouterTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Threading.Tasks;
using FluentAssertions;
using Proto.Router.Messages;
using Proto.Router.Routers;
using Proto.TestFixtures;
using Xunit;

Expand Down Expand Up @@ -58,4 +60,16 @@ public async Task RandomPool_CreatesRoutees()
var routees = await system.Root.RequestAsync<Routees>(router, new RouterGetRoutees(), _timeout);
Assert.Equal(3, routees.Pids.Count);
}

[Fact]
public async Task If_routee_props_then_router_creation_fails()
{
await using var system = new ActorSystem();

var failingProps = Props.FromProducer(() => throw new Exception("Failing props"));

system.Invoking(s => s.Root.Spawn(s.Root.NewRandomPool(failingProps, 3, 0)))
.Should().Throw<RouterStartFailedException>()
.WithInnerException<Exception>().WithMessage("Failing props");
}
}