Skip to content

Commit a10959a

Browse files
committed
feat: implement staleJobs method for job management across backends
1 parent de1aee1 commit a10959a

File tree

4 files changed

+234
-70
lines changed

4 files changed

+234
-70
lines changed

packages/backends/backend-test/src/base.ts

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,4 +495,211 @@ export default function defineTestSuite() {
495495
expect(allJobs).toHaveLength(0);
496496
});
497497
});
498+
499+
describe("staleJobs", () => {
500+
it("should not find any stale job", async () => {
501+
const job: NewJobData = {
502+
queue: "default",
503+
class: "TestJob",
504+
args: [{ foo: "bar" }],
505+
constructor_args: [{}],
506+
state: "waiting",
507+
script: "test.js",
508+
attempt: 0,
509+
max_attempts: 5,
510+
};
511+
512+
let insertedJob = await backend.createNewJob(job);
513+
514+
insertedJob = await backend.createNewJob(job);
515+
await backend.updateJob({ ...insertedJob, state: "canceled" });
516+
517+
insertedJob = await backend.createNewJob(job);
518+
await backend.updateJob({ ...insertedJob, state: "claimed", claimed_at: new Date() });
519+
520+
insertedJob = await backend.createNewJob(job);
521+
await backend.updateJob({ ...insertedJob, state: "completed" });
522+
523+
insertedJob = await backend.createNewJob(job);
524+
await backend.updateJob({ ...insertedJob, state: "failed" });
525+
526+
insertedJob = await backend.createNewJob(job);
527+
await backend.updateJob({ ...insertedJob, state: "running", attempted_at: new Date(), timeout: 1000000 });
528+
529+
insertedJob = await backend.createNewJob(job);
530+
await backend.updateJob({ ...insertedJob, state: "running", attempted_at: new Date() });
531+
532+
const result = await backend.staleJobs();
533+
expect(result).toHaveLength(0);
534+
});
535+
536+
it("should find claimed stale job", async () => {
537+
const job: NewJobData = {
538+
queue: "default",
539+
class: "TestJob",
540+
args: [{ foo: "bar" }],
541+
constructor_args: [{}],
542+
state: "waiting",
543+
script: "test.js",
544+
attempt: 0,
545+
max_attempts: 5,
546+
};
547+
548+
let insertedJob = await backend.createNewJob(job);
549+
550+
insertedJob = await backend.createNewJob(job);
551+
await backend.updateJob({ ...insertedJob, state: "canceled", claimed_at: new Date(0) });
552+
553+
insertedJob = await backend.createNewJob(job);
554+
await backend.updateJob({ ...insertedJob, state: "claimed", claimed_at: new Date(0) });
555+
556+
insertedJob = await backend.createNewJob(job);
557+
await backend.updateJob({ ...insertedJob, state: "completed", claimed_at: new Date(0) });
558+
559+
insertedJob = await backend.createNewJob(job);
560+
await backend.updateJob({ ...insertedJob, state: "failed", claimed_at: new Date(0) });
561+
562+
insertedJob = await backend.createNewJob(job);
563+
await backend.updateJob({
564+
...insertedJob,
565+
state: "running",
566+
claimed_at: new Date(0),
567+
attempted_at: new Date(),
568+
timeout: 1000000,
569+
});
570+
571+
insertedJob = await backend.createNewJob(job);
572+
await backend.updateJob({ ...insertedJob, state: "running", claimed_at: new Date(0), attempted_at: new Date() });
573+
574+
const result = await backend.staleJobs();
575+
expect(result).toHaveLength(1);
576+
});
577+
578+
it("should find running stale job without timeout", async () => {
579+
const job: NewJobData = {
580+
queue: "default",
581+
class: "TestJob",
582+
args: [{ foo: "bar" }],
583+
constructor_args: [{}],
584+
state: "waiting",
585+
script: "test.js",
586+
attempt: 0,
587+
max_attempts: 5,
588+
};
589+
590+
let insertedJob = await backend.createNewJob(job);
591+
592+
insertedJob = await backend.createNewJob(job);
593+
await backend.updateJob({ ...insertedJob, state: "canceled", attempted_at: new Date(0) });
594+
595+
insertedJob = await backend.createNewJob(job);
596+
await backend.updateJob({ ...insertedJob, state: "claimed", attempted_at: new Date(0) });
597+
598+
insertedJob = await backend.createNewJob(job);
599+
await backend.updateJob({ ...insertedJob, state: "completed", attempted_at: new Date(0) });
600+
601+
insertedJob = await backend.createNewJob(job);
602+
await backend.updateJob({ ...insertedJob, state: "failed", attempted_at: new Date(0) });
603+
604+
insertedJob = await backend.createNewJob(job);
605+
await backend.updateJob({
606+
...insertedJob,
607+
state: "running",
608+
attempted_at: new Date(),
609+
timeout: 1000000,
610+
});
611+
612+
insertedJob = await backend.createNewJob(job);
613+
await backend.updateJob({ ...insertedJob, state: "running", attempted_at: new Date(0) });
614+
615+
const result = await backend.staleJobs();
616+
expect(result).toHaveLength(1);
617+
});
618+
619+
it("should find running stale job with timeout", async () => {
620+
const job: NewJobData = {
621+
queue: "default",
622+
class: "TestJob",
623+
args: [{ foo: "bar" }],
624+
constructor_args: [{}],
625+
state: "waiting",
626+
script: "test.js",
627+
attempt: 0,
628+
max_attempts: 5,
629+
};
630+
631+
let insertedJob = await backend.createNewJob(job);
632+
633+
insertedJob = await backend.createNewJob(job);
634+
await backend.updateJob({ ...insertedJob, state: "canceled", attempted_at: new Date() });
635+
636+
insertedJob = await backend.createNewJob(job);
637+
await backend.updateJob({ ...insertedJob, state: "claimed", attempted_at: new Date() });
638+
639+
insertedJob = await backend.createNewJob(job);
640+
await backend.updateJob({ ...insertedJob, state: "completed", attempted_at: new Date() });
641+
642+
insertedJob = await backend.createNewJob(job);
643+
await backend.updateJob({ ...insertedJob, state: "failed", attempted_at: new Date() });
644+
645+
const now = new Date();
646+
insertedJob = await backend.createNewJob(job);
647+
await backend.updateJob({
648+
...insertedJob,
649+
state: "running",
650+
attempted_at: new Date(now.getTime() - 1000001),
651+
timeout: 1000000,
652+
});
653+
654+
insertedJob = await backend.createNewJob(job);
655+
await backend.updateJob({
656+
...insertedJob,
657+
state: "running",
658+
attempted_at: new Date(now.getTime() - 5000),
659+
timeout: 1000000,
660+
});
661+
662+
insertedJob = await backend.createNewJob(job);
663+
await backend.updateJob({ ...insertedJob, state: "running", attempted_at: new Date() });
664+
665+
const result = await backend.staleJobs();
666+
expect(result).toHaveLength(1);
667+
});
668+
669+
it("should find many stale jobs if ms very low", async () => {
670+
const job: NewJobData = {
671+
queue: "default",
672+
class: "TestJob",
673+
args: [{ foo: "bar" }],
674+
constructor_args: [{}],
675+
state: "waiting",
676+
script: "test.js",
677+
attempt: 0,
678+
max_attempts: 5,
679+
};
680+
681+
let insertedJob = await backend.createNewJob(job);
682+
683+
insertedJob = await backend.createNewJob(job);
684+
await backend.updateJob({ ...insertedJob, state: "canceled" });
685+
686+
insertedJob = await backend.createNewJob(job);
687+
await backend.updateJob({ ...insertedJob, state: "claimed", claimed_at: new Date() });
688+
689+
insertedJob = await backend.createNewJob(job);
690+
await backend.updateJob({ ...insertedJob, state: "completed" });
691+
692+
insertedJob = await backend.createNewJob(job);
693+
await backend.updateJob({ ...insertedJob, state: "failed" });
694+
695+
insertedJob = await backend.createNewJob(job);
696+
await backend.updateJob({ ...insertedJob, state: "running", attempted_at: new Date(), timeout: -1 });
697+
698+
insertedJob = await backend.createNewJob(job);
699+
await backend.updateJob({ ...insertedJob, state: "running", attempted_at: new Date() });
700+
701+
const result = await backend.staleJobs(0, 0);
702+
expect(result).toHaveLength(3);
703+
});
704+
});
498705
}

packages/backends/backend/src/backend.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,33 @@ export abstract class SQLBackend {
131131
};
132132
}): Promise<JobData[]>;
133133

134-
abstract staleJobs(maxStaleMs?: number, maxClaimedMs?: number): Promise<JobData[]>;
134+
async staleJobs(maxStaleMs = 600_000, maxClaimedMs = 60_000): Promise<JobData[]> {
135+
const now = new Date();
136+
const jobs = (await this.knex("sidequest_jobs")
137+
.select("*")
138+
.where((qb) => {
139+
qb.where("state", "claimed")
140+
.andWhereNot("claimed_at", null)
141+
.andWhere("claimed_at", "<", new Date(now.getTime() - maxClaimedMs));
142+
})
143+
.orWhere((qb) => {
144+
qb.where("state", "running").andWhereNot("attempted_at", null);
145+
})) as JobData[];
146+
147+
const parsedJobs = jobs.map(safeParseJobData);
148+
149+
// We filter the running here to account for timeout and different DBs
150+
const filtered = parsedJobs.filter((job) => {
151+
if (job.state === "running" && job.timeout != null) {
152+
return new Date(job.attempted_at!).getTime() < now.getTime() - job.timeout;
153+
}
154+
if (job.state === "running" && job.timeout == null) {
155+
return new Date(job.attempted_at!).getTime() < now.getTime() - maxStaleMs;
156+
}
157+
return true; // already filtered `claimed` by SQL
158+
});
159+
return filtered;
160+
}
135161

136162
async deleteFinishedJobs(cutoffDate: Date): Promise<void> {
137163
await this.knex("sidequest_jobs")

packages/backends/postgres/src/postgres-backend.ts

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -90,35 +90,4 @@ export default class PostgresBackend extends SQLBackend {
9090
const result = await query;
9191
return result as JobData[];
9292
}
93-
94-
async staleJobs(maxStaleMs = 600_000, maxClaimedMs = 60_000): Promise<JobData[]> {
95-
const jobs = await this.knex("sidequest_jobs")
96-
.select("*")
97-
.whereRaw(
98-
`
99-
(
100-
state = 'claimed'
101-
AND claimed_at IS NOT NULL
102-
AND (EXTRACT(EPOCH FROM (NOW() - claimed_at)) * 1000) > ?
103-
)
104-
OR
105-
(
106-
state = 'running'
107-
AND attempted_at IS NOT NULL
108-
AND timeout IS NOT NULL
109-
AND (EXTRACT(EPOCH FROM (NOW() - attempted_at)) * 1000) > timeout
110-
)
111-
OR
112-
(
113-
state = 'running'
114-
AND attempted_at IS NOT NULL
115-
AND timeout IS NULL
116-
AND (EXTRACT(EPOCH FROM (NOW() - attempted_at)) * 1000) > ?
117-
)
118-
`,
119-
[maxClaimedMs, maxStaleMs],
120-
);
121-
122-
return jobs as JobData[];
123-
}
12493
}

packages/backends/sqlite/src/sqlite-backend.ts

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -100,42 +100,4 @@ export default class SqliteBackend extends SQLBackend {
100100
uniqueness_config: safeParse(job.uniqueness_config),
101101
})) as JobData[];
102102
}
103-
104-
async staleJobs(maxStaleMs = 600_000, maxClaimedMs = 60_000): Promise<JobData[]> {
105-
const jobs = (await this.knex("sidequest_jobs")
106-
.select("*")
107-
.whereRaw(
108-
`
109-
(
110-
state = 'claimed'
111-
AND claimed_at IS NOT NULL
112-
AND ((julianday('now') - julianday(claimed_at)) * 24 * 60 * 60 * 1000) > ?
113-
)
114-
OR
115-
(
116-
state = 'running'
117-
AND attempted_at IS NOT NULL
118-
AND timeout IS NOT NULL
119-
AND ((julianday('now') - julianday(attempted_at)) * 24 * 60 * 60 * 1000) > timeout
120-
)
121-
OR
122-
(
123-
state = 'running'
124-
AND attempted_at IS NOT NULL
125-
AND timeout IS NULL
126-
AND ((julianday('now') - julianday(attempted_at)) * 24 * 60 * 60 * 1000) > ?
127-
)
128-
`,
129-
[maxClaimedMs, maxStaleMs],
130-
)) as JobData[];
131-
132-
return jobs.map((job) => ({
133-
...job,
134-
args: safeParse(job.args),
135-
constructor_args: safeParse(job.constructor_args),
136-
result: safeParse(job.result),
137-
errors: safeParse(job.errors),
138-
uniqueness_config: safeParse(job.uniqueness_config),
139-
})) as JobData[];
140-
}
141103
}

0 commit comments

Comments
 (0)