diff --git a/src/StreamJsonRpc/JsonRpc.cs b/src/StreamJsonRpc/JsonRpc.cs index 189b8811e..12a126203 100644 --- a/src/StreamJsonRpc/JsonRpc.cs +++ b/src/StreamJsonRpc/JsonRpc.cs @@ -83,11 +83,6 @@ public class JsonRpc : IDisposableObservable, IJsonRpcFormatterCallbacks, IJsonR /// private readonly RpcTargetInfo rpcTargetInfo; - /// - /// Carries the value from a when has not been set. - /// - private readonly System.Threading.AsyncLocal joinableTaskTokenWithoutJtf = new(); - /// /// List of remote RPC targets to call if connection should be relayed. /// @@ -122,6 +117,12 @@ public class JsonRpc : IDisposableObservable, IJsonRpcFormatterCallbacks, IJsonR [DebuggerBrowsable(DebuggerBrowsableState.Never)] private JoinableTaskFactory? joinableTaskFactory; + /// + /// Backing field for the property. + /// + [DebuggerBrowsable(DebuggerBrowsableState.Never)] + private JoinableTaskTokenTracker joinableTaskTracker = JoinableTaskTokenTracker.Default; + /// /// Backing field for the property. /// @@ -460,6 +461,33 @@ public JoinableTaskFactory? JoinableTaskFactory } } + /// + /// Gets or sets the to use to correlate tokens. + /// This property is only applicable when is . + /// + /// Defaults to an instance shared with all other instances that do not otherwise set this value explicitly. + /// + /// + /// This property is ignored when is set to a non- value. + /// + /// + /// This property should only be set explicitly when in an advanced scenario where one process has many instances + /// that interact with multiple remote processes such that avoiding correlating tokens across instances + /// is undesirable. + /// + /// + public JoinableTaskTokenTracker JoinableTaskTracker + { + get => this.joinableTaskTracker; + + set + { + Requires.NotNull(value, nameof(value)); + this.ThrowIfConfigurationLocked(); + this.joinableTaskTracker = value; + } + } + /// /// Gets a that completes when this instance is disposed or when listening has stopped /// whether by error, disposal or the stream closing. @@ -1928,7 +1956,7 @@ private JsonRpcError CreateCancellationResponse(JsonRpcRequest request) JsonRpcEventSource.Instance.SendingRequest(request.RequestId.NumberIfPossibleForEvent, request.Method, JsonRpcEventSource.GetArgumentsString(request)); } - string? parentToken = this.JoinableTaskFactory is not null ? this.JoinableTaskFactory.Context.Capture() : this.joinableTaskTokenWithoutJtf.Value; + string? parentToken = this.JoinableTaskFactory is not null ? this.JoinableTaskFactory.Context.Capture() : this.JoinableTaskTracker.Token; if (parentToken is not null) { request.TrySetTopLevelProperty(JoinableTaskTokenHeaderName, parentToken); @@ -2087,7 +2115,7 @@ private async ValueTask DispatchIncomingRequestAsync(JsonRpcRequ request.TryGetTopLevelProperty(JoinableTaskTokenHeaderName, out string? parentToken); if (this.JoinableTaskFactory is null) { - this.joinableTaskTokenWithoutJtf.Value = parentToken; + this.JoinableTaskTracker.Token = parentToken; } if (this.JoinableTaskFactory is null || parentToken is null) @@ -2693,6 +2721,30 @@ private void ThrowIfConfigurationLocked() } } + /// + /// An object that correlates tokens within and between instances + /// within a process that does not use , + /// for purposes of mitigating deadlocks in processes that do use . + /// + public class JoinableTaskTokenTracker + { + /// + /// The default instance to use. + /// + internal static readonly JoinableTaskTokenTracker Default = new JoinableTaskTokenTracker(); + + /// + /// Carries the value from a when has not been set. + /// + private readonly System.Threading.AsyncLocal joinableTaskTokenWithoutJtf = new(); + + internal string? Token + { + get => this.joinableTaskTokenWithoutJtf.Value; + set => this.joinableTaskTokenWithoutJtf.Value = value; + } + } + private class OutstandingCallData { internal OutstandingCallData(object taskCompletionSource, Action completionHandler, Type? expectedResultType) diff --git a/src/StreamJsonRpc/netstandard2.0/PublicAPI.Unshipped.txt b/src/StreamJsonRpc/netstandard2.0/PublicAPI.Unshipped.txt index e69de29bb..fe3ee13c6 100644 --- a/src/StreamJsonRpc/netstandard2.0/PublicAPI.Unshipped.txt +++ b/src/StreamJsonRpc/netstandard2.0/PublicAPI.Unshipped.txt @@ -0,0 +1,4 @@ +StreamJsonRpc.JsonRpc.JoinableTaskTokenTracker +StreamJsonRpc.JsonRpc.JoinableTaskTokenTracker.JoinableTaskTokenTracker() -> void +StreamJsonRpc.JsonRpc.JoinableTaskTracker.get -> StreamJsonRpc.JsonRpc.JoinableTaskTokenTracker! +StreamJsonRpc.JsonRpc.JoinableTaskTracker.set -> void diff --git a/src/StreamJsonRpc/netstandard2.1/PublicAPI.Unshipped.txt b/src/StreamJsonRpc/netstandard2.1/PublicAPI.Unshipped.txt index e69de29bb..fe3ee13c6 100644 --- a/src/StreamJsonRpc/netstandard2.1/PublicAPI.Unshipped.txt +++ b/src/StreamJsonRpc/netstandard2.1/PublicAPI.Unshipped.txt @@ -0,0 +1,4 @@ +StreamJsonRpc.JsonRpc.JoinableTaskTokenTracker +StreamJsonRpc.JsonRpc.JoinableTaskTokenTracker.JoinableTaskTokenTracker() -> void +StreamJsonRpc.JsonRpc.JoinableTaskTracker.get -> StreamJsonRpc.JsonRpc.JoinableTaskTokenTracker! +StreamJsonRpc.JsonRpc.JoinableTaskTracker.set -> void diff --git a/test/StreamJsonRpc.Tests/JsonRpcJsonHeadersTests.cs b/test/StreamJsonRpc.Tests/JsonRpcJsonHeadersTests.cs index e4fa93256..5aeec1f4c 100644 --- a/test/StreamJsonRpc.Tests/JsonRpcJsonHeadersTests.cs +++ b/test/StreamJsonRpc.Tests/JsonRpcJsonHeadersTests.cs @@ -125,9 +125,16 @@ public async Task Completion_FaultsOnFatalError() Assert.Same(completion, this.serverRpc.Completion); } - protected override void InitializeFormattersAndHandlers(bool controlledFlushingClient) + protected override void InitializeFormattersAndHandlers( + Stream serverStream, + Stream clientStream, + out IJsonRpcMessageFormatter serverMessageFormatter, + out IJsonRpcMessageFormatter clientMessageFormatter, + out IJsonRpcMessageHandler serverMessageHandler, + out IJsonRpcMessageHandler clientMessageHandler, + bool controlledFlushingClient) { - this.clientMessageFormatter = new JsonMessageFormatter + clientMessageFormatter = new JsonMessageFormatter { JsonSerializer = { @@ -138,7 +145,7 @@ protected override void InitializeFormattersAndHandlers(bool controlledFlushingC }, }, }; - this.serverMessageFormatter = new JsonMessageFormatter + serverMessageFormatter = new JsonMessageFormatter { JsonSerializer = { @@ -150,10 +157,10 @@ protected override void InitializeFormattersAndHandlers(bool controlledFlushingC }, }; - this.serverMessageHandler = new HeaderDelimitedMessageHandler(this.serverStream, this.serverStream, this.serverMessageFormatter); - this.clientMessageHandler = controlledFlushingClient - ? new DelayedFlushingHandler(this.clientStream, this.clientMessageFormatter) - : new HeaderDelimitedMessageHandler(this.clientStream, this.clientStream, this.clientMessageFormatter); + serverMessageHandler = new HeaderDelimitedMessageHandler(serverStream, serverStream, serverMessageFormatter); + clientMessageHandler = controlledFlushingClient + ? new DelayedFlushingHandler(clientStream, clientMessageFormatter) + : new HeaderDelimitedMessageHandler(clientStream, clientStream, clientMessageFormatter); } protected class UnserializableTypeConverter : JsonConverter diff --git a/test/StreamJsonRpc.Tests/JsonRpcJsonHeadersTypeHandlingTests.cs b/test/StreamJsonRpc.Tests/JsonRpcJsonHeadersTypeHandlingTests.cs index 25c816687..4fb3ecf0d 100644 --- a/test/StreamJsonRpc.Tests/JsonRpcJsonHeadersTypeHandlingTests.cs +++ b/test/StreamJsonRpc.Tests/JsonRpcJsonHeadersTypeHandlingTests.cs @@ -3,8 +3,6 @@ using System.Text; using Newtonsoft.Json; -using StreamJsonRpc; -using Xunit.Abstractions; public class JsonRpcJsonHeadersTypeHandlingTests : JsonRpcJsonHeadersTests { @@ -13,13 +11,20 @@ public JsonRpcJsonHeadersTypeHandlingTests(ITestOutputHelper logger) { } - protected override void InitializeFormattersAndHandlers(bool controlledFlushingClient) + protected override void InitializeFormattersAndHandlers( + Stream serverStream, + Stream clientStream, + out IJsonRpcMessageFormatter serverMessageFormatter, + out IJsonRpcMessageFormatter clientMessageFormatter, + out IJsonRpcMessageHandler serverMessageHandler, + out IJsonRpcMessageHandler clientMessageHandler, + bool controlledFlushingClient) { - this.serverMessageFormatter = new JsonMessageFormatter(new UTF8Encoding(encoderShouldEmitUTF8Identifier: false)) + serverMessageFormatter = new JsonMessageFormatter(new UTF8Encoding(encoderShouldEmitUTF8Identifier: false)) { JsonSerializer = { - TypeNameHandling = Newtonsoft.Json.TypeNameHandling.Objects, + TypeNameHandling = TypeNameHandling.Objects, TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple, Converters = { @@ -29,11 +34,11 @@ protected override void InitializeFormattersAndHandlers(bool controlledFlushingC }, }; - this.clientMessageFormatter = new JsonMessageFormatter(new UTF8Encoding(encoderShouldEmitUTF8Identifier: false)) + clientMessageFormatter = new JsonMessageFormatter(new UTF8Encoding(encoderShouldEmitUTF8Identifier: false)) { JsonSerializer = { - TypeNameHandling = Newtonsoft.Json.TypeNameHandling.Objects, + TypeNameHandling = TypeNameHandling.Objects, TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple, Converters = { @@ -43,9 +48,9 @@ protected override void InitializeFormattersAndHandlers(bool controlledFlushingC }, }; - this.serverMessageHandler = new HeaderDelimitedMessageHandler(this.serverStream, this.serverStream, this.serverMessageFormatter); - this.clientMessageHandler = controlledFlushingClient - ? new DelayedFlushingHandler(this.clientStream, this.clientMessageFormatter) - : new HeaderDelimitedMessageHandler(this.clientStream, this.clientStream, this.clientMessageFormatter); + serverMessageHandler = new HeaderDelimitedMessageHandler(serverStream, serverStream, serverMessageFormatter); + clientMessageHandler = controlledFlushingClient + ? new DelayedFlushingHandler(clientStream, clientMessageFormatter) + : new HeaderDelimitedMessageHandler(clientStream, clientStream, clientMessageFormatter); } } diff --git a/test/StreamJsonRpc.Tests/JsonRpcMessagePackLengthTests.cs b/test/StreamJsonRpc.Tests/JsonRpcMessagePackLengthTests.cs index c510e8121..44d97552a 100644 --- a/test/StreamJsonRpc.Tests/JsonRpcMessagePackLengthTests.cs +++ b/test/StreamJsonRpc.Tests/JsonRpcMessagePackLengthTests.cs @@ -383,22 +383,29 @@ public async Task VerboseLoggingDoesNotFailWhenArgsDoNotDeserializePrimitively(b Assert.True(await clientProxy.IsExtensionArgNonNull(new CustomExtensionType())); } - protected override void InitializeFormattersAndHandlers(bool controlledFlushingClient) + protected override void InitializeFormattersAndHandlers( + Stream serverStream, + Stream clientStream, + out IJsonRpcMessageFormatter serverMessageFormatter, + out IJsonRpcMessageFormatter clientMessageFormatter, + out IJsonRpcMessageHandler serverMessageHandler, + out IJsonRpcMessageHandler clientMessageHandler, + bool controlledFlushingClient) { - this.serverMessageFormatter = new MessagePackFormatter(); - this.clientMessageFormatter = new MessagePackFormatter(); + serverMessageFormatter = new MessagePackFormatter(); + clientMessageFormatter = new MessagePackFormatter(); var options = MessagePackFormatter.DefaultUserDataSerializationOptions .WithResolver(CompositeResolver.Create( new IMessagePackFormatter[] { new UnserializableTypeFormatter(), new TypeThrowsWhenDeserializedFormatter(), new CustomExtensionFormatter() }, new IFormatterResolver[] { StandardResolverAllowPrivate.Instance })); - ((MessagePackFormatter)this.serverMessageFormatter).SetMessagePackSerializerOptions(options); - ((MessagePackFormatter)this.clientMessageFormatter).SetMessagePackSerializerOptions(options); + ((MessagePackFormatter)serverMessageFormatter).SetMessagePackSerializerOptions(options); + ((MessagePackFormatter)clientMessageFormatter).SetMessagePackSerializerOptions(options); - this.serverMessageHandler = new LengthHeaderMessageHandler(this.serverStream, this.serverStream, this.serverMessageFormatter); - this.clientMessageHandler = controlledFlushingClient - ? new DelayedFlushingHandler(this.clientStream, this.clientMessageFormatter) - : new LengthHeaderMessageHandler(this.clientStream, this.clientStream, this.clientMessageFormatter); + serverMessageHandler = new LengthHeaderMessageHandler(serverStream, serverStream, serverMessageFormatter); + clientMessageHandler = controlledFlushingClient + ? new DelayedFlushingHandler(clientStream, clientMessageFormatter) + : new LengthHeaderMessageHandler(clientStream, clientStream, clientMessageFormatter); } [MessagePackObject] diff --git a/test/StreamJsonRpc.Tests/JsonRpcSystemTextJsonHeadersTests.cs b/test/StreamJsonRpc.Tests/JsonRpcSystemTextJsonHeadersTests.cs index 6071d9f16..cf8d54768 100644 --- a/test/StreamJsonRpc.Tests/JsonRpcSystemTextJsonHeadersTests.cs +++ b/test/StreamJsonRpc.Tests/JsonRpcSystemTextJsonHeadersTests.cs @@ -25,9 +25,16 @@ public override async Task CanPassExceptionFromServer_ErrorData() Assert.StrictEqual(COR_E_UNAUTHORIZEDACCESS, errorData.HResult); } - protected override void InitializeFormattersAndHandlers(bool controlledFlushingClient) + protected override void InitializeFormattersAndHandlers( + Stream serverStream, + Stream clientStream, + out IJsonRpcMessageFormatter serverMessageFormatter, + out IJsonRpcMessageFormatter clientMessageFormatter, + out IJsonRpcMessageHandler serverMessageHandler, + out IJsonRpcMessageHandler clientMessageHandler, + bool controlledFlushingClient) { - this.clientMessageFormatter = new SystemTextJsonFormatter + clientMessageFormatter = new SystemTextJsonFormatter { JsonSerializerOptions = { @@ -37,7 +44,7 @@ protected override void InitializeFormattersAndHandlers(bool controlledFlushingC }, }, }; - this.serverMessageFormatter = new SystemTextJsonFormatter + serverMessageFormatter = new SystemTextJsonFormatter { JsonSerializerOptions = { @@ -48,10 +55,10 @@ protected override void InitializeFormattersAndHandlers(bool controlledFlushingC }, }; - this.serverMessageHandler = new HeaderDelimitedMessageHandler(this.serverStream, this.serverStream, this.serverMessageFormatter); - this.clientMessageHandler = controlledFlushingClient - ? new DelayedFlushingHandler(this.clientStream, this.clientMessageFormatter) - : new HeaderDelimitedMessageHandler(this.clientStream, this.clientStream, this.clientMessageFormatter); + serverMessageHandler = new HeaderDelimitedMessageHandler(serverStream, serverStream, serverMessageFormatter); + clientMessageHandler = controlledFlushingClient + ? new DelayedFlushingHandler(clientStream, clientMessageFormatter) + : new HeaderDelimitedMessageHandler(clientStream, clientStream, clientMessageFormatter); } protected class DelayedFlushingHandler : HeaderDelimitedMessageHandler, IControlledFlushHandler diff --git a/test/StreamJsonRpc.Tests/JsonRpcTests.cs b/test/StreamJsonRpc.Tests/JsonRpcTests.cs index 2d3c9e125..4a795c5c5 100644 --- a/test/StreamJsonRpc.Tests/JsonRpcTests.cs +++ b/test/StreamJsonRpc.Tests/JsonRpcTests.cs @@ -2851,10 +2851,11 @@ public void JoinableTaskFactory_ThrowsAfterRunning() } /// - /// Asserts that when both client and server are JTF-aware, that no deadlock occurs when the client blocks the main thread that the server needs. + /// Asserts that when both client and server are JTF-aware (and in the same process), + /// that no deadlock occurs when the client blocks the main thread that the server needs. /// [UIFact] - public void JoinableTaskFactory_IntegrationBothSides() + public void JoinableTaskFactory_IntegrationBothSides_IntraProcess() { // Set up a main thread and JoinableTaskContext. JoinableTaskContext jtc = new(); @@ -2877,7 +2878,8 @@ public void JoinableTaskFactory_IntegrationBothSides() /// /// Asserts that when only the client is JTF-aware, that no deadlock occurs when the client blocks the main thread - /// and the server calls back to the client for something that needs the main thread as part of processing the client's request. + /// and the server calls back to the client for something that needs the main thread as part of processing the client's request + /// via the same instance. /// [UIFact] public void JoinableTaskFactory_IntegrationClientSideOnly() @@ -2885,7 +2887,7 @@ public void JoinableTaskFactory_IntegrationClientSideOnly() // Set up a main thread and JoinableTaskContext. JoinableTaskContext jtc = new(); - // Configure the client and server to understand JTF. + // Configure the client (only) to understand JTF. this.clientRpc.AllowModificationWhileListening = true; this.clientRpc.JoinableTaskFactory = jtc.Factory; @@ -2903,6 +2905,110 @@ public void JoinableTaskFactory_IntegrationClientSideOnly() }); } + /// + /// Asserts that when only the client is JTF-aware, that no deadlock occurs when the client blocks the main thread + /// and the server calls "back" to the client for something that needs the main thread as part of processing the client's request + /// using a different connection. + /// + [UIFact] + public void JoinableTaskFactory_IntegrationClientSideOnly_ManyConnections() + { + // Set up a main thread and JoinableTaskContext. + JoinableTaskContext jtc = new(); + + // Configure the client (only) to understand JTF. + this.clientRpc.AllowModificationWhileListening = true; + this.clientRpc.JoinableTaskFactory = jtc.Factory; + + // Set up the alternate JsonRpc connection. + var streams = Nerdbank.FullDuplexStream.CreateStreams(); + this.InitializeFormattersAndHandlers( + streams.Item1, + streams.Item2, + out _, + out _, + out IJsonRpcMessageHandler alternateServerHandler, + out IJsonRpcMessageHandler alternateClientHandler, + controlledFlushingClient: false); + JsonRpc alternateServerRpc = new(alternateServerHandler, this.server); + JsonRpc alternateClientRpc = new(alternateClientHandler) { JoinableTaskFactory = jtc.Factory }; + this.server.AlternateRpc = alternateServerRpc; + + alternateServerRpc.TraceSource = new TraceSource("ALT Server", SourceLevels.Verbose | SourceLevels.ActivityTracing); + alternateClientRpc.TraceSource = new TraceSource("ALT Client", SourceLevels.Verbose | SourceLevels.ActivityTracing); + + alternateServerRpc.TraceSource.Listeners.Add(new XunitTraceListener(this.Logger)); + alternateClientRpc.TraceSource.Listeners.Add(new XunitTraceListener(this.Logger)); + + // Arrange for a method on the simulated client that requires the UI thread, and that the server will call back to. + bool clientCalledBackViaAlternate = false; + const string CallbackMethodName = "ClientNeedsMainThread"; + alternateClientRpc.AddLocalRpcMethod(CallbackMethodName, new Func(async delegate + { + await jtc.Factory.SwitchToMainThreadAsync(); + clientCalledBackViaAlternate = true; + })); + + alternateServerRpc.StartListening(); + alternateClientRpc.StartListening(); + + jtc.Factory.Run(async delegate + { + await this.clientRpc.InvokeWithCancellationAsync(nameof(this.server.CallbackOnAnotherConnection), new object?[] { CallbackMethodName }, this.TimeoutToken).WithCancellation(this.TimeoutToken); + }); + + Assert.True(clientCalledBackViaAlternate); + } + + /// + /// Asserts that when is set to a unique instance, the deadlock avoidance fails. + /// + [UIFact] + public void JoinableTaskFactory_IntegrationClientSideOnly_ManyConnections_UniqueTrackerLeadsToDeadlock() + { + // Set up a main thread and JoinableTaskContext. + JoinableTaskContext jtc = new(); + + // Configure the client (only) to understand JTF. + this.clientRpc.AllowModificationWhileListening = true; + this.clientRpc.JoinableTaskFactory = jtc.Factory; + + // Set up the alternate JsonRpc connection. + var streams = Nerdbank.FullDuplexStream.CreateStreams(); + this.InitializeFormattersAndHandlers( + streams.Item1, + streams.Item2, + out _, + out _, + out IJsonRpcMessageHandler alternateServerHandler, + out IJsonRpcMessageHandler alternateClientHandler, + controlledFlushingClient: false); + JsonRpc alternateServerRpc = new(alternateServerHandler, this.server) { JoinableTaskTracker = new() }; + JsonRpc alternateClientRpc = new(alternateClientHandler) { JoinableTaskFactory = jtc.Factory }; + this.server.AlternateRpc = alternateServerRpc; + + alternateServerRpc.TraceSource = new TraceSource("ALT Server", SourceLevels.Verbose | SourceLevels.ActivityTracing); + alternateClientRpc.TraceSource = new TraceSource("ALT Client", SourceLevels.Verbose | SourceLevels.ActivityTracing); + + alternateServerRpc.TraceSource.Listeners.Add(new XunitTraceListener(this.Logger)); + alternateClientRpc.TraceSource.Listeners.Add(new XunitTraceListener(this.Logger)); + + // Arrange for a method on the simulated client that requires the UI thread, and that the server will call back to. + const string CallbackMethodName = "ClientNeedsMainThread"; + alternateClientRpc.AddLocalRpcMethod(CallbackMethodName, new Func(async delegate + { + await jtc.Factory.SwitchToMainThreadAsync(); + })); + + alternateServerRpc.StartListening(); + alternateClientRpc.StartListening(); + + jtc.Factory.Run(async delegate + { + await Assert.ThrowsAsync(() => this.clientRpc.InvokeWithCancellationAsync(nameof(this.server.CallbackOnAnotherConnection), new object?[] { CallbackMethodName }, this.TimeoutToken).WithCancellation(ExpectedTimeoutToken)); + }); + } + [Fact] public async Task InvokeWithParameterObject_WithRenamingAttributes() { @@ -2964,7 +3070,23 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - protected abstract void InitializeFormattersAndHandlers(bool controlledFlushingClient = false); + protected void InitializeFormattersAndHandlers(bool controlledFlushingClient = false) => this.InitializeFormattersAndHandlers( + this.serverStream, + this.clientStream, + out this.serverMessageFormatter, + out this.clientMessageFormatter, + out this.serverMessageHandler, + out this.clientMessageHandler, + controlledFlushingClient); + + protected abstract void InitializeFormattersAndHandlers( + Stream serverStream, + Stream clientStream, + out IJsonRpcMessageFormatter serverMessageFormatter, + out IJsonRpcMessageFormatter clientMessageFormatter, + out IJsonRpcMessageHandler serverMessageHandler, + out IJsonRpcMessageHandler clientMessageHandler, + bool controlledFlushingClient); protected override Task CheckGCPressureAsync(Func scenario, int maxBytesAllocated = -1, int iterations = 100, int allowedAttempts = 10) { @@ -3153,6 +3275,8 @@ public class Server : BaseClass, IServerDerived internal TraceSource? TraceSource { get; set; } + internal JsonRpc? AlternateRpc { get; set; } + internal JoinableTaskFactory? JoinableTaskFactory { get; set; } public static string ServerMethod(string argument) @@ -3449,6 +3573,21 @@ public async Task Callback(string clientCallbackMethod, CancellationToken cancel } } + public async Task CallbackOnAnotherConnection(string clientCallbackMethod, CancellationToken cancellationToken) + { + try + { + Verify.Operation(this.AlternateRpc is object, $"Set the {nameof(this.AlternateRpc)} field first."); + await this.AlternateRpc.InvokeWithCancellationAsync(clientCallbackMethod, cancellationToken: cancellationToken); + this.ServerMethodCompleted.SetResult(null); + } + catch (Exception ex) + { + this.ServerMethodCompleted.SetException(ex); + throw; + } + } + public async Task AsyncMethodWithCancellation(string arg, CancellationToken cancellationToken) { try