Skip to content

Commit 83fbfe0

Browse files
committed
feat(executor): add timeout protection and plannedRuns parameter
- Add Promise.race wrapper around sutInstance.run() to enforce per-run timeout limits (300s default for large graphs) - Add optional plannedRuns parameter to execute() method to support parallel worker run filtering - Use effectivePlannedRuns to avoid shadowing the parameter name This prevents long-running graph traversals from blocking parallel workers and allows workers to execute disjoint run subsets.
1 parent fd34053 commit 83fbfe0

File tree

4 files changed

+253
-10
lines changed

4 files changed

+253
-10
lines changed

src/cli-commands/evaluate.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ const runExecutePhase = async (options: EvaluateOptions, sutRegistry: ExpansionS
616616
await executeParallel(remainingRuns, suts, cases, executorConfigWithCallbacks, {
617617
workers: options.parallelWorkers,
618618
checkpointDir: executeDir,
619+
timeoutMs: options.timeoutMs,
619620
});
620621

621622
// Merge worker checkpoints after parallel execution (main process only)
@@ -641,7 +642,8 @@ const runExecutePhase = async (options: EvaluateOptions, sutRegistry: ExpansionS
641642
}
642643
} else {
643644
// Single-process async execution (concurrent but single-threaded)
644-
await executor.execute(suts, cases, extractMetrics);
645+
// Pass remainingRuns to avoid re-executing completed runs
646+
await executor.execute(suts, cases, extractMetrics, remainingRuns);
645647
}
646648
}
647649

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/**
2+
* Unit tests for Executor
3+
*
4+
* Tests executor behavior including planned runs filtering.
5+
*/
6+
7+
import { beforeEach, describe, expect, it } from "vitest";
8+
9+
import type { CaseDefinition } from "../../types/case.js";
10+
import type { SutDefinition } from "../../types/sut.js";
11+
import { Executor } from "../executor.js";
12+
13+
/**
14+
* Mock expander for testing.
15+
*/
16+
class MockExpander {
17+
async expand(): Promise<void> {
18+
// No-op
19+
}
20+
}
21+
22+
/**
23+
* Mock result for testing.
24+
*/
25+
interface MockResult {
26+
value: string;
27+
}
28+
29+
/**
30+
* Create a mock SUT definition.
31+
* @param id
32+
*/
33+
const createMockSut = (id: string): SutDefinition<MockExpander, MockResult> => ({
34+
registration: {
35+
id,
36+
name: `Mock SUT ${id}`,
37+
role: "primary",
38+
version: "1.0.0",
39+
config: Object.freeze({}),
40+
tags: [],
41+
},
42+
factory: () =>
43+
({
44+
run: async () => ({ value: id }),
45+
}) as never,
46+
});
47+
48+
/**
49+
* Create a mock case definition.
50+
* @param id
51+
*/
52+
const createMockCase = (id: string): CaseDefinition<MockExpander> => ({
53+
case: {
54+
caseId: id,
55+
caseClass: "test",
56+
name: `Test Case ${id}`,
57+
version: "1.0.0",
58+
inputs: {},
59+
},
60+
createExpander: async () => new MockExpander(),
61+
getSeeds: () => [],
62+
});
63+
64+
describe("Executor", () => {
65+
let executor: Executor<MockExpander, MockResult>;
66+
67+
beforeEach(() => {
68+
executor = new Executor();
69+
});
70+
71+
describe("plan", () => {
72+
it("should generate planned runs for SUTs and cases", () => {
73+
const suts = [createMockSut("sut1"), createMockSut("sut2")];
74+
const cases = [createMockCase("case1"), createMockCase("case2")];
75+
76+
const planned = executor.plan(suts as never, cases as never);
77+
78+
expect(planned).toHaveLength(4); // 2 SUTs x 2 cases
79+
});
80+
81+
it("should include repetition in planned runs", () => {
82+
const executorWithRep = new Executor({ repetitions: 3 });
83+
const suts = [createMockSut("sut1")];
84+
const cases = [createMockCase("case1")];
85+
86+
const planned = executorWithRep.plan(suts as never, cases as never);
87+
88+
expect(planned).toHaveLength(3); // 1 SUT x 1 case x 3 reps
89+
expect(planned[0].repetition).toBe(0);
90+
expect(planned[1].repetition).toBe(1);
91+
expect(planned[2].repetition).toBe(2);
92+
});
93+
94+
it("should generate unique run IDs", () => {
95+
const suts = [createMockSut("sut1")];
96+
const cases = [createMockCase("case1"), createMockCase("case2")];
97+
98+
const planned = executor.plan(suts as never, cases as never);
99+
const runIds = new Set(planned.map((r) => r.runId));
100+
101+
expect(runIds.size).toBe(planned.length); // All unique
102+
});
103+
});
104+
105+
describe("execute with plannedRuns parameter", () => {
106+
it("should use provided plannedRuns instead of planning", async () => {
107+
const suts = [createMockSut("sut1")];
108+
const cases = [createMockCase("case1")];
109+
110+
// Plan all runs
111+
const allPlanned = executor.plan(suts, cases);
112+
113+
// Filter to only run the first one
114+
const filteredRuns = [allPlanned[0]];
115+
116+
const summary = await executor.execute(
117+
suts as never,
118+
cases as never,
119+
() => ({}),
120+
filteredRuns
121+
);
122+
123+
expect(summary.totalRuns).toBe(1);
124+
expect(summary.successfulRuns).toBe(1);
125+
});
126+
127+
it("should use filtered plannedRuns for single execution", async () => {
128+
const suts = [createMockSut("sut1")];
129+
const cases = [createMockCase("case1"), createMockCase("case2")];
130+
131+
// Plan all runs
132+
const allPlanned = executor.plan(suts, cases);
133+
134+
// Filter to only run half
135+
const filteredRuns = allPlanned.slice(0, 1);
136+
137+
const summary = await executor.execute(
138+
suts as never,
139+
cases as never,
140+
() => ({}),
141+
filteredRuns
142+
);
143+
144+
expect(summary.totalRuns).toBe(1);
145+
expect(summary.successfulRuns).toBe(1);
146+
});
147+
148+
it("should plan all runs when plannedRuns is undefined", async () => {
149+
const suts = [createMockSut("sut1")];
150+
const cases = [createMockCase("case1")];
151+
152+
const summary = await executor.execute(
153+
suts as never,
154+
cases as never,
155+
() => ({})
156+
);
157+
158+
expect(summary.totalRuns).toBe(1);
159+
expect(summary.successfulRuns).toBe(1);
160+
});
161+
162+
it("should work with empty plannedRuns array", async () => {
163+
const suts = [createMockSut("sut1")];
164+
const cases = [createMockCase("case1")];
165+
166+
const summary = await executor.execute(suts as never, cases as never, () => ({}), []);
167+
168+
expect(summary.totalRuns).toBe(0);
169+
expect(summary.successfulRuns).toBe(0);
170+
});
171+
});
172+
173+
describe("execute with parallel execution", () => {
174+
it("should use provided plannedRuns with concurrency > 1", async () => {
175+
const executorWithConcurrency = new Executor({ concurrency: 2 });
176+
const suts = [createMockSut("sut1"), createMockSut("sut2")];
177+
const cases = [createMockCase("case1")];
178+
179+
// Plan all runs
180+
const allPlanned = executorWithConcurrency.plan(suts as never, cases as never);
181+
182+
// Filter to only run one
183+
const filteredRuns = [allPlanned[0]];
184+
185+
const summary = await executorWithConcurrency.execute(
186+
suts as never,
187+
cases as never,
188+
() => ({}),
189+
filteredRuns
190+
);
191+
192+
expect(summary.totalRuns).toBe(1);
193+
expect(summary.successfulRuns).toBe(1);
194+
});
195+
});
196+
});

src/experiments/framework/executor/executor.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -217,16 +217,18 @@ export class Executor<TExpander, TResult> {
217217
* @param suts - SUTs to execute
218218
* @param cases - Cases to run against
219219
* @param metricsExtractor - Function to extract metrics from result
220+
* @param plannedRuns - Optional pre-filtered planned runs (for parallel workers)
220221
* @returns Execution summary with all results
221222
*/
222223
async execute(
223224
suts: SutDefinition<TExpander, TResult>[],
224225
cases: CaseDefinition<TExpander>[],
225-
metricsExtractor: (result: TResult) => Record<string, number>
226+
metricsExtractor: (result: TResult) => Record<string, number>,
227+
plannedRuns?: PlannedRun[]
226228
): Promise<ExecutionSummary> {
227229
const startTime = performance.now();
228230

229-
const plannedRuns = this.plan(suts, cases);
231+
const effectivePlannedRuns = plannedRuns ?? this.plan(suts, cases);
230232
const sutMap = new Map(suts.map((s) => [s.registration.id, s]));
231233
const caseMap = new Map(cases.map((c) => [c.case.caseId, c]));
232234

@@ -236,7 +238,7 @@ export class Executor<TExpander, TResult> {
236238
if (concurrency <= 1) {
237239
// Sequential execution (original behavior)
238240
return this.executeSequential(
239-
plannedRuns,
241+
effectivePlannedRuns,
240242
sutMap,
241243
caseMap,
242244
metricsExtractor,
@@ -246,7 +248,7 @@ export class Executor<TExpander, TResult> {
246248

247249
// Parallel execution with concurrency limit
248250
return this.executeParallel(
249-
plannedRuns,
251+
effectivePlannedRuns,
250252
sutMap,
251253
caseMap,
252254
metricsExtractor,
@@ -476,8 +478,14 @@ export class Executor<TExpander, TResult> {
476478
// Create SUT instance
477479
const sutInstance = sutDef.factory(expander, seeds, run.config);
478480

479-
// Execute
480-
const sutResult = await sutInstance.run();
481+
// Execute with timeout if configured
482+
let sutResult: TResult;
483+
sutResult = await (this.config.timeoutMs > 0 ? Promise.race([
484+
sutInstance.run(),
485+
new Promise<never>((_, reject) =>
486+
setTimeout(() => reject(new Error(`Timeout after ${this.config.timeoutMs}ms`)), this.config.timeoutMs)
487+
),
488+
]) : sutInstance.run());
481489

482490
const executionTimeMs = performance.now() - runStartTime;
483491

src/experiments/framework/executor/parallel-executor.ts

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,35 @@
1313
import { spawn } from "node:child_process";
1414
import { randomBytes } from "node:crypto";
1515
import { cpus } from "node:os";
16-
import { resolve } from "node:path";
16+
import { dirname, resolve } from "node:path";
1717

1818
import type { EvaluationResult } from "../types/result.js";
1919
import type { ExecutorConfig, PlannedRun } from "./executor.js";
2020

21+
/**
22+
* Get the package root directory by resolving from the entry point script.
23+
* The CLI entry point is dist/cli.js, so we go up one level from there.
24+
*/
25+
const getPackageRoot = (): string => {
26+
// Get the directory containing the entry point script
27+
// process.argv[1] is the path to the executed script (e.g., /path/to/graphbox/dist/cli.js)
28+
const entryPoint = process.argv[1];
29+
30+
// Resolve to absolute path first (handles relative paths like "dist/cli.js")
31+
const absoluteEntry = resolve(entryPoint);
32+
const entryDir = dirname(absoluteEntry);
33+
34+
// If entry point is in dist/, go up one level to get package root
35+
if (entryDir.endsWith("/dist") || entryDir.endsWith(String.raw`\dist`)) {
36+
return entryDir.slice(0, -5); // Remove "/dist"
37+
}
38+
39+
// Fallback: use current directory
40+
return process.cwd();
41+
};
42+
43+
const PACKAGE_ROOT = getPackageRoot();
44+
2145
export interface ParallelExecutorOptions {
2246
/** Number of parallel processes (default: CPU count) */
2347
workers?: number;
@@ -27,6 +51,9 @@ export interface ParallelExecutorOptions {
2751

2852
/** Checkpoint directory (defaults to "results/execute") */
2953
checkpointDir?: string;
54+
55+
/** Per-run timeout in milliseconds (0 = no timeout) */
56+
timeoutMs?: number;
3057
}
3158

3259
/**
@@ -89,10 +116,14 @@ export const shardPath = (checkpointDir: string, workerIndex: number): string =>
89116
export const executeParallel = async (runs: PlannedRun[], suts: unknown, cases: unknown[], config: ExecutorConfig & { onResult?: (result: EvaluationResult) => void }, options: ParallelExecutorOptions = {}): Promise<{ results: EvaluationResult[]; errors: Array<{ runId: string; error: string }> }> => {
90117
const numberWorkers = options.workers ?? cpus().length;
91118
const nodePath = options.nodePath ?? process.execPath;
92-
const checkpointDir = options.checkpointDir ?? resolve(process.cwd(), "results/execute");
119+
const checkpointDir = options.checkpointDir ?? resolve(PACKAGE_ROOT, "results/execute");
120+
const timeoutMs = options.timeoutMs ?? config.timeoutMs ?? 0;
93121

94122
console.log(`ParallelExecutor: Spawning ${numberWorkers} processes for ${runs.length} runs`);
95123
console.log(`Checkpoint directory: ${checkpointDir}`);
124+
if (timeoutMs > 0) {
125+
console.log(`Per-run timeout: ${timeoutMs}ms (${Math.round(timeoutMs / 1000)}s)`);
126+
}
96127

97128
// Generate unique names for each worker
98129
const workerNames = generateWorkerNames(numberWorkers);
@@ -122,15 +153,21 @@ export const executeParallel = async (runs: PlannedRun[], suts: unknown, cases:
122153
const workerCheckpointPath = shardPath(checkpointDir, index);
123154

124155
const arguments_ = [
125-
resolve(process.cwd(), "dist/cli.js"),
156+
resolve(PACKAGE_ROOT, "dist/cli.js"),
126157
"evaluate",
127158
"--phase=execute",
128159
"--checkpoint-mode=file",
129160
`--run-filter=${runFilter}`, // JSON array - needs to be quoted in shell but spawn() handles this
130161
];
131162

163+
// Add timeout if specified
164+
if (timeoutMs > 0) {
165+
arguments_.push(`--timeout=${timeoutMs}`);
166+
}
167+
132168
return spawn(nodePath, arguments_, {
133169
stdio: "inherit",
170+
cwd: PACKAGE_ROOT, // Ensure workers use the package root as working directory
134171
env: {
135172
...process.env,
136173
NODE_OPTIONS: "--max-old-space-size=4096",

0 commit comments

Comments
 (0)