Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions packages/connection/__test__/common/multiplexer.test.ts

This file was deleted.

87 changes: 2 additions & 85 deletions packages/connection/__test__/node/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@ import WebSocket from 'ws';

import { WSWebSocketConnection } from '@opensumi/ide-connection/src/common/connection';
import { SumiConnection } from '@opensumi/ide-connection/src/common/rpc/connection';
import { Deferred, Emitter, Uri } from '@opensumi/ide-core-common';
import { createMockPairRPCProtocol } from '@opensumi/ide-extension/__mocks__/initRPCProtocol';
import { Deferred } from '@opensumi/ide-core-common';

import { ProxyIdentifier, RPCService } from '../../src';
import { RPCService } from '../../src';
import { RPCServiceCenter, initRPCService } from '../../src/common';
import { SimpleConnection } from '../../src/common/connection/drivers/simple';
import { SumiConnectionMultiplexer } from '../../src/common/rpc/multiplexer';
import { WSChannel, parse } from '../../src/common/ws-channel';
import { CommonChannelHandler, WebSocketServerRoute, commonChannelPathHandler } from '../../src/node';

Expand Down Expand Up @@ -200,84 +197,4 @@ describe('connection', () => {
wss.close();
clientConnection!.close();
});

it('RPCProtocol', async () => {
const { rpcProtocolExt: aProtocol, rpcProtocolMain: bProtocol } = createMockPairRPCProtocol();

const testMainIdentifier = ProxyIdentifier.for('testIendifier');
const mockMainIndetifierMethod = jest.fn();
const mockUriTestFn = jest.fn((uri) => uri);
const mockErrorFn = jest.fn(() => {
throw new Error('custom error');
});

aProtocol.set(testMainIdentifier, {
$test: mockMainIndetifierMethod,
$getUri: mockUriTestFn,
$errorFunction: mockErrorFn,
});

function errorFunction() {
return bProtocol.getProxy(testMainIdentifier).$errorFunction();
}

const testUri = Uri.file('/workspace/README.md');
await bProtocol.getProxy(testMainIdentifier).$test();
await bProtocol.getProxy(testMainIdentifier).$getUri(testUri);
expect(mockMainIndetifierMethod.mock.calls.length).toBe(1);
expect(mockUriTestFn.mock.results[0].value).toBeInstanceOf(Uri);
expect(mockUriTestFn.mock.results[0].value.toString()).toBe(testUri.toString());
await expect(errorFunction()).rejects.toThrow(new Error('custom error'));
});

it('RPCProtocol Timeout', async () => {
const emitterTimeoutA = new Emitter<any>();
const emitterTimeoutB = new Emitter<any>();
const emitterTimeoutC = new Emitter<any>();

const mockClientTA = {
onMessage: emitterTimeoutA.event,
send: (msg) => emitterTimeoutB.fire(msg),
};
const mockClientTB = {
onMessage: emitterTimeoutB.event,
send: (msg) => emitterTimeoutA.fire(msg),
};
const mockClientTC = {
onMessage: emitterTimeoutC.event,
send: (msg) => emitterTimeoutA.fire(msg),
};

const timeoutAProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTA));
const timeoutBProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTB));
const timeoutCProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTC), {
timeout: 1000,
});

const testTimeoutIdentifier = ProxyIdentifier.for('testTimeoutIdentifier');
timeoutAProtocol.set(testTimeoutIdentifier, {
$test: jest.fn(),
});

await expect(timeoutBProtocol.getProxy(testTimeoutIdentifier).$test()).resolves.toBe(undefined);

await expect(timeoutCProtocol.getProxy(testTimeoutIdentifier).$test()).rejects.toThrowErrorMatchingInlineSnapshot(
'"method testTimeoutIdentifier||$test timeout"',
);
});
it('multiplexer rpc id can have slash', async () => {
const { rpcProtocolExt, rpcProtocolMain } = createMockPairRPCProtocol();

const rpcId = '@opensumi/runner';
const method = '$fetchConfigurations';

rpcProtocolMain.set(ProxyIdentifier.for(rpcId), {
[method]: () => 'mock',
});

const runner = rpcProtocolExt.getProxy(ProxyIdentifier.for(rpcId));

const result = await runner.$fetchConfigurations();
expect(result).toBe('mock');
});
});
88 changes: 88 additions & 0 deletions packages/connection/__test__/node/multiplexer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { Emitter, Uri } from '@opensumi/ide-core-common';
import { createMockPairRPCProtocol } from '@opensumi/ide-extension/__mocks__/initRPCProtocol';

import { ProxyIdentifier } from '../../src';
import { SimpleConnection } from '../../src/common/connection/drivers/simple';
import { SumiConnectionMultiplexer } from '../../src/common/rpc/multiplexer';

describe('connection', () => {
it('RPCProtocol', async () => {
const { rpcProtocolExt: aProtocol, rpcProtocolMain: bProtocol } = createMockPairRPCProtocol();

const testMainIdentifier = ProxyIdentifier.for('testIendifier');
const mockMainIndetifierMethod = jest.fn();
const mockUriTestFn = jest.fn((uri) => uri);
const mockErrorFn = jest.fn(() => {
throw new Error('custom error');
});

aProtocol.set(testMainIdentifier, {
$test: mockMainIndetifierMethod,
$getUri: mockUriTestFn,
$errorFunction: mockErrorFn,
});

function errorFunction() {
return bProtocol.getProxy(testMainIdentifier).$errorFunction();
}

const testUri = Uri.file('/workspace/README.md');
await bProtocol.getProxy(testMainIdentifier).$test();
await bProtocol.getProxy(testMainIdentifier).$getUri(testUri);
expect(mockMainIndetifierMethod.mock.calls.length).toBe(1);
expect(mockUriTestFn.mock.results[0].value).toBeInstanceOf(Uri);
expect(mockUriTestFn.mock.results[0].value.toString()).toBe(testUri.toString());
await expect(errorFunction()).rejects.toThrow(new Error('custom error'));
});

it('RPCProtocol Timeout', async () => {
const emitterTimeoutA = new Emitter<any>();
const emitterTimeoutB = new Emitter<any>();
const emitterTimeoutC = new Emitter<any>();

const mockClientTA = {
onMessage: emitterTimeoutA.event,
send: (msg) => emitterTimeoutB.fire(msg),
};
const mockClientTB = {
onMessage: emitterTimeoutB.event,
send: (msg) => emitterTimeoutA.fire(msg),
};
const mockClientTC = {
onMessage: emitterTimeoutC.event,
send: (msg) => emitterTimeoutA.fire(msg),
};

const timeoutAProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTA));
const timeoutBProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTB));
const timeoutCProtocol = new SumiConnectionMultiplexer(new SimpleConnection(mockClientTC), {
timeout: 1000,
});

const testTimeoutIdentifier = ProxyIdentifier.for('@opensumi/runner');
timeoutAProtocol.set(testTimeoutIdentifier, {
$test: jest.fn(),
});

await expect(timeoutBProtocol.getProxy(testTimeoutIdentifier).$test()).resolves.toBe(undefined);

await expect(timeoutCProtocol.getProxy(testTimeoutIdentifier).$test()).rejects.toThrowErrorMatchingInlineSnapshot(
'"method @opensumi_runner/$test timeout"',
);
});
it('multiplexer rpc id can have slash', async () => {
const { rpcProtocolExt, rpcProtocolMain } = createMockPairRPCProtocol();

const rpcId = '@opensumi/runner';
const method = '$fetchConfigurations';

rpcProtocolMain.set(ProxyIdentifier.for(rpcId), {
[method]: () => 'mock',
});

const runner = rpcProtocolExt.getProxy(ProxyIdentifier.for(rpcId));

const result = await runner.$fetchConfigurations();
expect(result).toBe('mock');
});
});
57 changes: 31 additions & 26 deletions packages/connection/src/common/rpc/multiplexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,66 +24,69 @@ export interface IRPCProtocol {
get<T>(identifier: ProxyIdentifier<T>): T;
}

const SEP = '||';
const SEP_LENGTH = SEP.length;

export function getRPCName(serviceId: string, methodName: string) {
return `${serviceId}${SEP}${methodName}`;
}

export function extractServiceAndMethod(rpcId: string): [string, string] {
const idx = rpcId.indexOf(SEP);
return [rpcId.substring(0, idx), rpcId.substring(idx + SEP_LENGTH)];
}

/**
* A connection multiplexer that allows to register multiple local RPC services and to create proxies for them.
*/
export class SumiConnectionMultiplexer extends SumiConnection implements IRPCProtocol {
private readonly _locals: Map<string, any>;
private readonly _proxies: Map<string, any>;
protected static SEP = '/';
protected static SEP_LENGTH = SumiConnectionMultiplexer.SEP.length;

protected static getRPCName(serviceId: string, methodName: string) {
return `${serviceId}${SumiConnectionMultiplexer.SEP}${methodName}`;
}

protected static extractServiceAndMethod(rpcId: string): [string, string] {
const idx = rpcId.indexOf(SumiConnectionMultiplexer.SEP);
return [rpcId.substring(0, idx), rpcId.substring(idx + SumiConnectionMultiplexer.SEP_LENGTH)];
}

protected static normalizeServiceId(serviceId: string) {
return serviceId.replace(/\//g, '_');
Comment thread
bytemain marked this conversation as resolved.
}

protected readonly _locals: Map<string, any>;
protected readonly _proxies: Map<string, any>;

constructor(protected socket: BaseConnection<Uint8Array>, protected options: ISumiConnectionOptions = {}) {
super(socket, options);
this._locals = new Map();
this._proxies = new Map();

this.onRequestNotFound((rpcName: string, args: any[]) => {
const [rpcId, methodName] = extractServiceAndMethod(rpcName);
return this._doInvokeHandler(rpcId, methodName, args);
});
this.onRequestNotFound((rpcName: string, args: any[]) => this._doInvokeHandler(rpcName, args));

// call `listen` implicitly
// compatible behavior with the RPCProtocol
this.listen();
}

public set<T>(identifier: ProxyIdentifier<T>, instance: any) {
this._locals.set(identifier.serviceId, instance);
this._locals.set(SumiConnectionMultiplexer.normalizeServiceId(identifier.serviceId), instance);
return instance;
}

public get<T>(identifier: ProxyIdentifier<T>) {
return this._locals.get(identifier.serviceId);
return this._locals.get(SumiConnectionMultiplexer.normalizeServiceId(identifier.serviceId));
}

public getProxy<T>(proxyId: ProxyIdentifier<T>) {
if (!this._proxies.has(proxyId.serviceId)) {
this._proxies.set(proxyId.serviceId, this._createProxy(proxyId.serviceId));
const serviceId = SumiConnectionMultiplexer.normalizeServiceId(proxyId.serviceId);

if (!this._proxies.has(serviceId)) {
this._proxies.set(serviceId, this._createProxy(serviceId));
}

return this._proxies.get(proxyId.serviceId);
return this._proxies.get(serviceId);
}

private _createProxy(rpcId: string) {
protected _createProxy(rpcId: string) {
const handler = {
get: (target: any, name: string) => {
if (typeof name === 'symbol') {
return null;
}
// charCodeAt(0) === 36 means starts with $
if (!target[name] && name.charCodeAt(0) === 36) {
const rpcName = getRPCName(rpcId, name);
const rpcName = SumiConnectionMultiplexer.getRPCName(rpcId, name);
target[name] = (...args: any[]) => this.sendRequest(rpcName, ...args);
}

Expand All @@ -94,7 +97,9 @@ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCPro
return new Proxy(Object.create(null), handler);
}

private async _doInvokeHandler(rpcId: string, methodName: string, args: any[]): Promise<any> {
protected async _doInvokeHandler(rpcName: string, args: any[]): Promise<any> {
const [rpcId, methodName] = SumiConnectionMultiplexer.extractServiceAndMethod(rpcName);

const actor = this._locals.get(rpcId);
if (!actor) {
throw new Error('Unknown actor ' + rpcId);
Expand Down
5 changes: 1 addition & 4 deletions packages/extension/src/hosted/ext.host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,7 @@ export default class ExtensionHostServiceImpl implements IExtensionHostService {
}

this.logger.debug('extension extend service', extension.id, 'service', service);
this.rpcProtocol.set(
{ serviceId: `${EXTENSION_EXTEND_SERVICE_PREFIX}:${extension.id}` } as ProxyIdentifier<any>,
service,
);
this.rpcProtocol.set(new ProxyIdentifier<any>(`${EXTENSION_EXTEND_SERVICE_PREFIX}:${extension.id}`), service);
}

public async $activateExtension(id: string) {
Expand Down