diff --git a/README.md b/README.md index b6a45b9a..a03da1eb 100644 --- a/README.md +++ b/README.md @@ -1007,7 +1007,7 @@ cluster.get("foo", (err, res) => { state stabilized after a failover, so adding a delay before resending can prevent a ping pong effect. - `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node. - `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`). - - `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`). + - `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`), setting it to a negative value will disable the slots refresh. ### Read-write splitting diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 663edb26..627f2f82 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -70,6 +70,7 @@ class Cluster extends EventEmitter { private reconnectTimeout: NodeJS.Timer; private status: ClusterStatus; private isRefreshing = false; + private _refreshSlotsCacheCallbacks = []; public isCluster = true; private _autoPipelines: Map = new Map(); private _groupsIds: { [key: string]: number } = {}; @@ -168,7 +169,7 @@ class Cluster extends EventEmitter { } resetNodesRefreshInterval() { - if (this.slotsTimer) { + if (this.slotsTimer || this.options.slotsRefreshInterval < 0) { return; } const nextRound = () => { @@ -284,14 +285,13 @@ class Cluster extends EventEmitter { this.once("close", closeListener); this.once("close", this.handleCloseEvent.bind(this)); - this.refreshSlotsCache( - function (err) { - if (err && err.message === "Failed to refresh slots cache.") { - Redis.prototype.silentEmit.call(this, "error", err); - this.connectionPool.reset([]); - } - }.bind(this) - ); + this.refreshSlotsCache((err) => { + if (err && err.message === "Failed to refresh slots cache.") { + Redis.prototype.silentEmit.call(this, "error", err); + this.connectionPool.reset([]); + } + }); + this.subscriber.start(); if (this.options.shardedSubscribers) { @@ -522,27 +522,29 @@ class Cluster extends EventEmitter { * @memberof Cluster */ private refreshSlotsCache(callback?: CallbackFunction): void { + if (typeof callback === "function") { + this._refreshSlotsCacheCallbacks.push(callback); + } + if (this.isRefreshing) { - if (typeof callback === "function") { - process.nextTick(callback); - } return; } this.isRefreshing = true; - const _this = this; - const wrapper = function (error?: Error) { - _this.isRefreshing = false; - if (typeof callback === "function") { + const wrapper = (error?: Error) => { + this.isRefreshing = false; + for (const callback of this._refreshSlotsCacheCallbacks) { callback(error); } + + this._refreshSlotsCacheCallbacks = []; }; const nodes = shuffle(this.connectionPool.getNodes()); let lastNodeError = null; - function tryNode(index) { + const tryNode = (index: number) => { if (index === nodes.length) { const error = new ClusterAllFailedError( "Failed to refresh slots cache.", @@ -553,8 +555,8 @@ class Cluster extends EventEmitter { const node = nodes[index]; const key = `${node.options.host}:${node.options.port}`; debug("getting slot cache from %s", key); - _this.getInfoFromNode(node, function (err) { - switch (_this.status) { + this.getInfoFromNode(node, (err) => { + switch (this.status) { case "close": case "end": return wrapper(new Error("Cluster is disconnected.")); @@ -562,15 +564,15 @@ class Cluster extends EventEmitter { return wrapper(new Error("Cluster is disconnecting.")); } if (err) { - _this.emit("node error", err, key); + this.emit("node error", err, key); lastNodeError = err; tryNode(index + 1); } else { - _this.emit("refresh"); + this.emit("refresh"); wrapper(); } }); - } + }; tryNode(0); }