Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/childinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ void updateChildInfo(childInfoType information_type, size_t cow, monotime cow_up
server.stat_rdb_cow_bytes = server.stat_current_cow_peak;
} else if (information_type == CHILD_INFO_TYPE_MODULE_COW_SIZE) {
server.stat_module_cow_bytes = server.stat_current_cow_peak;
} else if (information_type == CHILD_INFO_TYPE_SLOT_MIGRATION_COW_SIZE) {
server.stat_slot_migration_cow_bytes = server.stat_current_cow_peak;
}
}

Expand Down
149 changes: 121 additions & 28 deletions src/cluster_migrateslots.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include "module.h"
#include "functions.h"

#include <sys/wait.h>
#include <fcntl.h>

typedef enum slotMigrationJobState {
/* Importing states */
SLOT_IMPORT_WAIT_ACK,
Expand Down Expand Up @@ -1219,9 +1222,8 @@ bool shouldRewriteHashtableIndex(int didx, hashtable *ht, void *privdata) {
}

/* Contains the logic run on the child process during the snapshot phase. */
int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) {
UNUSED(req);
list *slot_ranges = privdata;
int childSnapshotForSyncSlot(rio *aof, slotMigrationJob *job) {
list *slot_ranges = job->slot_ranges;
size_t key_count = 0;
for (int db_num = 0; db_num < server.dbnum; db_num++) {
listIter li;
Expand All @@ -1231,34 +1233,128 @@ int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) {
slotRange *r = (slotRange *)ln->value;
for (int slot = r->start_slot; slot <= r->end_slot; slot++) {
if (rewriteSlotToAppendOnlyFileRio(
rdb, db_num, slot, &key_count) == C_ERR) return C_ERR;
aof, db_num, slot, &key_count) == C_ERR) return C_ERR;
}
}
}
rioWrite(rdb, "*3\r\n", 4);
rioWriteBulkString(rdb, "CLUSTER", 7);
rioWriteBulkString(rdb, "SYNCSLOTS", 9);
rioWriteBulkString(rdb, "SNAPSHOT-EOF", 12);
rioWrite(aof, "*3\r\n", 4);
rioWriteBulkString(aof, "CLUSTER", 7);
rioWriteBulkString(aof, "SYNCSLOTS", 9);
rioWriteBulkString(aof, "SNAPSHOT-EOF", 12);
return C_OK;
}

void killSlotMigrationChild(void) {
int statloc;
/* No slot migration child? return. */
if (server.child_type != CHILD_TYPE_SLOT_MIGRATION) return;
/* Kill slot migration child, wait for child exit. */
serverLog(LL_NOTICE, "Killing running slot migration child: %ld", (long)server.child_pid);
if (kill(server.child_pid, SIGUSR1) != -1) {
while (waitpid(-1, &statloc, 0) != server.child_pid);
}
resetChildState();
serverLog(LL_NOTICE, "Slot migration child %ld killed", (long)server.child_pid);
clusterHandleSlotExportBackgroundSaveDone(C_ERR);
}

/* Begin the snapshot for the provided job in a child process. */
int slotExportJobBeginSnapshot(slotMigrationJob *job) {
connection **conns = zmalloc(sizeof(connection *));
*conns = job->client->conn;
rdbSnapshotOptions opts = {
.connsnum = 1,
.conns = conns,
.use_pipe = 1,
.req = REPLICA_REQ_NONE,
.skip_checksum = 1,
.snapshot_func = childSnapshotForSyncSlot,
.privdata = job->slot_ranges};
if (saveSnapshotToConnectionSockets(opts) != C_OK) {
int slotExportJobBeginSnapshotToTargetSocket(slotMigrationJob *job) {
if (hasActiveChildProcess()) return C_ERR;

pid_t childpid;
int pipefds[2], slot_migration_pipe_write = -1, safe_to_exit_pipe = -1;
serverAssert(server.slot_migration_pipe_read == -1 && server.slot_migration_child_exit_pipe == -1);

/* Before to fork, create a pipe that is used to transfer the slot data bytes to
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over. */
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
server.slot_migration_pipe_read = pipefds[0]; /* read end */
slot_migration_pipe_write = pipefds[1]; /* write end */

/* create another pipe that is used by the parent to signal to the child
* that it can exit. */
if (anetPipe(pipefds, 0, 0) == -1) {
close(slot_migration_pipe_write);
close(server.slot_migration_pipe_read);
server.slot_migration_pipe_read = -1;
return C_ERR;
}
if (server.debug_pause_after_fork) debugPauseProcess();
return C_OK;
safe_to_exit_pipe = pipefds[0]; /* read end */
server.slot_migration_child_exit_pipe = pipefds[1]; /* write end */

server.slot_migration_pipe_conn = job->client->conn;

if ((childpid = serverFork(CHILD_TYPE_SLOT_MIGRATION)) == 0) {
/* Child */
rio aof;
rioInitWithFd(&aof, slot_migration_pipe_write);
/* Close the reading part, so that if the parent crashes, the child will
* get a write error and exit. */
close(server.rdb_pipe_read);

serverSetProcTitle("valkey-slot-migration-to-target");
serverSetCpuAffinity(server.slot_migration_cpulist);

int retval = childSnapshotForSyncSlot(&aof, job);
if (retval == C_OK && rioFlush(&aof) == 0) retval = C_ERR;
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_SLOT_MIGRATION_COW_SIZE, "slot migration");
}
rioFreeFd(&aof);
/* wake up the reader, tell it we're done. */
close(slot_migration_pipe_write);
close(server.slot_migration_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
ssize_t dummy = read(safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
serverLog(LL_WARNING, "Can't begin slot migration snapshot in background: fork: %s", strerror(errno));
close(slot_migration_pipe_write);
close(server.slot_migration_pipe_read);
close(server.slot_migration_child_exit_pipe);
server.slot_migration_pipe_conn = NULL;
return C_ERR;
}

serverLog(LL_NOTICE, "Started child process %ld for slot migration %s", (long)childpid, job->description);
close(slot_migration_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE, slotMigrationPipeReadHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
close(safe_to_exit_pipe);
if (server.debug_pause_after_fork) debugPauseProcess();
return C_OK;
}
return C_OK; /* Unreached. */
}

/* When a background slot migration terminates, call the right handler. */
void backgroundSlotMigrationDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE, "Background SLOT MIGRATION transfer terminated with success");
} else if (!bysignal && exitcode != 0) {
serverLog(LL_WARNING, "Background SLOT MIGRATION transfer error");
} else {
serverLog(LL_WARNING, "Background SLOT MIGRATION transfer terminated by signal %d", bysignal);
}
if (server.slot_migration_child_exit_pipe != -1) close(server.slot_migration_child_exit_pipe);
if (server.slot_migration_pipe_read > 0) {
aeDeleteFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE);
close(server.slot_migration_pipe_read);
}
server.slot_migration_child_exit_pipe = -1;
server.slot_migration_pipe_read = -1;
server.slot_migration_pipe_conn = NULL;
zfree(server.slot_migration_pipe_buff);
server.slot_migration_pipe_buff = NULL;
server.slot_migration_pipe_bufflen = 0;

clusterHandleSlotExportBackgroundSaveDone((!bysignal && exitcode == 0) ? C_OK : C_ERR);
}

/* Callback triggered after snapshot is finished. We either begin sending the
Expand Down Expand Up @@ -1634,7 +1730,7 @@ void proceedWithSlotMigration(slotMigrationJob *job) {
serverLog(LL_NOTICE,
"Beginning snapshot of slot migration %s.",
job->description);
if (slotExportJobBeginSnapshot(job) == C_ERR) {
if (slotExportJobBeginSnapshotToTargetSocket(job) == C_ERR) {
serverLog(LL_WARNING,
"Slot migration %s failed to start slot snapshot",
job->description);
Expand Down Expand Up @@ -1731,10 +1827,6 @@ void resetSlotMigrationJob(slotMigrationJob *job) {

sdsfree(job->response_buf);
job->response_buf = NULL;

/* Description is not needed once migration is finished */
sdsfree(job->description);
job->description = NULL;
}

void freeSlotMigrationJob(void *o) {
Expand All @@ -1744,6 +1836,7 @@ void freeSlotMigrationJob(void *o) {
sdsfree(job->slot_ranges_str);
sdsfree(job->status_msg);
sdsfree(job->response_buf);
sdsfree(job->description);
zfree(o);
}

Expand Down Expand Up @@ -1890,7 +1983,7 @@ void finishSlotMigrationJob(slotMigrationJob *job,
slotExportTryUnpause();
/* Fast fail the child process, which will be cleaned up fully in
* checkChildrenDone. */
if (job->state == SLOT_EXPORT_SNAPSHOTTING) killRDBChild();
if (job->state == SLOT_EXPORT_SNAPSHOTTING) killSlotMigrationChild();
}
if (job->type == SLOT_MIGRATION_IMPORT &&
job->state != SLOT_MIGRATION_JOB_SUCCESS) {
Expand Down
2 changes: 2 additions & 0 deletions src/cluster_migrateslots.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void clusterCommandMigrateSlots(client *c);
void clusterCommandSyncSlots(client *c);
void clusterCommandGetSlotMigrations(client *c);
void clusterCommandCancelSlotMigrations(client *c);
void backgroundSlotMigrationDoneHandler(int exitcode, int bysignal);
void clusterHandleSlotExportBackgroundSaveDone(int bgsaveerr);
void clusterUpdateSlotExportsOnOwnershipChange(void);
void clusterUpdateSlotImportsOnOwnershipChange(void);
Expand All @@ -33,5 +34,6 @@ size_t clusterGetTotalSlotExportBufferMemory(void);
bool clusterSlotFailoverGranted(int slot);
void clusterFailAllSlotExportsWithMessage(char *message);
void clusterHandleSlotMigrationErrorResponse(slotMigrationJob *job);
void killSlotMigrationChild(void);

#endif /* __CLUSTER_MIGRATESLOTS_H */
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3252,6 +3252,7 @@ standardConfig static_configs[] = {
createStringConfig("bio-cpulist", "bio_cpulist", IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bio_cpulist, NULL, NULL, NULL),
createStringConfig("aof-rewrite-cpulist", "aof_rewrite_cpulist", IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.aof_rewrite_cpulist, NULL, NULL, NULL),
createStringConfig("bgsave-cpulist", "bgsave_cpulist", IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bgsave_cpulist, NULL, NULL, NULL),
createStringConfig("slot-migration-cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.slot_migration_cpulist, NULL, NULL, NULL),
createStringConfig("ignore-warnings", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, server.ignore_warnings, "", NULL, NULL),
createStringConfig("proc-title-template", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, server.proc_title_template, CONFIG_DEFAULT_PROC_TITLE_TEMPLATE, isValidProcTitleTemplate, updateProcTitleTemplate),
createStringConfig("bind-source-addr", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bind_source_addr, NULL, NULL, NULL),
Expand Down
1 change: 1 addition & 0 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ int getFlushCommandFlags(client *c, int *flags) {
void flushAllDataAndResetRDB(int flags) {
server.dirty += emptyData(-1, flags, NULL);
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
if (server.saveparamslen > 0) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
Expand Down
4 changes: 4 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -13228,6 +13228,10 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
* will prevent COW memory issue. */
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();

/* Kill existing slot migration fork as it is saving outdated data. Also killing it
* will prevent COW memory issue. */
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();

emptyData(-1, EMPTYDB_NO_FLAGS, NULL);

/* rdbLoad() can go back to the networking and process network events. If
Expand Down
Loading
Loading