Skip to content

Commit 35fe09d

Browse files
authored
feat: add exclusive concurrency controls for workflows and tasks (#15177)
Adds exclusive concurrency controls to prevent race conditions when multiple jobs operate on the same resource. Jobs with the same concurrency key will not run in parallel. ## Key Changes - Added `concurrency` option to workflows and tasks for defining resource keys - Added `enableConcurrencyControl` feature flag (defaults to `false`, will be `true` in 4.0) - Added indexed `concurrencyKey` field to jobs collection when `enableConcurrencyControl` is set to `true` ## Usage Example ```typescript export default buildConfig({ jobs: { enableConcurrencyControl: true, workflows: [{ slug: 'syncDocument', concurrency: ({ input }) => `sync:${input.documentId}`, handler: async ({ job }) => { // Only one job per documentId runs at a time } }] } }) ```
1 parent dd494be commit 35fe09d

File tree

18 files changed

+1101
-16
lines changed

18 files changed

+1101
-16
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ test/media
4646
*payloadtests.db-journal
4747
*payloadtests.db-shm
4848
*payloadtests.db-wal
49+
*payload.db
4950
/versions
5051
no-restrict-file-*
5152

docs/jobs-queue/tasks.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Simply add a task to the `jobs.tasks` array in your Payload config. A task consi
3232
| `onFail` | Function to be executed if the task fails. |
3333
| `onSuccess` | Function to be executed if the task succeeds. |
3434
| `retries` | Specify the number of times that this step should be retried if it fails. If this is undefined, the task will either inherit the retries from the workflow or have no retries. If this is 0, the task will not be retried. By default, this is undefined. |
35+
| `concurrency` | Control how jobs with the same concurrency key are handled. Jobs with the same key will run exclusively (one at a time). Requires `jobs.enableConcurrencyControl: true` to be set. See [Concurrency Controls](/docs/jobs-queue/workflows#concurrency-controls) for details. |
3536

3637
The logic for the Task is defined in the `handler` - which can be defined as a function, or a path to a function. The `handler` will run once a worker picks up a Job that includes this task.
3738

docs/jobs-queue/workflows.mdx

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ To define a JS-based workflow, simply add a workflow to the `jobs.workflows` arr
5656
| `label` | Define a human-friendly label for this workflow. |
5757
| `queue` | Optionally, define the queue name that this workflow should be tied to. Defaults to "default". |
5858
| `retries` | You can define `retries` on the workflow level, which will enforce that the workflow can only fail up to that number of retries. If a task does not have retries specified, it will inherit the retry count as specified on the workflow. You can specify `0` as `workflow` retries, which will disregard all `task` retry specifications and fail the entire workflow on any task failure. You can leave `workflow` retries as undefined, in which case, the workflow will respect what each task dictates as their own retry count. By default this is undefined, meaning workflows retries are defined by their tasks |
59+
| `concurrency` | Control how jobs with the same concurrency key are handled. Jobs with the same key will run exclusively (one at a time). Requires `jobs.enableConcurrencyControl: true` to be set. See [Concurrency Controls](#concurrency-controls) below for details. |
5960

6061
Example:
6162

@@ -341,3 +342,148 @@ handler: async ({ input, req }) => {
341342
}
342343
}
343344
```
345+
346+
### Concurrency Controls
347+
348+
When multiple jobs operate on the same resource, race conditions can occur. For example, if a user creates a document and then quickly updates it, two jobs might be queued that both try to process the same document simultaneously, leading to unexpected results.
349+
350+
The `concurrency` option allows you to prevent this by ensuring that jobs with the same "key" run exclusively (one at a time).
351+
352+
<Banner type="warning">
353+
**Important:** To use concurrency controls, you must first enable them in your
354+
Payload config by setting `jobs.enableConcurrencyControl: true`. This adds an
355+
indexed `concurrencyKey` field to your jobs collection schema and may require
356+
a database migration depending on your database adapter.
357+
</Banner>
358+
359+
#### Enabling Concurrency Controls
360+
361+
First, enable the feature in your Payload config:
362+
363+
```ts
364+
export default buildConfig({
365+
jobs: {
366+
enableConcurrencyControl: true,
367+
// ... your tasks and workflows
368+
},
369+
})
370+
```
371+
372+
Then add the `concurrency` option to your workflow configuration:
373+
374+
```ts
375+
export default buildConfig({
376+
jobs: {
377+
workflows: [
378+
{
379+
slug: 'syncDocument',
380+
inputSchema: [{ name: 'documentId', type: 'text', required: true }],
381+
// Jobs with the same concurrency key run one at a time
382+
concurrency: ({ input }) => `sync:${input.documentId}`,
383+
handler: async ({ job, inlineTask }) => {
384+
await inlineTask('fetch-and-update', {
385+
task: async ({ req }) => {
386+
// This runs exclusively - no other job for the same
387+
// documentId can run at the same time
388+
const doc = await req.payload.findByID({
389+
collection: 'posts',
390+
id: job.input.documentId,
391+
})
392+
393+
await req.payload.update({
394+
collection: 'posts',
395+
id: job.input.documentId,
396+
data: { syncedAt: new Date().toISOString() },
397+
})
398+
399+
return { output: { synced: true } }
400+
},
401+
})
402+
},
403+
},
404+
],
405+
},
406+
})
407+
```
408+
409+
#### How It Works
410+
411+
When you define a `concurrency` key:
412+
413+
1. **When queuing:** The concurrency key is computed from the job's input and stored on the job document.
414+
415+
2. **When running:** The job runner enforces exclusive execution through two mechanisms:
416+
417+
- It first checks which concurrency keys are currently being processed and excludes pending jobs with those keys from the query
418+
- If multiple pending jobs with the same key are picked up in the same batch, only the first one (by creation order) runs - the others are released back to `processing: false` and will be picked up on subsequent runs
419+
420+
3. **Result:** Jobs with the same concurrency key are guaranteed to run sequentially, never in parallel. All jobs are preserved and will eventually complete - they just wait their turn.
421+
422+
#### Concurrency Configuration Options
423+
424+
The `concurrency` option accepts either a function (shorthand) or an object with more options:
425+
426+
**Shorthand (function only):**
427+
428+
```ts
429+
// Exclusive concurrency is enabled by default
430+
concurrency: ({ input }) => `my-key:${input.resourceId}`
431+
```
432+
433+
**Full configuration:**
434+
435+
```ts
436+
concurrency: {
437+
// Function that returns a key to group related jobs
438+
// The queue name is provided to allow for queue-specific keys if needed
439+
key: ({ input, queue }) => `my-key:${input.resourceId}`,
440+
441+
// Only one job with this key can run at a time
442+
// @default true
443+
exclusive: true,
444+
}
445+
```
446+
447+
#### Use Cases
448+
449+
**1. Document processing (prevent race conditions):**
450+
451+
```ts
452+
concurrency: ({ input }) => `process:${input.documentId}`
453+
```
454+
455+
Multiple updates to the same document will be processed one at a time, in the order they were queued.
456+
457+
**2. Per-user operations:**
458+
459+
```ts
460+
concurrency: ({ input }) => `user:${input.userId}`
461+
```
462+
463+
Ensures only one job per user runs at a time, preventing conflicts when a user triggers multiple actions quickly.
464+
465+
**3. External API calls (respect rate limits):**
466+
467+
```ts
468+
concurrency: ({ input }) => `api:${input.resourceId}`
469+
```
470+
471+
Prevents parallel API calls for the same resource, useful when external services don't handle concurrent updates well.
472+
473+
**4. Queue-specific concurrency:**
474+
475+
```ts
476+
concurrency: ({ input, queue }) => `${queue}:process:${input.documentId}`
477+
```
478+
479+
Include the queue name in the key to allow the same resource to be processed concurrently in different queues (e.g., `emails` queue vs `default` queue).
480+
481+
#### Important Considerations
482+
483+
- **Key uniqueness:** The concurrency key should uniquely identify the resource being operated on. Include all relevant identifiers (collection slug, document ID, locale, etc.).
484+
485+
- **Global by default:** By default, concurrency is global across all queues. A job with key `sync:doc1` in the `default` queue will block a job with the same key in the `emails` queue. Include the queue name in your key if you want queue-specific concurrency.
486+
487+
- **No concurrency key = no restrictions:** Jobs without a concurrency configuration run in parallel as before.
488+
489+
- **Pending jobs wait:** Jobs that can't run due to concurrency constraints remain in the queue with `processing: false` and will be picked up on subsequent runs.

packages/payload/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1712,6 +1712,7 @@ export type {
17121712
} from './queues/config/types/taskTypes.js'
17131713
export type {
17141714
BaseJob,
1715+
ConcurrencyConfig,
17151716
JobLog,
17161717
JobTaskStatus,
17171718
RunningJob,

packages/payload/src/queues/config/collection.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ export const getDefaultJobsCollection: (jobsConfig: SanitizedConfig['jobs']) =>
1818
if (jobsConfig.workflows?.length) {
1919
jobsConfig.workflows.forEach((workflow) => {
2020
workflowSlugs.add(workflow.slug)
21+
22+
// Validate concurrency config requires enableConcurrencyControl flag
23+
if (workflow.concurrency && !jobsConfig.enableConcurrencyControl) {
24+
throw new Error(
25+
`Workflow "${workflow.slug}" uses concurrency controls but "jobs.enableConcurrencyControl" is not enabled. ` +
26+
`Set "jobs.enableConcurrencyControl: true" in your Payload config to use concurrency controls. ` +
27+
`Note: This adds a new indexed field to the jobs collection schema and may require a database migration.`,
28+
)
29+
}
2130
})
2231
}
2332

@@ -28,6 +37,16 @@ export const getDefaultJobsCollection: (jobsConfig: SanitizedConfig['jobs']) =>
2837
`Task slug "${task.slug}" is already used by a workflow. No tasks are allowed to have the same slug as a workflow.`,
2938
)
3039
}
40+
41+
// Validate concurrency config requires enableConcurrencyControl flag
42+
if (task.concurrency && !jobsConfig.enableConcurrencyControl) {
43+
throw new Error(
44+
`Task "${task.slug}" uses concurrency controls but "jobs.enableConcurrencyControl" is not enabled. ` +
45+
`Set "jobs.enableConcurrencyControl: true" in your Payload config to use concurrency controls. ` +
46+
`Note: This adds a new indexed field to the jobs collection schema and may require a database migration.`,
47+
)
48+
}
49+
3150
taskSlugs.add(task.slug)
3251
})
3352
}
@@ -215,6 +234,22 @@ export const getDefaultJobsCollection: (jobsConfig: SanitizedConfig['jobs']) =>
215234
defaultValue: false,
216235
index: true,
217236
},
237+
// Only add concurrencyKey field if concurrency control is enabled
238+
...(jobsConfig.enableConcurrencyControl
239+
? [
240+
{
241+
name: 'concurrencyKey',
242+
type: 'text',
243+
admin: {
244+
description:
245+
'Used for concurrency control. Jobs with the same key are subject to exclusive/supersedes rules.',
246+
position: 'sidebar',
247+
readOnly: true,
248+
},
249+
index: true,
250+
} as Field,
251+
]
252+
: []),
218253
],
219254
hooks: {
220255
afterRead: [

packages/payload/src/queues/config/types/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,18 @@ export type JobsConfig = {
146146
* @deprecated - this will be removed in 4.0
147147
*/
148148
depth?: number
149+
/**
150+
* Enable concurrency controls for workflows and tasks.
151+
* When enabled, adds a `concurrencyKey` field to the jobs collection schema.
152+
* This allows workflows and tasks to use the `concurrency` option to prevent race conditions.
153+
*
154+
* **Important:** Enabling this may require a database migration depending on your database adapter,
155+
* as it adds a new indexed field to the jobs collection schema.
156+
*
157+
* @default false
158+
* @todo In 4.0, this will default to `true`.
159+
*/
160+
enableConcurrencyControl?: boolean
149161
/**
150162
* Override any settings on the default Jobs collection. Accepts the default collection and allows you to return
151163
* a new collection.

packages/payload/src/queues/config/types/taskTypes.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type {
77
TypedJobs,
88
} from '../../../index.js'
99
import type { ScheduleConfig } from './index.js'
10-
import type { SingleTaskStatus } from './workflowTypes.js'
10+
import type { ConcurrencyConfig, SingleTaskStatus } from './workflowTypes.js'
1111

1212
export type TaskInputOutput = {
1313
input: object
@@ -218,6 +218,19 @@ export type RetryConfig = {
218218
export type TaskConfig<
219219
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput = TaskType,
220220
> = {
221+
/**
222+
* Job concurrency controls for preventing race conditions.
223+
*
224+
* Can be an object with full options, or a shorthand function that just returns the key
225+
* (in which case exclusive defaults to true).
226+
*/
227+
concurrency?: ConcurrencyConfig<
228+
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
229+
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['input']
230+
: TTaskSlugOrInputOutput extends TaskInputOutput
231+
? TTaskSlugOrInputOutput['input']
232+
: object
233+
>
221234
/**
222235
* The function that should be responsible for running the job.
223236
* You can either pass a string-based path to the job function file, or the job function itself.

packages/payload/src/queues/config/types/workflowTypes.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ export type BaseJob<
4545
TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false,
4646
> = {
4747
completedAt?: null | string
48+
/**
49+
* Used for concurrency control. Jobs with the same key are subject to exclusive/supersedes rules.
50+
*/
51+
concurrencyKey?: null | string
4852
createdAt: string
4953
error?: unknown
5054
hasError?: boolean
@@ -126,9 +130,44 @@ export type JobTaskStatus = {
126130
}
127131
}
128132

133+
/**
134+
* Concurrency configuration for workflows and tasks.
135+
* Controls how jobs with the same concurrency key are handled.
136+
*/
137+
export type ConcurrencyConfig<TInput = object> =
138+
| ((args: { input: TInput; queue: string }) => string)
139+
// Shorthand: key function only, exclusive defaults to true
140+
| {
141+
/**
142+
* Only one job with this key can run at a time.
143+
* Other jobs with the same key remain queued until the running job completes.
144+
* @default true
145+
*/
146+
exclusive?: boolean
147+
/**
148+
* Function that returns a key to group related jobs.
149+
* Jobs with the same key are subject to concurrency rules.
150+
* The queue name is provided to allow for queue-specific concurrency keys if needed.
151+
*/
152+
key: (args: { input: TInput; queue: string }) => string
153+
}
154+
129155
export type WorkflowConfig<
130156
TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false,
131157
> = {
158+
/**
159+
* Job concurrency controls for preventing race conditions.
160+
*
161+
* Can be an object with full options, or a shorthand function that just returns the key
162+
* (in which case exclusive defaults to true).
163+
*/
164+
concurrency?: ConcurrencyConfig<
165+
TWorkflowSlugOrInput extends false
166+
? object
167+
: TWorkflowSlugOrInput extends keyof TypedJobs['workflows']
168+
? TypedJobs['workflows'][TWorkflowSlugOrInput]['input']
169+
: TWorkflowSlugOrInput
170+
>
132171
/**
133172
* You can either pass a string-based path to the workflow function file, or the workflow function itself.
134173
*

0 commit comments

Comments
 (0)