diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index b87db78edc..fb3d82e9ca 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -100,7 +100,6 @@ const char *clusterGetMessageTypeString(int type); void removeChannelsInSlot(unsigned int slot); unsigned int countChannelsInSlot(unsigned int hashslot); void clusterAddNodeToShard(const char *shard_id, clusterNode *node); -list *clusterLookupNodeListByShardId(const char *shard_id); void clusterRemoveNodeFromShard(clusterNode *node); int auxShardIdSetter(clusterNode *n, void *value, size_t length); sds auxShardIdGetter(clusterNode *n, sds s); @@ -128,6 +127,7 @@ sds clusterEncodeOpenSlotsAuxField(int rdbflags); int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s); static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now); void clusterCommandFlushslot(client *c); +int clusterAllReplicasThinkPrimaryIsFail(void); /* Only primaries that own slots have voting rights. * Returns 1 if the node has voting rights, otherwise returns 0. */ @@ -135,6 +135,16 @@ int clusterNodeIsVotingPrimary(clusterNode *n) { return (n->flags & CLUSTER_NODE_PRIMARY) && n->numslots; } +/* Returns if myself is the best ranked replica in an automatic failover process. + * To avoid newly added empty replica from affecting the ranking, we will skip it. */ +static inline int myselfIsBestRankedReplica(void) { + return (server.cluster->mf_end == 0 && + getNodeReplicationOffset(myself) != 0 && + server.cluster->failover_auth_rank == 0 && + server.cluster->failover_failed_primary_rank == 0 && + clusterAllReplicasThinkPrimaryIsFail()); +} + int getNodeDefaultClientPort(clusterNode *n) { return server.tls_cluster ? n->tls_port : n->tcp_port; } @@ -2244,6 +2254,10 @@ void markNodeAsFailing(clusterNode *node) { /* Immediately check if the failing node is our primary node. 
*/ if (nodeIsReplica(myself) && myself->replicaof == node) { + /* Mark my primary as FAIL so that we can carry this flag out during gossip, + * so that other nodes know that my primary node has failed, so that other + * nodes know that my offset will no longer be updated. */ + myself->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL; /* We can start an automatic failover as soon as possible, setting a flag * here so that we don't need to waiting for the cron to kick in. */ clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); @@ -2312,6 +2326,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { serverLog(LL_NOTICE, "Clear FAIL state for node %.40s (%s): %s is reachable again.", node->name, node->human_nodename, nodeIsReplica(node) ? "replica" : "primary without slots"); node->flags &= ~CLUSTER_NODE_FAIL; + if (nodeIsReplica(myself) && myself->replicaof == node) node->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL; clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG); } @@ -2326,6 +2341,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { "Clear FAIL state for node %.40s (%s): is reachable again and nobody is serving its slots after some time.", node->name, node->human_nodename); node->flags &= ~CLUSTER_NODE_FAIL; + if (nodeIsReplica(myself) && myself->replicaof == node) node->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL; clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG); } } @@ -3426,6 +3442,13 @@ int clusterProcessPacket(clusterLink *link) { } else { sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED; } + + /* Check if the sender has marked its primary node as FAIL. */ + if (flags & CLUSTER_NODE_MY_PRIMARY_FAIL) { + sender->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL; + } else { + sender->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL; + } } /* Update the last time we saw any data from this node.
We @@ -4717,6 +4740,16 @@ void clusterRequestFailoverAuth(void) { * in the header to communicate the nodes receiving the message that * they should authorized the failover even if the primary is working. */ if (server.cluster->mf_end) msgblock->data[0].msg.mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK; + + /* If this is an automatic failover and if myself is the best ranked replica, + * set the CLUSTERMSG_FLAG0_FORCEACK bit in the header as well. + * + * In this case, we hope that other primary nodes will not refuse to vote because + * they did not receive the FAIL message in time. */ + if (server.cluster->mf_end == 0 && myselfIsBestRankedReplica()) { + msgblock->data[0].msg.mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK; + } + clusterBroadcastMessage(msgblock); clusterMsgSendBlockDecrRefCount(msgblock); } @@ -4909,6 +4942,29 @@ int clusterGetFailedPrimaryRank(void) { return rank; } + +/* Returns 1 if all replicas under my primary think the primary is in FAIL state. + * + * This is useful in automatic failover. For example, from my perspective, + * if all replicas, including myself, mark the primary node as FAIL, + * which means that myself and other replicas have exchanged new gossip information + * after the primary node went down, and we know the latest replication offset of + * the replicas. If a replica finds that its ranking is optimal in all cases, then + * the replica can initiate an election immediately in automatic failover without + * waiting for the delay. */ +int clusterAllReplicasThinkPrimaryIsFail(void) { + serverAssert(nodeIsReplica(myself)); + serverAssert(myself->replicaof); + + clusterNode *primary = myself->replicaof; + for (int i = 0; i < primary->num_replicas; i++) { + if (!nodePrimaryIsFail(primary->replicas[i])) { + return 0; + } + } + return 1; +} + /* This function is called by clusterHandleReplicaFailover() in order to * let the replica log why it is not able to failover.
Sometimes there are * not the conditions, but since the failover function is called again and @@ -5109,10 +5165,20 @@ void clusterHandleReplicaFailover(void) { server.cluster->failover_auth_time = now; server.cluster->failover_auth_rank = 0; server.cluster->failover_failed_primary_rank = 0; - /* Reset auth_age since it is outdated now and we can bypass the auth_timeout + } + + if (server.cluster->mf_end == 0 && myselfIsBestRankedReplica()) { + server.cluster->failover_auth_time = now; + serverLog(LL_NOTICE, "This is the best ranked replica and can initiate the election immediately."); + } + + if (server.cluster->failover_auth_time == now) { + /* If we happen to initiate a failover (automatic or manual) immediately, + * reset auth_age since it is outdated now and we can bypass the auth_timeout * check in the next state and start the election ASAP. */ auth_age = 0; } + serverLog(LL_NOTICE, "Start of election delayed for %lld milliseconds " "(rank #%d, primary rank #%d, offset %lld).", @@ -5136,8 +5202,13 @@ void clusterHandleReplicaFailover(void) { * It is also possible that we received the message that telling a * shard is up. Update the delay if our failed_primary_rank changed. * + * It is also possible that we received more messages and then we figure + * out myself is the best ranked replica, in this case, we can initiate + * the election immediately. + * + * Not performed if this is a manual failover.
*/ - if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0) { + if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0 && + server.cluster->failover_auth_time != now) { int newrank = clusterGetReplicaRank(); if (newrank != server.cluster->failover_auth_rank) { long long added_delay = (newrank - server.cluster->failover_auth_rank) * 1000; @@ -5155,6 +5226,11 @@ void clusterHandleReplicaFailover(void) { serverLog(LL_NOTICE, "Failed primary rank updated to #%d, added %lld milliseconds of delay.", new_failed_primary_rank, added_delay); } + + if (myselfIsBestRankedReplica()) { + server.cluster->failover_auth_time = now; + serverLog(LL_NOTICE, "Myself become the best ranked replica, initiate the election immediately."); + } } /* Return ASAP if we can't still start the election. */ diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 44a0a52884..bd7625ab98 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -60,6 +60,11 @@ typedef struct clusterLink { #define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */ #define CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED (1 << 11) /* This node supports light message header for publish type. */ #define CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED (1 << 12) /* This node supports light message header for module type. */ +#define CLUSTER_NODE_MY_PRIMARY_FAIL (1 << 13) /* myself is a replica and my primary is FAIL in my view. \ * myself will gossip this flag to other replicas in the \ * shard so that the replicas can make better ranking \ * decisions to help with the failover.
*/ + #define CLUSTER_NODE_NULL_NAME \ "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ "\000\000\000\000\000\000\000\000\000\000\000\000" @@ -76,6 +81,7 @@ typedef struct clusterLink { #define nodeSupportsLightMsgHdrForPubSub(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED) #define nodeSupportsLightMsgHdrForModule(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED) #define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL))) +#define nodePrimaryIsFail(n) ((n)->flags & CLUSTER_NODE_MY_PRIMARY_FAIL) /* Cluster messages header */ diff --git a/tests/support/util.tcl b/tests/support/util.tcl index b102618b71..980929819a 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -201,7 +201,7 @@ proc verify_log_message {srv_idx pattern from_line} { incr from_line set result [exec tail -n +$from_line < [srv $srv_idx stdout]] if {![string match $pattern $result]} { - fail "expected message not found in log file: $pattern" + fail "expected message not found in srv $srv_idx log file [srv $srv_idx stdout]: $pattern" } } @@ -210,7 +210,7 @@ proc verify_no_log_message {srv_idx pattern from_line} { incr from_line set result [exec tail -n +$from_line < [srv $srv_idx stdout]] if {[string match $pattern $result]} { - fail "expected message found in log file: $pattern" + fail "expected message found in srv $srv_idx log file [srv $srv_idx stdout]: $pattern" } } @@ -1252,3 +1252,18 @@ proc bp {{s {}}} { puts $res } } + +proc memcmp {string1 string2} { + set len1 [string length $string1] + set len2 [string length $string2] + set minLen [expr min($len1, $len2)] + + for {set i 0} {$i < $minLen} {incr i} { + set char1 [scan [string index $string1 $i] %c] + set char2 [scan [string index $string2 $i] %c] + if {$char1 != $char2} { + return [expr {$char1 - $char2}] + } + } + return [expr {$len1 - $len2}] +} diff --git 
a/tests/unit/cluster/faster-failover.tcl b/tests/unit/cluster/faster-failover.tcl new file mode 100644 index 0000000000..f938e94cf2 --- /dev/null +++ b/tests/unit/cluster/faster-failover.tcl @@ -0,0 +1,149 @@ +# Check that the faster failover is working. + +# Allocate slot 0 to the first primary and allocate slot 1 to the second primary, +# and then evenly distribute the remaining slots to the remaining primaries. +proc my_slot_allocation {primaries replicas} { + R 0 cluster addslots 0 + R 1 cluster addslots 1 + R 2 cluster addslotsrange 2 5463 + R 3 cluster addslotsrange 5464 10927 + R 4 cluster addslotsrange 10928 16383 +} + +# We have the following nodes: +# Primary: R0 R1 R2 R3 R4 +# Replica1: R5 R6 R7 R8 R9 +# Replica2: R10 R11 +# +# R0 owns slot 0 and its replicas are R5 and R10, key key_977613 belongs to slot 0. +# R1 owns slot 1 and its replicas are R6 and R11, key key_991803 belongs to slot 1. +# +# We will test the scenario where both R0 and R1 shards are down at the same time +# to test whether their faster failover is as expected. +# +# In the R0 shard, the offset of R10 will be greater than that of R5, so it is +# expected that R10 will start failover faster. +# +# In the R1 shard, the offsets of R6 and R11 are the same; we have replica rank +# to ensure that there will not be a failover timeout. +# +# Even if R0 and R1 go down at the same time, we have failed primary rank to ensure +# that there will not be a failover timeout. +# +# Needs to run in the body of +# start_cluster 5 7 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000}} { +proc test_best_ranked_replica {} { + test "The best replica can initiate an election immediately in an automatic failover" { + # We calculate their rankings so that we can make rank judgments later. + set R0_failed_primary_rank [expr [expr [memcmp [R 0 cluster myshardid] [R 1 cluster myshardid]] < 0] ?
0 : 1] + set R1_failed_primary_rank [expr [expr [memcmp [R 1 cluster myshardid] [R 0 cluster myshardid]] < 0] ? 0 : 1] + + set R6_replica_rank [expr [expr [memcmp [R 6 cluster myid] [R 11 cluster myid]] < 0] ? 0 : 1] + set R11_replica_rank [expr [expr [memcmp [R 11 cluster myid] [R 6 cluster myid]] < 0] ? 0 : 1] + + set R0_nodeid [R 0 cluster myid] + set R1_nodeid [R 1 cluster myid] + + # Write some data to the R0 and wait the sync. + for {set i 0} {$i < 10} {incr i} { + R 0 incr key_977613 + } + wait_for_ofs_sync [srv 0 client] [srv -5 client] + wait_for_ofs_sync [srv 0 client] [srv -10 client] + + # Write some data to the R1 and wait the sync. + for {set i 0} {$i < 10} {incr i} { + R 1 incr key_991803 + } + wait_for_ofs_sync [srv -1 client] [srv -6 client] + wait_for_ofs_sync [srv -1 client] [srv -11 client] + + # Pause R5 so it has no chance to catch up with the offset. + pause_process [srv -5 pid] + + # Kill the replica client of R0. + R 0 client kill type replica + R 0 incr key_977613 + + # Wait for R10 to catch up with the offset so it will have a better offset than R5. + wait_for_ofs_sync [srv 0 client] [srv -10 client] + + # Pause R0 and R1, the replicas of each shard will do the automatic failover. + pause_process [srv 0 pid] + pause_process [srv -1 pid] + + # Resume R5. + resume_process [srv -5 pid] + + # Make sure both primaries R0 and R1 are FAIL from the replica's view. 
+ wait_for_condition 1000 10 { + [cluster_has_flag [cluster_get_node_by_id 5 $R0_nodeid] fail] eq 1 && + [cluster_has_flag [cluster_get_node_by_id 5 $R1_nodeid] fail] eq 1 && + [cluster_has_flag [cluster_get_node_by_id 10 $R0_nodeid] fail] eq 1 && + [cluster_has_flag [cluster_get_node_by_id 10 $R1_nodeid] fail] eq 1 && + + [cluster_has_flag [cluster_get_node_by_id 6 $R0_nodeid] fail] eq 1 && + [cluster_has_flag [cluster_get_node_by_id 6 $R1_nodeid] fail] eq 1 && + [cluster_has_flag [cluster_get_node_by_id 11 $R0_nodeid] fail] eq 1 && + [cluster_has_flag [cluster_get_node_by_id 11 $R1_nodeid] fail] eq 1 + } else { + fail "The node is not marked with the correct flag" + } + + wait_for_condition 1000 100 { + [s -10 role] == "master" && + ([s -6 role] == "master" || [s -11 role] == "master") + } else { + fail "No failover detected" + } + + # case 1: R0 and R1 down in the same time, R0 have a better failed primary rank, R10 is + # the best ranked replica in the first place, if so, there is no delay. + if {$R0_failed_primary_rank == 0} { + if {[count_log_message -10 "This is the best ranked replica and can initiate the election immediately"] != 0} { + verify_log_message -10 "*Start of election delayed for 0 milliseconds*" 0 + } + } + # No matter what, R10 will be the best ranked replica in R0. + # This is the best ranked replica and can initiate the election immediately + # Myself become the best ranked replica, initiate the election immediately + verify_log_message -10 "*best ranked replica*" 0 + wait_for_log_messages -5 {"*Successful partial resynchronization with primary*"} 0 1000 50 + + # case 2: R0 and R1 down in the same time, R1 have a better failed primary rank, R6 or R11 + # will be the best ranked replica in the first place, if so, there is no delay. 
+ if {$R1_failed_primary_rank == 0 && $R6_replica_rank == 0} { + if {[count_log_message -6 "This is the best ranked replica and can initiate the election immediately"]} { + verify_log_message -6 "*Start of election delayed for 0 milliseconds*" 0 + } + } + if {$R1_failed_primary_rank == 0 && $R11_replica_rank == 0} { + if {[count_log_message -11 "This is the best ranked replica and can initiate the election immediately"]} { + verify_log_message -11 "*Start of election delayed for 0 milliseconds*" 0 + } + } + # No matter what, there will be a best ranked replica in R1. + # This is the best ranked replica and can initiate the election immediately + # Myself become the best ranked replica, initiate the election immediately + if {$R6_replica_rank == 0} { + verify_log_message -6 "*best ranked replica*" 0 + wait_for_log_messages -11 {"*Successful partial resynchronization with primary*"} 0 1000 50 + } else { + verify_log_message -11 "*best ranked replica*" 0 + wait_for_log_messages -6 {"*Successful partial resynchronization with primary*"} 0 1000 50 + } + + # In any case, we do not expect timeouts. + verify_no_log_message -5 "*Failover attempt expired*" 0 + verify_no_log_message -10 "*Failover attempt expired*" 0 + verify_no_log_message -6 "*Failover attempt expired*" 0 + verify_no_log_message -11 "*Failover attempt expired*" 0 + } +} + +# This change and test is quite important, so we want to run it a few more times. +for {set i 0} {$i < 5} {incr i} { + start_cluster 5 7 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000}} { + test_best_ranked_replica + } my_slot_allocation cluster_allocate_replicas ;# start_cluster +}