Skip to content

Commit 6970c6b

Browse files
authored
feat(multiplexer): use slash to separate rpc id (#3583)
1 parent 1733d40 commit 6970c6b

5 files changed

Lines changed: 122 additions & 129 deletions

File tree

packages/connection/__test__/common/multiplexer.test.ts

Lines changed: 0 additions & 14 deletions
This file was deleted.

packages/connection/__test__/node/index.test.ts

Lines changed: 2 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@ import WebSocket from 'ws';
44

55
import { WSWebSocketConnection } from '@opensumi/ide-connection/src/common/connection';
66
import { SumiConnection } from '@opensumi/ide-connection/src/common/rpc/connection';
7-
import { Deferred, Emitter, Uri } from '@opensumi/ide-core-common';
8-
import { createMockPairRPCProtocol } from '@opensumi/ide-extension/__mocks__/initRPCProtocol';
7+
import { Deferred } from '@opensumi/ide-core-common';
98

10-
import { ProxyIdentifier, RPCService } from '../../src';
9+
import { RPCService } from '../../src';
1110
import { RPCServiceCenter, initRPCService } from '../../src/common';
12-
import { SimpleConnection } from '../../src/common/connection/drivers/simple';
13-
import { SumiConnectionMultiplexer } from '../../src/common/rpc/multiplexer';
1411
import { WSChannel, parse } from '../../src/common/ws-channel';
1512
import { CommonChannelHandler, WebSocketServerRoute, commonChannelPathHandler } from '../../src/node';
1613

@@ -200,84 +197,4 @@ describe('connection', () => {
200197
wss.close();
201198
clientConnection!.close();
202199
});
203-
204-
it('RPCProtocol', async () => {
205-
const { rpcProtocolExt: aProtocol, rpcProtocolMain: bProtocol } = createMockPairRPCProtocol();
206-
207-
const testMainIdentifier = ProxyIdentifier.for('testIendifier');
208-
const mockMainIndetifierMethod = jest.fn();
209-
const mockUriTestFn = jest.fn((uri) => uri);
210-
const mockErrorFn = jest.fn(() => {
211-
throw new Error('custom error');
212-
});
213-
214-
aProtocol.set(testMainIdentifier, {
215-
$test: mockMainIndetifierMethod,
216-
$getUri: mockUriTestFn,
217-
$errorFunction: mockErrorFn,
218-
});
219-
220-
function errorFunction() {
221-
return bProtocol.getProxy(testMainIdentifier).$errorFunction();
222-
}
223-
224-
const testUri = Uri.file('/workspace/README.md');
225-
await bProtocol.getProxy(testMainIdentifier).$test();
226-
await bProtocol.getProxy(testMainIdentifier).$getUri(testUri);
227-
expect(mockMainIndetifierMethod.mock.calls.length).toBe(1);
228-
expect(mockUriTestFn.mock.results[0].value).toBeInstanceOf(Uri);
229-
expect(mockUriTestFn.mock.results[0].value.toString()).toBe(testUri.toString());
230-
await expect(errorFunction()).rejects.toThrow(new Error('custom error'));
231-
});
232-
233-
it('RPCProtocol Timeout', async () => {
234-
const emitterTimeoutA = new Emitter<any>();
235-
const emitterTimeoutB = new Emitter<any>();
236-
const emitterTimeoutC = new Emitter<any>();
237-
238-
const mockClientTA = {
239-
onMessage: emitterTimeoutA.event,
240-
send: (msg) => emitterTimeoutB.fire(msg),
241-
};
242-
const mockClientTB = {
243-
onMessage: emitterTimeoutB.event,
244-
send: (msg) => emitterTimeoutA.fire(msg),
245-
};
246-
const mockClientTC = {
247-
onMessage: emitterTimeoutC.event,
248-
send: (msg) => emitterTimeoutA.fire(msg),
249-
};
250-
251-
const timeoutAProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTA));
252-
const timeoutBProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTB));
253-
const timeoutCProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTC), {
254-
timeout: 1000,
255-
});
256-
257-
const testTimeoutIdentifier = ProxyIdentifier.for('testTimeoutIdentifier');
258-
timeoutAProtocol.set(testTimeoutIdentifier, {
259-
$test: jest.fn(),
260-
});
261-
262-
await expect(timeoutBProtocol.getProxy(testTimeoutIdentifier).$test()).resolves.toBe(undefined);
263-
264-
await expect(timeoutCProtocol.getProxy(testTimeoutIdentifier).$test()).rejects.toThrowErrorMatchingInlineSnapshot(
265-
'"method testTimeoutIdentifier||$test timeout"',
266-
);
267-
});
268-
it('multiplexer rpc id can have slash', async () => {
269-
const { rpcProtocolExt, rpcProtocolMain } = createMockPairRPCProtocol();
270-
271-
const rpcId = '@opensumi/runner';
272-
const method = '$fetchConfigurations';
273-
274-
rpcProtocolMain.set(ProxyIdentifier.for(rpcId), {
275-
[method]: () => 'mock',
276-
});
277-
278-
const runner = rpcProtocolExt.getProxy(ProxyIdentifier.for(rpcId));
279-
280-
const result = await runner.$fetchConfigurations();
281-
expect(result).toBe('mock');
282-
});
283200
});
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { Emitter, Uri } from '@opensumi/ide-core-common';
2+
import { createMockPairRPCProtocol } from '@opensumi/ide-extension/__mocks__/initRPCProtocol';
3+
4+
import { ProxyIdentifier } from '../../src';
5+
import { SimpleConnection } from '../../src/common/connection/drivers/simple';
6+
import { SumiConnectionMultiplexer } from '../../src/common/rpc/multiplexer';
7+
8+
describe('connection', () => {
9+
it('RPCProtocol', async () => {
10+
const { rpcProtocolExt: aProtocol, rpcProtocolMain: bProtocol } = createMockPairRPCProtocol();
11+
12+
const testMainIdentifier = ProxyIdentifier.for('testIendifier');
13+
const mockMainIndetifierMethod = jest.fn();
14+
const mockUriTestFn = jest.fn((uri) => uri);
15+
const mockErrorFn = jest.fn(() => {
16+
throw new Error('custom error');
17+
});
18+
19+
aProtocol.set(testMainIdentifier, {
20+
$test: mockMainIndetifierMethod,
21+
$getUri: mockUriTestFn,
22+
$errorFunction: mockErrorFn,
23+
});
24+
25+
function errorFunction() {
26+
return bProtocol.getProxy(testMainIdentifier).$errorFunction();
27+
}
28+
29+
const testUri = Uri.file('/workspace/README.md');
30+
await bProtocol.getProxy(testMainIdentifier).$test();
31+
await bProtocol.getProxy(testMainIdentifier).$getUri(testUri);
32+
expect(mockMainIndetifierMethod.mock.calls.length).toBe(1);
33+
expect(mockUriTestFn.mock.results[0].value).toBeInstanceOf(Uri);
34+
expect(mockUriTestFn.mock.results[0].value.toString()).toBe(testUri.toString());
35+
await expect(errorFunction()).rejects.toThrow(new Error('custom error'));
36+
});
37+
38+
it('RPCProtocol Timeout', async () => {
39+
const emitterTimeoutA = new Emitter<any>();
40+
const emitterTimeoutB = new Emitter<any>();
41+
const emitterTimeoutC = new Emitter<any>();
42+
43+
const mockClientTA = {
44+
onMessage: emitterTimeoutA.event,
45+
send: (msg) => emitterTimeoutB.fire(msg),
46+
};
47+
const mockClientTB = {
48+
onMessage: emitterTimeoutB.event,
49+
send: (msg) => emitterTimeoutA.fire(msg),
50+
};
51+
const mockClientTC = {
52+
onMessage: emitterTimeoutC.event,
53+
send: (msg) => emitterTimeoutA.fire(msg),
54+
};
55+
56+
const timeoutAProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTA));
57+
const timeoutBProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTB));
58+
const timeoutCProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTC), {
59+
timeout: 1000,
60+
});
61+
62+
const testTimeoutIdentifier = ProxyIdentifier.for('@opensumi/runner');
63+
timeoutAProtocol.set(testTimeoutIdentifier, {
64+
$test: jest.fn(),
65+
});
66+
67+
await expect(timeoutBProtocol.getProxy(testTimeoutIdentifier).$test()).resolves.toBe(undefined);
68+
69+
await expect(timeoutCProtocol.getProxy(testTimeoutIdentifier).$test()).rejects.toThrowErrorMatchingInlineSnapshot(
70+
'"method @opensumi_runner/$test timeout"',
71+
);
72+
});
73+
it('multiplexer rpc id can have slash', async () => {
74+
const { rpcProtocolExt, rpcProtocolMain } = createMockPairRPCProtocol();
75+
76+
const rpcId = '@opensumi/runner';
77+
const method = '$fetchConfigurations';
78+
79+
rpcProtocolMain.set(ProxyIdentifier.for(rpcId), {
80+
[method]: () => 'mock',
81+
});
82+
83+
const runner = rpcProtocolExt.getProxy(ProxyIdentifier.for(rpcId));
84+
85+
const result = await runner.$fetchConfigurations();
86+
expect(result).toBe('mock');
87+
});
88+
});

packages/connection/src/common/rpc/multiplexer.ts

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,66 +24,69 @@ export interface IRPCProtocol {
2424
get<T>(identifier: ProxyIdentifier<T>): T;
2525
}
2626

27-
const SEP = '||';
28-
const SEP_LENGTH = SEP.length;
29-
30-
export function getRPCName(serviceId: string, methodName: string) {
31-
return `${serviceId}${SEP}${methodName}`;
32-
}
33-
34-
export function extractServiceAndMethod(rpcId: string): [string, string] {
35-
const idx = rpcId.indexOf(SEP);
36-
return [rpcId.substring(0, idx), rpcId.substring(idx + SEP_LENGTH)];
37-
}
38-
3927
/**
4028
* A connection multiplexer that allows to register multiple local RPC services and to create proxies for them.
4129
*/
4230
export class SumiConnectionMultiplexer extends SumiConnection implements IRPCProtocol {
43-
private readonly _locals: Map<string, any>;
44-
private readonly _proxies: Map<string, any>;
31+
protected static SEP = '/';
32+
protected static SEP_LENGTH = SumiConnectionMultiplexer.SEP.length;
33+
34+
protected static getRPCName(serviceId: string, methodName: string) {
35+
return `${serviceId}${SumiConnectionMultiplexer.SEP}${methodName}`;
36+
}
37+
38+
protected static extractServiceAndMethod(rpcId: string): [string, string] {
39+
const idx = rpcId.indexOf(SumiConnectionMultiplexer.SEP);
40+
return [rpcId.substring(0, idx), rpcId.substring(idx + SumiConnectionMultiplexer.SEP_LENGTH)];
41+
}
42+
43+
protected static normalizeServiceId(serviceId: string) {
44+
return serviceId.replace(/\//g, '_');
45+
}
46+
47+
protected readonly _locals: Map<string, any>;
48+
protected readonly _proxies: Map<string, any>;
4549

4650
constructor(protected socket: BaseConnection<Uint8Array>, protected options: ISumiConnectionOptions = {}) {
4751
super(socket, options);
4852
this._locals = new Map();
4953
this._proxies = new Map();
5054

51-
this.onRequestNotFound((rpcName: string, args: any[]) => {
52-
const [rpcId, methodName] = extractServiceAndMethod(rpcName);
53-
return this._doInvokeHandler(rpcId, methodName, args);
54-
});
55+
this.onRequestNotFound((rpcName: string, args: any[]) => this._doInvokeHandler(rpcName, args));
5556

5657
// call `listen` implicitly
5758
// compatible behavior with the RPCProtocol
5859
this.listen();
5960
}
6061

6162
public set<T>(identifier: ProxyIdentifier<T>, instance: any) {
62-
this._locals.set(identifier.serviceId, instance);
63+
this._locals.set(SumiConnectionMultiplexer.normalizeServiceId(identifier.serviceId), instance);
6364
return instance;
6465
}
6566

6667
public get<T>(identifier: ProxyIdentifier<T>) {
67-
return this._locals.get(identifier.serviceId);
68+
return this._locals.get(SumiConnectionMultiplexer.normalizeServiceId(identifier.serviceId));
6869
}
6970

7071
public getProxy<T>(proxyId: ProxyIdentifier<T>) {
71-
if (!this._proxies.has(proxyId.serviceId)) {
72-
this._proxies.set(proxyId.serviceId, this._createProxy(proxyId.serviceId));
72+
const serviceId = SumiConnectionMultiplexer.normalizeServiceId(proxyId.serviceId);
73+
74+
if (!this._proxies.has(serviceId)) {
75+
this._proxies.set(serviceId, this._createProxy(serviceId));
7376
}
7477

75-
return this._proxies.get(proxyId.serviceId);
78+
return this._proxies.get(serviceId);
7679
}
7780

78-
private _createProxy(rpcId: string) {
81+
protected _createProxy(rpcId: string) {
7982
const handler = {
8083
get: (target: any, name: string) => {
8184
if (typeof name === 'symbol') {
8285
return null;
8386
}
8487
// charCodeAt(0) === 36 means starts with $
8588
if (!target[name] && name.charCodeAt(0) === 36) {
86-
const rpcName = getRPCName(rpcId, name);
89+
const rpcName = SumiConnectionMultiplexer.getRPCName(rpcId, name);
8790
target[name] = (...args: any[]) => this.sendRequest(rpcName, ...args);
8891
}
8992

@@ -94,7 +97,9 @@ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCPro
9497
return new Proxy(Object.create(null), handler);
9598
}
9699

97-
private async _doInvokeHandler(rpcId: string, methodName: string, args: any[]): Promise<any> {
100+
protected async _doInvokeHandler(rpcName: string, args: any[]): Promise<any> {
101+
const [rpcId, methodName] = SumiConnectionMultiplexer.extractServiceAndMethod(rpcName);
102+
98103
const actor = this._locals.get(rpcId);
99104
if (!actor) {
100105
throw new Error('Unknown actor ' + rpcId);

packages/extension/src/hosted/ext.host.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -550,10 +550,7 @@ export default class ExtensionHostServiceImpl implements IExtensionHostService {
550550
}
551551

552552
this.logger.debug('extension extend service', extension.id, 'service', service);
553-
this.rpcProtocol.set(
554-
{ serviceId: `${EXTENSION_EXTEND_SERVICE_PREFIX}:${extension.id}` } as ProxyIdentifier<any>,
555-
service,
556-
);
553+
this.rpcProtocol.set(new ProxyIdentifier<any>(`${EXTENSION_EXTEND_SERVICE_PREFIX}:${extension.id}`), service);
557554
}
558555

559556
public async $activateExtension(id: string) {

0 commit comments

Comments
 (0)