diff --git a/packages/components/src/recycle-tree/tree/TreeNode.ts b/packages/components/src/recycle-tree/tree/TreeNode.ts index d0969c2956..1fc0dbad8e 100644 --- a/packages/components/src/recycle-tree/tree/TreeNode.ts +++ b/packages/components/src/recycle-tree/tree/TreeNode.ts @@ -39,15 +39,9 @@ const { Path } = path; * @param items 插入的数组 */ export function spliceArray(arr: number[], start: number, deleteCount = 0, items?: number[] | null) { - // 如果没有修改操作,直接返回原数组 - if (deleteCount === 0 && (!items || items.length === 0)) { - return arr; - } - - // 直接使用 slice + concat 避免 spread operator - const before = arr.slice(0, start); - const after = arr.slice(start + deleteCount); - return before.concat(items || []).concat(after); + const a = arr.slice(0); + a.splice(start, deleteCount, ...(items || [])); + return a; } export enum BranchOperatorStatus { @@ -576,10 +570,10 @@ export class CompositeTreeNode extends TreeNode implements ICompositeTreeNode { } /** - * 确保此"目录"的子级已加载(不影响"展开"状态) + * 确保此“目录”的子级已加载(不影响“展开”状态) * 如果子级已经加载,则返回的Promise将立即解决 * 否则,将发出重新加载请求并返回Promise - * 一旦返回的Promise.resolve,"CompositeTreeNode#children" 便可以访问到对于节点 + * 一旦返回的Promise.resolve,“CompositeTreeNode#children” 便可以访问到对于节点 */ public async ensureLoaded(token?: CancellationToken) { if (this._children) { diff --git a/packages/connection/__test__/browser/index.test.ts b/packages/connection/__test__/browser/index.test.ts index b5e29d271d..59953b1558 100644 --- a/packages/connection/__test__/browser/index.test.ts +++ b/packages/connection/__test__/browser/index.test.ts @@ -1,4 +1,4 @@ -import { WSWebSocketConnection, furySerializer } from '@opensumi/ide-connection'; +import { furySerializer } from '@opensumi/ide-connection'; import { ReconnectingWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection/drivers/reconnecting-websocket'; import { sleep } from '@opensumi/ide-core-common'; import { Server, WebSocket } from '@opensumi/mock-socket'; @@ -21,11 +21,10 @@ describe('connection browser', () => { let data2Received = false; mockServer.on('connection', (socket) => { - const connection = new WSWebSocketConnection(socket as any); - connection.onMessage((msg) => { + socket.on('message', (msg) => { const msgObj = furySerializer.deserialize(msg as Uint8Array); if (msgObj.kind === 'open') { - connection.send( + socket.send( furySerializer.serialize({ id: msgObj.id, kind: 'server-ready', diff --git a/packages/connection/__test__/common/buffers.test.ts b/packages/connection/__test__/common/buffers.test.ts index 99e48f0216..0d1c17f03c 100644 --- a/packages/connection/__test__/common/buffers.test.ts +++ b/packages/connection/__test__/common/buffers.test.ts @@ -253,158 +253,6 @@ describe('Buffers', () => { expect(cursor.lineOffset).toEqual(1); expect(list.pos(cursor.offset)).toEqual({ buf: cursor.line, offset: cursor.lineOffset }); }); - - // 测试空缓冲区行为 - describe('Empty Buffer Handling', () => { - it('should handle empty buffer operations', () => { - const empty = new Buffers(); - - expect(empty.byteLength).toBe(0); - expect(empty.slice()).toEqual(new Uint8Array(0)); - expect(() => empty.pos(0)).toThrow('out of range'); - expect(empty.splice(0, 0)).toEqual(expect.any(Buffers)); - }); - }); - - // 测试边界slice操作 - describe('Edge Case Slicing', () => { - const buffer = create([0, 1, 2, 3, 4, 5], [3, 3]); - - it('should handle start at chunk boundary', () => { - expect(buffer.slice(3, 5)).toEqual(new Uint8Array([3, 4])); - }); - - it('should handle end at chunk boundary', () => { - expect(buffer.slice(2, 3)).toEqual(new Uint8Array([2])); - }); - }); - it('should handle splice at exact chunk boundary', () => { - const buffer = createEnhanced([0, 1, 2, 3, 4, 5], [3, 3]); - buffer.splice(3, 2, new Uint8Array([99])); - expect(buffer.slice()).toEqual(new Uint8Array([0, 1, 2, 99, 5])); - }); - // 测试非法索引访问 - describe('Invalid Access Handling', () => { - const buffer = create([1, 2, 3], [3]); - - it('should throw on negative index', () => { - expect(() => buffer.pos(-1)).toThrow('out of range'); - }); - - it('should throw on overflow index', () => { - expect(() => buffer.pos(4)).toThrow('out of range'); - }); - }); - - // 测试超大缓冲区 - describe('Large Buffer Handling', () => { - const MB1 = new Uint8Array(1024 * 1024); - const buffer = new Buffers(); - - beforeAll(() => { - // 填充1MB数据 - for (let i = 0; i < 1024; i++) { - buffer.push(MB1.subarray(0, 1024)); - } - }); - - it('should handle 1GB data slicing', () => { - const slice = buffer.slice(1024 * 512, 1024 * 512 + 100); - expect(slice.byteLength).toBe(100); - }); - }); - - // 测试跨chunk的splice操作 - describe('Cross-Chunk Splicing', () => { - const buffer = create([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [3, 3, 4]); - - it('should splice across multiple chunks', () => { - const removed = buffer.splice(2, 5, new Uint8Array([99])); - - expect(buffer.slice()).toEqual(new Uint8Array([0, 1, 99, 7, 8, 9])); - expect(removed.slice()).toEqual(new Uint8Array([2, 3, 4, 5, 6])); - }); - }); - - // 测试slice4特殊方法 - describe('slice4 Special Cases', () => { - it('should handle partial slice4', () => { - const buffer = create([1, 2, 3], [3]); - expect(buffer.slice4(2)).toEqual(new Uint8Array([3, 0, 0, 0])); - }); - - it('should handle edge slice4', () => { - const buffer = create([1, 2, 3, 4, 5], [5]); - expect(buffer.slice4(1)).toEqual(new Uint8Array([2, 3, 4, 5])); - }); - }); - - // 测试Cursor高级功能 - describe('Cursor Advanced Operations', () => { - const buffer = create([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [3, 3, 4]); - - it('should handle moveTo across chunks', () => { - const cursor = buffer.cursor(); - cursor.moveTo(5); - expect(cursor.line).toBe(1); - expect(cursor.lineOffset).toBe(2); - }); - - it('should reset correctly', () => { - const cursor = buffer.cursor(5); - cursor.reset(); - expect(cursor.offset).toBe(0); - expect(cursor.line).toBe(0); - }); - }); - - // 测试Dispose行为 - describe('Resource Management', () => { - it('should clear resources on dispose', () => { - const buffer = create([1, 2, 3], [3]); - buffer.dispose(); - - expect(buffer.buffers).toEqual([]); - expect(buffer.byteLength).toBe(0); - }); - }); - - // 性能测试 - describe('Performance Tests', () => { - let largeBuffer: Buffers; - - beforeAll(() => { - largeBuffer = new Buffers(); - // 创建包含1000个10KB chunk的缓冲区 - for (let i = 0; i < 1000; i++) { - largeBuffer.push(new Uint8Array(10 * 1024)); - } - }); - - it('should handle slicing 1MB data under 50ms', () => { - const start = performance.now(); - const slice = largeBuffer.slice(0, 1024 * 1024); - const duration = performance.now() - start; - - expect(duration).toBeLessThan(50); - expect(slice.byteLength).toBe(1024 * 1024); - }); - - it('should handle 1000 splices under 1s', () => { - const buf = createEnhanced( - new Array(10000).fill(0).map((_, i) => i), - [100, 900, 9000], - ); - - const start = performance.now(); - for (let i = 0; i < 1000; i++) { - buf.splice(i % 100, 5, new Uint8Array([i])); - } - const duration = performance.now() - start; - - expect(duration).toBeLessThan(1000); - }); - }); }); function create(xs: number[], split: number[]) { @@ -416,16 +264,3 @@ function create(xs: number[], split: number[]) { }); return bufs; } - -function createEnhanced(xs: number[], split: number[]): Buffers { - const bufs = new Buffers(); - let offset = 0; - split.forEach((chunkSize) => { - if (chunkSize > 0) { - const chunk = new Uint8Array(xs.slice(offset, offset + chunkSize)); - bufs.push(chunk); - offset += chunkSize; - } - }); - return bufs; -} diff --git a/packages/connection/__test__/common/frame-decoder.test.ts b/packages/connection/__test__/common/frame-decoder.test.ts index f0d98be887..7054522028 100644 --- a/packages/connection/__test__/common/frame-decoder.test.ts +++ b/packages/connection/__test__/common/frame-decoder.test.ts @@ -35,9 +35,7 @@ console.timeEnd('createPayload'); // 1m const pressure = 1024 * 1024; -const purePackets = [p1k, p64k, p128k, p5m, p10m].map( - (v) => [LengthFieldBasedFrameDecoder.construct(v).dump(), v] as const, -); +const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [LengthFieldBasedFrameDecoder.construct(v), v] as const); const size = purePackets.reduce((acc, v) => acc + v[0].byteLength, 0); @@ -50,7 +48,7 @@ purePackets.forEach((v) => { }); const mixedPackets = [p1m, p5m].map((v) => { - const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump(); + const sumiPacket = LengthFieldBasedFrameDecoder.construct(v); const newPacket = createPayload(1024 + sumiPacket.byteLength); newPacket.set(sumiPacket, 1024); return [newPacket, v] as const; @@ -61,7 +59,7 @@ const packets = [...purePackets, ...mixedPackets]; describe('frame decoder', () => { it('can create frame', () => { const content = new Uint8Array([1, 2, 3]); - const packet = LengthFieldBasedFrameDecoder.construct(content).dump(); + const packet = LengthFieldBasedFrameDecoder.construct(content); const reader = BinaryReader({}); reader.reset(packet); @@ -71,164 +69,67 @@ describe('frame decoder', () => { }); packets.forEach(([packet, expected]) => { - it(`can decode stream: ${round(packet.byteLength / 1024 / 1024, 2)}m`, async () => { + it(`can decode stream: ${round(packet.byteLength / 1024 / 1024, 2)}m`, (done) => { const decoder = new LengthFieldBasedFrameDecoder(); - const result = await new Promise((resolve) => { - decoder.onData((data) => resolve(data)); - - // Push the full packet - the decoder will handle chunking internally - decoder.push(packet); - }); - - fastExpectBufferEqual(result, expected); - decoder.dispose(); - }); - }); - - it('can decode a stream payload contains multiple frames', async () => { - const decoder = new LengthFieldBasedFrameDecoder(); - const receivedData: Uint8Array[] = []; - let resolved = false; - - const dataPromise = new Promise((resolve) => { decoder.onData((data) => { - receivedData.push(data); - if (receivedData.length === purePackets.length && !resolved) { - resolved = true; - resolve(); - } + fastExpectBufferEqual(data, expected); + decoder.dispose(); + done(); }); - }); - - // Push the full payload - the decoder will handle chunking internally - decoder.push(bigPayload); - - await dataPromise; - - // Verify all packets were received in order - receivedData.forEach((data, index) => { - fastExpectBufferEqual(data, purePackets[index][1]); - }); - decoder.dispose(); - }); - - it('can decode a stream it has no valid length info', async () => { - const v = createPayload(1024); - const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump(); + console.log('write chunk', packet.byteLength); + // write chunk by ${pressure} bytes + for (let i = 0; i < packet.byteLength; i += pressure) { + decoder.push(packet.subarray(i, i + pressure)); + logMemoryUsage(); + } - const decoder = new LengthFieldBasedFrameDecoder(); - const result = await new Promise((resolve) => { - decoder.onData((data) => resolve(data)); - decoder.push(sumiPacket); + logMemoryUsage(); }); - - fastExpectBufferEqual(result, v); - decoder.dispose(); }); - // 测试分块传输场景 - it('should handle chunked packets with split indicator', async () => { - const content = new Uint8Array([1, 2, 3]); - const fullPacket = LengthFieldBasedFrameDecoder.construct(content).dump(); - - // 将数据包拆分为三部分:指示符前半、指示符后半+长度、内容 - const chunks = [ - fullPacket.subarray(0, 2), // 0D 0A - fullPacket.subarray(2, 6), // 0D 0A + 长度字段前2字节 - fullPacket.subarray(6), // 剩余数据 - ]; - + it('can decode a stream payload contains multiple frames', (done) => { const decoder = new LengthFieldBasedFrameDecoder(); - const result = await new Promise((resolve) => { - decoder.onData(resolve); - - // 分三次推送数据 - chunks.forEach((chunk, i) => { - setTimeout(() => decoder.push(chunk), i * 10); - }); + const expectCount = purePackets.length; + let count = 0; + decoder.onData((data) => { + const expected = purePackets[count][1]; + fastExpectBufferEqual(data, expected); + + count++; + if (count === expectCount) { + decoder.dispose(); + done(); + } }); - fastExpectBufferEqual(result, content); - }); - - // 测试高频小数据包压力 - it('should handle 1000 sequential small packets', async () => { - const decoder = new LengthFieldBasedFrameDecoder(); - const received: Uint8Array[] = []; - - decoder.onData((data) => received.push(data)); - - // 生成1000个独立数据包 - for (let i = 0; i < 1000; i++) { - const packet = LengthFieldBasedFrameDecoder.construct(new Uint8Array([i % 256])).dump(); - - decoder.push(packet); + console.log('write chunk', bigPayload.byteLength); + // write chunk by ${pressure} bytes + for (let i = 0; i < bigPayload.byteLength; i += pressure) { + decoder.push(bigPayload.subarray(i, i + pressure)); + logMemoryUsage(); } - // 等待处理完成 - await new Promise((resolve) => setTimeout(resolve, 1000)); - - expect(received.length).toBe(1000); - received.forEach((data, i) => { - expect(data[0]).toBe(i % 256); - }); + logMemoryUsage(); }); - // 测试内存稳定性 - it('should not leak memory after processing', async () => { - const initialMemory = process.memoryUsage().heapUsed; - - for (let i = 0; i < 100; i++) { - const decoder = new LengthFieldBasedFrameDecoder(); - const packet = LengthFieldBasedFrameDecoder.construct( - createPayload(1024 * 1024), // 1MB payload - ).dump(); - - await new Promise((resolve) => { - decoder.onData(() => resolve()); - decoder.push(packet); - }); - - decoder.dispose(); - } - - const finalMemory = process.memoryUsage().heapUsed; - expect(finalMemory - initialMemory).toBeLessThan(5 * 1024 * 1024); // 允许5MB波动 - }); - - // 测试空数据包处理 - it('should handle zero-length payload', async () => { - const decoder = new LengthFieldBasedFrameDecoder(); - const emptyPacket = LengthFieldBasedFrameDecoder.construct(new Uint8Array(0)).dump(); - - const result = await new Promise((resolve) => { - decoder.onData(resolve); - decoder.push(emptyPacket); - }); - - expect(result.byteLength).toBe(0); - }); + it('can decode a stream it has no valid length info', (done) => { + const v = createPayload(1024); + const sumiPacket = LengthFieldBasedFrameDecoder.construct(v); - // 测试并发推送 - it('should handle concurrent pushes', async () => { const decoder = new LengthFieldBasedFrameDecoder(); - const content = new Uint8Array([1, 2, 3]); - const packet = LengthFieldBasedFrameDecoder.construct(content).dump(); - - const chunk1 = packet.subarray(0, 4); - const chunk2 = packet.subarray(4); - - const resultPromise = new Promise((resolve) => { - decoder.onData(resolve); + decoder.onData((data) => { + fastExpectBufferEqual(data, v); + done(); }); - // 同时推送两个chunk - await Promise.all([decoder.push(chunk1), decoder.push(chunk2)]); - - const result = await resultPromise; - fastExpectBufferEqual(result, content); + console.log('write chunk', sumiPacket.byteLength); + // use pressure = 2 to simulate the header and payload are separated + const pressure = 2; + for (let i = 0; i < sumiPacket.byteLength; i += pressure) { + decoder.push(sumiPacket.subarray(i, i + pressure)); + } }); }); diff --git a/packages/connection/__test__/node/index.test.ts b/packages/connection/__test__/node/index.test.ts index e7793a0392..a748825db8 100644 --- a/packages/connection/__test__/node/index.test.ts +++ b/packages/connection/__test__/node/index.test.ts @@ -8,7 +8,7 @@ import { SumiConnection } from '@opensumi/ide-connection/src/common/rpc/connecti import { Deferred } from '@opensumi/ide-core-common'; import { RPCService } from '../../src'; -import { ChannelMessage, RPCServiceCenter, initRPCService } from '../../src/common'; +import { RPCServiceCenter, initRPCService } from '../../src/common'; import { CommonChannelPathHandler } from '../../src/common/server-handler'; import { WSChannel } from '../../src/common/ws-channel'; import { CommonChannelHandler, WebSocketServerRoute } from '../../src/node'; @@ -62,16 +62,14 @@ describe('connection', () => { }); const clientId = 'TEST_CLIENT'; const wsConnection = new WSWebSocketConnection(connection); - const wrappedConnection = wrapSerializer(wsConnection, furySerializer); - const channel = new WSChannel(wrapSerializer(wsConnection, furySerializer), { id: 'TEST_CHANNEL_ID', }); - - wrappedConnection.onMessage((msg: ChannelMessage) => { - if (msg.kind === 'server-ready') { - if (msg.id === 'TEST_CHANNEL_ID') { - channel.dispatch(msg); + connection.on('message', (msg: Uint8Array) => { + const msgObj = furySerializer.deserialize(msg); + if (msgObj.kind === 'server-ready') { + if (msgObj.id === 'TEST_CHANNEL_ID') { + channel.dispatch(msgObj); } } }); diff --git a/packages/connection/__test__/node/ws-channel.test.ts b/packages/connection/__test__/node/ws-channel.test.ts index b0c1070ad1..f71828a1f8 100644 --- a/packages/connection/__test__/node/ws-channel.test.ts +++ b/packages/connection/__test__/node/ws-channel.test.ts @@ -85,7 +85,7 @@ describe('ws channel node', () => { }), new Promise((resolve) => { for (let i = 0; i < total; i++) { - channel2.send('hello' + i); + channel2.send('hello'); } resolve(); }), @@ -95,7 +95,7 @@ describe('ws channel node', () => { socket2.destroy(); socket2.end(); }, - 30 * 1000, + 20 * 1000, ); it( @@ -149,6 +149,6 @@ describe('ws channel node', () => { socket2.destroy(); socket2.end(); }, - 30 * 1000, + 20 * 1000, ); }); diff --git a/packages/connection/src/common/buffers/buffers.ts b/packages/connection/src/common/buffers/buffers.ts index e3abacbcb9..583056910b 100644 --- a/packages/connection/src/common/buffers/buffers.ts +++ b/packages/connection/src/common/buffers/buffers.ts @@ -5,7 +5,6 @@ */ export const emptyBuffer = new Uint8Array(0); -export const buffer4Capacity = new Uint8Array(4); export function copy( source: Uint8Array, @@ -73,39 +72,6 @@ export class Buffers { return target; } - slice4(start: number) { - let end = start + 4; - const buffers = this.buffers; - - if (end > this.size) { - end = this.size; - } - - if (start >= end) { - return emptyBuffer; - } - - let startBytes = 0; - let si = 0; - for (; si < buffers.length && startBytes + buffers[si].length <= start; si++) { - startBytes += buffers[si].length; - } - - const target = buffer4Capacity; - - let ti = 0; - for (let ii = si; ti < end - start && ii < buffers.length; ii++) { - const len = buffers[ii].length; - - const _start = ti === 0 ? start - startBytes : 0; - const _end = ti + len >= end - start ? Math.min(_start + (end - start) - ti, len) : len; - copy(buffers[ii], target, ti, _start, _end); - ti += _end - _start; - } - - return target; - } - pos(i: number): { buf: number; offset: number } { if (i < 0 || i >= this.size) { throw new Error(`out of range, ${i} not in [0, ${this.size})`); @@ -302,12 +268,6 @@ export class Cursor { return buffers; } - read4() { - const buffers = this.buffers.slice4(this.offset); - this.skip(4); - return buffers; - } - skip(n: number) { let count = 0; while (this.chunkIndex < this.buffers.buffers.length) { diff --git a/packages/connection/src/common/connection/drivers/frame-decoder.ts b/packages/connection/src/common/connection/drivers/frame-decoder.ts index 3010bbdcde..eb5ef07b43 100644 --- a/packages/connection/src/common/connection/drivers/frame-decoder.ts +++ b/packages/connection/src/common/connection/drivers/frame-decoder.ts @@ -1,7 +1,6 @@ -/* eslint-disable no-console */ import { BinaryWriter } from '@furyjs/fury/dist/lib/writer'; -import { MaybeNull, readUInt32LE, setImmediate } from '@opensumi/ide-core-common'; +import { Emitter, readUInt32LE } from '@opensumi/ide-core-common'; import { Buffers } from '../../buffers/buffers'; @@ -10,40 +9,30 @@ import { Buffers } from '../../buffers/buffers'; */ export const indicator = new Uint8Array([0x0d, 0x0a, 0x0d, 0x0a]); -/** - * The number of bytes in the length field. - * - * How many bytes are used to represent data length. - * - * For example, if the length field is 4 bytes, then the maximum length of the data is 2^32 = 4GB - */ -const lengthFieldLength = 4; - /** * sticky packet unpacking problems are generally problems at the transport layer. * we use a length field to represent the length of the data, and then read the data according to the length */ export class LengthFieldBasedFrameDecoder { - private static readonly MAX_ITERATIONS = 50; - - private _onDataListener: MaybeNull<(data: Uint8Array) => void>; - onData(listener: (data: Uint8Array) => void) { - this._onDataListener = listener; - return { - dispose: () => { - this._onDataListener = null; - }, - }; - } + protected dataEmitter = new Emitter(); + onData = this.dataEmitter.event; protected buffers = new Buffers(); protected cursor = this.buffers.cursor(); - private processingPromise: Promise | null = null; protected contentLength = -1; protected state = 0; + /** + * The number of bytes in the length field. + * + * How many bytes are used to represent data length. + * + * For example, if the length field is 4 bytes, then the maximum length of the data is 2^32 = 4GB + */ + lengthFieldLength = 4; + reset() { this.contentLength = -1; this.state = 0; @@ -52,81 +41,38 @@ export class LengthFieldBasedFrameDecoder { push(chunk: Uint8Array): void { this.buffers.push(chunk); + let done = false; - // 确保同一时间只有一个处理过程 - if (!this.processingPromise) { - this.processingPromise = this.processBuffers().finally(() => { - this.processingPromise = null; - }); + while (!done) { + done = this.readFrame(); } } - private async processBuffers(): Promise { - let iterations = 0; - let hasMoreData = false; - - do { - hasMoreData = false; - while (iterations < LengthFieldBasedFrameDecoder.MAX_ITERATIONS) { - if (this.buffers.byteLength === 0) { - break; - } - - const result = await this.readFrame(); - if (result === true) { - break; - } - - iterations++; - if (iterations % 10 === 0) { - await new Promise((resolve) => setTimeout(resolve, 0)); - } - } - - // 检查剩余数据 - if (this.buffers.byteLength > 0) { - hasMoreData = true; - // 异步继续处理,避免阻塞 - await new Promise((resolve) => setImmediate(resolve)); - iterations = 0; // 重置迭代计数器 - } - } while (hasMoreData); - } - - protected async readFrame(): Promise { - try { - const found = this.readLengthField(); - if (!found) { - return true; - } + protected readFrame(): boolean { + const found = this.readLengthField(); + if (found) { const start = this.cursor.offset; const end = start + this.contentLength; - if (end > this.buffers.byteLength) { - return true; - } - const binary = this.buffers.slice(start, end); - // 立即清理已处理的数据 - this.buffers.splice(0, end); - this.reset(); + this.dataEmitter.fire(binary); - if (this._onDataListener) { - try { - await Promise.resolve().then(() => this._onDataListener?.(binary)); - } catch (error) { - console.error('[Frame Decoder] Error in data listener:', error); - } + if (this.buffers.byteLength > end) { + this.contentLength = -1; + this.state = 0; + this.cursor.moveTo(end); + // has more data, continue to parse + return false; } - return false; - } catch (error) { - console.error('[Frame Decoder] Error processing frame:', error); + // delete used buffers + this.buffers.splice(0, end); this.reset(); - return true; } + + return true; } protected readLengthField() { @@ -147,13 +93,13 @@ export class LengthFieldBasedFrameDecoder { } if (this.contentLength === -1) { - if (this.cursor.offset + lengthFieldLength > bufferLength) { + if (this.cursor.offset + this.lengthFieldLength > bufferLength) { // Not enough data yet, wait for more data return false; } // read the content length - const buf = this.cursor.read4(); + const buf = this.cursor.read(this.lengthFieldLength); // fury writer use little endian this.contentLength = readUInt32LE(buf, 0); } @@ -172,12 +118,12 @@ export class LengthFieldBasedFrameDecoder { let result = iter.next(); while (!result.done) { switch (result.value) { - case 0x0d: // \r + case 0x0d: switch (this.state) { case 0: this.state = 1; break; - case 2: // 第二个 \r + case 2: this.state = 3; break; default: @@ -185,12 +131,12 @@ export class LengthFieldBasedFrameDecoder { break; } break; - case 0x0a: // \n + case 0x0a: switch (this.state) { case 1: this.state = 2; break; - case 3: // 第二个 \n + case 3: this.state = 4; iter.return(); break; @@ -208,23 +154,17 @@ export class LengthFieldBasedFrameDecoder { } dispose() { - this._onDataListener = null; + this.dataEmitter.dispose(); this.buffers.dispose(); - this.reset(); } + static writer = BinaryWriter({}); + static construct(content: Uint8Array) { - // 每次都创建新的 writer,避免所有权问题 - const writer = BinaryWriter({}); - - try { - writer.buffer(indicator); - writer.uint32(content.byteLength); - writer.buffer(content); - return writer; - } catch (error) { - console.warn('[Frame Decoder] Error constructing frame:', error); - throw error; - } + LengthFieldBasedFrameDecoder.writer.reset(); + LengthFieldBasedFrameDecoder.writer.buffer(indicator); + LengthFieldBasedFrameDecoder.writer.uint32(content.byteLength); + LengthFieldBasedFrameDecoder.writer.buffer(content); + return LengthFieldBasedFrameDecoder.writer.dump(); } } diff --git a/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts b/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts index f4fa99cb92..4b4694da79 100644 --- a/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts +++ b/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts @@ -1,74 +1,20 @@ -/* eslint-disable no-console */ import { IDisposable } from '@opensumi/ide-core-common'; import ReconnectingWebSocket, { Options as ReconnectingWebSocketOptions, UrlProvider, } from '@opensumi/reconnecting-websocket'; -import { chunkSize } from '../../constants'; - import { BaseConnection } from './base'; -import { LengthFieldBasedFrameDecoder } from './frame-decoder'; import type { ErrorEvent } from '@opensumi/reconnecting-websocket'; export class ReconnectingWebSocketConnection extends BaseConnection { - protected decoder = new LengthFieldBasedFrameDecoder(); - private sendQueue: Array<{ data: Uint8Array; resolve: () => void; reject: (error: Error) => void }> = []; - private sending = false; - - protected constructor(private socket: ReconnectingWebSocket) { + constructor(private socket: ReconnectingWebSocket) { super(); - - if (socket.binaryType === 'arraybuffer') { - this.socket.addEventListener('message', this.arrayBufferHandler); - } else if (socket.binaryType === 'blob') { - throw new Error('blob is not implemented'); - } } - private async processSendQueue() { - if (this.sending) { - return; - } - this.sending = true; - - while (this.sendQueue.length > 0) { - const { data, resolve, reject } = this.sendQueue[0]; - let handle: { get: () => Uint8Array; dispose: () => void } | null = null; - - try { - handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); - const packet = handle.get(); - - for (let i = 0; i < packet.byteLength; i += chunkSize) { - await new Promise((resolve) => { - const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength)); - this.socket.send(chunk); - resolve(); - }); - } - - resolve(); - } catch (error) { - console.error('[ReconnectingWebSocket] Error sending data:', error); - reject(error); - } finally { - if (handle) { - handle.dispose(); - } - } - this.sendQueue.shift(); - } - - this.sending = false; - } - - send(data: Uint8Array): Promise { - return new Promise((resolve, reject) => { - this.sendQueue.push({ data, resolve, reject }); - this.processSendQueue(); - }); + send(data: Uint8Array): void { + this.socket.send(data); } isOpen(): boolean { @@ -83,8 +29,29 @@ export class ReconnectingWebSocketConnection extends BaseConnection }, }; } + onMessage(cb: (data: Uint8Array) => void): IDisposable { - return this.decoder.onData(cb); + const handler = (e: MessageEvent) => { + let buffer: Promise; + if (e.data instanceof Blob) { + buffer = e.data.arrayBuffer(); + } else if (e.data instanceof ArrayBuffer) { + buffer = Promise.resolve(e.data); + } else if (e.data?.constructor?.name === 'Buffer') { + // Compatibility with nodejs Buffer in test environment + buffer = Promise.resolve(e.data); + } else { + throw new Error('unknown message type, expect Blob or ArrayBuffer, received: ' + typeof e.data); + } + buffer.then((v) => cb(new Uint8Array(v, 0, v.byteLength))); + }; + + this.socket.addEventListener('message', handler); + return { + dispose: () => { + this.socket.removeEventListener('message', handler); + }, + }; } onceClose(cb: (code?: number, reason?: string) => void): IDisposable { const disposable = this.onClose(wrapper); @@ -124,15 +91,8 @@ export class ReconnectingWebSocketConnection extends BaseConnection }; } - private arrayBufferHandler = (e: MessageEvent) => { - const buffer: ArrayBuffer = e.data; - this.decoder.push(new Uint8Array(buffer, 0, buffer.byteLength)); - }; - dispose(): void { - this.socket.removeEventListener('message', this.arrayBufferHandler); - this.sendQueue = []; - this.sending = false; + // do nothing } static forURL(url: UrlProvider, protocols?: string | string[], options?: ReconnectingWebSocketOptions) { diff --git a/packages/connection/src/common/connection/drivers/stream.ts b/packages/connection/src/common/connection/drivers/stream.ts index 20659e1cf1..5639ec3d02 100644 --- a/packages/connection/src/common/connection/drivers/stream.ts +++ b/packages/connection/src/common/connection/drivers/stream.ts @@ -1,4 +1,3 @@ -/* eslint-disable no-console */ import { IDisposable } from '@opensumi/ide-core-common'; import { BaseConnection } from './base'; @@ -22,16 +21,10 @@ export class StreamConnection extends BaseConnection { } send(data: Uint8Array): void { - const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); - try { - this.writable.write(handle.get(), (error) => { - if (error) { - console.error('Failed to write data:', error); - } - }); - } finally { - handle.dispose(); - } + const result = LengthFieldBasedFrameDecoder.construct(data); + this.writable.write(result, () => { + // TODO: logger error + }); } onMessage(cb: (data: Uint8Array) => void): IDisposable { diff --git a/packages/connection/src/common/connection/drivers/ws-websocket.ts b/packages/connection/src/common/connection/drivers/ws-websocket.ts index 2de66a77c3..d353fbfb07 100644 --- a/packages/connection/src/common/connection/drivers/ws-websocket.ts +++ b/packages/connection/src/common/connection/drivers/ws-websocket.ts @@ -1,102 +1,24 @@ -/* eslint-disable no-console */ import { IDisposable } from '@opensumi/ide-core-common'; -import { chunkSize } from '../../constants'; - import { BaseConnection } from './base'; -import { LengthFieldBasedFrameDecoder } from './frame-decoder'; import type WebSocket from 'ws'; -interface SendQueueItem { - data: Uint8Array; - resolve: () => void; - reject: (error: Error) => void; -} - export class WSWebSocketConnection extends BaseConnection { - protected decoder = new LengthFieldBasedFrameDecoder(); - private static readonly MAX_QUEUE_SIZE = 1000; // 限制队列长度 - - private sendQueue: SendQueueItem[] = []; - private pendingSize = 0; - private sending = false; - constructor(public socket: WebSocket) { super(); - this.socket.on('message', (data: Buffer) => { - this.decoder.push(data); - }); } - - private async processSendQueue() { - if (this.sending) { - return; - } - this.sending = true; - - while (this.sendQueue.length > 0) { - const { data, resolve, reject } = this.sendQueue[0]; - let handle: { get: () => Uint8Array; dispose: () => void } | null = null; - - try { - handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); - const packet = handle.get(); - - for (let i = 0; i < packet.byteLength; i += chunkSize) { - if (!this.isOpen()) { - throw new Error('Connection closed while sending'); - } - - await new Promise((resolve, reject) => { - const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength)); - this.socket.send(chunk, { binary: true }, (error?: Error) => { - if (error) { - reject(error); - } else { - resolve(); - } - }); - }); - } - - resolve(); - } catch (error) { - reject(error instanceof Error ? error : new Error(String(error))); - } finally { - if (handle) { - try { - handle.dispose(); - } catch (error) { - console.warn('[WSWebSocket] Error disposing handle:', error); - } - } - this.pendingSize -= this.sendQueue[0].data.byteLength; - this.sendQueue.shift(); - } - } - - this.sending = false; - } - - send(data: Uint8Array): Promise { - return new Promise((resolve, reject) => { - // 检查队列大小限制 - if (this.sendQueue.length >= WSWebSocketConnection.MAX_QUEUE_SIZE) { - reject(new Error('Send queue full')); - return; - } - - this.pendingSize += data.byteLength; - this.sendQueue.push({ data, resolve, reject }); - this.processSendQueue().catch((error) => { - console.error('[WSWebSocket] Error processing queue:', error); - }); - }); + send(data: Uint8Array): void { + this.socket.send(data); } onMessage(cb: (data: Uint8Array) => void): IDisposable { - return this.decoder.onData(cb); + this.socket.on('message', cb); + return { + dispose: () => { + this.socket.off('message', cb); + }, + }; } onceClose(cb: () => void): IDisposable { this.socket.once('close', cb); @@ -113,12 +35,5 @@ export class WSWebSocketConnection extends BaseConnection { dispose(): void { this.socket.removeAllListeners(); - // 拒绝所有待发送的消息 - while (this.sendQueue.length > 0) { - const { reject } = this.sendQueue.shift()!; - reject(new Error('Connection disposed')); - } - this.pendingSize = 0; - this.sending = false; } } diff --git a/packages/connection/src/common/constants.ts b/packages/connection/src/common/constants.ts index 017e9d4216..c98cfccb5b 100644 --- a/packages/connection/src/common/constants.ts +++ b/packages/connection/src/common/constants.ts @@ -1,6 +1 @@ export const METHOD_NOT_REGISTERED = '$$METHOD_NOT_REGISTERED'; - -/** - * 分片大小, 1MB - */ -export const chunkSize = 1 * 1024 * 1024; diff --git a/packages/connection/src/common/fury-extends/one-of.ts b/packages/connection/src/common/fury-extends/one-of.ts index dd263f369f..33d9e50de3 100644 --- a/packages/connection/src/common/fury-extends/one-of.ts +++ b/packages/connection/src/common/fury-extends/one-of.ts @@ -68,9 +68,6 @@ export const oneOf = ( case 7: v = serializers[7].read(); break; - default: { - throw new Error('unknown index: ' + idx); - } } v.kind = kinds[idx]; diff --git a/packages/connection/src/node/common-channel-handler.ts b/packages/connection/src/node/common-channel-handler.ts index 076432dcce..156613f256 100644 --- a/packages/connection/src/node/common-channel-handler.ts +++ b/packages/connection/src/node/common-channel-handler.ts @@ -42,7 +42,8 @@ export class CommonChannelHandler extends BaseCommonChannelHandler implements We ...this.options.wsServerOptions, }); this.wsServer.on('connection', (connection: WebSocket) => { - this.receiveConnection(new WSWebSocketConnection(connection)); + const wsConnection = new WSWebSocketConnection(connection); + this.receiveConnection(wsConnection); }); } diff --git a/packages/core-browser/__tests__/bootstrap/connection.test.ts b/packages/core-browser/__tests__/bootstrap/connection.test.ts index 6bf8ee0be7..b87b9da2ac 100644 --- a/packages/core-browser/__tests__/bootstrap/connection.test.ts +++ b/packages/core-browser/__tests__/bootstrap/connection.test.ts @@ -1,6 +1,6 @@ import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; import { ReconnectingWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection/drivers/reconnecting-websocket'; -import { BrowserConnectionErrorEvent, IEventBus, sleep } from '@opensumi/ide-core-common'; +import { BrowserConnectionErrorEvent, IEventBus } from '@opensumi/ide-core-common'; import { createBrowserInjector } from '@opensumi/ide-dev-tool/src/injector-helper'; import { MockInjector } from '@opensumi/ide-dev-tool/src/mock-injector'; import { Server, WebSocket } from '@opensumi/mock-socket'; @@ -34,8 +34,11 @@ describe('packages/core-browser/src/bootstrap/connection.test.ts', () => { const channelHandler = new WSChannelHandler(ReconnectingWebSocketConnection.forURL(fakeWSURL), 'test-client-id'); createConnectionService(injector, [], channelHandler); stateService.state = 'core_module_initialized'; - - sleep(4000).then(() => { + new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 4000); + }).then(() => { mockServer.simulate('error'); }); });