Skip to content
65 changes: 63 additions & 2 deletions packages/engine/src/execution/executor-manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { sidequestTest, SidequestTestFixture } from "@/tests/fixture";
import { Backend } from "@sidequest/backend";
import { CompletedResult, JobData } from "@sidequest/core";
import { CompletedResult, JobData, RetryTransition, RunTransition } from "@sidequest/core";
import EventEmitter from "events";
import { JobTransitioner } from "../job/job-transitioner";
import { grantQueueConfig } from "../queue/grant-queue-config";
import { DummyJob } from "../test-jobs/dummy-job";
import { ExecutorManager } from "./executor-manager";

const runMock = vi.fn();
const runMock = vi.hoisted(() => vi.fn());

vi.mock("../shared-runner", () => ({
RunnerPool: vi.fn().mockImplementation(() => ({
Expand Down Expand Up @@ -68,6 +69,66 @@ describe("ExecutorManager", () => {
await executorManager.destroy();
});

sidequestTest("should abort job execution", async ({ backend, config }) => {
await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
const executorManager = new ExecutorManager(backend, config);

let expectedPromise;
runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => {
const promise = new Promise((_, reject) => {
signal.on("abort", () => {
reject(new Error("The task has been aborted"));
});
});
await backend.updateJob({ ...job, state: "canceled" });
expectedPromise = promise;
return promise;
});

const execPromise = executorManager.execute(queryConfig, jobData);

expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0);
expect(executorManager.availableSlotsGlobal()).toEqual(9);

await execPromise;
expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
expect(runMock).toHaveReturnedWith(expectedPromise);
await expect(expectedPromise).rejects.toThrow("The task has been aborted");
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
expect(executorManager.availableSlotsGlobal()).toEqual(10);
await executorManager.destroy();
});

sidequestTest("should retry when unhandled error", async ({ backend, config }) => {
await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
const executorManager = new ExecutorManager(backend, config);

runMock.mockImplementationOnce(() => {
throw new Error("Unhandled error during job execution");
});

const execPromise = executorManager.execute(queryConfig, jobData);

expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0);
expect(executorManager.availableSlotsGlobal()).toEqual(9);

await execPromise;
expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
expect(executorManager.availableSlotsGlobal()).toEqual(10);

// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RunTransition));
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition));

await executorManager.destroy();
});

sidequestTest("snoozes job when queue is full", async ({ backend, config }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); // No available slots
const executorManager = new ExecutorManager(backend, config);
Expand Down
14 changes: 11 additions & 3 deletions packages/engine/src/execution/executor-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { Backend } from "@sidequest/backend";
import { JobData, JobTransitionFactory, logger, QueueConfig, RunTransition, SnoozeTransition } from "@sidequest/core";
import {
JobData,
JobTransitionFactory,
logger,
QueueConfig,
RetryTransition,
RunTransition,
SnoozeTransition,
} from "@sidequest/core";
import EventEmitter from "events";
import { NonNullableEngineConfig } from "../engine";
import { JobTransitioner } from "../job/job-transitioner";
Expand Down Expand Up @@ -120,8 +128,8 @@ export class ExecutorManager {
if (err.message === "The task has been aborted") {
logger("Executor Manager").debug(`Job ${job.id} was canceled`);
} else {
logger("Executor Manager").error(`Error executing job ${job.id}: ${err.message}`);
throw error;
logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`);
await JobTransitioner.apply(this.backend, job, new RetryTransition(err));
}
} finally {
isRunning = false;
Expand Down
17 changes: 16 additions & 1 deletion packages/engine/src/job/job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ describe("job.ts", () => {
expect(result.result).toBe("abc");
});

sidequestTest("should return the JobResult retyurne by run", async () => {
sidequestTest("should return the JobResult return by run", async () => {
class TransitionJob extends Job {
run() {
return { __is_job_transition__: true, type: "snooze" } as SnoozeResult;
Expand All @@ -218,5 +218,20 @@ describe("job.ts", () => {
expect(result.type).toBe("retry");
expect(result.error.message).toEqual("fail!");
});

sidequestTest("should return RetryResult if run unhandled promise", async () => {
class DummyUnhandled extends Job {
run() {
return new Promise(() => {
throw new Error("unhandled error");
});
}
}

const job = new DummyUnhandled();
const result = (await job.perform()) as RetryResult;
expect(result.type).toBe("retry");
expect(result.error.message).toEqual("unhandled error");
});
});
});
Loading