Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 82 additions & 4 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2204,6 +2204,13 @@ void markNodeAsFailingIfNeeded(clusterNode *node) {
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();

if (nodeIsReplica(myself) && myself->replicaof == node) {
/* Immediately check if the node is our primary node, if so, we can try
* to initiate a failover as soon as possible. */
myself->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FASTER_FAILOVER);
}

/* Broadcast the failing node name to everybody, forcing all the other
* reachable nodes to flag the node as FAIL.
* We do that even if this node is a replica and not a primary: anyway
Expand All @@ -2227,6 +2234,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
serverLog(LL_NOTICE, "Clear FAIL state for node %.40s (%s): %s is reachable again.", node->name,
node->human_nodename, nodeIsReplica(node) ? "replica" : "primary without slots");
node->flags &= ~CLUSTER_NODE_FAIL;
if (nodeIsReplica(myself) && myself->replicaof == node) node->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}

Expand All @@ -2241,6 +2249,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
"Clear FAIL state for node %.40s (%s): is reachable again and nobody is serving its slots after some time.",
node->name, node->human_nodename);
node->flags &= ~CLUSTER_NODE_FAIL;
if (nodeIsReplica(myself) && myself->replicaof == node) node->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}
}
Expand Down Expand Up @@ -3319,7 +3328,7 @@ int clusterProcessPacket(clusterLink *link) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}

/* Checks if the node supports light message hdr */
/* Checks if the node supports some flags. */
if (sender) {
if (flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
Expand All @@ -3332,6 +3341,12 @@ int clusterProcessPacket(clusterLink *link) {
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
}

if (flags & CLUSTER_NODE_MY_PRIMARY_FAIL) {
sender->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
} else {
sender->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
}
}

/* Update the last time we saw any data from this node. We
Expand Down Expand Up @@ -3542,6 +3557,12 @@ int clusterProcessPacket(clusterLink *link) {
noaddr_node->flags &= ~CLUSTER_NODE_PFAIL;
noaddr_node->flags |= CLUSTER_NODE_FAIL;
noaddr_node->fail_time = now;
if (nodeIsReplica(myself) && myself->replicaof == noaddr_node) {
/* Immediately check if the node is our primary node, if so, we can try
* to initiate a failover as soon as possible. */
myself->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FASTER_FAILOVER);
}
clusterSendFail(noaddr_node->name);
}
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
Expand Down Expand Up @@ -3777,6 +3798,16 @@ int clusterProcessPacket(clusterLink *link) {
failing->fail_time = now;
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);

if (nodeIsReplica(myself) && myself->replicaof == failing) {
/* Immediately check if the node is our primary node, if so, we can try
* to initiate a failover as soon as possible. We will also try to send
* the FAIL packet in case we trigger the failover immediately but unable
* to get the votes. */
myself->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterSendFail(failing->name);
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FASTER_FAILOVER);
}
}
} else {
serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender,
Expand Down Expand Up @@ -4787,6 +4818,21 @@ int clusterGetFailedPrimaryRank(void) {
return rank;
}


/* Returns 1 if all replicas under my primary think the primary is in FAIL state. */
int clusterAllReplicasThinkPrimaryIsFail(void) {
serverAssert(nodeIsReplica(myself));
serverAssert(myself->replicaof);

clusterNode *primary = myself->replicaof;
for (int i = 0; i < primary->num_replicas; i++) {
if (!nodePrimaryIsFail(primary->replicas[i])) {
return 0;
}
}
return 1;
}

/* This function is called by clusterHandleReplicaFailover() in order to
* let the replica log why it is not able to failover. Sometimes there are
* not the conditions, but since the failover function is called again and
Expand Down Expand Up @@ -4981,10 +5027,23 @@ void clusterHandleReplicaFailover(void) {
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = now;
server.cluster->failover_auth_rank = 0;
/* Reset auth_age since it is outdated now and we can bypass the auth_timeout
}

if (server.cluster->mf_end == 0 &&
server.cluster->failover_auth_rank == 0 &&
server.cluster->failover_failed_primary_rank == 0 &&
clusterAllReplicasThinkPrimaryIsFail()) {
server.cluster->failover_auth_time = now;
serverLog(LL_NOTICE, "I am the best ranked replica and can initiate the election immediately.");
}

if (server.cluster->failover_auth_time == now) {
/* If we happen to initiate a failover (automatic or manual) immediately.
* Reset auth_age since it is outdated now and we can bypass the auth_timeout
* check in the next state and start the election ASAP. */
auth_age = 0;
}

serverLog(LL_NOTICE,
"Start of election delayed for %lld milliseconds "
"(rank #%d, primary rank #%d, offset %lld).",
Expand All @@ -5008,8 +5067,13 @@ void clusterHandleReplicaFailover(void) {
* It is also possible that we received the message that telling a
* shard is up. Update the delay if our failed_primary_rank changed.
*
* It is also possible that we received more message and then we figure
* out myself is the best ranked replica, in this case, we can initiate
* the election immediately.
*
* Not performed if this is a manual failover. */
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0) {
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0 &&
server.cluster->failover_auth_time != now) {
int newrank = clusterGetReplicaRank();
if (newrank != server.cluster->failover_auth_rank) {
long long added_delay = (newrank - server.cluster->failover_auth_rank) * 1000;
Expand All @@ -5027,6 +5091,13 @@ void clusterHandleReplicaFailover(void) {
serverLog(LL_NOTICE, "Failed primary rank updated to #%d, added %lld milliseconds of delay.",
new_failed_primary_rank, added_delay);
}

if (server.cluster->failover_auth_rank == 0 &&
server.cluster->failover_failed_primary_rank == 0 &&
clusterAllReplicasThinkPrimaryIsFail()) {
server.cluster->failover_auth_time = now;
serverLog(LL_NOTICE, "I am the best ranked replica and can initiate the election immediately.");
}
}

/* Return ASAP if we can't still start the election. */
Expand Down Expand Up @@ -5573,7 +5644,8 @@ void clusterBeforeSleep(void) {
}
} else if (flags & CLUSTER_TODO_HANDLE_FAILOVER) {
/* Handle failover, this is needed when it is likely that there is already
* the quorum from primaries in order to react fast. */
* the quorum from primaries in order to react fast. Or when we determine
* that we can proceed with the failover. */
clusterHandleReplicaFailover();
}

Expand All @@ -5592,6 +5664,12 @@ void clusterBeforeSleep(void) {
* regular ping. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
}

if (flags & CLUSTER_TODO_HANDLE_FASTER_FAILOVER) {
/* Handle faster failover, this goes after all the todos to check if we
* can initiate a fast failover. */
clusterHandleReplicaFailover();
}
}

void clusterDoBeforeSleep(int flags) {
Expand Down
3 changes: 3 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#define CLUSTER_TODO_FSYNC_CONFIG (1 << 3)
#define CLUSTER_TODO_HANDLE_MANUALFAILOVER (1 << 4)
#define CLUSTER_TODO_BROADCAST_ALL (1 << 5)
#define CLUSTER_TODO_HANDLE_FASTER_FAILOVER (1 << 6)

/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
Expand Down Expand Up @@ -54,6 +55,7 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED (1 << 11) /* This node supports light message header for publish type. */
#define CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED (1 << 12) /* This node supports light message header for module type. */
#define CLUSTER_NODE_MY_PRIMARY_FAIL (1 << 13) /* myself is a replica and my primary is FAIL. */
#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
Expand All @@ -70,6 +72,7 @@ typedef struct clusterLink {
#define nodeSupportsLightMsgHdrForPubSub(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED)
#define nodeSupportsLightMsgHdrForModule(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED)
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))
#define nodePrimaryIsFail(n) ((n)->flags & CLUSTER_NODE_MY_PRIMARY_FAIL)

/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
Expand Down
Loading