Skip to content

Commit 838ba44

Browse files
xbaselalexander-shabanovmadolson
authored
Reply Copy Avoidance (#2078)
### Overview This PR introduces the ability to avoid copying the content of string object into replies (i.e. bulk string replies) and to allow I/O threads refer directly to obj->ptr in writev iov. ### Key Changes * Added capability to reply construction allowing to interleave regular replies with copy avoid replies in client reply buffers * Extended write-to-client handlers to support copy avoid replies * Added copy avoidance of string bulk replies when copy avoidance indicated by I/O threads * Minor changes in cluster slots stats in order to support `network-bytes-out` for copy avoid replies * Copy avoidance is beneficial for performance despite object size only starting certain number of threads. So it will be enabled only starting certain number of threads. Internal configuration ``min-io-threads-copy-avoid`` introduced to manage this number of threads **Note**: When copy avoidance disabled content and handling of client reply buffers remains as before this PR ### Implementation Details #### ``client`` and ``clientReplyBlock`` structs: 1. ``buf_encoded`` flag has been added to ``clientReplyBlock`` struct and to ``client`` struct for static ``c->buf`` to indicate if reply buffer is in copy avoidance mode (i.e. include headers and payloads) or not (i.e. plain replies only). 2. ``io_last_written_buf``, ``io_last_written_bufpos``, ``io_last_written_data_len`` fields added ``client`` struct to to keep track of write state between ``writevToClient`` invocations #### Reply construction: 1. Original ```_addReplyToBuffer``` and ```_addReplyProtoToList``` have been renamed to ```_addReplyPayloadToBuffer``` and ```_addReplyPayloadToList``` and extended to support different types of payloads - regular replies and copy avoid replies. 3. New ```_addReplyToBuffer``` and ```_addReplyProtoToList``` calls now ```_addReplyPayloadToBuffer``` and ```_addReplyPayloadToList``` and used for adding **regular** replies to client reply buffers. 4. Newly introduced ```_addBulkOffloadToBuffer``` and ```_addBulkOffloadToList``` are used for adding **copy avoid** replies to client reply buffers. #### Write-to-client infrastructure: The ```writevToClient``` and ```_postWriteToClient``` has been significantly changed to support copy avoidance capability. #### Debug configuration: 1. ``min-io-threads-avoid-copy-reply`` - Minimum number of IO threads for copy avoidance 2. ``min-string-size-avoid-copy-reply`` - Minimum bulk string size for copy avoidance when IO threads disabled 3. ``min-string-size-avoid-copy-reply-threaded`` - Minimum bulk string size for copy avoidance when IO threads enabled ### Testing 1. Existing unit and integration tests passed. Copy avoidance enabled on tests with ``--io-threads`` flag 2. Added unit tests for copy avoidance functionality ### Performance Tests Note: pay attention `io-threads 1` config means only main thread with no additional io-threads, `io-threads 2` means main thread plus 1 I/O thread, `io-threads 9` means main thread plus 8 I/O threads. #### 512 byte object size Tests are conducted on memory optimized instances using: * 3,000,000 keys * 512 bytes object size * 1000 clients |io-threads (including main thread) |Plain Reply |Copy Avoidance | |--- |--- |--- | |7 |1,160,000 |1,160,000 | |8 |1,150,000 |1,280,000 | |9 |1,150,000 |1,330,000 | |10 |N/A |1,380,000 | |11 |N/A |1,420,000 | #### Various object size, small number of threads |iothreads |Data size |Keys |Clients |Instance type |Unstable branch |Copy Avoidance On | |--- |--- |--- |--- |--- |--- |--- | |1 |512 byte |3,000,000 |1,000 |memory optimized |195,000 |195,000 | |2 |512 byte |3,000,000 |1,000 |memory optimized |245,000 |245,000 | |3 |512 byte |3,000,000 |1,000 |memory optimized |455,000 |459,000 | |4 |512 byte |3,000,000 |1,000 |memory optimized |685,000 |685,000 | | | | | | | | | |1 |1K |3,000,000 |1,000 |memory optimized |185,000 |185,000 | |2 |1K |3,000,000 |1,000 |memory optimized |235,000 |235,000 | |3 |1K |3,000,000 |1,000 |memory optimized |450,000 |450,000 | | | | | | | | | |1 |4K |1,000,000 |1,000 |network optimized |182,000 |187,000 | |2 |4K |1,000,000 |1,000 |network optimized |240,000 |238,000 | | | | | | | | | |1 |16K |1,000,000 |500 |network optimized |100,000 |120,000 | |2 |16K |1,000,000 |500 |network optimized |140,000 |140,000 | |3 |16K |1,000,000 |500 |network optimized |275,000 |260,000 | | | | | | | | | |1 |32K |500,000 |500 |network optimized |57,000 |90,000 | |2 |32K |500,000 |500 |network optimized |110,000 |110,000 | |3 |32K |500,000 |500 |network optimized |215,000 |215,000 | | | | | | | | | |1 |64K |100,000 |500 |network optimized |30,000 |57,000 | |2 |64K |100,000 |500 |network optimized |69,000 |61,000 | |3 |64K |100,000 |500 |network optimized |120,000 |120,000 | |4 |64K |100,000 |500 |network optimized |115,000 - 175,000 |175,000 | |5 |64K |100,000 |500 |network optimized |115,000 - 165,000 |230,000 | --------- Signed-off-by: Alexander Shabanov <[email protected]> Signed-off-by: xbasel <[email protected]> Signed-off-by: Madelyn Olson <[email protected]> Co-authored-by: Alexander Shabanov <[email protected]> Co-authored-by: Madelyn Olson <[email protected]>
1 parent 988297d commit 838ba44

23 files changed

+912
-142
lines changed

src/cluster_slot_stats.c

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -131,22 +131,23 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
131131
}
132132
}
133133

134-
static int canAddNetworkBytesOut(client *c) {
135-
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
134+
/* Accumulates egress bytes for the slot. */
135+
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) {
136+
if (!clusterSlotStatsEnabled(slot)) return;
137+
138+
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
139+
server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out;
136140
}
137141

138142
/* Accumulates egress bytes upon sending RESP responses back to user clients. */
139143
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
140-
if (!canAddNetworkBytesOut(c)) return;
141-
142-
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
143-
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
144+
clusterSlotStatsAddNetworkBytesOutForSlot(c->slot, c->net_output_bytes_curr_cmd);
144145
}
145146

146147
/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
147148
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
148149
client *c = server.current_client;
149-
if (c == NULL || !canAddNetworkBytesOut(c)) return;
150+
if (c == NULL || !clusterSlotStatsEnabled(c->slot)) return;
150151

151152
/* We multiply the bytes len by the number of replicas to account for us broadcasting to multiple replicas at once. */
152153
len *= (long long)listLength(server.replicas);
@@ -177,24 +178,14 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
177178
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
178179
* This function covers the internal propagation component. */
179180
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
180-
/* For a blocked client, c->slot could be pre-filled.
181-
* Thus c->slot is backed-up for restoration after aggregation is completed. */
182-
int _slot = c->slot;
183-
c->slot = slot;
184-
if (!canAddNetworkBytesOut(c)) {
185-
/* c->slot should not change as a side effect of this function,
186-
* regardless of the function's early return condition. */
187-
c->slot = _slot;
188-
return;
189-
}
181+
if (!clusterSlotStatsEnabled(slot)) return;
190182

191-
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
192-
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
183+
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
184+
server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd;
193185

194186
/* For sharded pubsub, the client's network bytes metrics must be reset here,
195187
* as resetClient() is not called until subscription ends. */
196188
c->net_output_bytes_curr_cmd = 0;
197-
c->slot = _slot;
198189
}
199190

200191
/* Adds reply for the ORDERBY variant.
@@ -222,9 +213,7 @@ void clusterSlotStatResetAll(void) {
222213
* would equate to repeating the same calculation twice.
223214
*/
224215
static int canAddCpuDuration(client *c) {
225-
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
226-
server.cluster_enabled && /* Cluster mode should be enabled. */
227-
c->slot != -1 && /* Command should be slot specific. */
216+
return clusterSlotStatsEnabled(c->slot) &&
228217
(!server.execution_nesting || /* Either; */
229218
(server.execution_nesting && /* 1) Command should not be nested, or */
230219
c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. */
@@ -251,8 +240,7 @@ static int canAddNetworkBytesIn(client *c) {
251240
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
252241
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
253242
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
254-
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
255-
!server.in_exec;
243+
return clusterSlotStatsEnabled(c->slot) && !(c->flag.blocked) && !server.in_exec;
256244
}
257245

258246
/* Adds network ingress bytes of the current command in execution,
@@ -346,3 +334,7 @@ void clusterSlotStatsCommand(client *c) {
346334
addReplySubcommandSyntaxError(c);
347335
}
348336
}
337+
338+
int clusterSlotStatsEnabled(int slot) {
339+
return server.cluster_slot_stats_enabled && server.cluster_enabled && slot != -1;
340+
}

src/cluster_slot_stats.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
/* General use-cases. */
77
void clusterSlotStatReset(int slot);
88
void clusterSlotStatResetAll(void);
9+
int clusterSlotStatsEnabled(int slot);
910

1011
/* cpu-usec metric. */
1112
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
@@ -17,6 +18,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len);
1718
void clusterSlotStatsResetClusterMsgLength(void);
1819

1920
/* network-bytes-out metric. */
21+
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out);
2022
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
2123
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
2224
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);

src/config.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3262,6 +3262,9 @@ standardConfig static_configs[] = {
32623262
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
32633263
createIntConfig("io-threads", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, updateIOThreads), /* Single threaded by default */
32643264
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
3265+
createIntConfig("min-io-threads-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_copy_avoid, 7, INTEGER_CONFIG, NULL, NULL),
3266+
createIntConfig("min-string-size-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid, 16384, INTEGER_CONFIG, NULL, NULL),
3267+
createIntConfig("min-string-size-avoid-copy-reply-threaded", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid_threaded, 65536, INTEGER_CONFIG, NULL, NULL),
32653268
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
32663269
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
32673270
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */

src/io_threads.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,9 +447,14 @@ int trySendWriteToIOThreads(client *c) {
447447
* threads from reading data that might be invalid in their local CPU cache. */
448448
c->io_last_reply_block = listLast(c->reply);
449449
if (c->io_last_reply_block) {
450-
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
450+
clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
451+
c->io_last_bufpos = block->used;
452+
/* If buffer is encoded force new header */
453+
if (block->flag.buf_encoded) block->last_header = NULL;
451454
} else {
452455
c->io_last_bufpos = (size_t)c->bufpos;
456+
/* If buffer is encoded force new header */
457+
if (c->flag.buf_encoded) c->last_header = NULL;
453458
}
454459
}
455460

src/memory_prefetch.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include "memory_prefetch.h"
1212
#include "server.h"
13+
#include "io_threads.h"
1314

1415
typedef enum {
1516
PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */
@@ -120,6 +121,10 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
120121
if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
121122
/* Not done yet */
122123
moveToNextKey();
124+
} else if (server.io_threads_num >= server.min_io_threads_copy_avoid) {
125+
/* Copy avoidance should be more efficient without value prefetch
126+
* starting certain number of I/O threads */
127+
markKeyAsdone(info);
123128
} else {
124129
info->state = PREFETCH_VALUE;
125130
}

0 commit comments

Comments
 (0)