diff --git a/src/bio.c b/src/bio.c index 4e35540f86..d0d95753f7 100644 --- a/src/bio.c +++ b/src/bio.c @@ -61,6 +61,7 @@ #include "server.h" +#include "connection.h" #include "bio.h" #include @@ -68,6 +69,7 @@ static char *bio_worker_title[] = { "bio_close_file", "bio_aof", "bio_lazy_free", + "bio_rdb_save", }; #define BIO_WORKER_NUM (sizeof(bio_worker_title) / sizeof(*bio_worker_title)) @@ -77,6 +79,7 @@ static unsigned int bio_job_to_worker[] = { [BIO_AOF_FSYNC] = 1, [BIO_CLOSE_AOF] = 1, [BIO_LAZY_FREE] = 2, + [BIO_RDB_SAVE] = 3, }; static pthread_t bio_threads[BIO_WORKER_NUM]; @@ -84,6 +87,7 @@ static pthread_mutex_t bio_mutex[BIO_WORKER_NUM]; static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM]; static list *bio_jobs[BIO_WORKER_NUM]; static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0}; +static __thread unsigned long bio_thread_id = 0; /* This structure represents a background Job. It is only used locally to this * file as the API does not expose the internals at all. */ @@ -108,6 +112,12 @@ typedef union bio_job { lazy_free_fn *free_fn; /* Function that will free the provided arguments */ void *free_args[]; /* List of arguments to be passed to the free function */ } free_args; + + struct { + int type; + connection *conn; /* Connection to download the RDB from */ + int is_dual_channel; /* Single vs dual channel */ + } save_to_disk_args; } bio_job; void *bioProcessBackgroundJobs(void *arg); @@ -203,6 +213,13 @@ void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache) { bioSubmitJob(BIO_AOF_FSYNC, job); } +void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel) { + bio_job *job = zmalloc(sizeof(*job)); + job->save_to_disk_args.conn = conn; + job->save_to_disk_args.is_dual_channel = is_dual_channel; + bioSubmitJob(BIO_RDB_SAVE, job); +} + void *bioProcessBackgroundJobs(void *arg) { bio_job *job; unsigned long worker = (unsigned long)arg; @@ -225,6 +242,8 @@ void *bioProcessBackgroundJobs(void *arg) { if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); + bio_thread_id = worker; + while (1) { listNode *ln; @@ -278,6 +297,8 @@ void *bioProcessBackgroundJobs(void *arg) { if (job_type == BIO_CLOSE_AOF) close(job->fd_args.fd); } else if (job_type == BIO_LAZY_FREE) { job->free_args.free_fn(job->free_args.free_args); + } else if (job_type == BIO_RDB_SAVE) { + replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } @@ -333,3 +354,7 @@ void bioKillThreads(void) { } } } + +int inBioThread(void) { + return bio_thread_id != 0; +} diff --git a/src/bio.h b/src/bio.h index 80cf515380..8ae76ec0a2 100644 --- a/src/bio.h +++ b/src/bio.h @@ -41,6 +41,8 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache); void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache); void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache); void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); +void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel); +int inBioThread(void); /* Background job opcodes */ enum { @@ -48,6 +50,7 @@ enum { BIO_AOF_FSYNC, /* Deferred AOF fsync. */ BIO_LAZY_FREE, /* Deferred objects freeing. */ BIO_CLOSE_AOF, /* Deferred close for AOF files. */ + BIO_RDB_SAVE, /* Deferred save RDB to disk on replica */ BIO_NUM_OPS }; diff --git a/src/replication.c b/src/replication.c index 93c3541793..4f8b444709 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2084,14 +2084,14 @@ int replicationSupportSkipRDBChecksum(connection *conn, int is_replica_stream_ve return is_replica_stream_verified && is_primary_stream_verified && connIsIntegrityChecked(conn); } -/* Helper function for readSyncBulkPayload() to initialize tempDb +/* Helper function for replicaLoadPrimaryRDBFromSocket() to initialize tempDb * before socket-loading the new db from primary. The tempDb may be populated * by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */ serverDb **disklessLoadInitTempDb(void) { return zcalloc(sizeof(serverDb *) * server.dbnum); } -/* Helper function for readSyncBulkPayload() to discard our tempDb +/* Helper function for replicaLoadPrimaryRDBFromSocket() to discard our tempDb * when the loading succeeded or failed. */ void disklessLoadDiscardTempDb(serverDb **tempDb) { discardTempDb(tempDb); @@ -2113,7 +2113,7 @@ void disklessLoadDiscardFunctionsLibCtx(functionsLibCtx *temp_functions_lib_ctx) /* If we know we got an entirely different data set from our primary * we have no way to incrementally feed our replicas after that. * We want our replicas to resync with us as well, if we have any sub-replicas. - * This is useful on readSyncBulkPayload in places where we just finished transferring db. */ + * This is useful on replicaLoadPrimaryRDBFromSocket/Disk in places where we just finished transferring db. */ void replicationAttachToNewPrimary(void) { /* Replica starts to apply data from new primary, we must discard the cached * primary structure. */ @@ -2124,173 +2124,65 @@ void replicationAttachToNewPrimary(void) { freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. */ } -/* Asynchronously read the SYNC payload we receive from a primary */ -#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024 * 1024 * 8) /* 8 MB */ -void readSyncBulkPayload(connection *conn) { - char buf[PROTO_IOBUF_LEN]; - ssize_t nread, readlen, nwritten; - int use_diskless_load = useDisklessLoad(); - serverDb **diskless_load_tempDb = NULL; - functionsLibCtx *temp_functions_lib_ctx = NULL; - int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; - off_t left; - - /* Static vars used to hold the EOF mark, and the last bytes received - * from the server: when they match, we reached the end of the transfer. */ - static char eofmark[RDB_EOF_MARK_SIZE]; - static char lastbytes[RDB_EOF_MARK_SIZE]; - static int usemark = 0; - /* If repl_transfer_size == -1 we still have to read the bulk length - * from the primary reply. */ - if (server.repl_transfer_size == -1) { - nread = connSyncReadLine(conn, buf, 1024, server.repl_syncio_timeout * 1000); - if (nread == -1) { - serverLog(LL_WARNING, "I/O error reading bulk count from PRIMARY: %s", connGetLastError(conn)); - goto error; - } else { - /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and - * convert "\r\n" to '\0' so 1 byte is lost. */ +/* During replication, the primary sends sync metadata as the first line + * which can be either a standard bulk format ($) or an EOF-delimited + * format ($EOF:) for diskless transfers. We need this data in order + * to detect transfer completion. This function reads and parses that + * metadata line. + * The primary may also send an error message starting with '-' or a ping + * (newline) to keep the connection alive, in which case this function + * should be called again later. + * Returns C_OK on success, C_ERR on error, or C_RETRY for primary ping. */ +int tryReadBulkPayloadMetadata(connection *conn, char *buf, char *eofmark, char *lastbytes, int *usemark, off_t *repl_transfer_size) { + ssize_t nread = connSyncReadLine(conn, buf, 1024, server.repl_syncio_timeout * 1000); + if (nread == -1) { + serverLog(LL_WARNING, "I/O error reading bulk count from PRIMARY: %s", connGetLastError(conn)); + return C_ERR; + } else { + /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and + * convert "\r\n" to '\0' so 1 byte is lost. */ + if (inBioThread()) + server.bio_stat_net_repl_input_bytes += nread + 1; + else server.stat_net_repl_input_bytes += nread + 1; - } - - if (buf[0] == '-') { - serverLog(LL_WARNING, "PRIMARY aborted replication with an error: %s", buf + 1); - goto error; - } else if (buf[0] == '\0') { - /* At this stage just a newline works as a PING in order to take - * the connection live. So we refresh our last interaction - * timestamp. */ - server.repl_transfer_lastio = server.unixtime; - return; - } else if (buf[0] != '$') { - serverLog(LL_WARNING, - "Bad protocol from PRIMARY, the first byte is not '$' (we received '%s'), are you sure the host " - "and port are right?", - buf); - goto error; - } - - /* There are two possible forms for the bulk payload. One is the - * usual $ bulk format. The other is used for diskless transfers - * when the primary does not know beforehand the size of the file to - * transfer. In the latter case, the following format is used: - * - * $EOF:<40 bytes delimiter> - * - * At the end of the file the announced delimiter is transmitted. The - * delimiter is long and random enough that the probability of a - * collision with the actual file content can be ignored. */ - if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= RDB_EOF_MARK_SIZE) { - usemark = 1; - memcpy(eofmark, buf + 5, RDB_EOF_MARK_SIZE); - memset(lastbytes, 0, RDB_EOF_MARK_SIZE); - /* Set any repl_transfer_size to avoid entering this code path - * at the next call. */ - server.repl_transfer_size = 0; - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving streamed RDB from primary with EOF %s", - use_diskless_load ? "to parser" : "to disk"); - } else { - usemark = 0; - server.repl_transfer_size = strtol(buf + 1, NULL, 10); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary %s", - (long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk"); - } - return; } - if (!use_diskless_load) { - /* Read the data from the socket, store it to a file and search - * for the EOF. */ - if (usemark) { - readlen = sizeof(buf); - } else { - left = server.repl_transfer_size - server.repl_transfer_read; - readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); - } - - nread = connRead(conn, buf, readlen); - if (nread <= 0) { - if (connGetState(conn) == CONN_STATE_CONNECTED) { - /* equivalent to EAGAIN */ - return; - } - serverLog(LL_WARNING, "I/O error trying to sync with PRIMARY: %s", - (nread == -1) ? connGetLastError(conn) : "connection lost"); - goto error; - } - server.stat_net_repl_input_bytes += nread; - - /* When a mark is used, we want to detect EOF asap in order to avoid - * writing the EOF mark into the file... */ - int eof_reached = 0; - - if (usemark) { - /* Update the last bytes array, and check if it matches our - * delimiter. */ - if (nread >= RDB_EOF_MARK_SIZE) { - memcpy(lastbytes, buf + nread - RDB_EOF_MARK_SIZE, RDB_EOF_MARK_SIZE); - } else { - int rem = RDB_EOF_MARK_SIZE - nread; - memmove(lastbytes, lastbytes + nread, rem); - memcpy(lastbytes + rem, buf, nread); - } - if (memcmp(lastbytes, eofmark, RDB_EOF_MARK_SIZE) == 0) eof_reached = 1; - } - - /* Update the last I/O time for the replication transfer (used in - * order to detect timeouts during replication), and write what we - * got from the socket to the dump file on disk. */ + /* Check the bulk payload header for errors */ + if (buf[0] == '-') { + serverLog(LL_WARNING, "PRIMARY aborted replication with an error: %s", buf + 1); + return C_ERR; + } else if (buf[0] == '\0') { + /* At this stage just a newline works as a PING in order to take + * the connection live. So we refresh our last interaction + * timestamp. */ server.repl_transfer_lastio = server.unixtime; - if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) { - serverLog(LL_WARNING, - "Write error or short write writing to the DB dump file " - "needed for PRIMARY <-> REPLICA synchronization: %s", - (nwritten == -1) ? strerror(errno) : "short write"); - goto error; - } - server.repl_transfer_read += nread; - - /* Delete the last 40 bytes from the file if we reached EOF. */ - if (usemark && eof_reached) { - if (ftruncate(server.repl_transfer_fd, server.repl_transfer_read - RDB_EOF_MARK_SIZE) == -1) { - serverLog(LL_WARNING, - "Error truncating the RDB file received from the primary " - "for SYNC: %s", - strerror(errno)); - goto error; - } - } - - /* Sync data on disk from time to time, otherwise at the end of the - * transfer we may suffer a big delay as the memory buffers are copied - * into the actual disk. */ - if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { - off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off; - rdb_fsync_range(server.repl_transfer_fd, server.repl_transfer_last_fsync_off, sync_size); - server.repl_transfer_last_fsync_off += sync_size; - } - - /* Check if the transfer is now complete */ - if (!usemark) { - if (server.repl_transfer_read == server.repl_transfer_size) eof_reached = 1; - } + return C_RETRY; + } else if (buf[0] != '$') { + serverLog(LL_WARNING, + "Bad protocol from PRIMARY, the first byte is not '$' (we received '%s'), are you sure the host " + "and port are right?", + buf); + return C_ERR; + } - /* If the transfer is yet not complete, we need to read more, so - * return ASAP and wait for the handler to be called again. */ - if (!eof_reached) return; + /* Check if this is an EOF-based transfer ($EOF:) or size-based ($) */ + if (strncmp(buf + 1, "EOF:", 4) == 0 && strlen(buf + 5) >= RDB_EOF_MARK_SIZE) { + /* EOF-based transfer: extract the delimiter */ + memcpy(eofmark, buf + 5, RDB_EOF_MARK_SIZE); + memset(lastbytes, 0, RDB_EOF_MARK_SIZE); + *usemark = true; + *repl_transfer_size = 0; + } else { + /* Size-based transfer: parse the size */ + *usemark = false; + *repl_transfer_size = strtol(buf + 1, NULL, 10); } - /* We reach this point in one of the following cases: - * - * 1. The replica is using diskless replication, that is, it reads data - * directly from the socket to the server memory, without using - * a temporary RDB file on disk. In that case we just block and - * read everything from the socket. - * - * 2. Or when we are done reading from the socket to the RDB file, in - * such case we want just to read the RDB file in memory. */ + return C_OK; +} +void replicaBeforeLoadPrimaryRDB(connection *conn, int use_diskless_load) { /* We need to stop any AOF rewriting child before flushing and parsing * the RDB, otherwise we'll create a copy-on-write disaster. */ if (server.aof_state != AOF_OFF) stopAppendOnly(); @@ -2309,195 +2201,230 @@ void readSyncBulkPayload(connection *conn) { killRDBChild(); } - if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* Initialize empty tempDb dictionaries. */ - diskless_load_tempDb = disklessLoadInitTempDb(); - temp_functions_lib_ctx = disklessLoadFunctionsLibCtxCreate(); - - moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL); - } - /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */ connSetReadHandler(conn, NULL); +} - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (use_diskless_load) { - rio rdb; - serverDb **dbarray; - functionsLibCtx *functions_lib_ctx; - int asyncLoading = 0; +void replicaAfterLoadPrimaryRDB(connection *conn, rdbSaveInfo *rsi) { + /* Final setup of the connected replica <- primary link */ + if (conn == server.repl_rdb_transfer_s) { + dualChannelSyncHandleRdbLoadCompletion(); + } else { + replicationCreatePrimaryClient(server.repl_transfer_s, rsi->repl_stream_db); + server.repl_state = REPL_STATE_CONNECTED; + server.repl_down_since = 0; + /* Send the initial ACK immediately to put this replica in online state. */ + replicationSendAck(); + } - if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* Async loading means we continue serving read commands during full resync, and - * "swap" the new db with the old db only when loading is done. - * It is enabled only on SWAPDB diskless replication when primary replication ID hasn't changed, - * because in that state the old content of the db represents a different point in time of the same - * data set we're currently receiving from the primary. */ - if (memcmp(server.replid, server.primary_replid, CONFIG_RUN_ID_SIZE) == 0) { - asyncLoading = 1; - } - dbarray = diskless_load_tempDb; - functions_lib_ctx = temp_functions_lib_ctx; - } else { - /* We will soon start loading the RDB from socket, the replication history is changed, - * we must discard the cached primary structure and force resync of sub-replicas. */ - replicationAttachToNewPrimary(); + /* Fire the primary link modules event. */ + moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, NULL); + if (server.repl_state == REPL_STATE_CONNECTED) { + /* After a full resynchronization we use the replication ID and + * offset of the primary. The secondary ID / offset are cleared since + * we are starting a new history. */ + memcpy(server.replid, server.primary->repl_data->replid, sizeof(server.replid)); + server.primary_repl_offset = server.primary->repl_data->reploff; + } + clearReplicationId2(); - /* Even though we are on-empty-db and the database is empty, we still call emptyData. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + /* Let's create the replication backlog if needed. Replicas need to + * accumulate the backlog regardless of the fact they have sub-replicas + * or not, in order to behave correctly if they are promoted to + * primaries after a failover. */ + if (server.repl_backlog == NULL) createReplicationBacklog(); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Finished with success"); - dbarray = server.db; - functions_lib_ctx = functionsLibCtxGetCurrent(); - } + if (server.supervised_mode == SUPERVISED_SYSTEMD) { + serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Finished with success. Ready to accept connections " + "in read-write mode.\n"); + } - rioInitWithConn(&rdb, conn, server.repl_transfer_size); - - /* Put the socket in blocking mode to simplify RDB transfer. - * We'll restore it when the RDB is received. */ - connBlock(conn); - connRecvTimeout(conn, server.repl_timeout * 1000); - - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); - startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); - if (replicationSupportSkipRDBChecksum(conn, use_diskless_load, usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; - int loadingFailed = 0; - rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; - if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) { - /* RDB loading failed. */ - serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " - "from socket, check server logs."); - loadingFailed = 1; - } else if (usemark) { - /* Verify the end mark is correct. */ - if (!rioRead(&rdb, buf, RDB_EOF_MARK_SIZE) || memcmp(buf, eofmark, RDB_EOF_MARK_SIZE) != 0) { - serverLog(LL_WARNING, "Replication stream EOF marker is broken"); - loadingFailed = 1; - } - } + /* Restart the AOF subsystem now that we finished the sync. This + * will trigger an AOF rewrite, and when done will start appending + * to the new file. */ + if (server.aof_enabled) restartAOFAfterSYNC(); - if (loadingFailed) { - stopLoading(0); - rioFreeConn(&rdb, NULL); + /* In case of dual channel replication sync we want to close the RDB connection + * once the connection is established */ + if (conn == server.repl_rdb_transfer_s) { + connClose(conn); + server.repl_rdb_transfer_s = NULL; + } +} - if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* Discard potentially partially loaded tempDb. */ - moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED, - NULL); +int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, int *usemark, rdbSaveInfo *rsi) { + rio rdb; + serverDb **dbarray; + functionsLibCtx *functions_lib_ctx; + serverDb **diskless_load_tempDb = NULL; + functionsLibCtx *temp_functions_lib_ctx = NULL; + int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + int asyncLoading = 0; - disklessLoadDiscardTempDb(diskless_load_tempDb); - disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); - } else { - /* Remove the half-loaded data in case we started with an empty replica. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - } + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* Initialize empty tempDb dictionaries. */ + diskless_load_tempDb = disklessLoadInitTempDb(); + temp_functions_lib_ctx = disklessLoadFunctionsLibCtxCreate(); - /* Note that there's no point in restarting the AOF on SYNC - * failure, it'll be restarted when sync succeeds or the replica - * gets promoted. */ - goto error; - } + moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL); + /* Async loading means we continue serving read commands during full resync, and + * "swap" the new db with the old db only when loading is done. + * It is enabled only on SWAPDB diskless replication when primary replication ID hasn't changed, + * because in that state the old content of the db represents a different point in time of the same + * data set we're currently receiving from the primary. */ + if (memcmp(server.replid, server.primary_replid, CONFIG_RUN_ID_SIZE) == 0) { + asyncLoading = 1; + } + dbarray = diskless_load_tempDb; + functions_lib_ctx = temp_functions_lib_ctx; + } else { + /* We will soon start loading the RDB from socket, the replication history is changed, + * we must discard the cached primary structure and force resync of sub-replicas. */ + replicationAttachToNewPrimary(); - /* RDB loading succeeded if we reach this point. */ - if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* We will soon swap main db with tempDb and replicas will start - * to apply data from new primary, we must discard the cached - * primary structure and force resync of sub-replicas. */ - replicationAttachToNewPrimary(); + /* Even though we are on-empty-db and the database is empty, we still call emptyData. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Swapping active DB with loaded DB"); - swapMainDbWithTempDb(diskless_load_tempDb); + dbarray = server.db; + functions_lib_ctx = functionsLibCtxGetCurrent(); + } + + rioInitWithConn(&rdb, conn, server.repl_transfer_size); + + /* Put the socket in blocking mode to simplify RDB transfer. + * We'll restore it when the RDB is received. */ + connBlock(conn); + connRecvTimeout(conn, server.repl_timeout * 1000); + + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); + startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); + if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; + int loadingFailed = 0; + rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; + if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, rsi, &loadingCtx) != C_OK) { + /* RDB loading failed. */ + serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " + "from socket, check server logs."); + loadingFailed = 1; + } else if (*usemark) { + /* Verify the end mark is correct. */ + if (!rioRead(&rdb, buf, RDB_EOF_MARK_SIZE) || memcmp(buf, eofmark, RDB_EOF_MARK_SIZE) != 0) { + serverLog(LL_WARNING, "Replication stream EOF marker is broken"); + loadingFailed = 1; + } + } - /* swap existing functions ctx with the temporary one */ - functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 1); + if (loadingFailed) { + stopLoading(0); + rioFreeConn(&rdb, NULL); - moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED, + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* Discard potentially partially loaded tempDb. */ + moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED, NULL); - /* Delete the old db as it's useless now. */ disklessLoadDiscardTempDb(diskless_load_tempDb); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding old DB in background"); + disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); + } else { + /* Remove the half-loaded data in case we started with an empty replica. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); } - /* Inform about db change, as replication was diskless and didn't cause a save. */ - server.dirty++; + /* Note that there's no point in restarting the AOF on SYNC + * failure, it'll be restarted when sync succeeds or the replica + * gets promoted. */ + return C_ERR; + } - stopLoading(1); + /* RDB loading succeeded if we reach this point. */ + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* We will soon swap main db with tempDb and replicas will start + * to apply data from new primary, we must discard the cached + * primary structure and force resync of sub-replicas. */ + replicationAttachToNewPrimary(); - /* Cleanup and restore the socket to the original state to continue - * with the normal replication. */ - rioFreeConn(&rdb, NULL); - connNonBlock(conn); - connRecvTimeout(conn, 0); - } else { - /* Make sure the new file (also used for persistence) is fully synced - * (not covered by earlier calls to rdb_fsync_range). */ - if (fsync(server.repl_transfer_fd) == -1) { - serverLog(LL_WARNING, - "Failed trying to sync the temp DB to disk in " - "PRIMARY <-> REPLICA synchronization: %s", - strerror(errno)); - goto error; - } + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Swapping active DB with loaded DB"); + swapMainDbWithTempDb(diskless_load_tempDb); - /* Rename rdb like renaming rewrite aof asynchronously. */ - int old_rdb_fd = open(server.rdb_filename, O_RDONLY | O_NONBLOCK); - if (rename(server.repl_transfer_tmpfile, server.rdb_filename) == -1) { - serverLog(LL_WARNING, - "Failed trying to rename the temp DB into %s in " - "PRIMARY <-> REPLICA synchronization: %s", - server.rdb_filename, strerror(errno)); - if (old_rdb_fd != -1) close(old_rdb_fd); - goto error; - } - /* Close old rdb asynchronously. */ - if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0, 0); + /* swap existing functions ctx with the temporary one */ + functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 1); - /* Sync the directory to ensure rename is persisted */ - if (fsyncFileDir(server.rdb_filename) == -1) { - serverLog(LL_WARNING, - "Failed trying to sync DB directory %s in " - "PRIMARY <-> REPLICA synchronization: %s", - server.rdb_filename, strerror(errno)); - goto error; - } + moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED, + NULL); - /* We will soon start loading the RDB from disk, the replication history is changed, - * we must discard the cached primary structure and force resync of sub-replicas. */ - replicationAttachToNewPrimary(); + /* Delete the old db as it's useless now. */ + disklessLoadDiscardTempDb(diskless_load_tempDb); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding old DB in background"); + } - /* Empty the databases only after the RDB file is ok, that is, before the RDB file - * is actually loaded, in case we encounter an error and drop the replication stream - * and leave an empty database. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + /* Inform about db change, as replication was diskless and didn't cause a save. */ + server.dirty++; - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); - if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) { - serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " - "DB from disk, check server logs."); - if (server.rdb_del_sync_files && allPersistenceDisabled()) { - serverLog(LL_NOTICE, "Removing the RDB file obtained from " - "the primary. This replica has persistence " - "disabled"); - bg_unlink(server.rdb_filename); - } + stopLoading(1); - /* If disk-based RDB loading fails, remove the half-loaded dataset. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + /* Cleanup and restore the socket to the original state to continue + * with the normal replication. */ + rioFreeConn(&rdb, NULL); + connNonBlock(conn); + connRecvTimeout(conn, 0); + return C_OK; +} - /* Note that there's no point in restarting the AOF on sync failure, - it'll be restarted when sync succeeds or replica promoted. */ - goto error; - } +int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { + int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + /* Make sure the new file (also used for persistence) is fully synced + * (not covered by earlier calls to rdb_fsync_range). */ + if (fsync(server.repl_transfer_fd) == -1) { + serverLog(LL_WARNING, + "Failed trying to sync the temp DB to disk in " + "PRIMARY <-> REPLICA synchronization: %s", + strerror(errno)); + return C_ERR; + } + + /* Rename rdb like renaming rewrite aof asynchronously. */ + int old_rdb_fd = open(server.rdb_filename, O_RDONLY | O_NONBLOCK); + if (rename(server.repl_transfer_tmpfile, server.rdb_filename) == -1) { + serverLog(LL_WARNING, + "Failed trying to rename the temp DB into %s in " + "PRIMARY <-> REPLICA synchronization: %s", + server.rdb_filename, strerror(errno)); + if (old_rdb_fd != -1) close(old_rdb_fd); + return C_ERR; + } + /* Close old rdb asynchronously. */ + if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0, 0); + + /* Sync the directory to ensure rename is persisted */ + if (fsyncFileDir(server.rdb_filename) == -1) { + serverLog(LL_WARNING, + "Failed trying to sync DB directory %s in " + "PRIMARY <-> REPLICA synchronization: %s", + server.rdb_filename, strerror(errno)); + return C_ERR; + } + + /* We will soon start loading the RDB from disk, the replication history is changed, + * we must discard the cached primary structure and force resync of sub-replicas. */ + replicationAttachToNewPrimary(); - /* Cleanup. */ + /* Empty the databases only after the RDB file is ok, that is, before the RDB file + * is actually loaded, in case we encounter an error and drop the replication stream + * and leave an empty database. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); + if (rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION) != RDB_OK) { + serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " + "DB from disk, check server logs."); if (server.rdb_del_sync_files && allPersistenceDisabled()) { serverLog(LL_NOTICE, "Removing the RDB file obtained from " "the primary. This replica has persistence " @@ -2505,62 +2432,242 @@ void readSyncBulkPayload(connection *conn) { bg_unlink(server.rdb_filename); } - zfree(server.repl_transfer_tmpfile); - close(server.repl_transfer_fd); - server.repl_transfer_fd = -1; - server.repl_transfer_tmpfile = NULL; + /* If disk-based RDB loading fails, remove the half-loaded dataset. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + + /* Note that there's no point in restarting the AOF on sync failure, + * it'll be restarted when sync succeeds or replica promoted. */ + return C_ERR; } - /* Final setup of the connected replica <- primary link */ - if (conn == server.repl_rdb_transfer_s) { - dualChannelSyncHandleRdbLoadCompletion(); + /* Cleanup. */ + if (server.rdb_del_sync_files && allPersistenceDisabled()) { + serverLog(LL_NOTICE, "Removing the RDB file obtained from " + "the primary. This replica has persistence " + "disabled"); + bg_unlink(server.rdb_filename); + } + + zfree(server.repl_transfer_tmpfile); + close(server.repl_transfer_fd); + server.repl_transfer_fd = -1; + server.repl_transfer_tmpfile = NULL; + return C_OK; +} + +/* Asynchronously read the SYNC payload we receive from a primary, parse it, + * and load it directly to memory without going through the disk */ +void replicaReceiveRDBFromPrimaryToMemory(connection *conn) { + char buf[PROTO_IOBUF_LEN]; + int ret; + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + + /* Static vars used to hold the EOF mark, and the last bytes received + * from the server: when they match, we reached the end of the transfer. */ + static char eofmark[RDB_EOF_MARK_SIZE]; + static char lastbytes[RDB_EOF_MARK_SIZE]; + static int usemark = 0; + + /* If repl_transfer_size != -1 then we have already read the metadata + * from the primary reply in a previous call to this handler. */ + if (server.repl_transfer_size != -1) goto read_from_socket; + + ret = tryReadBulkPayloadMetadata(conn, buf, eofmark, lastbytes, &usemark, &server.repl_transfer_size); + /* If we got C_RETRY, then tryReadBulkPayloadMetadata will be re-attempted in the next call to this handler, + * since server.repl_transfer_size is still -1. If we got C_OK, then server.repl_transfer_size is not -1, and we + * will skip directly to the read-from-socket part. */ + if (ret == C_OK) { + serverAssert(server.repl_transfer_size >= 0); + return; + } else if (ret == C_RETRY) { + serverAssert(server.repl_transfer_size == -1); + return; + } else if (ret == C_ERR) { + serverLog(LL_WARNING, "Failed to read sync metadata"); + cancelReplicationHandshake(1); + return; + } + +read_from_socket: + if (server.repl_transfer_size == 0) { + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving streamed RDB from primary with EOF to parser"); } else { - replicationCreatePrimaryClient(server.repl_transfer_s, rsi.repl_stream_db); - server.repl_state = REPL_STATE_CONNECTED; - server.repl_down_since = 0; - /* Send the initial ACK immediately to put this replica in online state. */ - replicationSendAck(); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary to parser", (long long)server.repl_transfer_size); } - /* Fire the primary link modules event. */ - moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, NULL); - if (server.repl_state == REPL_STATE_CONNECTED) { - /* After a full resynchronization we use the replication ID and - * offset of the primary. The secondary ID / offset are cleared since - * we are starting a new history. */ - memcpy(server.replid, server.primary->repl_data->replid, sizeof(server.replid)); - server.primary_repl_offset = server.primary->repl_data->reploff; + replicaBeforeLoadPrimaryRDB(conn, 1); + if (replicaLoadPrimaryRDBFromSocket(conn, buf, eofmark, &usemark, &rsi) == C_ERR) { + serverLog(LL_WARNING, "Failed to load RDB"); + cancelReplicationHandshake(1); + return; } - clearReplicationId2(); + replicaAfterLoadPrimaryRDB(conn, &rsi); +} - /* Let's create the replication backlog if needed. Replicas need to - * accumulate the backlog regardless of the fact they have sub-replicas - * or not, in order to behave correctly if they are promoted to - * primaries after a failover. */ - if (server.repl_backlog == NULL) createReplicationBacklog(); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Finished with success"); +int tryReadBulkPayload(connection *conn, char *buf, int usemark, ssize_t *nread_out) { + ssize_t readlen, nread; + off_t left; - if (server.supervised_mode == SUPERVISED_SYSTEMD) { - serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Finished with success. Ready to accept connections " - "in read-write mode.\n"); + if (usemark) { + readlen = sizeof(buf[0]) * PROTO_IOBUF_LEN; + } else { + left = server.bio_repl_transfer_size - server.bio_repl_transfer_read; + readlen = (left < (signed)(sizeof(buf[0]) * PROTO_IOBUF_LEN)) ? left : (signed)(sizeof(buf[0]) * PROTO_IOBUF_LEN); } - /* Restart the AOF subsystem now that we finished the sync. This - * will trigger an AOF rewrite, and when done will start appending - * to the new file. */ - if (server.aof_enabled) restartAOFAfterSYNC(); + nread = connRead(conn, buf, readlen); + if (nread <= 0) { + if (connGetState(conn) == CONN_STATE_CONNECTED) { + /* equivalent to EAGAIN */ + memset(buf, 0, PROTO_IOBUF_LEN); + return C_RETRY; + } + replicaBioSaveServerLog(LL_WARNING, "I/O error trying to sync with PRIMARY: %s", + (nread == -1) ? connGetLastError(conn) : "connection lost"); + return C_ERR; + } - /* In case of dual channel replication sync we want to close the RDB connection - * once the connection is established */ - if (conn == server.repl_rdb_transfer_s) { - connClose(conn); - server.repl_rdb_transfer_s = NULL; + server.bio_stat_net_repl_input_bytes += nread; + *nread_out = nread; + return C_OK; +} + +void replicaReceiveRDBFromPrimaryToDisk(connection *conn, int is_dual_channel) { + int usemark; + char lastbytes[RDB_EOF_MARK_SIZE]; + char buf[PROTO_IOBUF_LEN]; + char eofmark[RDB_EOF_MARK_SIZE]; + ssize_t nread, nwritten; + off_t repl_transfer_last_fsync_off = 0; + bool error = 0, eof_reached = 0; + int ret = 0; + + /* There is currently only a background thread implementation for replica disk-based sync */ + debugServerAssert(inBioThread()); + + /* Put the socket in blocking mode to simplify RDB transfer. + * We'll restore it when the RDB is received. */ + connBlock(conn); + connRecvTimeout(conn, server.repl_syncio_timeout * 1000); + + atomic_store_explicit(&server.replica_bio_disk_save_state, REPL_BIO_DISK_SAVE_STATE_IN_PROGRESS, memory_order_release); + /* Loop until we can read the sync metadata or fail */ + do { + if (server.replica_bio_abort_save) { + replicaBioSaveServerLog(LL_WARNING, "Main thread asked to abort the save while Bio thread is reading the sync metadata"); + error = 1; + goto done; + } + ret = tryReadBulkPayloadMetadata(conn, buf, eofmark, lastbytes, &usemark, &server.bio_repl_transfer_size); + if (ret == C_ERR) { + replicaBioSaveServerLog(LL_WARNING, "Error reading sync metadata"); + error = 1; + goto done; + } + } while (ret == C_RETRY); + debugServerAssert(ret == C_OK); + + if (server.bio_repl_transfer_size == 0) { + /* 0 bytes means we don't know the size of the payload beforehand, we will read until we see EOF */ + replicaBioSaveServerLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving streamed RDB from primary with EOF to disk"); + } else { + replicaBioSaveServerLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary to disk", (long long)server.bio_repl_transfer_size); } - return; -error: - cancelReplicationHandshake(1); - return; + /* Now read the actual sync data and save it to disk */ + do { + if (server.replica_bio_abort_save) { + replicaBioSaveServerLog(LL_WARNING, "Main thread asked to abort the save while Bio thread is reading the sync payload"); + error = 1; + goto done; + } + + ret = tryReadBulkPayload(conn, buf, usemark, &nread); + if (ret == C_RETRY) { + continue; + } else if (ret == C_ERR) { + replicaBioSaveServerLog(LL_WARNING, "Error reading sync payload"); + error = 1; + goto done; + } + + /* When a mark is used, we want to detect EOF asap in order to avoid + * writing the EOF mark into the file... */ + if (usemark) { + if (nread >= RDB_EOF_MARK_SIZE) { + memcpy(lastbytes, buf + nread - RDB_EOF_MARK_SIZE, RDB_EOF_MARK_SIZE); + } else { + int rem = RDB_EOF_MARK_SIZE - nread; + memmove(lastbytes, lastbytes + nread, rem); + memcpy(lastbytes + rem, buf, nread); + } + eof_reached = (memcmp(lastbytes, eofmark, RDB_EOF_MARK_SIZE) == 0); + } + + /* Update the last I/O time for the replication transfer (used in + * order to detect timeouts during replication). */ + server.repl_transfer_lastio = server.unixtime; + + /* Write what we got from the socket to the dump file on disk */ + if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) { + replicaBioSaveServerLog(LL_WARNING, + "Write error or short write writing to the DB dump file " + "needed for PRIMARY <-> REPLICA synchronization: %s", + (nwritten == -1) ? strerror(errno) : "short write"); + error = 1; + goto done; + } + server.bio_repl_transfer_read += nread; + + /* Delete the last 40 bytes from the file if we reached EOF. */ + if (usemark && eof_reached) { + if (ftruncate(server.repl_transfer_fd, server.bio_repl_transfer_read - RDB_EOF_MARK_SIZE) == -1) { + replicaBioSaveServerLog(LL_WARNING, + "Error truncating the RDB file received from the primary " + "for SYNC: %s", + strerror(errno)); + error = 1; + goto done; + } + } + + /* Sync data on disk from time to time, otherwise at the end of the + * transfer we may suffer a big delay as the memory buffers are copied + * into the actual disk. */ + if (server.bio_repl_transfer_read >= repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { + off_t sync_size = server.bio_repl_transfer_read - repl_transfer_last_fsync_off; + rdb_fsync_range(server.repl_transfer_fd, repl_transfer_last_fsync_off, sync_size); + repl_transfer_last_fsync_off += sync_size; + } + + /* Check if the transfer is now complete */ + if (!usemark) { + if (server.bio_repl_transfer_read == server.bio_repl_transfer_size) eof_reached = 1; + } + } while (!eof_reached); + +done: + /* Restore the socket to the original state to continue + * with normal replication. */ + connNonBlock(conn); + connRecvTimeout(conn, 0); + + /* We will not use the primary's connection any more, it is now safe to restore the main thread's access to it */ + if (is_dual_channel) { + server.repl_rdb_transfer_s = conn; + } else { + server.repl_transfer_s = conn; + } + + /* Handle completion */ + if (error) { + replicaBioSaveServerLog(LL_WARNING, "Error downloading RDB"); + atomic_store_explicit(&server.replica_bio_disk_save_state, REPL_BIO_DISK_SAVE_STATE_FAIL, memory_order_release); + } else { + replicaBioSaveServerLog(LL_NOTICE, "Done downloading RDB"); + atomic_store_explicit(&server.replica_bio_disk_save_state, REPL_BIO_DISK_SAVE_STATE_FINISHED, memory_order_release); + } } char *receiveSynchronousResponse(connection *conn) { @@ -2673,6 +2780,32 @@ void freePendingReplDataBuf(void) { server.pending_repl_data.len = 0; } +void receiveRDBinBioThread(bool is_dual_channel) { + serverLog(LL_NOTICE, "Replica main thread creating Bio thread to save RDB to disk"); + connection *conn; + /* Thread safety: revoke the main thread's access to the connection while the Bio thread uses it. + * We'll restore it once the Bio thread terminates */ + if (is_dual_channel) { + conn = server.repl_rdb_transfer_s; + server.repl_rdb_transfer_s = NULL; + } else { + conn = server.repl_transfer_s; + server.repl_transfer_s = NULL; + } + connSetReadHandler(conn, NULL); + bioCreateSaveRDBToDiskJob(conn, is_dual_channel); +} + +void receiveRDBinBioThreadSingleChannel(connection *conn) { + UNUSED(conn); + receiveRDBinBioThread(0); +} + +void receiveRDBinBioThreadDualChannel(connection *conn) { + UNUSED(conn); + receiveRDBinBioThread(1); +} + /* Replication: Replica side. * Upon dual-channel sync failure, close rdb-connection, reset repl-state, reset * provisional primary struct, and free local replication buffer. */ @@ -2824,11 +2957,18 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) { /* As the next block we will receive using this connection is the rdb, we need to prepare * the connection accordingly */ - serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, readSyncBulkPayload) != C_ERR); server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_lastio = server.unixtime; + server.bio_repl_transfer_size = -1; + server.bio_repl_transfer_read = 0; + if (!useDisklessLoad()) { + /* Only create the Bio thread once the first piece of data is sent by the primary */ + serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, receiveRDBinBioThreadDualChannel) != C_ERR); + } else { + serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, replicaReceiveRDBFromPrimaryToMemory) != C_ERR); + } return C_OK; } @@ -3959,20 +4099,33 @@ void syncWithPrimary(connection *conn) { server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; return; } - /* Setup the non blocking download of the bulk file. */ - if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { - char conninfo[CONN_INFO_LEN]; - serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); - syncWithPrimaryHandleError(&conn); - return; - } + if (!useDisklessLoad()) { + /* Only create the Bio thread once the first piece of data is sent by the primary */ + if (connSetReadHandler(conn, receiveRDBinBioThreadSingleChannel) == C_ERR) { + char conninfo[CONN_INFO_LEN]; + serverLog(LL_WARNING, "Can't create readable event for Bio SYNC: %s (%s)", strerror(errno), + connGetInfo(conn, conninfo, sizeof(conninfo))); + syncWithPrimaryHandleError(&conn); + return; + } + } else { + /* Setup the non blocking download of the bulk file. */ + if (connSetReadHandler(conn, replicaReceiveRDBFromPrimaryToMemory) == C_ERR) { + char conninfo[CONN_INFO_LEN]; + serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), + connGetInfo(conn, conninfo, sizeof(conninfo))); + syncWithPrimaryHandleError(&conn); + return; + } + } server.repl_state = REPL_STATE_TRANSFER; server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_lastio = server.unixtime; + server.bio_repl_transfer_size = -1; + server.bio_repl_transfer_read = 0; } int connectWithPrimary(void) { @@ -3992,6 +4145,12 @@ int connectWithPrimary(void) { return C_OK; } +void resetBioRDBSaveState(void) { + server.bio_repl_transfer_size = 0; + server.bio_repl_transfer_read = 0; + server.replica_bio_disk_save_state = REPL_BIO_DISK_SAVE_STATE_NONE; +} + /* In disk-based replication, replica will open a temp db file to store the RDB file. * Before entering the REPL_STATE_TRANSFER or after entering the REPL_STATE_TRANSFER, * if an error occurs, we need to clean up related resources, such as closing the tmp @@ -4040,6 +4199,12 @@ void replicationAbortSyncTransfer(void) { * * Otherwise zero is returned and no operation is performed at all. */ int cancelReplicationHandshake(int reconnect) { + if (bioPendingJobsOfType(BIO_RDB_SAVE)) { + server.replica_bio_abort_save = 1; + bioDrainWorker(BIO_RDB_SAVE); + server.replica_bio_abort_save = 0; + } + resetBioRDBSaveState(); if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { replicationAbortDualChannelSyncTransfer(); } @@ -4805,6 +4970,55 @@ long long replicationGetReplicaOffset(void) { return offset; } +void handleBioThreadFinishedRDBDownload(void) { + int bio_save_state = atomic_load_explicit(&server.replica_bio_disk_save_state, memory_order_acquire); + + /* Either no Bio sync started, or it's still in progress */ + if (bio_save_state == REPL_BIO_DISK_SAVE_STATE_NONE || bio_save_state == REPL_BIO_DISK_SAVE_STATE_IN_PROGRESS) { + return; + } + + /* Error during transfer */ + if (bio_save_state == REPL_BIO_DISK_SAVE_STATE_FAIL) { + serverLog(LL_WARNING, "Replica main thread detected RDB download failure in Bio thread"); + cancelReplicationHandshake(1); + return; + } + + debugServerAssert(bio_save_state == REPL_BIO_DISK_SAVE_STATE_FINISHED); + + /* Bio termination detected - we can get rid of the state vars */ + int bio_repl_transfer_size = server.bio_repl_transfer_size; + int bio_repl_transfer_read = server.bio_repl_transfer_read; + resetBioRDBSaveState(); + + serverLog(LL_NOTICE, "Replica main thread detected RDB download completion in Bio thread"); + + /* Handle Bio sync success */ + serverLog(LL_NOTICE, "Loading the RDB and finalizing primary-replica sync..."); + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + connection *conn; + if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { + conn = server.repl_rdb_transfer_s; + } else { + conn = server.repl_transfer_s; + } + + /* If the connection was cleared then the bio_save_state should've also been cleared, so we shouldn't have + * gotten here in the first place */ + debugServerAssert(conn); + + replicaBeforeLoadPrimaryRDB(conn, 0); + if (replicaLoadPrimaryRDBFromDisk(&rsi) == C_ERR) { + serverLog(LL_WARNING, "Failed to load RDB"); + cancelReplicationHandshake(1); + return; + } + replicaAfterLoadPrimaryRDB(conn, &rsi); + server.repl_transfer_size = bio_repl_transfer_size; + server.repl_transfer_read = bio_repl_transfer_read; +} + /* --------------------------- REPLICATION CRON ---------------------------- */ /* Replication cron function, called 1 time per second. */ @@ -4971,6 +5185,9 @@ void replicationCron(void) { replicationStartPendingFork(); + /* Handle completion of RDB downloaded that was offloaded to a Bio thread */ + handleBioThreadFinishedRDBDownload(); + /* Remove the RDB file used for replication if the server is not running * with any persistence. */ removeRDBUsedToSyncReplicas(); diff --git a/src/server.c b/src/server.c index b5ac27f366..536163d588 100644 --- a/src/server.c +++ b/src/server.c @@ -1493,12 +1493,12 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa monotime current_time = getMonotonicUs(); long long factor = 1000000; // us trackInstantaneousMetric(STATS_METRIC_COMMAND, server.stat_numcommands, current_time, factor); - trackInstantaneousMetric(STATS_METRIC_NET_INPUT, server.stat_net_input_bytes + server.stat_net_repl_input_bytes, + trackInstantaneousMetric(STATS_METRIC_NET_INPUT, server.stat_net_input_bytes + server.stat_net_repl_input_bytes + server.bio_stat_net_repl_input_bytes, current_time, factor); trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, server.stat_net_output_bytes + server.stat_net_repl_output_bytes, current_time, factor); - trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION, server.stat_net_repl_input_bytes, current_time, + trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION, server.stat_net_repl_input_bytes + server.bio_stat_net_repl_input_bytes, current_time, factor); trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION, server.stat_net_repl_output_bytes, current_time, factor); @@ -2739,6 +2739,7 @@ void resetServerStats(void) { server.stat_net_input_bytes = 0; server.stat_net_output_bytes = 0; server.stat_net_repl_input_bytes = 0; + server.bio_stat_net_repl_input_bytes = 0; server.stat_net_repl_output_bytes = 0; server.stat_unexpected_error_replies = 0; server.stat_total_error_replies = 0; @@ -6052,9 +6053,9 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "total_connections_received:%lld\r\n", server.stat_numconnections, "total_commands_processed:%lld\r\n", server.stat_numcommands, "instantaneous_ops_per_sec:%lld\r\n", getInstantaneousMetric(STATS_METRIC_COMMAND), - "total_net_input_bytes:%lld\r\n", server.stat_net_input_bytes + server.stat_net_repl_input_bytes, + "total_net_input_bytes:%lld\r\n", server.stat_net_input_bytes + server.stat_net_repl_input_bytes + server.bio_stat_net_repl_input_bytes, "total_net_output_bytes:%lld\r\n", server.stat_net_output_bytes + server.stat_net_repl_output_bytes, - "total_net_repl_input_bytes:%lld\r\n", server.stat_net_repl_input_bytes, + "total_net_repl_input_bytes:%lld\r\n", server.stat_net_repl_input_bytes + server.bio_stat_net_repl_input_bytes, "total_net_repl_output_bytes:%lld\r\n", server.stat_net_repl_output_bytes, "instantaneous_input_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT) / 1024, "instantaneous_output_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT) / 1024, @@ -6148,16 +6149,25 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "replicas_repl_buffer_peak:%zu\r\n", server.pending_repl_data.peak)); if (server.repl_state == REPL_STATE_TRANSFER) { + int repl_transfer_size_stat; + int repl_transfer_read_stat; + if (atomic_load_explicit(&server.replica_bio_disk_save_state, memory_order_acquire) != REPL_BIO_DISK_SAVE_STATE_NONE) { + repl_transfer_size_stat = server.bio_repl_transfer_size; + repl_transfer_read_stat = server.bio_repl_transfer_read; + } else { + repl_transfer_size_stat = server.repl_transfer_size; + repl_transfer_read_stat = server.repl_transfer_read; + } double perc = 0; - if (server.repl_transfer_size) { - perc = ((double)server.repl_transfer_read / server.repl_transfer_size) * 100; + if (repl_transfer_size_stat) { + perc = ((double)repl_transfer_read_stat / repl_transfer_size_stat) * 100; } info = sdscatprintf( info, FMTARGS( - "master_sync_total_bytes:%lld\r\n", (long long)server.repl_transfer_size, - "master_sync_read_bytes:%lld\r\n", (long long)server.repl_transfer_read, - "master_sync_left_bytes:%lld\r\n", (long long)(server.repl_transfer_size - server.repl_transfer_read), + "master_sync_total_bytes:%lld\r\n", (long long)repl_transfer_size_stat, + "master_sync_read_bytes:%lld\r\n", (long long)repl_transfer_read_stat, + "master_sync_left_bytes:%lld\r\n", (long long)(repl_transfer_size_stat - repl_transfer_read_stat), "master_sync_perc:%.2f\r\n", perc, "master_sync_last_io_seconds_ago:%d\r\n", (int)(server.unixtime - server.repl_transfer_lastio))); } diff --git a/src/server.h b/src/server.h index 0f560d3474..1271e49c78 100644 --- a/src/server.h +++ b/src/server.h @@ -435,6 +435,13 @@ typedef enum { REPL_DUAL_CHANNEL_RDB_LOADED, } repl_rdb_channel_state; +typedef enum { + REPL_BIO_DISK_SAVE_STATE_NONE = 0, /* No active disk-saving Bio thread */ + REPL_BIO_DISK_SAVE_STATE_IN_PROGRESS, /* The disk-saving Bio job has been created */ + REPL_BIO_DISK_SAVE_STATE_FINISHED, /* The disk-saving Bio job finished */ + REPL_BIO_DISK_SAVE_STATE_FAIL /* The disk-saving Bio job failed */ +} replica_bio_disk_save_state; + /* The state of an in progress coordinated failover */ typedef enum { NO_FAILOVER = 0, /* No failover in progress */ @@ -443,6 +450,7 @@ typedef enum { * PSYNC FAILOVER request. */ } failover_state; + /* State of replicas from the POV of the primary. Used in client->replstate. * In SEND_BULK and ONLINE state the replica receives new updates * in its output queue. In the WAIT_BGSAVE states instead the server is waiting @@ -514,6 +522,7 @@ typedef enum { #define ZSKIPLIST_MAX_SEARCH 10 /* Append only defines */ +#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024 * 1024 * 8) /* 8 MB */ #define AOF_FSYNC_NO 0 #define AOF_FSYNC_ALWAYS 1 #define AOF_FSYNC_EVERYSEC 2 @@ -1979,6 +1988,16 @@ struct valkeyServer { * delay (start sooner if they all connect). */ int dual_channel_replication; /* Config used to determine if the replica should * use dual channel replication for full syncs. */ + _Atomic int replica_bio_disk_save_state; /* Flag set by the bio thread to indicate that the + * RDB save to disk has completed, or failed */ + _Atomic bool replica_bio_abort_save; /* Flag set by main thread, used to signal to replica's + * disk-saving bio thread to abort the save */ + long long bio_stat_net_repl_input_bytes; /* Used to calculate stat_net_repl_input_bytes on the + * replica's bio thread without touching main thread vars */ + off_t bio_repl_transfer_size; /* Used to calculate bio_repl_transfer_size on the + * replica's bio thread without touching main thread vars */ + off_t bio_repl_transfer_read; /* Used to calculate bio_repl_transfer_read on the + * replica's bio thread without touching main thread vars */ int wait_before_rdb_client_free; /* Grace period in seconds for replica main channel * to establish psync. */ int debug_pause_after_fork; /* Debug param that pauses the main process @@ -2001,33 +2020,33 @@ struct valkeyServer { long long read_reploff; int dbid; } repl_provisional_primary; - client *cached_primary; /* Cached primary to be reused for PSYNC. */ - rio *loading_rio; /* Pointer to the rio object currently used for loading data. */ - int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ - int repl_state; /* Replication status if the instance is a replica */ - int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */ - off_t repl_transfer_size; /* Size of RDB to read from primary during sync. */ - off_t repl_transfer_read; /* Amount of RDB read from primary during sync. */ - off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ - connection *repl_transfer_s; /* Replica -> Primary SYNC connection */ - connection *repl_rdb_transfer_s; /* Primary FULL SYNC connection (RDB download) */ - int repl_transfer_fd; /* Replica -> Primary SYNC temp file descriptor */ - char *repl_transfer_tmpfile; /* Replica-> Primary SYNC temp file name */ - time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ - int repl_serve_stale_data; /* Serve stale data when link is down? */ - int repl_replica_ro; /* Replica is read only? */ - int repl_replica_ignore_maxmemory; /* If true replicas do not evict. */ - time_t repl_down_since; /* Unix time at which link with primary went down */ - int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ - int repl_mptcp; /* Use Multipath TCP for replica on client side */ - int replica_priority; /* Reported in INFO and used by Sentinel. */ - int replica_announced; /* If true, replica is announced by Sentinel */ - int replica_announce_port; /* Give the primary this listening port. */ - char *replica_announce_ip; /* Give the primary this ip address. */ - int propagation_error_behavior; /* Configures the behavior of the replica - * when it receives an error on the replication stream */ - int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to - * persist writes to AOF. */ + client *cached_primary; /* Cached primary to be reused for PSYNC. */ + rio *loading_rio; /* Pointer to the rio object currently used for loading data. */ + int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ + int repl_state; /* Replication status if the instance is a replica */ + int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */ + off_t repl_transfer_size; /* Size of RDB to read from primary during sync. */ + off_t repl_transfer_read; /* Amount of RDB read from primary during sync. */ + off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ + connection *repl_transfer_s; /* Replica -> Primary SYNC connection */ + connection *repl_rdb_transfer_s; /* Primary FULL SYNC connection (RDB download) */ + int repl_transfer_fd; /* Replica -> Primary SYNC temp file descriptor */ + char *repl_transfer_tmpfile; /* Replica-> Primary SYNC temp file name */ + _Atomic time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ + int repl_serve_stale_data; /* Serve stale data when link is down? */ + int repl_replica_ro; /* Replica is read only? */ + int repl_replica_ignore_maxmemory; /* If true replicas do not evict. */ + time_t repl_down_since; /* Unix time at which link with primary went down */ + int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ + int repl_mptcp; /* Use Multipath TCP for replica on client side */ + int replica_priority; /* Reported in INFO and used by Sentinel. */ + int replica_announced; /* If true, replica is announced by Sentinel */ + int replica_announce_port; /* Give the primary this listening port. */ + char *replica_announce_ip; /* Give the primary this ip address. */ + int propagation_error_behavior; /* Configures the behavior of the replica + * when it receives an error on the replication stream */ + int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to + * persist writes to AOF. */ /* The following two fields is where we store primary PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into @@ -2084,7 +2103,7 @@ struct valkeyServer { int list_max_listpack_size; int list_compress_depth; /* time cache */ - time_t unixtime; /* Unix time sampled every cron cycle. */ + _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ time_t timezone; /* Cached timezone. As set by tzset(). */ _Atomic int daylight_active; /* Currently in daylight saving time. */ mstime_t mstime; /* 'unixtime' in milliseconds. */ @@ -3018,6 +3037,7 @@ int sendCurrentOffsetToReplica(client *replica); void addRdbReplicaToPsyncWait(client *replica); void initClientReplicationData(client *c); void freeClientReplicationData(client *c); +void replicaReceiveRDBFromPrimaryToDisk(connection *conn, int is_dual_channel); /* Generic persistence functions */ void startLoadingFile(size_t size, char *filename, int rdbflags); @@ -3996,6 +4016,8 @@ void debugPauseProcess(void); * to the log message. */ #define dualChannelServerLog(level, ...) serverLog(level, "Dual channel replication: " __VA_ARGS__) +#define replicaBioSaveServerLog(level, ...) serverLog(level, "Replica bio thread: " __VA_ARGS__) + #define serverDebug(fmt, ...) printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) #define serverDebugMark() printf("-- MARK %s:%d --\n", __FILE__, __LINE__) diff --git a/tests/integration/replica-redirect.tcl b/tests/integration/replica-redirect.tcl index af724f5c72..ac575b5b66 100644 --- a/tests/integration/replica-redirect.tcl +++ b/tests/integration/replica-redirect.tcl @@ -19,6 +19,11 @@ start_server {tags {needs:repl external:skip}} { r replicaof $primary_host $primary_port wait_replica_online $primary + # In case of bio thread RDB download, there can be up to 1000ms + # (1 replication cron loop) delay until the rdb starts loading + after 1000 + wait_done_loading r + assert_error "REDIRECT*" {$rr exec} $rr close } diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl index a46313f095..8b69b2ce5f 100644 --- a/tests/unit/moduleapi/hooks.tcl +++ b/tests/unit/moduleapi/hooks.tcl @@ -238,6 +238,10 @@ tags "modules" { $replica replicaof $master_host $master_port wait_replica_online $master + # In case of bio thread RDB download, there can be up to 1000ms + # (1 replication cron loop) delay until the rdb starts loading + after 1000 + wait_done_loading r test {Test master link up hook} { assert_equal [r hooks.event_count masterlink-up] 1