Skip to content

Commit 3fd5a7c

Browse files
committed
fix: frame broken
1 parent 71caec6 commit 3fd5a7c

10 files changed

Lines changed: 240 additions & 254 deletions

File tree

packages/connection/__test__/common/buffers.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Buffers, copy } from '../../src/common/connection/buffers';
1+
import { Buffers, copy } from '../../src/common/buffers/buffers';
22

33
describe('Buffers', () => {
44
it('can push and slice', () => {
@@ -232,21 +232,21 @@ describe('Buffers', () => {
232232
expect(cursor.lineOffset).toEqual(1);
233233
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
234234

235-
cursor.move(1);
235+
cursor.skip(1);
236236
expect(cursor.offset).toEqual(6);
237237
expect(cursor.value).toEqual(list.get(cursor.offset));
238238
expect(cursor.line).toEqual(1);
239239
expect(cursor.lineOffset).toEqual(2);
240240
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
241241

242-
cursor.move(2);
242+
cursor.skip(2);
243243
expect(cursor.offset).toEqual(8);
244244
expect(cursor.value).toEqual(list.get(cursor.offset));
245245
expect(cursor.line).toEqual(1);
246246
expect(cursor.lineOffset).toEqual(4);
247247
expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset });
248248

249-
cursor.move(3);
249+
cursor.skip(3);
250250
expect(cursor.offset).toEqual(11);
251251
expect(cursor.value).toEqual(list.get(cursor.offset));
252252
expect(cursor.line).toEqual(2);

packages/connection/__test__/common/stream-decoder.test.ts renamed to packages/connection/__test__/common/frame-decoder.test.ts

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import { BinaryReader } from '@furyjs/fury/dist/lib/reader';
44

55
import {
6-
StreamPacketDecoder,
7-
createStreamPacket,
8-
kMagicNumber,
9-
} from '../../src/common/connection/drivers/stream-decoder';
10-
11-
const reader = BinaryReader({});
6+
LengthFieldBasedFrameDecoder,
7+
indicator,
8+
prependLengthField,
9+
} from '../../src/common/connection/drivers/frame-decoder';
1210

1311
function round(x: number, count: number) {
1412
return Math.round(x * 10 ** count) / 10 ** count;
@@ -42,14 +40,7 @@ console.timeEnd('createPayload');
4240
// 1m
4341
const pressure = 1024 * 1024;
4442

45-
const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [createStreamPacket(v), v] as const);
46-
47-
const mixedPackets = [p1m, p5m].map((v) => {
48-
const sumiPacket = createStreamPacket(v);
49-
const newPacket = createPayload(1024 + sumiPacket.byteLength);
50-
newPacket.set(sumiPacket, 1024);
51-
return [newPacket, v] as const;
52-
});
43+
const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [prependLengthField(v), v] as const);
5344

5445
const size = purePackets.reduce((acc, v) => acc + v[0].byteLength, 0);
5546

@@ -61,22 +52,30 @@ purePackets.forEach((v) => {
6152
offset += sumiPacket.byteLength;
6253
});
6354

55+
const mixedPackets = [p1m, p5m].map((v) => {
56+
const sumiPacket = prependLengthField(v);
57+
const newPacket = createPayload(1024 + sumiPacket.byteLength);
58+
newPacket.set(sumiPacket, 1024);
59+
return [newPacket, v] as const;
60+
});
61+
6462
const packets = [...purePackets, ...mixedPackets];
6563

66-
describe('stream-packet', () => {
67-
it('can create sumi stream packet', () => {
64+
describe('frame decoder', () => {
65+
it('can create frame', () => {
6866
const content = new Uint8Array([1, 2, 3]);
69-
const packet = createStreamPacket(content);
67+
const packet = prependLengthField(content);
68+
const reader = BinaryReader({});
7069

7170
reader.reset(packet);
72-
expect(reader.uint32()).toBe(kMagicNumber);
73-
expect(reader.varUInt32()).toBe(content.byteLength);
71+
expect(Uint8Array.from(reader.buffer(4))).toEqual(indicator);
72+
expect(reader.uint32()).toBe(content.byteLength);
7473
expect(Uint8Array.from(reader.buffer(content.byteLength))).toEqual(content);
7574
});
7675

7776
packets.forEach(([packet, expected]) => {
78-
it(`can decode stream packet: ${round(packet.byteLength / 1024 / 1024, 2)}m`, (done) => {
79-
const decoder = new StreamPacketDecoder();
77+
it(`can decode stream: ${round(packet.byteLength / 1024 / 1024, 2)}m`, (done) => {
78+
const decoder = new LengthFieldBasedFrameDecoder();
8079

8180
decoder.onData((data) => {
8281
fastExpectBufferEqual(data, expected);
@@ -95,8 +94,8 @@ describe('stream-packet', () => {
9594
});
9695
});
9796

98-
it('can decode a stream payload contains multiple packets', (done) => {
99-
const decoder = new StreamPacketDecoder();
97+
it('can decode a stream payload contains multiple frames', (done) => {
98+
const decoder = new LengthFieldBasedFrameDecoder();
10099
const expectCount = purePackets.length;
101100
let count = 0;
102101
decoder.onData((data) => {
@@ -120,18 +119,17 @@ describe('stream-packet', () => {
120119
logMemoryUsage();
121120
});
122121

123-
it('can decode a stream it header and payload are separated', (done) => {
122+
it('can decode a stream it has no valid length info', (done) => {
124123
const v = createPayload(1024);
125-
const sumiPacket = createStreamPacket(v);
124+
const sumiPacket = prependLengthField(v);
126125

127-
const decoder = new StreamPacketDecoder();
126+
const decoder = new LengthFieldBasedFrameDecoder();
128127
decoder.onData((data) => {
129128
fastExpectBufferEqual(data, v);
130129
done();
131130
});
132131

133132
console.log('write chunk', sumiPacket.byteLength);
134-
135133
// use pressure = 2 to simulate the header and payload are separated
136134
const pressure = 2;
137135
for (let i = 0; i < sumiPacket.byteLength; i += pressure) {

packages/connection/__test__/node/ws-channel.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import net from 'net';
22

33
import { WSChannel } from '@opensumi/ide-connection';
4-
import { copy } from '@opensumi/ide-connection/lib/common/connection/buffers';
54
import { normalizedIpcHandlerPathAsync } from '@opensumi/ide-core-common/lib/utils/ipc';
65

6+
import { copy } from '../../src/common/buffers/buffers';
7+
78
const total = 1000;
89

910
describe('ws channel node', () => {

packages/connection/docs/ws-channel.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,5 @@
55
`WSChannel` 一开始是为了支持在浏览器环境下同时支持创建两个 RPCService 实例而出现的。我们在浏览器的 Browser 层需要和 Main 和 Ext 两个环境进行通信,建立两个 WebSocket 连接显然是不合适的。因此我们需要一个能够支持多路复用的 WebSocket 连接,然后 `WSChannel``WSChannelServer` 就应运而生。
66

77
其实这种多路复用是非常有用的,在 Node.js 里我们也需要它,因为我们也会需要在一个 `net.Socket` 连接中同时传输多种信息。我遇到的情况就是需要同时支持 `@vscode/jsonrpc` 和 sumi-rpc,所以我把 Node.js 的通信连接各种东西也改成了 `WSChannel`
8+
9+
所以 `WSChannel` 是两个端通信的最基础的一层,需要发消息都是由 `WSChannel` 发送给对端的 `WSChannel` 来进行分发的。

packages/connection/src/common/connection/buffers.ts renamed to packages/connection/src/common/buffers/buffers.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,9 +261,34 @@ export class Cursor {
261261
}
262262
}
263263

264-
move(n: number) {
265-
this.offset += n;
266-
this.updatePosition();
264+
private skipCursor(n: number) {
265+
let count = 0;
266+
while (this.chunkIndex < this.buffers.buffers.length) {
267+
const chunk = this.buffers.buffers[this.chunkIndex];
268+
const chunkLength = chunk.byteLength;
269+
270+
while (this.chunkOffset < chunkLength) {
271+
this.chunkOffset++;
272+
this.offset++;
273+
274+
if (++count === n) {
275+
return;
276+
}
277+
}
278+
this.chunkOffset = 0;
279+
this.chunkIndex++;
280+
}
281+
}
282+
283+
read(n: number) {
284+
const end = this.offset + n;
285+
const buffers = this.buffers.slice(this.offset, end);
286+
this.skip(n);
287+
return buffers;
288+
}
289+
290+
skip(n: number) {
291+
this.skipCursor(n);
267292
}
268293

269294
moveTo(n: number) {
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import { BinaryWriter } from '@furyjs/fury/dist/lib/writer';
2+
3+
import { readUInt32LE, Emitter } from '@opensumi/ide-core-common';
4+
5+
import { Buffers } from '../../buffers/buffers';
6+
7+
const writer = BinaryWriter({});
8+
9+
/**
10+
* You can use `Buffer.from('\r\n\r\n')` to get this indicator.
11+
*/
12+
export const indicator = new Uint8Array([0x0d, 0x0a, 0x0d, 0x0a]);
13+
14+
/**
15+
* sticky packet unpacking problems are generally problems at the transport layer.
16+
* we use a length field to represent the length of the data, and then read the data according to the length
17+
*/
18+
export function prependLengthField(content: Uint8Array) {
19+
writer.reset();
20+
writer.buffer(indicator);
21+
writer.uint32(content.byteLength);
22+
writer.buffer(content);
23+
return writer.dump();
24+
}
25+
26+
export class LengthFieldBasedFrameDecoder {
27+
protected dataEmitter = new Emitter<Uint8Array>();
28+
onData = this.dataEmitter.event;
29+
30+
protected buffers = new Buffers();
31+
protected cursor = this.buffers.cursor();
32+
33+
protected contentLength = -1;
34+
35+
protected state = 0;
36+
37+
/**
38+
* The number of bytes in the length field.
39+
*
40+
* How many bytes are used to represent data length.
41+
*
42+
* For example, if the length field is 4 bytes, then the maximum length of the data is 2^32 = 4GB
43+
*/
44+
lengthFieldLength = 4;
45+
46+
reset() {
47+
this.contentLength = -1;
48+
this.state = 0;
49+
this.cursor.reset();
50+
}
51+
52+
push(chunk: Uint8Array): void {
53+
this.buffers.push(chunk);
54+
let done = false;
55+
56+
while (!done) {
57+
done = this.readFrame();
58+
}
59+
}
60+
61+
protected readFrame(): boolean {
62+
const found = this.readLengthField();
63+
64+
if (found) {
65+
const start = this.cursor.offset;
66+
const end = start + this.contentLength;
67+
68+
const binary = this.buffers.slice(start, end);
69+
70+
this.dataEmitter.fire(binary);
71+
72+
if (this.buffers.byteLength > end) {
73+
this.contentLength = -1;
74+
this.state = 0;
75+
this.cursor.moveTo(end);
76+
// has more data, continue to parse
77+
return false;
78+
}
79+
80+
// delete used buffers
81+
this.buffers.splice(0, end);
82+
this.reset();
83+
}
84+
85+
return true;
86+
}
87+
88+
protected readLengthField() {
89+
const bufferLength = this.buffers.byteLength;
90+
91+
if (this.state !== 4) {
92+
if (this.cursor.offset + indicator.length > bufferLength) {
93+
// Not enough data yet, wait for more data
94+
return false;
95+
}
96+
97+
this.readIndicator();
98+
}
99+
100+
if (this.state !== 4) {
101+
// Not a complete indicator yet, wait for more data
102+
return false;
103+
}
104+
105+
if (this.contentLength === -1) {
106+
if (this.cursor.offset + this.lengthFieldLength > bufferLength) {
107+
// Not enough data yet, wait for more data
108+
return false;
109+
}
110+
111+
// read the content length
112+
const buf = this.cursor.read(this.lengthFieldLength);
113+
// fury writer use little endian
114+
this.contentLength = readUInt32LE(buf, 0);
115+
}
116+
117+
if (this.cursor.offset + this.contentLength > bufferLength) {
118+
// Not enough data yet, wait for more data
119+
return false;
120+
}
121+
122+
return true;
123+
}
124+
125+
protected readIndicator() {
126+
const iter = this.cursor.iterator();
127+
128+
let result = iter.next();
129+
while (!result.done) {
130+
switch (result.value) {
131+
case 0x0d:
132+
switch (this.state) {
133+
case 0:
134+
this.state = 1;
135+
break;
136+
case 2:
137+
this.state = 3;
138+
break;
139+
default:
140+
this.state = 0;
141+
break;
142+
}
143+
break;
144+
case 0x0a:
145+
switch (this.state) {
146+
case 1:
147+
this.state = 2;
148+
break;
149+
case 3:
150+
this.state = 4;
151+
iter.return();
152+
break;
153+
default:
154+
this.state = 0;
155+
break;
156+
}
157+
break;
158+
default:
159+
this.state = 0;
160+
break;
161+
}
162+
result = iter.next();
163+
}
164+
}
165+
166+
dispose() {
167+
this.dataEmitter.dispose();
168+
this.buffers.dispose();
169+
}
170+
}

packages/connection/src/common/connection/drivers/socket.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ import type net from 'net';
33
import { IDisposable } from '@opensumi/ide-core-common';
44

55
import { BaseConnection } from './base';
6-
import { StreamPacketDecoder, createStreamPacket } from './stream-decoder';
6+
import { LengthFieldBasedFrameDecoder, prependLengthField } from './frame-decoder';
77

88
export class NetSocketConnection extends BaseConnection<Uint8Array> {
9-
protected decoder = new StreamPacketDecoder();
9+
protected decoder = new LengthFieldBasedFrameDecoder();
1010

1111
constructor(private socket: net.Socket) {
1212
super();
@@ -23,7 +23,7 @@ export class NetSocketConnection extends BaseConnection<Uint8Array> {
2323
}
2424

2525
send(data: Uint8Array): void {
26-
this.socket.write(createStreamPacket(data));
26+
this.socket.write(prependLengthField(data));
2727
}
2828

2929
onMessage(cb: (data: Uint8Array) => void): IDisposable {

0 commit comments

Comments
 (0)