Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->replicaof = NULL;
node->last_in_ping_gossip = 0;
node->ping_sent = node->pong_received = 0;
node->outbound_link_attempt_time = 0;
node->data_received = 0;
node->meet_sent = 0;
node->fail_time = 0;
Expand Down Expand Up @@ -5362,10 +5363,12 @@ static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now) {
return now - node->ctime > getHandshakeTimeout() ? 1 : 0;
}

#define NODE_CONNECTION_RETRIES_PER_TIMEOUT 10

/* Check if the node is disconnected and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code. */
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now, long long *cluster_conn_attempts) {
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) return 1;
Expand Down Expand Up @@ -5394,6 +5397,17 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
}

if (node->link == NULL) {
mstime_t reconnect_interval = server.cluster_node_timeout / 2;
/* Skip this outbound connection attempt when all these conditions are true:
* 1. No inbound link from the peer exists.
* 2. The back-off window since the last try is still active.
* 3. The node has already exceeded its retry budget for this cron cycle.
*/
if (!node->inbound_link && (now - node->outbound_link_attempt_time < reconnect_interval / NODE_CONNECTION_RETRIES_PER_TIMEOUT && *cluster_conn_attempts == 0)) {
return 1;
}
node->outbound_link_attempt_time = now;
(*cluster_conn_attempts)--;
clusterLink *link = createClusterLink(node);
link->conn = connCreate(connTypeOfCluster());
connSetPrivateData(link->conn, link);
Expand Down Expand Up @@ -5439,6 +5453,22 @@ static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) {
freeClusterLinkOnBufferLimitReached(node->inbound_link);
}

/* Return the budget of connection attempts for one pass of the
 * clusterCron() node loop.
 *
 * The budget is derived from three inputs:
 *   - the number of entries in server.cluster->nodes,
 *   - CLUSTER_CRON_PERIOD_MS, the period used to schedule clusterCron(),
 *   - half of server.cluster_node_timeout, used as the reconnect interval.
 *
 * The per-cron share (nodes * period / interval) is then multiplied by
 * NODE_CONNECTION_RETRIES_PER_TIMEOUT so that each node can be tried that
 * many times within the interval.
 *
 * Returns 0 when half of the node timeout is not positive, which also
 * avoids dividing by zero. */
static long long maxConnectionAttemptsPerCron(void) {
    const long long half_node_timeout = server.cluster_node_timeout / 2;

    /* Nothing sensible can be computed without a positive interval. */
    if (half_node_timeout <= 0) return 0;

    /* Share of the node table that has to be covered by every cron run in
     * order to visit all nodes once per interval.
     * NOTE(review): this is a truncating division, i.e. the floor of the
     * ratio rather than its ceiling; while nodes * period is smaller than
     * the interval the share (and so the whole budget) is 0 - confirm that
     * this is the intended rounding. */
    const long long nodes_per_cron = dictSize(server.cluster->nodes) * CLUSTER_CRON_PERIOD_MS / half_node_timeout;

    /* Scale the single-pass coverage by the number of retries we want
     * every node to get. */
    return nodes_per_cron * NODE_CONNECTION_RETRIES_PER_TIMEOUT;
}

/* This is executed 10 times every second */
void clusterCron(void) {
dictIterator *di;
Expand All @@ -5450,7 +5480,6 @@ void clusterCron(void) {
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;

iteration++; /* Number of times this function was called so far. */

clusterUpdateMyselfHostname();
Expand All @@ -5459,6 +5488,7 @@ void clusterCron(void) {
server.cluster->stats_pfail_nodes = 0;
/* Run through some of the operations we want to do on each cluster node. */
di = dictGetSafeIterator(server.cluster->nodes);
long long cluster_node_conn_attempts = maxConnectionAttemptsPerCron();
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
/* We free the inbound or outbound link to the node if the link has an
Expand All @@ -5467,7 +5497,7 @@ void clusterCron(void) {
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
if (!server.debug_cluster_disable_reconnection && clusterNodeCronHandleReconnect(node, now)) continue;
if (!server.debug_cluster_disable_reconnection && clusterNodeCronHandleReconnect(node, now, &cluster_node_conn_attempts)) continue;
}
dictReleaseIterator(di);

Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ struct _clusterNode {
mstime_t meet_sent; /* Unix time we sent latest meet packet */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t orphaned_time; /* Starting time of orphaned primary condition */
mstime_t outbound_link_attempt_time; /* Unix time we last tried to establish an outgoing link */
mstime_t inbound_link_freed_time; /* Last time we freed the inbound link for this node.
If it was never freed, it is the same as ctime */
long long repl_offset; /* Last known repl offset for this node. */
Expand Down
2 changes: 1 addition & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1653,7 +1653,7 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa

/* Run the Cluster cron. */
if (server.cluster_enabled) {
run_with_period(100) clusterCron();
run_with_period(CLUSTER_CRON_PERIOD_MS) clusterCron();
}

/* Run the Sentinel timer if we are in sentinel mode. */
Expand Down
3 changes: 3 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,9 @@ typedef enum {
(NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | \
NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_MODULE) /* A flag */

/* Period in milliseconds between successive clusterCron() executions */
#define CLUSTER_CRON_PERIOD_MS 100

/* Using the following macro you can run code inside serverCron() with the
* specified period, specified in milliseconds.
* The actual resolution depends on server.hz. */
Expand Down
Loading