Skip to content

Commit abbc6a2

Browse files
committed
misc: de-duplicate the code by adding replicationRestoreOffsetFromSaveInfo
Signed-off-by: arthur.lee <[email protected]>
1 parent 41d02f9 commit abbc6a2

File tree

4 files changed

+41
-56
lines changed

4 files changed

+41
-56
lines changed

src/aof.c

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1470,32 +1470,7 @@ int loadSingleAppendOnlyFile(char *filename) {
14701470
goto cleanup;
14711471
} else {
14721472
/* Restore the replication ID / offset from the RDB file. */
1473-
if (rsi.repl_id_is_set && rsi.repl_offset != -1 && rsi.repl_stream_db != -1) {
1474-
rsi_is_valid = 1;
1475-
if (!iAmPrimary()) {
1476-
memcpy(server.replid, rsi.repl_id, sizeof(server.replid));
1477-
server.primary_repl_offset = rsi.repl_offset;
1478-
if (!server.primary && !server.cached_primary) {
1479-
/* only cache myself primary if replica did not synced to its primary node yet */
1480-
replicationCachePrimaryUsingMyself();
1481-
selectDb(server.cached_primary, rsi.repl_stream_db);
1482-
}
1483-
serverLog(LL_NOTICE, "Loading preamble rdb changed replication info, replid: %s, primary_repl_offset: %lld",
1484-
server.replid, server.primary_repl_offset);
1485-
} else {
1486-
memcpy(server.replid2, rsi.repl_id, sizeof(server.replid));
1487-
server.second_replid_offset = rsi.repl_offset + 1;
1488-
server.primary_repl_offset += rsi.repl_offset;
1489-
serverAssert(server.repl_backlog);
1490-
server.repl_backlog->offset = server.primary_repl_offset - server.repl_backlog->histlen + 1;
1491-
rebaseReplicationBuffer(rsi.repl_offset);
1492-
server.repl_no_replicas_since = time(NULL);
1493-
serverLog(LL_NOTICE, "Loading preamble rdb changed replication info, replid2: %s, secondary_repl_offset: "
1494-
"%lld, primary_repl_offset: %lld",
1495-
server.replid2, server.second_replid_offset, server.primary_repl_offset);
1496-
}
1497-
}
1498-
1473+
rsi_is_valid = replicationRestoreOffsetFromSaveInfo(&rsi, true);
14991474
loadingAbsProgress(ftello(fp));
15001475
last_progress_report_size = ftello(fp);
15011476
if (old_style) serverLog(LL_NOTICE, "Reading the remaining AOF tail...");

src/rdb.c

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3952,3 +3952,41 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
39523952
}
39533953
return NULL;
39543954
}
3955+
3956+
/* Restore the replication ID / offset from the RDB file
3957+
* return 1 if rdbSaveInfo is valid */
3958+
int replicationRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble) {
3959+
int rsi_is_valid = 0;
3960+
if (rsi == NULL) return rsi_is_valid;
3961+
if (rsi->repl_id_is_set && rsi->repl_offset != -1 && rsi->repl_stream_db != -1) {
3962+
/* Note that older implementations may save a repl_stream_db
3963+
* of -1 inside the RDB file in a wrong way, see more
3964+
* information in function rdbPopulateSaveInfo. */
3965+
rsi_is_valid = 1;
3966+
if (!iAmPrimary()) {
3967+
memcpy(server.replid, rsi->repl_id, sizeof(server.replid));
3968+
server.primary_repl_offset = rsi->repl_offset;
3969+
if (!is_aof_preamble || (!server.primary && !server.cached_primary)) {
3970+
/* If this is a replica, create a cached primary from this
3971+
* information, in order to allow partial resynchronizations
3972+
* with primaries. For AOF, only cache the primary if replica
3973+
* has not synced to its primary node yet. */
3974+
replicationCachePrimaryUsingMyself();
3975+
selectDb(server.cached_primary, rsi->repl_stream_db);
3976+
}
3977+
} else {
3978+
/* If this is a primary, we can save the replication info
3979+
* as secondary ID and offset, in order to allow replicas
3980+
* to partial resynchronizations with primaries. */
3981+
memcpy(server.replid2, rsi->repl_id, sizeof(server.replid));
3982+
server.second_replid_offset = rsi->repl_offset + 1;
3983+
/* Rebase primary_repl_offset from rsi.repl_offset. */
3984+
server.primary_repl_offset += rsi->repl_offset;
3985+
serverAssert(server.repl_backlog);
3986+
server.repl_backlog->offset = server.primary_repl_offset - server.repl_backlog->histlen + 1;
3987+
rebaseReplicationBuffer(rsi->repl_offset);
3988+
server.repl_no_replicas_since = time(NULL);
3989+
}
3990+
}
3991+
return rsi_is_valid;
3992+
}

src/rdb.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,5 +212,6 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
212212
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
213213
ssize_t rdbSaveFunctions(rio *rdb);
214214
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
215+
int replicationRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble);
215216

216217
#endif

src/server.c

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6926,36 +6926,7 @@ void loadDataFromDisk(void) {
69266926
int rdb_load_ret = rdbLoad(server.rdb_filename, &rsi, rdb_flags);
69276927
if (rdb_load_ret == RDB_OK) {
69286928
serverLog(LL_NOTICE, "DB loaded from disk: %.3f seconds", (float)(ustime() - start) / 1000000);
6929-
6930-
/* Restore the replication ID / offset from the RDB file. */
6931-
if (rsi.repl_id_is_set && rsi.repl_offset != -1 &&
6932-
/* Note that older implementations may save a repl_stream_db
6933-
* of -1 inside the RDB file in a wrong way, see more
6934-
* information in function rdbPopulateSaveInfo. */
6935-
rsi.repl_stream_db != -1) {
6936-
rsi_is_valid = 1;
6937-
if (!iAmPrimary()) {
6938-
memcpy(server.replid, rsi.repl_id, sizeof(server.replid));
6939-
server.primary_repl_offset = rsi.repl_offset;
6940-
/* If this is a replica, create a cached primary from this
6941-
* information, in order to allow partial resynchronizations
6942-
* with primaries. */
6943-
replicationCachePrimaryUsingMyself();
6944-
selectDb(server.cached_primary, rsi.repl_stream_db);
6945-
} else {
6946-
/* If this is a primary, we can save the replication info
6947-
* as secondary ID and offset, in order to allow replicas
6948-
* to partial resynchronizations with primaries. */
6949-
memcpy(server.replid2, rsi.repl_id, sizeof(server.replid));
6950-
server.second_replid_offset = rsi.repl_offset + 1;
6951-
/* Rebase primary_repl_offset from rsi.repl_offset. */
6952-
server.primary_repl_offset += rsi.repl_offset;
6953-
serverAssert(server.repl_backlog);
6954-
server.repl_backlog->offset = server.primary_repl_offset - server.repl_backlog->histlen + 1;
6955-
rebaseReplicationBuffer(rsi.repl_offset);
6956-
server.repl_no_replicas_since = time(NULL);
6957-
}
6958-
}
6929+
rsi_is_valid = replicationRestoreOffsetFromSaveInfo(&rsi, false);
69596930
} else if (rdb_load_ret != RDB_NOT_EXIST) {
69606931
serverLog(LL_WARNING, "Fatal error loading the DB, check server logs. Exiting.");
69616932
exit(1);

0 commit comments

Comments
 (0)