Skip to content

Commit 76576ac

Browse files
committed
Addressed PR comments
Signed-off-by: Uri Yagelnik <[email protected]>
1 parent 8b2137d commit 76576ac

File tree

4 files changed

+65
-64
lines changed

4 files changed

+65
-64
lines changed

src/networking.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1547,6 +1547,9 @@ int anyOtherReplicaWaitRdb(client *except_me) {
15471547
void unlinkClient(client *c) {
15481548
listNode *ln;
15491549

1550+
/* Wait for IO operations to be done before unlinking the client. */
1551+
waitForClientIO(c);
1552+
15501553
/* If this is marked as current client unset it. */
15511554
if (c->conn && server.current_client == c) server.current_client = NULL;
15521555

@@ -2058,7 +2061,7 @@ static void writeToReplica(client *c) {
20582061

20592062
listNode *first_node = c->ref_repl_buf_node;
20602063

2061-
/*Handle the single block case */
2064+
/* Handle the single block case */
20622065
if (first_node == last_node) {
20632066
replBufBlock *b = listNodeValue(first_node);
20642067
c->nwritten = connWrite(c->conn, b->buf + c->ref_block_pos, bufpos - c->ref_block_pos);

src/replication.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4136,8 +4136,6 @@ void replicationCachePrimary(client *c) {
41364136
serverAssert(server.primary != NULL && server.cached_primary == NULL);
41374137
serverLog(LL_NOTICE, "Caching the disconnected primary state.");
41384138

4139-
/* Wait for IO operations to be done before proceeding */
4140-
waitForClientIO(c);
41414139
/* Unlink the client from the server structures. */
41424140
unlinkClient(c);
41434141

src/unit/test_networking.c

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,59 +4,59 @@
44

55
#include <stdatomic.h>
66

7-
/* Mock structures and functions */
8-
typedef struct mockConnection {
7+
/* Fake structures and functions */
8+
typedef struct fakeConnection {
99
connection conn;
1010
int error;
1111
char *buffer;
1212
size_t buf_size;
1313
size_t written;
14-
} mockConnection;
14+
} fakeConnection;
1515

16-
/* Mock connWrite function */
17-
static int mock_connWrite(connection *conn, const void *data, size_t size) {
18-
mockConnection *mock = (mockConnection *)conn;
19-
if (mock->error) return -1;
16+
/* Fake connWrite function */
17+
static int fake_connWrite(connection *conn, const void *data, size_t size) {
18+
fakeConnection *fake_conn = (fakeConnection *)conn;
19+
if (fake_conn->error) return -1;
2020

2121
size_t to_write = size;
22-
if (mock->written + to_write > mock->buf_size) {
23-
to_write = mock->buf_size - mock->written;
22+
if (fake_conn->written + to_write > fake_conn->buf_size) {
23+
to_write = fake_conn->buf_size - fake_conn->written;
2424
}
2525

26-
memcpy(mock->buffer + mock->written, data, to_write);
27-
mock->written += to_write;
26+
memcpy(fake_conn->buffer + fake_conn->written, data, to_write);
27+
fake_conn->written += to_write;
2828
return to_write;
2929
}
3030

31-
/* Mock connWritev function */
32-
static int mock_connWritev(connection *conn, const struct iovec *iov, int iovcnt) {
33-
mockConnection *mock = (mockConnection *)conn;
34-
if (mock->error) return -1;
31+
/* Fake connWritev function */
32+
static int fake_connWritev(connection *conn, const struct iovec *iov, int iovcnt) {
33+
fakeConnection *fake_conn = (fakeConnection *)conn;
34+
if (fake_conn->error) return -1;
3535

3636
size_t total = 0;
3737
for (int i = 0; i < iovcnt; i++) {
3838
size_t to_write = iov[i].iov_len;
39-
if (mock->written + to_write > mock->buf_size) {
40-
to_write = mock->buf_size - mock->written;
39+
if (fake_conn->written + to_write > fake_conn->buf_size) {
40+
to_write = fake_conn->buf_size - fake_conn->written;
4141
}
4242
if (to_write == 0) break;
4343

44-
memcpy(mock->buffer + mock->written, iov[i].iov_base, to_write);
45-
mock->written += to_write;
44+
memcpy(fake_conn->buffer + fake_conn->written, iov[i].iov_base, to_write);
45+
fake_conn->written += to_write;
4646
total += to_write;
4747
}
4848
return total;
4949
}
5050

51-
/* Mock connection type */
52-
static ConnectionType CT_Mock = {
53-
.write = mock_connWrite,
54-
.writev = mock_connWritev,
51+
/* Fake connection type */
52+
static ConnectionType CT_Fake = {
53+
.write = fake_connWrite,
54+
.writev = fake_connWritev,
5555
};
5656

57-
static mockConnection *connCreateMock(void) {
58-
mockConnection *conn = zcalloc(sizeof(mockConnection));
59-
conn->conn.type = &CT_Mock;
57+
static fakeConnection *connCreateFake(void) {
58+
fakeConnection *conn = zcalloc(sizeof(fakeConnection));
59+
conn->conn.type = &CT_Fake;
6060
conn->conn.fd = -1;
6161
conn->conn.iovcnt = IOV_MAX;
6262
return conn;
@@ -73,10 +73,10 @@ int test_writeToReplica(int argc, char **argv, int flags) {
7373

7474
/* Test 1: Single block write */
7575
{
76-
mockConnection *mock_conn = connCreateMock();
77-
mock_conn->buffer = zmalloc(1024);
78-
mock_conn->buf_size = 1024;
79-
c->conn = (connection *)mock_conn;
76+
fakeConnection *fake_conn = connCreateFake();
77+
fake_conn->buffer = zmalloc(1024);
78+
fake_conn->buf_size = 1024;
79+
c->conn = (connection *)fake_conn;
8080

8181
/* Create replication buffer block */
8282
replBufBlock *block = zmalloc(sizeof(replBufBlock) + 128);
@@ -93,25 +93,25 @@ int test_writeToReplica(int argc, char **argv, int flags) {
9393
writeToReplica(c);
9494

9595
TEST_ASSERT(c->nwritten == 64);
96-
TEST_ASSERT(mock_conn->written == 64);
97-
TEST_ASSERT(memcmp(mock_conn->buffer, block->buf, 64) == 0);
96+
TEST_ASSERT(fake_conn->written == 64);
97+
TEST_ASSERT(memcmp(fake_conn->buffer, block->buf, 64) == 0);
9898
TEST_ASSERT((c->write_flags & WRITE_FLAGS_WRITE_ERROR) == 0);
9999

100100
/* Cleanup */
101-
zfree(mock_conn->buffer);
102-
zfree(mock_conn);
101+
zfree(fake_conn->buffer);
102+
zfree(fake_conn);
103103
zfree(block);
104104
listEmpty(server.repl_buffer_blocks);
105105
}
106106

107107
/* Test 2: Multiple blocks write */
108108
{
109-
mockConnection *mock_conn = connCreateMock();
110-
mock_conn->error = 0;
111-
mock_conn->written = 0;
112-
mock_conn->buffer = zmalloc(1024);
113-
mock_conn->buf_size = 1024;
114-
c->conn = (connection *)mock_conn;
109+
fakeConnection *fake_conn = connCreateFake();
110+
fake_conn->error = 0;
111+
fake_conn->written = 0;
112+
fake_conn->buffer = zmalloc(1024);
113+
fake_conn->buf_size = 1024;
114+
c->conn = (connection *)fake_conn;
115115

116116
/* Create multiple replication buffer blocks */
117117
replBufBlock *block1 = zmalloc(sizeof(replBufBlock) + 128);
@@ -133,27 +133,27 @@ int test_writeToReplica(int argc, char **argv, int flags) {
133133
writeToReplica(c);
134134

135135
TEST_ASSERT(c->nwritten == 96); /* 64 + 32 */
136-
TEST_ASSERT(mock_conn->written == 96);
137-
TEST_ASSERT(memcmp(mock_conn->buffer, block1->buf, 64) == 0);
138-
TEST_ASSERT(memcmp(mock_conn->buffer + 64, block2->buf, 32) == 0);
136+
TEST_ASSERT(fake_conn->written == 96);
137+
TEST_ASSERT(memcmp(fake_conn->buffer, block1->buf, 64) == 0);
138+
TEST_ASSERT(memcmp(fake_conn->buffer + 64, block2->buf, 32) == 0);
139139
TEST_ASSERT((c->write_flags & WRITE_FLAGS_WRITE_ERROR) == 0);
140140

141141
/* Cleanup */
142-
zfree(mock_conn->buffer);
143-
zfree(mock_conn);
142+
zfree(fake_conn->buffer);
143+
zfree(fake_conn);
144144
zfree(block1);
145145
zfree(block2);
146146
listEmpty(server.repl_buffer_blocks);
147147
}
148148

149149
/* Test 3: Write error */
150150
{
151-
mockConnection *mock_conn = connCreateMock();
152-
mock_conn->error = 1; /* Simulate write error */
153-
mock_conn->buffer = zmalloc(1024);
154-
mock_conn->buf_size = 1024;
155-
mock_conn->written = 0;
156-
c->conn = (connection *)mock_conn;
151+
fakeConnection *fake_conn = connCreateFake();
152+
fake_conn->error = 1; /* Simulate write error */
153+
fake_conn->buffer = zmalloc(1024);
154+
fake_conn->buf_size = 1024;
155+
fake_conn->written = 0;
156+
c->conn = (connection *)fake_conn;
157157

158158
/* Create replication buffer block */
159159
replBufBlock *block = zmalloc(sizeof(replBufBlock) + 128);
@@ -174,8 +174,8 @@ int test_writeToReplica(int argc, char **argv, int flags) {
174174

175175
/* Cleanup */
176176
listEmpty(server.repl_buffer_blocks);
177-
zfree(mock_conn->buffer);
178-
zfree(mock_conn);
177+
zfree(fake_conn->buffer);
178+
zfree(fake_conn);
179179
zfree(block);
180180
}
181181

tests/unit/networking.tcl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -289,12 +289,12 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
289289

290290
start_server {} {
291291
test {replicas writes are offloaded to IO threads} {
292-
set master [srv -1 client]
293-
set master_host [srv -1 host]
294-
set master_port [srv -1 port]
292+
set primary [srv -1 client]
293+
set primary_host [srv -1 host]
294+
set primary_port [srv -1 port]
295295

296296
set replica [srv 0 client]
297-
$replica replicaof $master_host $master_port
297+
$replica replicaof $primary_host $primary_port
298298

299299
wait_for_condition 500 100 {
300300
[s 0 master_link_status] eq {up}
@@ -303,11 +303,11 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
303303
}
304304

305305
# get the current io_threaded_writes_processed
306-
set info [$master info stats]
306+
set info [$primary info stats]
307307
set io_threaded_writes_processed [getInfoProperty $info io_threaded_writes_processed]
308308

309-
# Send a write command to the master
310-
$master set a 1
309+
# Send a write command to the primary
310+
$primary set a 1
311311

312312
# Wait for the write to be propagated to the replica
313313
wait_for_condition 50 100 {
@@ -317,7 +317,7 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
317317
}
318318

319319
# Get the new io_threaded_writes_processed
320-
set info [$master info stats]
320+
set info [$primary info stats]
321321
set new_io_threaded_writes_processed [getInfoProperty $info io_threaded_writes_processed]
322322
# Assert new is old + 3, 3 for the write to the info-client, set-client and to the replica.
323323
assert {$new_io_threaded_writes_processed >= $io_threaded_writes_processed + 3} ;

0 commit comments

Comments
 (0)