Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,44 @@ jobs:
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: ./runtest-cluster --io-threads ${{github.event.inputs.cluster_test_args}}

test-ubuntu-tls-io-threads:
runs-on: ubuntu-latest
if: |
(github.event_name == 'workflow_dispatch' ||
(github.event_name == 'schedule' && github.repository == 'valkey-io/valkey') ||
(github.event_name == 'pull_request' && (contains(github.event.pull_request.labels.*.name, 'run-extra-tests') || github.event.pull_request.base.ref != 'unstable'))) &&
!contains(github.event.inputs.skipjobs, 'tls') && !contains(github.event.inputs.skipjobs, 'iothreads')
timeout-minutes: 14400
steps:
- name: prep
if: github.event_name == 'workflow_dispatch'
run: |
echo "GITHUB_REPOSITORY=${{github.event.inputs.use_repo}}" >> $GITHUB_ENV
echo "GITHUB_HEAD_REF=${{github.event.inputs.use_git_ref}}" >> $GITHUB_ENV
echo "skipjobs: ${{github.event.inputs.skipjobs}}"
echo "skiptests: ${{github.event.inputs.skiptests}}"
echo "test_args: ${{github.event.inputs.test_args}}"
echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}"
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
with:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: |
make BUILD_TLS=yes SERVER_CFLAGS='-Werror'
- name: testprep
run: |
sudo apt-get install tcl8.6 tclx tcl-tls
./utils/gen-test-certs.sh
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
run: |
./runtest --io-threads --tls --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
- name: cluster tests
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: |
./runtest-cluster --io-threads --tls ${{github.event.inputs.cluster_test_args}}

test-ubuntu-reclaim-cache:
runs-on: ubuntu-latest
if: |
Expand Down
5 changes: 3 additions & 2 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ typedef enum {
CONN_STATE_ERROR
} ConnectionState;

#define CONN_FLAG_CLOSE_SCHEDULED (1 << 0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1 << 1) /* Write barrier requested */
#define CONN_FLAG_CLOSE_SCHEDULED (1 << 0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1 << 1) /* Write barrier requested */
#define CONN_FLAG_ALLOW_ACCEPT_OFFLOAD (1 << 2) /* Connection accept can be offloaded to IO threads. */

#define CONN_TYPE_SOCKET "tcp"
#define CONN_TYPE_UNIX "unix"
Expand Down
52 changes: 52 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,55 @@ void trySendPollJobToIOThreads(void) {
aeSetPollProtect(server.el, 1);
IOJobQueue_push(jq, IOThreadPoll, server.el);
}

static void ioThreadAccept(void *data) {
client *c = (client *)data;
connAccept(c->conn, NULL);
c->io_read_state = CLIENT_COMPLETED_IO;
}

/*
* Attempts to offload an Accept operation (currently used for TLS accept) for a client
* connection to I/O threads.
*
* Returns:
* C_OK - If the accept operation was successfully queued for processing
* C_ERR - If the connection is not eligible for offloading
*
* Parameters:
* conn - The connection object to perform the accept operation on
*/
int trySendAcceptToIOThreads(connection *conn) {
if (server.io_threads_num <= 1) {
return C_ERR;
}

if (!(conn->flags & CONN_FLAG_ALLOW_ACCEPT_OFFLOAD)) {
return C_ERR;
}

client *c = connGetPrivateData(conn);
if (c->io_read_state != CLIENT_IDLE) {
return C_OK;
}

if (server.active_io_threads_num <= 1) {
return C_ERR;
}

size_t thread_id = (c->id % (server.active_io_threads_num - 1)) + 1;
IOJobQueue *job_queue = &io_jobs[thread_id];

if (IOJobQueue_isFull(job_queue)) {
return C_ERR;
}

c->io_read_state = CLIENT_PENDING_IO;
c->flag.pending_read = 1;
listLinkNodeTail(server.clients_pending_io_read, &c->pending_read_list_node);
connSetPostponeUpdateState(c->conn, 1);
server.stat_io_accept_offloaded++;
IOJobQueue_push(job_queue, ioThreadAccept, c);

return C_OK;
}
1 change: 1 addition & 0 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ int tryOffloadFreeArgvToIOThreads(client *c);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);
int trySendAcceptToIOThreads(connection *conn);

#endif /* IO_THREADS_H */
6 changes: 6 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ client *createClient(connection *conn) {
if (server.tcpkeepalive) connKeepAlive(conn, server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
conn->flags |= CONN_FLAG_ALLOW_ACCEPT_OFFLOAD;
}
c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
selectDb(c, 0);
Expand Down Expand Up @@ -4722,9 +4723,14 @@ int processIOThreadsReadDone(void) {
processed++;
server.stat_io_reads_processed++;

/* Save the current conn state, as connUpdateState may modify it */
int in_accept_state = (connGetState(c->conn) == CONN_STATE_ACCEPTING);
connSetPostponeUpdateState(c->conn, 0);
connUpdateState(c->conn);

/* In accept state, no client's data was read - stop here. */
if (in_accept_state) continue;

/* On read error - stop here. */
if (handleReadResult(c) == C_ERR) {
continue;
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2604,6 +2604,7 @@ void resetServerStats(void) {
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_io_freed_objects = 0;
server.stat_io_accept_offloaded = 0;
server.stat_poll_processed_by_io_threads = 0;
server.stat_total_writes_processed = 0;
server.stat_client_qbuf_limit_disconnections = 0;
Expand Down Expand Up @@ -5862,6 +5863,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
"io_threaded_accept_processed:%lld\r\n", server.stat_io_accept_offloaded,
"io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads,
"io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches,
"io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries,
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,7 @@ struct valkeyServer {
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_io_freed_objects; /* Number of objects freed by IO threads */
long long stat_io_accept_offloaded; /* Number of offloaded accepts */
long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */
long long stat_total_reads_processed; /* Total number of read events processed */
long long stat_total_writes_processed; /* Total number of write events processed */
Expand Down
Loading
Loading