Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion doc/rpc_marshalable_objects.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,45 @@ StreamJsonRpc allows transmitting marshalable objects (i.e., objects implementin

Marshalable interfaces must:

1. Extend `IDisposable`.
1. Extend `IDisposable` (unless interface is call-scoped).
1. Not include any properties.
1. Not include any events.

The object that implements a marshalable interface may include properties and events as well as other additional members but only the methods defined by the marshalable interface will be available on the proxy, and the data will not be serialized.

The `RpcMarshalableAttribute` must be applied directly to the interface used as the return type, parameter type, or member type within a return type or parameter type's object graph.
The attribute is not inherited.
In fact different interfaces in a type hierarchy can have this attribute applied with distinct settings, and only the settings on the attribute applied directly to the interface used will apply.

## Call-scoped vs. explicitly scoped

### Explicit lifetime

An RPC marshalable interface has an explicit lifetime by default.
This means that the receiver of a marshaled object owns its lifetime, which may extend beyond an individual RPC call.
Memory for the marshaled object and its proxy are not released until the receiver either disposes of its proxy or the JSON-RPC connection is closed.

### Call-scoped lifetime

A call-scoped interface produces a proxy that is valid only during the RPC call that delivered it.
It may only be used as part of a method request as or within an argument.
Using it as or within a return value or exception will result in an error.

This is the preferred model when an interface is expected to only be used within request arguments because it mitigates the risk of a memory leak due to the receiver failing to dispose of the proxy.
This model also allows the sender to retain control over the lifetime of the marshaled object.

Special allowance is made for `IAsyncEnumerable<T>`-returning RPC methods so that the lifetime of the marshaled object is extended to the lifetime of the enumeration.
An `IAsyncEnumerable<T>` in an exception thrown from the method will *not* have access to the call-scoped marshaled object because exceptions thrown by the server always cause termination of objects marshaled by the request.

Opt into call-scoped lifetimes by setting the `CallScopedLifetime` property to `true` on the attribute applied to the interface:

```css
[RpcMarshalable(CallScopedLifetime = true)]
```

It is not possible to customize the lifetime of an RPC marshaled object except on its own interface.
For example, applying this attribute to the parameter that uses the interface is not allowed.

## Use cases

In all cases, the special handling of a marshalable object only occurs if the container of that value is typed as the corresponding marshalable interface.
Expand Down Expand Up @@ -104,6 +137,8 @@ class RpcServer : IRpcServer
}
```

Call-scoped marshalable interfaces may not be used as a return type or member of its object graph.

### Method argument

In this use case the RPC *client* provides the marshalable object to the server:
Expand All @@ -119,6 +154,8 @@ var counter = new Counter();
await client.ProvideCounterAsync(counter);
```

Call-scoped marshalable interfaces may only appear as a method parameter or a part of its object graph.

### Value within a single argument's object graph

In this use case the RPC client again provides the marshalable object to the server,
Expand All @@ -144,6 +181,7 @@ await client.ProvideClassAsync(arg);
```

⚠️ While this use case is supported, be very wary of this pattern because it becomes less obvious to the receiver that an `IDisposable` value is tucked into the object tree of an argument somewhere that *must* be disposed to avoid a resource leak.
This risk can be mitigated by using call-scoped marshalable interfaces.

### As an argument without a proxy for an RPC interface

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Microsoft;
namespace StreamJsonRpc;

internal class DisposableAction : IDisposableObservable
{
private readonly Action? disposeAction;
private static readonly Action EmptyAction = () => { };
private Action? disposeAction;

internal DisposableAction(Action? disposeAction)
{
this.disposeAction = disposeAction;
this.disposeAction = disposeAction ?? EmptyAction;
}

public bool IsDisposed { get; private set; }
public bool IsDisposed => this.disposeAction is null;

public void Dispose()
{
this.IsDisposed = true;
this.disposeAction?.Invoke();
Interlocked.Exchange(ref this.disposeAction, null)?.Invoke();
}
}
4 changes: 2 additions & 2 deletions src/StreamJsonRpc/FormatterBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ JsonRpc IJsonRpcInstanceContainer.Rpc
this.rpc = value;

this.formatterProgressTracker = new MessageFormatterProgressTracker(value, this);
this.enumerableTracker = new MessageFormatterEnumerableTracker(value, this);
this.duplexPipeTracker = new MessageFormatterDuplexPipeTracker(value, this) { MultiplexingStream = this.MultiplexingStream };
this.rpcMarshaledContextTracker = new MessageFormatterRpcMarshaledContextTracker(value, this);
this.enumerableTracker = new MessageFormatterEnumerableTracker(value, this, this.rpcMarshaledContextTracker);
this.duplexPipeTracker = new MessageFormatterDuplexPipeTracker(value, this) { MultiplexingStream = this.MultiplexingStream };
}
}
}
Expand Down
87 changes: 58 additions & 29 deletions src/StreamJsonRpc/JsonMessageFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public JToken Serialize(JsonRpcMessage message)
Protocol.JsonRpcError IJsonRpcMessageFactory.CreateErrorMessage() => new JsonRpcError(this.JsonSerializer);

/// <inheritdoc/>
Protocol.JsonRpcResult IJsonRpcMessageFactory.CreateResultMessage() => new JsonRpcResult(this.JsonSerializer);
Protocol.JsonRpcResult IJsonRpcMessageFactory.CreateResultMessage() => new JsonRpcResult(this);

/// <inheritdoc />
protected override void Dispose(bool disposing)
Expand Down Expand Up @@ -570,9 +570,9 @@ private JTokenWriter CreateJTokenWriter()

private bool TryGetMarshaledJsonConverter(Type type, [NotNullWhen(true)] out RpcMarshalableConverter? converter)
{
if (MessageFormatterRpcMarshaledContextTracker.TryGetMarshalOptionsForType(type, out JsonRpcProxyOptions? proxyOptions, out JsonRpcTargetOptions? targetOptions))
if (MessageFormatterRpcMarshaledContextTracker.TryGetMarshalOptionsForType(type, out JsonRpcProxyOptions? proxyOptions, out JsonRpcTargetOptions? targetOptions, out RpcMarshalableAttribute? rpcMarshalableAttribute))
{
converter = new RpcMarshalableConverter(type, this, proxyOptions, targetOptions);
converter = new RpcMarshalableConverter(type, this, proxyOptions, targetOptions, rpcMarshalableAttribute);
return true;
}

Expand Down Expand Up @@ -616,7 +616,7 @@ private JsonRpcResult ReadResult(JToken json)
RequestId id = this.ExtractRequestId(json);
JToken? result = json["result"];

return new JsonRpcResult(this.JsonSerializer)
return new JsonRpcResult(this)
{
RequestId = id,
Result = result,
Expand Down Expand Up @@ -837,15 +837,27 @@ public override bool TryGetArgumentByNameOrIndex(string? name, int position, Typ
[DataContract]
private class JsonRpcResult : JsonRpcResultBase
{
private readonly JsonSerializer jsonSerializer;
private readonly JsonMessageFormatter formatter;
private bool resultDeserialized;
private JsonSerializationException? resultDeserializationException;

internal JsonRpcResult(JsonSerializer jsonSerializer)
internal JsonRpcResult(JsonMessageFormatter formatter)
{
this.jsonSerializer = jsonSerializer ?? throw new ArgumentNullException(nameof(jsonSerializer));
this.formatter = formatter;
}

public override T GetResult<T>()
{
if (this.resultDeserializationException is not null)
{
ExceptionDispatchInfo.Capture(this.resultDeserializationException).Throw();
}

if (this.resultDeserialized)
{
return (T)this.Result!;
}

Verify.Operation(this.Result is not null, "This instance hasn't been initialized with a result yet.");
var result = (JToken)this.Result;
if (result.Type == JTokenType.Null)
Expand All @@ -856,15 +868,45 @@ public override T GetResult<T>()

try
{
return result.ToObject<T>(this.jsonSerializer)!;
using (this.formatter.TrackDeserialization(this))
{
return result.ToObject<T>(this.formatter.JsonSerializer)!;
}
}
catch (Exception exception)
{
throw new JsonSerializationException(string.Format(CultureInfo.CurrentCulture, Resources.FailureDeserializingRpcResult, typeof(T).Name, exception.GetType().Name, exception.Message), exception);
}
}

protected override TopLevelPropertyBagBase? CreateTopLevelPropertyBag() => new TopLevelPropertyBag(this.jsonSerializer);
protected internal override void SetExpectedResultType(Type resultType)
{
Verify.Operation(this.Result is not null, "This instance hasn't been initialized with a result yet.");
Verify.Operation(!this.resultDeserialized, "Result is no longer available or has already been deserialized.");

var result = (JToken)this.Result;
if (result.Type == JTokenType.Null)
{
Verify.Operation(!resultType.GetTypeInfo().IsValueType || Nullable.GetUnderlyingType(resultType) is not null, "null result is not assignable to a value type.");
return;
}

try
{
using (this.formatter.TrackDeserialization(this))
{
this.Result = result.ToObject(resultType, this.formatter.JsonSerializer)!;
this.resultDeserialized = true;
}
}
catch (Exception exception)
{
// This was a best effort anyway. We'll throw again later at a more convenient time for JsonRpc.
this.resultDeserializationException = new JsonSerializationException(string.Format(CultureInfo.CurrentCulture, Resources.FailureDeserializingRpcResult, resultType.Name, exception.GetType().Name, exception.Message), exception);
}
}

protected override TopLevelPropertyBagBase? CreateTopLevelPropertyBag() => new TopLevelPropertyBag(this.formatter.JsonSerializer);
}

private class JsonRpcError : JsonRpcErrorBase
Expand Down Expand Up @@ -1207,29 +1249,16 @@ public override void WriteJson(JsonWriter writer, Stream? value, JsonSerializer
}

[DebuggerDisplay("{" + nameof(DebuggerDisplay) + "}")]
private class RpcMarshalableConverter : JsonConverter
private class RpcMarshalableConverter(Type interfaceType, JsonMessageFormatter jsonMessageFormatter, JsonRpcProxyOptions proxyOptions, JsonRpcTargetOptions targetOptions, RpcMarshalableAttribute rpcMarshalableAttribute) : JsonConverter
{
private readonly Type interfaceType;
private readonly JsonMessageFormatter jsonMessageFormatter;
private readonly JsonRpcProxyOptions proxyOptions;
private readonly JsonRpcTargetOptions targetOptions;

public RpcMarshalableConverter(Type interfaceType, JsonMessageFormatter jsonMessageFormatter, JsonRpcProxyOptions proxyOptions, JsonRpcTargetOptions targetOptions)
{
this.interfaceType = interfaceType;
this.jsonMessageFormatter = jsonMessageFormatter;
this.proxyOptions = proxyOptions;
this.targetOptions = targetOptions;
}

private string DebuggerDisplay => $"Converter for marshalable objects of type {this.interfaceType.FullName}";
private string DebuggerDisplay => $"Converter for marshalable objects of type {interfaceType.FullName}";

public override bool CanConvert(Type objectType) => objectType == this.interfaceType;
public override bool CanConvert(Type objectType) => objectType == interfaceType;

public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer)
{
var token = (MessageFormatterRpcMarshaledContextTracker.MarshalToken?)JToken.Load(reader).ToObject(typeof(MessageFormatterRpcMarshaledContextTracker.MarshalToken), serializer);
return this.jsonMessageFormatter.RpcMarshaledContextTracker.GetObject(objectType, token, this.proxyOptions);
return jsonMessageFormatter.RpcMarshaledContextTracker.GetObject(objectType, token, proxyOptions);
}

public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer)
Expand All @@ -1238,13 +1267,13 @@ public override void WriteJson(JsonWriter writer, object? value, JsonSerializer
{
writer.WriteNull();
}
else if (!this.interfaceType.IsAssignableFrom(value.GetType()))
else if (!interfaceType.IsAssignableFrom(value.GetType()))
{
throw new InvalidOperationException($"Type {value.GetType().FullName} doesn't implement {this.interfaceType.FullName}");
throw new InvalidOperationException($"Type {value.GetType().FullName} doesn't implement {interfaceType.FullName}");
}
else
{
MessageFormatterRpcMarshaledContextTracker.MarshalToken token = this.jsonMessageFormatter.RpcMarshaledContextTracker.GetToken(value, this.targetOptions, this.interfaceType);
MessageFormatterRpcMarshaledContextTracker.MarshalToken token = jsonMessageFormatter.RpcMarshaledContextTracker.GetToken(value, targetOptions, interfaceType, rpcMarshalableAttribute);
serializer.Serialize(writer, token);
}
}
Expand Down
82 changes: 46 additions & 36 deletions src/StreamJsonRpc/JsonRpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2569,56 +2569,66 @@ private async Task HandleRpcAsync(JsonRpcMessage rpc)
}
else if (rpc is IJsonRpcMessageWithId resultOrError)
{
this.OnResponseReceived(rpc);
JsonRpcResult? result = resultOrError as JsonRpcResult;
JsonRpcError? error = resultOrError as JsonRpcError;

lock (this.dispatcherMapLock)
try
{
if (this.resultDispatcherMap.TryGetValue(resultOrError.RequestId, out data))
{
this.resultDispatcherMap.Remove(resultOrError.RequestId);
}
}
JsonRpcResult? result = resultOrError as JsonRpcResult;
JsonRpcError? error = resultOrError as JsonRpcError;

if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
if (result is not null)
{
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEvents.ReceivedResult, "Received result for request \"{0}\".", result.RequestId);
}
else if (error?.Error is object)
lock (this.dispatcherMapLock)
{
this.TraceSource.TraceEvent(TraceEventType.Warning, (int)TraceEvents.ReceivedError, "Received error response for request {0}: {1} \"{2}\": ", error.RequestId, error.Error.Code, error.Error.Message);
if (this.resultDispatcherMap.TryGetValue(resultOrError.RequestId, out data))
{
this.resultDispatcherMap.Remove(resultOrError.RequestId);
}
}
}

if (data is object)
{
if (data.ExpectedResultType is not null && rpc is JsonRpcResult resultMessage)
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
resultMessage.SetExpectedResultType(data.ExpectedResultType);
if (result is not null)
{
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEvents.ReceivedResult, "Received result for request \"{0}\".", result.RequestId);
}
else if (error?.Error is object)
{
this.TraceSource.TraceEvent(TraceEventType.Warning, (int)TraceEvents.ReceivedError, "Received error response for request {0}: {1} \"{2}\": ", error.RequestId, error.Error.Code, error.Error.Message);
}
}
else if (rpc is JsonRpcError errorMessage && errorMessage.Error is not null)

if (data is object)
{
Type? errorType = this.GetErrorDetailsDataType(errorMessage);
if (errorType is not null)
if (data.ExpectedResultType is not null && rpc is JsonRpcResult resultMessage)
{
errorMessage.Error.SetExpectedDataType(errorType);
resultMessage.SetExpectedResultType(data.ExpectedResultType);
}
else if (rpc is JsonRpcError errorMessage && errorMessage.Error is not null)
{
Type? errorType = this.GetErrorDetailsDataType(errorMessage);
if (errorType is not null)
{
errorMessage.Error.SetExpectedDataType(errorType);
}
}

this.OnResponseReceived(rpc);

// Complete the caller's request with the response asynchronously so it doesn't delay handling of other JsonRpc messages.
await TaskScheduler.Default.SwitchTo(alwaysYield: true);
data.CompletionHandler(rpc);
data = null; // avoid invoking again if we throw later
}
else
{
this.OnResponseReceived(rpc);

// Complete the caller's request with the response asynchronously so it doesn't delay handling of other JsonRpc messages.
await TaskScheduler.Default.SwitchTo(alwaysYield: true);
data.CompletionHandler(rpc);
data = null; // avoid invoking again if we throw later
// Unexpected "response" to no request we have a record of. Raise disconnected event.
this.OnJsonRpcDisconnected(new JsonRpcDisconnectedEventArgs(
Resources.UnexpectedResponseWithNoMatchingRequest,
DisconnectedReason.RemoteProtocolViolation));
}
}
else
catch
{
// Unexpected "response" to no request we have a record of. Raise disconnected event.
this.OnJsonRpcDisconnected(new JsonRpcDisconnectedEventArgs(
Resources.UnexpectedResponseWithNoMatchingRequest,
DisconnectedReason.RemoteProtocolViolation));
this.OnResponseReceived(rpc);
}
}
else
Expand Down
Loading