Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/AspNetGrains/Node1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddLogging(x => x.AddConsole());

builder.Services.AddProtoCluster("MyCluster",
builder.Services.AddProtoCluster("MyCluster", port:8090,
configureRemote: r => r.WithProtoMessages(AspNetGrains.Messages.ProtosReflection.Descriptor),
configureCluster: c => c, clusterProvider:SeedNodeClusterProvider.JoinSeedNode("localhost",8090));
configureCluster: c => c, clusterProvider:SeedNodeClusterProvider.StartSeedNode());

builder.Services.AddHealthChecks().AddCheck<ActorSystemHealthCheck>("proto", null, new[] { "ready", "live" });

Expand Down
6 changes: 4 additions & 2 deletions examples/AspNetGrains/Node2/HelloGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Node2;
public class HelloGrain : HelloGrainBase
{
private readonly string _identity;
private int _count = 0;

public HelloGrain(IContext ctx, string identity) : base(ctx)
{
Expand All @@ -14,11 +15,12 @@ public HelloGrain(IContext ctx, string identity) : base(ctx)

public override Task<HelloResponse> SayHello(HelloRequest request)
{
Console.WriteLine("Got request!!");
_count++;
Console.WriteLine("Got request!! " + _count + " " + request.GetHashCode());

var res = new HelloResponse
{
Message = $"Hello from typed grain {_identity}"
Message = $"Hello from typed grain {_identity} Call count {_count}"
};

return Task.FromResult(res);
Expand Down
4 changes: 2 additions & 2 deletions examples/AspNetGrains/Node2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

builder.Services.AddLogging(x => x.AddConsole());

builder.Services.AddProtoCluster("MyCluster", port: 8090,
builder.Services.AddProtoCluster("MyCluster", port: 0,
configureRemote: r => r.WithProtoMessages(AspNetGrains.Messages.ProtosReflection.Descriptor),
configureCluster: c => c.WithClusterKind(HelloGrainActor.GetClusterKind((ctx,ci) => new Node2.HelloGrain(ctx,ci.Identity))),
clusterProvider:SeedNodeClusterProvider.StartSeedNode());
clusterProvider:SeedNodeClusterProvider.JoinSeedNode("localhost",8090));

builder.Services.AddHealthChecks().AddCheck<ActorSystemHealthCheck>("proto", null, new[] { "ready", "live" });

Expand Down
24 changes: 24 additions & 0 deletions src/Proto.Cluster.Kubernetes/KubernetesProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using k8s;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Proto.Diagnostics;
using Proto.Utils;
using static Proto.Cluster.Kubernetes.Messages;
using static Proto.Cluster.Kubernetes.ProtoLabels;
Expand All @@ -38,6 +39,29 @@ public class KubernetesProvider : IClusterProvider
private string _podName;
private int _port;

public async Task<DiagnosticsEntry[]> GetDiagnostics()
{
try
{
var selector = $"{LabelCluster}={_clusterName}";
using var client = _config.ClientFactory();
var res = await client.ListNamespacedPodWithHttpMessagesAsync(
KubernetesExtensions.GetKubeNamespace(),
labelSelector: selector,
watch: false,
timeoutSeconds: _config.WatchTimeoutSeconds
);

var pods = new DiagnosticsEntry("KubernetesProvider", "Pods", res.Body);

return new[] { pods };
}
catch (Exception x)
{
return new[] { new DiagnosticsEntry("KubernetesProvider", "Exception", x.ToString() ) };
}
}

public KubernetesProvider() : this(new KubernetesProviderConfig())
{
}
Expand Down
46 changes: 25 additions & 21 deletions src/Proto.Cluster/Partition/PartitionIdentityActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -629,29 +629,33 @@ TaskCompletionSource<ActivationResponse> setResponse

if (response.Pid != null)
{
if (response.TopologyHash != TopologyHash) // Topology changed between request and response
if (response.Failed is false)
{
if (!_currentMemberAddresses.Contains(response.Pid.Address))
if (response.TopologyHash != TopologyHash) // Topology changed between request and response
{
// No longer part of cluster, dropped
Logger.LogWarning(
"[PartitionIdentity] Received activation response {@Response}, no longer part of cluster",
response);

Respond(new ActivationResponse { Failed = true });

return;
}

var currentActivatorAddress =
_cluster.MemberList.GetActivator(msg.Kind, context.Sender!.Address)?.Address;

if (_myAddress != currentActivatorAddress)
{
//Stop it or handover. ? Should be rebalanced in the current pass
Logger.LogWarning(
"[PartitionIdentity] Misplaced spawn: {ClusterIdentity}, {Pid}, Expected {MyAddress} ({MyTopology}), Actual {ActivatorAddress} ({ActivatorTopology})",
msg.ClusterIdentity, response.Pid, _myAddress, TopologyHash, currentActivatorAddress, response.TopologyHash);
if (!_currentMemberAddresses.Contains(response.Pid.Address))
{
// No longer part of cluster, dropped
Logger.LogWarning(
"[PartitionIdentity] Received activation response {@Response}, no longer part of cluster",
response);

Respond(new ActivationResponse { Failed = true });

return;
}

var currentActivatorAddress =
_cluster.MemberList.GetActivator(msg.Kind, context.Sender!.Address)?.Address;

if (_myAddress != currentActivatorAddress)
{
//Stop it or handover. ? Should be rebalanced in the current pass
Logger.LogWarning(
"[PartitionIdentity] Misplaced spawn: {ClusterIdentity}, {Pid}, Expected {MyAddress} ({MyTopology}), Actual {ActivatorAddress} ({ActivatorTopology})",
msg.ClusterIdentity, response.Pid, _myAddress, TopologyHash,
currentActivatorAddress, response.TopologyHash);
}
}
}

Expand Down
24 changes: 18 additions & 6 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,11 @@ private Task OnActivationRequest(IContext context, ActivationRequest msg)
if (clusterKind is null)
{
Logger.LogError("Failed to spawn {Kind}/{Identity}, kind not found for member", msg.Kind, msg.Identity);
context.Respond(new ActivationResponse { Failed = true });
context.Respond(new ActivationResponse
{
Failed = true,
TopologyHash = msg.TopologyHash
});

return Task.CompletedTask;
}
Expand Down Expand Up @@ -348,7 +352,8 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl

context.Respond(new ActivationResponse
{
Failed = true
Failed = true,
TopologyHash = msg.TopologyHash
}
);
}
Expand Down Expand Up @@ -379,7 +384,8 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl

context.Respond(new ActivationResponse
{
Failed = true
Failed = true,
TopologyHash = msg.TopologyHash
}
);
}
Expand All @@ -398,15 +404,20 @@ private void Spawn(ActivationRequest msg, IContext context, ActivatedClusterKind

context.Respond(new ActivationResponse
{
Pid = pid
Pid = pid,
TopologyHash = msg.TopologyHash
}
);
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "[PartitionIdentity] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity);
context.Respond(new ActivationResponse { Failed = true });
context.Respond(new ActivationResponse
{
Failed = true,
TopologyHash = msg.TopologyHash
});
}
}

Expand All @@ -422,7 +433,8 @@ private void OnSpawnDecided(ActivationRequest msg, IContext context, ActivatedCl
context.Respond(new ActivationResponse
{
Failed = true,
InvalidIdentity = true
InvalidIdentity = true,
TopologyHash = msg.TopologyHash
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private Task OnActivationRequest(ActivationRequest msg, IContext context)
{
context.Respond(new ActivationResponse
{
Pid = existing
Pid = existing,
}
);
}
Expand Down