Skip to content

Commit 112a057

Browse files
authored
feat: optimize stream packet parse performance (#3282)
* feat: optimize stream packet parse performance * test: fix testcase * test: fix testcase * test: fix testcase * chore: update code
1 parent c899849 commit 112a057

5 files changed

Lines changed: 453 additions & 115 deletions

File tree

packages/connection/__test__/common/buffers.test.ts

Lines changed: 193 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { Buffers } from '../../src/common/connection/buffers';
1+
import { Buffers, copy } from '../../src/common/connection/buffers';
22

33
describe('Buffers', () => {
4-
it('can append and slice', () => {
4+
it('can push and slice', () => {
55
const list = new Buffers();
66
list.push(new Uint8Array([1, 2, 3]));
77
list.push(new Uint8Array([4, 5, 6]));
@@ -18,6 +18,25 @@ describe('Buffers', () => {
1818
expect(list.slice(2, 6)).toEqual(new Uint8Array([3, 4, 5, 6]));
1919
});
2020

21+
it('slice', () => {
22+
const xs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
23+
const splits = [[4, 2, 3, 1], [2, 2, 2, 2, 2], [1, 6, 3, 1], [9, 2], [10], [5, 5]];
24+
25+
splits.forEach(function (split) {
26+
const bufs = create(xs, split);
27+
expect(new Uint8Array(xs)).toEqual(bufs.slice());
28+
29+
for (let i = 0; i < xs.length; i++) {
30+
for (let j = i; j < xs.length; j++) {
31+
const a = bufs.slice(i, j);
32+
const b = new Uint8Array(xs.slice(i, j));
33+
34+
expect(a).toEqual(b);
35+
}
36+
}
37+
});
38+
});
39+
2140
it('can splice', () => {
2241
const list = new Buffers();
2342
list.push(new Uint8Array([1, 2, 3]));
@@ -44,10 +63,54 @@ describe('Buffers', () => {
4463
new Uint8Array([4, 5, 6]),
4564
]);
4665
});
66+
67+
it('splice', () => {
68+
const xs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
69+
const splits = [[4, 2, 3, 1], [2, 2, 2, 2, 2], [1, 6, 3, 1], [9, 2], [10], [5, 5]];
70+
71+
splits.forEach(function (split) {
72+
for (let i = 0; i < xs.length; i++) {
73+
for (let j = i; j < xs.length; j++) {
74+
const bufs = create(xs, split);
75+
const xs_ = xs.slice();
76+
77+
const a_ = bufs.splice(i, j);
78+
const a = [].slice.call(a_.slice());
79+
const b = xs_.splice(i, j);
80+
expect(a).toEqual(b);
81+
expect(bufs.slice()).toEqual(new Uint8Array(xs_));
82+
}
83+
}
84+
});
85+
});
86+
87+
it('spliceRep', () => {
88+
const xs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
89+
const splits = [[4, 2, 3, 1], [2, 2, 2, 2, 2], [1, 6, 3, 1], [9, 2], [10], [5, 5]];
90+
const reps = [[], [1], [5, 6], [3, 1, 3, 3, 7], [9, 8, 7, 6, 5, 4, 3, 2, 1, 2, 3, 4, 5]];
91+
92+
splits.forEach(function (split) {
93+
reps.forEach(function (rep) {
94+
for (let i = 0; i < xs.length; i++) {
95+
for (let j = i; j < xs.length; j++) {
96+
const bufs = create(xs, split);
97+
const xs_ = xs.slice();
98+
99+
const a_ = bufs.splice(i, j, new Uint8Array(rep));
100+
const a = [].slice.call(a_.slice());
101+
const b = xs_.splice(i, j, ...rep);
102+
103+
expect(a).toEqual(b);
104+
expect(bufs.slice()).toEqual(new Uint8Array(xs_));
105+
}
106+
}
107+
});
108+
});
109+
});
110+
47111
it('can copy', () => {
48112
const list = new Buffers();
49113
list.push(new Uint8Array([1, 2, 3]));
50-
51114
list.push(new Uint8Array([4, 5, 6]));
52115

53116
const target = new Uint8Array(7);
@@ -72,5 +135,132 @@ describe('Buffers', () => {
72135
list.set(1, 10);
73136
expect(list.get(0)).toEqual(9);
74137
expect(list.get(1)).toEqual(10);
138+
139+
const xs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
140+
const splits = [[4, 2, 3, 1], [2, 2, 2, 2, 2], [1, 6, 3, 1], [9, 2], [10], [5, 5]];
141+
142+
splits.forEach(function (split) {
143+
const bufs = create(xs, split);
144+
const buf = new Uint8Array(xs);
145+
146+
for (let i = 0; i < xs.length; i++) {
147+
for (let j = i; j < xs.length; j++) {
148+
const t0 = new Uint8Array(j - i);
149+
const t1 = new Uint8Array(j - i);
150+
151+
expect(bufs.copy(t0, 0, i, j)).toEqual(copy(buf, t1, 0, i, j));
152+
expect([].slice.call(t0)).toEqual([].slice.call(t1));
153+
}
154+
}
155+
});
156+
});
157+
158+
it('unshift', () => {
159+
const bufs = new Buffers();
160+
bufs.unshift(new Uint8Array([6, 7, 8, 9]));
161+
bufs.unshift(new Uint8Array([4, 5]));
162+
bufs.unshift(new Uint8Array([1, 2, 3]));
163+
bufs.unshift(new Uint8Array([0]));
164+
165+
expect([].slice.call(bufs.slice())).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
166+
expect(bufs.buffers.length).toEqual(4);
167+
expect(bufs.byteLength).toEqual(10);
168+
});
169+
170+
it('cursor should work', () => {
171+
const list = new Buffers();
172+
list.push(new Uint8Array([1, 2, 3, 4]));
173+
list.push(new Uint8Array([4, 5, 6, 7, 8, 9]));
174+
list.push(new Uint8Array([7, 8, 9]));
175+
176+
const cursor = list.cursor(0);
177+
let count = 0;
178+
for (const a of cursor.iterator()) {
179+
expect(a).toEqual(list.get(count));
180+
if (count === 4) {
181+
break;
182+
}
183+
184+
count++;
185+
}
186+
expect(count).toEqual(4);
187+
expect(cursor.value).toEqual(list.get(5));
188+
expect(cursor.offset).toEqual(5);
189+
expect(cursor.line).toEqual(1);
190+
expect(cursor.lineOffset).toEqual(1);
191+
expect(cursor.lineWidth).toEqual(6);
192+
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
193+
194+
count++;
195+
196+
// reentrant
197+
for (const a of cursor.iterator()) {
198+
expect(a).toEqual(list.get(count));
199+
count++;
200+
}
201+
202+
// because here we always let count plus 1
203+
expect(count).toEqual(list.byteLength);
204+
expect(cursor.offset).toEqual(list.byteLength);
205+
expect(cursor.line).toEqual(3);
206+
expect(cursor.lineOffset).toEqual(0);
207+
});
208+
209+
it('cursor can break', () => {
210+
const list = new Buffers();
211+
list.push(new Uint8Array([1, 2, 3, 4]));
212+
list.push(new Uint8Array([4, 5, 6, 7, 8, 9]));
213+
list.push(new Uint8Array([7, 8, 9]));
214+
215+
const cursor = list.cursor(0);
216+
let count = 0;
217+
const iter = cursor.iterator();
218+
for (const a of iter) {
219+
expect(a).toEqual(list.get(count));
220+
if (count === 4) {
221+
iter.return();
222+
} else {
223+
count++;
224+
}
225+
}
226+
227+
expect(count).toEqual(4);
228+
229+
expect(cursor.offset).toEqual(5);
230+
expect(cursor.value).toEqual(list.get(cursor.offset));
231+
expect(cursor.line).toEqual(1);
232+
expect(cursor.lineOffset).toEqual(1);
233+
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
234+
235+
cursor.move(1);
236+
expect(cursor.offset).toEqual(6);
237+
expect(cursor.value).toEqual(list.get(cursor.offset));
238+
expect(cursor.line).toEqual(1);
239+
expect(cursor.lineOffset).toEqual(2);
240+
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
241+
242+
cursor.move(2);
243+
expect(cursor.offset).toEqual(8);
244+
expect(cursor.value).toEqual(list.get(cursor.offset));
245+
expect(cursor.line).toEqual(1);
246+
expect(cursor.lineOffset).toEqual(4);
247+
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
248+
249+
cursor.move(3);
250+
expect(cursor.offset).toEqual(11);
251+
expect(cursor.value).toEqual(list.get(cursor.offset));
252+
expect(cursor.line).toEqual(2);
253+
expect(cursor.lineOffset).toEqual(1);
254+
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
75255
});
76256
});
257+
258+
function create(xs: number[], split: number[]) {
259+
const bufs = new Buffers();
260+
let offset = 0;
261+
split.forEach(function (i) {
262+
bufs.push(new Uint8Array(xs.slice(offset, offset + i)));
263+
offset += i;
264+
});
265+
return bufs;
266+
}

packages/connection/__test__/common/stream-decoder.test.ts

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ const mixedPackets = [p1m, p5m].map((v) => {
5151
return [newPacket, v] as const;
5252
});
5353

54+
const size = purePackets.reduce((acc, v) => acc + v[0].byteLength, 0);
55+
56+
let offset = 0;
57+
const bigPayload = createPayload(size);
58+
purePackets.forEach((v) => {
59+
const sumiPacket = v[0];
60+
bigPayload.set(sumiPacket, offset);
61+
offset += sumiPacket.byteLength;
62+
});
63+
5464
const packets = [...purePackets, ...mixedPackets];
5565

5666
describe('stream-packet', () => {
@@ -69,14 +79,7 @@ describe('stream-packet', () => {
6979
const decoder = new StreamPacketDecoder();
7080

7181
decoder.onData((data) => {
72-
expect(data.byteLength).toEqual(expected.byteLength);
73-
for (let i = 0; i < 10; i++) {
74-
// 随机选一些数据(<= 100字节),对比是否正确,对比整个数组的话,超大 buffer 会很耗时
75-
const start = Math.floor(Math.random() * data.byteLength);
76-
const end = Math.floor(Math.random() * 1024);
77-
78-
expect(data.subarray(start, end)).toEqual(expected.subarray(start, end));
79-
}
82+
fastExpectBufferEqual(data, expected);
8083
decoder.dispose();
8184
done();
8285
});
@@ -91,6 +94,50 @@ describe('stream-packet', () => {
9194
logMemoryUsage();
9295
});
9396
});
97+
98+
it('can decode a stream payload contains multiple packets', (done) => {
99+
const decoder = new StreamPacketDecoder();
100+
const expectCount = purePackets.length;
101+
let count = 0;
102+
decoder.onData((data) => {
103+
const expected = purePackets[count][1];
104+
fastExpectBufferEqual(data, expected);
105+
106+
count++;
107+
if (count === expectCount) {
108+
decoder.dispose();
109+
done();
110+
}
111+
});
112+
113+
console.log('write chunk', bigPayload.byteLength);
114+
// write chunk by ${pressure} bytes
115+
for (let i = 0; i < bigPayload.byteLength; i += pressure) {
116+
decoder.push(bigPayload.subarray(i, i + pressure));
117+
logMemoryUsage();
118+
}
119+
120+
logMemoryUsage();
121+
});
122+
123+
it('can decode a stream it header and payload are separated', (done) => {
124+
const v = createPayload(1024);
125+
const sumiPacket = createStreamPacket(v);
126+
127+
const decoder = new StreamPacketDecoder();
128+
decoder.onData((data) => {
129+
fastExpectBufferEqual(data, v);
130+
done();
131+
});
132+
133+
console.log('write chunk', sumiPacket.byteLength);
134+
135+
// use pressure = 2 to simulate the header and payload are separated
136+
const pressure = 2;
137+
for (let i = 0; i < sumiPacket.byteLength; i += pressure) {
138+
decoder.push(sumiPacket.subarray(i, i + pressure));
139+
}
140+
});
94141
});
95142

96143
function logMemoryUsage() {
@@ -103,3 +150,16 @@ function logMemoryUsage() {
103150

104151
console.log(text);
105152
}
153+
154+
function fastExpectBufferEqual(data: Uint8Array, expected: Uint8Array, confidenceLevel = 10) {
155+
expect(data.byteLength).toEqual(expected.byteLength);
156+
157+
for (let i = 0; i < confidenceLevel; i++) {
158+
// 如果对比整个 Uint8Array 的话,超大 Uint8Array 会很耗时
159+
// 随机选一些数据(<= 1024 字节),对比是否正确,
160+
const start = Math.floor(Math.random() * data.byteLength);
161+
const end = Math.floor(Math.random() * 1024);
162+
163+
expect(data.subarray(start, end)).toEqual(expected.subarray(start, end));
164+
}
165+
}

packages/connection/__test__/node/ws-channel.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import net from 'net';
22

33
import { WSChannel } from '@opensumi/ide-connection';
44
import { normalizedIpcHandlerPathAsync } from '@opensumi/ide-core-common/lib/utils/ipc';
5+
56
const total = 1000;
67

78
describe('ws channel node', () => {

0 commit comments

Comments
 (0)