99#include "module.h"
1010#include "functions.h"
1111
12+ #include <sys/wait.h>
13+ #include <fcntl.h>
14+
1215typedef enum slotMigrationJobState {
1316 /* Importing states */
1417 SLOT_IMPORT_WAIT_ACK ,
@@ -76,6 +79,7 @@ typedef struct slotMigrationJob {
7679 * cleanup is done. */
7780 sds description ; /* Description, used for
7881 * logging. */
82+ size_t stat_cow_bytes ; /* Copy on write bytes during slot migration fork. */
7983
8084 /* State needed during client establishment */
8185 connection * conn ; /* Connection to slot import source node. */
@@ -1264,9 +1268,8 @@ bool shouldRewriteHashtableIndex(int didx, hashtable *ht, void *privdata) {
12641268}
12651269
12661270/* Contains the logic run on the child process during the snapshot phase. */
1267- int childSnapshotForSyncSlot (int req , rio * rdb , void * privdata ) {
1268- UNUSED (req );
1269- list * slot_ranges = privdata ;
1271+ int childSnapshotForSyncSlot (rio * aof , slotMigrationJob * job ) {
1272+ list * slot_ranges = job -> slot_ranges ;
12701273 size_t key_count = 0 ;
12711274 for (int db_num = 0 ; db_num < server .dbnum ; db_num ++ ) {
12721275 listIter li ;
@@ -1276,40 +1279,127 @@ int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) {
12761279 slotRange * r = (slotRange * )ln -> value ;
12771280 for (int slot = r -> start_slot ; slot <= r -> end_slot ; slot ++ ) {
12781281 if (rewriteSlotToAppendOnlyFileRio (
1279- rdb , db_num , slot , & key_count ) == C_ERR ) return C_ERR ;
1282+ aof , db_num , slot , & key_count ) == C_ERR ) return C_ERR ;
12801283 }
12811284 }
12821285 }
1283- rioWrite (rdb , "*3\r\n" , 4 );
1284- rioWriteBulkString (rdb , "CLUSTER" , 7 );
1285- rioWriteBulkString (rdb , "SYNCSLOTS" , 9 );
1286- rioWriteBulkString (rdb , "SNAPSHOT-EOF" , 12 );
1286+ rioWrite (aof , "*3\r\n" , 4 );
1287+ rioWriteBulkString (aof , "CLUSTER" , 7 );
1288+ rioWriteBulkString (aof , "SYNCSLOTS" , 9 );
1289+ rioWriteBulkString (aof , "SNAPSHOT-EOF" , 12 );
12871290 return C_OK ;
12881291}
12891292
1293+ /* Kill the slot migration child using SIGUSR1 (so that the parent will know
1294+ * the child did not exit for an error, but because we wanted), and performs
1295+ * the cleanup needed. */
1296+ void killSlotMigrationChild (void ) {
1297+ /* No slot migration child? return. */
1298+ if (server .child_type != CHILD_TYPE_SLOT_MIGRATION ) return ;
1299+ serverLog (LL_NOTICE , "Killing running slot migration child: %ld" , (long )server .child_pid );
1300+
1301+ /* Because we are not using here waitpid (like we have in killAppendOnlyChild
1302+ * and TerminateModuleForkChild), all the cleanup operations is done by
1303+ * checkChildrenDone, that later will find that the process killed. */
1304+ kill (server .child_pid , SIGUSR1 );
1305+ }
1306+
12901307/* Begin the snapshot for the provided job in a child process. */
1291- int slotExportJobBeginSnapshot (slotMigrationJob * job ) {
1292- connection * * conns = zmalloc (sizeof (connection * ));
1293- * conns = job -> client -> conn ;
1294- rdbSnapshotOptions opts = {
1295- .connsnum = 1 ,
1296- .conns = conns ,
1297- .use_pipe = 1 ,
1298- .req = REPLICA_REQ_NONE ,
1299- .skip_checksum = 1 ,
1300- .snapshot_func = childSnapshotForSyncSlot ,
1301- .privdata = job -> slot_ranges };
1302- if (saveSnapshotToConnectionSockets (opts ) != C_OK ) {
1308+ int slotExportJobBeginSnapshotToTargetSocket (slotMigrationJob * job ) {
1309+ if (hasActiveChildProcess ()) return C_ERR ;
1310+
1311+ pid_t childpid ;
1312+ int pipefds [2 ], slot_migration_pipe_write = -1 , safe_to_exit_pipe = -1 ;
1313+ serverAssert (server .slot_migration_pipe_read == -1 && server .slot_migration_child_exit_pipe == -1 );
1314+
1315+ /* Before to fork, create a pipe that is used to transfer the slot data bytes to
1316+ * the parent, we can't let it write directly to the sockets, since in case
1317+ * of TLS we must let the parent handle a continuous TLS state when the
1318+ * child terminates and parent takes over. */
1319+ if (anetPipe (pipefds , O_NONBLOCK , 0 ) == -1 ) return C_ERR ;
1320+ server .slot_migration_pipe_read = pipefds [0 ]; /* read end */
1321+ slot_migration_pipe_write = pipefds [1 ]; /* write end */
1322+
1323+ /* create another pipe that is used by the parent to signal to the child
1324+ * that it can exit. */
1325+ if (anetPipe (pipefds , 0 , 0 ) == -1 ) {
1326+ close (slot_migration_pipe_write );
1327+ close (server .slot_migration_pipe_read );
1328+ server .slot_migration_pipe_read = -1 ;
13031329 return C_ERR ;
13041330 }
1305- if (server .debug_pause_after_fork ) debugPauseProcess ();
1306- return C_OK ;
1331+ safe_to_exit_pipe = pipefds [0 ]; /* read end */
1332+ server .slot_migration_child_exit_pipe = pipefds [1 ]; /* write end */
1333+
1334+ server .slot_migration_pipe_conn = job -> client -> conn ;
1335+
1336+ if ((childpid = serverFork (CHILD_TYPE_SLOT_MIGRATION )) == 0 ) {
1337+ /* Child */
1338+ rio aof ;
1339+ rioInitWithFd (& aof , slot_migration_pipe_write );
1340+ /* Close the reading part, so that if the parent crashes, the child will
1341+ * get a write error and exit. */
1342+ close (server .rdb_pipe_read );
1343+
1344+ serverSetProcTitle ("valkey-slot-migration-to-target" );
1345+ serverSetCpuAffinity (server .bgsave_cpulist );
1346+
1347+ int retval = childSnapshotForSyncSlot (& aof , job );
1348+ if (retval == C_OK && rioFlush (& aof ) == 0 ) retval = C_ERR ;
1349+ if (retval == C_OK ) {
1350+ sendChildCowInfo (CHILD_INFO_TYPE_SLOT_MIGRATION_COW_SIZE , "Slot migration" );
1351+ }
1352+ rioFreeFd (& aof );
1353+ /* wake up the reader, tell it we're done. */
1354+ close (slot_migration_pipe_write );
1355+ close (server .slot_migration_child_exit_pipe ); /* close write end so that we can detect the close on the parent. */
1356+ ssize_t dummy = read (safe_to_exit_pipe , pipefds , 1 );
1357+ UNUSED (dummy );
1358+ exitFromChild ((retval == C_OK ) ? 0 : 1 );
1359+ } else {
1360+ /* Parent */
1361+ if (childpid == -1 ) {
1362+ serverLog (LL_WARNING , "Can't begin slot migration snapshot in background: fork: %s" , strerror (errno ));
1363+ close (slot_migration_pipe_write );
1364+ close (server .slot_migration_pipe_read );
1365+ close (server .slot_migration_child_exit_pipe );
1366+ server .slot_migration_pipe_conn = NULL ;
1367+ return C_ERR ;
1368+ }
1369+
1370+ serverLog (LL_NOTICE , "Started child process %ld for slot migration %s" , (long )childpid , job -> description );
1371+ close (slot_migration_pipe_write ); /* close write in parent so that it can detect the close on the child. */
1372+ if (aeCreateFileEvent (server .el , server .slot_migration_pipe_read , AE_READABLE , slotMigrationPipeReadHandler , NULL ) == AE_ERR ) {
1373+ serverPanic ("Unrecoverable error creating server.rdb_pipe_read file event." );
1374+ }
1375+ close (safe_to_exit_pipe );
1376+ if (server .debug_pause_after_fork ) debugPauseProcess ();
1377+ return C_OK ;
1378+ }
1379+ return C_OK ; /* Unreached. */
13071380}
13081381
1309- /* Callback triggered after snapshot is finished. We either begin sending the
1310- * incremental contents or fail the associated migration. */
1311- void clusterHandleSlotExportBackgroundSaveDone (int bgsaveerr ) {
1312- if (!server .cluster_enabled ) return ;
1382+ /* When a background slot migration terminates, call the right handler. */
1383+ void backgroundSlotMigrationDoneHandler (int exitcode , int bysignal ) {
1384+ if (!bysignal && exitcode == 0 ) {
1385+ serverLog (LL_NOTICE , "Background SLOT MIGRATION transfer terminated with success" );
1386+ } else if (!bysignal && exitcode != 0 ) {
1387+ serverLog (LL_WARNING , "Background SLOT MIGRATION transfer error" );
1388+ } else {
1389+ serverLog (LL_WARNING , "Background SLOT MIGRATION transfer terminated by signal %d" , bysignal );
1390+ }
1391+ if (server .slot_migration_child_exit_pipe != -1 ) close (server .slot_migration_child_exit_pipe );
1392+ if (server .slot_migration_pipe_read > 0 ) {
1393+ aeDeleteFileEvent (server .el , server .slot_migration_pipe_read , AE_READABLE );
1394+ close (server .slot_migration_pipe_read );
1395+ }
1396+ server .slot_migration_child_exit_pipe = -1 ;
1397+ server .slot_migration_pipe_read = -1 ;
1398+ server .slot_migration_pipe_conn = NULL ;
1399+ zfree (server .slot_migration_pipe_buff );
1400+ server .slot_migration_pipe_buff = NULL ;
1401+ server .slot_migration_pipe_bufflen = 0 ;
1402+
13131403 listIter li ;
13141404 listNode * ln ;
13151405 listRewind (server .cluster -> slot_migration_jobs , & li );
@@ -1319,8 +1409,9 @@ void clusterHandleSlotExportBackgroundSaveDone(int bgsaveerr) {
13191409 if (job -> state != SLOT_EXPORT_SNAPSHOTTING ) {
13201410 continue ;
13211411 }
1322- if (bgsaveerr == C_OK ) {
1412+ if (! bysignal && exitcode == 0 ) {
13231413 slotExportBeginStreaming (job );
1414+ job -> stat_cow_bytes = server .stat_slot_migration_cow_bytes ;
13241415 } else {
13251416 serverLog (LL_WARNING ,
13261417 "Child process failed to snapshot slot migration %s" ,
@@ -1683,7 +1774,7 @@ void proceedWithSlotMigration(slotMigrationJob *job) {
16831774 serverLog (LL_NOTICE ,
16841775 "Beginning snapshot of slot migration %s." ,
16851776 job -> description );
1686- if (slotExportJobBeginSnapshot (job ) == C_ERR ) {
1777+ if (slotExportJobBeginSnapshotToTargetSocket (job ) == C_ERR ) {
16871778 serverLog (LL_WARNING ,
16881779 "Slot migration %s failed to start slot snapshot" ,
16891780 job -> description );
@@ -1780,10 +1871,6 @@ void resetSlotMigrationJob(slotMigrationJob *job) {
17801871
17811872 sdsfree (job -> response_buf );
17821873 job -> response_buf = NULL ;
1783-
1784- /* Description is not needed once migration is finished */
1785- sdsfree (job -> description );
1786- job -> description = NULL ;
17871874}
17881875
17891876void freeSlotMigrationJob (void * o ) {
@@ -1793,6 +1880,7 @@ void freeSlotMigrationJob(void *o) {
17931880 sdsfree (job -> slot_ranges_str );
17941881 sdsfree (job -> status_msg );
17951882 sdsfree (job -> response_buf );
1883+ sdsfree (job -> description );
17961884 zfree (o );
17971885}
17981886
@@ -1940,7 +2028,7 @@ void finishSlotMigrationJob(slotMigrationJob *job,
19402028 slotExportTryUnpause ();
19412029 /* Fast fail the child process, which will be cleaned up fully in
19422030 * checkChildrenDone. */
1943- if (job -> state == SLOT_EXPORT_SNAPSHOTTING ) killRDBChild ();
2031+ if (job -> state == SLOT_EXPORT_SNAPSHOTTING ) killSlotMigrationChild ();
19442032 }
19452033 if (job -> type == SLOT_MIGRATION_IMPORT &&
19462034 state != SLOT_MIGRATION_JOB_SUCCESS ) {
@@ -1997,7 +2085,7 @@ void clusterCommandGetSlotMigrations(client *c) {
19972085 listRewind (server .cluster -> slot_migration_jobs , & li );
19982086 while ((ln = listNext (& li )) != NULL ) {
19992087 slotMigrationJob * job = ln -> value ;
2000- addReplyMapLen (c , 10 );
2088+ addReplyMapLen (c , 11 );
20012089 addReplyBulkCString (c , "name" );
20022090 addReplyBulkCBuffer (c , job -> name , CLUSTER_NAMELEN );
20032091 addReplyBulkCString (c , "operation" );
@@ -2020,6 +2108,8 @@ void clusterCommandGetSlotMigrations(client *c) {
20202108 addReplyBulkCString (c , slotMigrationJobStateToString (job -> state ));
20212109 addReplyBulkCString (c , "message" );
20222110 addReplyBulkCString (c , job -> status_msg ? job -> status_msg : "" );
2111+ addReplyBulkCString (c , "cow_size" );
2112+ addReplyLongLong (c , (long long )job -> stat_cow_bytes );
20232113 }
20242114}
20252115
0 commit comments