Skip to content
Merged
1 change: 0 additions & 1 deletion packages/core/src/transitions/fail-transition.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ describe("FailTransition", () => {
name: "Error",
stack: expect.any(String) as string,
level: expect.any(String) as string,
timestamp: expect.any(String) as string,
},
],
});
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/transitions/fail-transition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class FailTransition extends JobTransition {
}

apply(job: JobData): JobData {
logger().error(this.reason);
logger("Core").error(this.reason);
const error = toErrorData(this.reason);
job.errors ??= [];
const errData = {
Expand Down
2 changes: 0 additions & 2 deletions packages/core/src/transitions/retry-transition.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ describe("RetryTransition", () => {
name: "Error",
stack: expect.any(String) as string,
level: expect.any(String) as string,
timestamp: expect.any(String) as string,
},
],
});
Expand Down Expand Up @@ -101,7 +100,6 @@ describe("RetryTransition", () => {
name: "Error",
stack: expect.any(String) as string,
level: expect.any(String) as string,
timestamp: expect.any(String) as string,
},
],
});
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/transitions/retry-transition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class RetryTransition extends JobTransition {
return new FailTransition(this.reason).apply(job);
}

logger().error(this.reason);
logger("Core").error(this.reason);
const reason = toErrorData(this.reason);

const delay = this.delay ?? this.calculateBackoff(job.attempt);
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/transitions/run-transition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { JobTransition } from "./transition";
*/
export class RunTransition extends JobTransition {
apply(job: JobData): JobData {
logger("Core").info(`Running job ${job.class} with args: ${JSON.stringify(job.args)}`);
logger("Core").info(`Running job #${job.id} - ${job.class} with args: ${JSON.stringify(job.args)}`);
job.state = "running";
job.attempted_at = new Date();
job.attempt = job.attempt + 1;
Expand Down
108 changes: 106 additions & 2 deletions packages/engine/src/execution/executor-manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { sidequestTest, SidequestTestFixture } from "@/tests/fixture";
import { Backend } from "@sidequest/backend";
import { CompletedResult, JobData } from "@sidequest/core";
import { CompletedResult, JobData, RetryTransition, RunTransition } from "@sidequest/core";
import EventEmitter from "events";
import { JobTransitioner } from "../job/job-transitioner";
import { grantQueueConfig } from "../queue/grant-queue-config";
import { DummyJob } from "../test-jobs/dummy-job";
import { ExecutorManager } from "./executor-manager";

const runMock = vi.fn();
const runMock = vi.hoisted(() => vi.fn());

vi.mock("../shared-runner", () => ({
RunnerPool: vi.fn().mockImplementation(() => ({
Expand Down Expand Up @@ -68,6 +69,109 @@ describe("ExecutorManager", () => {
await executorManager.destroy();
});

sidequestTest("should abort job execution on job cancel", async ({ backend, config }) => {
await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
const executorManager = new ExecutorManager(backend, config);

let expectedPromise;
runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => {
const promise = new Promise((_, reject) => {
signal.on("abort", () => {
reject(new Error("The task has been aborted"));
});
});
await backend.updateJob({ ...job, state: "canceled" });
expectedPromise = promise;
return promise;
});

const execPromise = executorManager.execute(queryConfig, jobData);

expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0);
expect(executorManager.availableSlotsGlobal()).toEqual(9);

await execPromise;
expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
expect(runMock).toHaveReturnedWith(expectedPromise);
await expect(expectedPromise).rejects.toThrow("The task has been aborted");
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
expect(executorManager.availableSlotsGlobal()).toEqual(10);

// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RunTransition));
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition));

await executorManager.destroy();
});

sidequestTest("should abort job execution on timeout", async ({ backend, config }) => {
jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 });

const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
const executorManager = new ExecutorManager(backend, config);

let expectedPromise;
runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => {
const promise = new Promise((_, reject) => {
signal.on("abort", () => {
reject(new Error("The task has been aborted"));
});
});
expectedPromise = promise;
return promise;
});

const execPromise = executorManager.execute(queryConfig, jobData);

expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0);
expect(executorManager.availableSlotsGlobal()).toEqual(9);

await execPromise;
expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
expect(runMock).toHaveReturnedWith(expectedPromise);
await expect(expectedPromise).rejects.toThrow("The task has been aborted");
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
expect(executorManager.availableSlotsGlobal()).toEqual(10);

// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RunTransition));
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition));

await executorManager.destroy();
});

sidequestTest("should retry when unhandled error", async ({ backend, config }) => {
await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
const executorManager = new ExecutorManager(backend, config);

runMock.mockImplementationOnce(() => {
throw new Error("Unhandled error during job execution");
});

const execPromise = executorManager.execute(queryConfig, jobData);

expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0);
expect(executorManager.availableSlotsGlobal()).toEqual(9);

await execPromise;
expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
expect(executorManager.availableSlotsGlobal()).toEqual(10);

// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RunTransition));
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition));

await executorManager.destroy();
});

sidequestTest("snoozes job when queue is full", async ({ backend, config }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); // No available slots
const executorManager = new ExecutorManager(backend, config);
Expand Down
40 changes: 30 additions & 10 deletions packages/engine/src/execution/executor-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { Backend } from "@sidequest/backend";
import { JobData, JobTransitionFactory, logger, QueueConfig, RunTransition, SnoozeTransition } from "@sidequest/core";
import {
JobData,
JobTransitionFactory,
logger,
QueueConfig,
RetryTransition,
RunTransition,
SnoozeTransition,
} from "@sidequest/core";
import EventEmitter from "events";
import { NonNullableEngineConfig } from "../engine";
import { JobTransitioner } from "../job/job-transitioner";
Expand Down Expand Up @@ -90,13 +98,11 @@ export class ExecutorManager {
job = await JobTransitioner.apply(this.backend, job, new RunTransition());

const signal = new EventEmitter();

let isRunning = true;

const jobChecker = async () => {
const cancellationCheck = async () => {
while (isRunning) {
const watchedJob = await this.backend.getJob(job.id);
if (watchedJob?.state === "canceled") {
if (watchedJob!.state === "canceled") {
logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`);
signal.emit("abort");
isRunning = false;
Expand All @@ -105,11 +111,25 @@ export class ExecutorManager {
await new Promise((r) => setTimeout(r, 1000));
}
};
void jobChecker();
void cancellationCheck();

try {
logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`);
const result = await this.runnerPool.run(job, signal);

const runPromise = this.runnerPool.run(job, signal);

if (job.timeout) {
void new Promise(() => {
setTimeout(() => {
logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`);
signal.emit("abort");
void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`));
}, job.timeout!);
});
}

const result = await runPromise;

isRunning = false;
logger("Executor Manager").debug(`Job ${job.id} completed with result: ${JSON.stringify(result)}`);
const transition = JobTransitionFactory.create(result);
Expand All @@ -118,10 +138,10 @@ export class ExecutorManager {
isRunning = false;
const err = error as Error;
if (err.message === "The task has been aborted") {
logger("Executor Manager").debug(`Job ${job.id} was canceled`);
logger("Executor Manager").debug(`Job ${job.id} was aborted`);
} else {
logger("Executor Manager").error(`Error executing job ${job.id}: ${err.message}`);
throw error;
logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`);
await JobTransitioner.apply(this.backend, job, new RetryTransition(err));
}
} finally {
isRunning = false;
Expand Down
2 changes: 1 addition & 1 deletion packages/engine/src/job/job-transitioner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class JobTransitioner {
return jobData;
}

logger("JobTransitioner").info(`Applying transition ${transition.constructor.name} to job ${jobData.id}`);
logger("JobTransitioner").debug(`Applying transition ${transition.constructor.name} to job ${jobData.id}`);
const newData = transition.apply(jobData);
if (newData.uniqueness_config) {
const uniqueness = UniquenessFactory.create(newData.uniqueness_config);
Expand Down
17 changes: 16 additions & 1 deletion packages/engine/src/job/job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ describe("job.ts", () => {
expect(result.result).toBe("abc");
});

sidequestTest("should return the JobResult retyurne by run", async () => {
sidequestTest("should return the JobResult return by run", async () => {
class TransitionJob extends Job {
run() {
return { __is_job_transition__: true, type: "snooze" } as SnoozeResult;
Expand All @@ -218,5 +218,20 @@ describe("job.ts", () => {
expect(result.type).toBe("retry");
expect(result.error.message).toEqual("fail!");
});

sidequestTest("should return RetryResult if run unhandled promise", async () => {
class DummyUnhandled extends Job {
run() {
return new Promise(() => {
throw new Error("unhandled error");
});
}
}

const job = new DummyUnhandled();
const result = (await job.perform()) as RetryResult;
expect(result.type).toBe("retry");
expect(result.error.message).toEqual("unhandled error");
});
});
});
2 changes: 1 addition & 1 deletion packages/engine/src/job/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ async function hasClassExported(filePath: string, className: string): Promise<bo

return false;
} catch (e) {
logger().debug(e);
logger("Core").debug(e);
return false;
}
}
Loading