Skip to content

Commit 02be96c

Browse files
authored
feat(core): throttle PostgreSQL event listener notifications (Vibe Kanban) (#40)
* Done. Now with `throttleTime(100, asyncScheduler, { leading: true, trailing: true })`: | Events arrive at | Poll triggered at | |------------------|-------------------| | 0ms | 0ms (leading) | | 10ms | - | | 20ms | - | | 50ms | - | | 80ms | 100ms (trailing) | If more events arrive after the 100ms window closes, the cycle repeats - first one fires immediately, then trailing fires after another 100ms if there were more. The 100ms value is a reasonable default. Want me to make it configurable via `OutboxModuleOptions`? * Done. Now configurable via `eventListenerThrottleMs`: ```typescript OutboxModule.forRoot({ events: [...], pollingInterval: 5000, maxEventsPerPoll: 100, driverFactory: mikroOrmDriverFactory, eventListenerThrottleMs: 50, // optional, defaults to 100ms }) ```
1 parent b12a5a8 commit 02be96c

2 files changed

Lines changed: 12 additions & 3 deletions

File tree

packages/core/src/outbox.module-definition.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ export interface OutboxModuleOptions {
2626
pollingInterval: number;
2727
maxEventsPerPoll: number;
2828
driverFactory: DatabaseDriverFactory;
29+
/**
30+
* Throttle interval for event listener notifications (ms).
31+
* First event triggers immediately, subsequent events within window are batched.
32+
* @default 100
33+
*/
34+
eventListenerThrottleMs?: number;
2935
}
3036

3137
export const { ConfigurableModuleClass, MODULE_OPTIONS_TOKEN, ASYNC_OPTIONS_TYPE } = new ConfigurableModuleBuilder<OutboxModuleOptions>()

packages/core/src/poller/retryable-outbox-event.poller.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Inject, Injectable, Logger, OnModuleDestroy, OnModuleInit, Optional } from '@nestjs/common';
2-
import { EMPTY, Subscription, catchError, concatMap, from, interval, merge, repeat } from 'rxjs';
2+
import { EMPTY, Subscription, asyncScheduler, catchError, concatMap, from, interval, merge, repeat, throttleTime } from 'rxjs';
33
import { DATABASE_DRIVER_FACTORY_TOKEN, DatabaseDriverFactory } from '../driver/database-driver.factory';
44
import { TransactionalEventEmitter } from '../emitter/transactional-event-emitter';
55
import { OutboxModuleOptions, MODULE_OPTIONS_TOKEN } from '../outbox.module-definition';
@@ -39,9 +39,12 @@ export class RetryableOutboxEventPoller implements OnModuleInit, OnModuleDestroy
3939
}
4040

4141
const pollingSource$ = interval(this.options.pollingInterval);
42-
const eventSource$ = this.eventListener?.events$ ?? EMPTY;
42+
const throttleMs = this.options.eventListenerThrottleMs ?? 100;
43+
const throttledEventSource$ = (this.eventListener?.events$ ?? EMPTY).pipe(
44+
throttleTime(throttleMs, asyncScheduler, { leading: true, trailing: true }),
45+
);
4346

44-
this.subscription = merge(pollingSource$, eventSource$)
47+
this.subscription = merge(pollingSource$, throttledEventSource$)
4548
.pipe(
4649
concatMap(() => {
4750
if (this.isShuttingDown) {

0 commit comments

Comments
 (0)