Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 99 additions & 17 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countChannelsInSlot(unsigned int hashslot);
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
list *clusterLookupNodeListByShardId(const char *shard_id);
void clusterRemoveNodeFromShard(clusterNode *node);
int auxShardIdSetter(clusterNode *n, void *value, size_t length);
sds auxShardIdGetter(clusterNode *n, sds s);
Expand Down Expand Up @@ -127,13 +126,22 @@ sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);
void clusterCommandFlushslot(client *c);
int clusterAllReplicasThinkPrimaryIsFail(void);

/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
static inline int clusterNodeIsVotingPrimary(clusterNode *n) {
return (n->flags & CLUSTER_NODE_PRIMARY) && n->numslots;
}

/* Returns if myself is the best ranked replica in an automatic failover process. */
static inline int myselfIsBestRankedReplica(void) {
return (server.cluster->mf_end == 0 &&
server.cluster->failover_auth_rank == 0 &&
server.cluster->failover_failed_primary_rank == 0 &&
clusterAllReplicasThinkPrimaryIsFail());
}

int getNodeDefaultClientPort(clusterNode *n) {
return server.tls_cluster ? n->tls_port : n->tcp_port;
}
Expand Down Expand Up @@ -2164,6 +2172,23 @@ int clusterBlacklistExists(char *nodeid, size_t len) {
* CLUSTER messages exchange - PING/PONG and gossip
* -------------------------------------------------------------------------- */

/* Marks a node as FAIL. Apart from clusterLoadConfig, this is the only way we mark a
* node as FAIL during runtime. */
void markNodeAsFailing(clusterNode *node) {
/* Mark the node as FAIL. */
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();
/* Remove the PFAIL flag. */
node->flags &= ~CLUSTER_NODE_PFAIL;

/* Immediately check if the failing node is our primary node. */
if (nodeIsReplica(myself) && myself->replicaof == node) {
myself->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
}

clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}

/* This function checks if a given node should be marked as FAIL.
* It happens if the following conditions are met:
*
Expand Down Expand Up @@ -2200,17 +2225,14 @@ void markNodeAsFailingIfNeeded(clusterNode *node) {
serverLog(LL_NOTICE, "Marking node %.40s (%s) as failing (quorum reached).", node->name, node->human_nodename);

/* Mark the node as failing. */
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();
markNodeAsFailing(node);

/* Broadcast the failing node name to everybody, forcing all the other
* reachable nodes to flag the node as FAIL.
* We do that even if this node is a replica and not a primary: anyway
* the failing state is triggered collecting failure reports from primaries,
* so here the replica is only helping propagating this status. */
clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}

/* This function is called only if a node is marked as FAIL, but we are able
Expand All @@ -2227,6 +2249,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
serverLog(LL_NOTICE, "Clear FAIL state for node %.40s (%s): %s is reachable again.", node->name,
node->human_nodename, nodeIsReplica(node) ? "replica" : "primary without slots");
node->flags &= ~CLUSTER_NODE_FAIL;
if (nodeIsReplica(myself) && myself->replicaof == node) node->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}

Expand All @@ -2241,6 +2264,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
"Clear FAIL state for node %.40s (%s): is reachable again and nobody is serving its slots after some time.",
node->name, node->human_nodename);
node->flags &= ~CLUSTER_NODE_FAIL;
if (nodeIsReplica(myself) && myself->replicaof == node) node->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}
}
Expand Down Expand Up @@ -3319,19 +3343,28 @@ int clusterProcessPacket(clusterLink *link) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}

/* Checks if the node supports light message hdr */
/* Store some flags about the sender. */
if (sender) {
/* Check if the node supports light publish message hdr */
if (flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
}

/* Check if the node supports light module message hdr */
if (flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
}

/* Check if the sender has marked its primary node as FAIL. */
if (flags & CLUSTER_NODE_MY_PRIMARY_FAIL) {
sender->flags |= CLUSTER_NODE_MY_PRIMARY_FAIL;
} else {
sender->flags &= ~CLUSTER_NODE_MY_PRIMARY_FAIL;
}
}

/* Update the last time we saw any data from this node. We
Expand Down Expand Up @@ -3539,12 +3572,10 @@ int clusterProcessPacket(clusterLink *link) {
* the clients, and the replica will never initiate a failover since the
* node is not actually in FAIL state. */
if (!nodeFailed(noaddr_node)) {
noaddr_node->flags &= ~CLUSTER_NODE_PFAIL;
noaddr_node->flags |= CLUSTER_NODE_FAIL;
noaddr_node->fail_time = now;
markNodeAsFailing(noaddr_node);
clusterSendFail(noaddr_node->name);
}
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
return 0;
}
}
Expand Down Expand Up @@ -3773,10 +3804,7 @@ int clusterProcessPacket(clusterLink *link) {
if (failing && !(failing->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_MYSELF))) {
serverLog(LL_NOTICE, "FAIL message received from %.40s (%s) about %.40s (%s)", hdr->sender,
sender->human_nodename, hdr->data.fail.about.nodename, failing->human_nodename);
failing->flags |= CLUSTER_NODE_FAIL;
failing->fail_time = now;
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
markNodeAsFailing(failing);
}
} else {
serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender,
Expand Down Expand Up @@ -4595,6 +4623,16 @@ void clusterRequestFailoverAuth(void) {
* in the header to communicate the nodes receiving the message that
* they should authorized the failover even if the primary is working. */
if (server.cluster->mf_end) msgblock->data[0].msg.mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;

/* If this is an automatic failover and if myself is the best ranked replica,
* set the CLUSTERMSG_FLAG0_FORCEACK bit in the header as well.
*
* In this case, we hope that other primary nodes will not refuse to vote because
* they did not receive the FAIL message in time. */
if (server.cluster->mf_end == 0 && myselfIsBestRankedReplica()) {
msgblock->data[0].msg.mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
}

clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
Expand Down Expand Up @@ -4787,6 +4825,29 @@ int clusterGetFailedPrimaryRank(void) {
return rank;
}


/* Returns 1 if all replicas under my primary think the primary is in FAIL state.
*
* This is useful in automatic failover. For example, from my perspective,
* if all other replicas, including myself, both mark the primary node as FAIL,
* which means that myself and other replicas have exchanged new gossip information
* after the primary node went down, and we know the latest replication offset of
* the replicas. If a replica finds that its ranking is optimal in all cases, then
* the replica can initiate an election immediately in automatic failover without
* waiting for the delay. */
int clusterAllReplicasThinkPrimaryIsFail(void) {
serverAssert(nodeIsReplica(myself));
serverAssert(myself->replicaof);

clusterNode *primary = myself->replicaof;
for (int i = 0; i < primary->num_replicas; i++) {
if (!nodePrimaryIsFail(primary->replicas[i])) {
return 0;
}
}
return 1;
}

/* This function is called by clusterHandleReplicaFailover() in order to
* let the replica log why it is not able to failover. Sometimes there are
* not the conditions, but since the failover function is called again and
Expand Down Expand Up @@ -4982,10 +5043,20 @@ void clusterHandleReplicaFailover(void) {
server.cluster->failover_auth_time = now;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_failed_primary_rank = 0;
/* Reset auth_age since it is outdated now and we can bypass the auth_timeout
}

if (server.cluster->mf_end == 0 && myselfIsBestRankedReplica()) {
server.cluster->failover_auth_time = now;
serverLog(LL_NOTICE, "This is the best ranked replica and can initiate the election immediately.");
}

if (server.cluster->failover_auth_time == now) {
/* If we happen to initiate a failover (automatic or manual) immediately.
* Reset auth_age since it is outdated now and we can bypass the auth_timeout
* check in the next state and start the election ASAP. */
auth_age = 0;
}

serverLog(LL_NOTICE,
"Start of election delayed for %lld milliseconds "
"(rank #%d, primary rank #%d, offset %lld).",
Expand All @@ -5009,8 +5080,13 @@ void clusterHandleReplicaFailover(void) {
* It is also possible that we received the message that telling a
* shard is up. Update the delay if our failed_primary_rank changed.
*
* It is also possible that we received more message and then we figure
* out myself is the best ranked replica, in this case, we can initiate
* the election immediately.
*
* Not performed if this is a manual failover. */
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0) {
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0 &&
server.cluster->failover_auth_time != now) {
int newrank = clusterGetReplicaRank();
if (newrank != server.cluster->failover_auth_rank) {
long long added_delay = (newrank - server.cluster->failover_auth_rank) * 1000;
Expand All @@ -5028,6 +5104,11 @@ void clusterHandleReplicaFailover(void) {
serverLog(LL_NOTICE, "Failed primary rank updated to #%d, added %lld milliseconds of delay.",
new_failed_primary_rank, added_delay);
}

if (myselfIsBestRankedReplica()) {
server.cluster->failover_auth_time = now;
serverLog(LL_NOTICE, "Myself become the best ranked replica, initiate the election immediately.");
}
}

/* Return ASAP if we can't still start the election. */
Expand Down Expand Up @@ -5574,7 +5655,8 @@ void clusterBeforeSleep(void) {
}
} else if (flags & CLUSTER_TODO_HANDLE_FAILOVER) {
/* Handle failover, this is needed when it is likely that there is already
* the quorum from primaries in order to react fast. */
* the quorum from primaries in order to react fast. Or when we determine
* that we can proceed with the failover. */
clusterHandleReplicaFailover();
}

Expand Down
6 changes: 6 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED (1 << 11) /* This node supports light message header for publish type. */
#define CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED (1 << 12) /* This node supports light message header for module type. */
#define CLUSTER_NODE_MY_PRIMARY_FAIL (1 << 13) /* myself is a replica and my primary is FAIL in my view.
* myself will gossip this flag to other replica in the
* shard so that the replicas can make a better ranking
* decisions to help with the failover. */

#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
Expand All @@ -70,6 +75,7 @@ typedef struct clusterLink {
#define nodeSupportsLightMsgHdrForPubSub(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED)
#define nodeSupportsLightMsgHdrForModule(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED)
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))
#define nodePrimaryIsFail(n) ((n)->flags & CLUSTER_NODE_MY_PRIMARY_FAIL)

/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
Expand Down
19 changes: 17 additions & 2 deletions tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ proc verify_log_message {srv_idx pattern from_line} {
incr from_line
set result [exec tail -n +$from_line < [srv $srv_idx stdout]]
if {![string match $pattern $result]} {
fail "expected message not found in log file: $pattern"
fail "expected message not found in srv $srv_idx log file [srv $srv_idx stdout]: $pattern"
}
}

Expand All @@ -210,7 +210,7 @@ proc verify_no_log_message {srv_idx pattern from_line} {
incr from_line
set result [exec tail -n +$from_line < [srv $srv_idx stdout]]
if {[string match $pattern $result]} {
fail "expected message found in log file: $pattern"
fail "expected message found in srv $srv_idx log file [srv $srv_idx stdout]: $pattern"
}
}

Expand Down Expand Up @@ -1252,3 +1252,18 @@ proc bp {{s {}}} {
puts $res
}
}

proc memcmp {string1 string2} {
set len1 [string length $string1]
set len2 [string length $string2]
set minLen [expr min($len1, $len2)]

for {set i 0} {$i < $minLen} {incr i} {
set char1 [scan [string index $string1 $i] %c]
set char2 [scan [string index $string2 $i] %c]
if {$char1 != $char2} {
return [expr {$char1 - $char2}]
}
}
return [expr {$len1 - $len2}]
}
Loading
Loading