Skip to content

Commit 23e16ac

Browse files
elimeltclaudenkaradzhov
authored
fix(sentinel): use mapped address when sentinel failover moved pubsub connections (#3190)
use mapped address when sentinel failover moved pubsub connections cover pubsub mapping on master change clear pre-existing lint errors in changed files Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> Co-authored-by: Nikolay Karadzhov <[email protected]>
1 parent 3058cac commit 23e16ac

2 files changed

Lines changed: 69 additions & 11 deletions

File tree

packages/client/lib/sentinel/index.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ export class RedisSentinelClient<
209209
}
210210

211211
MULTI(): RedisSentinelMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
212+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
212213
return new (this as any).Multi(this);
213214
}
214215

@@ -529,6 +530,7 @@ export default class RedisSentinel<
529530
}
530531

531532
MULTI(): RedisSentinelMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
533+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
532534
return new (this as any).Multi(this);
533535
}
534536

@@ -651,7 +653,7 @@ export default class RedisSentinel<
651653
}
652654
}
653655

654-
class RedisSentinelInternal<
656+
export class RedisSentinelInternal<
655657
M extends RedisModules,
656658
F extends RedisFunctions,
657659
S extends RedisScripts,
@@ -681,9 +683,8 @@ class RedisSentinelInternal<
681683

682684
#anotherReset = false;
683685

684-
#configEpoch: number = 0;
685-
686686
readonly #sentinelSeedNodes: Array<RedisNode>;
687+
687688
#sentinelRootNodes: Array<RedisNode>;
688689
#sentinelClient?: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>;
689690

@@ -868,13 +869,14 @@ class RedisSentinelInternal<
868869

869870
this.#trace("#connect: returning");
870871
return;
871-
} catch (e: any) {
872-
this.#trace(`#connect: exception ${e.message}`);
872+
} catch (e) {
873+
const err = e as Error;
874+
this.#trace(`#connect: exception ${err.message}`);
873875
if (!this.#isReady && count > this.#maxCommandRediscovers) {
874876
throw e;
875877
}
876878

877-
if (e.message !== 'no valid master node') {
879+
if (err.message !== 'no valid master node') {
878880
console.log(e);
879881
}
880882
await setTimeout(1000);
@@ -944,7 +946,7 @@ class RedisSentinelInternal<
944946
return client;
945947
}
946948

947-
async #handlePubSubControlChannel(channel: Buffer, message: Buffer) {
949+
async #handlePubSubControlChannel(channel: Buffer, _message: Buffer) {
948950
this.#trace("pubsub control channel message on " + channel);
949951
this.#reset();
950952
}
@@ -1273,7 +1275,7 @@ class RedisSentinelInternal<
12731275
async transform(analyzed: ReturnType<RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>["analyze"]>) {
12741276
this.#trace("transform: enter");
12751277

1276-
let promises: Array<Promise<any>> = [];
1278+
const promises: Array<Promise<unknown>> = [];
12771279

12781280
if (analyzed.sentinelToOpen) {
12791281
this.#trace(`transform: opening a new sentinel`);
@@ -1359,7 +1361,8 @@ class RedisSentinelInternal<
13591361
}
13601362

13611363
this.#trace(`transform: adding promise to change #pubSubProxy node`);
1362-
masterPromises.push(this.#pubSubProxy.changeNode(analyzed.masterToOpen));
1364+
const mappedPubSubNode = getMappedNode(analyzed.masterToOpen.host, analyzed.masterToOpen.port, this.#nodeAddressMap);
1365+
masterPromises.push(this.#pubSubProxy.changeNode(mappedPubSubNode));
13631366
promises.push(...masterPromises);
13641367
const event: RedisSentinelEvent = {
13651368
type: "MASTER_CHANGE",
@@ -1369,7 +1372,6 @@ class RedisSentinelInternal<
13691372
if (!this.emit('topology-change', event)) {
13701373
this.#trace(`transform: emit for topology-change for master_change returned false`);
13711374
}
1372-
this.#configEpoch++;
13731375
}
13741376

13751377
const replicaCloseSet = new Set<string>();
@@ -1578,7 +1580,7 @@ export class RedisSentinelFactory extends EventEmitter {
15781580
try {
15791581
const masterData = await client.sentinel.sentinelMaster(this.options.name);
15801582

1581-
let master = parseNode(masterData);
1583+
const master = parseNode(masterData);
15821584
if (master === undefined) {
15831585
continue;
15841586
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { strict as assert } from 'node:assert';
2+
import { describe, it, beforeEach, afterEach } from 'mocha';
3+
import sinon from 'sinon';
4+
import RedisClient from '../client';
5+
import { PubSubProxy } from './pub-sub-proxy';
6+
import { RedisSentinelInternal } from './index';
7+
8+
describe('pubsub master change applies nodeAddressMap', () => {
9+
const RAW_HOST = '10.0.0.99';
10+
const RAW_PORT = 6390;
11+
const MAPPED_HOST = 'external.example.com';
12+
const MAPPED_PORT = 16390;
13+
14+
let changeNodeStub: sinon.SinonStub;
15+
let clientConnectStub: sinon.SinonStub;
16+
let internal: RedisSentinelInternal<{}, {}, {}, 2, {}>;
17+
18+
beforeEach(() => {
19+
changeNodeStub = sinon.stub(PubSubProxy.prototype, 'changeNode').resolves();
20+
// The stub only needs to short-circuit the TCP connect; the resolved value is unused.
21+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
22+
clientConnectStub = sinon.stub(RedisClient.prototype, 'connect').resolves(undefined as any);
23+
24+
internal = new RedisSentinelInternal<{}, {}, {}, 2, {}>({
25+
name: 'mymaster',
26+
sentinelRootNodes: [{ host: '127.0.0.1', port: 26379 }],
27+
nodeAddressMap: {
28+
[`${RAW_HOST}:${RAW_PORT}`]: { host: MAPPED_HOST, port: MAPPED_PORT }
29+
}
30+
});
31+
internal.on('error', () => { });
32+
});
33+
34+
afterEach(() => {
35+
changeNodeStub.restore();
36+
clientConnectStub.restore();
37+
});
38+
39+
it('passes the mapped address (not the raw sentinel-reported one) to PubSubProxy.changeNode', async () => {
40+
await internal.transform({
41+
sentinelList: [],
42+
epoch: 0,
43+
sentinelToOpen: undefined,
44+
masterToOpen: { host: RAW_HOST, port: RAW_PORT },
45+
replicasToClose: [],
46+
replicasToOpen: new Map()
47+
});
48+
49+
assert.equal(changeNodeStub.callCount, 1, 'PubSubProxy.changeNode should be called exactly once');
50+
assert.deepEqual(
51+
changeNodeStub.firstCall.args[0],
52+
{ host: MAPPED_HOST, port: MAPPED_PORT },
53+
'pubsub proxy must reconnect to the mapped address after a sentinel failover'
54+
);
55+
});
56+
});

0 commit comments

Comments
 (0)