Skip to content

Commit f6cfe65

Browse files
author
Nitai Caro
committed
Updates
Signed-off-by: Nitai Caro <[email protected]>
1 parent b453aa9 commit f6cfe65

File tree

3 files changed

+19
-11
lines changed

3 files changed

+19
-11
lines changed

src/replication.c

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2206,7 +2206,7 @@ int tryReadBulkPayloadMetadata(connection *conn, char *buf, char *eofmark, char
22062206
/* At this stage just a newline works as a PING in order to take
22072207
* the connection live. So we refresh our last interaction
22082208
* timestamp. */
2209-
atomic_store_explicit(&server.repl_transfer_lastio, atomic_load_explicit(&server.unixtime, memory_order_relaxed), memory_order_relaxed);
2209+
server.repl_transfer_lastio = server.unixtime;
22102210
return C_RETRY;
22112211
} else if (ret == INSPECT_BULK_PAYLOAD_PRIMARY_BAD_PROTO) {
22122212
serverLog(LL_WARNING,
@@ -2641,7 +2641,7 @@ void replicaReceiveRDBFromPrimaryToDisk(connection *conn, int is_dual_channel) {
26412641

26422642
/* Update the last I/O time for the replication transfer (used in
26432643
* order to detect timeouts during replication). */
2644-
atomic_store_explicit(&server.repl_transfer_lastio, atomic_load_explicit(&server.unixtime, memory_order_relaxed), memory_order_relaxed);
2644+
server.repl_transfer_lastio = server.unixtime;
26452645

26462646
/* Write what we got from the socket to the dump file on disk */
26472647
if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) {
@@ -2700,7 +2700,6 @@ void replicaReceiveRDBFromPrimaryToDisk(connection *conn, int is_dual_channel) {
27002700
atomic_store_explicit(&server.replica_bio_disk_save_state, REPL_BIO_DISK_SAVE_STATE_FAIL, memory_order_release);
27012701
} else {
27022702
replicaBioSaveServerLog(LL_NOTICE, "Done downloading RDB");
2703-
server.replica_bio_disk_save_conn = conn;
27042703
atomic_store_explicit(&server.replica_bio_disk_save_state, REPL_BIO_DISK_SAVE_STATE_FINISHED, memory_order_release);
27052704
}
27062705
}
@@ -3000,7 +2999,7 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) {
30002999
server.bio_repl_transfer_read = 0;
30013000
if (!useDisklessLoad()) {
30023001
/* Only create the Bio thread once the first piece of data is sent by the primary */
3003-
connSetReadHandler(server.repl_rdb_transfer_s, receiveRDBinBioThreadDualChannel);
3002+
serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, receiveRDBinBioThreadDualChannel) != C_ERR);
30043003
} else {
30053004
serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, replicaReceiveRDBFromPrimaryToMemory) != C_ERR);
30063005
}
@@ -4137,7 +4136,13 @@ void syncWithPrimary(connection *conn) {
41374136

41384137
if (!useDisklessLoad()) {
41394138
/* Only create the Bio thread once the first piece of data is sent by the primary */
4140-
connSetReadHandler(conn, receiveRDBinBioThreadSingleChannel);
4139+
if (connSetReadHandler(conn, receiveRDBinBioThreadSingleChannel) == C_ERR) {
4140+
char conninfo[CONN_INFO_LEN];
4141+
serverLog(LL_WARNING, "Can't create readable event for Bio SYNC: %s (%s)", strerror(errno),
4142+
connGetInfo(conn, conninfo, sizeof(conninfo)));
4143+
syncWithPrimaryHandleError(&conn);
4144+
return;
4145+
}
41414146
} else {
41424147
/* Setup the non blocking download of the bulk file. */
41434148
if (connSetReadHandler(conn, replicaReceiveRDBFromPrimaryToMemory) == C_ERR) {
@@ -4995,7 +5000,6 @@ long long replicationGetReplicaOffset(void) {
49955000
void resetBioRDBSaveState(void) {
49965001
server.bio_repl_transfer_size = 0;
49975002
server.bio_repl_transfer_read = 0;
4998-
server.replica_bio_disk_save_conn = NULL;
49995003
server.replica_bio_disk_save_state = REPL_BIO_DISK_SAVE_STATE_NONE;
50005004
}
50015005

@@ -5021,14 +5025,20 @@ void handleBioThreadFinishedRDBDownload(void) {
50215025
/* Handle Bio sync success */
50225026
serverLog(LL_NOTICE, "Loading the RDB and finalizing primary-replica sync...");
50235027
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
5024-
replicaBeforeLoadPrimaryRDB(server.replica_bio_disk_save_conn, 0);
5028+
connection *conn;
5029+
if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) {
5030+
conn = server.repl_rdb_transfer_s;
5031+
} else {
5032+
conn = server.repl_transfer_s;
5033+
}
5034+
replicaBeforeLoadPrimaryRDB(conn, 0);
50255035
if (replicaLoadPrimaryRDBFromDisk(&rsi) == C_ERR) {
50265036
serverLog(LL_WARNING, "Failed to load RDB");
50275037
resetBioRDBSaveState();
50285038
cancelReplicationHandshake(1);
50295039
return;
50305040
}
5031-
replicaAfterLoadPrimaryRDB(server.replica_bio_disk_save_conn, &rsi);
5041+
replicaAfterLoadPrimaryRDB(conn, &rsi);
50325042
server.repl_transfer_size = server.bio_repl_transfer_size;
50335043
server.repl_transfer_read = server.bio_repl_transfer_read;
50345044
resetBioRDBSaveState();

src/server.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1315,7 +1315,7 @@ void databasesCron(void) {
13151315
static inline void updateCachedTimeWithUs(int update_daylight_info, const long long ustime) {
13161316
server.ustime = ustime;
13171317
server.mstime = server.ustime / 1000;
1318-
atomic_store_explicit(&server.unixtime, server.mstime / 1000, memory_order_relaxed);
1318+
server.unixtime = server.mstime / 1000;
13191319

13201320
/* To get information about daylight saving time, we need to call
13211321
* localtime_r and cache the result. However calling localtime_r in this

src/server.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1999,8 +1999,6 @@ struct valkeyServer {
19991999
* replica's bio thread without touching main thread vars */
20002000
off_t bio_repl_transfer_read; /* Used to calculate bio_repl_transfer_read on the
20012001
* replica's bio thread without touching main thread vars */
2002-
connection *replica_bio_disk_save_conn; /* Used to remember the connection we downloaded the RDB
2003-
* from, in order to finalize the sync later */
20042002
int wait_before_rdb_client_free; /* Grace period in seconds for replica main channel
20052003
* to establish psync. */
20062004
int debug_pause_after_fork; /* Debug param that pauses the main process

0 commit comments

Comments
 (0)