Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 53 additions & 48 deletions packages/components/src/notification/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import cls from 'classnames';
import React from 'react';

import { IAction } from '@opensumi/ide-core-common';
import { isUndefined } from '@opensumi/ide-utils';
import { CancelablePromise, createCancelablePromise, isUndefined } from '@opensumi/ide-utils';

import { Button } from '../button';
import { MessageType } from '../common';
Expand Down Expand Up @@ -33,55 +33,60 @@ export function open<T = string>(
description?: string | React.ReactNode,
duration?: number,
onClose?: () => void,
): Promise<T | undefined> | undefined {
return new Promise((resolve) => {
const args: ArgsProps = {
key,
className: cls('kt-notification-wrapper', {
['kt-notification-info']: type === MessageType.Info,
['kt-notification-error']: type === MessageType.Error,
['kt-notification-warn']: type === MessageType.Warning,
}),
duration: isUndefined(duration) ? DURATION[type] : duration,
onClose: () => {
onClose && onClose();
cachedArgs.delete(key);
resolve(undefined);
},
btn: buttons
? buttons.map((button, index) => {
const isStringButton = typeof button === 'string';
const buttonProps = {
className: `${cls('kt-notification-button')}${isStringButton ? '' : button.class}`,
ghost: isStringButton ? index === 0 : !button.primary,
key: isStringButton ? button : button.id,
onClick: () => {
resolve(button as any);
antdNotification.close(key);
if (!isStringButton) {
button.run();
}
},
};
const text = isStringButton ? button : button.label;
return (
<Button size='small' {...buttonProps}>
{text}
</Button>
);
})
: null,
message,
description,
};
cachedArgs.set(key, [type, args]);
): CancelablePromise<T | undefined> | undefined {
return createCancelablePromise<T | undefined>((token) => {
token.onCancellationRequested(() => {
close(key);
});
return new Promise((resolve) => {
const args: ArgsProps = {
key,
className: cls('kt-notification-wrapper', {
['kt-notification-info']: type === MessageType.Info,
['kt-notification-error']: type === MessageType.Error,
['kt-notification-warn']: type === MessageType.Warning,
}),
duration: isUndefined(duration) ? DURATION[type] : duration,
onClose: () => {
onClose && onClose();
cachedArgs.delete(key);
resolve(undefined);
},
btn: buttons
? buttons.map((button, index) => {
const isStringButton = typeof button === 'string';
const buttonProps = {
className: `${cls('kt-notification-button')}${isStringButton ? '' : button.class}`,
ghost: isStringButton ? index === 0 : !button.primary,
key: isStringButton ? button : button.id,
onClick: () => {
resolve(button as any);
antdNotification.close(key);
if (!isStringButton) {
button.run();
}
},
};
const text = isStringButton ? button : button.label;
return (
<Button size='small' {...buttonProps}>
{text}
</Button>
);
})
: null,
message,
description,
};
cachedArgs.set(key, [type, args]);

// closable 为 false 时,不展示 closeIcon
if (!closable) {
args.closeIcon = <span />;
}
// closable 为 false 时,不展示 closeIcon
if (!closable) {
args.closeIcon = <span />;
}

doOpenNotification(type, args);
doOpenNotification(type, args);
});
});
}

Expand Down
98 changes: 58 additions & 40 deletions packages/connection/src/browser/ws-channel-handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IReporterService, REPORT_NAME } from '@opensumi/ide-core-common';
import { Barrier, IReporterService, MultiMap, REPORT_NAME } from '@opensumi/ide-core-common';

import { NetSocketConnection } from '../common/connection';
import { ReconnectingWebSocketConnection } from '../common/connection/drivers/reconnecting-websocket';
Expand All @@ -10,12 +10,17 @@ import { WSChannel, parse, pingMessage } from '../common/ws-channel';
*/
export class WSChannelHandler {
private channelMap: Map<string, WSChannel> = new Map();
private channelCloseEventMap: Map<string, WSCloseInfo> = new Map();
private channelCloseEventMap = new MultiMap<string, WSCloseInfo>();
private logger = console;
public clientId: string;
private heartbeatMessageTimer: NodeJS.Timeout | null;
private reporterService: IReporterService;

/**
* 保证在连接建立后再执行后续操作
*/
private openingBarrier = new Barrier();

LOG_TAG: string;

constructor(public connection: ReconnectingWebSocketConnection | NetSocketConnection, logger: any, clientId: string) {
Expand Down Expand Up @@ -64,68 +69,81 @@ export class WSChannelHandler {
});

const reopenExistsChannel = () => {
if (this.channelMap.size) {
if (this.channelMap.size > 0) {
this.channelMap.forEach((channel) => {
channel.onOpen(() => {
const closeInfo = this.channelCloseEventMap.get(channel.id);
this.reporterService &&
this.reporterService.point(REPORT_NAME.CHANNEL_RECONNECT, REPORT_NAME.CHANNEL_RECONNECT, closeInfo);
this.logger.log(this.LOG_TAG, `channel reconnect ${this.clientId}:${channel.channelPath}`);
});

channel.open(channel.channelPath, this.clientId);
// 针对前端需要重新设置下后台状态的情况
channel.fireReopen();
});
}
};

await new Promise<void>((resolve) => {
if (this.connection.isOpen()) {
this.heartbeatMessage();
resolve();
this.connection.onceClose((code, reason) => {
this.channelMap.forEach((channel) => {
channel.close(code ?? 1000, reason ?? '');
});
});

if (this.connection.isOpen()) {
this.heartbeatMessage();
this.openingBarrier.open();
}

this.connection.onOpen(() => {
this.heartbeatMessage();
// 说明是重连
if (this.openingBarrier.isOpen()) {
reopenExistsChannel();
} else {
this.connection.onOpen(() => {
this.heartbeatMessage();
resolve();
reopenExistsChannel();
});
this.openingBarrier.open();
}
});

this.connection.onceClose((code, reason) => {
if (this.channelMap.size) {
this.channelMap.forEach((channel) => {
channel.close(code ?? 1000, reason ?? '');
});
}
});
await this.openingBarrier.wait();
}

public async openChannel(channelPath: string) {
const channelId = `${this.clientId}:${channelPath}`;
const channel = new WSChannel(this.connection, {
id: channelId,
id: `${this.clientId}:${channelPath}`,
logger: this.logger,
});

this.channelMap.set(channel.id, channel);

await new Promise<void>((resolve) => {
channel.onOpen(() => {
resolve();
});
channel.onceClose((code: number, reason: string) => {
this.channelCloseEventMap.set(channelId, {
channelPath,
closeEvent: { code, reason },
connectInfo: (navigator as any).connection as ConnectionInfo,
channel.onOpen(() => {
const closeInfo = this.channelCloseEventMap.get(channel.id);
if (closeInfo) {
closeInfo.forEach((info) => {
this.reporterService &&
this.reporterService.point(REPORT_NAME.CHANNEL_RECONNECT, REPORT_NAME.CHANNEL_RECONNECT, info);
});
this.logger.log(this.LOG_TAG, 'channel close: ', `code: ${code}, reason: ${reason}`);

this.channelCloseEventMap.delete(channel.id);

this.logger.log(this.LOG_TAG, `channel reconnect ${this.clientId}:${channel.channelPath}`);
} else {
this.logger.log(this.LOG_TAG, `channel open ${this.clientId}:${channel.channelPath}`);
}
});

channel.onClose((code: number, reason: string) => {
this.channelCloseEventMap.set(channel.id, {
channelPath,
closeEvent: { code, reason },
connectInfo: (navigator as any).connection as ConnectionInfo,
});
channel.open(channelPath, this.clientId);
this.logger.log(this.LOG_TAG, 'channel close: ', `code: ${code}, reason: ${reason}`);
});

const barrier = new Barrier();
const dispose = channel.onOpen(() => {
barrier.open();
dispose.dispose();
});

channel.open(channelPath, this.clientId);

await barrier.wait();

return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ export class ReconnectingWebSocketConnection extends BaseConnection<Uint8Array>
static forURL(url: UrlProvider, protocols?: string | string[], options?: ReconnectingWebSocketOptions) {
const rawConnection = new ReconnectingWebSocket(url, protocols, options);
rawConnection.binaryType = 'arraybuffer';
const connection = new ReconnectingWebSocketConnection(rawConnection);

return connection;
return new ReconnectingWebSocketConnection(rawConnection);
}
}
12 changes: 5 additions & 7 deletions packages/connection/src/common/rpc-service/center.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Deferred } from '@opensumi/ide-core-common';
import { addElement } from '@opensumi/ide-utils/lib/arrays';

import { METHOD_NOT_REGISTERED } from '../constants';
import { TSumiProtocol } from '../rpc';
Expand Down Expand Up @@ -54,15 +55,14 @@ export class RPCServiceCenter {

this.protocolRegistry.applyTo(connection.io);

const index = this.proxies.length - 1;
const proxy = new ProxySumi(this.serviceRegistry, this.logger);
proxy.listen(connection);

this.proxies.push(proxy);
const remove = addElement(this.proxies, proxy);

return {
dispose: () => {
this.proxies.splice(index, 1);
remove.dispose();
proxy.dispose();
},
};
Expand All @@ -73,16 +73,14 @@ export class RPCServiceCenter {
this.deferred.resolve();
}

const index = this.proxies.length - 1;

const proxy = new ProxyJson(this.serviceRegistry, this.logger);
proxy.listen(connection);

this.proxies.push(proxy);
const remove = addElement(this.proxies, proxy);

return {
dispose: () => {
this.proxies.splice(index, 1);
remove.dispose();
proxy.dispose();
},
};
Expand Down
10 changes: 8 additions & 2 deletions packages/connection/src/node/common-channel-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import { CommonChannelHandlerOptions, WebSocketHandler } from './ws';

export { commonChannelPathHandler };

export interface WebSocketConnection extends WebSocket {
routeParam: {
pathname: string;
};
}

/**
* Channel Handler for nodejs
*/
Expand All @@ -27,7 +33,7 @@ export class CommonChannelHandler extends BaseCommonChannelHandler implements We
}

private initWSServer() {
this.logger.log('init Common Channel Handler');
this.logger.log('init common channel handler');
this.wsServer = new WebSocket.Server({
noServer: true,
...this.options.wsServerOptions,
Expand All @@ -43,7 +49,7 @@ export class CommonChannelHandler extends BaseCommonChannelHandler implements We

if (routeResult) {
this.wsServer.handleUpgrade(request, socket, head, (connection) => {
(connection as any).routeParam = {
(connection as WebSocketConnection).routeParam = {
pathname,
};

Expand Down
6 changes: 4 additions & 2 deletions packages/connection/src/node/ws.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import assert from 'assert';
import http from 'http';
import url from 'url';

import ws from 'ws';

Expand Down Expand Up @@ -90,7 +89,10 @@ export class WebSocketServerRoute {

server.on('upgrade', (request, socket, head) => {
assert(request.url, 'cannot parse url from http request');
const wsPathname: string = url.parse(request.url).pathname as string;

// request.url: `/path?query=a#hash`
const url = new URL(request.url, 'wss://base');
const wsPathname: string = url.pathname;

let wsHandlerIndex = 0;
const wsHandlerLength = wsServerHandlerArr.length;
Expand Down
Loading