diff --git a/.prettierignore b/.prettierignore index 9c9af84..f8176b3 100644 --- a/.prettierignore +++ b/.prettierignore @@ -18,7 +18,4 @@ migrations/ *.log *.tmp -packages/docs/.vitepress/cache - -# generated file -CHANGELOG.md \ No newline at end of file +packages/docs/.vitepress/cache \ No newline at end of file diff --git a/packages/docs/jobs/recurring.md b/packages/docs/jobs/recurring.md index dbd688e..80d6a6b 100644 --- a/packages/docs/jobs/recurring.md +++ b/packages/docs/jobs/recurring.md @@ -67,6 +67,6 @@ Sidequest.build(MyJob).schedule("*/5 * * * * *", "foo"); // Every 5 seconds with ## Limitations and Recommendations - **Persistence:** If your application restarts, any scheduled jobs must be re-scheduled via code. (This is by design and similar to other popular job libraries.) -- **Clustering:** In a multi-instance environment, each instance will create its own scheduled jobs unless you coordinate or restrict scheduling to a single node. - To avoid duplicate executions, we recommend enabling job uniqueness with a period window (e.g., “unique per hour” or “unique per minute”). +- **Clustering:** In a multi-instance environment, each instance will create its own scheduled jobs unless you coordinate or restrict scheduling to a single node. + To avoid duplicate executions, we recommend enabling job uniqueness with a period window (e.g., “unique per hour” or “unique per minute”). This ensures that even if multiple nodes schedule the same job, only one will actually run for each interval. diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 331a70b..39ad5ba 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -48,10 +48,6 @@ export interface EngineConfig { minThreads?: number; /** Maximum number of worker threads to use. Defaults to `minThreads * 2` */ maxThreads?: number; - /** Time in milliseconds to wait between dispatcher cycles when no jobs are available. Defaults to 1000 ms */ - idlePollingInterval?: number; - /** Maximum number of jobs to claim from a single queue when concurrency is unlimited. Defaults to 10 */ - maxClaimedJobsByQueue?: number; /** * Default job builder configuration. @@ -131,8 +127,6 @@ export class Engine { cleanupFinishedJobsOlderThan: config?.cleanupFinishedJobsOlderThan ?? 30 * 24 * 60 * 60 * 1000, releaseStaleJobsIntervalMin: config?.releaseStaleJobsIntervalMin ?? 60, maxConcurrentJobs: config?.maxConcurrentJobs ?? 10, - idlePollingInterval: config?.idlePollingInterval ?? 100, - maxClaimedJobsByQueue: config?.maxClaimedJobsByQueue ?? 20, skipMigration: config?.skipMigration ?? false, logger: { level: config?.logger?.level ?? "info", diff --git a/packages/engine/src/execution/dispatcher.test.ts b/packages/engine/src/execution/dispatcher.test.ts index 8ba964b..202bf78 100644 --- a/packages/engine/src/execution/dispatcher.test.ts +++ b/packages/engine/src/execution/dispatcher.test.ts @@ -1,6 +1,6 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; import { Backend } from "@sidequest/backend"; -import { CompletedResult, JobData, QueueConfig } from "@sidequest/core"; +import { CompletedResult, JobData } from "@sidequest/core"; import { EngineConfig } from "../engine"; import { DummyJob } from "../test-jobs/dummy-job"; import { Dispatcher } from "./dispatcher"; @@ -19,7 +19,7 @@ vi.mock("../shared-runner", () => ({ async function createJob(backend: Backend, queue = "default") { const job = new DummyJob(); await job.ready(); - return await backend.createNewJob({ + await backend.createNewJob({ queue: queue, state: "waiting", script: job.script, @@ -59,8 +59,6 @@ describe("Dispatcher", () => { backend, new QueueManager(backend, config.queues!), new ExecutorManager(backend, config.maxConcurrentJobs!, 2, 4), - 100, - 20, ); dispatcher.start(); @@ -76,63 +74,31 @@ describe("Dispatcher", () => { await dispatcher.stop(); }); - sidequestTest( - "claim up to 20 jobs when the max concurrency and job concurrency was unlimited", - async ({ backend }) => { - const claimSpy = vi.spyOn(backend, "claimPendingJob").mockResolvedValue([]); + sidequestTest("does not claim job when there is no available slot for the queue", async ({ backend }) => { + await createJob(backend, "noop"); - const dispatcher = new Dispatcher( - backend, - new QueueManager(backend, [{ name: "default", concurrency: 0 }]), - new ExecutorManager(backend, 0, 2, 4), - 100, - 20, - ); - - dispatcher.start(); - - await vi.waitUntil(() => { - return claimSpy.mock.calls.length > 0; - }); - - expect(claimSpy).toHaveBeenCalledWith("default", 20); - - await dispatcher.stop(); - }, - ); - - sidequestTest("breaks queue loop when availableSlots is MAX_SAFE_INTEGER", async ({ backend }) => { - const queue1 = { name: "queue1", concurrency: 0 }; // interpreted as unlimited - const queue2 = { name: "queue2", concurrency: 10 }; - - const queueManager = new QueueManager(backend, [queue1, queue2]); - const executorManager = new ExecutorManager(backend, 0, 2, 4); // global also unlimited - - const dispatcher = new Dispatcher(backend, queueManager, executorManager, 100, 20); - - const claimSpy = vi.spyOn(backend, "claimPendingJob").mockResolvedValue([]); - const executeSpy = vi.spyOn(executorManager, "execute"); + expect(await backend.listJobs({ state: "waiting" })).toHaveLength(2); - // queue1 and global are unlimited → MAX_SAFE_INTEGER - vi.spyOn(executorManager, "availableSlotsByQueue").mockImplementation((q) => - q.name === "queue1" ? Number.MAX_SAFE_INTEGER : 10, + const dispatcher = new Dispatcher( + backend, + new QueueManager(backend, config.queues!), + new ExecutorManager(backend, config.maxConcurrentJobs!, 2, 4), ); - vi.spyOn(executorManager, "availableSlotsGlobal").mockReturnValue(Number.MAX_SAFE_INTEGER); - - vi.spyOn(queueManager, "getActiveQueuesWithRunnableJobs").mockResolvedValue([ - queue1 as unknown as QueueConfig, - queue2 as unknown as QueueConfig, - ]); - dispatcher.start(); - await vi.waitUntil(() => claimSpy.mock.calls.length > 0); + runMock.mockImplementationOnce(() => { + return { type: "completed", result: "foo", __is_job_transition__: true } as CompletedResult; + }); - expect(claimSpy).toHaveBeenCalledTimes(1); - expect(claimSpy).toHaveBeenCalledWith("queue1", 20); // capped by safeAvailableSlots - expect(claimSpy).not.toHaveBeenCalledWith("queue2", expect.anything()); + let jobs: JobData[]; - expect(executeSpy).not.toHaveBeenCalled(); + await vi.waitUntil(async () => { + jobs = await backend.listJobs({ state: "waiting" }); + return jobs.length === 1; + }); + + expect(jobs!).toHaveLength(1); + expect(jobs![0].queue).toEqual("noop"); await dispatcher.stop(); }); @@ -147,8 +113,6 @@ describe("Dispatcher", () => { backend, new QueueManager(backend, config.queues!), new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4), - 100, - 20, ); dispatcher.start(); @@ -168,49 +132,5 @@ describe("Dispatcher", () => { await dispatcher.stop(); }); - - sidequestTest("does not claim more jobs than queue concurrency allows", async ({ backend }) => { - const claimSpy = vi.spyOn(backend, "claimPendingJob").mockResolvedValue([]); - - const dispatcher = new Dispatcher( - backend, - new QueueManager(backend, [{ name: "default", concurrency: 10 }]), - new ExecutorManager(backend, 20, 2, 4), - 100, - 20, - ); - - dispatcher.start(); - - await vi.waitUntil(() => { - return claimSpy.mock.calls.length > 0; - }); - - expect(claimSpy).toHaveBeenCalledWith("default", 10); - - await dispatcher.stop(); - }); - - sidequestTest("does not claim more jobs than global concurrency allows", async ({ backend }) => { - const claimSpy = vi.spyOn(backend, "claimPendingJob").mockResolvedValue([]); - - const dispatcher = new Dispatcher( - backend, - new QueueManager(backend, [{ name: "default", concurrency: 10 }]), - new ExecutorManager(backend, 1, 2, 4), - 100, - 20, - ); - - dispatcher.start(); - - await vi.waitUntil(() => { - return claimSpy.mock.calls.length > 0; - }); - - expect(claimSpy).toHaveBeenCalledWith("default", 1); - - await dispatcher.stop(); - }); }); }); diff --git a/packages/engine/src/execution/dispatcher.ts b/packages/engine/src/execution/dispatcher.ts index e850e9d..2445687 100644 --- a/packages/engine/src/execution/dispatcher.ts +++ b/packages/engine/src/execution/dispatcher.ts @@ -3,6 +3,8 @@ import { JobData, logger } from "@sidequest/core"; import { ExecutorManager } from "./executor-manager"; import { QueueManager } from "./queue-manager"; +const sleepDelay = 100; + /** * Dispatcher for managing job execution and queue polling. */ @@ -20,8 +22,6 @@ export class Dispatcher { private backend: Backend, private queueManager: QueueManager, private executorManager: ExecutorManager, - private idlePollingInterval: number, - private maxClaimedJobsPerCycle: number, ) {} /** @@ -35,24 +35,21 @@ export class Dispatcher { let shouldSleep = true; for (const queue of queues) { - const queueAvailableSlots = this.executorManager.availableSlotsByQueue(queue); - if (queueAvailableSlots <= 0) { + const availableSlots = this.executorManager.availableSlotsByQueue(queue); + if (availableSlots <= 0) { logger("Dispatcher").debug(`Queue ${queue.name} limit reached!`); - await this.sleep(this.idlePollingInterval); + await this.sleep(sleepDelay); continue; } const globalSlots = this.executorManager.availableSlotsGlobal(); if (globalSlots <= 0) { logger("Dispatcher").debug(`Global concurrency limit reached!`); - await this.sleep(this.idlePollingInterval); + await this.sleep(sleepDelay); continue; } - const availableSlots = Math.min(queueAvailableSlots, globalSlots); - const safeAvailableSlots = - Number.MAX_SAFE_INTEGER === availableSlots ? this.maxClaimedJobsPerCycle : availableSlots; - const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, safeAvailableSlots); + const jobs: JobData[] = await this.backend.claimPendingJob(queue.name, availableSlots); if (jobs.length > 0) { // if a job was found on any queue do not sleep @@ -63,14 +60,10 @@ export class Dispatcher { // does not await for job execution. void this.executorManager.execute(queue, job); } - - if (availableSlots === Number.MAX_SAFE_INTEGER) { - break; - } } if (shouldSleep) { - await this.sleep(this.idlePollingInterval); + await this.sleep(sleepDelay); } } } diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index c6e805c..0219c12 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -69,7 +69,7 @@ describe("ExecutorManager", () => { }); sidequestTest("snoozes job when queue is full", async ({ backend, config }) => { - const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); // Unlimited slots + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); // No available slots const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4); // Set up job in claimed state (as it would be when passed to execute) @@ -77,27 +77,29 @@ describe("ExecutorManager", () => { await executorManager.execute(queryConfig, jobData); - expect(runMock).toHaveBeenCalled(); + // Verify the job runner was NOT called since the job was snoozed + expect(runMock).not.toHaveBeenCalled(); // Verify slots remain unchanged (no job was actually executed) - expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(Number.MAX_SAFE_INTEGER); + expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0); expect(executorManager.totalActiveWorkers()).toEqual(0); await executorManager.destroy(); }); sidequestTest("snoozes job when global slots are full", async ({ backend }) => { const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 5 }); // Queue has slots - const executorManager = new ExecutorManager(backend, 0, 2, 4); // But global is unlimited + const executorManager = new ExecutorManager(backend, 0, 2, 4); // But global max is 0 // Set up job in claimed state jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); await executorManager.execute(queryConfig, jobData); - expect(runMock).toHaveBeenCalled(); + // Verify the job runner was NOT called + expect(runMock).not.toHaveBeenCalled(); // Verify global slots show as full - expect(executorManager.availableSlotsGlobal()).toEqual(Number.MAX_SAFE_INTEGER); + expect(executorManager.availableSlotsGlobal()).toEqual(0); expect(executorManager.totalActiveWorkers()).toEqual(0); await executorManager.destroy(); }); @@ -111,12 +113,12 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); - sidequestTest("Number.MAX_SAFE_INTEGER", async ({ backend, config }) => { + sidequestTest("returns zero as min value", async ({ backend, config }) => { const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); const executorManager = new ExecutorManager(backend, config.maxConcurrentJobs, 2, 4); void executorManager.execute(queryConfig, jobData); - expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(Number.MAX_SAFE_INTEGER); + expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(0); await executorManager.destroy(); }); }); @@ -128,12 +130,12 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); - sidequestTest("Number.MAX_SAFE_INTEGER", async ({ backend }) => { + sidequestTest("returns zero as min value", async ({ backend }) => { const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 0 }); const executorManager = new ExecutorManager(backend, 0, 2, 4); void executorManager.execute(queryConfig, jobData); - expect(executorManager.availableSlotsGlobal()).toEqual(Number.MAX_SAFE_INTEGER); + expect(executorManager.availableSlotsGlobal()).toEqual(0); await executorManager.destroy(); }); }); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index 9199f4e..847fa0f 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -42,10 +42,6 @@ export class ExecutorManager { const activeJobs = this.activeByQueue[queueConfig.name]; const limit = queueConfig.concurrency ?? 10; - if (limit === 0) { - return Number.MAX_SAFE_INTEGER; - } - const availableSlots = limit - activeJobs.size; if (availableSlots < 0) { return 0; @@ -59,10 +55,6 @@ export class ExecutorManager { */ availableSlotsGlobal() { const limit = this.maxConcurrentJobs; - if (limit === 0) { - return Number.MAX_SAFE_INTEGER; - } - const availableSlots = limit - this.activeJobs.size; if (availableSlots < 0) { return 0; diff --git a/packages/engine/src/workers/main.ts b/packages/engine/src/workers/main.ts index 7600748..a973719 100644 --- a/packages/engine/src/workers/main.ts +++ b/packages/engine/src/workers/main.ts @@ -34,8 +34,6 @@ export class MainWorker { nonNullConfig.minThreads, nonNullConfig.maxThreads, ), - nonNullConfig.idlePollingInterval, - nonNullConfig.maxClaimedJobsByQueue, ); this.dispatcher.start();