diff --git a/src/Temporalio/Bridge/CustomSlotSupplier.cs b/src/Temporalio/Bridge/CustomSlotSupplier.cs index 00e14a47..78e36242 100644 --- a/src/Temporalio/Bridge/CustomSlotSupplier.cs +++ b/src/Temporalio/Bridge/CustomSlotSupplier.cs @@ -58,6 +58,10 @@ private static SlotInfo SlotInfoFromBridge(Interop.TemporalCoreSlotInfo slotInfo Interop.TemporalCoreSlotInfo_Tag.LocalActivitySlotInfo => new SlotInfo.LocalActivitySlotInfo( ByteArrayRef.ToUtf8(slotInfo.local_activity_slot_info.activity_type)), + Interop.TemporalCoreSlotInfo_Tag.NexusSlotInfo => + new SlotInfo.NexusOperationSlotInfo( + ByteArrayRef.ToUtf8(slotInfo.nexus_slot_info.service), + ByteArrayRef.ToUtf8(slotInfo.nexus_slot_info.operation)), _ => throw new ArgumentOutOfRangeException(nameof(slotInfo)), }; } @@ -70,6 +74,7 @@ private static unsafe SlotReserveContext ReserveCtxFromBridge(Interop.TemporalCo Interop.TemporalCoreSlotKindType.WorkflowSlotKindType => SlotType.Workflow, Interop.TemporalCoreSlotKindType.ActivitySlotKindType => SlotType.Activity, Interop.TemporalCoreSlotKindType.LocalActivitySlotKindType => SlotType.LocalActivity, + Interop.TemporalCoreSlotKindType.NexusSlotKindType => SlotType.NexusOperation, _ => throw new ArgumentOutOfRangeException(nameof(ctx)), }, TaskQueue: ByteArrayRef.ToUtf8((*ctx).task_queue), diff --git a/src/Temporalio/Bridge/OptionsExtensions.cs b/src/Temporalio/Bridge/OptionsExtensions.cs index 2ad8cd19..d7018ca8 100644 --- a/src/Temporalio/Bridge/OptionsExtensions.cs +++ b/src/Temporalio/Bridge/OptionsExtensions.cs @@ -661,12 +661,6 @@ private static Interop.TemporalCoreTunerHolder ToInteropTuner( } } - // TODO: remove when implemented: https://github.com/temporalio/sdk-dotnet/issues/528 - if (tuner.NexusTaskSlotSupplier is Temporalio.Worker.Tuning.CustomSlotSupplier) - { - throw new ArgumentException("Custom Nexus task slot suppliers are unsupported"); - } - return new() { workflow_slot_supplier = diff --git a/src/Temporalio/Worker/Tuning/SlotInfo.cs b/src/Temporalio/Worker/Tuning/SlotInfo.cs index 3f5d4c7d..f4b706a3 100644 --- a/src/Temporalio/Worker/Tuning/SlotInfo.cs +++ b/src/Temporalio/Worker/Tuning/SlotInfo.cs @@ -21,5 +21,10 @@ public record ActivitySlotInfo(string ActivityType) : SlotInfo(); /// Info about a local activity task slot usage. /// public record LocalActivitySlotInfo(string ActivityType) : SlotInfo(); + /// + /// Info about a Nexus operation task slot usage. + /// + /// WARNING: Nexus support is experimental. + public record NexusOperationSlotInfo(string ServiceHandlerType, string OperationName) : SlotInfo(); } } diff --git a/src/Temporalio/Worker/Tuning/SlotType.cs b/src/Temporalio/Worker/Tuning/SlotType.cs index dd5083bd..46adede1 100644 --- a/src/Temporalio/Worker/Tuning/SlotType.cs +++ b/src/Temporalio/Worker/Tuning/SlotType.cs @@ -19,5 +19,11 @@ public enum SlotType /// Local activity slot type. /// LocalActivity, + + /// + /// Nexus operation slot type. + /// + /// WARNING: Nexus support is experimental. + NexusOperation, } } diff --git a/tests/Temporalio.Tests/Worker/WorkerTuningTests.cs b/tests/Temporalio.Tests/Worker/WorkerTuningTests.cs index de9af261..c2deaf7c 100644 --- a/tests/Temporalio.Tests/Worker/WorkerTuningTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkerTuningTests.cs @@ -1,5 +1,8 @@ +using NexusRpc; +using NexusRpc.Handlers; using Temporalio.Activities; using Temporalio.Client; +using Temporalio.Nexus; using Temporalio.Worker; using Temporalio.Worker.Tuning; using Temporalio.Workflows; @@ -138,6 +141,8 @@ private class MySlotSupplier : CustomSlotSupplier public bool SawActSlotInfo { get; private set; } + public bool SawNexusSlotInfo { get; private set; } + public HashSet SeenReserveTypes { get; } = new(); public HashSet SeenActivityTypes { get; } = new(); @@ -148,6 +153,10 @@ private class MySlotSupplier : CustomSlotSupplier public HashSet SeenReleaseInfoPresence { get; } = new(); + public HashSet SeenNexusServiceHandlerTypes { get; } = new(); + + public HashSet SeenNexusOperationNames { get; } = new(); + public override async Task ReserveSlotAsync(SlotReserveContext ctx, CancellationToken cancellationToken) { // Do something async to make sure that works @@ -168,16 +177,23 @@ public override void MarkSlotUsed(SlotMarkUsedContext ctx) { switch (ctx.SlotInfo) { - case Temporalio.Worker.Tuning.SlotInfo.WorkflowSlotInfo wsi: + case SlotInfo.WorkflowSlotInfo wsi: SawWFSlotInfo = true; SeenWorkflowTypes.Add(wsi.WorkflowType); break; - case Temporalio.Worker.Tuning.SlotInfo.ActivitySlotInfo asi: + case SlotInfo.ActivitySlotInfo asi: SawActSlotInfo = true; SeenActivityTypes.Add(asi.ActivityType); break; - case Temporalio.Worker.Tuning.SlotInfo.LocalActivitySlotInfo lasi: + case SlotInfo.LocalActivitySlotInfo lasi: break; + case SlotInfo.NexusOperationSlotInfo nosi: + SawNexusSlotInfo = true; + SeenNexusServiceHandlerTypes.Add(nosi.ServiceHandlerType); + SeenNexusOperationNames.Add(nosi.OperationName); + break; + default: + throw new ArgumentOutOfRangeException(nameof(ctx)); } } } @@ -208,14 +224,13 @@ private void ReserveTracking(SlotReserveContext ctx) } [Fact(Timeout = 10000)] - public async Task CanRunWith_CustomSlotSupplier() + public async Task CanRunWith_CustomSlotSupplier_WithoutNexus() { var mySlotSupplier = new MySlotSupplier(); using var worker = new TemporalWorker( Client, new TemporalWorkerOptions($"tq-{Guid.NewGuid()}") { - // TODO: change Nexus to custom slot supplier after it's implemented: https://github.com/temporalio/sdk-dotnet/issues/528 Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, new FixedSizeSlotSupplier(10)), }.AddWorkflow().AddActivity(SimpleWorkflow.SomeActivity)); await worker.ExecuteAsync(async () => @@ -227,12 +242,100 @@ await Env.Client.ExecuteWorkflowAsync( Assert.Equal(mySlotSupplier.ReleaseCount, mySlotSupplier.BiggestReleasedPermit); Assert.True(mySlotSupplier.SawWFSlotInfo); Assert.True(mySlotSupplier.SawActSlotInfo); - Assert.Contains("SimpleWorkflow", mySlotSupplier.SeenWorkflowTypes); - Assert.Contains("SomeActivity", mySlotSupplier.SeenActivityTypes); + Assert.False(mySlotSupplier.SawNexusSlotInfo); + Assert.Single(mySlotSupplier.SeenWorkflowTypes); + Assert.Contains(nameof(SimpleWorkflow), mySlotSupplier.SeenWorkflowTypes); + Assert.Single(mySlotSupplier.SeenActivityTypes); + Assert.Contains(nameof(SimpleWorkflow.SomeActivity), mySlotSupplier.SeenActivityTypes); Assert.Equal(3, mySlotSupplier.SeenReserveTypes.Count); Assert.Equal(2, mySlotSupplier.SeenReleaseInfoPresence.Count); } + [NexusService] + public interface ISimpleService + { + [NexusOperation] + string Simple(string param); + } + + [NexusServiceHandler(typeof(ISimpleService))] + public class SimpleService + { + [NexusOperationHandler] + public IOperationHandler Simple() => + WorkflowRunOperationHandler.FromHandleFactory((context, name) => + context.StartWorkflowAsync( + (SimpleWorkflow wf) => wf.RunAsync(name), + new() { Id = $"wf-{Guid.NewGuid()}" })); + } + + public record class NexusCallingWorkflowInput(string EndpointName, string Name); + + [Workflow] + public class NexusCallingWorkflow + { + [WorkflowRun] + public async Task RunAsync(NexusCallingWorkflowInput input) + { + return await Workflow.CreateNexusClient(input.EndpointName). + ExecuteNexusOperationAsync(svc => svc.Simple(input.Name)); + } + } + + [Fact(Timeout = 10000)] + public async Task CanRunWith_CustomSlotSupplier_WithNexus() + { + var mySlotSupplier = new MySlotSupplier(); + + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}") + .AddWorkflow() + .AddWorkflow() + .AddActivity(SimpleWorkflow.SomeActivity) + .AddNexusService(new SimpleService()); + workerOptions.Tuner = new WorkerTuner( + mySlotSupplier, + mySlotSupplier, + mySlotSupplier, + mySlotSupplier); + + using var worker = new TemporalWorker(Client, workerOptions); + + var endpointName = $"nexus-endpoint-{workerOptions.TaskQueue}"; + var endpoint = await Env.TestEnv.CreateNexusEndpointAsync( + endpointName, workerOptions.TaskQueue!); + try + { + await worker.ExecuteAsync(async () => + { + var input = new NexusCallingWorkflowInput(endpointName, "Temporal"); + + await Env.Client.ExecuteWorkflowAsync( + (NexusCallingWorkflow wf) => wf.RunAsync(input), + new(id: $"wf-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + }); + } + finally + { + await Env.TestEnv.DeleteNexusEndpointAsync(endpoint); + } + + Assert.Equal(mySlotSupplier.ReleaseCount, mySlotSupplier.BiggestReleasedPermit); + Assert.True(mySlotSupplier.SawWFSlotInfo); + Assert.True(mySlotSupplier.SawActSlotInfo); + Assert.True(mySlotSupplier.SawNexusSlotInfo); + Assert.Equal(2, mySlotSupplier.SeenWorkflowTypes.Count); + Assert.Contains(nameof(NexusCallingWorkflow), mySlotSupplier.SeenWorkflowTypes); + Assert.Contains(nameof(SimpleWorkflow), mySlotSupplier.SeenWorkflowTypes); + Assert.Single(mySlotSupplier.SeenActivityTypes); + Assert.Contains(nameof(SimpleWorkflow.SomeActivity), mySlotSupplier.SeenActivityTypes); + Assert.Single(mySlotSupplier.SeenNexusServiceHandlerTypes); + Assert.Contains(nameof(SimpleService), mySlotSupplier.SeenNexusServiceHandlerTypes); + Assert.Single(mySlotSupplier.SeenNexusOperationNames); + Assert.Contains(nameof(ISimpleService.Simple), mySlotSupplier.SeenNexusOperationNames); + Assert.Equal(4, mySlotSupplier.SeenReserveTypes.Count); + Assert.Equal(2, mySlotSupplier.SeenReleaseInfoPresence.Count); + } + private class ThrowingSlotSupplier : CustomSlotSupplier { public override Task ReserveSlotAsync(SlotReserveContext ctx, CancellationToken cancellationToken) @@ -269,8 +372,7 @@ public async Task CanRunWith_ThrowingSlotSupplier() Client, new TemporalWorkerOptions($"tq-{Guid.NewGuid()}") { - // TODO: change Nexus to custom slot supplier after it's implemented: https://github.com/temporalio/sdk-dotnet/issues/528 - Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, new FixedSizeSlotSupplier(10)), + Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, mySlotSupplier), }.AddWorkflow()); await worker.ExecuteAsync(async () => { @@ -311,8 +413,7 @@ public async Task CanRunWith_BlockingSlotSupplier() Client, new TemporalWorkerOptions($"tq-{Guid.NewGuid()}") { - // TODO: change Nexus to custom slot supplier after it's implemented: https://github.com/temporalio/sdk-dotnet/issues/528 - Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, new FixedSizeSlotSupplier(10)), + Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier, mySlotSupplier), }.AddWorkflow()); await worker.ExecuteAsync(async () => {