Skip to content

Commit fa4606b

Browse files
authored
. (#1700)
* Inline cluster context code
1 parent e15c9f4 commit fa4606b

File tree

3 files changed

+73
-52
lines changed

3 files changed

+73
-52
lines changed

benchmarks/SkyriseMini/Client/ProtoActorExtensions.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ public static WebApplicationBuilder AddProtoActorClient(this WebApplicationBuild
4646

4747
var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost()
4848
.WithProtoMessages(ProtoActorSut.Contracts.ProtosReflection.Descriptor)
49-
.WithChannelOptions(new GrpcChannelOptions
50-
{
51-
CompressionProviders = new[]
52-
{
53-
new GzipCompressionProvider(CompressionLevel.Fastest)
54-
}
55-
}
56-
)
49+
// .WithChannelOptions(new GrpcChannelOptions
50+
// {
51+
// CompressionProviders = new[]
52+
// {
53+
// new GzipCompressionProvider(CompressionLevel.Fastest)
54+
// }
55+
// }
56+
// )
5757
.WithLogLevelForDeserializationErrors(LogLevel.Critical);
5858

5959
var clusterProvider = new ConsulProvider(new ConsulProviderConfig());

benchmarks/SkyriseMini/Server/ProtoActorExtensions.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ public static WebApplicationBuilder AddProtoActorSUT(this WebApplicationBuilder
3737

3838
var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost()
3939
.WithProtoMessages(ProtoActorSut.Contracts.ProtosReflection.Descriptor)
40-
.WithChannelOptions(new GrpcChannelOptions
41-
{
42-
CompressionProviders = new[]
43-
{
44-
new GzipCompressionProvider(CompressionLevel.Fastest)
45-
}
46-
}
47-
)
40+
// .WithChannelOptions(new GrpcChannelOptions
41+
// {
42+
// CompressionProviders = new[]
43+
// {
44+
// new GzipCompressionProvider(CompressionLevel.Fastest)
45+
// }
46+
// }
47+
// )
4848
.WithLogLevelForDeserializationErrors(LogLevel.Critical);
4949

5050
var clusterProvider = new ConsulProvider(new ConsulProviderConfig());

src/Proto.Cluster/DefaultClusterContext.cs

Lines changed: 57 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
// -----------------------------------------------------------------------
66
using System;
77
using System.Diagnostics;
8+
using System.Runtime.CompilerServices;
89
using System.Threading;
910
using System.Threading.Tasks;
1011
using Microsoft.Extensions.Logging;
@@ -54,12 +55,7 @@ public DefaultClusterContext(Cluster cluster)
5455
i++;
5556

5657
var source = PidSource.Cache;
57-
var pid = clusterIdentity.CachedPid;
58-
59-
if (pid == null)
60-
{
61-
pid = GetCachedPid(clusterIdentity);
62-
}
58+
var pid = clusterIdentity.CachedPid ?? (_pidCache.TryGet(clusterIdentity, out var tmp) ? tmp : null);
6359

6460
if (pid is null)
6561
{
@@ -75,7 +71,9 @@ public DefaultClusterContext(Cluster cluster)
7571
}
7672

7773
// Ensures that a future is not re-used against another actor.
78-
if (lastPid is not null && !pid.Equals(lastPid)) RefreshFuture();
74+
// avoid equality check for perf
75+
// ReSharper disable once PossibleUnintendedReferenceComparison
76+
if (lastPid is not null && pid != lastPid) RefreshFuture();
7977

8078
Stopwatch t = null!;
8179

@@ -93,20 +91,40 @@ public DefaultClusterContext(Cluster cluster)
9391

9492
if (task.IsCompleted)
9593
{
96-
var (status, result) = ToResult<T>(source, context, task.Result);
94+
var untypedResult = MessageEnvelope.UnwrapMessage(task.Result);
9795

98-
switch (status)
96+
if (untypedResult is T t1)
97+
{
98+
return t1;
99+
}
100+
101+
if (typeof(T) == typeof(MessageEnvelope))
102+
{
103+
return (T) (object) MessageEnvelope.Wrap(task.Result);
104+
}
105+
106+
if (untypedResult == null)
107+
{
108+
//null = timeout
109+
return default;
110+
}
111+
112+
if (untypedResult is DeadLetterResponse)
99113
{
100-
case ResponseStatus.Ok: return result;
101-
case ResponseStatus.InvalidResponse:
102-
RefreshFuture();
103-
await RemoveFromSource(clusterIdentity, source, pid);
104-
break;
105-
case ResponseStatus.DeadLetter:
106-
RefreshFuture();
107-
await RemoveFromSource(clusterIdentity, PidSource.Lookup, pid);
108-
break;
114+
if (!context.System.Shutdown.IsCancellationRequested && Logger.IsEnabled(LogLevel.Debug))
115+
{
116+
Logger.LogDebug("TryRequestAsync failed, dead PID from {Source}", source);
117+
}
118+
119+
RefreshFuture();
120+
await RemoveFromSource(clusterIdentity, PidSource.Lookup, pid);
121+
break;
109122
}
123+
124+
Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}", untypedResult.GetType(), typeof(T));
125+
RefreshFuture();
126+
await RemoveFromSource(clusterIdentity, source, pid);
127+
break;
110128
}
111129
else
112130
{
@@ -182,9 +200,6 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
182200
_pidCache.RemoveByVal(clusterIdentity, pid);
183201
}
184202

185-
186-
private PID? GetCachedPid(ClusterIdentity clusterIdentity) => _pidCache.TryGet(clusterIdentity, out var pid) ? pid : null;
187-
188203
private async ValueTask<PID?> GetPidFromLookup(ClusterIdentity clusterIdentity, ISenderContext context, CancellationToken ct)
189204
{
190205
try
@@ -218,26 +233,32 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
218233
}
219234
}
220235

236+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
221237
private static (ResponseStatus Ok, T?) ToResult<T>(PidSource source, ISenderContext context, object result)
222238
{
223239
var message = MessageEnvelope.UnwrapMessage(result);
224-
switch (message)
240+
241+
if (message is DeadLetterResponse)
225242
{
226-
case DeadLetterResponse:
227-
if (!context.System.Shutdown.IsCancellationRequested)
228-
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync failed, dead PID from {Source}", source);
229-
230-
return (ResponseStatus.DeadLetter, default);
231-
case null: return (ResponseStatus.Ok, default);
232-
case T t: return (ResponseStatus.Ok, t);
233-
default:
234-
if (typeof(T) == typeof(MessageEnvelope))
235-
{
236-
return (ResponseStatus.Ok, (T) (object) MessageEnvelope.Wrap(result));
237-
}
238-
Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T));
239-
return (ResponseStatus.InvalidResponse, default);
243+
if (!context.System.Shutdown.IsCancellationRequested && Logger.IsEnabled(LogLevel.Debug))
244+
{
245+
Logger.LogDebug("TryRequestAsync failed, dead PID from {Source}", source);
246+
}
247+
248+
return (ResponseStatus.DeadLetter, default);
240249
}
250+
251+
if (message == null) return (ResponseStatus.Ok, default);
252+
253+
if (message is T t) return (ResponseStatus.Ok, t);
254+
255+
if (typeof(T) == typeof(MessageEnvelope))
256+
{
257+
return (ResponseStatus.Ok, (T) (object) MessageEnvelope.Wrap(result));
258+
}
259+
260+
Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T));
261+
return (ResponseStatus.InvalidResponse, default);
241262
}
242263

243264
private enum ResponseStatus

0 commit comments

Comments
 (0)