Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions workspaces/tarball/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
"@nodesecure/mama": "^2.0.0",
"@nodesecure/npm-types": "^1.2.0",
"@nodesecure/utils": "^2.3.0",
"@openally/result": "1.3.0",
"hyperid": "3.3.0",
"pacote": "^21.0.0"
},
"devDependencies": {
Expand Down
191 changes: 191 additions & 0 deletions workspaces/tarball/src/class/NpmTarballWorkerPool.class.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Import Node.js Dependencies
import { EventEmitter } from "node:events";
import path from "node:path";

// Import Third-party Dependencies
import hyperid from "hyperid";
// import { type Result, Ok, Err } from "@openally/result";
import type { AstAnalyserOptions } from "@nodesecure/js-x-ray";
import type { Contact } from "@nodesecure/npm-types";

// Import Internal Dependencies
import { PooledWorker } from "./PooledWorker.class.ts";

export interface NpmTarballWorkerPoolOptions {
/**
* Number of workers in the pool
* @default 4
*/
workerCount?: number;
}

export interface WorkerTask {
location: string;
astAnalyserOptions?: AstAnalyserOptions;
}

export interface WorkerTaskWithId extends WorkerTask {
id: string;
}

type WorkerTaskResultOk = {
id: string;
result: ScanResultPayload;
};

type WorkerTaskResultErr = {
id: string;
error: string;
};

export type WorkerTaskResult = WorkerTaskResultOk | WorkerTaskResultErr;

export interface ScanResultPayload {
description?: string;
engines?: Record<string, any>;
repository?: any;
scripts?: Record<string, string>;
author?: Contact | null;
integrity?: string | null;
type: string;
size: number;
licenses: any[];
uniqueLicenseIds: string[];
warnings: any[];
flags: string[];
composition: {
extensions: string[];
files: string[];
minified: string[];
unused: string[];
missing: string[];
required_files: string[];
required_nodejs: string[];
required_thirdparty: string[];
required_subpath: Record<string, string>;
};
}

interface TaskPromiseHandler {
resolve: (result: ScanResultPayload) => void;
reject: (error: Error) => void;
}

export class NpmTarballWorkerPool extends EventEmitter {
#generateTaskId = hyperid();
#workers: PooledWorker[] = [];
#processingTasks: Map<string, TaskPromiseHandler> = new Map();
#waitingTasks: WorkerTaskWithId[] = [];
#isTerminated = false;

constructor(
options: NpmTarballWorkerPoolOptions = {}
) {
super();

const { workerCount = 4 } = options;
const workerPath = path.join(
import.meta.dirname,
"NpmTarballWorkerScript.js"
);

for (let i = 0; i < workerCount; i++) {
const worker = new PooledWorker(workerPath, {
onComplete: (worker, message) => this.#onWorkerComplete(worker, message),
onError: (worker, error) => this.#onWorkerError(worker, error)
});

this.#workers.push(worker);
}
}

#onWorkerComplete(
worker: PooledWorker,
message: WorkerTaskResult
): void {
const handler = this.#processingTasks.get(message.id);
if (handler) {
this.#processingTasks.delete(message.id);

if ("error" in message) {
handler.reject(new Error(message.error));
}
else {
handler.resolve(message.result);
}
}

const nextTask = this.#waitingTasks.shift();
if (nextTask) {
worker.execute(nextTask);
}
}

#onWorkerError(
worker: PooledWorker,
error: Error
): void {
const taskId = worker.currentTaskId;
if (taskId) {
const handler = this.#processingTasks.get(taskId);
if (handler) {
this.#processingTasks.delete(taskId);
handler.reject(error);
}
}

this.emit("error", error);
const nextTask = this.#waitingTasks.shift();
if (nextTask) {
worker.execute(nextTask);
}
}

scan(
task: WorkerTask
): Promise<ScanResultPayload> {
if (this.#isTerminated) {
return Promise.reject(
new Error("NpmTarballWorkerPool has been terminated")
);
}

const fullTask: WorkerTaskWithId = {
id: this.#generateTaskId(),
...task
};

const { promise, resolve, reject } = Promise.withResolvers<ScanResultPayload>();
this.#processingTasks.set(fullTask.id, { resolve, reject });

const availableWorker = this.#workers.find((worker) => worker.isAvailable) ?? null;
if (availableWorker) {
availableWorker.execute(fullTask);
}
else {
this.#waitingTasks.push(fullTask);
}

return promise;
}

async terminate(): Promise<void> {
this.#isTerminated = true;

const terminationError = new Error("NpmTarballWorkerPool terminated");
for (const handler of this.#processingTasks.values()) {
handler.reject(terminationError);
}
this.#processingTasks.clear();
this.#waitingTasks = [];

await Promise.all(
this.#workers.map((worker) => worker.terminate())
);
this.#workers = [];
}

[Symbol.asyncDispose](): Promise<void> {
return this.terminate();
}
}
128 changes: 128 additions & 0 deletions workspaces/tarball/src/class/NpmTarballWorkerScript.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Import Node.js Dependencies
import { parentPort } from "node:worker_threads";
import path from "node:path";

// Import Third-party Dependencies
import { ManifestManager } from "@nodesecure/mama";
import { type Warning } from "@nodesecure/js-x-ray";

// Import Internal Dependencies
import { NpmTarball } from "./NpmTarball.class.ts";
import {
isSensitiveFile,
booleanToFlags
} from "../utils/index.ts";
import {
getEmptyPackageWarning,
getSemVerWarning
} from "../warnings.ts";

import type {
WorkerTaskWithId,
WorkerTaskResult,
ScanResultPayload
} from "./NpmTarballWorkerPool.class.ts";

// CONSTANTS
const kNativeCodeExtensions = new Set([".gyp", ".c", ".cpp", ".node", ".so", ".h"]);

async function scanPackageInWorker(
task: WorkerTaskWithId
): Promise<ScanResultPayload> {
const { location, astAnalyserOptions } = task;

const mama = await ManifestManager.fromPackageJSON(
location
);
const tarex = new NpmTarball(mama);

const {
composition,
conformance,
code
} = await tarex.scanFiles(astAnalyserOptions);

const warnings: Warning[] = [];

// Check for empty package
if (
composition.files.length === 1 &&
composition.files.includes("package.json")
) {
warnings.push(getEmptyPackageWarning());
}

// Check for zero semver
if (mama.hasZeroSemver) {
warnings.push(getSemVerWarning(mama.document.version!));
}

warnings.push(...code.warnings);

const {
files,
dependencies,
flags
} = code.groupAndAnalyseDependencies(mama);

const computedFlags = booleanToFlags({
...flags,
hasExternalCapacity: code.flags.hasExternalCapacity || flags.hasExternalCapacity,
hasNoLicense: conformance.uniqueLicenseIds.length === 0,
hasMultipleLicenses: conformance.uniqueLicenseIds.length > 1,
hasMinifiedCode: code.minified.length > 0,
hasWarnings: warnings.length > 0,
hasBannedFile: composition.files.some((filePath) => isSensitiveFile(filePath)),
hasNativeCode: mama.flags.isNative ||
composition.files.some((file) => kNativeCodeExtensions.has(path.extname(file))),
hasScript: mama.flags.hasUnsafeScripts
});
const {
description, engines, repository, scripts
} = mama.document;

return {
description,
engines,
repository,
scripts,
author: mama.author,
integrity: mama.isWorkspace ? null : mama.integrity,
type: mama.moduleType,
size: composition.size,
licenses: conformance.licenses,
uniqueLicenseIds: conformance.uniqueLicenseIds,
warnings,
flags: Array.from(computedFlags),
composition: {
extensions: [...composition.ext],
files: composition.files,
minified: code.minified,
unused: dependencies.unused,
missing: dependencies.missing,
required_files: [...files],
required_nodejs: dependencies.nodejs,
required_thirdparty: dependencies.thirdparty,
required_subpath: dependencies.subpathImports
}
};
}

parentPort?.on("message", async(task: WorkerTaskWithId) => {
let message: WorkerTaskResult;

try {
const result = await scanPackageInWorker(task);

message = { id: task.id, result };
}
catch (error) {
const messageError = error instanceof Error ?
error.message :
String(error);

message = { id: task.id, error: messageError };
}

message && parentPort?.postMessage(message);
});
60 changes: 60 additions & 0 deletions workspaces/tarball/src/class/PooledWorker.class.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Import Node.js Dependencies
import { Worker } from "node:worker_threads";

// Import Internal Dependencies
import type {
WorkerTaskWithId,
WorkerTaskResult
} from "./NpmTarballWorkerPool.class.ts";

export interface PooledWorkerEvents {
onComplete: (worker: PooledWorker, result: WorkerTaskResult) => void;
onError: (worker: PooledWorker, error: Error) => void;
}

export class PooledWorker {
#worker: Worker;
#currentTaskId: string | null = null;
#events: PooledWorkerEvents;

constructor(
workerPath: string,
events: PooledWorkerEvents
) {
this.#events = events;
this.#worker = new Worker(workerPath);

this.#worker.on("message", (message: WorkerTaskResult) => {
this.#currentTaskId = null;
this.#events.onComplete(this, message);
});

this.#worker.on("error", (error: Error) => {
this.#currentTaskId = null;
this.#events.onError(this, error);
});
}

get isAvailable(): boolean {
return this.#currentTaskId === null;
}

get currentTaskId(): string | null {
return this.#currentTaskId;
}

execute(
task: WorkerTaskWithId
): void {
if (!this.isAvailable) {
throw new Error(`Worker is busy with task ${this.#currentTaskId}`);
}

this.#currentTaskId = task.id;
this.#worker.postMessage(task);
}

terminate(): Promise<number> {
return this.#worker.terminate();
}
}
6 changes: 6 additions & 0 deletions workspaces/tarball/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
export * from "./tarball.ts";
export * from "./class/NpmTarball.class.ts";
export {
NpmTarballWorkerPool,
type WorkerTask,
type NpmTarballWorkerPoolOptions,
type ScanResultPayload
} from "./class/NpmTarballWorkerPool.class.ts";