Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ jobs:
- name: unit tests
run: ./src/valkey-unit-tests

test-ubuntu-io-threads-sanitizer:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be in daily and not in the CI.

runs-on: ubuntu-latest
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: make
# build with TLS module just for compilation coverage
run: make -j4 all-with-unit-tests SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module
- name: testprep
run: sudo apt-get install tcl8.6 tclx -y
- name: test
run: ./runtest --io-threads --verbose --tags -slow --dump-logs
- name: module api test
run: CFLAGS='-Werror' ./runtest-moduleapi --io-threads --verbose --dump-logs

test-rdma:
runs-on: ubuntu-latest
steps:
Expand Down
4 changes: 2 additions & 2 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ void ACLFreeUserAndKillClients(user *u) {
clientSetUser(c, DefaultUser, 0);
/* We will write replies to this client later, so we can't
* close it directly even if async. */
if (c == server.current_client) {
if (isCurrentClient(c)) {
c->flag.close_after_command = 1;
} else {
freeClientAsync(c);
Expand Down Expand Up @@ -2635,7 +2635,7 @@ void addACLLogEntry(client *c, int reason, int context, int argpos, sds username
}

/* if we have a real client from the network, use it (could be missing on module timers) */
client *realclient = server.current_client ? server.current_client : c;
client *realclient = getCurrentClient() ? getCurrentClient() : c;

le->cinfo = catClientInfoString(sdsempty(), realclient, 0);
le->context = context;
Expand Down
14 changes: 14 additions & 0 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->custompoll = NULL;
eventLoop->prefetch = NULL;
eventLoop->flags = 0;
eventLoop->epoll_batch_size = 0; /* Default to 0, meaning use setsize */
/* Initialize the eventloop mutex with PTHREAD_MUTEX_ERRORCHECK type */
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
Expand Down Expand Up @@ -217,6 +219,9 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
* is removed. */
if (mask & AE_WRITABLE) mask |= AE_BARRIER;

/* We want to always remove AE_PRE_READABLE_HOOK if set when AE_READABLE is removed. */
if (mask & AE_READABLE) mask |= AE_PRE_READABLE_HOOK;

/* Only remove attached events */
mask = mask & fe->mask;

Expand Down Expand Up @@ -458,6 +463,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop, numevents);

for (j = 0; j < numevents; j++) {
if (numevents > 1 && eventLoop->prefetch) eventLoop->prefetch(eventLoop, j, numevents);
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
Expand Down Expand Up @@ -562,10 +568,18 @@ void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) {
eventLoop->custompoll = custompoll;
}

void aeSetPrefetchProc(aeEventLoop *eventLoop, aePrefetchProc *prefetch) {
eventLoop->prefetch = prefetch;
}

void aeSetPollProtect(aeEventLoop *eventLoop, int protect) {
if (protect) {
eventLoop->flags |= AE_PROTECT_POLL;
} else {
eventLoop->flags &= ~AE_PROTECT_POLL;
}
}

void aeSetEpollBatchSize(aeEventLoop *eventLoop, int batchSize) {
eventLoop->epoll_batch_size = batchSize;
}
22 changes: 14 additions & 8 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@
#define AE_OK 0
#define AE_ERR -1

#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the \
READABLE event already fired in the same event \
loop iteration. Useful when you want to persist \
things to disk before sending replies, and want \
to do that in a group fashion. */
#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the \
READABLE event already fired in the same event \
loop iteration. Useful when you want to persist \
things to disk before sending replies, and want \
to do that in a group fashion. */
#define AE_PRE_READABLE_HOOK 8 /* Call pre-process-read callback for the events */

#define AE_FILE_EVENTS (1 << 0)
#define AE_TIME_EVENTS (1 << 1)
Expand All @@ -72,6 +73,7 @@ typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientDat
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);
typedef int aeCustomPollProc(struct aeEventLoop *eventLoop);
typedef void aePrefetchProc(struct aeEventLoop *eventLoop, int cur_idx, int numevents);

/* File event structure */
typedef struct aeFileEvent {
Expand Down Expand Up @@ -113,8 +115,10 @@ typedef struct aeEventLoop {
aeBeforeSleepProc *beforesleep;
aeAfterSleepProc *aftersleep;
aeCustomPollProc *custompoll;
aePrefetchProc *prefetch;
pthread_mutex_t poll_mutex;
int flags;
int epoll_batch_size; /* Maximum events to process per epoll_wait call (0 = use system default batch size) */
} aeEventLoop;

/* Prototypes */
Expand All @@ -138,10 +142,12 @@ char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll);
void aeSetPrefetchProc(aeEventLoop *eventLoop, aePrefetchProc *prefetch);
void aeSetPollProtect(aeEventLoop *eventLoop, int protect);
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
void aeSetEpollBatchSize(aeEventLoop *eventLoop, int batchSize);

#endif
3 changes: 2 additions & 1 deletion src/ae_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
int batch_size = eventLoop->epoll_batch_size > 0 ? eventLoop->epoll_batch_size : eventLoop->setsize;

retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
retval = epoll_wait(state->epfd, state->events, batch_size,
tvp ? (tvp->tv_sec * 1000 + (tvp->tv_usec + 999) / 1000) : -1);
if (retval > 0) {
int j;
Expand Down
11 changes: 6 additions & 5 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1429,10 +1429,11 @@ int loadSingleAppendOnlyFile(char *filename) {
* to the same file we're about to read. */
server.aof_state = AOF_OFF;

client *old_cur_client = server.current_client;
client *old_exec_client = server.executing_client;
client *old_cur_client = getCurrentClient();
client *old_exec_client = getExecutingClient();
fakeClient = createAOFClient();
server.current_client = server.executing_client = fakeClient;
setCurrentClient(fakeClient);
setExecutingClient(fakeClient);

/* Check if the AOF file is in RDB format (it may be RDB encoded base AOF
* or old style RDB-preamble AOF). In that case we need to load the RDB file
Expand Down Expand Up @@ -1637,8 +1638,8 @@ int loadSingleAppendOnlyFile(char *filename) {

cleanup:
if (fakeClient) freeClient(fakeClient);
server.current_client = old_cur_client;
server.executing_client = old_exec_client;
setCurrentClient(old_cur_client);
setExecutingClient(old_exec_client);
fclose(fp);
sdsfree(aof_filepath);
return ret;
Expand Down
18 changes: 10 additions & 8 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ void initClientBlockingState(client *c) {
c->bstate->generic_blocked_list_node = NULL;
c->bstate->module_blocked_handle = NULL;
c->bstate->async_rm_call_handle = NULL;
c->bstate->slot_pending_list = NULL;
listInitNode(&c->bstate->pending_client_node, c);
}

void freeClientBlockingState(client *c) {
Expand Down Expand Up @@ -226,7 +228,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {
serverAssert(c->bstate->postponed_list_node);
listDelNode(server.postponed_clients, c->bstate->postponed_list_node);
c->bstate->postponed_list_node = NULL;
} else if (c->bstate->btype == BLOCKED_SHUTDOWN) {
} else if (c->bstate->btype == BLOCKED_SHUTDOWN || c->bstate->btype == BLOCKED_SLOT) {
/* No special cleanup. */
} else {
serverPanic("Unknown btype in unblockClient().");
Expand Down Expand Up @@ -333,7 +335,7 @@ void disconnectAllBlockedClients(void) {
* command processing will start from scratch, and the command will
* be either executed or rejected. (unlike LIST blocked clients for
* which the command is already in progress in a way. */
if (c->bstate->btype == BLOCKED_POSTPONE) continue;
if (c->bstate->btype == BLOCKED_POSTPONE || c->bstate->btype == BLOCKED_SLOT) continue;

unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
Expand Down Expand Up @@ -703,8 +705,8 @@ static void unblockClientOnKey(client *c, robj *key) {
* running the command, and exit the execution unit after calling the unblock handler (if exists).
* Notice that we also must set the current client so it will be available
* when we will try to send the client side caching notification (done on 'afterCommand'). */
client *old_client = server.current_client;
server.current_client = c;
client *old_client = getCurrentClient();
setCurrentClient(c);
enterExecutionUnit(1, 0);
processCommandAndResetClient(c);
if (!c->flag.blocked) {
Expand All @@ -718,7 +720,7 @@ static void unblockClientOnKey(client *c, robj *key) {
afterCommand(c);
/* Clear the reexecuting_command flag after the proc is executed. */
c->flag.reexecuting_command = 0;
server.current_client = old_client;
setCurrentClient(old_client);
}
}

Expand All @@ -728,8 +730,8 @@ static void unblockClientOnKey(client *c, robj *key) {
* be processed in moduleHandleBlockedClients. */
static void moduleUnblockClientOnKey(client *c, robj *key) {
long long prev_error_replies = server.stat_total_error_replies;
client *old_client = server.current_client;
server.current_client = c;
client *old_client = getCurrentClient();
setCurrentClient(c);
monotime replyTimer;
elapsedStart(&replyTimer);

Expand All @@ -742,7 +744,7 @@ static void moduleUnblockClientOnKey(client *c, robj *key) {
* in order to propagate any changes that could have been done inside
* moduleTryServeClientBlockedOnKey */
afterCommand(c);
server.current_client = old_client;
setCurrentClient(old_client);
}

/* Unblock a client which is currently Blocked on and provided a timeout.
Expand Down
7 changes: 4 additions & 3 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -806,12 +806,13 @@ void clusterCommandMyShardId(client *c) {
/* When a cluster command is called, we need to decide whether to return TLS info or
* non-TLS info by the client's connection type. However if the command is called by
* a Lua script or RM_call, there is no connection in the fake client, so we use
* server.current_client here to get the real client if available. And if it is not
* current_client here to get the real client if available. And if it is not
* available (modules may call commands without a real client), we return the default
* info, which is determined by server.tls_cluster. */
static int shouldReturnTlsInfo(void) {
if (server.current_client && server.current_client->conn) {
return connIsTLS(server.current_client->conn);
client *current_client = getCurrentClient();
if (current_client && current_client->conn) {
return connIsTLS(current_client->conn);
} else {
return server.tls_cluster;
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
client *c = getCurrentClient();
if (c == NULL || !clusterSlotStatsEnabled(c->slot)) return;

/* We multiply the bytes len by the number of replicas to account for us broadcasting to multiple replicas at once. */
Expand Down
2 changes: 2 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3267,6 +3267,8 @@ standardConfig static_configs[] = {
createIntConfig("min-string-size-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid, 16384, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-string-size-avoid-copy-reply-threaded", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid_threaded, 65536, INTEGER_CONFIG, NULL, NULL),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createBoolConfig("io-threads-do-commands-offloading", NULL, MODIFIABLE_CONFIG, server.io_threads_do_commands_offloading, 1, NULL, NULL), /* Command offloading enabled by default */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question of why do we think this should be a config? In the fullness of time do we expect users to tune this, or should it dynamically optimize itself for performance. I'm OK with having a hidden config here primarily for testing and tuning.

createBoolConfig("io-threads-do-commands-offloading-with-modules", NULL, MODIFIABLE_CONFIG, server.io_threads_do_commands_offloading_with_modules, 0, NULL, NULL), /* Module command offloading disabled by default */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be server configuration. Either the module should be the one deciding if it can offload work "per command" or it should be a module wide configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, module commands are not offloaded in any case. However, we also need to ensure that modules only access the keys declared in their commands. If this is not guaranteed, we cannot offload commands for other slots. Once a single module fails to guarantee this behavior, we cannot offload any commands.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really disagree with anything that you are saying, but I don't see how that conflicts with my comment that this shouldn't be a config. Administration shouldn't have to worry about if a module do work outside the context. The module should be declaring that it's safe to offload commands. If any module doesn't declare that, commands shouldn't be able to get offloaded.

createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
Expand Down
Loading
Loading