From e846d9e5b81d3049f596d8fc05a1cc5a1d6e20f6 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Tue, 11 Mar 2025 12:52:13 +0200 Subject: [PATCH 01/19] Prioritize replication reads to reduce full syncs Adds Replication flow control (repl-flow-control) to adjust replication read frequency based on buffer pressure. Helps replicas keep up with replication data and reduces primary buffer utilization and overflows. - Dynamic replication read scaling based on buffer pressure. - Reduces full syncs by increasing replication reads when needed. - Improves replication responsiveness, reduces data staleness. - Trade-offs: Slightly higher client latency due to replication prioritization. Replication was handled like a normal client. Under high load in the replica, replication lag increased, making data stale and caused primary buffer overflows, triggering full syncs and high CPU/memory/I/O usage. - Fewer full syncs from buffer overruns. - Lower replication lag, fresher data on replicas. - More stable primary buffer usage, less swapping. - Slightly higher client latency due to replication prioritization. Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/config.c | 10 ++++++++ src/networking.c | 59 ++++++++++++++++++++++++++++++++++++++++++++---- src/server.c | 1 + src/server.h | 7 ++++++ valkey.conf | 19 ++++++++++++++++ 5 files changed, 91 insertions(+), 5 deletions(-) diff --git a/src/config.c b/src/config.c index 312d47b948..51475b289e 100644 --- a/src/config.c +++ b/src/config.c @@ -2496,6 +2496,14 @@ static int updateReplBacklogSize(const char **err) { return 1; } +static int updateReplFlowControl(const char **err) { + UNUSED(err); + if (server.repl_flow_control_enabled == 0) { + server.repl_cur_reads_per_io_event = 1; + } + return 1; +} + static int updateMaxmemory(const char **err) { UNUSED(err); if (server.maxmemory) { @@ -3193,6 +3201,7 @@ standardConfig static_configs[] = { createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), + createBoolConfig("repl-flow-control-enabled", NULL, MODIFIABLE_CONFIG, server.repl_flow_control_enabled, 1, NULL, updateReplFlowControl), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), @@ -3295,6 +3304,7 @@ standardConfig static_configs[] = { createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort), createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL), + createIntConfig("repl-max-reads-per-io-event", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_max_reads_per_io_event, 25, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), diff --git a/src/networking.c b/src/networking.c index a049936fa9..3ca6f1b713 100644 --- a/src/networking.c +++ b/src/networking.c @@ -43,6 +43,7 @@ #include #include #include +#include /* This struct is used to encapsulate filtering criteria for operations on clients * such as identifying specific clients to kill or retrieve. Each field in the struct @@ -3247,6 +3248,7 @@ void readToQueryBuf(client *c) { if (c->nread <= 0) { return; } + c->qb_full_read = (size_t)c->nread == readlen ? 1 : 0; sdsIncrLen(c->querybuf, c->nread); qblen = sdslen(c->querybuf); @@ -3264,6 +3266,47 @@ void readToQueryBuf(client *c) { } } +/** + * This function is designed to prioritize replication flow. + * Determines whether the replica should continue reading from the primary. + * It dynamically adjusts the read rate based on buffer utilization + * and ensures replication reads are not overly aggressive. + * + * @return 1 if another read should be attempted, 0 otherwise. + */ +int shouldRepeatRead(client *c, int iteration) { + // If the client is not a primary replica, is closing, or flow control is disabled, no more reads. + if (!(c->flag.primary) || c->flag.close_asap || !server.repl_flow_control_enabled) { + return 0; + } + + bool is_last_iteration = iteration >= server.repl_cur_reads_per_io_event; + + if (is_last_iteration) { + /* If the last read filled the buffer AND enough time has passed since the last increase: + * - Increase the read rate, up to a max limit. + * - This ensures a gradual ramp-up instead of an overly aggressive approach. */ + if (c->qb_full_read && server.mstime - server.repl_last_rate_update > 100) { + server.repl_cur_reads_per_io_event = MIN(server.repl_max_reads_per_io_event, + server.repl_cur_reads_per_io_event + 1); + server.repl_last_rate_update = server.mstime; // Update the last increase timestamp. + } + } else { + /* If the last read completely filled the buffer, continue reading. */ + if (c->qb_full_read) { + return 1; + } + + /* If the buffer was NOT fully filled, it indicates less replication pressure. + * Reduce the read rate to avoid excessive polling and free up resources for other clients. */ + server.repl_cur_reads_per_io_event = MAX(1, server.repl_cur_reads_per_io_event - 1); + } + + /* Stop reading for now (if we reached this point, conditions to continue were not met). */ + return 0; +} + + void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); /* Check if we can send the client to be handled by the IO-thread */ @@ -3271,12 +3314,18 @@ void readQueryFromClient(connection *conn) { if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return; - readToQueryBuf(c); + bool shouldRepeat = false; + int iter = 0; + do { + readToQueryBuf(c); - if (handleReadResult(c) == C_OK) { - if (processInputBuffer(c) == C_ERR) return; - } - beforeNextClient(c); + if (handleReadResult(c) == C_OK) { + if (processInputBuffer(c) == C_ERR) return; + } + iter++; + shouldRepeat = shouldRepeatRead(c, iter); + beforeNextClient(c); + } while (shouldRepeat); } /* An "Address String" is a colon separated ip:port pair. diff --git a/src/server.c b/src/server.c index 5affc8d8a5..c65ce0b88a 100644 --- a/src/server.c +++ b/src/server.c @@ -2806,6 +2806,7 @@ void initServer(void) { server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME; server.reply_buffer_resizing_enabled = 1; server.client_mem_usage_buckets = NULL; + server.repl_last_rate_update = 0; resetReplicationBuffer(); /* Make sure the locale is set on startup based on the config file. */ diff --git a/src/server.h b/src/server.h index 42856e4e57..d1405c07cf 100644 --- a/src/server.h +++ b/src/server.h @@ -1181,6 +1181,7 @@ typedef struct client { /* Input buffer and command parsing fields */ sds querybuf; /* Buffer we use to accumulate client queries. */ size_t qb_pos; /* The position we have read in querybuf. */ + int qb_full_read; /* True if the last read returned the maximum allowed bytes */ robj **argv; /* Arguments of current command. */ int argc; /* Num of arguments of current command. */ int argv_len; /* Size of argv array (may be more than argc) */ @@ -2145,6 +2146,12 @@ struct valkeyServer { /* Local environment */ char *locale_collate; char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */ + + /* Replication flow control */ + int repl_flow_control_enabled; /* Enables adaptive flow control for replication reads on the replica */ + int repl_cur_reads_per_io_event; /* Current allowed reads from the primary file descriptor per epoll I/O event */ + int repl_max_reads_per_io_event; /* Maximum allowed reads from the primary file descriptor per I/O event */ + mstime_t repl_last_rate_update; /* Timestamp of the last increase in replication reads per I/O event */ }; #define MAX_KEYS_BUFFER 256 diff --git a/valkey.conf b/valkey.conf index df12b72d24..6b2d678ee0 100644 --- a/valkey.conf +++ b/valkey.conf @@ -2535,3 +2535,22 @@ jemalloc-bg-thread yes # the empty string. # # availability-zone "zone-name" + +################################## REPLICATION FLOW CONTROL ################################## + +# Prioritizes replication traffic to reduce primary buffer overflows, +# reducing lag and the risk of full syncs. Allows the replica to +# consume data faster under high load. +# +# If enabled, the replica invokes multiple reads per I/O event when it +# detects replication pressure. +# +# Default: yes +# repl-flow-control-enabled yes + +# Specifies the maximum number of replication reads allowed per I/O event. +# Higher values allow more replication data to be processed per event, reducing replication lag, +# but can throttle normal clients and increase their latency. +# +# Default: 25 +# repl-max-reads-per-io-event 25 \ No newline at end of file From 6a21b812dd843a03d081f1835d5f158c7e5ce750 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Wed, 26 Mar 2025 03:06:47 +0200 Subject: [PATCH 02/19] change qb_full_read to bool Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/server.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.h b/src/server.h index d1405c07cf..9f15f55c73 100644 --- a/src/server.h +++ b/src/server.h @@ -1181,7 +1181,7 @@ typedef struct client { /* Input buffer and command parsing fields */ sds querybuf; /* Buffer we use to accumulate client queries. */ size_t qb_pos; /* The position we have read in querybuf. */ - int qb_full_read; /* True if the last read returned the maximum allowed bytes */ + bool qb_full_read; /* True if the last read returned the maximum allowed bytes */ robj **argv; /* Arguments of current command. */ int argc; /* Num of arguments of current command. */ int argv_len; /* Size of argv array (may be more than argc) */ From cf3cc8b91c4a516882be5e50b03b4aa77c14e2ad Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Wed, 26 Mar 2025 03:08:49 +0200 Subject: [PATCH 03/19] rename qb_full_read Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 6 +++--- src/server.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/networking.c b/src/networking.c index 3ca6f1b713..7a40c30f19 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3248,7 +3248,7 @@ void readToQueryBuf(client *c) { if (c->nread <= 0) { return; } - c->qb_full_read = (size_t)c->nread == readlen ? 1 : 0; + c->is_qb_full_read = (size_t)c->nread == readlen ? 1 : 0; sdsIncrLen(c->querybuf, c->nread); qblen = sdslen(c->querybuf); @@ -3286,14 +3286,14 @@ int shouldRepeatRead(client *c, int iteration) { /* If the last read filled the buffer AND enough time has passed since the last increase: * - Increase the read rate, up to a max limit. * - This ensures a gradual ramp-up instead of an overly aggressive approach. */ - if (c->qb_full_read && server.mstime - server.repl_last_rate_update > 100) { + if (c->is_qb_full_read && server.mstime - server.repl_last_rate_update > 100) { server.repl_cur_reads_per_io_event = MIN(server.repl_max_reads_per_io_event, server.repl_cur_reads_per_io_event + 1); server.repl_last_rate_update = server.mstime; // Update the last increase timestamp. } } else { /* If the last read completely filled the buffer, continue reading. */ - if (c->qb_full_read) { + if (c->is_qb_full_read) { return 1; } diff --git a/src/server.h b/src/server.h index 9f15f55c73..fe40995dd0 100644 --- a/src/server.h +++ b/src/server.h @@ -1181,7 +1181,7 @@ typedef struct client { /* Input buffer and command parsing fields */ sds querybuf; /* Buffer we use to accumulate client queries. */ size_t qb_pos; /* The position we have read in querybuf. */ - bool qb_full_read; /* True if the last read returned the maximum allowed bytes */ + bool is_qb_full_read; /* True if the last read returned the maximum allowed bytes */ robj **argv; /* Arguments of current command. */ int argc; /* Num of arguments of current command. */ int argv_len; /* Size of argv array (may be more than argc) */ From d9557a88348b901be852fd0f174fdf4d28b54bd7 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Wed, 26 Mar 2025 03:11:54 +0200 Subject: [PATCH 04/19] Initialize repl_cur_reads_per_io_event Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/server.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.c b/src/server.c index c65ce0b88a..d91e7f3141 100644 --- a/src/server.c +++ b/src/server.c @@ -2807,6 +2807,7 @@ void initServer(void) { server.reply_buffer_resizing_enabled = 1; server.client_mem_usage_buckets = NULL; server.repl_last_rate_update = 0; + server.repl_cur_reads_per_io_event = 0; resetReplicationBuffer(); /* Make sure the locale is set on startup based on the config file. */ From 8e3ada0159c713387dcf0085da17a83193c46576 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Wed, 26 Mar 2025 03:30:46 +0200 Subject: [PATCH 05/19] remove ramp-up , apply max rate immediately for flow control. This will simplify impl Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/config.c | 10 +--------- src/networking.c | 25 +++---------------------- src/nohup.out | 1 + src/server.c | 2 -- src/server.h | 3 +-- 5 files changed, 6 insertions(+), 35 deletions(-) create mode 100644 src/nohup.out diff --git a/src/config.c b/src/config.c index 51475b289e..7c818b7f2e 100644 --- a/src/config.c +++ b/src/config.c @@ -2496,14 +2496,6 @@ static int updateReplBacklogSize(const char **err) { return 1; } -static int updateReplFlowControl(const char **err) { - UNUSED(err); - if (server.repl_flow_control_enabled == 0) { - server.repl_cur_reads_per_io_event = 1; - } - return 1; -} - static int updateMaxmemory(const char **err) { UNUSED(err); if (server.maxmemory) { @@ -3201,7 +3193,7 @@ standardConfig static_configs[] = { createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), - createBoolConfig("repl-flow-control-enabled", NULL, MODIFIABLE_CONFIG, server.repl_flow_control_enabled, 1, NULL, updateReplFlowControl), + createBoolConfig("repl-flow-control-enabled", NULL, MODIFIABLE_CONFIG, server.repl_flow_control_enabled, 1, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/networking.c b/src/networking.c index 7a40c30f19..6099bc8f86 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3270,7 +3270,6 @@ void readToQueryBuf(client *c) { * This function is designed to prioritize replication flow. * Determines whether the replica should continue reading from the primary. * It dynamically adjusts the read rate based on buffer utilization - * and ensures replication reads are not overly aggressive. * * @return 1 if another read should be attempted, 0 otherwise. */ @@ -3280,29 +3279,11 @@ int shouldRepeatRead(client *c, int iteration) { return 0; } - bool is_last_iteration = iteration >= server.repl_cur_reads_per_io_event; - - if (is_last_iteration) { - /* If the last read filled the buffer AND enough time has passed since the last increase: - * - Increase the read rate, up to a max limit. - * - This ensures a gradual ramp-up instead of an overly aggressive approach. */ - if (c->is_qb_full_read && server.mstime - server.repl_last_rate_update > 100) { - server.repl_cur_reads_per_io_event = MIN(server.repl_max_reads_per_io_event, - server.repl_cur_reads_per_io_event + 1); - server.repl_last_rate_update = server.mstime; // Update the last increase timestamp. - } - } else { - /* If the last read completely filled the buffer, continue reading. */ - if (c->is_qb_full_read) { - return 1; - } - - /* If the buffer was NOT fully filled, it indicates less replication pressure. - * Reduce the read rate to avoid excessive polling and free up resources for other clients. */ - server.repl_cur_reads_per_io_event = MAX(1, server.repl_cur_reads_per_io_event - 1); + if (iteration < server.repl_max_reads_per_io_event && + c->is_qb_full_read) { + return 1; } - /* Stop reading for now (if we reached this point, conditions to continue were not met). */ return 0; } diff --git a/src/nohup.out b/src/nohup.out new file mode 100644 index 0000000000..d185b4be95 --- /dev/null +++ b/src/nohup.out @@ -0,0 +1 @@ +stress: info: [1201400] dispatching hogs: 1 cpu, 0 io, 0 vm, 0 hdd diff --git a/src/server.c b/src/server.c index d91e7f3141..5affc8d8a5 100644 --- a/src/server.c +++ b/src/server.c @@ -2806,8 +2806,6 @@ void initServer(void) { server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME; server.reply_buffer_resizing_enabled = 1; server.client_mem_usage_buckets = NULL; - server.repl_last_rate_update = 0; - server.repl_cur_reads_per_io_event = 0; resetReplicationBuffer(); /* Make sure the locale is set on startup based on the config file. */ diff --git a/src/server.h b/src/server.h index fe40995dd0..3b57f7eb9f 100644 --- a/src/server.h +++ b/src/server.h @@ -2149,9 +2149,8 @@ struct valkeyServer { /* Replication flow control */ int repl_flow_control_enabled; /* Enables adaptive flow control for replication reads on the replica */ - int repl_cur_reads_per_io_event; /* Current allowed reads from the primary file descriptor per epoll I/O event */ int repl_max_reads_per_io_event; /* Maximum allowed reads from the primary file descriptor per I/O event */ - mstime_t repl_last_rate_update; /* Timestamp of the last increase in replication reads per I/O event */ + }; #define MAX_KEYS_BUFFER 256 From 8a59e433b185f93d20afddc84c5a216f71cf7066 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Wed, 26 Mar 2025 03:46:38 +0200 Subject: [PATCH 06/19] format Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/server.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.h b/src/server.h index 3b57f7eb9f..b03faaefe6 100644 --- a/src/server.h +++ b/src/server.h @@ -1181,7 +1181,7 @@ typedef struct client { /* Input buffer and command parsing fields */ sds querybuf; /* Buffer we use to accumulate client queries. */ size_t qb_pos; /* The position we have read in querybuf. */ - bool is_qb_full_read; /* True if the last read returned the maximum allowed bytes */ + bool is_qb_full_read;/* True if the last read returned the maximum allowed bytes */ robj **argv; /* Arguments of current command. */ int argc; /* Num of arguments of current command. */ int argv_len; /* Size of argv array (may be more than argc) */ From c3390662d14665accd3a396ce85ecea44835cad9 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Wed, 26 Mar 2025 04:00:52 +0200 Subject: [PATCH 07/19] format Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/server.h | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/server.h b/src/server.h index b03faaefe6..dc5a2c2711 100644 --- a/src/server.h +++ b/src/server.h @@ -1179,17 +1179,17 @@ typedef struct client { uint64_t id; /* Client incremental unique ID. */ connection *conn; /* Input buffer and command parsing fields */ - sds querybuf; /* Buffer we use to accumulate client queries. */ - size_t qb_pos; /* The position we have read in querybuf. */ - bool is_qb_full_read;/* True if the last read returned the maximum allowed bytes */ - robj **argv; /* Arguments of current command. */ - int argc; /* Num of arguments of current command. */ - int argv_len; /* Size of argv array (may be more than argc) */ - size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ - int reqtype; /* Request protocol type: PROTO_REQ_* */ - int multibulklen; /* Number of multi bulk arguments left to read. */ - long bulklen; /* Length of bulk argument in multi bulk request. */ - long long woff; /* Last write global replication offset. */ + sds querybuf; /* Buffer we use to accumulate client queries. */ + size_t qb_pos; /* The position we have read in querybuf. */ + bool is_qb_full_read; /* True if the last read returned the maximum allowed bytes */ + robj **argv; /* Arguments of current command. */ + int argc; /* Num of arguments of current command. */ + int argv_len; /* Size of argv array (may be more than argc) */ + size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + int reqtype; /* Request protocol type: PROTO_REQ_* */ + int multibulklen; /* Number of multi bulk arguments left to read. */ + long bulklen; /* Length of bulk argument in multi bulk request. */ + long long woff; /* Last write global replication offset. */ /* Command execution state and command information */ struct serverCommand *cmd; /* Current command. */ struct serverCommand *lastcmd; /* Last command executed. */ @@ -2150,7 +2150,6 @@ struct valkeyServer { /* Replication flow control */ int repl_flow_control_enabled; /* Enables adaptive flow control for replication reads on the replica */ int repl_max_reads_per_io_event; /* Maximum allowed reads from the primary file descriptor per I/O event */ - }; #define MAX_KEYS_BUFFER 256 From fde61104f3d577397be2f9abbd65832aa6ba90df Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 17 Apr 2025 22:48:02 +0300 Subject: [PATCH 08/19] remove repl-flow-control-enabled Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/config.c | 1 - src/networking.c | 2 +- src/server.h | 8 +++++--- valkey.conf | 16 +++------------- 4 files changed, 9 insertions(+), 18 deletions(-) diff --git a/src/config.c b/src/config.c index 7c818b7f2e..43e8d9e52d 100644 --- a/src/config.c +++ b/src/config.c @@ -3193,7 +3193,6 @@ standardConfig static_configs[] = { createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), - createBoolConfig("repl-flow-control-enabled", NULL, MODIFIABLE_CONFIG, server.repl_flow_control_enabled, 1, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/networking.c b/src/networking.c index 6099bc8f86..2840763a02 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3275,7 +3275,7 @@ void readToQueryBuf(client *c) { */ int shouldRepeatRead(client *c, int iteration) { // If the client is not a primary replica, is closing, or flow control is disabled, no more reads. - if (!(c->flag.primary) || c->flag.close_asap || !server.repl_flow_control_enabled) { + if (!(c->flag.primary) || c->flag.close_asap || server.repl_max_reads_per_io_event == 1) { return 0; } diff --git a/src/server.h b/src/server.h index dc5a2c2711..23f7a8612d 100644 --- a/src/server.h +++ b/src/server.h @@ -52,6 +52,7 @@ #include #include #include +#include #ifdef HAVE_LIBSYSTEMD #include @@ -135,7 +136,8 @@ struct hdr_histogram; #define CONFIG_BGSAVE_RETRY_DELAY 5 /* Wait a few secs before trying again. */ #define CONFIG_DEFAULT_PID_FILE "/var/run/valkey.pid" #define CONFIG_DEFAULT_BINDADDR_COUNT 2 -#define CONFIG_DEFAULT_BINDADDR {"*", "-::*"} +#define CONFIG_DEFAULT_BINDADDR \ + { "*", "-::*" } #define NET_HOST_STR_LEN 256 /* Longest valid hostname */ #define NET_IP_STR_LEN 46 /* INET6_ADDRSTRLEN is 46, but we need to be sure */ #define NET_ADDR_STR_LEN (NET_IP_STR_LEN + 32) /* Must be enough for ip:port */ @@ -1444,7 +1446,8 @@ typedef struct rdbSaveInfo { long long repl_offset; /* Replication offset. */ } rdbSaveInfo; -#define RDB_SAVE_INFO_INIT {-1, 0, "0000000000000000000000000000000000000000", -1} +#define RDB_SAVE_INFO_INIT \ + { -1, 0, "0000000000000000000000000000000000000000", -1 } struct malloc_stats { size_t zmalloc_used; @@ -2148,7 +2151,6 @@ struct valkeyServer { char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */ /* Replication flow control */ - int repl_flow_control_enabled; /* Enables adaptive flow control for replication reads on the replica */ int repl_max_reads_per_io_event; /* Maximum allowed reads from the primary file descriptor per I/O event */ }; diff --git a/valkey.conf b/valkey.conf index 6b2d678ee0..c80d771e5e 100644 --- a/valkey.conf +++ b/valkey.conf @@ -2538,19 +2538,9 @@ jemalloc-bg-thread yes ################################## REPLICATION FLOW CONTROL ################################## -# Prioritizes replication traffic to reduce primary buffer overflows, -# reducing lag and the risk of full syncs. Allows the replica to -# consume data faster under high load. -# -# If enabled, the replica invokes multiple reads per I/O event when it -# detects replication pressure. -# -# Default: yes -# repl-flow-control-enabled yes - -# Specifies the maximum number of replication reads allowed per I/O event. -# Higher values allow more replication data to be processed per event, reducing replication lag, -# but can throttle normal clients and increase their latency. +# Controls how aggressively the replica prioritizes replication reads over normal clients. +# Higher values allow more replication data to be processed per I/O event, reducing lag and risk of full syncs. +# A value of 1 effectively disables the feature. # # Default: 25 # repl-max-reads-per-io-event 25 \ No newline at end of file From a69dd7293adf5efec624f40074c9631eac2ce313 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 17 Apr 2025 23:10:11 +0300 Subject: [PATCH 09/19] fix formatting Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/server.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/server.h b/src/server.h index 23f7a8612d..dd47fd390c 100644 --- a/src/server.h +++ b/src/server.h @@ -136,8 +136,7 @@ struct hdr_histogram; #define CONFIG_BGSAVE_RETRY_DELAY 5 /* Wait a few secs before trying again. */ #define CONFIG_DEFAULT_PID_FILE "/var/run/valkey.pid" #define CONFIG_DEFAULT_BINDADDR_COUNT 2 -#define CONFIG_DEFAULT_BINDADDR \ - { "*", "-::*" } +#define CONFIG_DEFAULT_BINDADDR {"*", "-::*"} #define NET_HOST_STR_LEN 256 /* Longest valid hostname */ #define NET_IP_STR_LEN 46 /* INET6_ADDRSTRLEN is 46, but we need to be sure */ #define NET_ADDR_STR_LEN (NET_IP_STR_LEN + 32) /* Must be enough for ip:port */ @@ -1446,8 +1445,7 @@ typedef struct rdbSaveInfo { long long repl_offset; /* Replication offset. */ } rdbSaveInfo; -#define RDB_SAVE_INFO_INIT \ - { -1, 0, "0000000000000000000000000000000000000000", -1 } +#define RDB_SAVE_INFO_INIT {-1, 0, "0000000000000000000000000000000000000000", -1} struct malloc_stats { size_t zmalloc_used; From 13d38d0ef3f8b4547336fadceca9949b4720418f Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 15 May 2025 14:26:17 +0300 Subject: [PATCH 10/19] Remove config Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/config.c | 1 - src/networking.c | 5 +++-- src/server.h | 3 --- valkey.conf | 9 --------- 4 files changed, 3 insertions(+), 15 deletions(-) diff --git a/src/config.c b/src/config.c index 43e8d9e52d..312d47b948 100644 --- a/src/config.c +++ b/src/config.c @@ -3295,7 +3295,6 @@ standardConfig static_configs[] = { createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort), createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL), - createIntConfig("repl-max-reads-per-io-event", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_max_reads_per_io_event, 25, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), diff --git a/src/networking.c b/src/networking.c index 2840763a02..26b7e9bc95 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3273,13 +3273,14 @@ void readToQueryBuf(client *c) { * * @return 1 if another read should be attempted, 0 otherwise. */ +#define REPL_MAX_READS_PER_IO_EVENT 25 int shouldRepeatRead(client *c, int iteration) { // If the client is not a primary replica, is closing, or flow control is disabled, no more reads. - if (!(c->flag.primary) || c->flag.close_asap || server.repl_max_reads_per_io_event == 1) { + if (!(c->flag.primary) || c->flag.close_asap) { return 0; } - if (iteration < server.repl_max_reads_per_io_event && + if (iteration < REPL_MAX_READS_PER_IO_EVENT && c->is_qb_full_read) { return 1; } diff --git a/src/server.h b/src/server.h index dd47fd390c..21d6fb15a2 100644 --- a/src/server.h +++ b/src/server.h @@ -2147,9 +2147,6 @@ struct valkeyServer { /* Local environment */ char *locale_collate; char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */ - - /* Replication flow control */ - int repl_max_reads_per_io_event; /* Maximum allowed reads from the primary file descriptor per I/O event */ }; #define MAX_KEYS_BUFFER 256 diff --git a/valkey.conf b/valkey.conf index c80d771e5e..df12b72d24 100644 --- a/valkey.conf +++ b/valkey.conf @@ -2535,12 +2535,3 @@ jemalloc-bg-thread yes # the empty string. # # availability-zone "zone-name" - -################################## REPLICATION FLOW CONTROL ################################## - -# Controls how aggressively the replica prioritizes replication reads over normal clients. -# Higher values allow more replication data to be processed per I/O event, reducing lag and risk of full syncs. -# A value of 1 effectively disables the feature. -# -# Default: 25 -# repl-max-reads-per-io-event 25 \ No newline at end of file From e282f462e1f9af1f70f648b8a6d030a9189f4f95 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 15 May 2025 14:27:13 +0300 Subject: [PATCH 11/19] Remove nohup Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/nohup.out | 1 - 1 file changed, 1 deletion(-) delete mode 100644 src/nohup.out diff --git a/src/nohup.out b/src/nohup.out deleted file mode 100644 index d185b4be95..0000000000 --- a/src/nohup.out +++ /dev/null @@ -1 +0,0 @@ -stress: info: [1201400] dispatching hogs: 1 cpu, 0 io, 0 vm, 0 hdd From 10642abd15bd16569533a60421f4aae9f4840ea5 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 15 May 2025 14:44:58 +0300 Subject: [PATCH 12/19] Update src/networking.c Co-authored-by: Madelyn Olson Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 8e0f58d04a..dbb4d6f97f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3346,7 +3346,7 @@ void readToQueryBuf(client *c) { if (c->nread <= 0) { return; } - c->is_qb_full_read = (size_t)c->nread == readlen ? 1 : 0; + c->is_qb_full_read = (size_t)c->nread == readlen; sdsIncrLen(c->querybuf, c->nread); qblen = sdslen(c->querybuf); From a9b66b9a2a5ccdb379d60b60ad5de1b2de7de779 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 15 May 2025 14:45:17 +0300 Subject: [PATCH 13/19] Update src/networking.c Co-authored-by: Madelyn Olson Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index dbb4d6f97f..72d9728288 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3372,7 +3372,7 @@ void readToQueryBuf(client *c) { * @return 1 if another read should be attempted, 0 otherwise. */ #define REPL_MAX_READS_PER_IO_EVENT 25 -int shouldRepeatRead(client *c, int iteration) { +static bool shouldRepeatReadFromPrimary(client *c, int iteration) { // If the client is not a primary replica, is closing, or flow control is disabled, no more reads. if (!(c->flag.primary) || c->flag.close_asap) { return 0; From d69d383da11a917df91b0e507be8af6993b73294 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 15 May 2025 14:53:50 +0300 Subject: [PATCH 14/19] change comment Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/networking.c b/src/networking.c index 72d9728288..f73df419da 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3347,7 +3347,6 @@ void readToQueryBuf(client *c) { return; } c->is_qb_full_read = (size_t)c->nread == readlen; - sdsIncrLen(c->querybuf, c->nread); qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; @@ -3364,14 +3363,8 @@ void readToQueryBuf(client *c) { } } -/** - * This function is designed to prioritize replication flow. - * Determines whether the replica should continue reading from the primary. - * It dynamically adjusts the read rate based on buffer utilization - * - * @return 1 if another read should be attempted, 0 otherwise. - */ #define REPL_MAX_READS_PER_IO_EVENT 25 +/** Keeps replica reading from the primary if recvq has data. */ static bool shouldRepeatReadFromPrimary(client *c, int iteration) { // If the client is not a primary replica, is closing, or flow control is disabled, no more reads. if (!(c->flag.primary) || c->flag.close_asap) { From 499de5308c1d36ffe9afdd81e69a78ffda86f147 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 15 May 2025 15:00:47 +0300 Subject: [PATCH 15/19] fix bug Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index f73df419da..d5aac8735b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3396,7 +3396,7 @@ void readQueryFromClient(connection *conn) { if (processInputBuffer(c) == C_ERR) return; } iter++; - shouldRepeat = shouldRepeatRead(c, iter); + shouldRepeat = shouldRepeatReadFromPrimary(c, iter); beforeNextClient(c); } while (shouldRepeat); } From 68d51b37015af8c42feffe6900e3dc036c90e805 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 15 May 2025 16:01:55 +0300 Subject: [PATCH 16/19] format Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/networking.c b/src/networking.c index d5aac8735b..0853580452 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3387,7 +3387,7 @@ void readQueryFromClient(connection *conn) { if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return; - bool shouldRepeat = false; + bool repeat = false; int iter = 0; do { readToQueryBuf(c); @@ -3396,9 +3396,9 @@ void readQueryFromClient(connection *conn) { if (processInputBuffer(c) == C_ERR) return; } iter++; - shouldRepeat = shouldRepeatReadFromPrimary(c, iter); + repeat = shouldRepeatReadFromPrimary(c, iter); beforeNextClient(c); - } while (shouldRepeat); + } while (repeat); } /* An "Address String" is a colon separated ip:port pair. From df22a2be03408f035e7003acfd239d56b511040b Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Thu, 15 May 2025 17:33:00 +0300 Subject: [PATCH 17/19] simplify Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 40 +++++++++++++++------------------------- src/server.h | 22 ++++++++++------------ 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/src/networking.c b/src/networking.c index 0853580452..70e106d805 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3281,13 +3281,17 @@ int processInputBuffer(client *c) { /* This function can be called from the main-thread or from the IO-thread. * The function allocates query-buf for the client if required and reads to it from the network. - * It will set c->nread to the bytes read from the network. */ -void readToQueryBuf(client *c) { + * It will set c->nread to the bytes read from the network. + * Returns non-zero if the buffer was filled (more data may be available). + */ + +static bool readToQueryBuf(client *c) { int big_arg = 0; size_t qblen, readlen; + int ret = 0; /* If the replica RDB client is marked as closed ASAP, do not try to read from it */ - if (c->flag.close_asap) return; + if (c->flag.close_asap) return ret; int is_primary = c->read_flags & READ_FLAGS_PRIMARY; @@ -3344,9 +3348,9 @@ void readToQueryBuf(client *c) { c->nread = connRead(c->conn, c->querybuf + qblen, readlen); if (c->nread <= 0) { - return; + return ret; } - c->is_qb_full_read = (size_t)c->nread == readlen; + ret = (size_t)c->nread == readlen; sdsIncrLen(c->querybuf, c->nread); qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; @@ -3361,25 +3365,10 @@ void readToQueryBuf(client *c) { c->read_flags |= READ_FLAGS_QB_LIMIT_REACHED; } } + return ret; } #define REPL_MAX_READS_PER_IO_EVENT 25 -/** Keeps replica reading from the primary if recvq has data. */ -static bool shouldRepeatReadFromPrimary(client *c, int iteration) { - // If the client is not a primary replica, is closing, or flow control is disabled, no more reads. - if (!(c->flag.primary) || c->flag.close_asap) { - return 0; - } - - if (iteration < REPL_MAX_READS_PER_IO_EVENT && - c->is_qb_full_read) { - return 1; - } - - return 0; -} - - void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); /* Check if we can send the client to be handled by the IO-thread */ @@ -3390,13 +3379,14 @@ void readQueryFromClient(connection *conn) { bool repeat = false; int iter = 0; do { - readToQueryBuf(c); - + bool full_read = readToQueryBuf(c); if (handleReadResult(c) == C_OK) { if (processInputBuffer(c) == C_ERR) return; } - iter++; - repeat = shouldRepeatReadFromPrimary(c, iter); + repeat = (c->flag.primary && + !c->flag.close_asap && + ++iter < REPL_MAX_READS_PER_IO_EVENT && + full_read); beforeNextClient(c); } while (repeat); } diff --git a/src/server.h b/src/server.h index 319a145292..d6c52de8db 100644 --- a/src/server.h +++ b/src/server.h @@ -53,7 +53,6 @@ #include #include #include -#include #ifdef HAVE_LIBSYSTEMD #include @@ -1188,17 +1187,16 @@ typedef struct client { uint64_t id; /* Client incremental unique ID. */ connection *conn; /* Input buffer and command parsing fields */ - sds querybuf; /* Buffer we use to accumulate client queries. */ - size_t qb_pos; /* The position we have read in querybuf. */ - bool is_qb_full_read; /* True if the last read returned the maximum allowed bytes */ - robj **argv; /* Arguments of current command. */ - int argc; /* Num of arguments of current command. */ - int argv_len; /* Size of argv array (may be more than argc) */ - size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ - int reqtype; /* Request protocol type: PROTO_REQ_* */ - int multibulklen; /* Number of multi bulk arguments left to read. */ - long bulklen; /* Length of bulk argument in multi bulk request. */ - long long woff; /* Last write global replication offset. */ + sds querybuf; /* Buffer we use to accumulate client queries. */ + size_t qb_pos; /* The position we have read in querybuf. */ + robj **argv; /* Arguments of current command. */ + int argc; /* Num of arguments of current command. */ + int argv_len; /* Size of argv array (may be more than argc) */ + size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + int reqtype; /* Request protocol type: PROTO_REQ_* */ + int multibulklen; /* Number of multi bulk arguments left to read. */ + long bulklen; /* Length of bulk argument in multi bulk request. */ + long long woff; /* Last write global replication offset. */ /* Command execution state and command information */ struct serverCommand *cmd; /* Current command. */ struct serverCommand *lastcmd; /* Last command executed. */ From f930b2ddb91070515d63994bbd917ac70cb56835 Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Fri, 16 May 2025 01:46:56 +0300 Subject: [PATCH 18/19] Update src/networking.c MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Viktor Söderqvist Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/networking.c b/src/networking.c index 70e106d805..e4594cb34a 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3282,9 +3282,7 @@ int processInputBuffer(client *c) { /* This function can be called from the main-thread or from the IO-thread. * The function allocates query-buf for the client if required and reads to it from the network. * It will set c->nread to the bytes read from the network. - * Returns non-zero if the buffer was filled (more data may be available). - */ - + * Returns true if the buffer was filled (more data may be available). */ static bool readToQueryBuf(client *c) { int big_arg = 0; size_t qblen, readlen; From c514dca5d418232277660f1b10d77fd9a769f5ee Mon Sep 17 00:00:00 2001 From: xbasel <103044017+xbasel@users.noreply.github.com> Date: Fri, 16 May 2025 01:52:51 +0300 Subject: [PATCH 19/19] review Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> --- src/networking.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index e4594cb34a..cb16234add 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3286,10 +3286,9 @@ int processInputBuffer(client *c) { static bool readToQueryBuf(client *c) { int big_arg = 0; size_t qblen, readlen; - int ret = 0; /* If the replica RDB client is marked as closed ASAP, do not try to read from it */ - if (c->flag.close_asap) return ret; + if (c->flag.close_asap) return false; int is_primary = c->read_flags & READ_FLAGS_PRIMARY; @@ -3346,9 +3345,9 @@ static bool readToQueryBuf(client *c) { c->nread = connRead(c->conn, c->querybuf + qblen, readlen); if (c->nread <= 0) { - return ret; + return false; } - ret = (size_t)c->nread == readlen; + sdsIncrLen(c->querybuf, c->nread); qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; @@ -3363,7 +3362,7 @@ static bool readToQueryBuf(client *c) { c->read_flags |= READ_FLAGS_QB_LIMIT_REACHED; } } - return ret; + return (size_t)c->nread == readlen; } #define REPL_MAX_READS_PER_IO_EVENT 25