Skip to content

Commit 9d9e29a

Browse files
committed
feat(rpc): improve disconnect handling and cleaning up RpcMessageSubject correctly
This fixes also dangling Promises when connection closes between creating a RpcMessageSubject and fully handling it. All Promises will now always reject on disconnect/error
1 parent 1cb03cd commit 9d9e29a

File tree

9 files changed

+231
-145
lines changed

9 files changed

+231
-145
lines changed

packages/broker/src/kernel.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ export interface Queue {
6464
export class BrokerConnection extends RpcKernelBaseConnection {
6565
protected subscribedChannels: string[] = [];
6666
protected locks = new Map<number, ProcessLock>();
67-
protected replies = new Map<number, ((message: RpcMessage) => void)>();
6867

6968
constructor(
7069
logger: Logger,
@@ -93,7 +92,7 @@ export class BrokerConnection extends RpcKernelBaseConnection {
9392
const promises: Promise<void>[] = [];
9493

9594
for (const connection of this.connections.connections) {
96-
if (connection === this) continue;
95+
if ((connection as any) === this) continue;
9796
promises.push(connection.sendMessage<brokerEntityFields>(BrokerType.EntityFields, {
9897
name,
9998
fields,

packages/rpc/src/client/action.ts

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ import { EntityState, EntitySubjectStore } from './entity-state.js';
3434
import { assertType, deserializeType, ReflectionKind, Type, TypeObjectLiteral, typeOf } from '@deepkit/type';
3535
import { ProgressTracker, ProgressTrackerState } from '@deepkit/core-rxjs';
3636

37-
interface ResponseActionObservableError extends rpcActionObservableSubscribeId, WrappedV {
38-
}
39-
4037
type ControllerStateActionTypes = {
4138
callSchema: TypeObjectLiteral, //with args, method, and controller as property
4239
resultSchema: TypeObjectLiteral, //with v as property
@@ -56,7 +53,7 @@ export class RpcControllerState {
5653
public peerId?: string;
5754

5855
constructor(
59-
public controller: string
56+
public controller: string,
6057
) {
6158

6259
}
@@ -87,7 +84,11 @@ export class RpcActionClient {
8784
constructor(protected client: WritableClient) {
8885
}
8986

90-
public action<T>(controller: RpcControllerState, method: string, args: any[], options: { timeout?: number, dontWaitForConnection?: true, typeReuseDisabled?: boolean } = {}) {
87+
public action<T>(controller: RpcControllerState, method: string, args: any[], options: {
88+
timeout?: number,
89+
dontWaitForConnection?: true,
90+
typeReuseDisabled?: boolean
91+
} = {}) {
9192
const progress = ClientProgress.getNext();
9293

9394
return asyncOperation<any>(async (resolve, reject) => {
@@ -112,12 +113,12 @@ export class RpcActionClient {
112113
const subject = this.client.sendMessage(RpcTypes.Action, {
113114
controller: controller.controller,
114115
method: method,
115-
args
116+
args,
116117
}, types.callSchema, {
117118
peerId: controller.peerId,
118119
dontWaitForConnection: options.dontWaitForConnection,
119120
timeout: options.timeout,
120-
}).onReply((reply) => {
121+
}).onRejected(reject).onReply((reply) => {
121122
try {
122123
// console.log('client: answer', RpcTypes[reply.type], reply.composite);
123124

@@ -207,7 +208,7 @@ export class RpcActionClient {
207208
unsubscribe: () => {
208209
delete subscribers[id];
209210
subject.send<rpcActionObservableSubscribeId>(RpcTypes.ActionObservableUnsubscribe, { id });
210-
}
211+
},
211212
};
212213
});
213214
(observable as any).disconnect = () => {
@@ -257,7 +258,7 @@ export class RpcActionClient {
257258
//whenever the client changes something, it's synced back to the server.
258259
//this is important to handle the stop signal.
259260
const oldChanged = observableSubject.changed;
260-
observableSubject.changed = function (this: ProgressTracker) {
261+
observableSubject.changed = function(this: ProgressTracker) {
261262
subject.send(RpcTypes.ActionObservableProgressNext, this.value, typeOf<ProgressTrackerState[]>());
262263
return oldChanged.apply(this);
263264
};
@@ -434,12 +435,12 @@ export class RpcActionClient {
434435
const a = this.client.sendMessage<rpcActionType>(RpcTypes.ActionType, {
435436
controller: controller.controller,
436437
method: method,
437-
disableTypeReuse: typeReuseDisabled
438+
disableTypeReuse: typeReuseDisabled,
438439
}, undefined, {
439440
peerId: controller.peerId,
440441
dontWaitForConnection: options.dontWaitForConnection,
441442
timeout: options.timeout,
442-
});
443+
}).onRejected(reject);
443444

444445
const parsed = await a.firstThenClose<rpcResponseActionType>(RpcTypes.ResponseActionType, typeOf<rpcResponseActionType>());
445446

@@ -465,8 +466,8 @@ export class RpcActionClient {
465466
name: 'v',
466467
parent: Object as any,
467468
optional: true,
468-
type: { kind: ReflectionKind.array, type: unwrappedReturnType }
469-
}]
469+
type: { kind: ReflectionKind.array, type: unwrappedReturnType },
470+
}],
470471
};
471472
}
472473

@@ -480,21 +481,21 @@ export class RpcActionClient {
480481
{ kind: ReflectionKind.propertySignature, name: 'controller', type: { kind: ReflectionKind.string } },
481482
{ kind: ReflectionKind.propertySignature, name: 'method', type: { kind: ReflectionKind.string } },
482483
{ kind: ReflectionKind.propertySignature, name: 'args', type: parameters },
483-
]
484+
],
484485
} as TypeObjectLiteral,
485486
resultSchema: {
486487
kind: ReflectionKind.objectLiteral,
487488
types: [
488489
{ kind: ReflectionKind.propertySignature, name: 'v', type: unwrappedReturnType },
489-
]
490+
],
490491
} as TypeObjectLiteral,
491492
observableNextSchema: {
492493
kind: ReflectionKind.objectLiteral,
493494
types: [
494495
{ kind: ReflectionKind.propertySignature, name: 'id', type: { kind: ReflectionKind.number } },
495496
{ kind: ReflectionKind.propertySignature, name: 'v', type: unwrappedReturnType },
496-
]
497-
} as TypeObjectLiteral
497+
],
498+
} as TypeObjectLiteral,
498499
};
499500

500501
resolve(state.types);

packages/rpc/src/client/client-direct.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@ export class RpcDirectClientAdapter implements ClientTransportAdapter {
2424
}
2525

2626
public async connect(connection: TransportClientConnection) {
27+
let closed = false;
2728
const kernelConnection = this.rpcKernel.createConnection({
28-
writeBinary: (buffer) => connection.readBinary(buffer),
29+
writeBinary: (buffer) => {
30+
if (closed) return;
31+
connection.readBinary(buffer);
32+
},
2933
close: () => {
34+
closed = true;
3035
connection.onClose('closed');
3136
},
3237
}, this.injector);
@@ -39,6 +44,7 @@ export class RpcDirectClientAdapter implements ClientTransportAdapter {
3944
return 0;
4045
},
4146
close() {
47+
closed = true;
4248
kernelConnection.close();
4349
},
4450
writeBinary(buffer) {

0 commit comments

Comments
 (0)