Skip to content

Commit 93d7cca

Browse files
Fix accounting for dual channel RDB bytes in replication stats (#2602)
Resolves #2545 Followed the steps to reproduce the issue, and was able to get non-zero `total_net_repl_output_bytes`. ``` (base) ~/workspace/valkey git:[fix-bug-2545] src/valkey-cli INFO | grep total_net_repl_output_bytes total_net_repl_output_bytes:1788 ``` --------- Signed-off-by: Sarthak Aggarwal <[email protected]>
1 parent a47e8fa commit 93d7cca

File tree

6 files changed

+62
-10
lines changed

6 files changed

+62
-10
lines changed

src/childinfo.c

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ typedef struct {
3636
size_t cow;
3737
monotime cow_updated;
3838
double progress;
39+
size_t repl_output_bytes;
3940
childInfoType information_type; /* Type of information */
4041
} child_info_data;
4142

@@ -64,7 +65,7 @@ void closeChildInfoPipe(void) {
6465
}
6566

6667
/* Send save data to parent. */
67-
void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress, char *pname) {
68+
void sendChildInfoGeneric(childInfoType info_type, size_t keys, size_t repl_output_bytes, double progress, char *pname) {
6869
if (server.child_info_pipe[1] == -1) return;
6970

7071
static monotime cow_updated = 0;
@@ -101,6 +102,7 @@ void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress,
101102

102103
data.information_type = info_type;
103104
data.keys = keys;
105+
data.repl_output_bytes = repl_output_bytes;
104106
data.cow = cow;
105107
data.cow_updated = cow_updated;
106108
data.progress = progress;
@@ -115,7 +117,7 @@ void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress,
115117
}
116118

117119
/* Update Child info. */
118-
void updateChildInfo(childInfoType information_type, size_t cow, monotime cow_updated, size_t keys, double progress) {
120+
void updateChildInfo(childInfoType information_type, size_t cow, monotime cow_updated, size_t keys, size_t repl_output_bytes, double progress) {
119121
if (cow > server.stat_current_cow_peak) server.stat_current_cow_peak = cow;
120122

121123
if (information_type == CHILD_INFO_TYPE_CURRENT_INFO) {
@@ -129,14 +131,16 @@ void updateChildInfo(childInfoType information_type, size_t cow, monotime cow_up
129131
server.stat_rdb_cow_bytes = server.stat_current_cow_peak;
130132
} else if (information_type == CHILD_INFO_TYPE_MODULE_COW_SIZE) {
131133
server.stat_module_cow_bytes = server.stat_current_cow_peak;
134+
} else if (information_type == CHILD_INFO_TYPE_REPL_OUTPUT_BYTES) {
135+
server.stat_net_repl_output_bytes += (long long)repl_output_bytes;
132136
}
133137
}
134138

135139
/* Read child info data from the pipe.
136140
* if complete data read into the buffer,
137141
* data is stored into *buffer, and returns 1.
138142
* otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */
139-
int readChildInfo(childInfoType *information_type, size_t *cow, monotime *cow_updated, size_t *keys, double *progress) {
143+
int readChildInfo(childInfoType *information_type, size_t *cow, monotime *cow_updated, size_t *keys, size_t *repl_output_bytes, double *progress) {
140144
/* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */
141145
static child_info_data buffer;
142146
ssize_t wlen = sizeof(buffer);
@@ -156,6 +160,7 @@ int readChildInfo(childInfoType *information_type, size_t *cow, monotime *cow_up
156160
*cow = buffer.cow;
157161
*cow_updated = buffer.cow_updated;
158162
*keys = buffer.keys;
163+
*repl_output_bytes = buffer.repl_output_bytes;
159164
*progress = buffer.progress;
160165
return 1;
161166
} else {
@@ -170,11 +175,12 @@ void receiveChildInfo(void) {
170175
size_t cow;
171176
monotime cow_updated;
172177
size_t keys;
178+
size_t repl_output_bytes;
173179
double progress;
174180
childInfoType information_type;
175181

176182
/* Drain the pipe and update child info so that we get the final message. */
177-
while (readChildInfo(&information_type, &cow, &cow_updated, &keys, &progress)) {
178-
updateChildInfo(information_type, cow, cow_updated, keys, progress);
183+
while (readChildInfo(&information_type, &cow, &cow_updated, &keys, &repl_output_bytes, &progress)) {
184+
updateChildInfo(information_type, cow, cow_updated, keys, repl_output_bytes, progress);
179185
}
180186
}

src/module.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11480,7 +11480,7 @@ int VM_Fork(ValkeyModuleForkDoneHandler cb, void *user_data) {
1148011480
* reported in INFO.
1148111481
* The `progress` argument should between 0 and 1, or -1 when not available. */
1148211482
void VM_SendChildHeartbeat(double progress) {
11483-
sendChildInfoGeneric(CHILD_INFO_TYPE_CURRENT_INFO, 0, progress, "Module fork");
11483+
sendChildInfoGeneric(CHILD_INFO_TYPE_CURRENT_INFO, 0, 0, progress, "Module fork");
1148411484
}
1148511485

1148611486
/* Call from the child process when you want to terminate it.

src/rdb.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3723,6 +3723,9 @@ int saveSnapshotToConnectionSockets(rdbSnapshotOptions options) {
37233723

37243724
if (retval == C_OK) {
37253725
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
3726+
if (!options.use_pipe) {
3727+
sendChildInfoGeneric(CHILD_INFO_TYPE_REPL_OUTPUT_BYTES, 0, rdb.processed_bytes, -1, "RDB");
3728+
}
37263729
}
37273730
if (!options.use_pipe) {
37283731
rioFreeConnset(&rdb);

src/server.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6789,11 +6789,11 @@ int serverFork(int purpose) {
67896789
}
67906790

67916791
void sendChildCowInfo(childInfoType info_type, char *pname) {
6792-
sendChildInfoGeneric(info_type, 0, -1, pname);
6792+
sendChildInfoGeneric(info_type, 0, 0, -1, pname);
67936793
}
67946794

67956795
void sendChildInfo(childInfoType info_type, size_t keys, char *pname) {
6796-
sendChildInfoGeneric(info_type, keys, -1, pname);
6796+
sendChildInfoGeneric(info_type, keys, 0, -1, pname);
67976797
}
67986798

67996799
/* Dismiss big chunks of memory inside a client structure, see zmadvise_dontneed() */

src/server.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,7 +1657,8 @@ typedef enum childInfoType {
16571657
CHILD_INFO_TYPE_CURRENT_INFO,
16581658
CHILD_INFO_TYPE_AOF_COW_SIZE,
16591659
CHILD_INFO_TYPE_RDB_COW_SIZE,
1660-
CHILD_INFO_TYPE_MODULE_COW_SIZE
1660+
CHILD_INFO_TYPE_MODULE_COW_SIZE,
1661+
CHILD_INFO_TYPE_REPL_OUTPUT_BYTES
16611662
} childInfoType;
16621663

16631664
struct valkeyServer {
@@ -3156,7 +3157,7 @@ int rewriteSlotToAppendOnlyFileRio(rio *aof, int db_num, int hashslot, size_t *k
31563157
/* Child info */
31573158
void openChildInfoPipe(void);
31583159
void closeChildInfoPipe(void);
3159-
void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress, char *pname);
3160+
void sendChildInfoGeneric(childInfoType info_type, size_t keys, size_t repl_output_bytes, double progress, char *pname);
31603161
void sendChildCowInfo(childInfoType info_type, char *pname);
31613162
void sendChildInfo(childInfoType info_type, size_t keys, char *pname);
31623163
void receiveChildInfo(void);

tests/integration/dual-channel-replication.tcl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,48 @@ start_server {tags {"dual-channel-replication external:skip"}} {
7777
fail "Replicas and primary offsets were unable to match."
7878
}
7979
}
80+
81+
test "Dual-channel replication counts snapshot bytes" {
82+
wait_for_condition 50 100 {
83+
[getInfoProperty [$primary info stats] "total_net_repl_output_bytes"] > 0
84+
} else {
85+
fail "Replication output bytes not updated"
86+
}
87+
}
88+
}
89+
}
90+
91+
start_server {tags {"dual-channel-replication external:skip"}} {
92+
set replica [srv 0 client]
93+
set replica_host [srv 0 host]
94+
set replica_port [srv 0 port]
95+
start_server {} {
96+
set primary [srv 0 client]
97+
set primary_host [srv 0 host]
98+
set primary_port [srv 0 port]
99+
100+
$primary config set repl-diskless-sync yes
101+
$primary config set repl-diskless-sync-delay 0
102+
$primary config set dual-channel-replication-enabled yes
103+
104+
$replica config set repl-diskless-sync yes
105+
$replica config set repl-diskless-load swapdb
106+
$replica config set dual-channel-replication-enabled yes
107+
108+
for {set j 0} {$j < 100} {incr j} {
109+
$primary set key$j [string repeat x 100]
110+
}
111+
$primary config resetstat
112+
113+
test "dual-channel replication reports rdb transfer bytes" {
114+
$replica replicaof $primary_host $primary_port
115+
verify_replica_online $primary 0 700
116+
wait_for_condition 50 100 {
117+
[getInfoProperty [$primary info stats] "total_net_repl_output_bytes"] > 1000
118+
} else {
119+
fail "Replication output bytes not updated"
120+
}
121+
}
80122
}
81123
}
82124

0 commit comments

Comments
 (0)