Skip to content

Commit 47c7b3a

Browse files
yuvaltmgcrea
authored andcommitted
fix: use JavaScript Date instead of SQL NOW() for timezone consistency
This fixes timezone issues where jobs weren't being dequeued when the application server and database have different timezone settings. Changes: - Use JavaScript Date objects passed as query parameters instead of SQL NOW() - Add AbortController for faster stop() - immediately interrupts polling - Make AbortSignal optional in waitFor() for backwards compatibility - Deprecate alignTimeZone option (no longer needed) Closes #11
1 parent fcc637e commit 47c7b3a

File tree

2 files changed

+113
-72
lines changed

2 files changed

+113
-72
lines changed

src/PrismaQueue.ts

Lines changed: 94 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ import assert from "node:assert";
66
import { PrismaJob } from "./PrismaJob";
77
import type { DatabaseJob, JobCreator, JobPayload, JobResult, JobWorker } from "./types";
88
import {
9+
AbortError,
910
calculateDelay,
1011
debug,
1112
escape,
12-
getCurrentTimeZone,
1313
getTableName,
14-
isValidTimeZone,
1514
serializeError,
1615
uncapitalize,
1716
waitFor,
@@ -27,6 +26,10 @@ export type PrismaQueueOptions = {
2726
modelName?: string;
2827
tableName?: string;
2928
deleteOn?: "success" | "failure" | "always" | "never";
29+
/**
30+
* @deprecated This option is deprecated and will be removed in a future version.
31+
* The queue now uses JavaScript Date objects instead of SQL NOW() to avoid timezone issues.
32+
*/
3033
alignTimeZone?: boolean;
3134
};
3235

@@ -76,6 +79,7 @@ export class PrismaQueue<
7679

7780
private concurrency = 0;
7881
private stopped = true;
82+
private abortController = new AbortController();
7983

8084
/**
8185
* Constructs a PrismaQueue object with specified options and a worker function.
@@ -127,6 +131,14 @@ export class PrismaQueue<
127131
error,
128132
);
129133
});
134+
135+
// Warn about deprecated alignTimeZone option
136+
if (alignTimeZone) {
137+
console.warn(
138+
"[prisma-queue] The alignTimeZone option is deprecated and will be removed in a future version. " +
139+
"The queue now uses JavaScript Date objects instead of SQL NOW() to avoid timezone issues.",
140+
);
141+
}
130142
}
131143

132144
/**
@@ -147,6 +159,8 @@ export class PrismaQueue<
147159
return;
148160
}
149161
this.stopped = false;
162+
// Reset abort controller for new start
163+
this.abortController = new AbortController();
150164
return this.poll();
151165
}
152166

@@ -157,13 +171,10 @@ export class PrismaQueue<
157171
* @param options.timeout - Maximum time in milliseconds to wait for in-flight jobs (default: 30000)
158172
*/
159173
public async stop(options: { timeout?: number } = {}): Promise<void> {
160-
const { pollInterval } = this.config;
161174
const { timeout = 30000 } = options;
162175
debug(`stopping queue named="${this.name}"...`);
163176
this.stopped = true;
164-
165-
// Wait for the polling loop to notice the stop flag
166-
await waitFor(pollInterval);
177+
this.abortController.abort();
167178

168179
// Wait for all in-flight jobs to complete
169180
const checkInterval = 100; // Check every 100ms
@@ -202,34 +213,48 @@ export class PrismaQueue<
202213
debug(`enqueue`, this.name, payloadOrFunction, options);
203214
const { name: queueName, config } = this;
204215
const { key = null, cron = null, maxAttempts = config.maxAttempts, priority = 0, runAt } = options;
216+
const queueJobKey = uncapitalize(this.config.modelName) as "queueJob";
217+
const now = new Date();
205218
const record = await this.#prisma.$transaction(async (client) => {
219+
const model = client[queueJobKey];
206220
const payload =
207221
payloadOrFunction instanceof Function ? await payloadOrFunction(client) : payloadOrFunction;
208-
const data = { queue: queueName, cron, payload, maxAttempts, priority, key };
222+
const data = {
223+
queue: queueName,
224+
cron,
225+
payload,
226+
maxAttempts,
227+
priority,
228+
key,
229+
createdAt: now,
230+
runAt: runAt ?? now,
231+
};
209232
if (key && runAt) {
210-
const { count } = await this.model.deleteMany({
233+
const { count } = await model.deleteMany({
211234
where: {
212235
queue: queueName,
213236
key,
214237
runAt: {
215-
gte: new Date(),
238+
gte: now,
216239
not: runAt,
217240
},
218241
},
219242
});
220243
if (count > 0) {
221244
debug(`deleted ${count} conflicting upcoming queue jobs`);
222245
}
223-
const update = { ...data, ...(runAt ? { runAt } : {}) };
224-
return await this.model.upsert({
246+
return await model.upsert({
225247
where: { key_runAt: { key, runAt } },
226-
create: { ...update },
227-
update,
248+
create: data,
249+
update: data,
228250
});
229251
}
230-
return await this.model.create({ data });
252+
return await model.create({ data });
253+
});
254+
const job = new PrismaJob(record as DatabaseJob<T, U>, {
255+
model: this.model,
256+
client: this.#prisma,
231257
});
232-
const job = new PrismaJob(record as DatabaseJob<T, U>, { model: this.model, client: this.#prisma });
233258
this.emit("enqueue", job);
234259
return job;
235260
}
@@ -259,45 +284,53 @@ export class PrismaQueue<
259284
`polling queue named="${this.name}" with pollInterval=${pollInterval} maxConcurrency=${maxConcurrency}...`,
260285
);
261286

262-
while (!this.stopped) {
263-
// Wait for the queue to be ready
264-
if (this.concurrency >= maxConcurrency) {
265-
await waitFor(pollInterval);
266-
continue;
267-
}
268-
// Query the queue size only when needed to reduce database load.
269-
const queueSize = await this.size(true);
270-
if (queueSize === 0) {
271-
await waitFor(pollInterval);
272-
continue;
273-
}
287+
try {
288+
while (!this.stopped) {
289+
// Wait for the queue to be ready
290+
if (this.concurrency >= maxConcurrency) {
291+
await waitFor(pollInterval, this.abortController.signal);
292+
continue;
293+
}
294+
// Query the queue size only when needed to reduce database load.
295+
const queueSize = await this.size(true);
296+
if (queueSize === 0) {
297+
await waitFor(pollInterval, this.abortController.signal);
298+
continue;
299+
}
274300

275-
// Process available jobs up to concurrency limit
276-
const slotsAvailable = maxConcurrency - this.concurrency;
277-
const jobsToProcess = Math.min(queueSize, slotsAvailable);
301+
// Process available jobs up to concurrency limit
302+
const slotsAvailable = maxConcurrency - this.concurrency;
303+
const jobsToProcess = Math.min(queueSize, slotsAvailable);
278304

279-
for (let i = 0; i < jobsToProcess && !this.stopped; i++) {
280-
debug(`processing job from queue named="${this.name}"...`);
281-
this.concurrency++;
282-
setImmediate(() => {
283-
this.dequeue()
284-
.then((job) => {
285-
if (job) {
286-
debug(`dequeued job({id: ${job.id}, payload: ${JSON.stringify(job.payload)}})`);
287-
}
288-
})
289-
.catch((error: unknown) => {
290-
this.emit("error", error);
291-
})
292-
.finally(() => {
293-
this.concurrency--;
294-
});
295-
});
296-
await waitFor(jobInterval);
297-
}
305+
for (let i = 0; i < jobsToProcess && !this.stopped; i++) {
306+
debug(`processing job from queue named="${this.name}"...`);
307+
this.concurrency++;
308+
setImmediate(() => {
309+
this.dequeue()
310+
.then((job) => {
311+
if (job) {
312+
debug(`dequeued job({id: ${job.id}, payload: ${JSON.stringify(job.payload)}})`);
313+
}
314+
})
315+
.catch((error: unknown) => {
316+
this.emit("error", error);
317+
})
318+
.finally(() => {
319+
this.concurrency--;
320+
});
321+
});
322+
await waitFor(jobInterval, this.abortController.signal);
323+
}
298324

299-
// Wait before checking queue again
300-
await waitFor(jobInterval * 2);
325+
// Wait before checking queue again
326+
await waitFor(jobInterval * 2, this.abortController.signal);
327+
}
328+
} catch (error) {
329+
if (error instanceof AbortError) {
330+
debug(`polling for queue named="${this.name}" was aborted`);
331+
} else {
332+
throw error;
333+
}
301334
}
302335
}
303336

@@ -311,47 +344,39 @@ export class PrismaQueue<
311344
}
312345
debug(`dequeuing from queue named="${this.name}"...`);
313346
const { name: queueName } = this;
314-
const { tableName: tableNameRaw, deleteOn, alignTimeZone } = this.config;
347+
const { tableName: tableNameRaw, deleteOn } = this.config;
315348
const tableName = escape(tableNameRaw);
316349
const queueJobKey = uncapitalize(this.config.modelName) as "queueJob";
350+
const now = new Date();
317351
const job = await this.#prisma.$transaction(
318352
async (client) => {
319-
if (alignTimeZone) {
320-
const [{ TimeZone: dbTimeZone }] =
321-
await client.$queryRawUnsafe<[{ TimeZone: string }]>("SHOW TIME ZONE");
322-
const localTimeZone = getCurrentTimeZone();
323-
if (dbTimeZone !== localTimeZone) {
324-
// Validate timezone to prevent SQL injection
325-
if (!isValidTimeZone(localTimeZone)) {
326-
throw new Error(`Invalid timezone: ${localTimeZone}`);
327-
}
328-
debug(`aligning database timezone from ${dbTimeZone} to ${localTimeZone}!`);
329-
await client.$executeRawUnsafe(`SET LOCAL TIME ZONE '${localTimeZone}';`);
330-
}
331-
}
332353
const rows = await client.$queryRawUnsafe<DatabaseJob<T, U>[]>(
333-
`UPDATE ${tableName} SET "processedAt" = NOW(), "attempts" = "attempts" + 1
354+
`UPDATE ${tableName} SET "processedAt" = $2, "attempts" = "attempts" + 1
334355
WHERE id = (
335356
SELECT id
336357
FROM ${tableName}
337358
WHERE (${tableName}."queue" = $1)
338359
AND (${tableName}."finishedAt" IS NULL)
339-
AND (${tableName}."runAt" < NOW())
340-
AND (${tableName}."notBefore" IS NULL OR ${tableName}."notBefore" < NOW())
360+
AND (${tableName}."runAt" <= $2)
361+
AND (${tableName}."notBefore" IS NULL OR ${tableName}."notBefore" <= $2)
341362
ORDER BY ${tableName}."priority" ASC, ${tableName}."runAt" ASC
342363
FOR UPDATE SKIP LOCKED
343364
LIMIT 1
344365
)
345366
RETURNING *;`,
346367
queueName,
368+
now,
347369
);
348370
if (!rows.length || !rows[0]) {
349371
debug(`no jobs found in queue named="${this.name}"`);
350372
// @NOTE Failed to acquire a lock
351373
return null;
352374
}
353375
const { id, payload, attempts, maxAttempts } = rows[0];
354-
const job = new PrismaJob<T, U>(rows[0], { model: client[queueJobKey], client });
376+
const job = new PrismaJob<T, U>(rows[0], {
377+
model: client[queueJobKey],
378+
client,
379+
});
355380
let result;
356381
try {
357382
debug(`starting worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`);

src/utils/time.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,22 @@
1-
export const waitFor = async (ms: number) =>
2-
new Promise((resolve) => {
3-
setTimeout(resolve, ms);
1+
export class AbortError extends Error {
2+
constructor(message: string) {
3+
super(message);
4+
this.name = "AbortError";
5+
}
6+
}
7+
8+
export const waitFor = async (ms: number, signal?: AbortSignal): Promise<void> =>
9+
new Promise((resolve, reject) => {
10+
const timeout = setTimeout(() => {
11+
resolve();
12+
}, ms);
13+
14+
if (signal) {
15+
signal.addEventListener("abort", () => {
16+
clearTimeout(timeout);
17+
reject(new AbortError("Aborted"));
18+
});
19+
}
420
});
521

622
export const calculateDelay = (attempts: number): number =>

0 commit comments

Comments
 (0)