Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/backends/backend-test/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export function testBackend(backendFactory: () => Backend) {
beforeEach(async () => {
setTestBackend(backendFactory());
await backend.migrate();
await backend.truncate();
});

afterEach(async () => {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from "./backends";
export * from "./job";
export * from "./logger";
export * from "./schema";
export * from "./tools";
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/job/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./job";
115 changes: 115 additions & 0 deletions packages/core/src/job/job.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { CompletedResult, RetryResult, SnoozeResult } from "@sidequest/core";
import { Job } from "./job";

export class DummyJob extends Job {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
constructor(..._optional) {
super();
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
run(..._optional) {
return "dummy job";
}
}

describe("job.ts", () => {
beforeEach(() => {
vi.useFakeTimers();
});

afterEach(() => {
vi.restoreAllMocks();
});

it("should expose script and className correctly", async () => {
const job = new DummyJob();
await job.ready();
expect(typeof job.script).toBe("string");
expect(job.className).toBe("DummyJob");
});

it("creates a complete transition", () => {
const job = new DummyJob();
const transition = job.complete("foo bar");
expect(transition.result).toBe("foo bar");
});

it("creates a fail transition", () => {
const job = new DummyJob();
const transition = job.fail("error");
expect(transition.error).toEqual({ message: "error" });
});

it("creates a retry transition", () => {
const job = new DummyJob();
const transition = job.retry("reason", 1000);
expect(transition.error).toEqual({ message: "reason" });
expect(transition.delay).toEqual(1000);
});

it("creates a snooze transition", () => {
const job = new DummyJob();
const transition = job.snooze(1000);
expect(transition.delay).toBe(1000);
});

it("fail/retry should accept an Error object", () => {
const job = new DummyJob();
const error = new Error("fail");
expect(job.fail(error).error.message).toEqual("fail");
expect(job.retry(error).error.message).toEqual("fail");
});

describe("perform", () => {
it("should return CompleteResult if run returns a value", async () => {
class ValueJob extends Job {
run() {
return "abc";
}
}
const job = new ValueJob();
const result = (await job.perform()) as CompletedResult;
expect(result.type).toBe("completed");
expect(result.result).toBe("abc");
});

it("should return the JobResult return by run", async () => {
class TransitionJob extends Job {
run() {
return { __is_job_transition__: true, type: "snooze" } as SnoozeResult;
}
}
const job = new TransitionJob();
const result = (await job.perform()) as SnoozeResult;
expect(result.type).toBe("snooze");
});

it("should return RetryResult if run throws", async () => {
class ErrorJob extends Job {
run() {
throw new Error("fail!");
}
}
const job = new ErrorJob();
const result = (await job.perform()) as RetryResult;
expect(result.type).toBe("retry");
expect(result.error.message).toEqual("fail!");
});

it("should return RetryResult if run unhandled promise", async () => {
class DummyUnhandled extends Job {
run() {
return new Promise(() => {
throw new Error("unhandled error");
});
}
}

const job = new DummyUnhandled();
const result = (await job.perform()) as RetryResult;
expect(result.type).toBe("retry");
expect(result.error.message).toEqual("unhandled error");
});
});
});
19 changes: 5 additions & 14 deletions packages/engine/src/job/job.ts → packages/core/src/job/job.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
import {
CompletedResult,
ErrorData,
FailedResult,
isJobResult,
JobData,
JobResult,
JobState,
logger,
RetryResult,
SnoozeResult,
toErrorData,
UniquenessConfig,
} from "@sidequest/core";
import { access } from "fs/promises";
import { pathToFileURL } from "url";
import { logger } from "../logger";
import { ErrorData, JobData, JobState } from "../schema";
import { toErrorData } from "../tools";
import { CompletedResult, FailedResult, isJobResult, JobResult, RetryResult, SnoozeResult } from "../transitions";
import { UniquenessConfig } from "../uniquiness";

/**
* Type for a job class constructor.
Expand Down
1 change: 1 addition & 0 deletions packages/dashboard/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"dependencies": {
"@sidequest/backend": "workspace:*",
"@sidequest/core": "workspace:*",
"@sidequest/engine": "workspace:*",
"ejs": "^3.1.10",
"express": "^5.1.0",
"express-basic-auth": "^1.2.1",
Expand Down
14 changes: 9 additions & 5 deletions packages/dashboard/src/resources/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Backend } from "@sidequest/backend";
import { JobState } from "@sidequest/core";
import { CancelTransition, JobState, RerunTransition, SnoozeTransition } from "@sidequest/core";
import { JobTransitioner } from "@sidequest/engine";
import { Router } from "express";

export function createJobsRouter(backend: Backend) {
Expand Down Expand Up @@ -111,7 +112,11 @@ export function createJobsRouter(backend: Backend) {
const job = await backend?.getJob(jobId);

if (job) {
await backend.updateJob({ id: job.id, available_at: new Date() });
if (job.state === "canceled") {
await JobTransitioner.apply(backend, job, new RerunTransition());
} else {
await JobTransitioner.apply(backend, job, new SnoozeTransition(0));
}
res.header("HX-Trigger", "jobChanged").status(200).end();
} else {
res.status(404).end();
Expand All @@ -123,7 +128,7 @@ export function createJobsRouter(backend: Backend) {
const job = await backend?.getJob(jobId);

if (job) {
await backend.updateJob({ ...job, state: "canceled" });
await JobTransitioner.apply(backend, job, new CancelTransition());
res.header("HX-Trigger", "jobChanged").status(200).end();
} else {
res.status(404).end();
Expand All @@ -135,8 +140,7 @@ export function createJobsRouter(backend: Backend) {
const job = await backend?.getJob(jobId);

if (job) {
const maxAttempts = job.max_attempts === job.attempt ? job.max_attempts + 1 : job.max_attempts;
await backend.updateJob({ ...job, state: "waiting", max_attempts: maxAttempts });
await JobTransitioner.apply(backend, job, new RerunTransition());
res.header("HX-Trigger", "jobChanged").status(200).end();
} else {
res.status(404).end();
Expand Down
4 changes: 3 additions & 1 deletion packages/docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ description: Development guide for Sidequest.js
Sidequest is built as a monorepo with the following packages:

- **`sidequest`** - Main package combining all components
- **`@sidequest/docs`** - Documentation site using Vitepress
- **`@sidequest/core`** - Core functionality, logging, and schema definitions
- **`@sidequest/engine`** - Job processing engine with worker thread management
- **`@sidequest/backend`** - Abstract backend interface
- **`@sidequest/backend-test`** - Test suite for backend implementations
- **`@sidequest/sqlite-backend`** - SQLite backend implementation
- **`@sidequest/postgres-backend`** - PostgreSQL backend implementation
- **`@sidequest/mysql-backend`** - MySQL backend implementation
- **`@sidequest/mongo-backend`** - MongoDB backend implementation
- **`@sidequest/dashboard`** - Web dashboard with Express.js, EJS, and HTMX
- **`@sidequest/cli`** - Command-line interface tools
- **`@sidequest/backend-test`** - Test suite for backend implementations

## Setup

Expand Down
3 changes: 3 additions & 0 deletions packages/docs/engine/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ await Sidequest.start({
maxConcurrentJobs: 50,
minThreads: 4,
maxThreads: 8,
idleWorkerTimeout: 10000, // 10 seconds

// 4. Migration and startup
skipMigration: false,
Expand Down Expand Up @@ -155,6 +156,7 @@ await Sidequest.start({
| `maxConcurrentJobs` | Maximum number of jobs processed simultaneously across all queues | `10` |
| `minThreads` | Minimum number of worker threads to use | Number of CPU cores |
| `maxThreads` | Maximum number of worker threads to use | `minThreads * 2` |
| `idleWorkerTimeout` | Timeout (milliseconds) for idle workers before they are terminated | `10000` (10 seconds) |
| `skipMigration` | Whether to skip database migration on startup | `false` |
| `releaseStaleJobsIntervalMin` | Frequency (minutes) for releasing stale jobs. Set to `false` to disable | `60` |
| `releaseStaleJobsMaxStaleMs` | Maximum age (milliseconds) for a running job to be considered stale | `600000` (10 minutes) |
Expand Down Expand Up @@ -272,6 +274,7 @@ await Sidequest.start({
maxConcurrentJobs: 100,
minThreads: 8,
maxThreads: 16,
idleWorkerTimeout: 30000, // 30 seconds for high throughput
releaseStaleJobsIntervalMin: 30, // More frequent stale job cleanup
cleanupFinishedJobsIntervalMin: 30, // More frequent cleanup
queueDefaults: {
Expand Down
6 changes: 4 additions & 2 deletions packages/engine/src/engine.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Backend, BackendConfig, LazyBackend, MISC_FALLBACK, NewQueueData, QUEUE_FALLBACK } from "@sidequest/backend";
import { configureLogger, logger, LoggerOptions } from "@sidequest/core";
import { configureLogger, JobClassType, logger, LoggerOptions } from "@sidequest/core";
import { ChildProcess, fork } from "child_process";
import { cpus } from "os";
import path from "path";
import { JOB_BUILDER_FALLBACK } from "./job/constants";
import { JobClassType } from "./job/job";
import { JobBuilder, JobBuilderDefaults } from "./job/job-builder";
import { grantQueueConfig, QueueDefaults } from "./queue/grant-queue-config";
import { clearGracefulShutdown, gracefulShutdown } from "./utils/shutdown";
Expand Down Expand Up @@ -41,6 +40,8 @@ export interface EngineConfig {
minThreads?: number;
/** Maximum number of worker threads to use. Defaults to `minThreads * 2` */
maxThreads?: number;
/** Timeout in milliseconds for idle workers before they are terminated. Defaults to 10 seconds */
idleWorkerTimeout?: number;

/**
* Default job builder configuration.
Expand Down Expand Up @@ -128,6 +129,7 @@ export class Engine {
gracefulShutdown: config?.gracefulShutdown ?? true,
minThreads: config?.minThreads ?? cpus().length,
maxThreads: config?.maxThreads ?? cpus().length * 2,
idleWorkerTimeout: config?.idleWorkerTimeout ?? 10_000,
releaseStaleJobsMaxStaleMs: config?.releaseStaleJobsMaxStaleMs ?? MISC_FALLBACK.maxStaleMs, // 10 minutes
releaseStaleJobsMaxClaimedMs: config?.releaseStaleJobsMaxClaimedMs ?? MISC_FALLBACK.maxClaimedMs, // 1 minute
jobDefaults: {
Expand Down
1 change: 0 additions & 1 deletion packages/engine/src/job/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export * from "./constants";
export * from "./job";
export * from "./job-builder";
export * from "./job-transitioner";
Loading
Loading