Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countChannelsInSlot(unsigned int hashslot);
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
list *clusterLookupNodeListByShardId(const char *shard_id);
void clusterRemoveNodeFromShard(clusterNode *node);
int auxShardIdSetter(clusterNode *n, void *value, size_t length);
sds auxShardIdGetter(clusterNode *n, sds s);
Expand Down Expand Up @@ -128,13 +127,24 @@ sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);
void clusterCommandFlushslot(client *c);
int clusterAllReplicasThinkPrimaryIsFail(void);

/* Voting rights are restricted to primaries that serve at least one slot.
 * Returns 1 if the node may vote in failover elections, 0 otherwise. */
int clusterNodeIsVotingPrimary(clusterNode *n) {
    if (!(n->flags & CLUSTER_NODE_PRIMARY)) return 0;
    return n->numslots != 0;
}

/* Returns 1 if myself is the best ranked replica in an automatic failover,
 * 0 otherwise. To avoid a newly added empty replica from affecting the
 * ranking, a replica with a zero replication offset is never considered.
 * Only meaningful outside of a manual failover (mf_end == 0). */
static inline int myselfIsBestRankedReplica(void) {
    /* Not applicable during a manual failover. */
    if (server.cluster->mf_end != 0) return 0;
    /* Skip newly added replicas that have not replicated anything yet. */
    if (getNodeReplicationOffset(myself) == 0) return 0;
    /* We must be first by replication offset among the siblings... */
    if (server.cluster->failover_auth_rank != 0) return 0;
    /* ...and our failed primary must be first in the failed-primary ranking. */
    if (server.cluster->failover_failed_primary_rank != 0) return 0;
    /* Finally, every sibling replica must agree the primary is FAIL. */
    return clusterAllReplicasThinkPrimaryIsFail();
}

/* Return the client port clients should use to reach node 'n':
 * the TLS port when the cluster bus runs over TLS, the TCP port otherwise. */
int getNodeDefaultClientPort(clusterNode *n) {
    if (server.tls_cluster) return n->tls_port;
    return n->tcp_port;
}
Expand Down Expand Up @@ -2244,6 +2254,10 @@ void markNodeAsFailing(clusterNode *node) {

/* Immediately check if the failing node is our primary node. */
if (nodeIsReplica(myself) && myself->replicaof == node) {
/* Mark that my primary is FAIL so that we can carry this flag during
 * gossip, letting the other nodes know that my primary node has failed
 * and that my replication offset will no longer be updated. */
myself->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
/* We can start an automatic failover as soon as possible, setting a flag
 * here so that we don't need to wait for the cron to kick in. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
Expand Down Expand Up @@ -2312,6 +2326,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
serverLog(LL_NOTICE, "Clear FAIL state for node %.40s (%s): %s is reachable again.", node->name,
node->human_nodename, nodeIsReplica(node) ? "replica" : "primary without slots");
node->flags &= ~CLUSTER_NODE_FAIL;
if (nodeIsReplica(myself) && myself->replicaof == node) node->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}

Expand All @@ -2326,6 +2341,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
"Clear FAIL state for node %.40s (%s): is reachable again and nobody is serving its slots after some time.",
node->name, node->human_nodename);
node->flags &= ~CLUSTER_NODE_FAIL;
if (nodeIsReplica(myself) && myself->replicaof == node) node->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}
}
Expand Down Expand Up @@ -3426,6 +3442,13 @@ int clusterProcessPacket(clusterLink *link) {
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
}

/* Check if the sender has marked its primary node as FAIL. */
if (flags & CLUSTER_NODE_MY_PRIMARY_FAIL) {
sender->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
} else {
sender->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
}
}

/* Update the last time we saw any data from this node. We
Expand Down Expand Up @@ -4717,6 +4740,16 @@ void clusterRequestFailoverAuth(void) {
* in the header to communicate to the nodes receiving the message that
* they should authorize the failover even if the primary is working. */
if (server.cluster->mf_end) msgblock->data[0].msg.mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;

/* If this is an automatic failover and if myself is the best ranked replica,
* set the CLUSTERMSG_FLAG0_FORCEACK bit in the header as well.
*
* In this case, we hope that other primary nodes will not refuse to vote because
* they did not receive the FAIL message in time. */
if (server.cluster->mf_end == 0 && myselfIsBestRankedReplica()) {
msgblock->data[0].msg.mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
}

clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
Expand Down Expand Up @@ -4909,6 +4942,29 @@ int clusterGetFailedPrimaryRank(void) {
return rank;
}


/* Returns 1 if every replica under my primary (including myself) has the
 * primary marked as FAIL, 0 otherwise.
 *
 * This is useful in automatic failover: when all sibling replicas have
 * marked the primary as FAIL, it means the replicas have exchanged fresh
 * gossip after the primary went down, so the replication offsets we rank
 * against are up to date. A replica that finds its ranking is optimal under
 * these conditions can initiate an election immediately in an automatic
 * failover without waiting for the usual delay.
 *
 * Must only be called when myself is a replica with a known primary. */
int clusterAllReplicasThinkPrimaryIsFail(void) {
    serverAssert(nodeIsReplica(myself));
    serverAssert(myself->replicaof);

    clusterNode *primary = myself->replicaof;
    int j = 0;
    while (j < primary->num_replicas) {
        /* One replica without the MY_PRIMARY_FAIL flag is enough to bail. */
        if (!nodePrimaryIsFail(primary->replicas[j])) return 0;
        j++;
    }
    return 1;
}

/* This function is called by clusterHandleReplicaFailover() in order to
* let the replica log why it is not able to failover. Sometimes there are
* not the conditions, but since the failover function is called again and
Expand Down Expand Up @@ -5109,10 +5165,20 @@ void clusterHandleReplicaFailover(void) {
server.cluster->failover_auth_time = now;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_failed_primary_rank = 0;
/* Reset auth_age since it is outdated now and we can bypass the auth_timeout
}

if (server.cluster->mf_end == 0 && myselfIsBestRankedReplica()) {
server.cluster->failover_auth_time = now;
serverLog(LL_NOTICE, "This is the best ranked replica and can initiate the election immediately.");
}

if (server.cluster->failover_auth_time == now) {
/* If we happen to initiate a failover (automatic or manual) immediately.
* Reset auth_age since it is outdated now and we can bypass the auth_timeout
* check in the next state and start the election ASAP. */
auth_age = 0;
}

serverLog(LL_NOTICE,
"Start of election delayed for %lld milliseconds "
"(rank #%d, primary rank #%d, offset %lld).",
Expand All @@ -5136,8 +5202,13 @@ void clusterHandleReplicaFailover(void) {
* It is also possible that we received the message that telling a
* shard is up. Update the delay if our failed_primary_rank changed.
*
* It is also possible that we received more messages and then figured
* out that myself is the best ranked replica; in this case, we can
* initiate the election immediately.
*
* Not performed if this is a manual failover. */
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0) {
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0 &&
server.cluster->failover_auth_time != now) {
int newrank = clusterGetReplicaRank();
if (newrank != server.cluster->failover_auth_rank) {
long long added_delay = (newrank - server.cluster->failover_auth_rank) * 1000;
Expand All @@ -5155,6 +5226,11 @@ void clusterHandleReplicaFailover(void) {
serverLog(LL_NOTICE, "Failed primary rank updated to #%d, added %lld milliseconds of delay.",
new_failed_primary_rank, added_delay);
}

if (myselfIsBestRankedReplica()) {
server.cluster->failover_auth_time = now;
serverLog(LL_NOTICE, "Myself become the best ranked replica, initiate the election immediately.");
}
}

/* Return ASAP if we can't still start the election. */
Expand Down
6 changes: 6 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED (1 << 11) /* This node supports light message header for publish type. */
#define CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED (1 << 12) /* This node supports light message header for module type. */
#define CLUSTER_NODE_MY_PRIMARY_FAIL (1 << 13) /* myself is a replica and my primary is FAIL in my view. \
* myself will gossip this flag to the other replicas in the \
* shard so that the replicas can make better ranking \
* decisions to help with the failover. */

#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
Expand All @@ -76,6 +81,7 @@ typedef struct clusterLink {
#define nodeSupportsLightMsgHdrForPubSub(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED)
#define nodeSupportsLightMsgHdrForModule(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED)
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))
#define nodePrimaryIsFail(n) ((n)->flags & CLUSTER_NODE_MY_PRIMARY_FAIL)

/* Cluster messages header */

Expand Down
19 changes: 17 additions & 2 deletions tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ proc verify_log_message {srv_idx pattern from_line} {
incr from_line
set result [exec tail -n +$from_line < [srv $srv_idx stdout]]
if {![string match $pattern $result]} {
fail "expected message not found in log file: $pattern"
fail "expected message not found in srv $srv_idx log file [srv $srv_idx stdout]: $pattern"
}
}

Expand All @@ -210,7 +210,7 @@ proc verify_no_log_message {srv_idx pattern from_line} {
incr from_line
set result [exec tail -n +$from_line < [srv $srv_idx stdout]]
if {[string match $pattern $result]} {
fail "expected message found in log file: $pattern"
fail "expected message found in srv $srv_idx log file [srv $srv_idx stdout]: $pattern"
}
}

Expand Down Expand Up @@ -1252,3 +1252,18 @@ proc bp {{s {}}} {
puts $res
}
}

# Compare two strings character by character, mimicking the semantics of
# C memcmp: returns a negative value if string1 sorts before string2, a
# positive value if it sorts after, and 0 when the strings are identical.
# When one string is a prefix of the other, the shorter one sorts first.
proc memcmp {string1 string2} {
    set shorter [expr {min([string length $string1], [string length $string2])}]

    for {set idx 0} {$idx < $shorter} {incr idx} {
        scan [string index $string1 $idx] %c code1
        scan [string index $string2 $idx] %c code2
        if {$code1 != $code2} {
            return [expr {$code1 - $code2}]
        }
    }
    # Common prefix matched; the length difference decides the ordering.
    return [expr {[string length $string1] - [string length $string2]}]
}
149 changes: 149 additions & 0 deletions tests/unit/cluster/faster-failover.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Check the faster failover is working.

# Allocate slot 0 to the first primary and slot 1 to the second primary,
# then spread the remaining 16382 slots evenly across the other primaries.
# The 'primaries' and 'replicas' arguments are required by the
# start_cluster allocation-callback signature but are not used here.
proc my_slot_allocation {primaries replicas} {
    R 0 cluster addslots 0
    R 1 cluster addslots 1
    # node-index / first-slot / last-slot triples for the bulk ranges.
    foreach {node first last} {2 2 5463 3 5464 10927 4 10928 16383} {
        R $node cluster addslotsrange $first $last
    }
}

# We have the following nodes:
# Primary: R0 R1 R2 R3 R4
# Replica1: R5 R6 R7 R8 R9
# Replica2: R10 R11
#
# R0 owns slot 0 and its replicas are R5 and R10; key key_977613 belongs to slot 0.
# R1 owns slot 1 and its replicas are R6 and R11; key key_991803 belongs to slot 1.
#
# We will test the scenario where both R0 and R1 shards are down at the same time
# to test whether their faster failover is as expected.
#
# In the R0 shard, the offset of R10 will be greater than that of R5, so it is
# expected that R10 will start failover faster.
#
# In the R1 shard, the offsets of R6 and R11 are the same; we rely on the
# replica rank to ensure that there will not be a failover timeout.
#
# Even if R0 and R1 go down at the same time, we rely on the failed primary
# rank to ensure that there will not be a failover timeout.
#
# Needs to run in the body of
# start_cluster 5 7 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000}} {
# Exercises the faster-failover path: a replica that knows it is the best
# ranked candidate must start the election with no delay, and no shard may
# hit a failover timeout even when two primaries fail simultaneously.
proc test_best_ranked_replica {} {
test "The best replica can initiate an election immediately in an automatic failover" {
# We calculate the rankings up front so that we can make rank judgments later.
# Failed-primary rank is decided by shard-id order; replica rank by node-id order.
set R0_failed_primary_rank [expr [expr [memcmp [R 0 cluster myshardid] [R 1 cluster myshardid]] < 0] ? 0 : 1]
set R1_failed_primary_rank [expr [expr [memcmp [R 1 cluster myshardid] [R 0 cluster myshardid]] < 0] ? 0 : 1]

set R6_replica_rank [expr [expr [memcmp [R 6 cluster myid] [R 11 cluster myid]] < 0] ? 0 : 1]
set R11_replica_rank [expr [expr [memcmp [R 11 cluster myid] [R 6 cluster myid]] < 0] ? 0 : 1]

set R0_nodeid [R 0 cluster myid]
set R1_nodeid [R 1 cluster myid]

# Write some data to R0 and wait for both of its replicas to sync.
for {set i 0} {$i < 10} {incr i} {
R 0 incr key_977613
}
wait_for_ofs_sync [srv 0 client] [srv -5 client]
wait_for_ofs_sync [srv 0 client] [srv -10 client]

# Write some data to R1 and wait for both of its replicas to sync.
for {set i 0} {$i < 10} {incr i} {
R 1 incr key_991803
}
wait_for_ofs_sync [srv -1 client] [srv -6 client]
wait_for_ofs_sync [srv -1 client] [srv -11 client]

# Pause R5 so it has no chance to catch up with the offset.
pause_process [srv -5 pid]

# Kill the replica clients of R0, then write once more so that only R10
# (still connected) can receive the new offset.
R 0 client kill type replica
R 0 incr key_977613

# Wait for R10 to catch up with the offset so it will have a better offset than R5.
wait_for_ofs_sync [srv 0 client] [srv -10 client]

# Pause R0 and R1, the replicas of each shard will do the automatic failover.
pause_process [srv 0 pid]
pause_process [srv -1 pid]

# Resume R5 so it participates in the election with its stale offset.
resume_process [srv -5 pid]

# Make sure both primaries R0 and R1 are FAIL from every replica's view.
wait_for_condition 1000 10 {
[cluster_has_flag [cluster_get_node_by_id 5 $R0_nodeid] fail] eq 1 &&
[cluster_has_flag [cluster_get_node_by_id 5 $R1_nodeid] fail] eq 1 &&
[cluster_has_flag [cluster_get_node_by_id 10 $R0_nodeid] fail] eq 1 &&
[cluster_has_flag [cluster_get_node_by_id 10 $R1_nodeid] fail] eq 1 &&

[cluster_has_flag [cluster_get_node_by_id 6 $R0_nodeid] fail] eq 1 &&
[cluster_has_flag [cluster_get_node_by_id 6 $R1_nodeid] fail] eq 1 &&
[cluster_has_flag [cluster_get_node_by_id 11 $R0_nodeid] fail] eq 1 &&
[cluster_has_flag [cluster_get_node_by_id 11 $R1_nodeid] fail] eq 1
} else {
fail "The node is not marked with the correct flag"
}

# R10 must win in the R0 shard (better offset); either replica may win in
# the R1 shard (equal offsets).
wait_for_condition 1000 100 {
[s -10 role] == "master" &&
([s -6 role] == "master" || [s -11 role] == "master")
} else {
fail "No failover detected"
}

# case 1: R0 and R1 down at the same time, R0 has a better failed primary rank, R10 is
# the best ranked replica in the first place; if so, there is no delay.
if {$R0_failed_primary_rank == 0} {
if {[count_log_message -10 "This is the best ranked replica and can initiate the election immediately"] != 0} {
verify_log_message -10 "*Start of election delayed for 0 milliseconds*" 0
}
}
# No matter what, R10 will be the best ranked replica in R0, via either:
# "This is the best ranked replica and can initiate the election immediately"
# "Myself become the best ranked replica, initiate the election immediately"
verify_log_message -10 "*best ranked replica*" 0
wait_for_log_messages -5 {"*Successful partial resynchronization with primary*"} 0 1000 50

# case 2: R0 and R1 down at the same time, R1 has a better failed primary rank, R6 or R11
# will be the best ranked replica in the first place; if so, there is no delay.
if {$R1_failed_primary_rank == 0 && $R6_replica_rank == 0} {
if {[count_log_message -6 "This is the best ranked replica and can initiate the election immediately"]} {
verify_log_message -6 "*Start of election delayed for 0 milliseconds*" 0
}
}
if {$R1_failed_primary_rank == 0 && $R11_replica_rank == 0} {
if {[count_log_message -11 "This is the best ranked replica and can initiate the election immediately"]} {
verify_log_message -11 "*Start of election delayed for 0 milliseconds*" 0
}
}
# No matter what, there will be a best ranked replica in R1, via either:
# "This is the best ranked replica and can initiate the election immediately"
# "Myself become the best ranked replica, initiate the election immediately"
if {$R6_replica_rank == 0} {
verify_log_message -6 "*best ranked replica*" 0
wait_for_log_messages -11 {"*Successful partial resynchronization with primary*"} 0 1000 50
} else {
verify_log_message -11 "*best ranked replica*" 0
wait_for_log_messages -6 {"*Successful partial resynchronization with primary*"} 0 1000 50
}

# In any case, we do not expect failover timeouts on any replica.
verify_no_log_message -5 "*Failover attempt expired*" 0
verify_no_log_message -10 "*Failover attempt expired*" 0
verify_no_log_message -6 "*Failover attempt expired*" 0
verify_no_log_message -11 "*Failover attempt expired*" 0
}
}

# The faster-failover behavior is timing sensitive and important enough that
# we run the whole cluster scenario several times to catch flaky outcomes.
for {set i 0} {$i < 5} {incr i} {
start_cluster 5 7 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000}} {
test_best_ranked_replica
} my_slot_allocation cluster_allocate_replicas ;# start_cluster
}
Loading