Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countChannelsInSlot(unsigned int hashslot);
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
list *clusterLookupNodeListByShardId(const char *shard_id);
void clusterRemoveNodeFromShard(clusterNode *node);
int auxShardIdSetter(clusterNode *n, void *value, size_t length);
sds auxShardIdGetter(clusterNode *n, sds s);
Expand Down Expand Up @@ -127,13 +126,22 @@ sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);
void clusterCommandFlushslot(client *c);
int clusterAllReplicasThinkPrimaryIsFail(void);

/* Voting rights belong exclusively to primaries that currently own slots.
 * Yields 1 when node `n` is such a primary and 0 in every other case. */
int clusterNodeIsVotingPrimary(clusterNode *n) {
    if (!(n->flags & CLUSTER_NODE_PRIMARY)) return 0;
    return n->numslots ? 1 : 0;
}

/* Tells whether myself is the best ranked replica in an automatic failover
 * process. Yields 1 when that is the case and 0 otherwise. */
static inline int myselfIsBestRankedReplica(void) {
    /* A non-zero mf_end means a manual failover, not an automatic one. */
    if (server.cluster->mf_end != 0) return 0;
    /* Both the replica rank and the failed primary rank have to be zero. */
    if (server.cluster->failover_auth_rank != 0) return 0;
    if (server.cluster->failover_failed_primary_rank != 0) return 0;
    /* Checked last, only once all the cheaper conditions above hold. */
    return clusterAllReplicasThinkPrimaryIsFail() ? 1 : 0;
}

/* Returns the default client port of node `n`: its TLS port when
 * server.tls_cluster is set, its TCP port otherwise. */
int getNodeDefaultClientPort(clusterNode *n) {
    if (server.tls_cluster) return n->tls_port;
    return n->tcp_port;
}
Expand Down Expand Up @@ -2187,6 +2195,10 @@ void markNodeAsFailing(clusterNode *node) {

/* Immediately check if the failing node is our primary node. */
if (nodeIsReplica(myself) && myself->replicaof == node) {
/* Flag on myself that my primary is FAIL. The flag is carried in our gossip,
* so that other nodes know that my primary node has failed and that my
* offset will no longer be updated. */
myself->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
/* We can start an automatic failover as soon as possible, setting a flag
* here so that we don't need to wait for the cron to kick in. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
Expand Down Expand Up @@ -2255,6 +2267,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
serverLog(LL_NOTICE, "Clear FAIL state for node %.40s (%s): %s is reachable again.", node->name,
node->human_nodename, nodeIsReplica(node) ? "replica" : "primary without slots");
node->flags &= ~CLUSTER_NODE_FAIL;
/* CLUSTER_NODE_MY_PRIMARY_FAIL is kept on myself (it is set on myself->flags
 * when my primary is marked as failing), so it has to be cleared on myself,
 * not on the primary node, once the primary is no longer FAIL. */
if (nodeIsReplica(myself) && myself->replicaof == node) myself->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}

Expand All @@ -2269,6 +2282,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
"Clear FAIL state for node %.40s (%s): is reachable again and nobody is serving its slots after some time.",
node->name, node->human_nodename);
node->flags &= ~CLUSTER_NODE_FAIL;
/* CLUSTER_NODE_MY_PRIMARY_FAIL is kept on myself (it is set on myself->flags
 * when my primary is marked as failing), so it has to be cleared on myself,
 * not on the primary node, once the primary is no longer FAIL. */
if (nodeIsReplica(myself) && myself->replicaof == node) myself->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}
}
Expand Down Expand Up @@ -3347,19 +3361,28 @@ int clusterProcessPacket(clusterLink *link) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}

/* Checks if the node supports light message hdr */
/* Store some flags about the sender. */
if (sender) {
/* Check if the node supports light publish message hdr */
if (flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
}

/* Check if the node supports light module message hdr */
if (flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
}

/* Check if the sender has marked its primary node as FAIL. */
if (flags & CLUSTER_NODE_MY_PRIMARY_FAIL) {
sender->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
} else {
sender->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
}
}

/* Update the last time we saw any data from this node. We
Expand Down Expand Up @@ -4618,6 +4641,16 @@ void clusterRequestFailoverAuth(void) {
* in the header to communicate to the nodes receiving the message that
* they should authorize the failover even if the primary is working. */
if (server.cluster->mf_end) msgblock->data[0].msg.mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;

/* If this is an automatic failover and if myself is the best ranked replica,
* set the CLUSTERMSG_FLAG0_FORCEACK bit in the header as well.
*
* In this case, we hope that other primary nodes will not refuse to vote because
* they did not receive the FAIL message in time. */
if (server.cluster->mf_end == 0 && myselfIsBestRankedReplica()) {
msgblock->data[0].msg.mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
}

clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
Expand Down Expand Up @@ -4810,6 +4843,29 @@ int clusterGetFailedPrimaryRank(void) {
return rank;
}


/* Checks whether every replica attached to my primary has flagged the primary
 * as FAIL. Returns 1 if so, and 0 as soon as a replica without the flag is found.
 *
 * This helps automatic failover: when all the replicas of my primary, myself
 * included, have marked the primary as FAIL, it means that we exchanged fresh
 * gossip after the primary went down, so the latest replication offsets of the
 * replicas are known. A replica that finds its ranking to be the best in all
 * respects can then initiate the election immediately in an automatic failover,
 * without waiting for the delay.
 *
 * Must only be called when myself is a replica with a primary set. */
int clusterAllReplicasThinkPrimaryIsFail(void) {
    serverAssert(nodeIsReplica(myself));
    serverAssert(myself->replicaof);

    clusterNode *my_primary = myself->replicaof;
    int idx = 0;
    while (idx < my_primary->num_replicas) {
        /* A single replica without the flag is enough to answer "no". */
        if (!nodePrimaryIsFail(my_primary->replicas[idx])) return 0;
        idx++;
    }
    return 1;
}

/* This function is called by clusterHandleReplicaFailover() in order to
* let the replica log why it is not able to failover. Sometimes there are
* not the conditions, but since the failover function is called again and
Expand Down Expand Up @@ -5005,10 +5061,20 @@ void clusterHandleReplicaFailover(void) {
server.cluster->failover_auth_time = now;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_failed_primary_rank = 0;
/* Reset auth_age since it is outdated now and we can bypass the auth_timeout
}

if (server.cluster->mf_end == 0 && myselfIsBestRankedReplica()) {
server.cluster->failover_auth_time = now;
serverLog(LL_NOTICE, "This is the best ranked replica and can initiate the election immediately.");
}

if (server.cluster->failover_auth_time == now) {
/* If we happen to initiate a failover (automatic or manual) immediately.
* Reset auth_age since it is outdated now and we can bypass the auth_timeout
* check in the next state and start the election ASAP. */
auth_age = 0;
}

serverLog(LL_NOTICE,
"Start of election delayed for %lld milliseconds "
"(rank #%d, primary rank #%d, offset %lld).",
Expand All @@ -5032,8 +5098,13 @@ void clusterHandleReplicaFailover(void) {
* It is also possible that we received the message that telling a
* shard is up. Update the delay if our failed_primary_rank changed.
*
* It is also possible that we received more messages and then figured
* out that myself is the best ranked replica; in this case, we can
* initiate the election immediately.
*
* Not performed if this is a manual failover. */
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0) {
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0 &&
server.cluster->failover_auth_time != now) {
int newrank = clusterGetReplicaRank();
if (newrank != server.cluster->failover_auth_rank) {
long long added_delay = (newrank - server.cluster->failover_auth_rank) * 1000;
Expand All @@ -5051,6 +5122,11 @@ void clusterHandleReplicaFailover(void) {
serverLog(LL_NOTICE, "Failed primary rank updated to #%d, added %lld milliseconds of delay.",
new_failed_primary_rank, added_delay);
}

if (myselfIsBestRankedReplica()) {
server.cluster->failover_auth_time = now;
serverLog(LL_NOTICE, "Myself become the best ranked replica, initiate the election immediately.");
}
}

/* Return ASAP if we can't still start the election. */
Expand Down
6 changes: 6 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED (1 << 11) /* This node supports light message header for publish type. */
#define CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED (1 << 12) /* This node supports light message header for module type. */
#define CLUSTER_NODE_MY_PRIMARY_FAIL (1 << 13) /* Myself is a replica and my primary is FAIL in my view.
* Myself gossips this flag to the other replicas in the
* shard so that they can make better ranking decisions
* to help with the failover. */

#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
Expand All @@ -70,6 +75,7 @@ typedef struct clusterLink {
/* Non-zero when node `n` has the CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED flag set. */
#define nodeSupportsLightMsgHdrForPubSub(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED)
/* Non-zero when node `n` has the CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED flag set. */
#define nodeSupportsLightMsgHdrForModule(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED)
/* True when node `n` has none of the HANDSHAKE, MEET, PFAIL or FAIL flags set. */
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))
/* Non-zero when node `n` has the CLUSTER_NODE_MY_PRIMARY_FAIL flag set. */
#define nodePrimaryIsFail(n) ((n)->flags & CLUSTER_NODE_MY_PRIMARY_FAIL)

/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
Expand Down
19 changes: 17 additions & 2 deletions tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ proc verify_log_message {srv_idx pattern from_line} {
incr from_line
set result [exec tail -n +$from_line < [srv $srv_idx stdout]]
if {![string match $pattern $result]} {
fail "expected message not found in log file: $pattern"
fail "expected message not found in srv $srv_idx log file [srv $srv_idx stdout]: $pattern"
}
}

Expand All @@ -210,7 +210,7 @@ proc verify_no_log_message {srv_idx pattern from_line} {
incr from_line
set result [exec tail -n +$from_line < [srv $srv_idx stdout]]
if {[string match $pattern $result]} {
fail "expected message found in log file: $pattern"
fail "expected message found in srv $srv_idx log file [srv $srv_idx stdout]: $pattern"
}
}

Expand Down Expand Up @@ -1252,3 +1252,18 @@ proc bp {{s {}}} {
puts $res
}
}

# Compare two strings character by character, in the spirit of C's memcmp().
#
# Returns 0 when the strings are identical. At the first position where they
# differ, returns the difference between the two character codes (negative
# when string1 comes first, positive otherwise). When one string is a prefix
# of the other, returns the difference between the two lengths.
proc memcmp {string1 string2} {
    set len1 [string length $string1]
    set len2 [string length $string2]
    # Braced like every other expr in this proc, so the expression is
    # compiled once and its operands are not substituted twice.
    set minLen [expr {min($len1, $len2)}]

    for {set i 0} {$i < $minLen} {incr i} {
        set char1 [scan [string index $string1 $i] %c]
        set char2 [scan [string index $string2 $i] %c]
        if {$char1 != $char2} {
            return [expr {$char1 - $char2}]
        }
    }
    return [expr {$len1 - $len2}]
}
Loading
Loading