Skip to content

Commit 225bee3

Browse files
committed
feat: moving args to run function
1 parent 2ed74fa commit 225bee3

File tree

8 files changed

+54
-29
lines changed

8 files changed

+54
-29
lines changed

migrations/postgres/2_create_jobs_table.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ exports.up = async function(knex) {
88
table.string('class').notNullable();
99
table.jsonb('args').notNullable();
1010

11+
table.integer('timeout').nullable();
1112
table.integer('attempt').notNullable().defaultTo(0);
1213
table.integer('max_attempts').notNullable().defaultTo(5);
1314
table.specificType('result', 'jsonb').notNullable().defaultTo('{}');

src/backends/postgres/postgres-backend.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ export class PostgresBackend implements Backend{
4343
queue: job.queue,
4444
class: job.class,
4545
script: job.script,
46-
args: this.knex.raw('?', [JSON.stringify(job.args)])
46+
args: this.knex.raw('?', [JSON.stringify(job.args)]),
47+
timeout: job.timeout,
4748
}
4849

4950
const inserted = await this.knex('sidequest_jobs').insert(data).returning('*');
@@ -84,6 +85,7 @@ export class PostgresBackend implements Backend{
8485
state: job.state,
8586
script: job.script,
8687
class: job.class,
88+
timeout: job.timeout,
8789
attempt: job.attempt,
8890
max_attempts: job.max_attempts,
8991
errors: job.errors,

src/core/job.ts

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,14 @@ export abstract class Job {
2323
this.timeout = options.timeout;
2424
}
2525

26-
abstract run(): any | Promise<any>;
26+
abstract run(...args: any[]): any | Promise<any>;
27+
28+
static config(this: { new (...args: any[]): Job }, jobOptions: JobOptions){
29+
return new JobBuilder(this).config(jobOptions);
30+
}
2731

2832
static enqueue(this: { new (...args: any[]): Job }, ...args: any[]): JobData | Promise<JobData> {
29-
const job = new this(...args);
30-
const backend = Sidequest.getBackend();
31-
const jobData: JobData = {
32-
queue: job.queue,
33-
script: job.script,
34-
class: job.class,
35-
args: args,
36-
attempt: 0,
37-
max_attempts: 5
38-
}
39-
return backend.insertJob(jobData);
33+
return new JobBuilder(this).enqueue(args);
4034
}
4135
}
4236

@@ -52,4 +46,36 @@ function buildPath() {
5246
}
5347

5448
throw new Error('Could not determine the task path');
49+
}
50+
51+
class JobBuilder{
52+
JobClass: new (...args: any[]) => Job;
53+
job?: Job;
54+
55+
constructor(JobClass: { new (...args: any[]): Job }){
56+
this.JobClass = JobClass;
57+
}
58+
59+
config(options: JobOptions){
60+
this.job = new this.JobClass(options);
61+
return this;
62+
}
63+
64+
enqueue(...args: any[]){
65+
if(!this.job){
66+
this.job = new this.JobClass({ queue: 'default' });
67+
}
68+
69+
const backend = Sidequest.getBackend();
70+
const jobData: JobData = {
71+
queue: this.job.queue,
72+
script: this.job.script,
73+
class: this.job.class,
74+
args: args,
75+
attempt: 0,
76+
max_attempts: 5,
77+
timeout: this.job.timeout
78+
}
79+
return backend.insertJob(jobData);
80+
}
5581
}

src/core/schema/job-data.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export type JobData = {
77
script: string;
88
class: string;
99
args: any[];
10+
timeout?: number;
1011
attempt: number;
1112
max_attempts: number;
1213
result?: any;

src/test-jobs/dynamic-dummy-job.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
import { Job } from '../sidequest';
22

33
export class DynamicDummyJob extends Job {
4-
constructor(queue: string){
5-
super({
6-
queue: queue
7-
})
8-
}
9-
104
async run(): Promise<any> {
115
await new Promise((r) => { setTimeout(r, 800)})
126
return 'dummy job';

src/workers/executor.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ describe('executror.ts', ()=>{
1717
run: async () => 'ok'
1818
};
1919

20-
const result = await executeTask(job as any);
20+
const result = await executeTask(job as any, []);
2121
assert.equal(result, 'ok');
2222
});
2323

@@ -27,7 +27,7 @@ describe('executror.ts', ()=>{
2727
};
2828

2929
try {
30-
await executeTask(job as any);
30+
await executeTask(job as any, []);
3131
assert.fail('Expected error, but promise resolved');
3232
} catch (err: any) {
3333
assert.instanceOf(err, Error);
@@ -43,7 +43,7 @@ describe('executror.ts', ()=>{
4343
};
4444

4545
try {
46-
await executeTask(job as any);
46+
await executeTask(job as any, []);
4747
assert.fail('Expected timeout, but promise resolved');
4848
} catch (err: any) {
4949
assert.instanceOf(err, Error);

src/workers/executor.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ export async function execute(jobData: JobData, config: SidequestConfig): Promis
1515
}
1616

1717
const args = jobData.args;
18-
const job: Job = new JobClass(...args);
18+
const job: Job = new JobClass({ queue: jobData.queue, timeout: jobData.timeout });
1919

2020
jobData = await JobActions.setRunning(jobData);
2121
try {
2222
logger().info(`Running job ${jobData.class} with args: ${JSON.stringify(jobData.args)}`);
23-
const result = await executeTask(job);
23+
const result = await executeTask(job, args);
2424
jobData = await JobActions.setComplete(jobData, result);
2525
logger().info(`Job ${jobData.class} has completed with args: ${JSON.stringify(jobData.args)}`);
2626
} catch (error: any){
@@ -29,9 +29,10 @@ export async function execute(jobData: JobData, config: SidequestConfig): Promis
2929
}
3030
}
3131

32-
export function executeTask(job: Job){
32+
export function executeTask(job: Job, args: any[]){
3333
const promises: Promise<any>[] = [];
3434

35+
console.log(job)
3536
if(job.timeout){
3637
const timeout = new Promise((resolve, reject) => setTimeout(()=> {
3738
reject(new Error(`Job ${job.class} timed out: ${ JSON.stringify(job) }`));
@@ -41,7 +42,7 @@ export function executeTask(job: Job){
4142

4243
const run = new Promise(async (resolve, reject) => {
4344
try {
44-
const result = await job.run();
45+
const result = await job.run(...args);
4546
resolve(result);
4647
} catch(error: any){
4748
reject(error);

src/workers/main.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ describe('main.ts', ()=> {
7777

7878
await worker.run(config);
7979

80-
await DynamicDummyJob.enqueue(lowQueueName);
81-
await DynamicDummyJob.enqueue(mediumQueueName);
82-
await DynamicDummyJob.enqueue(highQueueName);
80+
await DynamicDummyJob.config({queue: lowQueueName }).enqueue();
81+
await DynamicDummyJob.config({queue: mediumQueueName }).enqueue();
82+
await DynamicDummyJob.config({queue: highQueueName }).enqueue();
8383

8484
const executed: JobData[] = [];
8585

0 commit comments

Comments
 (0)