Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/secret_scanning.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ paths-ignore:
- "src/workerd/api/node/tests/fixtures/rsa_private_b.pem"
- "src/workerd/api/node/tests/fixtures/rsa_pss_private_2048.pem"
- "src/workerd/api/node/tests/fixtures/x448_private.pem"
- "src/workerd/api/node/tests/fixtures/tls-nodejs-tcp-server.pem"
31 changes: 31 additions & 0 deletions src/node/_tls_wrap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
//
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

import { connect, TLSSocket } from 'node-internal:internal_tls_wrap';
export { connect, TLSSocket };
export default {
connect,
TLSSocket,
};
17 changes: 17 additions & 0 deletions src/node/internal/internal_errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,23 @@ export class ERR_CRYPTO_SIGN_KEY_REQUIRED extends NodeError {
}
}

export class ERR_TLS_HANDSHAKE_TIMEOUT extends NodeError {
constructor() {
super('ERR_TLS_HANDSHAKE_TIMEOUT', 'TLS handshake timeout');
}
}

export class ConnResetException extends NodeError {
path?: string | undefined;
host?: string | undefined;
port?: number | undefined;
localAddress?: string | undefined;

constructor(message: string) {
super('ECONNRESET', message);
}
}

export function aggregateTwoErrors(innerError: any, outerError: any) {
if (innerError && outerError && innerError !== outerError) {
if (Array.isArray(outerError.errors)) {
Expand Down
178 changes: 102 additions & 76 deletions src/node/internal/internal_net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import inner from 'cloudflare-internal:sockets';

import {
AbortError,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
Expand All @@ -45,11 +46,20 @@ import {
validateInt32,
validateNumber,
validatePort,
validateObject,
} from 'node-internal:validators';

import { isUint8Array, isArrayBufferView } from 'node-internal:internal_types';
import { Duplex } from 'node-internal:streams_duplex';
import { Buffer } from 'node-internal:internal_buffer';
import type {
IpcSocketConnectOpts,
SocketConnectOpts,
TcpSocketConnectOpts,
AddressInfo,
Socket as _Socket,
OnReadOpts,
} from 'node:net';

const kLastWriteQueueSize = Symbol('kLastWriteQueueSize');
const kTimeout = Symbol('kTimeout');
Expand Down Expand Up @@ -100,23 +110,18 @@ export type SocketOptions = {
writableObjectMode?: boolean;
keepAliveInitialDelay?: number;
fd?: number;
handle?: VoidFunction;
handle?: Socket['_handle'];
noDelay?: boolean;
keepAlive?: boolean;
allowHalfOpen?: boolean;
allowHalfOpen?: boolean | undefined;
emitClose?: boolean;
signal?: AbortSignal;
onread?: { callback?: () => Uint8Array; buffer?: Uint8Array };
signal?: AbortSignal | undefined;
onread?:
| ({ callback?: () => Uint8Array; buffer?: Uint8Array } & OnReadOpts)
| null
| undefined;
};

import type {
IpcSocketConnectOpts,
SocketConnectOpts,
TcpSocketConnectOpts,
AddressInfo,
Socket as _Socket,
} from 'node:net';

export function BlockList(): void {
throw new Error('BlockList is not implemented');
}
Expand All @@ -129,42 +134,43 @@ export function Server(): void {
throw new Error('Server is not implemented');
}

export interface Socket extends _Socket {
timeout: number;
connecting: boolean;
_aborted: boolean;
_hadError: boolean;
_parent: null | Socket;
_host: null | string;
_peername: null | string;
_getsockname():
// @ts-expect-error TS2323 Redeclare error.
export declare class Socket extends _Socket {
public timeout: number;
public connecting: boolean;
public _aborted: boolean;
public _hadError: boolean;
public _parent: null | Socket;
public _host: null | string;
public _peername: null | string;
public _getsockname():
| {}
| {
address?: string;
port?: number;
family?: string;
};
[kLastWriteQueueSize]: number | null | undefined;
[kTimeout]: Socket | null | undefined;
[kBuffer]: null | boolean | Uint8Array;
[kBufferCb]:
public [kLastWriteQueueSize]: number | null | undefined;
public [kTimeout]: Socket | null | undefined;
public [kBuffer]: null | boolean | Uint8Array;
public [kBufferCb]:
| null
| undefined
| ((len?: number, buf?: Buffer) => boolean | Uint8Array);
[kBufferGen]: null | (() => undefined | Uint8Array);
[kSocketInfo]: null | {
public [kBufferGen]: null | (() => undefined | Uint8Array);
public [kSocketInfo]: null | {
address?: string;
port?: number;
family?: number | string;
remoteAddress?: Record<string, unknown>;
};
[kBytesRead]: number;
[kBytesWritten]: number;
_closeAfterHandlingError: boolean;
_handle: null | {
public [kBytesRead]: number;
public [kBytesWritten]: number;
public _closeAfterHandlingError: boolean;
public _handle: null | {
writeQueueSize?: number;
lastWriteQueueSize?: number;
reading?: boolean;
reading?: boolean | undefined;
bytesRead: number;
bytesWritten: number;
socket: ReturnType<typeof inner.connect>;
Expand All @@ -177,29 +183,31 @@ export interface Socket extends _Socket {
write(data: string | ArrayBufferView): Promise<void>;
};
};
_sockname?: null | AddressInfo;
_onTimeout(): void;
_unrefTimer(): void;
_writeGeneric(
public _sockname?: null | AddressInfo;
public _onTimeout(): void;
public _unrefTimer(): void;
public _writeGeneric(
writev: boolean,
data: { chunk: string | ArrayBufferView; encoding: string }[],
encoding: string,
cb: (err?: Error) => void
): void;
_final(cb: (err?: Error) => void): void;
_read(n: number): void;
_reset(): void;
_getpeername(): Record<string, unknown>;
_writableState: null | unknown[];
}

export interface SocketConstructor {
(this: unknown, options?: SocketOptions): Socket;
new (options?: SocketOptions): Socket;
prototype: Socket;
public _final(cb: (err?: Error) => void): void;
public _read(n: number): void;
public _reset(): void;
public _getpeername(): Record<string, unknown>;
public _writableState: null | unknown[];

// Defined by TLSSocket
public encrypted?: boolean;
public _finishInit(): void;

public constructor(options?: SocketOptions);
public prototype: Socket;
}

export const Socket = function (this: Socket, options?: SocketOptions): Socket {
// @ts-expect-error TS2323 Redeclare error.
export function Socket(this: Socket, options?: SocketOptions): Socket {
if (!(this instanceof Socket)) {
return new Socket(options);
}
Expand Down Expand Up @@ -236,11 +244,7 @@ export const Socket = function (this: Socket, options?: SocketOptions): Socket {
options = { ...options };
}

if (options.handle) {
// We are not supporting the options.handle option for now. This is the
// option that allows users to pass in a handle to an existing socket.
throw new ERR_OPTION_NOT_IMPLEMENTED('options.handle');
} else if (options.fd !== undefined) {
if (options.fd !== undefined) {
// We are not supporting the options.fd option for now. This is the option
// that allows users to pass in a file descriptor to an existing socket.
// Workers doesn't have file descriptors and does not use them in any way.
Expand Down Expand Up @@ -287,10 +291,15 @@ export const Socket = function (this: Socket, options?: SocketOptions): Socket {

Duplex.call(this, options);

if (options.handle) {
validateObject(options.handle, 'options.handle');
this._handle = options.handle;
}

this.once('end', onReadableStreamEnd);

if (options.signal) {
addClientAbortSignalOption(this, options);
addClientAbortSignalOption(this, options.signal);
}

const onread = options.onread;
Expand Down Expand Up @@ -319,7 +328,7 @@ export const Socket = function (this: Socket, options?: SocketOptions): Socket {
}

return this;
} as SocketConstructor;
}

Object.setPrototypeOf(Socket.prototype, Duplex.prototype);
Object.setPrototypeOf(Socket, Duplex);
Expand Down Expand Up @@ -691,26 +700,39 @@ Socket.prototype._destroy = function (
// ======================================================================================
// Connection

Socket.prototype.connect = function (this: Socket, ...args: unknown[]): Socket {
// @ts-expect-error TS2322 Type inconsistencies between types/node
Socket.prototype.connect = function (
this: Socket,
...args: unknown[]
): Socket | undefined {
let normalized;
// @ts-expect-error TS7015 Required not to overcomplicate types
if (Array.isArray(args[0]) && args[0][normalizedArgsSymbol]) {
normalized = args[0];
} else {
normalized = _normalizeArgs(args);
}
const options = normalized[0] as TcpSocketConnectOpts & IpcSocketConnectOpts;
const cb = normalized[1] as ((err: Error | null) => void) | null;

if (this.connecting) {
throw new ERR_SOCKET_CONNECTING();
}
if (this._aborted) {
if (cb) {
cb(new AbortError());
} else {
throw new AbortError();
}
return undefined;
}
// TODO(later): In Node.js a Socket instance can be reset so that it can be reused.
// We haven't yet implemented that here. We can consider doing so but it's not an
// immediate priority. Implementing it correctly requires making sure the internal
// state of the socket is correctly reset.
if (this.destroyed) {
throw new ERR_SOCKET_CLOSED();
}
let normalized;
// @ts-expect-error TS7015 Required not to overcomplicate types
if (Array.isArray(args[0]) && args[0][normalizedArgsSymbol]) {
normalized = args[0];
} else {
normalized = _normalizeArgs(args);
}
const options = normalized[0] as TcpSocketConnectOpts & IpcSocketConnectOpts;
const cb = normalized[1] as VoidFunction | null;

if (cb !== null) {
this.once('connect', cb);
Expand Down Expand Up @@ -942,7 +964,7 @@ function cleanupAfterDestroy(
queueMicrotask(() => socket.emit('close', error != null));
}

function initializeConnection(
export function initializeConnection(
socket: Socket,
options: TcpSocketConnectOpts
): void {
Expand Down Expand Up @@ -1000,12 +1022,12 @@ function initializeConnection(
const handle = inner.connect(`${host}:${port}`, {
allowHalfOpen: socket.allowHalfOpen,
// A Node.js socket is always capable of being upgraded to the TLS socket.
secureTransport: 'starttls',
// We are not going to pass the high water mark here. The outer Node.js
secureTransport: socket.encrypted ? 'on' : 'starttls',
// We are not going to pass the high water-mark here. The outer Node.js
// stream will implement the appropriate backpressure for us.
});

// Our version of the socket._handle is necessarily different than Node.js'.
// Our version of the socket._handle is necessarily different from Node.js'.
// It serves the same purpose but any code that may exist that is depending
// on `_handle` being a particular type (which it shouldn't be) will fail.
socket._handle = {
Expand Down Expand Up @@ -1087,6 +1109,10 @@ function onConnectionOpened(this: Socket): void {
this.emit('connect');
this.emit('ready');

if (this.encrypted) {
// This is required for TLSSocket
this._finishInit();
}
if (!this.isPaused()) {
maybeStartReading(this);
}
Expand Down Expand Up @@ -1175,6 +1201,10 @@ async function startRead(socket: Socket): Promise<void> {
break;
}
}
} catch (_err) {
// Ignore error, and don't log them.
// This is mostly triggered for invalid sockets with following errors:
// - "This ReadableStream belongs to an object that is closing."
} finally {
// Disable eslint to match Node.js behavior
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
Expand Down Expand Up @@ -1288,12 +1318,8 @@ export function _normalizeArgs(args: unknown[]): unknown[] {
return arr;
}

function addClientAbortSignalOption(
self: Socket,
options: { signal?: AbortSignal }
): void {
validateAbortSignal(options.signal, 'options.signal');
const { signal } = options;
function addClientAbortSignalOption(self: Socket, signal: AbortSignal): void {
validateAbortSignal(signal, 'options.signal');
let disposable: Disposable | undefined;

function onAbort(): void {
Expand All @@ -1303,7 +1329,7 @@ function addClientAbortSignalOption(
}

if (signal.aborted) {
queueMicrotask(onAbort);
onAbort();
} else {
queueMicrotask(() => {
disposable = addAbortListener(signal, onAbort);
Expand Down
Loading
Loading