Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ cluster.get("foo", (err, res) => {
state stabilized after a failover, so adding a delay before resending can prevent a ping pong effect.
- `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node.
- `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`).
- `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`).
- `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`), setting it to a negative value will disable the slots refresh.

### Read-write splitting

Expand Down
46 changes: 24 additions & 22 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
groupSrvRecords,
weightSrvRecords,
getConnectionName,
getNodeKey,

Check warning on line 15 in lib/cluster/index.ts

View workflow job for this annotation

GitHub Actions / build (8.x)

'getNodeKey' is defined but never used

Check warning on line 15 in lib/cluster/index.ts

View workflow job for this annotation

GitHub Actions / build (10.x)

'getNodeKey' is defined but never used

Check warning on line 15 in lib/cluster/index.ts

View workflow job for this annotation

GitHub Actions / build (12.x)

'getNodeKey' is defined but never used

Check warning on line 15 in lib/cluster/index.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

'getNodeKey' is defined but never used

Check warning on line 15 in lib/cluster/index.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

'getNodeKey' is defined but never used

Check warning on line 15 in lib/cluster/index.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'getNodeKey' is defined but never used
} from "./util";
import ClusterSubscriber from "./ClusterSubscriber";
import DelayQueue from "./DelayQueue";
Expand Down Expand Up @@ -70,6 +70,7 @@
private reconnectTimeout: NodeJS.Timer;
private status: ClusterStatus;
private isRefreshing = false;
private _refreshSlotsCacheCallbacks = [];
public isCluster = true;
private _autoPipelines: Map<string, typeof Pipeline> = new Map();
private _groupsIds: { [key: string]: number } = {};
Expand Down Expand Up @@ -168,7 +169,7 @@
}

resetNodesRefreshInterval() {
if (this.slotsTimer) {
if (this.slotsTimer || this.options.slotsRefreshInterval < 0) {
return;
}
const nextRound = () => {
Expand Down Expand Up @@ -284,14 +285,13 @@
this.once("close", closeListener);
this.once("close", this.handleCloseEvent.bind(this));

this.refreshSlotsCache(
function (err) {
if (err && err.message === "Failed to refresh slots cache.") {
Redis.prototype.silentEmit.call(this, "error", err);
this.connectionPool.reset([]);
}
}.bind(this)
);
this.refreshSlotsCache((err) => {
if (err && err.message === "Failed to refresh slots cache.") {
Redis.prototype.silentEmit.call(this, "error", err);
this.connectionPool.reset([]);
}
});

this.subscriber.start();

if (this.options.shardedSubscribers) {
Expand Down Expand Up @@ -522,27 +522,29 @@
* @memberof Cluster
*/
private refreshSlotsCache(callback?: CallbackFunction<void>): void {
if (typeof callback === "function") {
this._refreshSlotsCacheCallbacks.push(callback);
}

if (this.isRefreshing) {
if (typeof callback === "function") {
process.nextTick(callback);
}
return;
}
this.isRefreshing = true;

const _this = this;
const wrapper = function (error?: Error) {
_this.isRefreshing = false;
if (typeof callback === "function") {
const wrapper = (error?: Error) => {
this.isRefreshing = false;
for (const callback of this._refreshSlotsCacheCallbacks) {
callback(error);
}

this._refreshSlotsCacheCallbacks = [];
};

const nodes = shuffle(this.connectionPool.getNodes());

let lastNodeError = null;

function tryNode(index) {
const tryNode = (index: number) => {
if (index === nodes.length) {
const error = new ClusterAllFailedError(
"Failed to refresh slots cache.",
Expand All @@ -553,24 +555,24 @@
const node = nodes[index];
const key = `${node.options.host}:${node.options.port}`;
debug("getting slot cache from %s", key);
_this.getInfoFromNode(node, function (err) {
switch (_this.status) {
this.getInfoFromNode(node, (err) => {
switch (this.status) {
case "close":
case "end":
return wrapper(new Error("Cluster is disconnected."));
case "disconnecting":
return wrapper(new Error("Cluster is disconnecting."));
}
if (err) {
_this.emit("node error", err, key);
this.emit("node error", err, key);
lastNodeError = err;
tryNode(index + 1);
} else {
_this.emit("refresh");
this.emit("refresh");
wrapper();
}
});
}
};

tryNode(0);
}
Expand Down
Loading