Skip to content

Commit 818829b

Browse files
committed
refactor(BaseRedis): group as constructor param and cleanup subscribers
1 parent 8bdfe11 commit 818829b

File tree

2 files changed

+22
-12
lines changed

2 files changed

+22
-12
lines changed

packages/brokers/src/brokers/Broker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ export interface IBaseBroker<TEvents extends Record<string, any>> {
4444
/**
4545
* Subscribes to the given events, grouping them by the given group name
4646
*/
47-
subscribe(group: string, events: (keyof TEvents)[]): Promise<void>;
47+
subscribe(events: (keyof TEvents)[]): Promise<void>;
4848
/**
4949
* Unsubscribes from the given events - it's required to pass the same group name as when subscribing for proper cleanup
5050
*/
51-
unsubscribe(group: string, events: (keyof TEvents)[]): Promise<void>;
51+
unsubscribe(events: (keyof TEvents)[]): Promise<void>;
5252
}
5353

5454
export interface IPubSubBroker<TEvents extends Record<string, any>>

packages/brokers/src/brokers/redis/BaseRedis.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,19 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
2323
* How long to block for messages when polling
2424
*/
2525
blockTimeout?: number;
26+
27+
/**
28+
* Consumer group name to use for this broker
29+
*
30+
* @see {@link https://redis.io/commands/xreadgroup/}
31+
*/
32+
group: string;
33+
2634
/**
2735
* Max number of messages to poll at once
2836
*/
2937
maxChunk?: number;
38+
3039
/**
3140
* Unique consumer name.
3241
*
@@ -43,7 +52,7 @@ export const DefaultRedisBrokerOptions = {
4352
name: randomBytes(20).toString('hex'),
4453
maxChunk: 10,
4554
blockTimeout: 5_000,
46-
} as const satisfies Required<RedisBrokerOptions>;
55+
} as const satisfies Required<Omit<RedisBrokerOptions, 'group'>>;
4756

4857
/**
4958
* Helper class with shared Redis logic
@@ -93,32 +102,32 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
93102
/**
94103
* {@inheritDoc IBaseBroker.subscribe}
95104
*/
96-
public async subscribe(group: string, events: (keyof TEvents)[]): Promise<void> {
105+
public async subscribe(events: (keyof TEvents)[]): Promise<void> {
97106
await Promise.all(
98107
// @ts-expect-error: Intended
99108
events.map(async (event) => {
100109
this.subscribedEvents.add(event as string);
101110
try {
102-
return await this.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM');
111+
return await this.redisClient.xgroup('CREATE', event as string, this.options.group, 0, 'MKSTREAM');
103112
} catch (error) {
104113
if (!(error instanceof ReplyError)) {
105114
throw error;
106115
}
107116
}
108117
}),
109118
);
110-
void this.listen(group);
119+
void this.listen();
111120
}
112121

113122
/**
114123
* {@inheritDoc IBaseBroker.unsubscribe}
115124
*/
116-
public async unsubscribe(group: string, events: (keyof TEvents)[]): Promise<void> {
125+
public async unsubscribe(events: (keyof TEvents)[]): Promise<void> {
117126
const commands: unknown[][] = Array.from({ length: events.length * 2 });
118127
for (let idx = 0; idx < commands.length; idx += 2) {
119128
const event = events[idx / 2];
120-
commands[idx] = ['xgroup', 'delconsumer', event as string, group, this.options.name];
121-
commands[idx + 1] = ['xcleangroup', event as string, group];
129+
commands[idx] = ['xgroup', 'delconsumer', event as string, this.options.group, this.options.name];
130+
commands[idx + 1] = ['xcleangroup', event as string, this.options.group];
122131
}
123132

124133
await this.redisClient.pipeline(commands).exec();
@@ -131,7 +140,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
131140
/**
132141
* Begins polling for events, firing them to {@link BaseRedisBroker.listen}
133142
*/
134-
protected async listen(group: string): Promise<void> {
143+
protected async listen(): Promise<void> {
135144
if (this.listening) {
136145
return;
137146
}
@@ -142,7 +151,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
142151
try {
143152
const data = await this.streamReadClient.xreadgroupBuffer(
144153
'GROUP',
145-
group,
154+
this.options.group,
146155
this.options.name,
147156
'COUNT',
148157
String(this.options.maxChunk),
@@ -169,7 +178,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
169178
continue;
170179
}
171180

172-
this.emitEvent(id, group, event.toString('utf8'), this.options.decode(data));
181+
this.emitEvent(id, this.options.group, event.toString('utf8'), this.options.decode(data));
173182
}
174183
}
175184
} catch (error) {
@@ -185,6 +194,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
185194
* Destroys the broker, closing all connections
186195
*/
187196
public async destroy() {
197+
await this.unsubscribe(Array.from(this.subscribedEvents));
188198
this.streamReadClient.disconnect();
189199
this.redisClient.disconnect();
190200
this.removeAllListeners();

0 commit comments

Comments
 (0)