Skip to content

Commit d1aa884

Browse files
authored
feat: add sumi rpc (#3284)
* feat: add sumi rpc * test: fix testcase * chore: update code * fix: forward binary in ext host * feat: optimize proxy create function * feat: using an incrementing request id * feat: support one of * fix: protocol repo not defined * feat: use text decoder * refactor: update code * refactor: update protocol register * test: fix testcase * test: update testcase * chore: update code * chore: update types * feat: connection add dispose method * feat: avoid create function multiple times * refactor: simplify ext connection implementation * feat(rpc): support cancellation * refactor: update code * feat(rpc): support any serializer * refactor: update code * feat: any serializer can reviver uri * feat: ext rpc protocol use channel * fix: on request not found args * fix: rpc timeout not work * refactor: update code * docs: add more docs * fix: rpc broken * docs: update docs * fix: message port on message not work * test: fix types * refactor: use new style MessagePort event listener * fix: any serializer cannot handle Uint8Array * test: fix testcase * refactor: update code * test: remove only case * feat: compatible with legacy * refactor: rpc use sumi connection * refactor: add sumi connection multiplexer * test: fix testcases
1 parent 4e76703 commit d1aa884

125 files changed

Lines changed: 3243 additions & 1321 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

packages/connection/__test__/common/buffers.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Buffers, copy } from '../../src/common/connection/buffers';
1+
import { Buffers, copy } from '../../src/common/buffers/buffers';
22

33
describe('Buffers', () => {
44
it('can push and slice', () => {
@@ -232,21 +232,21 @@ describe('Buffers', () => {
232232
expect(cursor.lineOffset).toEqual(1);
233233
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
234234

235-
cursor.move(1);
235+
cursor.skip(1);
236236
expect(cursor.offset).toEqual(6);
237237
expect(cursor.value).toEqual(list.get(cursor.offset));
238238
expect(cursor.line).toEqual(1);
239239
expect(cursor.lineOffset).toEqual(2);
240240
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
241241

242-
cursor.move(2);
242+
cursor.skip(2);
243243
expect(cursor.offset).toEqual(8);
244244
expect(cursor.value).toEqual(list.get(cursor.offset));
245245
expect(cursor.line).toEqual(1);
246246
expect(cursor.lineOffset).toEqual(4);
247247
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
248248

249-
cursor.move(3);
249+
cursor.skip(3);
250250
expect(cursor.offset).toEqual(11);
251251
expect(cursor.value).toEqual(list.get(cursor.offset));
252252
expect(cursor.line).toEqual(2);

packages/connection/__test__/common/stream-decoder.test.ts renamed to packages/connection/__test__/common/frame-decoder.test.ts

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import { BinaryReader } from '@furyjs/fury/dist/lib/reader';
44

55
import {
6-
StreamPacketDecoder,
7-
createStreamPacket,
8-
kMagicNumber,
9-
} from '../../src/common/connection/drivers/stream-decoder';
10-
11-
const reader = BinaryReader({});
6+
LengthFieldBasedFrameDecoder,
7+
indicator,
8+
prependLengthField,
9+
} from '../../src/common/connection/drivers/frame-decoder';
1210

1311
function round(x: number, count: number) {
1412
return Math.round(x * 10 ** count) / 10 ** count;
@@ -42,14 +40,7 @@ console.timeEnd('createPayload');
4240
// 1m
4341
const pressure = 1024 * 1024;
4442

45-
const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [createStreamPacket(v), v] as const);
46-
47-
const mixedPackets = [p1m, p5m].map((v) => {
48-
const sumiPacket = createStreamPacket(v);
49-
const newPacket = createPayload(1024 + sumiPacket.byteLength);
50-
newPacket.set(sumiPacket, 1024);
51-
return [newPacket, v] as const;
52-
});
43+
const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [prependLengthField(v), v] as const);
5344

5445
const size = purePackets.reduce((acc, v) => acc + v[0].byteLength, 0);
5546

@@ -61,22 +52,30 @@ purePackets.forEach((v) => {
6152
offset += sumiPacket.byteLength;
6253
});
6354

55+
const mixedPackets = [p1m, p5m].map((v) => {
56+
const sumiPacket = prependLengthField(v);
57+
const newPacket = createPayload(1024 + sumiPacket.byteLength);
58+
newPacket.set(sumiPacket, 1024);
59+
return [newPacket, v] as const;
60+
});
61+
6462
const packets = [...purePackets, ...mixedPackets];
6563

66-
describe('stream-packet', () => {
67-
it('can create sumi stream packet', () => {
64+
describe('frame decoder', () => {
65+
it('can create frame', () => {
6866
const content = new Uint8Array([1, 2, 3]);
69-
const packet = createStreamPacket(content);
67+
const packet = prependLengthField(content);
68+
const reader = BinaryReader({});
7069

7170
reader.reset(packet);
72-
expect(reader.uint32()).toBe(kMagicNumber);
73-
expect(reader.varUInt32()).toBe(content.byteLength);
71+
expect(Uint8Array.from(reader.buffer(4))).toEqual(indicator);
72+
expect(reader.uint32()).toBe(content.byteLength);
7473
expect(Uint8Array.from(reader.buffer(content.byteLength))).toEqual(content);
7574
});
7675

7776
packets.forEach(([packet, expected]) => {
78-
it(`can decode stream packet: ${round(packet.byteLength / 1024 / 1024, 2)}m`, (done) => {
79-
const decoder = new StreamPacketDecoder();
77+
it(`can decode stream: ${round(packet.byteLength / 1024 / 1024, 2)}m`, (done) => {
78+
const decoder = new LengthFieldBasedFrameDecoder();
8079

8180
decoder.onData((data) => {
8281
fastExpectBufferEqual(data, expected);
@@ -95,8 +94,8 @@ describe('stream-packet', () => {
9594
});
9695
});
9796

98-
it('can decode a stream payload contains multiple packets', (done) => {
99-
const decoder = new StreamPacketDecoder();
97+
it('can decode a stream payload contains multiple frames', (done) => {
98+
const decoder = new LengthFieldBasedFrameDecoder();
10099
const expectCount = purePackets.length;
101100
let count = 0;
102101
decoder.onData((data) => {
@@ -120,18 +119,17 @@ describe('stream-packet', () => {
120119
logMemoryUsage();
121120
});
122121

123-
it('can decode a stream it header and payload are separated', (done) => {
122+
it('can decode a stream it has no valid length info', (done) => {
124123
const v = createPayload(1024);
125-
const sumiPacket = createStreamPacket(v);
124+
const sumiPacket = prependLengthField(v);
126125

127-
const decoder = new StreamPacketDecoder();
126+
const decoder = new LengthFieldBasedFrameDecoder();
128127
decoder.onData((data) => {
129128
fastExpectBufferEqual(data, v);
130129
done();
131130
});
132131

133132
console.log('write chunk', sumiPacket.byteLength);
134-
135133
// use pressure = 2 to simulate the header and payload are separated
136134
const pressure = 2;
137135
for (let i = 0; i < sumiPacket.byteLength; i += pressure) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/* eslint-disable no-console */
2+
3+
import { PingMessage, PongMessage, parse, stringify } from '../../../src/common/ws-channel';
4+
5+
describe('oneOf', () => {
6+
function testIt(obj: any) {
7+
const bytes = stringify(obj);
8+
const obj2 = parse(bytes);
9+
expect(obj2).toEqual(obj);
10+
const str = JSON.stringify(obj);
11+
12+
console.log('bytes.length', bytes.byteLength);
13+
console.log('json length', str.length);
14+
}
15+
16+
it('should serialize and deserialize', () => {
17+
const obj = {
18+
kind: 'ping',
19+
clientId: '123',
20+
id: '456',
21+
} as PingMessage;
22+
23+
testIt(obj);
24+
25+
const obj2 = {
26+
kind: 'pong',
27+
clientId: '123',
28+
id: '456',
29+
} as PongMessage;
30+
31+
testIt(obj2);
32+
33+
const obj3 = {
34+
kind: 'open',
35+
clientId: '123',
36+
id: '456',
37+
path: '/test',
38+
};
39+
40+
testIt(obj3);
41+
42+
const obj4 = {
43+
kind: 'server-ready',
44+
id: '456',
45+
};
46+
47+
testIt(obj4);
48+
49+
const obj5 = {
50+
kind: 'data',
51+
id: '456',
52+
content: 'hello',
53+
};
54+
55+
testIt(obj5);
56+
57+
const obj6 = {
58+
kind: 'binary',
59+
id: '456',
60+
binary: Buffer.from([1, 2, 3]),
61+
};
62+
testIt(obj6);
63+
});
64+
});
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { IConnectionPair } from './utils';
2+
3+
export const test = (
4+
name: string,
5+
options: {
6+
pairFactory: () => IConnectionPair;
7+
factory: (pair: IConnectionPair) => any;
8+
},
9+
) => {
10+
const { factory, pairFactory } = options;
11+
describe(name, () => {
12+
let pair: IConnectionPair;
13+
beforeEach(() => {
14+
pair = pairFactory();
15+
});
16+
17+
afterEach(() => {
18+
pair && pair.close();
19+
});
20+
21+
it('can call method', async () => {
22+
const { invoker1, invoker2 } = factory(pair);
23+
24+
const result = await invoker1.add(1, 2);
25+
expect(result).toBe(3);
26+
27+
const result2 = await invoker2.shortUrl('1234567890abcdefg');
28+
expect(result2).toBe('1234567890');
29+
30+
const result3 = await invoker2.returnUndefined();
31+
expect(result3).toBeUndefined();
32+
});
33+
});
34+
};
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { test } from './common-tester';
2+
import { createLegacyRPCClientPair, createMessageConnectionPair } from './utils';
3+
4+
const factory = (pair: any) => createLegacyRPCClientPair(pair);
5+
6+
test('legacy json rpc', {
7+
factory,
8+
pairFactory: createMessageConnectionPair,
9+
});
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import { Type } from '@furyjs/fury';
2+
3+
import { METHOD_NOT_REGISTERED } from '@opensumi/ide-connection/lib/common/constants';
4+
5+
import { test } from './common-tester';
6+
import { createConnectionPair, createSumiRPCClientPair } from './utils';
7+
8+
test('sumi rpc', {
9+
factory: createSumiRPCClientPair,
10+
pairFactory: createConnectionPair,
11+
});
12+
13+
describe('sumi rpc only', () => {
14+
let pair: ReturnType<typeof createConnectionPair>;
15+
beforeEach(() => {
16+
pair = createConnectionPair();
17+
});
18+
19+
afterEach(() => {
20+
pair && pair.close();
21+
});
22+
23+
it('can throw error', async () => {
24+
const { invoker1, invoker2, client1, client2 } = createSumiRPCClientPair(pair);
25+
26+
client1.listenService({
27+
shortUrl: (url: string) => {
28+
if (!url) {
29+
throw new Error('url is empty');
30+
}
31+
return url.slice(0, 10);
32+
},
33+
});
34+
client2.listenService({
35+
add: (a: number, b: number) => {
36+
if (a === 0) {
37+
throw new Error('a is zero');
38+
}
39+
return a + b;
40+
},
41+
});
42+
43+
await expect(invoker1.add(0, 2)).rejects.toThrow('a is zero');
44+
await expect(invoker2.shortUrl('')).rejects.toThrow('url is empty');
45+
46+
await expect(invoker1.throwAString()).resolves.toEqual(METHOD_NOT_REGISTERED);
47+
await expect(invoker2.throwAString()).resolves.toEqual(METHOD_NOT_REGISTERED);
48+
49+
client2.listenService({
50+
throwAString: () => {
51+
// eslint-disable-next-line no-throw-literal
52+
throw 'a string';
53+
},
54+
});
55+
56+
try {
57+
await invoker1.throwAString();
58+
} catch (error) {
59+
expect(typeof error).toBe('string');
60+
expect(error).toBe('a string');
61+
}
62+
});
63+
64+
it('can throw error when method not found', async () => {
65+
const { invoker1, invoker2, repo } = createSumiRPCClientPair(pair);
66+
repo.loadProtocolMethod({
67+
method: 'notFound',
68+
request: [],
69+
response: {
70+
type: Type.any(),
71+
},
72+
});
73+
74+
await expect(invoker1.notFound()).resolves.toEqual(METHOD_NOT_REGISTERED);
75+
await expect(invoker2.notFound()).resolves.toEqual(METHOD_NOT_REGISTERED);
76+
});
77+
});

0 commit comments

Comments
 (0)