Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6ab8888
Trigger manual failover on SIGTERM / shutdown to cluster primary
enjoy-binbin Sep 30, 2024
4b49f03
fix typo
enjoy-binbin Sep 30, 2024
f9ca731
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Oct 6, 2024
df0ef8d
add comment in the test
enjoy-binbin Oct 6, 2024
594fd5a
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Oct 21, 2024
519eb2a
removing mf_is_primary_failover
enjoy-binbin Oct 21, 2024
32043dd
try to fix test
enjoy-binbin Oct 27, 2024
e7b33fa
try to stable the test
enjoy-binbin Oct 27, 2024
d6649e5
Move the logic to clusterHandleServerShutdown
enjoy-binbin Oct 27, 2024
64831c9
Adjust the tests
enjoy-binbin Oct 28, 2024
b06a8c4
Do shutdown failover only when offset is match
enjoy-binbin Oct 28, 2024
5f7b429
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Jan 8, 2025
e56a360
remove count++ and fix confilct
enjoy-binbin Jan 8, 2025
0ccc4e4
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Jan 23, 2025
c9bfd69
CLUSTER FAILOVER replicaid node-id
enjoy-binbin Jan 23, 2025
c8037a1
code review v1
enjoy-binbin Jan 24, 2025
d70036b
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Feb 5, 2025
7d55db6
code review: remove error reply check, add cross-version test
enjoy-binbin Feb 5, 2025
4d5da8a
fix format and add log to debug the test
enjoy-binbin Feb 7, 2025
a1f957c
Update valkey.conf
enjoy-binbin Feb 7, 2025
37147e8
minor fixes in regexp, avoid matching the second line
enjoy-binbin Feb 7, 2025
bf60ed6
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Feb 7, 2025
27b6f6d
code review from Ping
enjoy-binbin Feb 11, 2025
61dd999
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Feb 11, 2025
8423921
Fix test
enjoy-binbin Feb 11, 2025
ed8c9bb
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Feb 25, 2025
9e00910
change to use replconf set-cluster-node-id
enjoy-binbin Feb 25, 2025
8cba555
Update valkey.conf
enjoy-binbin Feb 25, 2025
ade48cb
Change nodeid to sds
enjoy-binbin Feb 25, 2025
6b5cf7f
code review from Ping, add the assert <= 128
enjoy-binbin Feb 25, 2025
e3fdb7c
Fix the index
enjoy-binbin Feb 25, 2025
9c3d47e
Apply suggestions from code review
enjoy-binbin Mar 25, 2025
9521f5f
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Mar 25, 2025
c367470
Remove tmp file
enjoy-binbin Apr 7, 2025
5e88fd3
Remove tmp file
enjoy-binbin Apr 7, 2025
2cd1832
Code review
enjoy-binbin Apr 7, 2025
d2bf07f
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
enjoy-binbin Apr 7, 2025
533e6a6
Update tests/unit/cluster/auto-failover-on-shutdown.tcl
enjoy-binbin Apr 7, 2025
3b18c45
update valkey.conf, review from Ping
enjoy-binbin Apr 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
* Cluster exported API.
*----------------------------------------------------------------------------*/

#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */

/* Reason why the cluster state changes to fail. When adding new reasons,
* make sure to update clusterLogFailReason. */
Expand Down
94 changes: 86 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1254,8 +1254,68 @@ void clusterInitLast(void) {
}
}

void clusterAutoFailoverOnShutdown(void) {
if (!nodeIsPrimary(myself) || !server.auto_failover_on_shutdown) return;

/* Find the first best replica, that is, the replica with the largest offset. */
int legacy_replica = 0;
client *best_replica = NULL;
listIter replicas_iter;
listNode *replicas_list_node;
listRewind(server.replicas, &replicas_iter);
while ((replicas_list_node = listNext(&replicas_iter)) != NULL) {
client *replica = listNodeValue(replicas_list_node);
/* This is done only when the replica offset is caught up, to avoid data loss.
* And 0x90000 is 9.0.0, we only support this feature in this version. */
if (replica->repl_data->replica_version < 0x90000) {
legacy_replica = 1;
best_replica = NULL;
break;
}
if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE &&
replica->repl_data->repl_ack_off == server.primary_repl_offset &&
replica->repl_data->replica_nodeid && sdslen(replica->repl_data->replica_nodeid) == CLUSTER_NAMELEN) {
best_replica = replica;
}
}

/* We are not able to find the replica to do the auto failover. */
if (best_replica == NULL) {
if (legacy_replica) {
serverLog(LL_NOTICE, "Unable to perform auto failover on shutdown since there are legacy replicas.");
} else {
serverLog(LL_NOTICE, "Unable to find a replica to perform the auto failover on shutdown.");
}
return;
}

/* Send the CLUSTER FAILOVER FORCE REPLICAID node-id to all replicas since
* it is a shared replication buffer, but only the replica with the matching
* node-id will execute it. The caller will call flushReplicasOutputBuffers,
* so in here it is a best effort. */
char buf[128];
size_t buflen = snprintf(buf, sizeof(buf),
"*5\r\n$7\r\nCLUSTER\r\n"
"$8\r\nFAILOVER\r\n"
"$5\r\nFORCE\r\n"
"$9\r\nREPLICAID\r\n"
"$%d\r\n%.*s\r\n",
CLUSTER_NAMELEN,
CLUSTER_NAMELEN,
best_replica->repl_data->replica_nodeid);
serverAssert(buflen <= 128);
/* Must install write handler for all replicas first before feeding
* replication stream. */
prepareReplicasToWrite();
feedReplicationBuffer(buf, buflen);
serverLog(LL_NOTICE, "Perform auto failover to replica %s on shutdown.", best_replica->repl_data->replica_nodeid);
}

/* Called when a cluster node receives SHUTDOWN. */
void clusterHandleServerShutdown(void) {
/* Check if we are able to do the auto failover on shutdown. */
clusterAutoFailoverOnShutdown();

/* The error logs have been logged in the save function if the save fails. */
serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting.");
clusterSaveConfig(1);
Expand Down Expand Up @@ -7103,22 +7163,37 @@ int clusterCommandSpecial(client *c) {
} else {
addReplyLongLong(c, clusterNodeFailureReportsCount(n));
}
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc == 2 || c->argc == 3)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] */
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc >= 2)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] [REPLICAID <NODE ID>]
* REPLICAID is currently available only for internal so we won't
* put it into the JSON file. */
Comment on lines +7168 to +7169
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should just not document it. Someone will find it and use it, so I think removing it becomes a breaking change. All other arguments, including other internal commands, are documented in the json files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we document it, I think it will mostly be confusing to users.

Users might think they can send it to the primary like with standalone FAILOVER TO, but it's not replicated so it doesn't work.

I agree though that even if we don't document it, we probably need to keep it around.

All other arguments, including other internal commands, are documented in the json files.

REPLCONF has no arguments in the JSON file. The page just says "Usage: REPLCONF". So this will not be the only undocumented argument.

int force = 0, takeover = 0;
robj *replicaid = NULL;

if (c->argc == 3) {
if (!strcasecmp(c->argv[2]->ptr, "force")) {
for (int j = 2; j < c->argc; j++) {
int moreargs = (c->argc - 1) - j;
if (!strcasecmp(c->argv[j]->ptr, "force")) {
force = 1;
} else if (!strcasecmp(c->argv[2]->ptr, "takeover")) {
} else if (!strcasecmp(c->argv[j]->ptr, "takeover")) {
takeover = 1;
force = 1; /* Takeover also implies force. */
} else if (c == server.primary && !strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) {
/* This option is currently available only for primary. */
j++;
replicaid = c->argv[j];
} else {
addReplyErrorObject(c, shared.syntaxerr);
return 1;
}
}

/* Check if it should be executed by myself. */
if (replicaid != NULL && memcmp(replicaid->ptr, myself->name, CLUSTER_NAMELEN) != 0) {
/* Ignore this command, including the sanity check and the process. */
addReply(c, shared.ok);
return 1;
}

/* Check preconditions. */
if (clusterNodeIsPrimary(myself)) {
addReplyError(c, "You should send CLUSTER FAILOVER to a replica");
Expand All @@ -7127,8 +7202,7 @@ int clusterCommandSpecial(client *c) {
addReplyError(c, "I'm a replica but my master is unknown to me");
return 1;
} else if (!force && (nodeFailed(myself->replicaof) || myself->replicaof->link == NULL)) {
addReplyError(c, "Master is down or failed, "
"please use CLUSTER FAILOVER FORCE");
addReplyError(c, "Master is down or failed, please use CLUSTER FAILOVER FORCE");
return 1;
}
resetManualFailover();
Expand All @@ -7147,7 +7221,11 @@ int clusterCommandSpecial(client *c) {
/* If this is a forced failover, we don't need to talk with our
* primary to agree about the offset. We just failover taking over
* it without coordination. */
serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client);
if (c == server.primary) {
serverLog(LL_NOTICE, "Forced failover primary request accepted (primary request from '%s').", client);
} else {
serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client);
}
manualFailoverCanStart();
/* We can start a manual failover as soon as possible, setting a flag
* here so that we don't need to waiting for the cron to kick in. */
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3193,6 +3193,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("auto-failover-on-shutdown", NULL, MODIFIABLE_CONFIG, server.auto_failover_on_shutdown, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
52 changes: 52 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,7 @@ void freeClientReplicationData(client *c) {
}
if (c->flag.primary) replicationHandlePrimaryDisconnection();
sdsfree(c->repl_data->replica_addr);
sdsfree(c->repl_data->replica_nodeid);
zfree(c->repl_data);
c->repl_data = NULL;
}
Expand Down Expand Up @@ -1351,6 +1352,13 @@ void freeClientReplicationData(client *c) {
* - rdb-channel <1|0>
* Used to identify the client as a replica's rdb connection in an dual channel
* sync session.
*
* - set-rdb-client-id <client-id>
* Used to identify the current replica main channel with existing rdb-connection
* with the given id.
*
* - set-cluster-node-id <node-id>
* Used to inform the primary of the node-id of the replica in cluster mode.
* */
void replconfCommand(client *c) {
int j;
Expand Down Expand Up @@ -1500,6 +1508,21 @@ void replconfCommand(client *c) {
return;
}
c->repl_data->associated_rdb_client_id = (uint64_t)client_id;
} else if (!strcasecmp(c->argv[j]->ptr, "set-cluster-node-id")) {
/* REPLCONF SET-CLUSTER-NODE-ID <node-id> */
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
return;
}

clusterNode *n = clusterLookupNode(c->argv[j + 1]->ptr, sdslen(c->argv[j + 1]->ptr));
if (!n) {
addReplyErrorFormat(c, "Unknown node %s", (char *)c->argv[j + 1]->ptr);
return;
}

if (c->repl_data->replica_nodeid) sdsfree(c->repl_data->replica_nodeid);
c->repl_data->replica_nodeid = sdsdup(c->argv[j + 1]->ptr);
} else {
addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr);
return;
Expand Down Expand Up @@ -3621,6 +3644,14 @@ void syncWithPrimary(connection *conn) {
err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) node name. */
if (server.cluster_enabled) {
char *argv[] = {"REPLCONF", "SET-CLUSTER-NODE-ID", server.cluster->myself->name};
size_t lens[] = {strlen(argv[0]), strlen(argv[1]), CLUSTER_NAMELEN};
err = sendCommandArgv(conn, 3, argv, lens);
if (err) goto write_error;
}

server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
}
Expand Down Expand Up @@ -3711,6 +3742,27 @@ void syncWithPrimary(connection *conn) {
}
sdsfree(err);
err = NULL;
if (server.cluster_enabled) {
server.repl_state = REPL_STATE_RECEIVE_NODEID_REPLY;
return;
} else {
server.repl_state = REPL_STATE_SEND_PSYNC;
}
}

/* Receive REPLCONF SET-CLUSTER-NODE-ID reply. */
if (server.repl_state == REPL_STATE_RECEIVE_NODEID_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any, we don't care if it failed, it is best effort. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF SET-CLUSTER-NODE-ID: %s",
err);
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
}

Expand Down
6 changes: 3 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4628,16 +4628,16 @@ int finishShutdown(void) {
unlink(server.pidfile);
}

/* Handle cluster-related matters when shutdown. */
if (server.cluster_enabled) clusterHandleServerShutdown();

/* Best effort flush of replica output buffers, so that we hopefully
* send them pending writes. */
flushReplicasOutputBuffers();

/* Close the listening sockets. Apparently this allows faster restarts. */
closeListeningSockets(1);

/* Handle cluster-related matters when shutdown. */
if (server.cluster_enabled) clusterHandleServerShutdown();

serverLog(LL_WARNING, "%s is now ready to exit, bye bye...", server.sentinel_mode ? "Sentinel" : "Valkey");
return C_OK;

Expand Down
5 changes: 4 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ typedef enum {
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_NODEID_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
Expand Down Expand Up @@ -549,7 +550,6 @@ typedef enum {
#define MAXMEMORY_FLAG_LRU (1 << 0)
#define MAXMEMORY_FLAG_LFU (1 << 1)
#define MAXMEMORY_FLAG_ALLKEYS (1 << 2)
#define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_LFU)

#define MAXMEMORY_VOLATILE_LRU ((0 << 8) | MAXMEMORY_FLAG_LRU)
#define MAXMEMORY_VOLATILE_LFU ((1 << 8) | MAXMEMORY_FLAG_LFU)
Expand Down Expand Up @@ -1154,6 +1154,7 @@ typedef struct ClientReplicationData {
see the definition of replBufBlock. */
size_t ref_block_pos; /* Access position of referenced buffer block,
i.e. the next offset to send. */
sds replica_nodeid; /* Node id in cluster mode. */
} ClientReplicationData;

typedef struct ClientModuleData {
Expand Down Expand Up @@ -2084,6 +2085,7 @@ struct valkeyServer {
unsigned long cluster_blacklist_ttl; /* Duration in seconds that a node is denied re-entry into
* the cluster after it is forgotten with CLUSTER FORGET. */
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
int auto_failover_on_shutdown; /* Trigger manual failover on shutdown to primary. */
mstime_t cluster_mf_timeout; /* Milliseconds to do a manual failover. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
Expand Down Expand Up @@ -2907,6 +2909,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);

/* Replication */
int prepareReplicasToWrite(void);
void replicationFeedReplicas(int dictid, robj **argv, int argc);
void replicationFeedStreamFromPrimaryStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);
Expand Down
25 changes: 25 additions & 0 deletions tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,31 @@ proc wait_replica_online r {
}
}

proc check_replica_acked_ofs {primary replica_host replica_port} {
set infostr [$primary info replication]
set master_repl_offset [getInfoProperty $infostr master_repl_offset]
if {[regexp -lineanchor "^slave\\d+:ip=$replica_host,port=$replica_port,state=online,offset=(\\d+).*\r\n" $infostr _ offset]} {
if {$master_repl_offset == $offset} {
return 1
}
return 0
}
return 0
}

proc wait_replica_acked_ofs {primary replica replica_host replica_port} {
$primary config set repl-ping-replica-period 3600
$replica config set hz 500
wait_for_condition 1000 50 {
[check_replica_acked_ofs $primary $replica_host $replica_port] eq 1
} else {
puts "INFO REPLICATION: [$primary info replication]"
fail "replica $replica_host:$replica_port acked offset didn't match in time"
}
$primary config set repl-ping-replica-period 10
$replica config set hz 10
}

proc wait_for_ofs_sync {r1 r2} {
wait_for_condition 50 100 {
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
Expand Down
Loading
Loading