Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions src/Proto.Cluster/ProtoActorLifecycleHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,56 @@

namespace Proto.Cluster;


public class ProtoActorLifecycleHost : IHostedService
{
private readonly Cluster _cluster;
private readonly ActorSystem _actorSystem;
private readonly bool _runAsClient;
private readonly IHostApplicationLifetime _lifetime;
private bool _shutdownViaActorSystem;

public ProtoActorLifecycleHost(Cluster cluster, bool runAsClient)
public ProtoActorLifecycleHost(
ActorSystem actorSystem,
IHostApplicationLifetime lifetime,
bool runAsClient
)
{
_cluster = cluster;
_actorSystem = actorSystem;
_runAsClient = runAsClient;
_lifetime = lifetime;
}

public async Task StartAsync(CancellationToken _)
{
// Register a callback for when the actor system shuts down.
_actorSystem.Shutdown.Register(() =>
{
if (_lifetime.ApplicationStopping.IsCancellationRequested)
{
return;
}
_shutdownViaActorSystem = true;
_lifetime.StopApplication();
});

if (_runAsClient)
{
await _cluster.StartClientAsync();
await _actorSystem.Cluster().StartClientAsync();
}
else
{
await _cluster.StartMemberAsync();
await _actorSystem.Cluster().StartMemberAsync();
}
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await _cluster.ShutdownAsync();
if (_shutdownViaActorSystem)
{
await _actorSystem.Cluster().ShutdownCompleted;
}
else
{
await _actorSystem.Cluster().ShutdownAsync(true, "Host process is stopping");
}
}
}
}
18 changes: 13 additions & 5 deletions src/Proto.Cluster/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Proto.Cluster;
using Proto.Cluster.Identity;
Expand Down Expand Up @@ -36,7 +37,6 @@ public static IServiceCollection AddProtoCluster(this IServiceCollection self, A
var loggerFactory = p.GetRequiredService<ILoggerFactory>();
Log.SetLoggerFactory(loggerFactory);


configure(p, boot);

var s = new ActorSystemConfig();
Expand All @@ -60,11 +60,15 @@ public static IServiceCollection AddProtoCluster(this IServiceCollection self, A

self.AddSingleton(p => p.GetRequiredService<ActorSystem>().Cluster());
self.AddSingleton(p => p.GetRequiredService<ActorSystem>().Root);
self.AddHostedService(p => new ProtoActorLifecycleHost(p.GetRequiredService<Cluster>(), boot.RunAsClient));
self.AddHostedService(p =>
new ProtoActorLifecycleHost(
p.GetRequiredService<ActorSystem>(),
p.GetRequiredService<IHostApplicationLifetime>(),
boot.RunAsClient));

return self;
}

public static IServiceCollection AddProtoCluster(this IServiceCollection self, string clusterName,
string bindToHost = "localhost", int port = 0,
Func<ActorSystemConfig, ActorSystemConfig>? configureSystem = null,
Expand Down Expand Up @@ -101,8 +105,12 @@ public static IServiceCollection AddProtoCluster(this IServiceCollection self, s

self.AddSingleton(p => p.GetRequiredService<ActorSystem>().Cluster());
self.AddSingleton(p => p.GetRequiredService<ActorSystem>().Root);
self.AddHostedService(p => new ProtoActorLifecycleHost(p.GetRequiredService<Cluster>(), runAsClient));
self.AddHostedService(p =>
new ProtoActorLifecycleHost(
p.GetRequiredService<ActorSystem>(),
p.GetRequiredService<IHostApplicationLifetime>(),
runAsClient));

return self;
}
}
}