Skip to content

Commit 9d5c004

Browse files
committed
fix(ssubscribe): properly resubscribe in case of shard failover in RE (TODO: clean up debug logs)
1) When an RE failover happens, there is a disconnect. 2) The affected client reconnects and tries to resubscribe all existing listeners. ISSUE #1: CROSSSLOT error - the client was sending `ssubscribe ch1 ch2 .. chN`, which after the failover could result in CROSSSLOT (naturally, because some slots could now be owned by other shards). FIX: send one ssubscribe command per channel instead of one ssubscribe for all channels. ISSUE #2: MOVED error - some or all of the channels might have been moved somewhere else. FIX: 1) propagate the error to the Cluster; 2) the Cluster rediscovers the topology; 3) extract all existing subscriptions from all pubsub clients and resubscribe over the new topology. fixes: #2902
1 parent bd11e38 commit 9d5c004

File tree

5 files changed

+105
-13
lines changed

5 files changed

+105
-13
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ export default class RedisCommandsQueue {
320320
listener?: PubSubListener<T>,
321321
returnBuffers?: T
322322
) {
323+
console.log(`CQ::unsubscribe(${channels})`);
323324
const command = this.#pubSub.unsubscribe(type, channels, listener, returnBuffers);
324325
if (!command) return;
325326

@@ -338,6 +339,14 @@ export default class RedisCommandsQueue {
338339
return this.#addPubSubCommand(command);
339340
}
340341

342+
getShardedChannels(): IterableIterator<string> {
343+
return this.#pubSub.getShardedChannels();
344+
}
345+
346+
removeShardedListeners(channel: string): ChannelListeners {
347+
return this.#pubSub.removeShardedListeners(channel);
348+
}
349+
341350
resubscribe(chainId?: symbol) {
342351
const commands = this.#pubSub.resubscribe();
343352
if (!commands.length) return;

packages/client/lib/client/index.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -765,17 +765,17 @@ export default class RedisClient<
765765
}
766766
});
767767
}
768-
768+
769769
if (this.#clientSideCache) {
770770
commands.push({cmd: this.#clientSideCache.trackingOn()});
771771
}
772772

773773
if (this.#options?.emitInvalidate) {
774774
commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']});
775775
}
776-
776+
777777
const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options);
778-
778+
779779
if(maintenanceHandshakeCmd) {
780780
commands.push(maintenanceHandshakeCmd);
781781
};
@@ -818,6 +818,11 @@ export default class RedisClient<
818818
chainId = Symbol('Socket Initiator');
819819

820820
const resubscribePromise = this.#queue.resubscribe(chainId);
821+
resubscribePromise?.catch(error => {
822+
if (error.message && error.message.startsWith('MOVED')) {
823+
this.emit('__MOVED')
824+
}
825+
});
821826
if (resubscribePromise) {
822827
promises.push(resubscribePromise);
823828
}
@@ -1192,6 +1197,14 @@ export default class RedisClient<
11921197

11931198
sUnsubscribe = this.SUNSUBSCRIBE;
11941199

1200+
getShardedChannels(): IterableIterator<string> {
1201+
return this._self.#queue.getShardedChannels();
1202+
}
1203+
1204+
removeShardedListeners(channel: string): ChannelListeners {
1205+
return this._self.#queue.removeShardedListeners(channel);
1206+
}
1207+
11951208
async WATCH(key: RedisVariadicArgument) {
11961209
const reply = await this._self.sendCommand(
11971210
pushVariadicArguments(['WATCH'], key)

packages/client/lib/client/pub-sub.ts

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,25 +323,50 @@ export class PubSub {
323323
}
324324

325325
resubscribe() {
326-
const commands = [];
326+
const commands: PubSubCommand[] = [];
327327
for (const [type, listeners] of Object.entries(this.listeners)) {
328328
if (!listeners.size) continue;
329329

330330
this.#isActive = true;
331+
332+
if(type === PUBSUB_TYPE.SHARDED) {
333+
this.#shardedResubscribe(commands, listeners);
334+
} else {
335+
this.#normalResubscribe(commands, type, listeners);
336+
}
337+
}
338+
339+
return commands;
340+
}
341+
342+
#normalResubscribe(commands: PubSubCommand[], type: string, listeners: PubSubTypeListeners) {
343+
this.#subscribing++;
344+
const callback = () => this.#subscribing--;
345+
commands.push({
346+
args: [
347+
COMMANDS[type as PubSubType].subscribe,
348+
...listeners.keys()
349+
],
350+
channelsCounter: listeners.size,
351+
resolve: callback,
352+
reject: callback
353+
});
354+
}
355+
356+
#shardedResubscribe(commands: PubSubCommand[], listeners: PubSubTypeListeners) {
357+
const callback = () => this.#subscribing--;
358+
for(const channel of listeners.keys()) {
331359
this.#subscribing++;
332-
const callback = () => this.#subscribing--;
333360
commands.push({
334361
args: [
335-
COMMANDS[type as PubSubType].subscribe,
336-
...listeners.keys()
362+
COMMANDS[PUBSUB_TYPE.SHARDED].subscribe,
363+
channel
337364
],
338-
channelsCounter: listeners.size,
365+
channelsCounter: 1,
339366
resolve: callback,
340367
reject: callback
341-
} satisfies PubSubCommand);
368+
})
342369
}
343-
344-
return commands;
345370
}
346371

347372
handleMessageReply(reply: Array<Buffer>): boolean {
@@ -379,6 +404,10 @@ export class PubSub {
379404
return listeners;
380405
}
381406

407+
getShardedChannels(): IterableIterator<string> {
408+
return this.listeners[PUBSUB_TYPE.SHARDED].keys()
409+
}
410+
382411
#emitPubSubMessage(
383412
type: PubSubType,
384413
message: Buffer,

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ export default class RedisClusterSlots<
175175
throw new RootNodesUnavailableError();
176176
}
177177

178+
//TODO investigate what happens with the masters and replicas. It looks like they
178179
#resetSlots() {
179180
this.slots = new Array(RedisClusterSlots.#SLOTS);
180181
this.masters = [];
@@ -185,6 +186,22 @@ export default class RedisClusterSlots<
185186
async #discover(rootNode: RedisClusterClientOptions) {
186187
this.clientSideCache?.clear();
187188
this.clientSideCache?.disable();
189+
190+
191+
const allChannelListeners = new Map<string, ChannelListeners>();
192+
193+
for (const master of this.masters) {
194+
const shardedClient = master.pubSub?.client;
195+
if (!shardedClient) continue;
196+
for (const channel of shardedClient.getShardedChannels()) {
197+
const listeners = shardedClient.removeShardedListeners(channel);
198+
if(allChannelListeners.get(channel)) {
199+
console.warn(`Found existing listeners, will be overwritten...`);
200+
}
201+
allChannelListeners.set(channel, listeners);
202+
}
203+
}
204+
188205
try {
189206
const addressesInUse = new Set<string>(),
190207
promises: Array<Promise<unknown>> = [],
@@ -224,6 +241,7 @@ export default class RedisClusterSlots<
224241
}
225242
}
226243

244+
//Keep only the nodes that are still in use
227245
for (const [address, node] of this.nodeByAddress.entries()) {
228246
if (addressesInUse.has(address)) continue;
229247

@@ -239,6 +257,9 @@ export default class RedisClusterSlots<
239257
this.nodeByAddress.delete(address);
240258
}
241259

260+
this.#emit('__refreshShardedChannels', allChannelListeners);
261+
262+
242263
await Promise.all(promises);
243264
this.clientSideCache?.enable();
244265

@@ -354,6 +375,9 @@ export default class RedisClusterSlots<
354375
.once('ready', () => emit('node-ready', client))
355376
.once('connect', () => emit('node-connect', client))
356377
.once('end', () => emit('node-disconnect', client));
378+
.on('__MOVED', () => {
379+
this.rediscover(client);
380+
})
357381
}
358382

359383
#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
@@ -374,7 +398,9 @@ export default class RedisClusterSlots<
374398

375399
async rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> {
376400
this.#runningRediscoverPromise ??= this.#rediscover(startWith)
377-
.finally(() => this.#runningRediscoverPromise = undefined);
401+
.finally(() => {
402+
this.#runningRediscoverPromise = undefined
403+
});
378404
return this.#runningRediscoverPromise;
379405
}
380406

packages/client/lib/cluster/index.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { EventEmitter } from 'node:events';
66
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
77
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
88
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
9-
import { PubSubListener } from '../client/pub-sub';
9+
import { ChannelListeners, PubSubListener } from '../client/pub-sub';
1010
import { ErrorReply } from '../errors';
1111
import { RedisTcpSocketOptions } from '../client/socket';
1212
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
@@ -310,6 +310,7 @@ export default class RedisCluster<
310310

311311
this._options = options;
312312
this._slots = new RedisClusterSlots(options, this.emit.bind(this));
313+
this.on('__refreshShardedChannels', this.refreshShardedChannelsSubscriptions.bind(this));
313314

314315
if (options?.commandOptions) {
315316
this._commandOptions = options.commandOptions;
@@ -449,6 +450,7 @@ export default class RedisCluster<
449450
}
450451

451452
if (err.message.startsWith('MOVED')) {
453+
console.log('Cluster::_execute() -> MOVED, rediscover');
452454
await this._slots.rediscover(client);
453455
client = await this._slots.getClient(firstKey, isReadonly);
454456
continue;
@@ -553,6 +555,7 @@ export default class RedisCluster<
553555
firstChannel = Array.isArray(channels) ? channels[0] : channels;
554556
let client = await this._self._slots.getShardedPubSubClient(firstChannel);
555557
for (let i = 0; ; i++) {
558+
console.log(`SSUBSCRIBE ch:${channels} try(${i}`);
556559
try {
557560
return await client.SSUBSCRIBE(channels, listener, bufferMode);
558561
} catch (err) {
@@ -561,6 +564,7 @@ export default class RedisCluster<
561564
}
562565

563566
if (err.message.startsWith('MOVED')) {
567+
console.log(`RedisCluster::SSUBSCRIBE() -> MOVED, rediscover`);
564568
await this._self._slots.rediscover(client);
565569
client = await this._self._slots.getShardedPubSubClient(firstChannel);
566570
continue;
@@ -584,6 +588,17 @@ export default class RedisCluster<
584588
);
585589
}
586590

591+
refreshShardedChannelsSubscriptions(allChannelListeners: Map<string, ChannelListeners>) {
592+
for(const [channel, listeners] of allChannelListeners) {
593+
for(const bufListener of listeners.buffers) {
594+
this.sSubscribe(channel, bufListener, true);
595+
}
596+
for(const strListener of listeners.strings) {
597+
this.sSubscribe(channel, strListener);
598+
}
599+
}
600+
}
601+
587602
sUnsubscribe = this.SUNSUBSCRIBE;
588603

589604
/**

0 commit comments

Comments
 (0)