Skip to content

Commit 9bf5f7b

Browse files
committed
import spubsub scenario tests
1 parent 06fc7b7 commit 9bf5f7b

File tree

9 files changed

+771
-55
lines changed

9 files changed

+771
-55
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,12 +338,8 @@ export default class RedisCommandsQueue {
338338
return this.#addPubSubCommand(command);
339339
}
340340

341-
getShardedChannels(): IterableIterator<string> {
342-
return this.#pubSub.getShardedChannels();
343-
}
344-
345-
removeShardedListeners(channel: string): ChannelListeners {
346-
return this.#pubSub.removeShardedListeners(channel);
341+
removeAllPubSubListeners() {
342+
return this.#pubSub.removeAllListeners();
347343
}
348344

349345
resubscribe(chainId?: symbol) {

packages/client/lib/client/index.ts

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,7 @@ export default class RedisClient<
820820
const resubscribePromise = this.#queue.resubscribe(chainId);
821821
resubscribePromise?.catch(error => {
822822
if (error.message && error.message.startsWith('MOVED')) {
823-
this.emit('__MOVED')
823+
this.emit('__MOVED', this._self.#queue.removeAllPubSubListeners());
824824
}
825825
});
826826
if (resubscribePromise) {
@@ -1197,14 +1197,6 @@ export default class RedisClient<
11971197

11981198
sUnsubscribe = this.SUNSUBSCRIBE;
11991199

1200-
getShardedChannels(): IterableIterator<string> {
1201-
return this._self.#queue.getShardedChannels();
1202-
}
1203-
1204-
removeShardedListeners(channel: string): ChannelListeners {
1205-
return this._self.#queue.removeShardedListeners(channel);
1206-
}
1207-
12081200
async WATCH(key: RedisVariadicArgument) {
12091201
const reply = await this._self.sendCommand(
12101202
pushVariadicArguments(['WATCH'], key)

packages/client/lib/client/pub-sub.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,20 @@ export class PubSub {
404404
return listeners;
405405
}
406406

407-
getShardedChannels(): IterableIterator<string> {
408-
return this.listeners[PUBSUB_TYPE.SHARDED].keys()
407+
removeAllListeners() {
408+
const result = {
409+
[PUBSUB_TYPE.CHANNELS]: this.listeners[PUBSUB_TYPE.CHANNELS],
410+
[PUBSUB_TYPE.PATTERNS]: this.listeners[PUBSUB_TYPE.PATTERNS],
411+
[PUBSUB_TYPE.SHARDED]: this.listeners[PUBSUB_TYPE.SHARDED]
412+
}
413+
414+
this.#updateIsActive();
415+
416+
this.listeners[PUBSUB_TYPE.CHANNELS] = new Map();
417+
this.listeners[PUBSUB_TYPE.PATTERNS] = new Map();
418+
this.listeners[PUBSUB_TYPE.SHARDED] = new Map();
419+
420+
return result;
409421
}
410422

411423
#emitPubSubMessage(

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { RedisClusterClientOptions, RedisClusterOptions } from '.';
22
import { RootNodesUnavailableError } from '../errors';
33
import RedisClient, { RedisClientOptions, RedisClientType } from '../client';
44
import { EventEmitter } from 'node:stream';
5-
import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pub-sub';
5+
import { ChannelListeners, PUBSUB_TYPE, PubSubListeners, PubSubTypeListeners } from '../client/pub-sub';
66
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
77
import calculateSlot from 'cluster-key-slot';
88
import { RedisSocketOptions } from '../client/socket';
@@ -186,21 +186,6 @@ export default class RedisClusterSlots<
186186
this.clientSideCache?.clear();
187187
this.clientSideCache?.disable();
188188

189-
190-
const allChannelListeners = new Map<string, ChannelListeners>();
191-
192-
for (const master of this.masters) {
193-
const shardedClient = master.pubSub?.client;
194-
if (!shardedClient) continue;
195-
for (const channel of shardedClient.getShardedChannels()) {
196-
const listeners = shardedClient.removeShardedListeners(channel);
197-
if(allChannelListeners.get(channel)) {
198-
console.warn(`Found existing listeners, will be overwritten...`);
199-
}
200-
allChannelListeners.set(channel, listeners);
201-
}
202-
}
203-
204189
try {
205190
const addressesInUse = new Set<string>(),
206191
promises: Array<Promise<unknown>> = [],
@@ -256,9 +241,6 @@ export default class RedisClusterSlots<
256241
this.nodeByAddress.delete(address);
257242
}
258243

259-
this.#emit('__refreshShardedChannels', allChannelListeners);
260-
261-
262244
await Promise.all(promises);
263245
this.clientSideCache?.enable();
264246

@@ -357,26 +339,29 @@ export default class RedisClusterSlots<
357339
const socket =
358340
this.#getNodeAddress(node.address) ??
359341
{ host: node.host, port: node.port, };
360-
const client = Object.freeze({
342+
const clientInfo = Object.freeze({
361343
host: socket.host,
362344
port: socket.port,
363345
});
364346
const emit = this.#emit;
365-
return this.#clientFactory(
347+
const client = this.#clientFactory(
366348
this.#clientOptionsDefaults({
367349
clientSideCache: this.clientSideCache,
368350
RESP: this.#options.RESP,
369351
socket,
370352
readonly,
371353
}))
372-
.on('error', error => emit('node-error', error, client))
373-
.on('reconnecting', () => emit('node-reconnecting', client))
374-
.once('ready', () => emit('node-ready', client))
375-
.once('connect', () => emit('node-connect', client))
376-
.once('end', () => emit('node-disconnect', client));
377-
.on('__MOVED', () => {
378-
this.rediscover(client);
379-
})
354+
.on('error', error => emit('node-error', error, clientInfo))
355+
.on('reconnecting', () => emit('node-reconnecting', clientInfo))
356+
.once('ready', () => emit('node-ready', clientInfo))
357+
.once('connect', () => emit('node-connect', clientInfo))
358+
.once('end', () => emit('node-disconnect', clientInfo))
359+
.on('__MOVED', async (allPubSubListeners: PubSubListeners) => {
360+
await this.rediscover(client);
361+
this.#emit('__resubscribeAllPubSubListeners', allPubSubListeners);
362+
});
363+
364+
return client;
380365
}
381366

382367
#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {

packages/client/lib/cluster/index.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { EventEmitter } from 'node:events';
66
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
77
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
88
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
9-
import { ChannelListeners, PubSubListener } from '../client/pub-sub';
9+
import { PubSubListener, PubSubListeners } from '../client/pub-sub';
1010
import { ErrorReply } from '../errors';
1111
import { RedisTcpSocketOptions } from '../client/socket';
1212
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
@@ -310,7 +310,7 @@ export default class RedisCluster<
310310

311311
this._options = options;
312312
this._slots = new RedisClusterSlots(options, this.emit.bind(this));
313-
this.on('__refreshShardedChannels', this.refreshShardedChannelsSubscriptions.bind(this));
313+
this.on('__resubscribeAllPubSubListeners', this.resubscribeAllPubSubListeners.bind(this));
314314

315315
if (options?.commandOptions) {
316316
this._commandOptions = options.commandOptions;
@@ -585,15 +585,31 @@ export default class RedisCluster<
585585
);
586586
}
587587

588-
refreshShardedChannelsSubscriptions(allChannelListeners: Map<string, ChannelListeners>) {
589-
for(const [channel, listeners] of allChannelListeners) {
590-
for(const bufListener of listeners.buffers) {
588+
resubscribeAllPubSubListeners(allListeners: PubSubListeners) {
589+
for(const [channel, listeners] of allListeners.CHANNELS) {
590+
listeners.buffers.forEach(bufListener => {
591+
this.subscribe(channel, bufListener, true);
592+
});
593+
listeners.strings.forEach(strListener => {
594+
this.subscribe(channel, strListener);
595+
});
596+
};
597+
for (const [channel, listeners] of allListeners.PATTERNS) {
598+
listeners.buffers.forEach(bufListener => {
599+
this.pSubscribe(channel, bufListener, true);
600+
});
601+
listeners.strings.forEach(strListener => {
602+
this.pSubscribe(channel, strListener);
603+
});
604+
};
605+
for (const [channel, listeners] of allListeners.SHARDED) {
606+
listeners.buffers.forEach(bufListener => {
591607
this.sSubscribe(channel, bufListener, true);
592-
}
593-
for(const strListener of listeners.strings) {
608+
});
609+
listeners.strings.forEach(strListener => {
594610
this.sSubscribe(channel, strListener);
595-
}
596-
}
611+
});
612+
};
597613
}
598614

599615
sUnsubscribe = this.SUNSUBSCRIBE;

0 commit comments

Comments
 (0)