diff --git a/.gitignore b/.gitignore index ccd688b1..ae34a4a3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ node_modules dist lcov.info +tsconfig.tsbuildinfo diff --git a/eslint.config.js b/eslint.config.js index 4c2da49f..20a7bb92 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -4,6 +4,7 @@ module.exports = require('neostandard')({ semi: true, ts: true, noStyle: true, + noJsx: true, ignores: ['dist', 'node_modules', 'docs/build', 'docs/.docusaurus'], globals: { SharedArrayBuffer: true, diff --git a/package.json b/package.json index 9e782996..eacbe077 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ }, "scripts": { "build": "tsc && gen-esm-wrapper . dist/esm-wrapper.mjs", + "build:clean": "rm -rf ./dist tsconfig.tsbuildinfo", "lint": "eslint && tsc --noEmit && tsc --project test/tsconfig.json --noEmit", "test": "node scripts/run-tests.js --pattern='test/**/*test.ts'", "test:ci": "npm run build && npm run lint && npm run test:coverage", diff --git a/scripts/run-tests.js b/scripts/run-tests.js index 4057714f..d2d557e1 100644 --- a/scripts/run-tests.js +++ b/scripts/run-tests.js @@ -26,6 +26,11 @@ const options = { short: 'o', description: 'Run only tests marked with { only: true }', default: false, + }, + verbose: { + type: 'boolean', + description: 'Enable verbose output', + default: false, } }; @@ -36,9 +41,13 @@ const { const pattern = values.pattern; const isCoverage = values.coverage; const runOnly = values.only; +const log = values.verbose ? console.log : () => {}; const testFiles = globSync(pattern, { absolute: true }); +log(`Running tests with pattern: ${pattern}`); +log(`-- Files: ${testFiles.join(', ')}`); + const args = [ '--enable-source-maps', '--import=tsx', @@ -47,8 +56,9 @@ const args = [ ]; -let result; +log(`Args: \t\n${args.join('\t\n')}`); +let result; // we skip coverage for node 20 // because this issuse happen https://github.com/nodejs/node/pull/53315 if (isCoverage && !process.version.startsWith('v20.')) { diff --git a/src/abort.ts b/src/abort.ts index 63515dff..352037a0 100644 --- a/src/abort.ts +++ b/src/abort.ts @@ -24,8 +24,6 @@ export type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter; export class AbortError extends Error { constructor (reason?: AbortSignalEventTarget['reason']) { - // TS does not recognizes the cause clause - // @ts-expect-error super('The task has been aborted', { cause: reason }); } diff --git a/src/common.ts b/src/common.ts index 2c20b0ae..4499b243 100644 --- a/src/common.ts +++ b/src/common.ts @@ -1,10 +1,10 @@ -import { fileURLToPath, URL } from 'node:url'; -import { availableParallelism } from 'node:os'; +import { fileURLToPath, URL } from "node:url"; +import { availableParallelism } from "node:os"; -import { kMovable, kTransferable, kValue } from './symbols'; +import { kMovable, kTransferable, kValue } from "./symbols"; // States wether the worker is ready to receive tasks -export const READY = '_WORKER_READY'; +export const READY = "_WORKER_READY"; /** * True if the object implements the Transferable interface @@ -13,10 +13,10 @@ export const READY = '_WORKER_READY'; * @param {unknown} value * @return {*} {boolean} */ -export function isTransferable (value: unknown): boolean { +export function isTransferable(value: unknown): boolean { return ( value != null && - typeof value === 'object' && + typeof value === "object" && kTransferable in value && kValue in value ); @@ -31,49 +31,71 @@ export function isTransferable (value: unknown): boolean { * @param {(unknown & PiscinaMovable)} value * @return {*} {boolean} */ -export function isMovable (value: any): boolean { +export function isMovable(value: any): boolean { return isTransferable(value) && value[kMovable] === true; } -export function markMovable (value: {}): void { +export function markMovable(value: {}): void { Object.defineProperty(value, kMovable, { enumerable: false, configurable: true, writable: true, - value: true + value: true, }); } // State of Piscina pool export const commonState = { isWorkerThread: false, - workerData: undefined + workerData: undefined, }; -export function maybeFileURLToPath (filename : string) : string { - return filename.startsWith('file:') +export function maybeFileURLToPath(filename: string): string { + return filename.startsWith("file:") ? fileURLToPath(new URL(filename)) : filename; } -export function getAvailableParallelism () : number { +export function getAvailableParallelism(): number { return availableParallelism(); } -export function promiseResolvers () : - { promise: Promise, - resolve: (res: T) => void, - reject: (err: Error) => void - } { +export function promiseResolvers(): { + promise: Promise; + resolve: (res: T) => void; + reject: (err: Error) => void; +} { // @ts-expect-error - available from v24 onwards if (Promise.withResolvers != null) return Promise.withResolvers(); let res: (res: T) => void; - let rej: (err: Error) => void + let rej: (err: Error) => void; return { - promise: new Promise((resolve, reject) => { res = resolve; rej = reject; } ), + promise: new Promise((resolve, reject) => { + res = resolve; + rej = reject; + }), resolve: res!, - reject: rej! - } -} \ No newline at end of file + reject: rej!, + }; +} + +// Ring Buffer +export const RING_BUFFER_INDEXES = { + READ_INDEX: 0, + WRITE_INDEX: 1, + STATUS_INDEX: 2, +}; + +export const RING_BUFFER_STATUSES = { + PAUSED: 0, + RESUME: 1, + ENDED: 2, + ERRORED: 3, // ? +}; + +export const GeneratorFunctionConstructor = (function*(){}).constructor.name; +export const AsyncGeneratorConstructor = (async function*(){}).constructor.name; +export const AsyncFunctionConstructor = (async function(){}).constructor.name; +export const FunctionConstructor = (function(){}).constructor.name; \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 1162a614..b9ea38ea 100644 --- a/src/index.ts +++ b/src/index.ts @@ -100,14 +100,16 @@ interface RunOptions { transferList? : TransferList, filename? : string | null, signal? : AbortSignalAny | null, - name? : Func | null + name? : Func | null, + bufferSize?: number | null } interface FilledRunOptions extends RunOptions { transferList : TransferList | never, filename : string | null, signal : AbortSignalAny | null, - name : Func | null + name : Func | null, + bufferSize?: number | null } interface CloseOptions { @@ -135,7 +137,8 @@ const kDefaultRunOptions : FilledRunOptions = { transferList: undefined, filename: null, signal: null, - name: null + name: null, + bufferSize: null, }; const kDefaultCloseOptions : Required = { @@ -285,20 +288,25 @@ class ThreadPool { this.workers.add(workerInfo); function onMessage (this: ThreadPool, message : ResponseMessage) { - const { taskId, result } = message; + const { taskId, result, done } = message; // In case of success: Call the callback that was passed to `runTask`, // remove the `TaskInfo` associated with the Worker, which marks it as // free again. - const taskInfo = workerInfo.popTask(taskId); - this.workers.taskDone(workerInfo); + const taskInfo = done === 0 ? workerInfo.getTask(taskId) : workerInfo.popTask(taskId); - if (taskInfo == null) { /* c8 ignore next */ const err = new Error( `Unexpected message from Worker: ${inspect(message)}`); this.publicInterface.emit('error', err); + this._processPendingMessages(); + return; + } + + if (done === 1) { + this.workers.taskDone(workerInfo); + taskInfo!.done(message.error, result); } else { - taskInfo.done(message.error, result); + taskInfo!.handleResponse(message); } this._processPendingMessages(); @@ -471,13 +479,15 @@ class ThreadPool { options : RunOptions) : Promise { let { filename, - name + name, + bufferSize } = options; const { transferList = [] } = options; filename = filename ?? this.options.filename; name = name ?? this.options.name; + bufferSize = bufferSize ?? 1024 * 1024; if (typeof filename !== 'string') { return Promise.reject(Errors.FilenameNotProvided()); @@ -501,21 +511,30 @@ class ThreadPool { transferList, filename, name, + bufferSize, abortSignal: signal, triggerAsyncId: this.publicInterface.asyncResource.asyncId() }, (err : Error | null, result : any) => { - this.completed++; - if (taskInfo.started) { - this.histogram?.recordRunTime(performance.now() - taskInfo.started); - } - if (err !== null) { - reject(err); - } else { - resolve(result); - } + queueMicrotask(this._maybeDrain.bind(this)) + this.completed++; + if (taskInfo.started) { + this.histogram?.recordRunTime(performance.now() - taskInfo.started); + } + + // Promise already resolved, this is for statistics only + if (taskInfo.redeable != null) return; - queueMicrotask(this._maybeDrain.bind(this)) + if (err !== null) { + reject(err); + } else { + resolve(result); + } + }, + // Until now, we just assume this is a streamed response and we jump into handling the chunks as stream + (err) => { + if (err != null) reject(err); + else resolve(taskInfo.redeable); }); if (signal != null) { @@ -782,7 +801,8 @@ export default class Piscina an transferList, filename, name, - signal + signal, + bufferSize } = options; if (transferList !== undefined && !Array.isArray(transferList)) { return Promise.reject( @@ -799,6 +819,17 @@ export default class Piscina an return Promise.reject( new TypeError('signal argument must be an object')); } + if (bufferSize != null && + ( + typeof bufferSize !== 'number'|| + !Number.isInteger(bufferSize) || + !Number.isFinite(bufferSize) || + bufferSize <= 0 + ) + ) { + return Promise.reject( + new TypeError('bufferSize argument must be a finite integer')); + } return this.#pool.runTask(task, { transferList, filename, name, signal }); } diff --git a/src/task_queue/index.ts b/src/task_queue/index.ts index 5aa9d470..e3383662 100644 --- a/src/task_queue/index.ts +++ b/src/task_queue/index.ts @@ -1,6 +1,7 @@ import type { MessagePort } from 'node:worker_threads'; import { performance } from 'node:perf_hooks'; import { AsyncResource } from 'node:async_hooks'; +import assert from 'node:assert'; import type { WorkerInfo } from '../worker_pool'; import type { Task, TaskQueue, PiscinaTask } from './common'; @@ -8,12 +9,15 @@ import type { Task, TaskQueue, PiscinaTask } from './common'; import { onabort, type AbortSignalAny } from '../abort'; import { isMovable } from '../common'; import { kTransferable, kValue, kQueueOptions } from '../symbols'; +import { WorkerStream } from '../worker_pool/worker_stream'; +import { ResponseMessage } from '../types'; export { ArrayTaskQueue } from './array_queue'; export { FixedQueue } from './fixed_queue'; export type TaskCallback = (err: Error, result: any) => void +export type TaskResponseCallback = (err: Error | null) => void // Grab the type of `transferList` off `MessagePort`. At the time of writing, // only ArrayBuffer and MessagePort are valid, but let's avoid having to update // our types here every time Node.js adds support for more objects. @@ -31,6 +35,7 @@ type TaskInfoParameters = { name : string; abortSignal : AbortSignalAny | null; triggerAsyncId : number; + bufferSize: number; } /** @@ -66,6 +71,7 @@ function taskIdFactory() { export class TaskInfo extends AsyncResource implements Task { static getTaskId: () => string = taskIdFactory(); callback : TaskCallback; + onResponseCallback : TaskResponseCallback; task : any; transferList : TransferList; filename : string; @@ -76,6 +82,8 @@ export class TaskInfo extends AsyncResource implements Task { created : number; started : number; aborted = false; + redeable: WorkerStream | null = null; + bufferSize: number; _abortListener: (() => void) = () => { this.aborted = true; }; _abortCleaner: (() => void) | null = null; @@ -86,10 +94,13 @@ export class TaskInfo extends AsyncResource implements Task { name, abortSignal, triggerAsyncId, + bufferSize }: TaskInfoParameters, - callback: TaskCallback) { + callback: TaskCallback, + onResponseCallback: TaskResponseCallback) { super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId }); this.callback = callback; + this.onResponseCallback = onResponseCallback; this.task = task; this.transferList = transferList; @@ -113,6 +124,7 @@ export class TaskInfo extends AsyncResource implements Task { this.abortSignal = abortSignal; this.created = performance.now(); this.started = 0; + this.bufferSize = bufferSize; } onAbort (value: (() => void)) { @@ -132,10 +144,18 @@ export class TaskInfo extends AsyncResource implements Task { return ret; } - // TODO: implement - helpful for streaming chunks of data from worker to parent - onResponse(_result: any) {} + // Until now, we only assume onResponse hints a streamed response + handleResponse(result: ResponseMessage) : void { + const { shared, error } = result; + assert(shared != null); + + this.redeable = new WorkerStream(shared!); + this.onResponseCallback(error); + } done (err : Error | null, result? : any) : void { + if (this.redeable != null && err != null) this.redeable?.destroy(err); + this.runInAsyncScope(this.callback, null, err, result); this.emitDestroy(); // `TaskInfo`s are used only once. // If an abort signal was used, remove the listener from it when diff --git a/src/types.ts b/src/types.ts index fd9dd069..079ec300 100644 --- a/src/types.ts +++ b/src/types.ts @@ -18,6 +18,7 @@ export interface RequestMessage { filename: string name: string histogramEnabled: number + bufferSize: number, } export interface ReadyMessage { @@ -29,7 +30,13 @@ export interface ResponseMessage { result: any error: Error | null time: number | null + done: 0 | 1 // 0 = continue, 1 = done + shared?: { + state: SharedArrayBuffer, + data: SharedArrayBuffer + } } + export const commonState = { isWorkerThread: false, workerData: undefined diff --git a/src/worker.ts b/src/worker.ts index 10f306ca..65678a5e 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -8,6 +8,7 @@ import type { ResponseMessage, StartupMessage } from './types'; +import { WorkerStreamWriter } from './worker_pool/worker_stream'; import { kResponseCountField, kRequestCountField, @@ -17,7 +18,11 @@ import { import { READY, commonState, - isMovable + isMovable, + GeneratorFunctionConstructor, + AsyncGeneratorConstructor, + AsyncFunctionConstructor, + FunctionConstructor, } from './common'; commonState.isWorkerThread = true; @@ -171,7 +176,7 @@ async function onMessage ( sharedBuffer : Int32Array, message : RequestMessage) { currentTasks++; - const { taskId, task, filename, name } = message; + const { taskId, task, filename, name, bufferSize } = message; let response : ResponseMessage; let transferList : any[] = []; const start = message.histogramEnabled === 1 ? performance.now() : null; @@ -181,31 +186,107 @@ async function onMessage ( if (handler === null) { throw new Error(`No handler function exported from ${filename}`); } - let result = await handler(task); - if (isMovable(result)) { - transferList = transferList.concat(result[kTransferable]); - result = result[kValue]; + + let result: any = null; + switch (handler.constructor.name) { + case FunctionConstructor: { + result = handler(task); + + // Handle theneable + if (result?.then != null) { + result = await result; + } + + if (isMovable(result)) { + transferList = transferList.concat(result[kTransferable]); + result = result[kValue]; + } + + break + } + case AsyncFunctionConstructor: { + result = await handler(task) + if (isMovable(result)) { + transferList = transferList.concat(result[kTransferable]); + result = result[kValue]; + } + break; + } + case AsyncGeneratorConstructor: { + const state = new SharedArrayBuffer(128); + const data = new SharedArrayBuffer(bufferSize); + const writer = new WorkerStreamWriter(state, data); + const res = { + taskId, + shared: { + state, + data, + }, + done: 0, + error: null, + }; + + port.postMessage(res); + writer.prepare(); + + try { + for await (const chunk of handler(task)) { + if (writer.write(chunk) === false) { + await writer.wait(); + } + } + writer.end(); + } catch (e) { + writer.destroy(); + throw e + } + + break; + } + case GeneratorFunctionConstructor: { + const state = new SharedArrayBuffer(128); + const data = new SharedArrayBuffer(bufferSize); + const writer = new WorkerStreamWriter(state, data); + + const res = { + taskId, + shared: { + state, + data, + }, + done: 0, + error: null, + }; + + port.postMessage(res); + writer.prepare(); + + try { + for (const chunk of handler(task)) { + if (writer.write(chunk) === false) { + await writer.wait(); + } + } + writer.end(); + } catch (e) { + writer.destroy(); + throw e + } + + break; + } + default: { + throw new Error(`Unsupported handler exported from ${filename}`); + } } + response = { taskId, + done: 1, result, error: null, time: start == null ? null : Math.round(performance.now() - start) }; - - if (useAtomics && !useAsyncAtomics) { - // If the task used e.g. console.log(), wait for the stream to drain - // before potentially entering the `Atomics.wait()` loop, and before - // returning the result so that messages will always be printed even - // if the process would otherwise be ready to exit. - if (process.stdout.writableLength > 0) { - await new Promise((resolve) => process.stdout.write('', resolve)); - } - - if (process.stderr.writableLength > 0) { - await new Promise((resolve) => process.stderr.write('', resolve)); - } - } } catch (error) { response = { taskId, @@ -213,6 +294,7 @@ async function onMessage ( // It may be worth taking a look at the error cloning algorithm we // use in Node.js core here, it's quite a bit more flexible error: error, + done: 1, time: start == null ? null : Math.round(performance.now() - start) }; } diff --git a/src/worker_pool/index.ts b/src/worker_pool/index.ts index 075bfaaa..2fcd3e62 100644 --- a/src/worker_pool/index.ts +++ b/src/worker_pool/index.ts @@ -161,7 +161,8 @@ export class WorkerInfo extends AsynchronouslyCreatedResource { taskId: taskInfo.taskId, filename: taskInfo.filename, name: taskInfo.name, - histogramEnabled: this.histogram != null ? 1 : 0 + histogramEnabled: this.histogram != null ? 1 : 0, + bufferSize: taskInfo.bufferSize, }; try { @@ -220,6 +221,10 @@ export class WorkerInfo extends AsynchronouslyCreatedResource { return task; } + getTask (taskId: string): TaskInfo | null { + return this.taskInfos.get(taskId) ?? null; + } + get interface (): PiscinaWorker { const worker = this; return { diff --git a/src/worker_pool/worker_stream.ts b/src/worker_pool/worker_stream.ts new file mode 100644 index 00000000..a73452c5 --- /dev/null +++ b/src/worker_pool/worker_stream.ts @@ -0,0 +1,360 @@ +import assert from 'node:assert'; +import { Readable } from 'node:stream'; + +import { RING_BUFFER_INDEXES, RING_BUFFER_STATUSES } from '../common'; + +type WorkerStreamSharedBuffers = { + state: SharedArrayBuffer; + data: SharedArrayBuffer; +}; + +const RING_BUFFER_CONSTANTS = { + headerOffset: 4, +}; + +class WorkerStreamBuffer { + #buffer: Buffer | ArrayBuffer; + #type: 0 | 1 = 0; // 0 = Buffer, 1 = ArrayBuffer + constructor(chunk: Buffer | ArrayBuffer | string) { + if (ArrayBuffer.isView(chunk)) { + this.#buffer = chunk; + this.#type = 1; + } else if (typeof chunk === 'string') { + this.#buffer = Buffer.from(chunk); + } else { + this.#buffer = chunk; + } + } + + parse(): Buffer { + switch (this.#type) { + case 1: + return Buffer.from(this.#buffer as ArrayBuffer); + default: + return this.#buffer as Buffer; + break; + } + } + + get byteLength(): number { + return this.#buffer.byteLength; + } + + get length(): number { + // @ts-expect-error + return this.#buffer.length; + } +} + +export class WorkerStreamWriter { + #state: Int32Array; + #data: Buffer; + #view: DataView; + #readIdx: number = 0; + #writeIdx: number = 0; + #status: number = 0; + #firstChunk: boolean = false; + #ended: boolean = false; + #needsFlush: boolean = false; + #paused: boolean = false; + + constructor(state: SharedArrayBuffer, data: SharedArrayBuffer) { + this.#state = new Int32Array(state); + this.#data = Buffer.from(data); + this.#view = new DataView(data); + } + + prepare(): void { + this.#firstChunk = true; + queueMicrotask(() => + Atomics.store(this.#state, RING_BUFFER_INDEXES.STATUS_INDEX, this.#status) + ); + Atomics.store(this.#state, RING_BUFFER_INDEXES.READ_INDEX, this.#readIdx); + Atomics.store(this.#state, RING_BUFFER_INDEXES.WRITE_INDEX, this.#writeIdx); + Atomics.notify(this.#state, RING_BUFFER_INDEXES.READ_INDEX); + Atomics.notify(this.#state, RING_BUFFER_INDEXES.WRITE_INDEX); + } + + write(chunk: Buffer | ArrayBufferLike | string): boolean { + assert(!this.#ended, 'writer already closed'); + if ( + typeof chunk !== 'string' && + Buffer.isBuffer(chunk) === false && + ArrayBuffer.isView(chunk) === false + ) { + throw new TypeError( + '(Async)Iterators should only return string, buffer or typed arrays' + ); + } + + // chunk = ( + // Buffer.isBuffer(chunk) || ArrayBuffer.isView(chunk) + // ? chunk + // : Buffer.from(chunk) + // ) as Buffer; + + return this.#write(new WorkerStreamBuffer(chunk)); + } + + #write(chunk: WorkerStreamBuffer): boolean { + // process._rawDebug('>>> thread - before copy - data length', this.#data.byteLength) + // process._rawDebug('>>> thread - before copy - data', this.#data) + // process._rawDebug('>>> thread - before copy - write index', this.#writeIdx) + // // @ts-expect-error + // process._rawDebug('>>> thread - before copy - chunk length', chunk.length) + // process._rawDebug( + // '>>> thread - before copy - chunk byteLength', + // chunk.byteLength + // ); + + // Chunk header with the size of the chunk + // this.#data[this.#writeIdx] = chunk.byteLength; + // this.#data.writeInt32LE(chunk.byteLength, this.#writeIdx) + this.#view.setUint32(this.#writeIdx, chunk.byteLength, true); + this.#writeIdx += RING_BUFFER_CONSTANTS.headerOffset; + // this.#data[++this.#writeIdx] = chunk; + // (chunk as Buffer).copy(this.#data, this.#writeIdx); + this.#data.set(chunk.parse(), this.#writeIdx); + + this.#writeIdx += chunk.length; + // process._rawDebug('>>> thread - after copy - write index', this.#writeIdx) + // process._rawDebug('>>> thread - after copy - data', this.#data) + + // Update write idx for consistency + Atomics.store(this.#state, RING_BUFFER_INDEXES.WRITE_INDEX, this.#writeIdx); + + // If paused and init because first chunk, let's change state and move + if ( + this.#firstChunk === true && + this.#status === RING_BUFFER_STATUSES.PAUSED + ) { + Atomics.store( + this.#state, + RING_BUFFER_INDEXES.STATUS_INDEX, + (this.#status = RING_BUFFER_STATUSES.RESUME) + ); + Atomics.notify(this.#state, RING_BUFFER_INDEXES.WRITE_INDEX); + this.#firstChunk = false; + } else { + this.#status = Atomics.load( + this.#state, + RING_BUFFER_INDEXES.STATUS_INDEX + ); + } + + if (this.#status === 0) { + this.#paused = true; + return false; + } + + if (this.#writeIdx === this.#readIdx) { + this.#needsFlush = true; + return false; + } + + return true; + } + + async wait(): Promise { + if (this.#needsFlush) { + // @ts-expect-error - to allow further tasks to flush + await Atomics.waitAsync( + this.#state, + RING_BUFFER_INDEXES.READ_INDEX, + this.#readIdx + ); + this.#readIdx = Atomics.load(this.#state, RING_BUFFER_INDEXES.READ_INDEX); + } else if (this.#paused) { + // @ts-expect-error - to allow further tasks to flush + await Atomics.waitAsync( + this.#state, + RING_BUFFER_INDEXES.STATUS_INDEX, + RING_BUFFER_STATUSES.PAUSED + ); // 0 - pause, 1 - resume, 2 - end, 3 - errored (?), + this.#status = Atomics.load( + this.#state, + RING_BUFFER_INDEXES.STATUS_INDEX + ); + } + } + + destroy() { + this.#ended = true; + Atomics.store(this.#state, RING_BUFFER_INDEXES.STATUS_INDEX, 3); // errored + Atomics.notify(this.#state, RING_BUFFER_INDEXES.STATUS_INDEX); + } + + end(): void { + this.#ended = true; + Atomics.store(this.#state, RING_BUFFER_INDEXES.STATUS_INDEX, 2); + Atomics.notify(this.#state, RING_BUFFER_INDEXES.STATUS_INDEX); + } +} + +class WorkerStream extends Readable { + #shared: WorkerStreamSharedBuffers; + #readIdx: number = 0; + #writeIdx: number = 4; + #status: number | null = null; + #waiting: boolean = false; + + constructor(shared: WorkerStreamSharedBuffers) { + super(); + + assert.ok(shared.data); + assert.ok(shared.state); + this.#shared = shared; + } + + _read() { + const state = new Int32Array(this.#shared.state); + const buffer = Buffer.from(this.#shared.data); + // const view = new DataView(this.#shared.data); + + this.#status = Atomics.load(state, RING_BUFFER_INDEXES.STATUS_INDEX); + // console.log('>>main: initial status', this.#status); + // console.log('>>main: initial buffer status', buffer); + + // If thread has not yet buffered chunks + // if paused on the other side, let's await + if (this.#status === 0) { + Atomics.wait( + state, + RING_BUFFER_INDEXES.STATUS_INDEX, + RING_BUFFER_STATUSES.PAUSED + ); + this.#status = Atomics.load(state, RING_BUFFER_INDEXES.STATUS_INDEX); + } + + // If errored or waiting for more data, just stop + if (this.#status === 3 || this.#waiting) { + return; + } + + // console.log('>>main: after status', this.#status); + // console.log('>>main: after buffer status', buffer); + + while (true) { + // console.log( + // `main>>>: Status ${this.#status} - Header Index: ${ + // this.#readIdx + // } - Chunk Index - ${this.#readIdx + RING_BUFFER_CONSTANTS.headerOffset}` + // ); + + // const header = buffer[this.#readIdx]; + const headersection = buffer.subarray( + this.#readIdx, + this.#readIdx + RING_BUFFER_CONSTANTS.headerOffset + ); + // console.log('>>> main: header section', headersection); + const header = headersection.readInt32LE(); + // const temp = buffer[this.#readIdx]; + // console.log('>>> main: header', header); + this.#readIdx += RING_BUFFER_CONSTANTS.headerOffset; + const temp = buffer.subarray(this.#readIdx, this.#readIdx + header); + // TODO: test parsing a more complex payload. + // TODO: support long payload on ringbuffer + // TODO: support restarting indexes on ringbuffer + // TODO: support customized buffer allocation + // TODO: Benchmark + // TODO: cleanup + + // console.log('main>>> chunk as string', temp); + this.#readIdx += header; + this.#writeIdx = Atomics.load(state, RING_BUFFER_INDEXES.WRITE_INDEX); + // console.log('>>> main - readidx:', this.#readIdx, '- writeidx:', this.#writeIdx) + if (!this.push(`${temp}`)) { + // console.log('main>>> pausing'); + return; + } + + this.#status = Atomics.load(state, RING_BUFFER_INDEXES.STATUS_INDEX); + + // console.log('>>> main - new status:', this.#status); + if (this.#status !== 1) { + switch (this.#status) { + // Stream paused + case 0: { + // Update read cursor and wake up thread if awaiting for further + // signal + Atomics.store(state, RING_BUFFER_INDEXES.READ_INDEX, this.#readIdx); + Atomics.notify(state, RING_BUFFER_INDEXES.READ_INDEX); + + // We have drained the buffer but thread hasn't yet + // updated it + if (this.#readIdx === this.#writeIdx) { + Atomics.wait( + state, + RING_BUFFER_INDEXES.WRITE_INDEX, + this.#writeIdx + ); + } + break; + } + + // Stream ended + case 2: { + if (this.#readIdx === this.#writeIdx) { + this.push(null); + return; + } + break; + } + + // Stream failed + case 3: { + return; + } + } + // if (this.#status === 2) { + // if (this.#readIdx === this.#writeIdx) { + // this.push(null); + // return; + // } + // // Means buffer needs to be drained + // } else if (this.#status === 0) { + // // Update read cursor and wake up thread if awaiting for further + // // signal + // Atomics.store(state, RING_BUFFER_INDEXES.READ_INDEX, this.#readIdx); + // Atomics.notify(state, RING_BUFFER_INDEXES.READ_INDEX); + + // // We have drained the buffer but thread hasn't yet + // // updated it + // if (this.#readIdx === this.#writeIdx) { + // Atomics.wait( + // state, + // RING_BUFFER_INDEXES.WRITE_INDEX, + // this.#writeIdx + // ); + // } + // } + } else if (this.#readIdx === this.#writeIdx) { + // We have drained the buffer but thread hasn't + // added more data just yet + // console.log( + // '>>> main - waiting for drain', + // this.#readIdx, + // this.#writeIdx + // ); + this.#waiting = true; + // @ts-expect-error + const { async, value } = Atomics.waitAsync( + state, + RING_BUFFER_INDEXES.WRITE_INDEX, + this.#writeIdx + ); + if (async === true) { + value.then(() => { + this.#waiting = false; + }); + } else { + this.#waiting = false; + } + + return; + } + } + } +} + +export { WorkerStream }; \ No newline at end of file diff --git a/test/fixtures/async-iterator.js b/test/fixtures/async-iterator.js new file mode 100644 index 00000000..d95a81e9 --- /dev/null +++ b/test/fixtures/async-iterator.js @@ -0,0 +1,10 @@ +module.exports = async function* ({ length = 5, throwNext = false } = {}) { + const median = Math.floor(length / 2); + for (let i = 0; i < length; i++) { + if (throwNext && i === median) { + throw new Error('Thrown error'); + } + + yield Buffer.from(`${i}`); + } +}; diff --git a/test/fixtures/bad-iterators.js b/test/fixtures/bad-iterators.js new file mode 100644 index 00000000..36c4e8dc --- /dev/null +++ b/test/fixtures/bad-iterators.js @@ -0,0 +1,36 @@ + +module.exports = { + asyncIterator: async function* () { + yield Buffer.from('1'); + yield 2; + }, + asyncIterator2: async function* () { + yield new Int8Array([0x31]); + yield new Int8Array([0x31]); + yield new Int8Array([0x31]); + yield new Int8Array([0x31]); + yield {} + }, + asyncIterator3: async function* () { + yield '1'; + yield [] + }, + asyncIterator4: async function* () { + yield new Set(); + }, + syncIterator: function* () { + yield {}; + }, + syncIterator2: async function* () { + yield new Int8Array([0x31]); + yield {} + }, + syncIterator3: async function* () { + yield '1'; + yield [] + }, + syncIterator4: async function* () { + yield Buffer.from('1'); + yield 2; + }, +}; diff --git a/test/fixtures/iterator.js b/test/fixtures/iterator.js new file mode 100644 index 00000000..e9d778b8 --- /dev/null +++ b/test/fixtures/iterator.js @@ -0,0 +1,11 @@ + +module.exports = function* ({ length, throwNext }) { + const median = Math.floor(length / 2); + for (let i = 0; i < length; i++) { + if (throwNext && i === median) { + throw new Error('Thrown error'); + } + + yield `${i}`; + } +}; diff --git a/test/iterators.test.ts b/test/iterators.test.ts new file mode 100644 index 00000000..664f04c0 --- /dev/null +++ b/test/iterators.test.ts @@ -0,0 +1,170 @@ +import { Readable } from 'node:stream'; +import { resolve } from 'node:path'; +import { test } from 'node:test'; +import { once } from 'node:events'; + +import Piscina from '..'; + +test('should handle iterator throw', async (t) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures', 'iterator.js'), + }); + + t.plan(1); + const [result] = await once(await pool.run({ length: 5, throwNext: true }), 'error'); + t.assert.equal(result.message, 'Thrown error'); +}); + +test('should handle iterator throw (async)', async (t) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures', 'async-iterator.js'), + }); + + t.plan(1); + const [result] = await once(await pool.run({ length: 5, throwNext: true }), 'error'); + t.assert.equal(result.message, 'Thrown error'); +}); + +test('should support iterator with custom buffer size', (t, done) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures', 'iterator.js'), + }); + + t.plan(1); + pool.run({ length: 10 }, { bufferSize: 100 }).then((red: Readable) => { + const chunks: Buffer[] = []; + red.on('data', (chunk) => { + chunks.push(chunk); + }); + + red.on('end', () => { + t.assert.equal(Buffer.concat(chunks).toString('utf-8'), '0123456789'); + done(); + }); + }); +}); + +test('should throw with custom buffer size not valid', (t) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures', 'iterator.js'), + }); + + t.plan(5); + t.assert.rejects(pool.run({}, { bufferSize: Infinity })); + // @ts-expect-error + t.assert.rejects(pool.run({}, { bufferSize: '1' })); + t.assert.rejects(pool.run({}, { bufferSize: 0 })); + t.assert.rejects(pool.run({}, { bufferSize: -1 })); + t.assert.rejects(pool.run({}, { bufferSize: 0.1 })); +}); + +// Time to adjust and test +test('should support iterator', async (t) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures', 'iterator.js'), + }); + + t.after(pool.close.bind(pool)); + t.plan(1); + const redeable = await pool.run({ length: 10 }); + + let chunks = ''; + for await (const chunk of redeable) { + chunks += chunk; + } + + t.assert.equal(chunks, '0123456789'); +}); + +test('should support async iterator', async (t) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures', 'async-iterator.js'), + }); + + t.plan(1); + t.after(() => pool.close()); + const redeable = await pool.run({ length: 10 }); + let chunks = ''; + redeable.setEncoding('utf-8'); + + for await (const chunk of redeable) { + chunks += chunk; + } + + t.assert.equal(chunks, '0123456789'); +}); + +// TODO: flaky tests. Research and enable it later +test('should throw on invalid output (async)', { skip: true }, async (t) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures', 'bad-iterators.js'), + }); + + t.plan(4); + t.after(pool.close.bind(pool)); + + await t.assert.rejects(async () => { + const read = await pool.run('', { name: 'asyncIterator' }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of read) { /* empty */ } + }, new TypeError('(Async)Iterators should only return string, buffer or typed arrays')); + + await t.assert.rejects(async () => { + const read = await pool.run('', { name: 'asyncIterator2' }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of read) { /* empty */ } + }, new TypeError('(Async)Iterators should only return string, buffer or typed arrays')); + + await t.assert.rejects(async () => { + const read = await pool.run('', { name: 'asyncIterator3' }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of read) { /* empty */ } + }, new TypeError('(Async)Iterators should only return string, buffer or typed arrays')); + + await t.assert.rejects(async () => { + const read = await pool.run('', { name: 'asyncIterator4' }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of read) { /* empty */ } + }, new TypeError('(Async)Iterators should only return string, buffer or typed arrays')); +}); + +// TODO: flaky tests. Research and enable it later +test('should throw on invalid output', { skip: true }, async (t) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures', 'bad-iterators.js'), + // concurrentTasksPerWorker: 4, + }); + + t.plan(4); + await t.assert.rejects(async () => { + const read = await pool.run('', { name: 'syncIterator' }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of read) { /* empty */ } + }, new TypeError('(Async)Iterators should only return string, buffer or typed arrays')); + + await t.assert.rejects(async () => { + const read = await pool.run('', { name: 'syncIterator2' }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of read) { /* empty */ } + }, new TypeError('(Async)Iterators should only return string, buffer or typed arrays')); + + await t.assert.rejects(async () => { + const read = await pool.run('', { name: 'syncIterator3' }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of read) { /* empty */ } + }, new TypeError('(Async)Iterators should only return string, buffer or typed arrays')); + + await t.assert.rejects(async () => { + const read = await pool.run('', { name: 'syncIterator4' }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of read) { /* empty */ } + }, new TypeError('(Async)Iterators should only return string, buffer or typed arrays')); +}); diff --git a/tsconfig.json b/tsconfig.json index 1cb3ed5f..f85950a8 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,9 +1,10 @@ { "compilerOptions": { - "target": "es2019", + "incremental": true, + "target": "ES2022", "module": "commonjs", "moduleResolution": "node", - "lib": ["es2019"], + "lib": ["ES2022"], "outDir": "dist", "rootDir": "./src", "declaration": true,