Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Temporalio/Bridge/CustomSlotSupplier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ private static SlotInfo SlotInfoFromBridge(Interop.TemporalCoreSlotInfo slotInfo
Interop.TemporalCoreSlotInfo_Tag.LocalActivitySlotInfo =>
new SlotInfo.LocalActivitySlotInfo(
ByteArrayRef.ToUtf8(slotInfo.local_activity_slot_info.activity_type)),
Interop.TemporalCoreSlotInfo_Tag.NexusSlotInfo =>
new SlotInfo.NexusOperationSlotInfo(
ByteArrayRef.ToUtf8(slotInfo.nexus_slot_info.service),
ByteArrayRef.ToUtf8(slotInfo.nexus_slot_info.operation)),
_ => throw new ArgumentOutOfRangeException(nameof(slotInfo)),
};
}
Expand All @@ -70,6 +74,7 @@ private static unsafe SlotReserveContext ReserveCtxFromBridge(Interop.TemporalCo
Interop.TemporalCoreSlotKindType.WorkflowSlotKindType => SlotType.Workflow,
Interop.TemporalCoreSlotKindType.ActivitySlotKindType => SlotType.Activity,
Interop.TemporalCoreSlotKindType.LocalActivitySlotKindType => SlotType.LocalActivity,
Interop.TemporalCoreSlotKindType.NexusSlotKindType => SlotType.NexusOperation,
_ => throw new ArgumentOutOfRangeException(nameof(ctx)),
},
TaskQueue: ByteArrayRef.ToUtf8((*ctx).task_queue),
Expand Down
6 changes: 0 additions & 6 deletions src/Temporalio/Bridge/OptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -661,12 +661,6 @@ private static Interop.TemporalCoreTunerHolder ToInteropTuner(
}
}

// TODO: remove when implemented: https://github.com/temporalio/sdk-dotnet/issues/528
if (tuner.NexusTaskSlotSupplier is Temporalio.Worker.Tuning.CustomSlotSupplier)
{
throw new ArgumentException("Custom Nexus task slot suppliers are unsupported");
}

return new()
{
workflow_slot_supplier =
Expand Down
5 changes: 5 additions & 0 deletions src/Temporalio/Worker/Tuning/SlotInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,10 @@ public record ActivitySlotInfo(string ActivityType) : SlotInfo();
/// Info about a local activity task slot usage.
/// </summary>
public record LocalActivitySlotInfo(string ActivityType) : SlotInfo();
/// <summary>
/// Info about a Nexus operation task slot usage.
/// </summary>
/// <remarks>WARNING: Nexus support is experimental.</remarks>
public record NexusOperationSlotInfo(string ServiceHandlerType, string OperationName) : SlotInfo();
}
}
6 changes: 6 additions & 0 deletions src/Temporalio/Worker/Tuning/SlotType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,11 @@ public enum SlotType
/// Local activity slot type.
/// </summary>
LocalActivity,

/// <summary>
/// Nexus operation slot type.
/// </summary>
/// <remarks>WARNING: Nexus support is experimental.</remarks>
NexusOperation,
}
}
123 changes: 112 additions & 11 deletions tests/Temporalio.Tests/Worker/WorkerTuningTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using NexusRpc;
using NexusRpc.Handlers;
using Temporalio.Activities;
using Temporalio.Client;
using Temporalio.Nexus;
using Temporalio.Worker;
using Temporalio.Worker.Tuning;
using Temporalio.Workflows;
Expand Down Expand Up @@ -138,6 +141,8 @@ private class MySlotSupplier : CustomSlotSupplier

public bool SawActSlotInfo { get; private set; }

public bool SawNexusSlotInfo { get; private set; }

public HashSet<SlotType> SeenReserveTypes { get; } = new();

public HashSet<string> SeenActivityTypes { get; } = new();
Expand All @@ -148,6 +153,10 @@ private class MySlotSupplier : CustomSlotSupplier

public HashSet<bool> SeenReleaseInfoPresence { get; } = new();

public HashSet<string> SeenNexusServiceHandlerTypes { get; } = new();

public HashSet<string> SeenNexusOperationNames { get; } = new();

public override async Task<SlotPermit> ReserveSlotAsync(SlotReserveContext ctx, CancellationToken cancellationToken)
{
// Do something async to make sure that works
Expand All @@ -168,16 +177,23 @@ public override void MarkSlotUsed(SlotMarkUsedContext ctx)
{
switch (ctx.SlotInfo)
{
case Temporalio.Worker.Tuning.SlotInfo.WorkflowSlotInfo wsi:
case SlotInfo.WorkflowSlotInfo wsi:
SawWFSlotInfo = true;
SeenWorkflowTypes.Add(wsi.WorkflowType);
break;
case Temporalio.Worker.Tuning.SlotInfo.ActivitySlotInfo asi:
case SlotInfo.ActivitySlotInfo asi:
SawActSlotInfo = true;
SeenActivityTypes.Add(asi.ActivityType);
break;
case Temporalio.Worker.Tuning.SlotInfo.LocalActivitySlotInfo lasi:
case SlotInfo.LocalActivitySlotInfo lasi:
break;
case SlotInfo.NexusOperationSlotInfo nosi:
SawNexusSlotInfo = true;
SeenNexusServiceHandlerTypes.Add(nosi.ServiceHandlerType);
SeenNexusOperationNames.Add(nosi.OperationName);
break;
default:
throw new ArgumentOutOfRangeException(nameof(ctx));
}
}
}
Expand Down Expand Up @@ -208,14 +224,13 @@ private void ReserveTracking(SlotReserveContext ctx)
}

[Fact(Timeout = 10000)]
public async Task CanRunWith_CustomSlotSupplier()
public async Task CanRunWith_CustomSlotSupplier_WithoutNexus()
{
var mySlotSupplier = new MySlotSupplier();
using var worker = new TemporalWorker(
Client,
new TemporalWorkerOptions($"tq-{Guid.NewGuid()}")
{
// TODO: change Nexus to custom slot supplier after it's implemented: https://github.com/temporalio/sdk-dotnet/issues/528
Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, new FixedSizeSlotSupplier(10)),
}.AddWorkflow<SimpleWorkflow>().AddActivity(SimpleWorkflow.SomeActivity));
await worker.ExecuteAsync(async () =>
Expand All @@ -227,12 +242,100 @@ await Env.Client.ExecuteWorkflowAsync(
Assert.Equal(mySlotSupplier.ReleaseCount, mySlotSupplier.BiggestReleasedPermit);
Assert.True(mySlotSupplier.SawWFSlotInfo);
Assert.True(mySlotSupplier.SawActSlotInfo);
Assert.Contains("SimpleWorkflow", mySlotSupplier.SeenWorkflowTypes);
Assert.Contains("SomeActivity", mySlotSupplier.SeenActivityTypes);
Assert.False(mySlotSupplier.SawNexusSlotInfo);
Assert.Single(mySlotSupplier.SeenWorkflowTypes);
Assert.Contains(nameof(SimpleWorkflow), mySlotSupplier.SeenWorkflowTypes);
Assert.Single(mySlotSupplier.SeenActivityTypes);
Assert.Contains(nameof(SimpleWorkflow.SomeActivity), mySlotSupplier.SeenActivityTypes);
Assert.Equal(3, mySlotSupplier.SeenReserveTypes.Count);
Assert.Equal(2, mySlotSupplier.SeenReleaseInfoPresence.Count);
}

[NexusService]
public interface ISimpleService
{
[NexusOperation]
string Simple(string param);
}

[NexusServiceHandler(typeof(ISimpleService))]
public class SimpleService
{
[NexusOperationHandler]
public IOperationHandler<string, string> Simple() =>
WorkflowRunOperationHandler.FromHandleFactory<string, string>((context, name) =>
context.StartWorkflowAsync(
(SimpleWorkflow wf) => wf.RunAsync(name),
new() { Id = $"wf-{Guid.NewGuid()}" }));
}

public record class NexusCallingWorkflowInput(string EndpointName, string Name);

[Workflow]
public class NexusCallingWorkflow
{
[WorkflowRun]
public async Task<string> RunAsync(NexusCallingWorkflowInput input)
{
return await Workflow.CreateNexusClient<ISimpleService>(input.EndpointName).
ExecuteNexusOperationAsync(svc => svc.Simple(input.Name));
}
}

[Fact(Timeout = 10000)]
public async Task CanRunWith_CustomSlotSupplier_WithNexus()
{
var mySlotSupplier = new MySlotSupplier();

var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}")
.AddWorkflow<NexusCallingWorkflow>()
.AddWorkflow<SimpleWorkflow>()
.AddActivity(SimpleWorkflow.SomeActivity)
.AddNexusService(new SimpleService());
workerOptions.Tuner = new WorkerTuner(
mySlotSupplier,
mySlotSupplier,
mySlotSupplier,
mySlotSupplier);

using var worker = new TemporalWorker(Client, workerOptions);

var endpointName = $"nexus-endpoint-{workerOptions.TaskQueue}";
var endpoint = await Env.TestEnv.CreateNexusEndpointAsync(
endpointName, workerOptions.TaskQueue!);
try
{
await worker.ExecuteAsync(async () =>
{
var input = new NexusCallingWorkflowInput(endpointName, "Temporal");

await Env.Client.ExecuteWorkflowAsync(
(NexusCallingWorkflow wf) => wf.RunAsync(input),
new(id: $"wf-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
});
}
finally
{
await Env.TestEnv.DeleteNexusEndpointAsync(endpoint);
}

Assert.Equal(mySlotSupplier.ReleaseCount, mySlotSupplier.BiggestReleasedPermit);
Assert.True(mySlotSupplier.SawWFSlotInfo);
Assert.True(mySlotSupplier.SawActSlotInfo);
Assert.True(mySlotSupplier.SawNexusSlotInfo);
Assert.Equal(2, mySlotSupplier.SeenWorkflowTypes.Count);
Assert.Contains(nameof(NexusCallingWorkflow), mySlotSupplier.SeenWorkflowTypes);
Assert.Contains(nameof(SimpleWorkflow), mySlotSupplier.SeenWorkflowTypes);
Assert.Single(mySlotSupplier.SeenActivityTypes);
Assert.Contains(nameof(SimpleWorkflow.SomeActivity), mySlotSupplier.SeenActivityTypes);
Assert.Single(mySlotSupplier.SeenNexusServiceHandlerTypes);
Assert.Contains(nameof(SimpleService), mySlotSupplier.SeenNexusServiceHandlerTypes);
Assert.Single(mySlotSupplier.SeenNexusOperationNames);
Assert.Contains(nameof(ISimpleService.Simple), mySlotSupplier.SeenNexusOperationNames);
Assert.Equal(4, mySlotSupplier.SeenReserveTypes.Count);
Assert.Equal(2, mySlotSupplier.SeenReleaseInfoPresence.Count);
}

private class ThrowingSlotSupplier : CustomSlotSupplier
{
public override Task<SlotPermit> ReserveSlotAsync(SlotReserveContext ctx, CancellationToken cancellationToken)
Expand Down Expand Up @@ -269,8 +372,7 @@ public async Task CanRunWith_ThrowingSlotSupplier()
Client,
new TemporalWorkerOptions($"tq-{Guid.NewGuid()}")
{
// TODO: change Nexus to custom slot supplier after it's implemented: https://github.com/temporalio/sdk-dotnet/issues/528
Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, new FixedSizeSlotSupplier(10)),
Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, mySlotSupplier),
}.AddWorkflow<OneTaskWf>());
await worker.ExecuteAsync(async () =>
{
Expand Down Expand Up @@ -311,8 +413,7 @@ public async Task CanRunWith_BlockingSlotSupplier()
Client,
new TemporalWorkerOptions($"tq-{Guid.NewGuid()}")
{
// TODO: change Nexus to custom slot supplier after it's implemented: https://github.com/temporalio/sdk-dotnet/issues/528
Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, new FixedSizeSlotSupplier(10)),
Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, mySlotSupplier),
}.AddWorkflow<OneTaskWf>());
await worker.ExecuteAsync(async () =>
{
Expand Down