diff --git a/packages/connection/__test__/common/multiplexer.test.ts b/packages/connection/__test__/common/multiplexer.test.ts deleted file mode 100644 index fc12a789a4..0000000000 --- a/packages/connection/__test__/common/multiplexer.test.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { extractServiceAndMethod, getRPCName } from '@opensumi/ide-connection/lib/common/rpc/multiplexer'; - -describe('Multiplexer', () => { - it('can construct rpc name', () => { - const rpcId = '@opensumi/runner'; - const method = '$fetch'; - - const name = getRPCName(rpcId, method); - const [serviceId, methodName] = extractServiceAndMethod(name); - - expect(serviceId).toBe(rpcId); - expect(methodName).toBe(method); - }); -}); diff --git a/packages/connection/__test__/node/index.test.ts b/packages/connection/__test__/node/index.test.ts index 511cc75477..f9eb94d6c1 100644 --- a/packages/connection/__test__/node/index.test.ts +++ b/packages/connection/__test__/node/index.test.ts @@ -4,13 +4,10 @@ import WebSocket from 'ws'; import { WSWebSocketConnection } from '@opensumi/ide-connection/src/common/connection'; import { SumiConnection } from '@opensumi/ide-connection/src/common/rpc/connection'; -import { Deferred, Emitter, Uri } from '@opensumi/ide-core-common'; -import { createMockPairRPCProtocol } from '@opensumi/ide-extension/__mocks__/initRPCProtocol'; +import { Deferred } from '@opensumi/ide-core-common'; -import { ProxyIdentifier, RPCService } from '../../src'; +import { RPCService } from '../../src'; import { RPCServiceCenter, initRPCService } from '../../src/common'; -import { SimpleConnection } from '../../src/common/connection/drivers/simple'; -import { SumiConnectionMultiplexer } from '../../src/common/rpc/multiplexer'; import { WSChannel, parse } from '../../src/common/ws-channel'; import { CommonChannelHandler, WebSocketServerRoute, commonChannelPathHandler } from '../../src/node'; @@ -200,84 +197,4 @@ describe('connection', () => { wss.close(); clientConnection!.close(); }); - - it('RPCProtocol', async () => { - const { rpcProtocolExt: aProtocol, rpcProtocolMain: bProtocol } = createMockPairRPCProtocol(); - - const testMainIdentifier = ProxyIdentifier.for('testIendifier'); - const mockMainIndetifierMethod = jest.fn(); - const mockUriTestFn = jest.fn((uri) => uri); - const mockErrorFn = jest.fn(() => { - throw new Error('custom error'); - }); - - aProtocol.set(testMainIdentifier, { - $test: mockMainIndetifierMethod, - $getUri: mockUriTestFn, - $errorFunction: mockErrorFn, - }); - - function errorFunction() { - return bProtocol.getProxy(testMainIdentifier).$errorFunction(); - } - - const testUri = Uri.file('/workspace/README.md'); - await bProtocol.getProxy(testMainIdentifier).$test(); - await bProtocol.getProxy(testMainIdentifier).$getUri(testUri); - expect(mockMainIndetifierMethod.mock.calls.length).toBe(1); - expect(mockUriTestFn.mock.results[0].value).toBeInstanceOf(Uri); - expect(mockUriTestFn.mock.results[0].value.toString()).toBe(testUri.toString()); - await expect(errorFunction()).rejects.toThrow(new Error('custom error')); - }); - - it('RPCProtocol Timeout', async () => { - const emitterTimeoutA = new Emitter(); - const emitterTimeoutB = new Emitter(); - const emitterTimeoutC = new Emitter(); - - const mockClientTA = { - onMessage: emitterTimeoutA.event, - send: (msg) => emitterTimeoutB.fire(msg), - }; - const mockClientTB = { - onMessage: emitterTimeoutB.event, - send: (msg) => emitterTimeoutA.fire(msg), - }; - const mockClientTC = { - onMessage: emitterTimeoutC.event, - send: (msg) => emitterTimeoutA.fire(msg), - }; - - const timeoutAProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTA)); - const timeoutBProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTB)); - const timeoutCProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTC), { - timeout: 1000, - }); - - const testTimeoutIdentifier = ProxyIdentifier.for('testTimeoutIdentifier'); - timeoutAProtocol.set(testTimeoutIdentifier, { - $test: jest.fn(), - }); - - await expect(timeoutBProtocol.getProxy(testTimeoutIdentifier).$test()).resolves.toBe(undefined); - - await expect(timeoutCProtocol.getProxy(testTimeoutIdentifier).$test()).rejects.toThrowErrorMatchingInlineSnapshot( - '"method testTimeoutIdentifier||$test timeout"', - ); - }); - it('multiplexer rpc id can have slash', async () => { - const { rpcProtocolExt, rpcProtocolMain } = createMockPairRPCProtocol(); - - const rpcId = '@opensumi/runner'; - const method = '$fetchConfigurations'; - - rpcProtocolMain.set(ProxyIdentifier.for(rpcId), { - [method]: () => 'mock', - }); - - const runner = rpcProtocolExt.getProxy(ProxyIdentifier.for(rpcId)); - - const result = await runner.$fetchConfigurations(); - expect(result).toBe('mock'); - }); }); diff --git a/packages/connection/__test__/node/multiplexer.test.ts b/packages/connection/__test__/node/multiplexer.test.ts new file mode 100644 index 0000000000..327b83ea72 --- /dev/null +++ b/packages/connection/__test__/node/multiplexer.test.ts @@ -0,0 +1,88 @@ +import { Emitter, Uri } from '@opensumi/ide-core-common'; +import { createMockPairRPCProtocol } from '@opensumi/ide-extension/__mocks__/initRPCProtocol'; + +import { ProxyIdentifier } from '../../src'; +import { SimpleConnection } from '../../src/common/connection/drivers/simple'; +import { SumiConnectionMultiplexer } from '../../src/common/rpc/multiplexer'; + +describe('connection', () => { + it('RPCProtocol', async () => { + const { rpcProtocolExt: aProtocol, rpcProtocolMain: bProtocol } = createMockPairRPCProtocol(); + + const testMainIdentifier = ProxyIdentifier.for('testIendifier'); + const mockMainIndetifierMethod = jest.fn(); + const mockUriTestFn = jest.fn((uri) => uri); + const mockErrorFn = jest.fn(() => { + throw new Error('custom error'); + }); + + aProtocol.set(testMainIdentifier, { + $test: mockMainIndetifierMethod, + $getUri: mockUriTestFn, + $errorFunction: mockErrorFn, + }); + + function errorFunction() { + return bProtocol.getProxy(testMainIdentifier).$errorFunction(); + } + + const testUri = Uri.file('/workspace/README.md'); + await bProtocol.getProxy(testMainIdentifier).$test(); + await bProtocol.getProxy(testMainIdentifier).$getUri(testUri); + expect(mockMainIndetifierMethod.mock.calls.length).toBe(1); + expect(mockUriTestFn.mock.results[0].value).toBeInstanceOf(Uri); + expect(mockUriTestFn.mock.results[0].value.toString()).toBe(testUri.toString()); + await expect(errorFunction()).rejects.toThrow(new Error('custom error')); + }); + + it('RPCProtocol Timeout', async () => { + const emitterTimeoutA = new Emitter(); + const emitterTimeoutB = new Emitter(); + const emitterTimeoutC = new Emitter(); + + const mockClientTA = { + onMessage: emitterTimeoutA.event, + send: (msg) => emitterTimeoutB.fire(msg), + }; + const mockClientTB = { + onMessage: emitterTimeoutB.event, + send: (msg) => emitterTimeoutA.fire(msg), + }; + const mockClientTC = { + onMessage: emitterTimeoutC.event, + send: (msg) => emitterTimeoutA.fire(msg), + }; + + const timeoutAProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTA)); + const timeoutBProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTB)); + const timeoutCProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTC), { + timeout: 1000, + }); + + const testTimeoutIdentifier = ProxyIdentifier.for('@opensumi/runner'); + timeoutAProtocol.set(testTimeoutIdentifier, { + $test: jest.fn(), + }); + + await expect(timeoutBProtocol.getProxy(testTimeoutIdentifier).$test()).resolves.toBe(undefined); + + await expect(timeoutCProtocol.getProxy(testTimeoutIdentifier).$test()).rejects.toThrowErrorMatchingInlineSnapshot( + '"method @opensumi_runner/$test timeout"', + ); + }); + it('multiplexer rpc id can have slash', async () => { + const { rpcProtocolExt, rpcProtocolMain } = createMockPairRPCProtocol(); + + const rpcId = '@opensumi/runner'; + const method = '$fetchConfigurations'; + + rpcProtocolMain.set(ProxyIdentifier.for(rpcId), { + [method]: () => 'mock', + }); + + const runner = rpcProtocolExt.getProxy(ProxyIdentifier.for(rpcId)); + + const result = await runner.$fetchConfigurations(); + expect(result).toBe('mock'); + }); +}); diff --git a/packages/connection/src/common/rpc/multiplexer.ts b/packages/connection/src/common/rpc/multiplexer.ts index 832b0cdb82..b3e95a01d8 100644 --- a/packages/connection/src/common/rpc/multiplexer.ts +++ b/packages/connection/src/common/rpc/multiplexer.ts @@ -24,34 +24,35 @@ export interface IRPCProtocol { get(identifier: ProxyIdentifier): T; } -const SEP = '||'; -const SEP_LENGTH = SEP.length; - -export function getRPCName(serviceId: string, methodName: string) { - return `${serviceId}${SEP}${methodName}`; -} - -export function extractServiceAndMethod(rpcId: string): [string, string] { - const idx = rpcId.indexOf(SEP); - return [rpcId.substring(0, idx), rpcId.substring(idx + SEP_LENGTH)]; -} - /** * A connection multiplexer that allows to register multiple local RPC services and to create proxies for them. */ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCProtocol { - private readonly _locals: Map; - private readonly _proxies: Map; + protected static SEP = '/'; + protected static SEP_LENGTH = SumiConnectionMultiplexer.SEP.length; + + protected static getRPCName(serviceId: string, methodName: string) { + return `${serviceId}${SumiConnectionMultiplexer.SEP}${methodName}`; + } + + protected static extractServiceAndMethod(rpcId: string): [string, string] { + const idx = rpcId.indexOf(SumiConnectionMultiplexer.SEP); + return [rpcId.substring(0, idx), rpcId.substring(idx + SumiConnectionMultiplexer.SEP_LENGTH)]; + } + + protected static normalizeServiceId(serviceId: string) { + return serviceId.replace(/\//g, '_'); + } + + protected readonly _locals: Map; + protected readonly _proxies: Map; constructor(protected socket: BaseConnection, protected options: ISumiConnectionOptions = {}) { super(socket, options); this._locals = new Map(); this._proxies = new Map(); - this.onRequestNotFound((rpcName: string, args: any[]) => { - const [rpcId, methodName] = extractServiceAndMethod(rpcName); - return this._doInvokeHandler(rpcId, methodName, args); - }); + this.onRequestNotFound((rpcName: string, args: any[]) => this._doInvokeHandler(rpcName, args)); // call `listen` implicitly // compatible behavior with the RPCProtocol @@ -59,23 +60,25 @@ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCPro } public set(identifier: ProxyIdentifier, instance: any) { - this._locals.set(identifier.serviceId, instance); + this._locals.set(SumiConnectionMultiplexer.normalizeServiceId(identifier.serviceId), instance); return instance; } public get(identifier: ProxyIdentifier) { - return this._locals.get(identifier.serviceId); + return this._locals.get(SumiConnectionMultiplexer.normalizeServiceId(identifier.serviceId)); } public getProxy(proxyId: ProxyIdentifier) { - if (!this._proxies.has(proxyId.serviceId)) { - this._proxies.set(proxyId.serviceId, this._createProxy(proxyId.serviceId)); + const serviceId = SumiConnectionMultiplexer.normalizeServiceId(proxyId.serviceId); + + if (!this._proxies.has(serviceId)) { + this._proxies.set(serviceId, this._createProxy(serviceId)); } - return this._proxies.get(proxyId.serviceId); + return this._proxies.get(serviceId); } - private _createProxy(rpcId: string) { + protected _createProxy(rpcId: string) { const handler = { get: (target: any, name: string) => { if (typeof name === 'symbol') { @@ -83,7 +86,7 @@ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCPro } // charCodeAt(0) === 36 means starts with $ if (!target[name] && name.charCodeAt(0) === 36) { - const rpcName = getRPCName(rpcId, name); + const rpcName = SumiConnectionMultiplexer.getRPCName(rpcId, name); target[name] = (...args: any[]) => this.sendRequest(rpcName, ...args); } @@ -94,7 +97,9 @@ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCPro return new Proxy(Object.create(null), handler); } - private async _doInvokeHandler(rpcId: string, methodName: string, args: any[]): Promise { + protected async _doInvokeHandler(rpcName: string, args: any[]): Promise { + const [rpcId, methodName] = SumiConnectionMultiplexer.extractServiceAndMethod(rpcName); + const actor = this._locals.get(rpcId); if (!actor) { throw new Error('Unknown actor ' + rpcId); diff --git a/packages/extension/src/hosted/ext.host.ts b/packages/extension/src/hosted/ext.host.ts index e428b56be2..ceac41fb8c 100644 --- a/packages/extension/src/hosted/ext.host.ts +++ b/packages/extension/src/hosted/ext.host.ts @@ -550,10 +550,7 @@ export default class ExtensionHostServiceImpl implements IExtensionHostService { } this.logger.debug('extension extend service', extension.id, 'service', service); - this.rpcProtocol.set( - { serviceId: `${EXTENSION_EXTEND_SERVICE_PREFIX}:${extension.id}` } as ProxyIdentifier, - service, - ); + this.rpcProtocol.set(new ProxyIdentifier(`${EXTENSION_EXTEND_SERVICE_PREFIX}:${extension.id}`), service); } public async $activateExtension(id: string) {