Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2302,7 +2302,11 @@ int rewriteSlotToAppendOnlyFileRio(rio *aof, int db_num, int hashslot, size_t *k
if (key_count && ((*key_count)++ & 1023) == 0) {
long long now = mstime();
if (now - updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_count, "AOF rewrite");
if (aof->flags & RIO_FLAG_SLOT_MIGRATION_AOF) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_count, "Slot migration");
} else {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_count, "AOF rewrite");
}
updated_time = now;
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/childinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ void updateChildInfo(childInfoType information_type, size_t cow, monotime cow_up
server.stat_rdb_cow_bytes = server.stat_current_cow_peak;
} else if (information_type == CHILD_INFO_TYPE_MODULE_COW_SIZE) {
server.stat_module_cow_bytes = server.stat_current_cow_peak;
} else if (information_type == CHILD_INFO_TYPE_SLOT_MIGRATION_COW_SIZE) {
server.stat_slot_migration_cow_bytes = server.stat_current_cow_peak;
}
}

Expand Down
159 changes: 125 additions & 34 deletions src/cluster_migrateslots.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include "module.h"
#include "functions.h"

#include <sys/wait.h>
#include <fcntl.h>

typedef enum slotMigrationJobState {
/* Importing states */
SLOT_IMPORT_WAIT_ACK,
Expand Down Expand Up @@ -74,6 +77,7 @@ typedef struct slotMigrationJob {
* cleanup is done. */
sds description; /* Description, used for
* logging. */
size_t stat_cow_bytes; /* Copy on write bytes during slot migration fork. */

/* State needed during client establishment */
connection *conn; /* Connection to slot import source node. */
Expand Down Expand Up @@ -1235,9 +1239,8 @@ bool shouldRewriteHashtableIndex(int didx, hashtable *ht, void *privdata) {
}

/* Contains the logic run on the child process during the snapshot phase. */
int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) {
UNUSED(req);
list *slot_ranges = privdata;
int childSnapshotForSyncSlot(rio *aof, slotMigrationJob *job) {
list *slot_ranges = job->slot_ranges;
size_t key_count = 0;
for (int db_num = 0; db_num < server.dbnum; db_num++) {
listIter li;
Expand All @@ -1247,40 +1250,128 @@ int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) {
slotRange *r = (slotRange *)ln->value;
for (int slot = r->start_slot; slot <= r->end_slot; slot++) {
if (rewriteSlotToAppendOnlyFileRio(
rdb, db_num, slot, &key_count) == C_ERR) return C_ERR;
aof, db_num, slot, &key_count) == C_ERR) return C_ERR;
}
}
}
rioWrite(rdb, "*3\r\n", 4);
rioWriteBulkString(rdb, "CLUSTER", 7);
rioWriteBulkString(rdb, "SYNCSLOTS", 9);
rioWriteBulkString(rdb, "SNAPSHOT-EOF", 12);
rioWrite(aof, "*3\r\n", 4);
rioWriteBulkString(aof, "CLUSTER", 7);
rioWriteBulkString(aof, "SYNCSLOTS", 9);
rioWriteBulkString(aof, "SNAPSHOT-EOF", 12);
return C_OK;
}

/* Kill the slot migration child using SIGUSR1 (so that the parent will know
* the child did not exit for an error, but because we wanted), and performs
* the cleanup needed. */
void killSlotMigrationChild(void) {
/* No slot migration child? return. */
if (server.child_type != CHILD_TYPE_SLOT_MIGRATION) return;
serverLog(LL_NOTICE, "Killing running slot migration child: %ld", (long)server.child_pid);

/* Because we are not using here waitpid (like we have in killAppendOnlyChild
* and TerminateModuleForkChild), all the cleanup operations is done by
* checkChildrenDone, that later will find that the process killed. */
kill(server.child_pid, SIGUSR1);
}

/* Begin the snapshot for the provided job in a child process. */
int slotExportJobBeginSnapshot(slotMigrationJob *job) {
connection **conns = zmalloc(sizeof(connection *));
*conns = job->client->conn;
rdbSnapshotOptions opts = {
.connsnum = 1,
.conns = conns,
.use_pipe = 1,
.req = REPLICA_REQ_NONE,
.skip_checksum = 1,
.snapshot_func = childSnapshotForSyncSlot,
.privdata = job->slot_ranges};
if (saveSnapshotToConnectionSockets(opts) != C_OK) {
int slotExportJobBeginSnapshotToTargetSocket(slotMigrationJob *job) {
if (hasActiveChildProcess()) return C_ERR;

pid_t childpid;
int pipefds[2], slot_migration_pipe_write = -1, safe_to_exit_pipe = -1;
serverAssert(server.slot_migration_pipe_read == -1 && server.slot_migration_child_exit_pipe == -1);

/* Before to fork, create a pipe that is used to transfer the slot data bytes to
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over. */
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
server.slot_migration_pipe_read = pipefds[0]; /* read end */
slot_migration_pipe_write = pipefds[1]; /* write end */

/* create another pipe that is used by the parent to signal to the child
* that it can exit. */
if (anetPipe(pipefds, 0, 0) == -1) {
close(slot_migration_pipe_write);
close(server.slot_migration_pipe_read);
server.slot_migration_pipe_read = -1;
return C_ERR;
}
if (server.debug_pause_after_fork) debugPauseProcess();
return C_OK;
safe_to_exit_pipe = pipefds[0]; /* read end */
server.slot_migration_child_exit_pipe = pipefds[1]; /* write end */

server.slot_migration_pipe_conn = job->client->conn;

if ((childpid = serverFork(CHILD_TYPE_SLOT_MIGRATION)) == 0) {
/* Child */
rio aof;
aof.flags |= RIO_FLAG_SLOT_MIGRATION_AOF;
rioInitWithFd(&aof, slot_migration_pipe_write);
/* Close the reading part, so that if the parent crashes, the child will
* get a write error and exit. */
close(server.rdb_pipe_read);

serverSetProcTitle("valkey-slot-migration-to-target");
serverSetCpuAffinity(server.bgsave_cpulist);

int retval = childSnapshotForSyncSlot(&aof, job);
if (retval == C_OK && rioFlush(&aof) == 0) retval = C_ERR;
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_SLOT_MIGRATION_COW_SIZE, "Slot migration");
}
rioFreeFd(&aof);
/* wake up the reader, tell it we're done. */
close(slot_migration_pipe_write);
close(server.slot_migration_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
ssize_t dummy = read(safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
serverLog(LL_WARNING, "Can't begin slot migration snapshot in background: fork: %s", strerror(errno));
close(slot_migration_pipe_write);
close(server.slot_migration_pipe_read);
close(server.slot_migration_child_exit_pipe);
server.slot_migration_pipe_conn = NULL;
return C_ERR;
}

serverLog(LL_NOTICE, "Started child process %ld for slot migration %s", (long)childpid, job->description);
close(slot_migration_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE, slotMigrationPipeReadHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
close(safe_to_exit_pipe);
if (server.debug_pause_after_fork) debugPauseProcess();
return C_OK;
}
return C_OK; /* Unreached. */
}

/* Callback triggered after snapshot is finished. We either begin sending the
* incremental contents or fail the associated migration. */
void clusterHandleSlotExportBackgroundSaveDone(int bgsaveerr) {
if (!server.cluster_enabled) return;
/* When a background slot migration terminates, call the right handler. */
void backgroundSlotMigrationDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE, "Background SLOT MIGRATION transfer terminated with success");
} else if (!bysignal && exitcode != 0) {
serverLog(LL_WARNING, "Background SLOT MIGRATION transfer error");
} else {
serverLog(LL_WARNING, "Background SLOT MIGRATION transfer terminated by signal %d", bysignal);
}
if (server.slot_migration_child_exit_pipe != -1) close(server.slot_migration_child_exit_pipe);
if (server.slot_migration_pipe_read > 0) {
aeDeleteFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE);
close(server.slot_migration_pipe_read);
}
server.slot_migration_child_exit_pipe = -1;
server.slot_migration_pipe_read = -1;
server.slot_migration_pipe_conn = NULL;
zfree(server.slot_migration_pipe_buff);
server.slot_migration_pipe_buff = NULL;
server.slot_migration_pipe_bufflen = 0;

listIter li;
listNode *ln;
listRewind(server.cluster->slot_migration_jobs, &li);
Expand All @@ -1290,8 +1381,9 @@ void clusterHandleSlotExportBackgroundSaveDone(int bgsaveerr) {
if (job->state != SLOT_EXPORT_SNAPSHOTTING) {
continue;
}
if (bgsaveerr == C_OK) {
if (!bysignal && exitcode == 0) {
slotExportBeginStreaming(job);
job->stat_cow_bytes = server.stat_slot_migration_cow_bytes;
} else {
serverLog(LL_WARNING,
"Child process failed to snapshot slot migration %s",
Expand Down Expand Up @@ -1652,7 +1744,7 @@ void proceedWithSlotMigration(slotMigrationJob *job) {
serverLog(LL_NOTICE,
"Beginning snapshot of slot migration %s.",
job->description);
if (slotExportJobBeginSnapshot(job) == C_ERR) {
if (slotExportJobBeginSnapshotToTargetSocket(job) == C_ERR) {
serverLog(LL_WARNING,
"Slot migration %s failed to start slot snapshot",
job->description);
Expand Down Expand Up @@ -1749,10 +1841,6 @@ void resetSlotMigrationJob(slotMigrationJob *job) {

sdsfree(job->response_buf);
job->response_buf = NULL;

/* Description is not needed once migration is finished */
sdsfree(job->description);
job->description = NULL;
}

void freeSlotMigrationJob(void *o) {
Expand All @@ -1762,6 +1850,7 @@ void freeSlotMigrationJob(void *o) {
sdsfree(job->slot_ranges_str);
sdsfree(job->status_msg);
sdsfree(job->response_buf);
sdsfree(job->description);
zfree(o);
}

Expand Down Expand Up @@ -1909,7 +1998,7 @@ void finishSlotMigrationJob(slotMigrationJob *job,
slotExportTryUnpause();
/* Fast fail the child process, which will be cleaned up fully in
* checkChildrenDone. */
if (job->state == SLOT_EXPORT_SNAPSHOTTING) killRDBChild();
if (job->state == SLOT_EXPORT_SNAPSHOTTING) killSlotMigrationChild();
}
if (job->type == SLOT_MIGRATION_IMPORT &&
job->state != SLOT_MIGRATION_JOB_SUCCESS) {
Expand Down Expand Up @@ -1954,7 +2043,7 @@ void clusterCommandGetSlotMigrations(client *c) {
listRewind(server.cluster->slot_migration_jobs, &li);
while ((ln = listNext(&li)) != NULL) {
slotMigrationJob *job = ln->value;
addReplyMapLen(c, 9);
addReplyMapLen(c, 10);
addReplyBulkCString(c, "name");
addReplyBulkCBuffer(c, job->name, CLUSTER_NAMELEN);
addReplyBulkCString(c, "operation");
Expand All @@ -1975,6 +2064,8 @@ void clusterCommandGetSlotMigrations(client *c) {
addReplyBulkCString(c, slotMigrationJobStateToString(job->state));
addReplyBulkCString(c, "message");
addReplyBulkCString(c, job->status_msg ? job->status_msg : "");
addReplyBulkCString(c, "cow_size");
addReplyLongLong(c, (long long)job->stat_cow_bytes);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also expose output buffer size in here? We may be missing this information right now to allow people to monitor its progress. (Or we could do this in another PR, by exposing both the import slot client and the export slot client in the client info, i.e. adding a client flag. But i can't think of a good flag char, since import source already taken 'I' char)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can probably go with 'i' and 'e', stand for import or export. But no_evict already taken 'e'.

So we can go with 'i' and 'm', stand for importing or migrating as the old word.s

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To monitor it, use CLIENT LIST? 🤔 I guess it's possible, yes, but maybe it's easier to use if we put some progress information in CLUSTER SLOTMIGRATIONS.

Maybe we can do it at the same time as #2504, if we want valkey-cli to print some progress indicator in interactive mode (if stdout is a TTY).

}
}

Expand Down
3 changes: 2 additions & 1 deletion src/cluster_migrateslots.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void clusterCommandMigrateSlots(client *c);
void clusterCommandSyncSlots(client *c);
void clusterCommandGetSlotMigrations(client *c);
void clusterCommandCancelSlotMigrations(client *c);
void clusterHandleSlotExportBackgroundSaveDone(int bgsaveerr);
void backgroundSlotMigrationDoneHandler(int exitcode, int bysignal);
void clusterUpdateSlotExportsOnOwnershipChange(void);
void clusterUpdateSlotImportsOnOwnershipChange(void);
void clusterCleanupSlotMigrationLog(void);
Expand All @@ -33,5 +33,6 @@ size_t clusterGetTotalSlotExportBufferMemory(void);
bool clusterSlotFailoverGranted(int slot);
void clusterFailAllSlotExportsWithMessage(char *message);
void clusterHandleSlotMigrationErrorResponse(slotMigrationJob *job);
void killSlotMigrationChild(void);

#endif /* __CLUSTER_MIGRATESLOTS_H */
10 changes: 9 additions & 1 deletion src/commands/cluster-getslotmigrations.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
"pattern": "^([0-9]+-[0-9]+)( [0-9]+-[0-9]+)*$"
},
"node": {
"type": "string"
"description": "The source node name in an import job or the target node name in an export job.",
"type": "string",
"pattern": "^[0-9a-fA-F]{40}$"
},
"create_time": {
"description": "Creation time, in seconds since the unix epoch.",
Expand All @@ -55,10 +57,16 @@
"type": "integer"
},
"state": {
"description": "Human readable string representing the migration job state.",
"type": "string"
},
"message": {
"description": "Human readable status message with more details.",
"type": "string"
},
"cow_size": {
"description": "Copy on write bytes during slot migration fork.",
"type": "integer"
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ int getFlushCommandFlags(client *c, int *flags) {
void flushAllDataAndResetRDB(int flags) {
server.dirty += emptyData(-1, flags, NULL);
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
if (server.saveparamslen > 0) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
Expand Down
4 changes: 4 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -13286,6 +13286,10 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
* will prevent COW memory issue. */
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();

/* Kill existing slot migration fork as it is saving outdated data. Also killing it
* will prevent COW memory issue. */
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();

emptyData(-1, EMPTYDB_NO_FLAGS, NULL);

/* rdbLoad() can go back to the networking and process network events. If
Expand Down
Loading
Loading