Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ migrations/
*.log
*.tmp

packages/docs/.vitepress/cache
packages/docs/.vitepress/cache

# generated file
CHANGELOG.md
4 changes: 2 additions & 2 deletions packages/docs/jobs/recurring.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ Sidequest.build(MyJob).schedule("*/5 * * * * *", "foo"); // Every 5 seconds with
## Limitations and Recommendations

- **Persistence:** If your application restarts, any scheduled jobs must be re-scheduled via code. (This is by design and similar to other popular job libraries.)
- **Clustering:** In a multi-instance environment, each instance will create its own scheduled jobs unless you coordinate or restrict scheduling to a single node.
To avoid duplicate executions, we recommend enabling job uniqueness with a period window (e.g., “unique per hour” or “unique per minute”).
- **Clustering:** In a multi-instance environment, each instance will create its own scheduled jobs unless you coordinate or restrict scheduling to a single node.
To avoid duplicate executions, we recommend enabling job uniqueness with a period window (e.g., “unique per hour” or “unique per minute”).
This ensures that even if multiple nodes schedule the same job, only one will actually run for each interval.
105 changes: 85 additions & 20 deletions packages/engine/src/execution/dispatcher.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { sidequestTest, SidequestTestFixture } from "@/tests/fixture";
import { Backend } from "@sidequest/backend";
import { CompletedResult, JobData } from "@sidequest/core";
import { CompletedResult, JobData, QueueConfig } from "@sidequest/core";
import { EngineConfig } from "../engine";
import { DummyJob } from "../test-jobs/dummy-job";
import { Dispatcher } from "./dispatcher";
Expand All @@ -19,7 +19,7 @@ vi.mock("../shared-runner", () => ({
async function createJob(backend: Backend, queue = "default") {
const job = new DummyJob();
await job.ready();
await backend.createNewJob({
return await backend.createNewJob({
queue: queue,
state: "waiting",
script: job.script,
Expand Down Expand Up @@ -74,34 +74,59 @@ describe("Dispatcher", () => {
await dispatcher.stop();
});

sidequestTest("does not claim job when there is no available slot for the queue", async ({ backend }) => {
await createJob(backend, "noop");

expect(await backend.listJobs({ state: "waiting" })).toHaveLength(2);

sidequestTest("claim up to 20 jobs when the max concurency and job concurrency was unlimited", async ({ backend }) => {
const claimSpy = vi.spyOn(backend, "claimPendingJob").mockResolvedValue([]);

const dispatcher = new Dispatcher(
backend,
new QueueManager(backend, config.queues!),
new ExecutorManager(backend, config.maxConcurrentJobs!, 2, 4),
new QueueManager(backend, [{ name: "default", concurrency: 0 }]),
new ExecutorManager(backend, 0, 2, 4),
);

dispatcher.start();

runMock.mockImplementationOnce(() => {
return { type: "completed", result: "foo", __is_job_transition__: true } as CompletedResult;
await vi.waitUntil(() => {
return claimSpy.mock.calls.length > 0;
});

let jobs: JobData[];

await vi.waitUntil(async () => {
jobs = await backend.listJobs({ state: "waiting" });
return jobs.length === 1;
});
expect(claimSpy).toHaveBeenCalledWith("default", 20);

expect(jobs!).toHaveLength(1);
expect(jobs![0].queue).toEqual("noop");
await dispatcher.stop();
});

sidequestTest("breaks queue loop when availableSlots is MAX_SAFE_INTEGER", async ({ backend }) => {
const queue1 = { name: "queue1", concurrency: 0 }; // interpreted as unlimited
const queue2 = { name: "queue2", concurrency: 10 };

const queueManager = new QueueManager(backend, [queue1, queue2]);
const executorManager = new ExecutorManager(backend, 0, 2, 4); // global also unlimited

const dispatcher = new Dispatcher(backend, queueManager, executorManager);

const claimSpy = vi.spyOn(backend, "claimPendingJob").mockResolvedValue([]);
const executeSpy = vi.spyOn(executorManager, "execute");

// queue1 and global are unlimited → MAX_SAFE_INTEGER
vi.spyOn(executorManager, "availableSlotsByQueue").mockImplementation((q) =>
q.name === "queue1" ? Number.MAX_SAFE_INTEGER : 10,
);
vi.spyOn(executorManager, "availableSlotsGlobal").mockReturnValue(Number.MAX_SAFE_INTEGER);

vi.spyOn(queueManager, "getActiveQueuesWithRunnableJobs").mockResolvedValue([queue1 as unknown as QueueConfig, queue2 as unknown as QueueConfig]);

dispatcher.start();

await vi.waitUntil(() => claimSpy.mock.calls.length > 0);

expect(claimSpy).toHaveBeenCalledTimes(1);
expect(claimSpy).toHaveBeenCalledWith("queue1", 20); // capped by safeAvailableSlots
expect(claimSpy).not.toHaveBeenCalledWith("queue2", expect.anything());

expect(executeSpy).not.toHaveBeenCalled();

await dispatcher.stop();
});


sidequestTest("does not claim job when there is no available global slot", async ({ backend }) => {
config.maxConcurrentJobs = 1;
Expand Down Expand Up @@ -132,5 +157,45 @@ describe("Dispatcher", () => {

await dispatcher.stop();
});

sidequestTest("does not claim more jobs than queue concurrency allows", async ({ backend }) => {
const claimSpy = vi.spyOn(backend, "claimPendingJob").mockResolvedValue([]);

const dispatcher = new Dispatcher(
backend,
new QueueManager(backend, [{ name: "default", concurrency: 10 }]),
new ExecutorManager(backend, 20, 2, 4),
);

dispatcher.start();

await vi.waitUntil(() => {
return claimSpy.mock.calls.length > 0;
});

expect(claimSpy).toHaveBeenCalledWith("default", 10);

await dispatcher.stop();
});

sidequestTest("does not claim more jobs than global concurrency allows", async ({ backend }) => {
const claimSpy = vi.spyOn(backend, "claimPendingJob").mockResolvedValue([]);

const dispatcher = new Dispatcher(
backend,
new QueueManager(backend, [{ name: "default", concurrency: 10 }]),
new ExecutorManager(backend, 1, 2, 4),
);

dispatcher.start();

await vi.waitUntil(() => {
return claimSpy.mock.calls.length > 0;
});

expect(claimSpy).toHaveBeenCalledWith("default", 1);

await dispatcher.stop();
});
});
});
13 changes: 10 additions & 3 deletions packages/engine/src/execution/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ExecutorManager } from "./executor-manager";
import { QueueManager } from "./queue-manager";

const sleepDelay = 100;
const maxSafeClaim = 20;

/**
* Dispatcher for managing job execution and queue polling.
Expand Down Expand Up @@ -35,8 +36,8 @@ export class Dispatcher {
let shouldSleep = true;

for (const queue of queues) {
const availableSlots = this.executorManager.availableSlotsByQueue(queue);
if (availableSlots <= 0) {
const queueAvailableSlots = this.executorManager.availableSlotsByQueue(queue);
if (queueAvailableSlots <= 0) {
logger("Dispatcher").debug(`Queue ${queue.name} limit reached!`);
await this.sleep(sleepDelay);
continue;
Expand All @@ -49,7 +50,9 @@ export class Dispatcher {
continue;
}

const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, availableSlots);
const availableSlots = Math.min(queueAvailableSlots, globalSlots);
const safeAvailableSlots = Number.MAX_SAFE_INTEGER === availableSlots ? maxSafeClaim : availableSlots;
const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, safeAvailableSlots);

if (jobs.length > 0) {
// if a job was found on any queue do not sleep
Expand All @@ -60,6 +63,10 @@ export class Dispatcher {
// does not await for job execution.
void this.executorManager.execute(queue, job);
}

if(availableSlots == Number.MAX_SAFE_INTEGER){
break;
}
}

if (shouldSleep) {
Expand Down
22 changes: 10 additions & 12 deletions packages/engine/src/execution/executor-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,37 +69,35 @@ describe("ExecutorManager", () => {
});

sidequestTest("snoozes job when queue is full", async ({ backend, config }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); // No available slots
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); // Unlimited slots
const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4);

// Set up job in claimed state (as it would be when passed to execute)
jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

await executorManager.execute(queryConfig, jobData);

// Verify the job runner was NOT called since the job was snoozed
expect(runMock).not.toHaveBeenCalled();
expect(runMock).toHaveBeenCalled();

// Verify slots remain unchanged (no job was actually executed)
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0);
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(Number.MAX_SAFE_INTEGER);
expect(executorManager.totalActiveWorkers()).toEqual(0);
await executorManager.destroy();
});

sidequestTest("snoozes job when global slots are full", async ({ backend }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 5 }); // Queue has slots
const executorManager = new ExecutorManager(backend, 0, 2, 4); // But global max is 0
const executorManager = new ExecutorManager(backend, 0, 2, 4); // But global is unlimited

// Set up job in claimed state
jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

await executorManager.execute(queryConfig, jobData);

// Verify the job runner was NOT called
expect(runMock).not.toHaveBeenCalled();
expect(runMock).toHaveBeenCalled();

// Verify global slots show as full
expect(executorManager.availableSlotsGlobal()).toEqual(0);
expect(executorManager.availableSlotsGlobal()).toEqual(Number.MAX_SAFE_INTEGER);
expect(executorManager.totalActiveWorkers()).toEqual(0);
await executorManager.destroy();
});
Expand All @@ -113,12 +111,12 @@ describe("ExecutorManager", () => {
await executorManager.destroy();
});

sidequestTest("returns zero as min value", async ({ backend, config }) => {
sidequestTest("Number.MAX_SAFE_INTEGER", async ({ backend, config }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 });
const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4);

void executorManager.execute(queryConfig, jobData);
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0);
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(Number.MAX_SAFE_INTEGER);
await executorManager.destroy();
});
});
Expand All @@ -130,12 +128,12 @@ describe("ExecutorManager", () => {
await executorManager.destroy();
});

sidequestTest("returns zero as min value", async ({ backend }) => {
sidequestTest("Number.MAX_SAFE_INTEGER", async ({ backend }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 });
const executorManager = new ExecutorManager(backend, 0, 2, 4);

void executorManager.execute(queryConfig, jobData);
expect(executorManager.availableSlotsGlobal()).toEqual(0);
expect(executorManager.availableSlotsGlobal()).toEqual(Number.MAX_SAFE_INTEGER);
await executorManager.destroy();
});
});
Expand Down
8 changes: 8 additions & 0 deletions packages/engine/src/execution/executor-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export class ExecutorManager {
const activeJobs = this.activeByQueue[queueConfig.name];
const limit = queueConfig.concurrency ?? 10;

if(limit === 0){
return Number.MAX_SAFE_INTEGER;
}

const availableSlots = limit - activeJobs.size;
if (availableSlots < 0) {
return 0;
Expand All @@ -55,6 +59,10 @@ export class ExecutorManager {
*/
availableSlotsGlobal() {
const limit = this.maxConcurrentJobs;
if(limit === 0){
return Number.MAX_SAFE_INTEGER;
}

const availableSlots = limit - this.activeJobs.size;
if (availableSlots < 0) {
return 0;
Expand Down
Loading