From 5fdda390acd424b3e60a528fd93c8c6095346a23 Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Tue, 29 Jul 2025 11:27:38 -0300 Subject: [PATCH 1/4] feat: Enhance job execution handling with retry and abort scenarios --- .../src/execution/executor-manager.test.ts | 65 ++++++++++++++++++- .../engine/src/execution/executor-manager.ts | 14 +++- packages/engine/src/job/job.test.ts | 17 ++++- 3 files changed, 90 insertions(+), 6 deletions(-) diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 0219c12..aaceb8c 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -1,12 +1,13 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; import { Backend } from "@sidequest/backend"; -import { CompletedResult, JobData } from "@sidequest/core"; +import { CompletedResult, JobData, RetryTransition, RunTransition } from "@sidequest/core"; import EventEmitter from "events"; +import { JobTransitioner } from "../job/job-transitioner"; import { grantQueueConfig } from "../queue/grant-queue-config"; import { DummyJob } from "../test-jobs/dummy-job"; import { ExecutorManager } from "./executor-manager"; -const runMock = vi.fn(); +const runMock = vi.hoisted(() => vi.fn()); vi.mock("../shared-runner", () => ({ RunnerPool: vi.fn().mockImplementation(() => ({ @@ -68,6 +69,66 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); + sidequestTest("should abort job execution", async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4); + + let expectedPromise; + runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { + const promise = new Promise((_, reject) => { + signal.on("abort", () => { + reject(new Error("The task has been aborted")); + }); + }); + await backend.updateJob({ ...job, state: "canceled" }); + expectedPromise = promise; + return promise; + }); + + const execPromise = executorManager.execute(queryConfig, jobData); + + expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0); + expect(executorManager.availableSlotsGlobal()).toEqual(9); + + await execPromise; + expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toHaveReturnedWith(expectedPromise); + await expect(expectedPromise).rejects.toThrow("The task has been aborted"); + expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); + expect(executorManager.availableSlotsGlobal()).toEqual(10); + await executorManager.destroy(); + }); + + sidequestTest("should retry when unhandled error", async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4); + + runMock.mockImplementationOnce(() => { + throw new Error("Unhandled error during job execution"); + }); + + const execPromise = executorManager.execute(queryConfig, jobData); + + expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0); + expect(executorManager.availableSlotsGlobal()).toEqual(9); + + await execPromise; + expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); + expect(executorManager.availableSlotsGlobal()).toEqual(10); + + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RunTransition)); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition)); + + await executorManager.destroy(); + }); + sidequestTest("snoozes job when queue is full", async ({ backend, config }) => { const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); // No available slots const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 847fa0f..21b6afe 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -1,5 +1,13 @@ import { Backend } from "@sidequest/backend"; -import { JobData, JobTransitionFactory, logger, QueueConfig, RunTransition, SnoozeTransition } from "@sidequest/core"; +import { + JobData, + JobTransitionFactory, + logger, + QueueConfig, + RetryTransition, + RunTransition, + SnoozeTransition, +} from "@sidequest/core"; import EventEmitter from "events"; import { JobTransitioner } from "../job/job-transitioner"; import { RunnerPool } from "../shared-runner"; @@ -123,8 +131,8 @@ export class ExecutorManager { if (err.message === "The task has been aborted") { logger("Executor Manager").debug(`Job ${job.id} was canceled`); } else { - logger("Executor Manager").error(`Error executing job ${job.id}: ${err.message}`); - throw error; + logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); + await JobTransitioner.apply(this.backend, job, new RetryTransition(err)); } } finally { isRunning = false; diff --git a/packages/engine/src/job/job.test.ts b/packages/engine/src/job/job.test.ts index c485e5f..2e03a02 100644 --- a/packages/engine/src/job/job.test.ts +++ b/packages/engine/src/job/job.test.ts @@ -196,7 +196,7 @@ describe("job.ts", () => { expect(result.result).toBe("abc"); }); - sidequestTest("should return the JobResult retyurne by run", async () => { + sidequestTest("should return the JobResult return by run", async () => { class TransitionJob extends Job { run() { return { __is_job_transition__: true, type: "snooze" } as SnoozeResult; @@ -218,5 +218,20 @@ describe("job.ts", () => { expect(result.type).toBe("retry"); expect(result.error.message).toEqual("fail!"); }); + + sidequestTest("should return RetryResult if run unhandled promise", async () => { + class DummyUnhandled extends Job { + run() { + return new Promise(() => { + throw new Error("unhandled error"); + }); + } + } + + const job = new DummyUnhandled(); + const result = (await job.perform()) as RetryResult; + expect(result.type).toBe("retry"); + expect(result.error.message).toEqual("unhandled error"); + }); }); }); From 59588e60e778a93efb689d012624392f8a2436aa Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Tue, 29 Jul 2025 11:30:39 -0300 Subject: [PATCH 2/4] refactor: Simplify ExecutorManager instantiation by passing config directly --- packages/engine/src/execution/executor-manager.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 1a00158..baf1684 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -73,7 +73,7 @@ describe("ExecutorManager", () => { await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); - const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4); + const executorManager = new ExecutorManager(backend, config); let expectedPromise; runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { @@ -105,7 +105,7 @@ describe("ExecutorManager", () => { await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); - const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4); + const executorManager = new ExecutorManager(backend, config); runMock.mockImplementationOnce(() => { throw new Error("Unhandled error during job execution"); From 1c92ec2c58deeec9ad3491ae9d974a948d48f8e2 Mon Sep 17 00:00:00 2001 From: Giovani Guizzo Date: Tue, 29 Jul 2025 13:07:09 -0300 Subject: [PATCH 3/4] refactor: Update logging to use "Core" context and improve job execution messages --- .../src/transitions/fail-transition.test.ts | 1 - .../core/src/transitions/fail-transition.ts | 2 +- .../src/transitions/retry-transition.test.ts | 2 - .../core/src/transitions/retry-transition.ts | 2 +- .../core/src/transitions/run-transition.ts | 2 +- .../src/execution/executor-manager.test.ts | 45 ++++++++++++++++++- .../engine/src/execution/executor-manager.ts | 27 ++++++++--- packages/engine/src/job/job-transitioner.ts | 2 +- packages/engine/src/job/job.ts | 2 +- 9 files changed, 69 insertions(+), 16 deletions(-) diff --git a/packages/core/src/transitions/fail-transition.test.ts b/packages/core/src/transitions/fail-transition.test.ts index 5e3fd69..54477cb 100644 --- a/packages/core/src/transitions/fail-transition.test.ts +++ b/packages/core/src/transitions/fail-transition.test.ts @@ -64,7 +64,6 @@ describe("FailTransition", () => { name: "Error", stack: expect.any(String) as string, level: expect.any(String) as string, - timestamp: expect.any(String) as string, }, ], }); diff --git a/packages/core/src/transitions/fail-transition.ts b/packages/core/src/transitions/fail-transition.ts index b328d46..0e8fd66 100644 --- a/packages/core/src/transitions/fail-transition.ts +++ b/packages/core/src/transitions/fail-transition.ts @@ -27,7 +27,7 @@ export class FailTransition extends JobTransition { } apply(job: JobData): JobData { - logger().error(this.reason); + logger("Core").error(this.reason); const error = toErrorData(this.reason); job.errors ??= []; const errData = { diff --git a/packages/core/src/transitions/retry-transition.test.ts b/packages/core/src/transitions/retry-transition.test.ts index b607594..9163b15 100644 --- a/packages/core/src/transitions/retry-transition.test.ts +++ b/packages/core/src/transitions/retry-transition.test.ts @@ -69,7 +69,6 @@ describe("RetryTransition", () => { name: "Error", stack: expect.any(String) as string, level: expect.any(String) as string, - timestamp: expect.any(String) as string, }, ], }); @@ -101,7 +100,6 @@ describe("RetryTransition", () => { name: "Error", stack: expect.any(String) as string, level: expect.any(String) as string, - timestamp: expect.any(String) as string, }, ], }); diff --git a/packages/core/src/transitions/retry-transition.ts b/packages/core/src/transitions/retry-transition.ts index bceba59..af53f00 100644 --- a/packages/core/src/transitions/retry-transition.ts +++ b/packages/core/src/transitions/retry-transition.ts @@ -39,7 +39,7 @@ export class RetryTransition extends JobTransition { return new FailTransition(this.reason).apply(job); } - logger().error(this.reason); + logger("Core").error(this.reason); const reason = toErrorData(this.reason); const delay = this.delay ?? this.calculateBackoff(job.attempt); diff --git a/packages/core/src/transitions/run-transition.ts b/packages/core/src/transitions/run-transition.ts index 2036ee6..6d60f62 100644 --- a/packages/core/src/transitions/run-transition.ts +++ b/packages/core/src/transitions/run-transition.ts @@ -14,7 +14,7 @@ import { JobTransition } from "./transition"; */ export class RunTransition extends JobTransition { apply(job: JobData): JobData { - logger("Core").info(`Running job ${job.class} with args: ${JSON.stringify(job.args)}`); + logger("Core").info(`Running job #${job.id} - ${job.class} with args: ${JSON.stringify(job.args)}`); job.state = "running"; job.attempted_at = new Date(); job.attempt = job.attempt + 1; diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index baf1684..ab5287b 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -69,7 +69,7 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); - sidequestTest("should abort job execution", async ({ backend, config }) => { + sidequestTest("should abort job execution on job cancel", async ({ backend, config }) => { await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); @@ -98,6 +98,49 @@ describe("ExecutorManager", () => { await expect(expectedPromise).rejects.toThrow("The task has been aborted"); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); expect(executorManager.availableSlotsGlobal()).toEqual(10); + + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RunTransition)); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition)); + + await executorManager.destroy(); + }); + + sidequestTest("should abort job execution on timeout", async ({ backend, config }) => { + jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + let expectedPromise; + runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { + const promise = new Promise((_, reject) => { + signal.on("abort", () => { + reject(new Error("The task has been aborted")); + }); + }); + expectedPromise = promise; + return promise; + }); + + const execPromise = executorManager.execute(queryConfig, jobData); + + expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0); + expect(executorManager.availableSlotsGlobal()).toEqual(9); + + await execPromise; + expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toHaveReturnedWith(expectedPromise); + await expect(expectedPromise).rejects.toThrow("The task has been aborted"); + expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); + expect(executorManager.availableSlotsGlobal()).toEqual(10); + + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RunTransition)); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition)); + await executorManager.destroy(); }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 33fc436..6557650 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -98,13 +98,11 @@ export class ExecutorManager { job = await JobTransitioner.apply(this.backend, job, new RunTransition()); const signal = new EventEmitter(); - let isRunning = true; - - const jobChecker = async () => { + const cancellationCheck = async () => { while (isRunning) { const watchedJob = await this.backend.getJob(job.id); - if (watchedJob?.state === "canceled") { + if (watchedJob!.state === "canceled") { logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); signal.emit("abort"); isRunning = false; @@ -113,11 +111,26 @@ export class ExecutorManager { await new Promise((r) => setTimeout(r, 1000)); } }; - void jobChecker(); + void cancellationCheck(); try { logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); - const result = await this.runnerPool.run(job, signal); + + const runPromise = this.runnerPool.run(job, signal); + + if (job.timeout) { + void new Promise(() => { + const nodeTimeout = setTimeout(() => { + logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); + signal.emit("abort"); + void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`)); + clearTimeout(nodeTimeout); + }, job.timeout!); + }); + } + + const result = await runPromise; + isRunning = false; logger("Executor Manager").debug(`Job ${job.id} completed with result: ${JSON.stringify(result)}`); const transition = JobTransitionFactory.create(result); @@ -126,7 +139,7 @@ export class ExecutorManager { isRunning = false; const err = error as Error; if (err.message === "The task has been aborted") { - logger("Executor Manager").debug(`Job ${job.id} was canceled`); + logger("Executor Manager").debug(`Job ${job.id} was aborted`); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); await JobTransitioner.apply(this.backend, job, new RetryTransition(err)); diff --git a/packages/engine/src/job/job-transitioner.ts b/packages/engine/src/job/job-transitioner.ts index 46198d6..ff46208 100644 --- a/packages/engine/src/job/job-transitioner.ts +++ b/packages/engine/src/job/job-transitioner.ts @@ -23,7 +23,7 @@ export class JobTransitioner { return jobData; } - logger("JobTransitioner").info(`Applying transition ${transition.constructor.name} to job ${jobData.id}`); + logger("JobTransitioner").debug(`Applying transition ${transition.constructor.name} to job ${jobData.id}`); const newData = transition.apply(jobData); if (newData.uniqueness_config) { const uniqueness = UniquenessFactory.create(newData.uniqueness_config); diff --git a/packages/engine/src/job/job.ts b/packages/engine/src/job/job.ts index 4842758..3ab8a36 100644 --- a/packages/engine/src/job/job.ts +++ b/packages/engine/src/job/job.ts @@ -318,7 +318,7 @@ async function hasClassExported(filePath: string, className: string): Promise Date: Tue, 29 Jul 2025 13:33:29 -0300 Subject: [PATCH 4/4] fix: Simplify job timeout handling by removing unnecessary clearTimeout --- packages/engine/src/execution/executor-manager.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 6557650..5d65054 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -120,11 +120,10 @@ export class ExecutorManager { if (job.timeout) { void new Promise(() => { - const nodeTimeout = setTimeout(() => { + setTimeout(() => { logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); signal.emit("abort"); void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`)); - clearTimeout(nodeTimeout); }, job.timeout!); }); }