Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/connection/__test__/browser/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe('connection browser', () => {
furySerializer.serialize({
id: msgObj.id,
kind: 'server-ready',
token: '',
traceId: '',
}),
);
} else if (msgObj.kind === 'data') {
Expand Down Expand Up @@ -66,7 +66,7 @@ describe('connection browser', () => {
channel.dispatch({
kind: 'server-ready',
id: 'test',
token: '',
traceId: '',
});
await sleep(500);
// message queue flushed
Expand Down
18 changes: 9 additions & 9 deletions packages/connection/__test__/common/fury-extends/one-of.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* eslint-disable no-console */

import { OpenMessage, PingMessage, PongMessage, ServerReadyMessage, furySerializer } from '@opensumi/ide-connection';
import { OpenMessage, PingMessage, PongMessage, ServerReadyMessage } from '../../../src/common/channel/types';
import { furySerializer } from '../../../src/common/serializer';

const parse = furySerializer.deserialize;
const stringify = furySerializer.serialize;
Expand All @@ -9,11 +8,12 @@ describe('oneOf', () => {
function testIt(obj: any) {
const bytes = stringify(obj);
const obj2 = parse(bytes);
expect(obj2).toEqual(obj);
const str = JSON.stringify(obj);

console.log('bytes.length', bytes.byteLength);
console.log('json length', str.length);
// 确保 obj 里的所有字段都在 obj2 里
// eslint-disable-next-line guard-for-in
for (const key in Object.keys(obj)) {
expect(obj2[key]).toEqual(obj[key]);
}
}

it('should serialize and deserialize', () => {
Expand All @@ -36,15 +36,15 @@ describe('oneOf', () => {
clientId: '123',
id: '456',
path: '/test',
connectionToken: 'abc',
traceId: '',
};

testIt(obj3);

const obj4: ServerReadyMessage = {
kind: 'server-ready',
id: '456',
token: '',
traceId: '',
};

testIt(obj4);
Expand Down
32 changes: 16 additions & 16 deletions packages/connection/src/common/channel/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,42 @@ export type ChannelMessage =
| CloseMessage
| ErrorMessage;

export interface BaseMessage {
kind: string;
id: string;
traceId?: string;
}

/**
* `ping` and `pong` are used to detect whether the connection is alive.
*/
export interface PingMessage {
export interface PingMessage extends BaseMessage {
kind: 'ping';
id: string;
}

/**
* when server receive a `ping` message, it should reply a `pong` message, vice versa.
*/
export interface PongMessage {
export interface PongMessage extends BaseMessage {
kind: 'pong';
id: string;
}

/**
* `data` message indicate that the channel has received some data.
* the `content` field is the data, it should be a string.
*/
export interface DataMessage {
export interface DataMessage extends BaseMessage {
kind: 'data';
id: string;
content: string;
}

export interface BinaryMessage {
export interface BinaryMessage extends BaseMessage {
kind: 'binary';
id: string;
binary: Uint8Array;
}

export interface CloseMessage {
export interface CloseMessage extends BaseMessage {
kind: 'close';
id: string;
code: number;
reason: string;
}
Expand All @@ -52,19 +53,18 @@ export interface CloseMessage {
* `path` is used to identify which handler should be used to handle the channel.
* `clientId` is used to identify the client.
*/
export interface OpenMessage {
export interface OpenMessage extends BaseMessage {
kind: 'open';
id: string;
path: string;
clientId: string;
connectionToken: string;
traceId: string;
}

export enum ErrorMessageCode {
ChannelNotFound = 1,
}

export interface ErrorMessage {
export interface ErrorMessage extends BaseMessage {
kind: 'error';
id: string;
code: ErrorMessageCode;
Expand All @@ -75,8 +75,8 @@ export interface ErrorMessage {
* when server receive a `open` message, it should reply a `server-ready` message.
* this is indicate that the channel is ready to use.
*/
export interface ServerReadyMessage {
export interface ServerReadyMessage extends BaseMessage {
kind: 'server-ready';
id: string;
token: string;
traceId: string;
}
2 changes: 2 additions & 0 deletions packages/connection/src/common/rpc-service/center.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ export class RPCServiceCenter implements IDisposable {

dispose(): void {
this._disposables.dispose();
this.proxies.forEach((proxy) => proxy.dispose());
this.proxies = [];
}
}

Expand Down
24 changes: 15 additions & 9 deletions packages/connection/src/common/serializer/fury.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,50 @@ import { oneOf } from '../fury-extends/one-of';

import { ISerializer } from './types';

function baseFields() {
return {
id: Type.string(),
};
}

export const PingProtocol = Type.object('ping', {
id: Type.string(),
...baseFields(),
});

export const PongProtocol = Type.object('pong', {
id: Type.string(),
...baseFields(),
});

export const OpenProtocol = Type.object('open', {
...baseFields(),
clientId: Type.string(),
id: Type.string(),
path: Type.string(),
connectionToken: Type.string(),
traceId: Type.string(),
});

export const ServerReadyProtocol = Type.object('server-ready', {
id: Type.string(),
...baseFields(),
token: Type.string(),
});

export const ErrorProtocol = Type.object('error', {
id: Type.string(),
...baseFields(),
code: Type.uint16(),
message: Type.string(),
});

export const DataProtocol = Type.object('data', {
id: Type.string(),
...baseFields(),
content: Type.string(),
});

export const BinaryProtocol = Type.object('binary', {
id: Type.string(),
...baseFields(),
binary: Type.binary(),
});

export const CloseProtocol = Type.object('close', {
id: Type.string(),
...baseFields(),
code: Type.uint32(),
reason: Type.string(),
});
Expand Down
30 changes: 17 additions & 13 deletions packages/connection/src/common/server-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ export interface ChannelHandlerOptions {
serializer?: ISerializer<ChannelMessage, any>;
}

enum ServerChannelCloseCode {
ConnectionClosed = 1,
NewChannelOpened = 2,
}

export abstract class BaseCommonChannelHandler {
protected channelMap: Map<string, WSServerChannel> = new Map();

Expand Down Expand Up @@ -135,25 +140,24 @@ export abstract class BaseCommonChannelHandler {

const wrappedConnection = wrapSerializer(connection, this.serializer);

const getOrCreateChannel = (id: string, clientId: string) => {
let channel = this.channelMap.get(id);
if (!channel) {
channel = new WSServerChannel(wrappedConnection, { id, clientId, logger: this.logger });
this.channelMap.set(id, channel);
}
return channel;
};

wrappedConnection.onMessage((msg: ChannelMessage) => {
try {
switch (msg.kind) {
case 'open': {
const { id, path, connectionToken } = msg;
const { id, path, traceId } = msg;
clientId = msg.clientId;

this.logger.log(`open a new connection channel ${clientId} with path ${path}`);
const channel = getOrCreateChannel(id, clientId);
let channel = this.channelMap.get(id);
if (channel) {
channel.close(ServerChannelCloseCode.NewChannelOpened, 'new channel opened for the same channel id');
channel.dispose();
}

channel = new WSServerChannel(wrappedConnection, { id, clientId, logger: this.logger });
this.channelMap.set(id, channel);
commonChannelPathHandler.openChannel(path, channel, clientId);
channel.serverReady(connectionToken);
channel.serverReady(traceId);
break;
}
default: {
Expand Down Expand Up @@ -186,7 +190,7 @@ export abstract class BaseCommonChannelHandler {
Array.from(this.channelMap.values())
.filter((channel) => channel.clientId === clientId)
.forEach((channel) => {
channel.close(1, 'close');
channel.close(ServerChannelCloseCode.ConnectionClosed, 'connection closed');
channel.dispose();
this.channelMap.delete(channel.id);
this.logger.log(`Remove connection channel ${channel.id}`);
Expand Down
Loading