Skip to content

Commit 19407e2

Browse files
uriyagemurphyjacob4
authored and committed
Offload replication writes to IO threads (valkey-io#1485)
This PR offloads the write to replica clients to IO threads. ## Main Changes * Replica writes will be offloaded, but only after the replica is in online mode. * Replica reads will still be done in the main thread to reduce complexity and because read traffic from replicas is negligible. ### Implementation Details In order to offload the writes, `writeToReplica` has been split into 2 parts: 1. The write itself, made by the IO thread or by the main thread 2. The post-write step, where we update the replication buffers' refcount, done in the main thread after the write job is completed in the IO thread (similar to what we do with a regular client) ### Additional Changes * In `writeToReplica` we now use `writev` in case more than 1 buffer exists. * Changed the client `nwritten` field to `ssize_t`, since with a replica the `nwritten` can theoretically exceed `int` size (not subject to the `NET_MAX_WRITES_PER_EVENT` limit). * Changed parsing code to use `memchr` instead of `strchr`: * During command parsing, ASAN got stuck for an unknown reason when `strchr` was called to look for the next `\r` * Adding an assert for a null-terminated querybuf did not resolve the issue. * Switched to `memchr`, as it is more secure and resolves the issue ### Testing * Added integration tests * Added unit tests **Related issue:** valkey-io#761 --------- Signed-off-by: Uri Yagelnik <[email protected]>
1 parent 2248705 commit 19407e2

File tree

8 files changed

+493
-47
lines changed

8 files changed

+493
-47
lines changed

src/io_threads.c

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ int trySendReadToIOThreads(client *c) {
321321
if (server.active_io_threads_num <= 1) return C_ERR;
322322
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
323323
if (c->io_read_state != CLIENT_IDLE) return C_OK;
324-
/* Currently, replica reads are not offloaded to IO threads. */
324+
/* For simplicity, don't offload replica clients reads as read traffic from replica is negligible */
325325
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
326326
/* With Lua debug client we may call connWrite directly in the main thread */
327327
if (c->flag.lua_debug) return C_ERR;
@@ -364,8 +364,8 @@ int trySendWriteToIOThreads(client *c) {
364364
if (c->io_write_state != CLIENT_IDLE) return C_OK;
365365
/* Nothing to write */
366366
if (!clientHasPendingReplies(c)) return C_ERR;
367-
/* Currently, replica writes are not offloaded to IO threads. */
368-
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
367+
/* For simplicity, avoid offloading non-online replicas */
368+
if (getClientType(c) == CLIENT_TYPE_REPLICA && c->repl_data->repl_state != REPLICA_STATE_ONLINE) return C_ERR;
369369
/* We can't offload debugged clients as the main-thread may read at the same time */
370370
if (c->flag.lua_debug) return C_ERR;
371371

@@ -392,21 +392,29 @@ int trySendWriteToIOThreads(client *c) {
392392
serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
393393
listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);
394394

395-
/* Save the last block of the reply list to io_last_reply_block and the used
396-
* position to io_last_bufpos. The I/O thread will write only up to
397-
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
398-
* threads from reading data that might be invalid in their local CPU cache. */
399-
c->io_last_reply_block = listLast(c->reply);
400-
if (c->io_last_reply_block) {
401-
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
395+
int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
396+
if (is_replica) {
397+
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
398+
replBufBlock *o = listNodeValue(c->io_last_reply_block);
399+
c->io_last_bufpos = o->used;
402400
} else {
403-
c->io_last_bufpos = (size_t)c->bufpos;
401+
/* Save the last block of the reply list to io_last_reply_block and the used
402+
* position to io_last_bufpos. The I/O thread will write only up to
403+
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
404+
* threads from reading data that might be invalid in their local CPU cache. */
405+
c->io_last_reply_block = listLast(c->reply);
406+
if (c->io_last_reply_block) {
407+
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
408+
} else {
409+
c->io_last_bufpos = (size_t)c->bufpos;
410+
}
404411
}
405-
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);
412+
413+
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0 || is_replica);
406414

407415
/* The main-thread will update the client state after the I/O thread completes the write. */
408416
connSetPostponeUpdateState(c->conn, 1);
409-
c->write_flags = 0;
417+
c->write_flags = is_replica ? WRITE_FLAGS_IS_REPLICA : 0;
410418
c->io_write_state = CLIENT_PENDING_IO;
411419

412420
IOJobQueue_push(jq, ioThreadWriteToClient, c);

src/networking.c

Lines changed: 123 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1544,6 +1544,9 @@ void disconnectReplicas(void) {
15441544
void unlinkClient(client *c) {
15451545
listNode *ln;
15461546

1547+
/* Wait for IO operations to be done before unlinking the client. */
1548+
waitForClientIO(c);
1549+
15471550
/* If this is marked as current client unset it. */
15481551
if (c->conn && server.current_client == c) server.current_client = NULL;
15491552

@@ -1953,36 +1956,122 @@ client *lookupClientByID(uint64_t id) {
19531956
return c;
19541957
}
19551958

1956-
void writeToReplica(client *c) {
1957-
/* Can be called from main-thread only as replica write offload is not supported yet */
1958-
serverAssert(inMainThread());
1959-
int nwritten = 0;
1959+
static void postWriteToReplica(client *c) {
1960+
if (c->nwritten <= 0) return;
1961+
1962+
server.stat_net_repl_output_bytes += c->nwritten;
1963+
1964+
/* Locate the last node which has leftover data and
1965+
* decrement reference counts of all nodes in front of it.
1966+
* Set c->ref_repl_buf_node to point to the last node and
1967+
* c->ref_block_pos to the offset within that node */
1968+
listNode *curr = c->repl_data->ref_repl_buf_node;
1969+
listNode *next = NULL;
1970+
size_t nwritten = c->nwritten + c->repl_data->ref_block_pos;
1971+
replBufBlock *o = listNodeValue(curr);
1972+
1973+
while (nwritten >= o->used) {
1974+
next = listNextNode(curr);
1975+
if (!next) break; /* End of list */
1976+
1977+
nwritten -= o->used;
1978+
o->refcount--;
1979+
1980+
curr = next;
1981+
o = listNodeValue(curr);
1982+
o->refcount++;
1983+
}
1984+
1985+
serverAssert(nwritten <= o->used);
1986+
c->repl_data->ref_repl_buf_node = curr;
1987+
c->repl_data->ref_block_pos = nwritten;
1988+
1989+
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
1990+
}
1991+
1992+
static void writeToReplica(client *c) {
1993+
listNode *last_node;
1994+
size_t bufpos;
1995+
19601996
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
1961-
while (clientHasPendingReplies(c)) {
1962-
replBufBlock *o = listNodeValue(c->repl_data->ref_repl_buf_node);
1963-
serverAssert(o->used >= c->repl_data->ref_block_pos);
1964-
1965-
/* Send current block if it is not fully sent. */
1966-
if (o->used > c->repl_data->ref_block_pos) {
1967-
nwritten = connWrite(c->conn, o->buf + c->repl_data->ref_block_pos, o->used - c->repl_data->ref_block_pos);
1968-
if (nwritten <= 0) {
1969-
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
1970-
return;
1971-
}
1972-
c->nwritten += nwritten;
1973-
c->repl_data->ref_block_pos += nwritten;
1997+
/* Determine the last block and buffer position based on thread context */
1998+
if (inMainThread()) {
1999+
last_node = listLast(server.repl_buffer_blocks);
2000+
if (!last_node) return;
2001+
bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
2002+
} else {
2003+
last_node = c->io_last_reply_block;
2004+
serverAssert(last_node != NULL);
2005+
bufpos = c->io_last_bufpos;
2006+
}
2007+
2008+
listNode *first_node = c->repl_data->ref_repl_buf_node;
2009+
2010+
/* Handle the single block case */
2011+
if (first_node == last_node) {
2012+
replBufBlock *b = listNodeValue(first_node);
2013+
c->nwritten = connWrite(c->conn, b->buf + c->repl_data->ref_block_pos, bufpos - c->repl_data->ref_block_pos);
2014+
if (c->nwritten <= 0) {
2015+
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
2016+
}
2017+
return;
2018+
}
2019+
2020+
/* Multiple blocks case */
2021+
ssize_t total_bytes = 0;
2022+
int iovcnt = 0;
2023+
struct iovec iov_arr[IOV_MAX];
2024+
struct iovec *iov = iov_arr;
2025+
int iovmax = min(IOV_MAX, c->conn->iovcnt);
2026+
2027+
for (listNode *cur_node = first_node; cur_node != NULL && iovcnt < iovmax; cur_node = listNextNode(cur_node)) {
2028+
replBufBlock *cur_block = listNodeValue(cur_node);
2029+
size_t start = (cur_node == first_node) ? c->repl_data->ref_block_pos : 0;
2030+
size_t len = (cur_node == last_node) ? bufpos : cur_block->used;
2031+
len -= start;
2032+
2033+
iov[iovcnt].iov_base = cur_block->buf + start;
2034+
iov[iovcnt].iov_len = len;
2035+
total_bytes += len;
2036+
iovcnt++;
2037+
if (cur_node == last_node) break;
2038+
}
2039+
2040+
if (total_bytes == 0) return;
2041+
2042+
ssize_t totwritten = 0;
2043+
while (iovcnt > 0) {
2044+
int nwritten = connWritev(c->conn, iov, iovcnt);
2045+
2046+
if (nwritten <= 0) {
2047+
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
2048+
c->nwritten = (totwritten > 0) ? totwritten : nwritten;
2049+
return;
2050+
}
2051+
2052+
totwritten += nwritten;
2053+
2054+
if (totwritten == total_bytes) {
2055+
break;
19742056
}
19752057

1976-
/* If we fully sent the object on head, go to the next one. */
1977-
listNode *next = listNextNode(c->repl_data->ref_repl_buf_node);
1978-
if (next && c->repl_data->ref_block_pos == o->used) {
1979-
o->refcount--;
1980-
((replBufBlock *)(listNodeValue(next)))->refcount++;
1981-
c->repl_data->ref_repl_buf_node = next;
1982-
c->repl_data->ref_block_pos = 0;
1983-
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
2058+
/* Update iov array */
2059+
while (nwritten > 0) {
2060+
if ((size_t)nwritten < iov[0].iov_len) {
2061+
/* partial block written */
2062+
iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
2063+
iov[0].iov_len -= nwritten;
2064+
break;
2065+
}
2066+
2067+
/* full block written */
2068+
nwritten -= iov[0].iov_len;
2069+
iov++;
2070+
iovcnt--;
19842071
}
19852072
}
2073+
2074+
c->nwritten = totwritten;
19862075
}
19872076

19882077
/* This function should be called from _writeToClient when the reply list is not empty,
@@ -2181,7 +2270,7 @@ int postWriteToClient(client *c) {
21812270
if (getClientType(c) != CLIENT_TYPE_REPLICA) {
21822271
_postWriteToClient(c);
21832272
} else {
2184-
server.stat_net_repl_output_bytes += c->nwritten > 0 ? c->nwritten : 0;
2273+
postWriteToReplica(c);
21852274
}
21862275

21872276
if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
@@ -2751,7 +2840,7 @@ void processMultibulkBuffer(client *c) {
27512840
serverAssertWithInfo(c, NULL, c->argc == 0);
27522841

27532842
/* Multi bulk length cannot be read without a \r\n */
2754-
newline = strchr(c->querybuf + c->qb_pos, '\r');
2843+
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
27552844
if (newline == NULL) {
27562845
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
27572846
c->read_flags |= READ_FLAGS_ERROR_BIG_MULTIBULK;
@@ -2828,7 +2917,7 @@ void processMultibulkBuffer(client *c) {
28282917
while (c->multibulklen) {
28292918
/* Read bulk length if unknown */
28302919
if (c->bulklen == -1) {
2831-
newline = strchr(c->querybuf + c->qb_pos, '\r');
2920+
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
28322921
if (newline == NULL) {
28332922
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
28342923
c->read_flags |= READ_FLAGS_ERROR_BIG_BULK_COUNT;
@@ -5089,7 +5178,12 @@ void ioThreadWriteToClient(void *data) {
50895178
client *c = data;
50905179
serverAssert(c->io_write_state == CLIENT_PENDING_IO);
50915180
c->nwritten = 0;
5092-
_writeToClient(c);
5181+
if (c->write_flags & WRITE_FLAGS_IS_REPLICA) {
5182+
writeToReplica(c);
5183+
} else {
5184+
_writeToClient(c);
5185+
}
5186+
50935187
atomic_thread_fence(memory_order_release);
50945188
c->io_write_state = CLIENT_COMPLETED_IO;
50955189
}

src/replication.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4239,8 +4239,6 @@ void replicationCachePrimary(client *c) {
42394239
serverAssert(server.primary != NULL && server.cached_primary == NULL);
42404240
serverLog(LL_NOTICE, "Caching the disconnected primary state.");
42414241

4242-
/* Wait for IO operations to be done before proceeding */
4243-
waitForClientIO(c);
42444242
/* Unlink the client from the server structures. */
42454243
unlinkClient(c);
42464244

src/server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2647,7 +2647,7 @@ void dictVanillaFree(void *val);
26472647

26482648
/* Write flags for various write errors and states */
26492649
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)
2650-
2650+
#define WRITE_FLAGS_IS_REPLICA (1 << 1)
26512651

26522652
client *createClient(connection *conn);
26532653
void freeClient(client *c);

0 commit comments

Comments
 (0)