Skip to content

Commit de3672a

Browse files
authored
Offload replication writes to IO threads (#1485)
This PR offloads the write to replica clients to IO threads. ## Main Changes * Replica writes will be offloaded, but only after the replica is in online mode. * Replica reads will still be done in the main thread, to reduce complexity and because read traffic from replicas is negligible. ### Implementation Details In order to offload the writes, `writeToReplica` has been split into 2 parts: 1. The write itself, made by the IO thread or by the main thread 2. The post-write step, where we update the replication buffers' refcount, done in the main thread after the write job completes in the IO thread (similar to what we do with a regular client) ### Additional Changes * In `writeToReplica` we now use `writev` in case more than 1 buffer exists. * Changed the client `nwritten` field to `ssize_t`, since with a replica `nwritten` can theoretically exceed `int` size (it is not subject to the `NET_MAX_WRITES_PER_EVENT` limit). * Changed parsing code to use `memchr` instead of `strchr`: * During command parsing, ASAN got stuck for an unknown reason when `strchr` was called to look for the next `\r` * Adding an assert for a null-terminated querybuf didn't resolve the issue. * Switched to `memchr`, as it's more secure and resolves the issue ### Testing * Added integration tests * Added unit tests **Related issue:** #761 --------- Signed-off-by: Uri Yagelnik <[email protected]>
1 parent 7fa784a commit de3672a

File tree

8 files changed

+493
-47
lines changed

8 files changed

+493
-47
lines changed

src/io_threads.c

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ int trySendReadToIOThreads(client *c) {
321321
if (server.active_io_threads_num <= 1) return C_ERR;
322322
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
323323
if (c->io_read_state != CLIENT_IDLE) return C_OK;
324-
/* Currently, replica reads are not offloaded to IO threads. */
324+
/* For simplicity, don't offload replica clients reads as read traffic from replica is negligible */
325325
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
326326
/* With Lua debug client we may call connWrite directly in the main thread */
327327
if (c->flag.lua_debug) return C_ERR;
@@ -364,8 +364,8 @@ int trySendWriteToIOThreads(client *c) {
364364
if (c->io_write_state != CLIENT_IDLE) return C_OK;
365365
/* Nothing to write */
366366
if (!clientHasPendingReplies(c)) return C_ERR;
367-
/* Currently, replica writes are not offloaded to IO threads. */
368-
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
367+
/* For simplicity, avoid offloading non-online replicas */
368+
if (getClientType(c) == CLIENT_TYPE_REPLICA && c->repl_data->repl_state != REPLICA_STATE_ONLINE) return C_ERR;
369369
/* We can't offload debugged clients as the main-thread may read at the same time */
370370
if (c->flag.lua_debug) return C_ERR;
371371

@@ -392,21 +392,29 @@ int trySendWriteToIOThreads(client *c) {
392392
serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
393393
listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);
394394

395-
/* Save the last block of the reply list to io_last_reply_block and the used
396-
* position to io_last_bufpos. The I/O thread will write only up to
397-
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
398-
* threads from reading data that might be invalid in their local CPU cache. */
399-
c->io_last_reply_block = listLast(c->reply);
400-
if (c->io_last_reply_block) {
401-
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
395+
int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
396+
if (is_replica) {
397+
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
398+
replBufBlock *o = listNodeValue(c->io_last_reply_block);
399+
c->io_last_bufpos = o->used;
402400
} else {
403-
c->io_last_bufpos = (size_t)c->bufpos;
401+
/* Save the last block of the reply list to io_last_reply_block and the used
402+
* position to io_last_bufpos. The I/O thread will write only up to
403+
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
404+
* threads from reading data that might be invalid in their local CPU cache. */
405+
c->io_last_reply_block = listLast(c->reply);
406+
if (c->io_last_reply_block) {
407+
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
408+
} else {
409+
c->io_last_bufpos = (size_t)c->bufpos;
410+
}
404411
}
405-
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);
412+
413+
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0 || is_replica);
406414

407415
/* The main-thread will update the client state after the I/O thread completes the write. */
408416
connSetPostponeUpdateState(c->conn, 1);
409-
c->write_flags = 0;
417+
c->write_flags = is_replica ? WRITE_FLAGS_IS_REPLICA : 0;
410418
c->io_write_state = CLIENT_PENDING_IO;
411419

412420
IOJobQueue_push(jq, ioThreadWriteToClient, c);

src/networking.c

Lines changed: 123 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,6 +1534,9 @@ void disconnectReplicas(void) {
15341534
void unlinkClient(client *c) {
15351535
listNode *ln;
15361536

1537+
/* Wait for IO operations to be done before unlinking the client. */
1538+
waitForClientIO(c);
1539+
15371540
/* If this is marked as current client unset it. */
15381541
if (c->conn && server.current_client == c) server.current_client = NULL;
15391542

@@ -1934,36 +1937,122 @@ client *lookupClientByID(uint64_t id) {
19341937
return c;
19351938
}
19361939

1937-
void writeToReplica(client *c) {
1938-
/* Can be called from main-thread only as replica write offload is not supported yet */
1939-
serverAssert(inMainThread());
1940-
int nwritten = 0;
1940+
static void postWriteToReplica(client *c) {
1941+
if (c->nwritten <= 0) return;
1942+
1943+
server.stat_net_repl_output_bytes += c->nwritten;
1944+
1945+
/* Locate the last node which has leftover data and
1946+
* decrement reference counts of all nodes in front of it.
1947+
* Set c->ref_repl_buf_node to point to the last node and
1948+
* c->ref_block_pos to the offset within that node */
1949+
listNode *curr = c->repl_data->ref_repl_buf_node;
1950+
listNode *next = NULL;
1951+
size_t nwritten = c->nwritten + c->repl_data->ref_block_pos;
1952+
replBufBlock *o = listNodeValue(curr);
1953+
1954+
while (nwritten >= o->used) {
1955+
next = listNextNode(curr);
1956+
if (!next) break; /* End of list */
1957+
1958+
nwritten -= o->used;
1959+
o->refcount--;
1960+
1961+
curr = next;
1962+
o = listNodeValue(curr);
1963+
o->refcount++;
1964+
}
1965+
1966+
serverAssert(nwritten <= o->used);
1967+
c->repl_data->ref_repl_buf_node = curr;
1968+
c->repl_data->ref_block_pos = nwritten;
1969+
1970+
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
1971+
}
1972+
1973+
static void writeToReplica(client *c) {
1974+
listNode *last_node;
1975+
size_t bufpos;
1976+
19411977
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
1942-
while (clientHasPendingReplies(c)) {
1943-
replBufBlock *o = listNodeValue(c->repl_data->ref_repl_buf_node);
1944-
serverAssert(o->used >= c->repl_data->ref_block_pos);
1945-
1946-
/* Send current block if it is not fully sent. */
1947-
if (o->used > c->repl_data->ref_block_pos) {
1948-
nwritten = connWrite(c->conn, o->buf + c->repl_data->ref_block_pos, o->used - c->repl_data->ref_block_pos);
1949-
if (nwritten <= 0) {
1950-
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
1951-
return;
1952-
}
1953-
c->nwritten += nwritten;
1954-
c->repl_data->ref_block_pos += nwritten;
1978+
/* Determine the last block and buffer position based on thread context */
1979+
if (inMainThread()) {
1980+
last_node = listLast(server.repl_buffer_blocks);
1981+
if (!last_node) return;
1982+
bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
1983+
} else {
1984+
last_node = c->io_last_reply_block;
1985+
serverAssert(last_node != NULL);
1986+
bufpos = c->io_last_bufpos;
1987+
}
1988+
1989+
listNode *first_node = c->repl_data->ref_repl_buf_node;
1990+
1991+
/* Handle the single block case */
1992+
if (first_node == last_node) {
1993+
replBufBlock *b = listNodeValue(first_node);
1994+
c->nwritten = connWrite(c->conn, b->buf + c->repl_data->ref_block_pos, bufpos - c->repl_data->ref_block_pos);
1995+
if (c->nwritten <= 0) {
1996+
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
1997+
}
1998+
return;
1999+
}
2000+
2001+
/* Multiple blocks case */
2002+
ssize_t total_bytes = 0;
2003+
int iovcnt = 0;
2004+
struct iovec iov_arr[IOV_MAX];
2005+
struct iovec *iov = iov_arr;
2006+
int iovmax = min(IOV_MAX, c->conn->iovcnt);
2007+
2008+
for (listNode *cur_node = first_node; cur_node != NULL && iovcnt < iovmax; cur_node = listNextNode(cur_node)) {
2009+
replBufBlock *cur_block = listNodeValue(cur_node);
2010+
size_t start = (cur_node == first_node) ? c->repl_data->ref_block_pos : 0;
2011+
size_t len = (cur_node == last_node) ? bufpos : cur_block->used;
2012+
len -= start;
2013+
2014+
iov[iovcnt].iov_base = cur_block->buf + start;
2015+
iov[iovcnt].iov_len = len;
2016+
total_bytes += len;
2017+
iovcnt++;
2018+
if (cur_node == last_node) break;
2019+
}
2020+
2021+
if (total_bytes == 0) return;
2022+
2023+
ssize_t totwritten = 0;
2024+
while (iovcnt > 0) {
2025+
int nwritten = connWritev(c->conn, iov, iovcnt);
2026+
2027+
if (nwritten <= 0) {
2028+
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
2029+
c->nwritten = (totwritten > 0) ? totwritten : nwritten;
2030+
return;
2031+
}
2032+
2033+
totwritten += nwritten;
2034+
2035+
if (totwritten == total_bytes) {
2036+
break;
19552037
}
19562038

1957-
/* If we fully sent the object on head, go to the next one. */
1958-
listNode *next = listNextNode(c->repl_data->ref_repl_buf_node);
1959-
if (next && c->repl_data->ref_block_pos == o->used) {
1960-
o->refcount--;
1961-
((replBufBlock *)(listNodeValue(next)))->refcount++;
1962-
c->repl_data->ref_repl_buf_node = next;
1963-
c->repl_data->ref_block_pos = 0;
1964-
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
2039+
/* Update iov array */
2040+
while (nwritten > 0) {
2041+
if ((size_t)nwritten < iov[0].iov_len) {
2042+
/* partial block written */
2043+
iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
2044+
iov[0].iov_len -= nwritten;
2045+
break;
2046+
}
2047+
2048+
/* full block written */
2049+
nwritten -= iov[0].iov_len;
2050+
iov++;
2051+
iovcnt--;
19652052
}
19662053
}
2054+
2055+
c->nwritten = totwritten;
19672056
}
19682057

19692058
/* This function should be called from _writeToClient when the reply list is not empty,
@@ -2158,7 +2247,7 @@ int postWriteToClient(client *c) {
21582247
if (getClientType(c) != CLIENT_TYPE_REPLICA) {
21592248
_postWriteToClient(c);
21602249
} else {
2161-
server.stat_net_repl_output_bytes += c->nwritten > 0 ? c->nwritten : 0;
2250+
postWriteToReplica(c);
21622251
}
21632252

21642253
if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
@@ -2718,7 +2807,7 @@ void processMultibulkBuffer(client *c) {
27182807
serverAssertWithInfo(c, NULL, c->argc == 0);
27192808

27202809
/* Multi bulk length cannot be read without a \r\n */
2721-
newline = strchr(c->querybuf + c->qb_pos, '\r');
2810+
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
27222811
if (newline == NULL) {
27232812
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
27242813
c->read_flags |= READ_FLAGS_ERROR_BIG_MULTIBULK;
@@ -2795,7 +2884,7 @@ void processMultibulkBuffer(client *c) {
27952884
while (c->multibulklen) {
27962885
/* Read bulk length if unknown */
27972886
if (c->bulklen == -1) {
2798-
newline = strchr(c->querybuf + c->qb_pos, '\r');
2887+
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
27992888
if (newline == NULL) {
28002889
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
28012890
c->read_flags |= READ_FLAGS_ERROR_BIG_BULK_COUNT;
@@ -5042,7 +5131,12 @@ void ioThreadWriteToClient(void *data) {
50425131
client *c = data;
50435132
serverAssert(c->io_write_state == CLIENT_PENDING_IO);
50445133
c->nwritten = 0;
5045-
_writeToClient(c);
5134+
if (c->write_flags & WRITE_FLAGS_IS_REPLICA) {
5135+
writeToReplica(c);
5136+
} else {
5137+
_writeToClient(c);
5138+
}
5139+
50465140
atomic_thread_fence(memory_order_release);
50475141
c->io_write_state = CLIENT_COMPLETED_IO;
50485142
}

src/replication.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4238,8 +4238,6 @@ void replicationCachePrimary(client *c) {
42384238
serverAssert(server.primary != NULL && server.cached_primary == NULL);
42394239
serverLog(LL_NOTICE, "Caching the disconnected primary state.");
42404240

4241-
/* Wait for IO operations to be done before proceeding */
4242-
waitForClientIO(c);
42434241
/* Unlink the client from the server structures. */
42444242
unlinkClient(c);
42454243

src/server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2633,7 +2633,7 @@ void dictVanillaFree(void *val);
26332633

26342634
/* Write flags for various write errors and states */
26352635
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)
2636-
2636+
#define WRITE_FLAGS_IS_REPLICA (1 << 1)
26372637

26382638
client *createClient(connection *conn);
26392639
void freeClient(client *c);

0 commit comments

Comments
 (0)