Skip to content

Commit e15231e

Browse files
jtomaszewskiclaude
andauthored
feat: Add exponential backoff retry with DLQ support (Vibe Kanban) (#34)
* All tasks are complete. Here's a summary of the changes made: ## Summary of Changes ### New Features 1. **Exponential Backoff Retry Strategy** - Added `RetryStrategy` type: `(retryCount: number) => number` - Default strategy: exponential backoff with 1s base, 60s max - Customizable via `retryStrategy` option per event 2. **Retry Counter & DLQ** - Added `retryCount` field to track retry attempts - Added `status` field (`'pending' | 'dlq'`) - Events move to DLQ after `maxRetries` (default: 5) failures ### Naming Improvements - `expiresAtTTL` → `retentionPeriod` - `retryEveryMilliseconds` → `pollingInterval` - `maxOutboxTransportEventPerRetry` → `maxEventsPerPoll` - `maxExecutionTimeTTL` → `maxExecutionTime` - `readyToRetryAfterTTL` → replaced by `retryStrategy` ### Files Modified - **Core**: `outbox-transport-event.interface.ts`, `outbox.module-definition.ts`, `retryable-outbox-event.poller.ts`, `transactional-event-emitter.ts` - **MikroORM**: Model, driver, and new migration `MigrationOutbox1734900000.ts` - **TypeORM**: Model, driver, and new migration `MigrationOutbox1734900000.ts` - **Tests**: All unit and integration tests updated - **Documentation**: All README files updated ### Build & Tests - Build passes for all packages - 58 unit tests pass * Good question. Let me think through the options: | Status | Pros | Cons | |--------|------|------| | `failed` | Clear meaning, standard term | Might imply permanent failure (could be retried manually?) | | `dlq` | Industry standard term (Dead Letter Queue) | Abbreviation, less readable | | `dead` | Short, clear | Might sound too dramatic | | `exhausted` | Describes what happened (retries exhausted) | Longer, less common | | `abandoned` | Clear meaning | Implies we gave up, not that it failed | My recommendation: **`dead`** or keep **`failed`** - `dead` is concise and matches "Dead Letter Queue" concept - `failed` is clearer for developers who don't know DLQ terminology `cancelled` doesn't fit well because the event wasn't cancelled - it failed after max retries. What's your preference? I can quickly rename if you'd like to use `dead` instead. * fix(test): convert bigint to number in attemptAt assertion 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * feat: increase default maxRetries from 5 to 10 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 74ef0ae commit e15231e

36 files changed

Lines changed: 500 additions & 320 deletions

packages/core/README.md

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,16 @@ import {
143143
{
144144
name: OrderCreatedEvent.name,
145145
listeners: {
146-
expiresAtTTL: 1000 * 60 * 60 * 24, // 24 hours
147-
maxExecutionTimeTTL: 1000 * 15, // 15 seconds
148-
readyToRetryAfterTTL: 10000, // 10 seconds
146+
retentionPeriod: 1000 * 60 * 60 * 24, // 24 hours - how long events are retained
147+
maxExecutionTime: 1000 * 15, // 15 seconds - max listener execution time
148+
maxRetries: 5, // max retry attempts before DLQ
149+
retryStrategy: (retryCount) => // optional: custom exponential backoff
150+
Math.min(1000 * Math.pow(2, retryCount), 60_000),
149151
},
150152
},
151153
],
152-
retryEveryMilliseconds: 30_000,
153-
maxOutboxTransportEventPerRetry: 10,
154+
pollingInterval: 30_000, // 30 seconds - polling frequency
155+
maxEventsPerPoll: 10, // batch size per poll
154156
}),
155157
inject: [DataSource],
156158
}),
@@ -166,18 +168,19 @@ export class AppModule {}
166168
| Option | Description |
167169
|--------|-------------|
168170
| `name` | Event class name |
169-
| `listeners.expiresAtTTL` | How long events are retained and retried (ms) |
170-
| `listeners.maxExecutionTimeTTL` | Max listener execution time before retry (ms) |
171-
| `listeners.readyToRetryAfterTTL` | Delay before retrying failed events (ms) |
171+
| `listeners.retentionPeriod` | How long events are retained before expiring (ms) |
172+
| `listeners.maxExecutionTime` | Max listener execution time before timeout (ms) |
173+
| `listeners.maxRetries` | Max retry attempts before moving to DLQ (default: 10) |
174+
| `listeners.retryStrategy` | Optional function `(retryCount) => delayMs` for custom backoff. Default: exponential backoff with 1s base, 60s max |
172175

173176
### Module Options
174177

175178
| Option | Description |
176179
|--------|-------------|
177180
| `driverFactory` | Database driver factory instance |
178181
| `events` | Array of event configurations |
179-
| `retryEveryMilliseconds` | Polling interval for retry mechanism |
180-
| `maxOutboxTransportEventPerRetry` | Batch size per polling cycle |
182+
| `pollingInterval` | How often to poll for pending events (ms) |
183+
| `maxEventsPerPoll` | Batch size per polling cycle |
181184
| `isGlobal` | Register module globally (optional) |
182185
| `enableDefaultMiddlewares` | Enable default middlewares like LoggerMiddleware (default: `true`) |
183186
| `middlewares` | Array of custom middleware classes |
@@ -214,8 +217,8 @@ import { MikroORMDatabaseDriverFactory } from '@fullstackhouse/nestjs-outbox-mik
214217
useFactory: (orm: MikroORM) => ({
215218
driverFactory: new MikroORMDatabaseDriverFactory(orm),
216219
events: [/* ... */],
217-
retryEveryMilliseconds: 30_000,
218-
maxOutboxTransportEventPerRetry: 10,
220+
pollingInterval: 30_000,
221+
maxEventsPerPoll: 10,
219222
}),
220223
inject: [MikroORM],
221224
}),
@@ -333,8 +336,8 @@ import { MikroORMDatabaseDriverFactory } from '@fullstackhouse/nestjs-outbox-mik
333336
useFactory: (orm: MikroORM) => ({
334337
driverFactory: new MikroORMDatabaseDriverFactory(orm),
335338
events: [/* ... */],
336-
retryEveryMilliseconds: 30_000,
337-
maxOutboxTransportEventPerRetry: 10,
339+
pollingInterval: 30_000,
340+
maxEventsPerPoll: 10,
338341
}),
339342
inject: [MikroORM],
340343
middlewares: [LoggingMiddleware, SentryMiddleware],

packages/core/src/driver/database.driver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { OutboxTransportEvent } from '../model/outbox-transport-event.interface'
22
import { DatabaseDriverPersister } from './database.driver-persister';
33

44
export interface DatabaseDriver extends DatabaseDriverPersister {
5-
createOutboxTransportEvent(eventName: string, eventPayload: any, expireAt: number, readyToRetryAfter: number | null): OutboxTransportEvent;
5+
createOutboxTransportEvent(eventName: string, eventPayload: any, expireAt: number, attemptAt: number | null): OutboxTransportEvent;
66
findAndExtendReadyToRetryEvents(limit: number): Promise<OutboxTransportEvent[]>;
77
findPendingEvents(limit: number): Promise<OutboxTransportEvent[]>;
88
}

packages/core/src/emitter/transactional-event-emitter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export class TransactionalEventEmitter {
5555
const outboxTransportEvent = databaseDriver.createOutboxTransportEvent(
5656
processedEvent.name,
5757
processedEvent,
58-
currentTimestamp + eventOptions.listeners.expiresAtTTL,
58+
currentTimestamp + eventOptions.listeners.retentionPeriod,
5959
currentTimestamp,
6060
);
6161
const persister = customDatabaseDriverPersister ?? databaseDriver;
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
export type OutboxEventStatus = 'pending' | 'failed';
2+
13
export interface OutboxTransportEvent {
24
id: number;
35
eventName: string;
46
eventPayload: any;
57
deliveredToListeners: string[];
6-
readyToRetryAfter: number | null;
8+
attemptAt: number | null;
9+
retryCount: number;
10+
status: OutboxEventStatus;
711
expireAt: number;
812
insertedAt: number;
913
}

packages/core/src/outbox.module-definition.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,29 @@ import { ConfigurableModuleBuilder, Type } from '@nestjs/common';
22
import { DatabaseDriverFactory } from './driver/database-driver.factory';
33
import { OutboxMiddleware } from './middleware/outbox-middleware.interface';
44

5+
export type RetryStrategy = (retryCount: number) => number;
6+
7+
export const defaultRetryStrategy: RetryStrategy = (retryCount: number) => {
8+
const baseDelayMs = 1000;
9+
const maxDelayMs = 60_000;
10+
const delay = Math.min(baseDelayMs * Math.pow(2, retryCount), maxDelayMs);
11+
return delay;
12+
};
13+
514
export interface OutboxModuleEventOptions {
615
name: string;
716
listeners: {
8-
expiresAtTTL: number;
9-
readyToRetryAfterTTL: number;
10-
maxExecutionTimeTTL: number;
17+
retentionPeriod: number;
18+
retryStrategy?: RetryStrategy;
19+
maxRetries?: number;
20+
maxExecutionTime: number;
1121
};
1222
}
1323

1424
export interface OutboxModuleOptions {
1525
events: OutboxModuleEventOptions[];
16-
retryEveryMilliseconds: number;
17-
maxOutboxTransportEventPerRetry: number;
26+
pollingInterval: number;
27+
maxEventsPerPoll: number;
1828
driverFactory: DatabaseDriverFactory;
1929
}
2030

packages/core/src/poller/retryable-outbox-event.poller.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export class RetryableOutboxEventPoller implements OnModuleInit, OnModuleDestroy
2525
) {}
2626

2727
async onModuleInit() {
28-
this.logger.log(`Poller options: retryEveryMilliseconds: ${this.options.retryEveryMilliseconds}, maxOutboxTransportEventPerRetry: ${this.options.maxOutboxTransportEventPerRetry}, events: ${JSON.stringify(this.options.events)}, driver: ${this.options.driverFactory.constructor.name}`);
28+
this.logger.log(`Poller options: pollingInterval: ${this.options.pollingInterval}, maxEventsPerPoll: ${this.options.maxEventsPerPoll}, events: ${JSON.stringify(this.options.events)}, driver: ${this.options.driverFactory.constructor.name}`);
2929

3030
if (this.eventListener) {
3131
try {
@@ -36,7 +36,7 @@ export class RetryableOutboxEventPoller implements OnModuleInit, OnModuleDestroy
3636
}
3737
}
3838

39-
const pollingSource$ = interval(this.options.retryEveryMilliseconds);
39+
const pollingSource$ = interval(this.options.pollingInterval);
4040
const eventSource$ = this.eventListener?.events$ ?? EMPTY;
4141

4242
this.subscription = merge(pollingSource$, eventSource$)
@@ -84,10 +84,10 @@ export class RetryableOutboxEventPoller implements OnModuleInit, OnModuleDestroy
8484

8585
async poolRetryableEvents() {
8686
try {
87-
const maxOutboxTransportEventPerRetry = this.options.maxOutboxTransportEventPerRetry;
87+
const maxEventsPerPoll = this.options.maxEventsPerPoll;
8888
const databaseDriver = this.databaseDriverFactory.create(this.eventConfigurationResolver);
8989

90-
const readyToRetryEvents = await databaseDriver.findAndExtendReadyToRetryEvents(maxOutboxTransportEventPerRetry);
90+
const readyToRetryEvents = await databaseDriver.findAndExtendReadyToRetryEvents(maxEventsPerPoll);
9191

9292
if (readyToRetryEvents.length === 0) {
9393
return;

packages/core/src/processor/outbox-event.processor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export class OutboxEventProcessor implements OutboxEventProcessorContract {
8686
listenerName: listener.getName(),
8787
hasFailed: true,
8888
});
89-
}, eventOptions.listeners.maxExecutionTimeTTL);
89+
}, eventOptions.listeners.maxExecutionTime);
9090

9191
await this.wrapExecution(context, async () => {
9292
await listener.handle(outboxTransportEvent.eventPayload, outboxTransportEvent.eventName);

packages/core/src/test/unit/event-validator.spec.ts

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,24 @@ import { OutboxModuleOptions } from '../../outbox.module-definition';
44
describe('EventValidator', () => {
55
it('should throw an exception when event names are not unique', () => {
66
const mockOptions: OutboxModuleOptions = {
7-
driverFactory: null,
8-
maxOutboxTransportEventPerRetry: 10,
9-
retryEveryMilliseconds: 1000,
7+
driverFactory: null as any,
8+
maxEventsPerPoll: 10,
9+
pollingInterval: 1000,
1010
events: [
1111
{
1212
name: 'event1',
1313
listeners: {
14-
expiresAtTTL: 1000,
15-
readyToRetryAfterTTL: 1000,
16-
maxExecutionTimeTTL: 1000,
14+
retentionPeriod: 1000,
15+
maxRetries: 5,
16+
maxExecutionTime: 1000,
1717
},
1818
},
1919
{
2020
name: 'event1',
2121
listeners: {
22-
expiresAtTTL: 1000,
23-
readyToRetryAfterTTL: 1000,
24-
maxExecutionTimeTTL: 1000,
22+
retentionPeriod: 1000,
23+
maxRetries: 5,
24+
maxExecutionTime: 1000,
2525
},
2626
},
2727
],
@@ -34,24 +34,24 @@ describe('EventValidator', () => {
3434

3535
it('should not throw an exception when event names are unique', () => {
3636
const mockOptions: OutboxModuleOptions = {
37-
driverFactory: null,
38-
maxOutboxTransportEventPerRetry: 10,
39-
retryEveryMilliseconds: 1000,
37+
driverFactory: null as any,
38+
maxEventsPerPoll: 10,
39+
pollingInterval: 1000,
4040
events: [
4141
{
4242
name: 'event1',
4343
listeners: {
44-
expiresAtTTL: 1000,
45-
readyToRetryAfterTTL: 1000,
46-
maxExecutionTimeTTL: 1000,
44+
retentionPeriod: 1000,
45+
maxRetries: 5,
46+
maxExecutionTime: 1000,
4747
},
4848
},
4949
{
5050
name: 'event2',
5151
listeners: {
52-
expiresAtTTL: 1000,
53-
readyToRetryAfterTTL: 1000,
54-
maxExecutionTimeTTL: 1000,
52+
retentionPeriod: 1000,
53+
maxRetries: 5,
54+
maxExecutionTime: 1000,
5555
},
5656
},
5757
],
Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
import { DatabaseDriverFactory } from "../../../driver/database-driver.factory";
2+
import { RetryStrategy } from "../../../outbox.module-definition";
23

34
export const createMockedOutboxOptionsFactory = (mockedDriverFactory: DatabaseDriverFactory, events: {
45
name: string,
56
listeners: {
6-
expiresAtTTL: number,
7-
readyToRetryAfterTTL: number,
8-
maxExecutionTimeTTL: number
7+
retentionPeriod: number,
8+
retryStrategy?: RetryStrategy,
9+
maxRetries?: number,
10+
maxExecutionTime: number
911
}
1012
}[]) => ({
1113
driverFactory: mockedDriverFactory,
12-
retryEveryMilliseconds: 1000,
13-
maxOutboxTransportEventPerRetry: 1000,
14+
pollingInterval: 1000,
15+
maxEventsPerPoll: 1000,
1416
events
1517
});

0 commit comments

Comments
 (0)