Skip to content

Commit 0567e07

Browse files
tezcShooterITsundbmoticlessnaglera
authored
Rdb channel replication (redis#13732)
This PR is based on: redis#12109 valkey-io/valkey#60 Closes: redis#11678 **Motivation** During a full sync, when master is delivering RDB to the replica, incoming write commands are kept in a replication buffer in order to be sent to the replica once RDB delivery is completed. If RDB delivery takes a long time, it might create memory pressure on master. Also, once a replica connection accumulates replication data which is larger than output buffer limits, master will kill replica connection. This may cause a replication failure. The main benefit of the rdb channel replication is streaming incoming commands in parallel to the RDB delivery. This approach shifts replication stream buffering to the replica and reduces load on master. We do this by opening another connection for RDB delivery. The main channel on replica will be receiving replication stream while rdb channel is receiving the RDB. This feature also helps to reduce master's main process CPU load. By opening a dedicated connection for the RDB transfer, the bgsave process has access to the new connection and it will stream RDB directly to the replicas. Before this change, due to TLS connection restriction, the bgsave process was writing RDB bytes to a pipe and the main process was forwarding it to the replica. This is no longer necessary, the main process can avoid these expensive socket read/write syscalls. It also means RDB delivery to replica will be faster as it avoids this step. In summary, replication will be faster and master's performance during full syncs will improve. **Implementation steps** 1. When replica connects to the master, it sends 'rdb-channel-repl' as part of capability exchange to let master to know replica supports rdb channel. 2. When replica lacks sufficient data for PSYNC, master sends +RDBCHANNELSYNC reply with replica's client id. As the next step, the replica opens a new connection (rdb-channel) and configures it against the master with the appropriate capabilities and requirements. It also sends given client id back to master over rdbchannel, so that master can associate these channels. (initial replica connection will be referred as main-channel) Then, replica requests fullsync using the RDB channel. 3. Prior to forking, master attaches the replica's main channel to the replication backlog to deliver replication stream starting at the snapshot end offset. 4. The master main process sends replication stream via the main channel, while the bgsave process sends the RDB directly to the replica via the rdb-channel. Replica accumulates replication stream in a local buffer, while the RDB is being loaded into the memory. 5. Once the replica completes loading the rdb, it drops the rdb channel and streams the accumulated replication stream into the db. Sync is completed. **Some details** - Currently, rdbchannel replication is supported only if `repl-diskless-sync` is enabled on master. Otherwise, replication will happen over a single connection as in before. - On replica, there is a limit to replication stream buffering. Replica uses a new config `replica-full-sync-buffer-limit` to limit number of bytes to accumulate. If it is not set, replica inherits `client-output-buffer-limit <replica>` hard limit config. If we reach this limit, replica stops accumulating. This is not a failure scenario though. Further accumulation will happen on master side. Depending on the configured limits on master, master may kill the replica connection. **API changes in INFO output:** 1. New replica state: `send_bulk_and_stream`. Indicates full sync is still in progress for this replica. It is receiving replication stream and rdb in parallel. ``` slave0:ip=127.0.0.1,port=5002,state=send_bulk_and_stream,offset=0,lag=0 ``` Replica state changes in steps: - First, replica sends psync and receives +RDBCHANNELSYNC :`state=wait_bgsave` - After replica connects with rdbchannel and delivery starts: `state=send_bulk_and_stream` - After full sync: `state=online` 2. On replica side, replication stream buffering metrics: - replica_full_sync_buffer_size: Currently accumulated replication stream data in bytes. - replica_full_sync_buffer_peak: Peak number of bytes that this instance accumulated in the lifetime of the process. ``` replica_full_sync_buffer_size:20485 replica_full_sync_buffer_peak:1048560 ``` **API changes in CLIENT LIST** In `client list` output, rdbchannel clients will have 'C' flag in addition to 'S' replica flag: ``` id=11 addr=127.0.0.1:39108 laddr=127.0.0.1:5001 fd=14 name= age=5 idle=5 flags=SC db=0 sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1920 events=r cmd=psync user=default redir=-1 resp=2 lib-name= lib-ver= io-thread=0 ``` **Config changes:** - `replica-full-sync-buffer-limit`: Controls how much replication data replica can accumulate during rdbchannel replication. If it is not set, a value of 0 means replica will inherit `client-output-buffer-limit <replica>` hard limit config to limit accumulated data. - `repl-rdb-channel` config is added as a hidden config. This is mostly for testing as we need to support both rdbchannel replication and the older single connection replication (to keep compatibility with older versions and rdbchannel replication will not be enabled if repl-diskless-sync is not enabled). it affects both the master (not to respond to rdb channel requests), and the replica (not to declare capability) **Internal API changes:** Changes that were introduced to Redis replication: - New replication capability is added to replconf command: `capa rdb-channel-repl`. Indicates replica is capable of rdb channel replication. Replica sends it when it connects to master along with other capabilities. - If replica needs fullsync, master replies `+RDBCHANNELSYNC <client-id>` to the replica's PSYNC request. - When replica opens rdbchannel connection, as part of replconf command, it sends `rdb-channel 1` to let master know this is rdb channel. Also, it sends `main-ch-client-id <client-id>` as part of replconf command so master can associate channels. **Testing:** As rdbchannel replication is enabled by default, we run whole test suite with it. Though, as we need to support both rdbchannel and single connection replication, we'll be running some tests twice with `repl-rdb-channel yes/no` config. **Replica state diagram** ``` * * Replica state machine * * * Main channel state * ┌───────────────────┐ * │RECEIVE_PING_REPLY │ * └────────┬──────────┘ * │ +PONG * ┌────────▼──────────┐ * │SEND_HANDSHAKE │ RDB channel state * └────────┬──────────┘ ┌───────────────────────────────┐ * │+OK ┌───► RDB_CH_SEND_HANDSHAKE │ * ┌────────▼──────────┐ │ └──────────────┬────────────────┘ * │RECEIVE_AUTH_REPLY │ │ REPLCONF main-ch-client-id <clientid> * └────────┬──────────┘ │ ┌──────────────▼────────────────┐ * │+OK │ │ RDB_CH_RECEIVE_AUTH_REPLY │ * ┌────────▼──────────┐ │ └──────────────┬────────────────┘ * │RECEIVE_PORT_REPLY │ │ │ +OK * └────────┬──────────┘ │ ┌──────────────▼────────────────┐ * │+OK │ │ RDB_CH_RECEIVE_REPLCONF_REPLY│ * ┌────────▼──────────┐ │ └──────────────┬────────────────┘ * │RECEIVE_IP_REPLY │ │ │ +OK * └────────┬──────────┘ │ ┌──────────────▼────────────────┐ * │+OK │ │ RDB_CH_RECEIVE_FULLRESYNC │ * ┌────────▼──────────┐ │ └──────────────┬────────────────┘ * │RECEIVE_CAPA_REPLY │ │ │+FULLRESYNC * └────────┬──────────┘ │ │Rdb delivery * │ │ ┌──────────────▼────────────────┐ * ┌────────▼──────────┐ │ │ RDB_CH_RDB_LOADING │ * │SEND_PSYNC │ │ └──────────────┬────────────────┘ * └─┬─────────────────┘ │ │ Done loading * │PSYNC (use cached-master) │ │ * ┌─▼─────────────────┐ │ │ * │RECEIVE_PSYNC_REPLY│ │ ┌────────────►│ Replica streams replication * └─┬─────────────────┘ │ │ │ buffer into memory * │ │ │ │ * │+RDBCHANNELSYNC client-id │ │ │ * ├──────┬───────────────────┘ │ │ * │ │ Main channel │ │ * │ │ accumulates repl data │ │ * │ ┌──▼────────────────┐ │ ┌───────▼───────────┐ * │ │ REPL_TRANSFER ├───────┘ │ CONNECTED │ * │ └───────────────────┘ └────▲───▲──────────┘ * │ │ │ * │ │ │ * │ +FULLRESYNC ┌───────────────────┐ │ │ * ├────────────────► REPL_TRANSFER ├────┘ │ * │ └───────────────────┘ │ * │ +CONTINUE │ * └──────────────────────────────────────────────┘ */ ``` ----- This PR also contains changes and ideas from: valkey-io/valkey#837 valkey-io/valkey#1173 valkey-io/valkey#804 valkey-io/valkey#945 valkey-io/valkey#989 --------- Co-authored-by: Yuan Wang <[email protected]> Co-authored-by: debing.sun <[email protected]> Co-authored-by: Moti Cohen <[email protected]> Co-authored-by: naglera <[email protected]> Co-authored-by: Amit Nagler <[email protected]> Co-authored-by: Madelyn Olson <[email protected]> Co-authored-by: Binbin <[email protected]> Co-authored-by: Viktor Söderqvist <[email protected]> Co-authored-by: Ping Xie <[email protected]> Co-authored-by: Ran Shidlansik <[email protected]> Co-authored-by: ranshid <[email protected]> Co-authored-by: xbasel <[email protected]>
1 parent 57d01a7 commit 0567e07

File tree

18 files changed

+2205
-213
lines changed

18 files changed

+2205
-213
lines changed

redis.conf

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,24 @@ repl-disable-tcp-nodelay no
727727
#
728728
# repl-backlog-ttl 3600
729729

730+
# During a fullsync, the master may decide to send both the RDB file and the
731+
# replication stream to the replica in parallel. This approach shifts the
732+
# responsibility of buffering the replication stream to the replica during the
733+
# fullsync process. The replica accumulates the replication stream data until
734+
# the RDB file is fully loaded. Once the RDB delivery is completed and
735+
# successfully loaded, the replica begins processing and applying the
736+
# accumulated replication data to the db. The configuration below controls how
737+
# much replication data the replica can accumulate during a fullsync.
738+
#
739+
# When the replica reaches this limit, it will stop accumulating further data.
740+
# At this point, additional data accumulation may occur on the master side
741+
# depending on the 'client-output-buffer-limit <replica>' config of master.
742+
#
743+
# A value of 0 means replica inherits hard limit of
744+
# 'client-output-buffer-limit <replica>' config to limit accumulation size.
745+
#
746+
# replica-full-sync-buffer-limit 0
747+
730748
# The replica priority is an integer number published by Redis in the INFO
731749
# output. It is used by Redis Sentinel in order to select a replica to promote
732750
# into a master if the master is no longer working correctly.

src/config.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
* Copyright (c) 2009-Present, Redis Ltd.
44
* All rights reserved.
55
*
6+
* Copyright (c) 2024-present, Valkey contributors.
7+
* All rights reserved.
8+
*
69
* Licensed under your choice of the Redis Source Available License 2.0
710
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
811
*
@@ -3076,6 +3079,7 @@ standardConfig static_configs[] = {
30763079
createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush , 0, NULL, NULL),
30773080
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
30783081
createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL),
3082+
createBoolConfig("repl-rdb-channel", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.repl_rdb_channel, 1, NULL, NULL),
30793083
createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL),
30803084
createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL),
30813085
createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL),
@@ -3218,6 +3222,7 @@ standardConfig static_configs[] = {
32183222
createLongLongConfig("proto-max-bulk-len", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */
32193223
createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
32203224
createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
3225+
createLongLongConfig("replica-full-sync-buffer-limit", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.repl_full_sync_buffer_limit, 0, MEMORY_CONFIG, NULL, NULL), /* Default: Inherits 'client-output-buffer-limit <replica>' */
32213226

32223227
/* Unsigned Long Long configs */
32233228
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),

src/debug.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
* Copyright (c) 2009-Present, Redis Ltd.
33
* All rights reserved.
44
*
5+
* Copyright (c) 2024-present, Valkey contributors.
6+
* All rights reserved.
7+
*
58
* Licensed under your choice of the Redis Source Available License 2.0
69
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
710
*
@@ -483,6 +486,8 @@ void debugCommand(client *c) {
483486
" In case RESET is provided the peak reset time will be restored to the default value",
484487
"REPLYBUFFER RESIZING <0|1>",
485488
" Enable or disable the reply buffer resize cron job",
489+
"REPL-PAUSE <clear|after-fork|before-rdb-channel|on-streaming-repl-buf>",
490+
" Pause the server's main process during various replication steps.",
486491
"DICT-RESIZING <0|1>",
487492
" Enable or disable the main dict and expire dict resizing.",
488493
"SCRIPT <LIST|<sha>>",
@@ -1018,6 +1023,20 @@ NULL
10181023
return;
10191024
}
10201025
addReply(c, shared.ok);
1026+
} else if (!strcasecmp(c->argv[1]->ptr, "repl-pause") && c->argc == 3) {
1027+
if (!strcasecmp(c->argv[2]->ptr, "clear")) {
1028+
server.repl_debug_pause = REPL_DEBUG_PAUSE_NONE;
1029+
} else if (!strcasecmp(c->argv[2]->ptr,"after-fork")) {
1030+
server.repl_debug_pause |= REPL_DEBUG_AFTER_FORK;
1031+
} else if (!strcasecmp(c->argv[2]->ptr,"before-rdb-channel")) {
1032+
server.repl_debug_pause |= REPL_DEBUG_BEFORE_RDB_CHANNEL;
1033+
} else if (!strcasecmp(c->argv[2]->ptr, "on-streaming-repl-buf")) {
1034+
server.repl_debug_pause |= REPL_DEBUG_ON_STREAMING_REPL_BUF;
1035+
} else {
1036+
addReplySubcommandSyntaxError(c);
1037+
return;
1038+
}
1039+
addReply(c, shared.ok);
10211040
} else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) {
10221041
server.dict_resizing = atoi(c->argv[2]->ptr);
10231042
addReply(c, shared.ok);
@@ -2583,6 +2602,12 @@ void applyWatchdogPeriod(void) {
25832602
}
25842603
}
25852604

2605+
void debugPauseProcess(void) {
2606+
serverLog(LL_NOTICE, "Process is about to stop.");
2607+
raise(SIGSTOP);
2608+
serverLog(LL_NOTICE, "Process has been continued.");
2609+
}
2610+
25862611
/* Positive input is sleep time in microseconds. Negative input is fractions
25872612
* of microseconds, i.e. -10 means 100 nanoseconds. */
25882613
void debugDelay(int usec) {

src/networking.c

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ client *createClient(connection *conn) {
188188
c->slave_addr = NULL;
189189
c->slave_capa = SLAVE_CAPA_NONE;
190190
c->slave_req = SLAVE_REQ_NONE;
191+
c->main_ch_client_id = 0;
191192
c->reply = listCreate();
192193
c->deferred_reply_errors = NULL;
193194
c->reply_bytes = 0;
@@ -252,6 +253,7 @@ void putClientInPendingWriteQueue(client *c) {
252253
* writes at this stage. */
253254
if (!(c->flags & CLIENT_PENDING_WRITE) &&
254255
(c->replstate == REPL_STATE_NONE ||
256+
c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM ||
255257
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack)))
256258
{
257259
/* Here instead of installing the write handler, we just flag the
@@ -1556,7 +1558,16 @@ void unlinkClient(client *c) {
15561558
}
15571559
}
15581560
/* Only use shutdown when the fork is active and we are the parent. */
1559-
if (server.child_type) connShutdown(c->conn);
1561+
if (server.child_type) {
1562+
/* connShutdown() may access TLS state. If this is a rdbchannel
1563+
* client, bgsave fork is writing to the connection and TLS state in
1564+
* the main process is stale. SSL_shutdown() involves a handshake,
1565+
* and it may block the caller when used with stale TLS state.*/
1566+
if (c->flags & CLIENT_REPL_RDB_CHANNEL)
1567+
shutdown(c->conn->fd, SHUT_RDWR);
1568+
else
1569+
connShutdown(c->conn);
1570+
}
15601571
connClose(c->conn);
15611572
c->conn = NULL;
15621573
}
@@ -1725,7 +1736,8 @@ void freeClient(client *c) {
17251736

17261737
/* Log link disconnection with slave */
17271738
if (clientTypeIsSlave(c)) {
1728-
serverLog(LL_NOTICE,"Connection with replica %s lost.",
1739+
const char *type = c->flags & CLIENT_REPL_RDB_CHANNEL ? " (rdbchannel)" : "";
1740+
serverLog(LL_NOTICE,"Connection with replica%s %s lost.", type,
17291741
replicationGetSlaveName(c));
17301742
}
17311743

@@ -3086,6 +3098,7 @@ sds catClientInfoString(sds s, client *client) {
30863098
if (client->flags & CLIENT_READONLY) *p++ = 'r';
30873099
if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
30883100
if (client->flags & CLIENT_NO_TOUCH) *p++ = 'T';
3101+
if (client->flags & CLIENT_REPL_RDB_CHANNEL) *p++ = 'C';
30893102
if (p == flags) *p++ = 'N';
30903103
*p++ = '\0';
30913104

@@ -4309,7 +4322,7 @@ void flushSlavesOutputBuffers(void) {
43094322
*
43104323
* 3. Obviously if the slave is not ONLINE.
43114324
*/
4312-
if (slave->replstate == SLAVE_STATE_ONLINE &&
4325+
if ((slave->replstate == SLAVE_STATE_ONLINE || slave->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
43134326
!(slave->flags & CLIENT_CLOSE_ASAP) &&
43144327
can_receive_writes &&
43154328
!slave->repl_start_cmd_stream_on_ack &&

src/rdb.c

Lines changed: 88 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22
* Copyright (c) 2009-Present, Redis Ltd.
33
* All rights reserved.
44
*
5+
* Copyright (c) 2024-present, Valkey contributors.
6+
* All rights reserved.
7+
*
58
* Licensed under your choice of the Redis Source Available License 2.0
69
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
10+
*
11+
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
712
*/
813

914
#include "server.h"
@@ -3810,8 +3815,10 @@ static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
38103815
}
38113816
if (server.rdb_child_exit_pipe!=-1)
38123817
close(server.rdb_child_exit_pipe);
3813-
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
3814-
close(server.rdb_pipe_read);
3818+
if (server.rdb_pipe_read != -1) {
3819+
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
3820+
close(server.rdb_pipe_read);
3821+
}
38153822
server.rdb_child_exit_pipe = -1;
38163823
server.rdb_pipe_read = -1;
38173824
zfree(server.rdb_pipe_conns);
@@ -3875,60 +3882,76 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
38753882
listNode *ln;
38763883
listIter li;
38773884
pid_t childpid;
3878-
int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;
3885+
int pipefds[2], rdb_pipe_write = 0, safe_to_exit_pipe = 0;
3886+
int rdb_channel = (req & SLAVE_REQ_RDB_CHANNEL);
38793887

38803888
if (hasActiveChildProcess()) return C_ERR;
38813889

38823890
/* Even if the previous fork child exited, don't start a new one until we
38833891
* drained the pipe. */
38843892
if (server.rdb_pipe_conns) return C_ERR;
38853893

3886-
/* Before to fork, create a pipe that is used to transfer the rdb bytes to
3887-
* the parent, we can't let it write directly to the sockets, since in case
3888-
* of TLS we must let the parent handle a continuous TLS state when the
3889-
* child terminates and parent takes over. */
3890-
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
3891-
server.rdb_pipe_read = pipefds[0]; /* read end */
3892-
rdb_pipe_write = pipefds[1]; /* write end */
3893-
3894-
/* create another pipe that is used by the parent to signal to the child
3895-
* that it can exit. */
3896-
if (anetPipe(pipefds, 0, 0) == -1) {
3897-
close(rdb_pipe_write);
3898-
close(server.rdb_pipe_read);
3899-
return C_ERR;
3894+
if (!rdb_channel) {
3895+
/* Before to fork, create a pipe that is used to transfer the rdb bytes to
3896+
* the parent, we can't let it write directly to the sockets, since in case
3897+
* of TLS we must let the parent handle a continuous TLS state when the
3898+
* child terminates and parent takes over. */
3899+
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
3900+
server.rdb_pipe_read = pipefds[0]; /* read end */
3901+
rdb_pipe_write = pipefds[1]; /* write end */
3902+
3903+
/* create another pipe that is used by the parent to signal to the child
3904+
* that it can exit. */
3905+
if (anetPipe(pipefds, 0, 0) == -1) {
3906+
close(rdb_pipe_write);
3907+
close(server.rdb_pipe_read);
3908+
return C_ERR;
3909+
}
3910+
safe_to_exit_pipe = pipefds[0]; /* read end */
3911+
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
39003912
}
3901-
safe_to_exit_pipe = pipefds[0]; /* read end */
3902-
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
39033913

39043914
/* Collect the connections of the replicas we want to transfer
3905-
* the RDB to, which are i WAIT_BGSAVE_START state. */
3906-
server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
3907-
server.rdb_pipe_numconns = 0;
3908-
server.rdb_pipe_numconns_writing = 0;
3915+
* the RDB to, which are in WAIT_BGSAVE_START state. */
3916+
int numconns = 0;
3917+
connection **conns = zmalloc(sizeof(*conns) * listLength(server.slaves));
39093918
listRewind(server.slaves,&li);
39103919
while((ln = listNext(&li))) {
39113920
client *slave = ln->value;
39123921
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
39133922
/* Check slave has the exact requirements */
39143923
if (slave->slave_req != req)
39153924
continue;
3916-
server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
3917-
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
3925+
replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
3926+
conns[numconns++] = slave->conn;
3927+
if (rdb_channel) {
3928+
/* Put the socket in blocking mode to simplify RDB transfer. */
3929+
connSendTimeout(slave->conn, server.repl_timeout * 1000);
3930+
connBlock(slave->conn);
3931+
}
39183932
}
39193933
}
39203934

3935+
if (!rdb_channel) {
3936+
server.rdb_pipe_conns = conns;
3937+
server.rdb_pipe_numconns = numconns;
3938+
server.rdb_pipe_numconns_writing = 0;
3939+
}
3940+
39213941
/* Create the child process. */
39223942
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
39233943
/* Child */
39243944
int retval, dummy;
39253945
rio rdb;
39263946

3927-
rioInitWithFd(&rdb,rdb_pipe_write);
3928-
3929-
/* Close the reading part, so that if the parent crashes, the child will
3930-
* get a write error and exit. */
3931-
close(server.rdb_pipe_read);
3947+
if (rdb_channel) {
3948+
rioInitWithConnset(&rdb, conns, numconns);
3949+
} else {
3950+
rioInitWithFd(&rdb,rdb_pipe_write);
3951+
/* Close the reading part, so that if the parent crashes, the child
3952+
* will get a write error and exit. */
3953+
close(server.rdb_pipe_read);
3954+
}
39323955

39333956
redisSetProcTitle("redis-rdb-to-slaves");
39343957
redisSetCpuAffinity(server.bgsave_cpulist);
@@ -3941,14 +3964,19 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
39413964
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
39423965
}
39433966

3944-
rioFreeFd(&rdb);
3945-
/* wake up the reader, tell it we're done. */
3946-
close(rdb_pipe_write);
3947-
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
3948-
/* hold exit until the parent tells us it's safe. we're not expecting
3949-
* to read anything, just get the error when the pipe is closed. */
3950-
dummy = read(safe_to_exit_pipe, pipefds, 1);
3951-
UNUSED(dummy);
3967+
if (rdb_channel) {
3968+
rioFreeConnset(&rdb);
3969+
} else {
3970+
rioFreeFd(&rdb);
3971+
/* wake up the reader, tell it we're done. */
3972+
close(rdb_pipe_write);
3973+
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
3974+
/* hold exit until the parent tells us it's safe. we're not expecting
3975+
* to read anything, just get the error when the pipe is closed. */
3976+
dummy = read(safe_to_exit_pipe, pipefds, 1);
3977+
UNUSED(dummy);
3978+
}
3979+
zfree(conns);
39523980
exitFromChild((retval == C_OK) ? 0 : 1);
39533981
} else {
39543982
/* Parent */
@@ -3966,24 +3994,33 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
39663994
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
39673995
}
39683996
}
3969-
close(rdb_pipe_write);
3970-
close(server.rdb_pipe_read);
3971-
close(server.rdb_child_exit_pipe);
3972-
zfree(server.rdb_pipe_conns);
3973-
server.rdb_pipe_conns = NULL;
3974-
server.rdb_pipe_numconns = 0;
3975-
server.rdb_pipe_numconns_writing = 0;
3997+
3998+
if (!rdb_channel) {
3999+
close(rdb_pipe_write);
4000+
close(server.rdb_pipe_read);
4001+
close(server.rdb_child_exit_pipe);
4002+
zfree(server.rdb_pipe_conns);
4003+
server.rdb_pipe_conns = NULL;
4004+
server.rdb_pipe_numconns = 0;
4005+
server.rdb_pipe_numconns_writing = 0;
4006+
}
39764007
} else {
3977-
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
3978-
(long) childpid);
4008+
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid,
4009+
rdb_channel ? "replica socket" : "parent process pipe");
39794010
server.rdb_save_time_start = time(NULL);
39804011
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
3981-
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
3982-
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
3983-
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
4012+
if (!rdb_channel) {
4013+
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
4014+
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
4015+
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
4016+
}
39844017
}
39854018
}
3986-
close(safe_to_exit_pipe);
4019+
if (rdb_channel)
4020+
zfree(conns);
4021+
else
4022+
close(safe_to_exit_pipe);
4023+
39874024
return (childpid == -1) ? C_ERR : C_OK;
39884025
}
39894026
return C_OK; /* Unreached. */

0 commit comments

Comments
 (0)