diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c
index ed2c4c3596..5fd28f733e 100644
--- a/src/cluster_legacy.c
+++ b/src/cluster_legacy.c
@@ -1661,6 +1661,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
     node->replicaof = NULL;
     node->last_in_ping_gossip = 0;
     node->ping_sent = node->pong_received = 0;
+    node->outbound_link_attempt_time = 0;
     node->data_received = 0;
     node->meet_sent = 0;
     node->fail_time = 0;
@@ -5362,10 +5363,12 @@ static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now) {
     return now - node->ctime > getHandshakeTimeout() ? 1 : 0;
 }
 
+#define NODE_CONNECTION_RETRIES_PER_TIMEOUT 10
+
 /* Check if the node is disconnected and re-establish the connection.
  * Also update a few stats while we are here, that can be used to make
  * better decisions in other part of the code. */
-static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
+static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now, long long *cluster_conn_attempts) {
     /* Not interested in reconnecting the link with myself or nodes
      * for which we have no address. */
     if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) return 1;
@@ -5394,6 +5397,17 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
     }
 
     if (node->link == NULL) {
+        mstime_t reconnect_interval = server.cluster_node_timeout / 2;
+        /* Skip this outbound connection attempt when all these conditions are true:
+         * 1. No inbound link from the peer exists.
+         * 2. The back-off window since the last try is still active.
+         * 3. The per-cron budget in *cluster_conn_attempts is used up. It is decremented on
+         *    every attempt and can go negative, so test <= 0 rather than == 0.
+         */
+        if (!node->inbound_link && (now - node->outbound_link_attempt_time < reconnect_interval / NODE_CONNECTION_RETRIES_PER_TIMEOUT && *cluster_conn_attempts <= 0)) {
+            return 1;
+        }
+        node->outbound_link_attempt_time = now;
+        (*cluster_conn_attempts)--;
         clusterLink *link = createClusterLink(node);
         link->conn = connCreate(connTypeOfCluster());
         connSetPrivateData(link->conn, link);
@@ -5439,6 +5453,22 @@ static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) {
     freeClusterLinkOnBufferLimitReached(node->inbound_link);
 }
 
+/* Compute the maximum number of connection attempts the clusterCron
+ * loop should schedule in a single cron.
+ *
+ * We want to guarantee that every node is contacted 10 times within node timeout. */
+static long long maxConnectionAttemptsPerCron(void) {
+    long long reconnect_interval = server.cluster_node_timeout / 2;
+    if (reconnect_interval <= 0)
+        return 0;
+    /* One run happens every CLUSTER_CRON_PERIOD_MS and each node should get
+     * NODE_CONNECTION_RETRIES_PER_TIMEOUT tries per reconnect interval. Multiply
+     * before dividing and round up so small clusters keep a non-zero budget. */
+    long long attempts_per_interval = (long long)dictSize(server.cluster->nodes) * CLUSTER_CRON_PERIOD_MS *
+                                      NODE_CONNECTION_RETRIES_PER_TIMEOUT;
+    return (attempts_per_interval + reconnect_interval - 1) / reconnect_interval;
+}
+
 /* This is executed 10 times every second */
 void clusterCron(void) {
     dictIterator *di;
@@ -5450,7 +5480,6 @@ void clusterCron(void) {
     mstime_t min_pong = 0, now = mstime();
     clusterNode *min_pong_node = NULL;
     static unsigned long long iteration = 0;
-
     iteration++; /* Number of times this function was called so far. */
 
     clusterUpdateMyselfHostname();
@@ -5459,6 +5488,7 @@ void clusterCron(void) {
     server.cluster->stats_pfail_nodes = 0;
     /* Run through some of the operations we want to do on each cluster node. */
     di = dictGetSafeIterator(server.cluster->nodes);
+    long long cluster_node_conn_attempts = maxConnectionAttemptsPerCron();
     while ((de = dictNext(di)) != NULL) {
         clusterNode *node = dictGetVal(de);
         /* We free the inbound or outboud link to the node if the link has an
@@ -5467,7 +5497,7 @@ void clusterCron(void) {
 
         /* The protocol is that function(s) below return non-zero if the node was
          * terminated. */
-        if (!server.debug_cluster_disable_reconnection && clusterNodeCronHandleReconnect(node, now)) continue;
+        if (!server.debug_cluster_disable_reconnection && clusterNodeCronHandleReconnect(node, now, &cluster_node_conn_attempts)) continue;
     }
     dictReleaseIterator(di);
 
diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h
index aab7b46806..a687cf0e7e 100644
--- a/src/cluster_legacy.h
+++ b/src/cluster_legacy.h
@@ -357,6 +357,7 @@ struct _clusterNode {
     mstime_t meet_sent;                  /* Unix time we sent latest meet packet */
     mstime_t fail_time;                  /* Unix time when FAIL flag was set */
     mstime_t orphaned_time;              /* Starting time of orphaned primary condition */
+    mstime_t outbound_link_attempt_time; /* Unix time we last tried to establish an outgoing link */
     mstime_t inbound_link_freed_time;    /* Last time we freed the inbound link for this node.
                                             If it was never freed, it is the same as ctime */
     long long repl_offset;               /* Last known repl offset for this node. */
diff --git a/src/server.c b/src/server.c
index f16ef03eb1..79d06a6eb9 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1653,7 +1653,7 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa
 
     /* Run the Cluster cron. */
     if (server.cluster_enabled) {
-        run_with_period(100) clusterCron();
+        run_with_period(CLUSTER_CRON_PERIOD_MS) clusterCron();
     }
 
     /* Run the Sentinel timer if we are in sentinel mode. */
diff --git a/src/server.h b/src/server.h
index a19764ec28..db29c0fd51 100644
--- a/src/server.h
+++ b/src/server.h
@@ -665,6 +665,9 @@ typedef enum {
     (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | \
      NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_MODULE) /* A flag */
 
+/* Period in milliseconds between successive clusterCron() executions */
+#define CLUSTER_CRON_PERIOD_MS 100
+
 /* Using the following macro you can run code inside serverCron() with the
  * specified period, specified in milliseconds.
  * The actual resolution depends on server.hz. */