Read commands offloading to IO threads #2208
Conversation
@uriyage Can you prioritize doing the merge so that we can properly run the tests?
Pull Request Overview
This PR enables offloading of read commands to I/O worker threads and refactors client-context handling to use thread-local accessors, while also introducing an event prefetch mechanism and enhancing slot-based client blocking.
- Added `CAN_BE_OFFLOADED` flags to JSON command definitions and command table entries for read/fast commands
- Replaced direct uses of `server.current_client`/`server.executing_client` with `getCurrentClient`/`setCurrentClient` wrappers
- Extended event loop with configurable `epoll_batch_size` and an AE_PREFETCH callback, and added slot-based client blocking logic
Reviewed Changes
Copilot reviewed 65 out of 65 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/commands/*.json | Added "CAN_BE_OFFLOADED" to command_flags for read/fast commands |
| src/commands.def | Added CMD_CAN_BE_OFFLOADED to the server command table entries for matching commands |
| src/cluster_slot_stats.c | Replaced server.current_client with getCurrentClient() |
| src/cluster.c | Replaced server.current_client with getCurrentClient() and added slot-check logic |
| src/blocked.c | Initialized new slot-pending fields and updated unblock logic for BLOCKED_SLOT |
| src/aof.c | Swapped direct server.current_client/executing_client usage for accessor calls |
| src/ae_epoll.c | Introduced epoll_batch_size and folded it into epoll_wait |
| src/ae.h | Added AE_PREFETCH, aePrefetchProc, and epoll_batch_size definitions |
| src/ae.c | Initialized epoll_batch_size, removed stale read events on prefetch registration, hooked prefetch callback |
| src/acl.c | Replaced client checks and references with getCurrentClient()/isCurrentClient() |
Comments suppressed due to low confidence (3)
src/commands/hvals.json:11
- Commands marked with `CAN_BE_OFFLOADED` should have corresponding unit or integration tests verifying correct offload behavior.
"CAN_BE_OFFLOADED"
src/cluster.c:809
- Update the comment to reference `getCurrentClient()` instead of `current_client` for accuracy.
* current_client here to get the real client if available. And if it is not
src/commands/get.json:12
- [nitpick] Indentation here mixes tabs and spaces for the new flag; align with the existing two-space indentation for consistency.
"CAN_BE_OFFLOADED"
Force-pushed from d47011e to 7ca6101
hpatro left a comment
Commands that can be offloaded to I/O threads are marked with the CAN_BE_OFFLOADED flag in their json files. However, different types of commands have different exclusivity requirements:
Database-exclusive commands require complete isolation and cannot run in parallel with any other offloaded command.
Can't this be done implicitly instead of defining it via a flag explicitly? With that we would be aware that all read commands will be offloaded and a dev doesn't need to check if it's enabled. In the future, we also don't risk a new read command not being offloaded.
madolson left a comment
Nice work!
I'm primarily concerned about pipelining, multi-exec, and other operational commands that might give inconsistent performance. Commands queueing up periodically because we require the exclusive lock can cause P99 latency spikes. It would be nice to see other realistic workloads for testing performance. It would also be nice to understand how large the cluster can scale while still getting these improvements: can a 500-primary cluster (~32 slots per primary) still achieve high enough concurrency?
```c
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createBoolConfig("io-threads-do-commands-offloading", NULL, MODIFIABLE_CONFIG, server.io_threads_do_commands_offloading, 1, NULL, NULL), /* Command offloading enabled by default */
createBoolConfig("io-threads-do-commands-offloading-with-modules", NULL, MODIFIABLE_CONFIG, server.io_threads_do_commands_offloading_with_modules, 0, NULL, NULL), /* Module command offloading disabled by default */
```
This shouldn't be server configuration. Either the module should be the one deciding if it can offload work "per command" or it should be a module wide configuration.
Currently, module commands are not offloaded in any case. However, we also need to ensure that modules only access the keys declared in their commands. If this is not guaranteed, we cannot offload commands for other slots. Once a single module fails to guarantee this behavior, we cannot offload any commands.
I don't really disagree with anything that you are saying, but I don't see how that conflicts with my comment that this shouldn't be a config. Administrators shouldn't have to worry about whether a module does work outside the context. The module should be declaring that it's safe to offload commands. If any module doesn't declare that, commands shouldn't be able to get offloaded.
```c
/* If no mandatory keys are specified, we can't determine which slot will be accessed */
if (cmd->flags & CMD_NO_MANDATORY_KEYS) return 1;
/* Any Admin level command needs full exclusivity as it impacts system-wide behaviour */
if (cmd->flags & CMD_ADMIN) return 1;
```
This isn't really a great assumption. Some stuff like slowlog and acl log shouldn't really impact the wide system, we shouldn't need an exclusive lock to execute those. Many systems might have automation hitting this (like ours) which would cause random dips in performance.
We expect admin commands to be infrequent, meaning not in the thousands per second. As long as this is the case, it won't affect performance, as we have observed.

Exclusivity for admin commands simplifies the code and makes it more secure, as we don't need to worry about scenarios like server configurations being changed while a thread is executing a command, or a client's ACL permissions being modified during execution.
> this is the case, it won't affect performance, as we have observed.

You haven't given any data on P99 performance, just throughput, so I don't think you have observed this.

> Exclusivity for admin commands simplifies the code and makes it more secure, as we don't need to worry about scenarios like server configurations being changed while a thread is executing a command, or client's ACL permissions being modified during execution.

I agree about changing ACL permissions, I was mentioning ACL log.
```c
    IoToMTQueueProduce((uint64_t)c | (uint64_t)r, 0);
}

static void processClientIOCommandDone(client *c) {
```
Can we deduplicate this code with the code in server.c? I don't like having two places we need to update. Ideally let's have a function in server.c that covers the overlap, and we can call that function from here.
This will require some refactoring of the call function. Do you think we should include it in this PR, or should we create a separate follow-up task?
Yes, include it in this PR ideally.
(Oops, didn't mean to approve, but I can't dismiss it while there are merge conflicts, apparently?)
Signed-off-by: Uri Yagelnik <[email protected]>

Conflicts: src/blocked.c, src/cluster.c, src/cluster_slot_stats.c, src/commands.def, src/io_threads.c, src/io_threads.h, src/memory_prefetch.c, src/networking.c, src/server.c, src/server.h
Force-pushed from 7ca6101 to aa83602
Codecov Report — Attention: Patch coverage is

```
@@ Coverage Diff @@
##          unstable    #2208      +/-   ##
============================================
- Coverage    71.54%   70.89%    -0.65%
============================================
  Files          122      123        +1
  Lines        66491    67404      +913
============================================
+ Hits         47570    47787      +217
- Misses       18921    19617      +696
```
Signed-off-by: Uri Yagelnik <[email protected]>
@hpatro
```yaml
      - name: unit tests
        run: ./src/valkey-unit-tests

  test-ubuntu-io-threads-sanitizer:
```
This should probably be in daily and not in the CI.
Might become difficult to maintain in the future. I would like to keep the life of a developer implementing a new read command as easy as it is currently. Could we possibly introduce broad guard rails to avoid such mistakes? Thinking out loud here. Could you share any command which is currently not isolated and which fields they access? For multi-commands, maybe we mark them with
Signed-off-by: Uri Yagelnik <[email protected]>
Force-pushed from d728920 to 5b258fa
@hpatro, Thanks. I have reviewed all the missing read-only commands, and it can indeed be done with a few minor changes. I removed the flag, and now we offload all read-only commands with the following restrictions:

```c
#define CMD_CAN_BE_OFFLOADED(cmd) \
    ((cmd->flags & CMD_READONLY) && \
     !(cmd->flags & CMD_NO_MANDATORY_KEYS) && \
     !(cmd->flags & CMD_MAY_REPLICATE) && \
     !(cmd->flags & CMD_BLOCKING))
```

In addition, we won't offload commands if
It would still be good to understand how we can offload the SCAN command, or at the very least have it not take a server-level lock.
```c
}

/* Check if modules are loaded and module offloading is disabled */
if (moduleCount() > 0 && !server.io_threads_do_commands_offloading_with_modules) {
```
I'd imagine that a property access would be cheaper than a function call...
```diff
-if (moduleCount() > 0 && !server.io_threads_do_commands_offloading_with_modules) {
+if (!server.io_threads_do_commands_offloading_with_modules && moduleCount() > 0) {
```
```c
rehashing_completion_ctx ctx = {.rehashing_node = metadata->rehashing_node, .kvs = metadata->kvs, .from = from};
metadata->rehashing_node = NULL;

/* If not in main-thread postpone the update of kvs rehashing info to be done later by the main-thread -*/
```
```diff
-/* If not in main-thread postpone the update of kvs rehashing info to be done later by the main-thread -*/
+/* If not in main-thread, postpone the update of kvs rehashing info to be done later by the main-thread */
```
```c
    return;
}

/* Postpone error updates if its io-thread */
```
```diff
-/* Postpone error updates if its io-thread */
+/* Postpone error updates if it's io-thread */
```
```c
void *async_rm_call_handle; /* ValkeyModuleAsyncRMCallPromise structure.
                               which is opaque for the Redis core, only
                               handled in module.c. */
```
I think style is:
```diff
-void *async_rm_call_handle; /* ValkeyModuleAsyncRMCallPromise structure.
-                               which is opaque for the Redis core, only
-                               handled in module.c. */
+void *async_rm_call_handle; /* ValkeyModuleAsyncRMCallPromise structure.
+                             * which is opaque for the Redis core, only
+                             * handled in module.c. */
```
```c
volatile uint8_t io_command_state; /* Indicate the IO command state of the client */
ustime_t duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the* execution of this client's current command. */
```
```diff
-unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the* execution of this client's current command. */
+unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the execution of this client's current command. */
```
```c
long long
    stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to primary, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_io_freed_objects; /* Number of objects freed by IO threads */
long long stat_io_accept_offloaded; /* Number of offloaded accepts */
long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */
long long stat_total_reads_processed; /* Total number of read events processed */
long long stat_total_writes_processed; /* Total number of write events processed */
long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */
long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */
long long stat_total_prefetch_batches; /* Total number of prefetched batches */
```
Does this item now fit?
```diff
-long long
-    stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to primary, etc.) error replies */
+long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to primary, etc.) error replies */
 long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
```
Guys, it is SO big and SO game changing!

Just checking, how is it going for the 10th release? )
Issue: #2022
Joint work with @touitou-dan, @akashkgit, @nadav-levanoni
Overview
Valkey 8.0 introduced worker threads for I/O operations, but a critical bottleneck remains: all command execution still occurs sequentially in the main thread. This architectural limitation becomes increasingly problematic as systems scale to more cores and memory, and it provides minimal performance gains for CPU-intensive workloads where processing, not I/O, is the primary constraint.
This PR extends worker thread capabilities to execute read commands in parallel, removing the main-thread bottleneck while maintaining data consistency. It uses the same centralized coordination model as Valkey 8.0, with the main thread acting as a scheduler, eliminating the need for complex synchronization mechanisms such as locks or atomic operations.
Key Benefits:
Main Architecture Changes
Extended Job Types
Worker threads now handle both I/O operations and read command execution, expanding beyond the I/O-only limitation of version 8.0.
Unified Response Handling
All worker responses flow through a single multi-producer, single-consumer queue, eliminating the current need to constantly scan client lists.
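The queue can be pictured roughly as follows. This is a minimal mutex-based sketch with hypothetical names (`mpsc_queue`, `mpsc_push`, `mpsc_pop`), not the PR's implementation, which may well use a lock-free scheme; it only shows the shape: any worker may push, only the main thread pops.

```c
/* Minimal sketch of a multi-producer, single-consumer response queue.
 * Names are illustrative, not the identifiers used in the PR. */
#include <pthread.h>
#include <stdlib.h>

typedef struct mpsc_node {
    void *payload; /* e.g. a client whose command just completed */
    struct mpsc_node *next;
} mpsc_node;

typedef struct {
    mpsc_node *head, *tail;
    pthread_mutex_t lock; /* producers contend here; the consumer is single */
} mpsc_queue;

void mpsc_init(mpsc_queue *q) {
    q->head = q->tail = NULL;
    pthread_mutex_init(&q->lock, NULL);
}

/* Called by any worker thread. */
void mpsc_push(mpsc_queue *q, void *payload) {
    mpsc_node *n = malloc(sizeof(*n));
    n->payload = payload;
    n->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail) q->tail->next = n; else q->head = n;
    q->tail = n;
    pthread_mutex_unlock(&q->lock);
}

/* Called only by the main thread; returns NULL when the queue is empty. */
void *mpsc_pop(mpsc_queue *q) {
    pthread_mutex_lock(&q->lock);
    mpsc_node *n = q->head;
    if (n) {
        q->head = n->next;
        if (!q->head) q->tail = NULL;
    }
    pthread_mutex_unlock(&q->lock);
    if (!n) return NULL;
    void *p = n->payload;
    free(n);
    return p;
}
```

The main thread drains this queue in its event loop instead of scanning every client for completed work.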
Continuation Tasks
When workers encounter operations requiring global data access (like expired key deletion), they create continuation tasks for the main thread to execute, ensuring data consistency.
Slot Access Control Mechanism
To prevent race conditions and ensure correct command execution order, the main thread uses deferred queues that manage task scheduling and synchronization.
Structure and Components
Each deferred queue contains:
The system maintains:
How It Works
Example 1: Exclusive Command Handling
When an EVAL command arrives:
Example 2: Slot-level Synchronization
Consider a GET and SET command targeting the same slot:
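The slot-level gating behind this example can be sketched as follows. All names (`slot_state`, `slot_try_acquire_read`, etc.) are invented for illustration; since the main thread is the only scheduler, plain fields suffice and no atomics are needed in this sketch.

```c
/* Hypothetical per-slot gating: reads on a slot may run concurrently,
 * an exclusive command on the same slot must wait for them to drain,
 * and while it is pending no new reads on that slot are admitted. */
#define NUM_SLOTS 16384

typedef struct {
    int readers_in_flight;  /* offloaded reads currently running on this slot */
    int exclusive_pending;  /* an exclusive command is queued for this slot */
} slot_state;

static slot_state slots[NUM_SLOTS];

/* Main thread asks: may this read be offloaded right now? */
int slot_try_acquire_read(int slot) {
    if (slots[slot].exclusive_pending) return 0; /* defer: exclusive queued */
    slots[slot].readers_in_flight++;
    return 1;
}

void slot_release_read(int slot) {
    slots[slot].readers_in_flight--;
}

/* Main thread asks: may this exclusive command run on the slot now? */
int slot_try_acquire_exclusive(int slot) {
    slots[slot].exclusive_pending = 1;         /* block new readers */
    return slots[slot].readers_in_flight == 0; /* run only when drained */
}

void slot_release_exclusive(int slot) {
    slots[slot].exclusive_pending = 0;
}
```

A SET arriving while a GET on the same slot is in flight would see `slot_try_acquire_exclusive` fail, be parked on that slot's deferred queue, and be retried once the GET's completion comes back through the response queue.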
Worker Thread Deferred Jobs
Worker threads sometimes need to execute operations that access global data structures. Instead of doing this immediately, they:
Current deferred job types include:
Special Case: ServerCron
The ServerCron function requires exclusive database access. When other commands are running in parallel, ServerCron gets enqueued as a deferred job on the exclusive queue, ensuring it runs only when no other operations are active.
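The deferred-job idea described above can be reduced to a callback queue. This is a sketch under assumed names (`defer_to_main`, `drain_deferred_jobs` are illustrative, not the PR's identifiers), and the raw linked list stands in for whatever synchronized queue the real code uses.

```c
/* A worker packages work that touches global state (expired-key deletion,
 * ServerCron, ...) as a callback for the main thread to run later. */
#include <stdlib.h>

typedef struct deferred_job {
    void (*fn)(void *arg); /* runs on the main thread */
    void *arg;
    struct deferred_job *next;
} deferred_job;

static deferred_job *exclusive_queue = NULL; /* drained when no offloaded cmds run */

/* Worker side: enqueue instead of touching global data directly.
 * A real implementation would hand this over via a synchronized queue. */
void defer_to_main(void (*fn)(void *), void *arg) {
    deferred_job *j = malloc(sizeof(*j));
    j->fn = fn;
    j->arg = arg;
    j->next = exclusive_queue;
    exclusive_queue = j;
}

/* Main-thread side: run everything deferred; returns the number of jobs run. */
int drain_deferred_jobs(void) {
    int ran = 0;
    while (exclusive_queue) {
        deferred_job *j = exclusive_queue;
        exclusive_queue = j->next;
        j->fn(j->arg);
        free(j);
        ran++;
    }
    return ran;
}

/* Tiny demo callback standing in for ServerCron. */
static int cron_runs = 0;
static void fake_server_cron(void *arg) { (void)arg; cron_runs++; }
```

Draining only when no other operations are active is what gives ServerCron its exclusive database access without explicit locking.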
Commands Offloading
Commands that can be offloaded to I/O threads are those marked with the READONLY flag in their JSON files.
However, different types of commands have different exclusivity requirements:
Database-exclusive commands require complete isolation and cannot run in parallel with any other offloaded command. These include:
Slot-exclusive commands have more limited restrictions - they can run in parallel with other read commands as long as those commands target different slots. They are only blocked by commands targeting the same slot.
This tiered approach to command exclusivity allows the system to maximize parallelism while maintaining data consistency and avoiding conflicts between concurrent operations.
A new configuration, io-threads-do-commands-offloading, was added to disable command offloading.
Event Processing (epoll)
Three key improvements were made to the event polling process:
a) Client Structure Prefetching: When epoll_wait returns a batch of file descriptors, the system now prefetches the associated client structures to improve cache performance during event processing. This is implemented through a new prefetch callback function added to the aeEventLoop structure, which proactively loads client data into memory before it's accessed.
b) Epoll Round-robin offloading to Worker Threads: Similar to version 8.0, epoll_wait operations can be offloaded to worker threads. The implementation uses round-robin scheduling across available worker threads, with epoll jobs receiving higher priority than regular I/O and command processing jobs within each worker thread.
c) Batch Size Optimization: The maxevents parameter for epoll_wait was changed from using eventLoop->setsize (which could be quite large) to a fixed value of 200. This change addresses a performance regression that occurred between Linux kernel versions 5.x and 6.x, where larger batch sizes negatively impacted epoll performance.
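The batch-size change amounts to capping the maxevents argument. A sketch (the function name `poll_once` and the macro are illustrative, not the ae.c identifiers; 200 is the value stated in the PR):

```c
/* Call epoll_wait with a small fixed maxevents instead of eventLoop->setsize. */
#include <sys/epoll.h>

#define EPOLL_BATCH_SIZE 200

int poll_once(int epfd, struct epoll_event *events, int timeout_ms) {
    /* At most EPOLL_BATCH_SIZE events per kernel call; any remaining ready
     * fds are simply returned by the next call, so no events are lost. */
    return epoll_wait(epfd, events, EPOLL_BATCH_SIZE, timeout_ms);
}
```

Because epoll readiness is level-triggered by default, capping the batch only spreads a large ready set across successive calls rather than dropping anything.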
Client Management
server.current_client converted to thread variable
Converted global server.current_client and server.executing_client to thread-local variables (_current_client, _executing_client) with accessor macros. This enables thread-safe client context management for concurrent command execution on IO threads.
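A reduced sketch of that conversion, using C11 `_Thread_local`. The accessor names follow the PR description; the opaque `client` struct and the exact storage qualifiers are stand-ins for the real definitions in server.h.

```c
/* Thread-local client context: each IO thread sees its own _current_client,
 * so concurrent command execution never clobbers another thread's context. */
#include <stddef.h>

typedef struct client client; /* opaque stand-in for the real client struct */

static _Thread_local client *_current_client = NULL;
static _Thread_local client *_executing_client = NULL;

static inline client *getCurrentClient(void) { return _current_client; }
static inline void setCurrentClient(client *c) { _current_client = c; }
static inline int isCurrentClient(client *c) { return _current_client == c; }

static inline client *getExecutingClient(void) { return _executing_client; }
static inline void setExecutingClient(client *c) { _executing_client = c; }
```

Call sites that previously read `server.current_client` directly go through `getCurrentClient()` instead, which is what makes the per-thread storage transparent to the rest of the code.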
New block type BLOCKED_SLOT for clients blocking on slots
Added the BLOCKED_SLOT blocking type for clients waiting on busy slots during command offloading. Includes new blockingState fields (slot_pending_list, pending_client_node). Allows proper slot-contention handling by queuing clients until slots become available.
Async freeing of clients handled by IO threads
Modified client freeing to eliminate busy waiting for IO thread operations. Previously, freeClient() would busy-wait via waitForClientIO() when clients had ongoing IO operations, creating complexity in edge cases. Now freeing is deferred until IO operations complete, preventing blocking and simplifying the freeing logic.
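The deferred-free flow reduces to a pair of flags. This sketch uses invented field names (`io_in_flight`, `close_deferred`, `fake_client`), not the real client struct members:

```c
/* freeClient no longer busy-waits on waitForClientIO; it records the
 * request and the IO-completion path performs the actual free. */
typedef struct {
    int io_in_flight;   /* set while an IO thread still owns the client */
    int close_deferred; /* set when a free was requested during IO */
    int freed;          /* stand-in for the real deallocation */
} fake_client;

void freeClientSketch(fake_client *c) {
    if (c->io_in_flight) {
        c->close_deferred = 1; /* no busy wait: just remember the request */
        return;
    }
    c->freed = 1; /* no IO in flight: free immediately */
}

/* Main thread, when the IO thread reports completion for this client. */
void ioDoneSketch(fake_client *c) {
    c->io_in_flight = 0;
    if (c->close_deferred) c->freed = 1; /* honor the deferred request */
}
```

The main thread is the only one mutating these flags in this sketch, mirroring the scheduler-centric model where IO completion is observed via the response queue.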
Limitations
Modules not supported by default
Keyspace miss notifications not supported
Pipelined commands
Performance Evaluation
We implemented the proposed update in a prototype to evaluate the performance gain in read and read/write scenarios.
Test Environment
Dataset
Benchmark Scenarios
String Operations
Hash Operations
Sorted Set Operations
List Operations