diff --git a/workspaces/tarball/package.json b/workspaces/tarball/package.json
index 913841f..5c3885f 100644
--- a/workspaces/tarball/package.json
+++ b/workspaces/tarball/package.json
@@ -45,6 +45,8 @@
     "@nodesecure/mama": "^2.0.0",
     "@nodesecure/npm-types": "^1.2.0",
     "@nodesecure/utils": "^2.3.0",
+    "@openally/result": "1.3.0",
+    "hyperid": "3.3.0",
     "pacote": "^21.0.0"
   },
   "devDependencies": {
diff --git a/workspaces/tarball/src/class/NpmTarballWorkerPool.class.ts b/workspaces/tarball/src/class/NpmTarballWorkerPool.class.ts
new file mode 100644
index 0000000..720deab
--- /dev/null
+++ b/workspaces/tarball/src/class/NpmTarballWorkerPool.class.ts
@@ -0,0 +1,251 @@
+// Import Node.js Dependencies
+import { EventEmitter } from "node:events";
+import path from "node:path";
+
+// Import Third-party Dependencies
+import hyperid from "hyperid";
+// import { type Result, Ok, Err } from "@openally/result";
+import type { AstAnalyserOptions } from "@nodesecure/js-x-ray";
+import type { Contact } from "@nodesecure/npm-types";
+
+// Import Internal Dependencies
+import { PooledWorker } from "./PooledWorker.class.ts";
+
+export interface NpmTarballWorkerPoolOptions {
+  /**
+   * Number of workers in the pool
+   * @default 4
+   */
+  workerCount?: number;
+}
+
+export interface WorkerTask {
+  location: string;
+  astAnalyserOptions?: AstAnalyserOptions;
+}
+
+export interface WorkerTaskWithId extends WorkerTask {
+  id: string;
+}
+
+type WorkerTaskResultOk = {
+  id: string;
+  result: ScanResultPayload;
+};
+
+type WorkerTaskResultErr = {
+  id: string;
+  error: string;
+};
+
+export type WorkerTaskResult = WorkerTaskResultOk | WorkerTaskResultErr;
+
+export interface ScanResultPayload {
+  description?: string;
+  engines?: Record<string, any>;
+  repository?: any;
+  scripts?: Record<string, string>;
+  author?: Contact | null;
+  integrity?: string | null;
+  type: string;
+  size: number;
+  licenses: any[];
+  uniqueLicenseIds: string[];
+  warnings: any[];
+  flags: string[];
+  composition: {
+    extensions: string[];
+    files: string[];
+    minified: string[];
+    unused: string[];
+    missing: string[];
+    required_files: string[];
+    required_nodejs: string[];
+    required_thirdparty: string[];
+    required_subpath: Record<string, string>;
+  };
+}
+
+interface TaskPromiseHandler {
+  resolve: (result: ScanResultPayload) => void;
+  reject: (error: Error) => void;
+}
+
+/**
+ * Fixed-size pool of worker threads used to scan npm packages in parallel.
+ *
+ * Each `scan()` call is handed to an idle worker, or queued until one becomes
+ * available. A worker crash rejects the scan it was running and is forwarded
+ * as an "error" event when at least one listener is registered.
+ */
+export class NpmTarballWorkerPool extends EventEmitter {
+  #generateTaskId = hyperid();
+  #workers: PooledWorker[] = [];
+  #processingTasks: Map<string, TaskPromiseHandler> = new Map();
+  #waitingTasks: WorkerTaskWithId[] = [];
+  #isTerminated = false;
+
+  /**
+   * @throws {RangeError} when `workerCount` is not a positive integer
+   */
+  constructor(
+    options: NpmTarballWorkerPoolOptions = {}
+  ) {
+    super();
+
+    const { workerCount = 4 } = options;
+    // Without any worker every scan() would stay queued forever.
+    if (!Number.isInteger(workerCount) || workerCount < 1) {
+      throw new RangeError("workerCount must be a positive integer");
+    }
+
+    // NOTE(review): targets the compiled .js script; confirm it still resolves
+    // when the package is executed straight from its .ts sources.
+    const workerPath = path.join(
+      import.meta.dirname,
+      "NpmTarballWorkerScript.js"
+    );
+
+    for (let i = 0; i < workerCount; i++) {
+      const worker = new PooledWorker(workerPath, {
+        onComplete: (worker, message) => this.#onWorkerComplete(worker, message),
+        onError: (worker, error, taskId) => this.#onWorkerError(worker, error, taskId)
+      });
+
+      this.#workers.push(worker);
+    }
+  }
+
+  // Settles the promise of the finished task, then feeds the worker again.
+  #onWorkerComplete(
+    worker: PooledWorker,
+    message: WorkerTaskResult
+  ): void {
+    const handler = this.#processingTasks.get(message.id);
+    if (handler) {
+      this.#processingTasks.delete(message.id);
+
+      if ("error" in message) {
+        handler.reject(new Error(message.error));
+      }
+      else {
+        handler.resolve(message.result);
+      }
+    }
+
+    this.#dispatchNext(worker);
+  }
+
+  // Rejects the task that was running when the worker failed.
+  #onWorkerError(
+    worker: PooledWorker,
+    error: Error,
+    taskId: string | null
+  ): void {
+    if (taskId !== null) {
+      const handler = this.#processingTasks.get(taskId);
+      if (handler) {
+        this.#processingTasks.delete(taskId);
+        handler.reject(error);
+      }
+    }
+
+    // Keep the queue moving before notifying listeners.
+    this.#dispatchNext(worker);
+
+    // EventEmitter throws when "error" is emitted without a listener; the
+    // failure is already reported through the rejected promise, so only
+    // forward it when somebody is listening.
+    if (this.listenerCount("error") > 0) {
+      this.emit("error", error);
+    }
+  }
+
+  // Hands waiting tasks to the idle worker until one of them is accepted.
+  #dispatchNext(
+    worker: PooledWorker
+  ): void {
+    let nextTask = this.#waitingTasks.shift();
+    while (nextTask && !this.#tryExecute(worker, nextTask)) {
+      nextTask = this.#waitingTasks.shift();
+    }
+  }
+
+  // Starts `task` on `worker`. A synchronous failure (for instance a task
+  // that postMessage() cannot clone) rejects that task instead of escaping.
+  #tryExecute(
+    worker: PooledWorker,
+    task: WorkerTaskWithId
+  ): boolean {
+    try {
+      worker.execute(task);
+
+      return true;
+    }
+    catch (error) {
+      const handler = this.#processingTasks.get(task.id);
+      this.#processingTasks.delete(task.id);
+      handler?.reject(error instanceof Error ? error : new Error(String(error)));
+
+      return false;
+    }
+  }
+
+  /**
+   * Schedule the scan of the package located at `task.location`.
+   *
+   * @returns a promise settled with the scan result; it rejects when the scan
+   * fails, the worker crashes or the pool is terminated.
+   */
+  scan(
+    task: WorkerTask
+  ): Promise<ScanResultPayload> {
+    if (this.#isTerminated) {
+      return Promise.reject(
+        new Error("NpmTarballWorkerPool has been terminated")
+      );
+    }
+
+    const fullTask: WorkerTaskWithId = {
+      // Spread first so a stray `id` on the input cannot shadow the generated one.
+      ...task,
+      id: this.#generateTaskId()
+    };
+
+    const { promise, resolve, reject } = Promise.withResolvers<ScanResultPayload>();
+    this.#processingTasks.set(fullTask.id, { resolve, reject });
+
+    const availableWorker = this.#workers.find((worker) => worker.isAvailable) ?? null;
+    if (availableWorker) {
+      this.#tryExecute(availableWorker, fullTask);
+    }
+    else {
+      this.#waitingTasks.push(fullTask);
+    }
+
+    return promise;
+  }
+
+  /**
+   * Reject every pending scan and stop all worker threads.
+   */
+  async terminate(): Promise<void> {
+    this.#isTerminated = true;
+
+    const terminationError = new Error("NpmTarballWorkerPool terminated");
+    for (const handler of this.#processingTasks.values()) {
+      handler.reject(terminationError);
+    }
+    this.#processingTasks.clear();
+    this.#waitingTasks = [];
+
+    await Promise.all(
+      this.#workers.map((worker) => worker.terminate())
+    );
+    this.#workers = [];
+  }
+
+  [Symbol.asyncDispose](): Promise<void> {
+    return this.terminate();
+  }
+}
diff --git a/workspaces/tarball/src/class/NpmTarballWorkerScript.ts b/workspaces/tarball/src/class/NpmTarballWorkerScript.ts
new file mode 100644
index 0000000..6a79ae0
--- /dev/null
+++ b/workspaces/tarball/src/class/NpmTarballWorkerScript.ts
@@ -0,0 +1,135 @@
+// Import Node.js Dependencies
+import { parentPort } from "node:worker_threads";
+import path from "node:path";
+
+// Import Third-party Dependencies
+import { ManifestManager } from "@nodesecure/mama";
+import { type Warning } from "@nodesecure/js-x-ray";
+
+// Import Internal Dependencies
+import { NpmTarball } from "./NpmTarball.class.ts";
+import {
+  isSensitiveFile,
+  booleanToFlags
+} from "../utils/index.ts";
+import {
+  getEmptyPackageWarning,
+  getSemVerWarning
+} from "../warnings.ts";
+
+import type {
+  WorkerTaskWithId,
+  WorkerTaskResult,
+  ScanResultPayload
+} from "./NpmTarballWorkerPool.class.ts";
+
+// CONSTANTS
+const kNativeCodeExtensions = new Set([".gyp", ".c", ".cpp", ".node", ".so", ".h"]);
+
+/**
+ * Scan the package located at `task.location` and build the payload sent
+ * back to the pool: manifest metadata, licenses, warnings, flags and the
+ * file/dependency composition.
+ */
+async function scanPackageInWorker(
+  task: WorkerTaskWithId
+): Promise<ScanResultPayload> {
+  const { location, astAnalyserOptions } = task;
+
+  const mama = await ManifestManager.fromPackageJSON(
+    location
+  );
+  const tarex = new NpmTarball(mama);
+
+  const {
+    composition,
+    conformance,
+    code
+  } = await tarex.scanFiles(astAnalyserOptions);
+
+  const warnings: Warning[] = [];
+
+  // Check for empty package
+  if (
+    composition.files.length === 1 &&
+    composition.files.includes("package.json")
+  ) {
+    warnings.push(getEmptyPackageWarning());
+  }
+
+  // Check for zero semver
+  if (mama.hasZeroSemver) {
+    warnings.push(getSemVerWarning(mama.document.version!));
+  }
+
+  warnings.push(...code.warnings);
+
+  const {
+    files,
+    dependencies,
+    flags
+  } = code.groupAndAnalyseDependencies(mama);
+
+  const computedFlags = booleanToFlags({
+    ...flags,
+    hasExternalCapacity: code.flags.hasExternalCapacity || flags.hasExternalCapacity,
+    hasNoLicense: conformance.uniqueLicenseIds.length === 0,
+    hasMultipleLicenses: conformance.uniqueLicenseIds.length > 1,
+    hasMinifiedCode: code.minified.length > 0,
+    hasWarnings: warnings.length > 0,
+    hasBannedFile: composition.files.some((filePath) => isSensitiveFile(filePath)),
+    hasNativeCode: mama.flags.isNative ||
+      composition.files.some((file) => kNativeCodeExtensions.has(path.extname(file))),
+    hasScript: mama.flags.hasUnsafeScripts
+  });
+  const {
+    description, engines, repository, scripts
+  } = mama.document;
+
+  return {
+    description,
+    engines,
+    repository,
+    scripts,
+    author: mama.author,
+    integrity: mama.isWorkspace ? null : mama.integrity,
+    type: mama.moduleType,
+    size: composition.size,
+    licenses: conformance.licenses,
+    uniqueLicenseIds: conformance.uniqueLicenseIds,
+    warnings,
+    flags: Array.from(computedFlags),
+    composition: {
+      extensions: [...composition.ext],
+      files: composition.files,
+      minified: code.minified,
+      unused: dependencies.unused,
+      missing: dependencies.missing,
+      required_files: [...files],
+      required_nodejs: dependencies.nodejs,
+      required_thirdparty: dependencies.thirdparty,
+      required_subpath: dependencies.subpathImports
+    }
+  };
+}
+
+// Reply to each task with a single message carrying its id and either the
+// scan result or the error message.
+parentPort?.on("message", async(task: WorkerTaskWithId) => {
+  let message: WorkerTaskResult;
+
+  try {
+    const result = await scanPackageInWorker(task);
+
+    message = { id: task.id, result };
+  }
+  catch (error) {
+    const messageError = error instanceof Error ?
+      error.message :
+      String(error);
+
+    message = { id: task.id, error: messageError };
+  }
+
+  parentPort?.postMessage(message);
+});
diff --git a/workspaces/tarball/src/class/PooledWorker.class.ts b/workspaces/tarball/src/class/PooledWorker.class.ts
new file mode 100644
index 0000000..7a4b240
--- /dev/null
+++ b/workspaces/tarball/src/class/PooledWorker.class.ts
@@ -0,0 +1,93 @@
+// Import Node.js Dependencies
+import { Worker } from "node:worker_threads";
+
+// Import Internal Dependencies
+import type {
+  WorkerTaskWithId,
+  WorkerTaskResult
+} from "./NpmTarballWorkerPool.class.ts";
+
+export interface PooledWorkerEvents {
+  onComplete: (worker: PooledWorker, result: WorkerTaskResult) => void;
+  /**
+   * `taskId` is the task that was running when the error occurred, or null
+   * when the worker was idle.
+   */
+  onError: (worker: PooledWorker, error: Error, taskId: string | null) => void;
+}
+
+/**
+ * Wraps a single worker thread that runs at most one task at a time.
+ */
+export class PooledWorker {
+  #workerPath: string;
+  #worker: Worker;
+  #isAlive = true;
+  #currentTaskId: string | null = null;
+  #events: PooledWorkerEvents;
+
+  constructor(
+    workerPath: string,
+    events: PooledWorkerEvents
+  ) {
+    this.#workerPath = workerPath;
+    this.#events = events;
+    this.#worker = this.#spawn();
+  }
+
+  // Creates the underlying thread and wires its events to the callbacks.
+  #spawn(): Worker {
+    const worker = new Worker(this.#workerPath);
+
+    worker.on("message", (message: WorkerTaskResult) => {
+      this.#currentTaskId = null;
+      this.#events.onComplete(this, message);
+    });
+
+    worker.on("error", (error: Error) => {
+      // Read the task id before clearing it, otherwise the owner cannot tell
+      // which task has to be rejected.
+      const taskId = this.#currentTaskId;
+      this.#currentTaskId = null;
+      // An "error" event means the thread is gone; execute() replaces it.
+      this.#isAlive = false;
+      this.#events.onError(this, error, taskId);
+    });
+
+    return worker;
+  }
+
+  get isAvailable(): boolean {
+    return this.#currentTaskId === null;
+  }
+
+  get currentTaskId(): string | null {
+    return this.#currentTaskId;
+  }
+
+  /**
+   * Send `task` to the worker thread.
+   *
+   * @throws {Error} when the worker is already running a task
+   */
+  execute(
+    task: WorkerTaskWithId
+  ): void {
+    if (!this.isAvailable) {
+      throw new Error(`Worker is busy with task ${this.#currentTaskId}`);
+    }
+
+    if (!this.#isAlive) {
+      this.#worker = this.#spawn();
+      this.#isAlive = true;
+    }
+
+    // Post first: if postMessage() throws the worker must stay available.
+    this.#worker.postMessage(task);
+    this.#currentTaskId = task.id;
+  }
+
+  terminate(): Promise<number> {
+    return this.#worker.terminate();
+  }
+}
diff --git a/workspaces/tarball/src/index.ts b/workspaces/tarball/src/index.ts
index 236de61..a84bb0b 100644
--- a/workspaces/tarball/src/index.ts
+++ b/workspaces/tarball/src/index.ts
@@ -1,2 +1,8 @@
 export * from "./tarball.ts";
 export * from "./class/NpmTarball.class.ts";
+export {
+  NpmTarballWorkerPool,
+  type WorkerTask,
+  type NpmTarballWorkerPoolOptions,
+  type ScanResultPayload
+} from "./class/NpmTarballWorkerPool.class.ts";