Skip to content

Commit 3cf54a8

Browse files
yairgottmurphyjacob4
authored andcommitted
Fix engine crash on module client blocking during keyspace events (valkey-io#1819)
This change enhances user experience and consistency by allowing a module to block a client on keyspace event notifications. Consistency is improved by allowing that reads after writes on the same connection yield expected results. For example, in ValkeySearch, mutations processed earlier on the same connection will be available for search. The implementation extends `VM_BlockClient` to support blocking clients on keyspace event notifications. Internal clients, LUA clients, clients issueing multi exec and those with the `deny_blocking` flag set are not blocked. Once blocked, a client’s reply is withheld until it is explicitly unblocked. --------- Signed-off-by: yairgott <[email protected]> Signed-off-by: Jacob Murphy <[email protected]>
1 parent 3ed8bc8 commit 3cf54a8

File tree

19 files changed

+590
-121
lines changed

19 files changed

+590
-121
lines changed

src/adlist.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ list *listCreate(void) {
5555

5656
/* Remove all the elements from the list without destroying the list itself. */
5757
void listEmpty(list *list) {
58+
if (!list) return;
5859
unsigned long len;
5960
listNode *current, *next;
6061

@@ -74,7 +75,6 @@ void listEmpty(list *list) {
7475
*
7576
* This function can't fail. */
7677
void listRelease(list *list) {
77-
if (!list) return;
7878
listEmpty(list);
7979
zfree(list);
8080
}

src/bitops.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,6 +1253,7 @@ void bitfieldGeneric(client *c, int flags) {
12531253
}
12541254
}
12551255

1256+
initDeferredReplyBuffer(c);
12561257
addReplyArrayLen(c, numops);
12571258

12581259
/* Actually process the operations. */
@@ -1364,6 +1365,7 @@ void bitfieldGeneric(client *c, int flags) {
13641365
notifyKeyspaceEvent(NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
13651366
server.dirty += changes;
13661367
}
1368+
commitDeferredReplyBuffer(c, 1);
13671369
zfree(ops);
13681370
}
13691371

src/expire.c

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,9 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
682682
return;
683683
} else {
684684
obj = setExpire(c, c->db, key, when);
685+
signalModifiedKey(c, c->db, key);
686+
notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
687+
server.dirty++;
685688
addReply(c, shared.cone);
686689
/* Propagate as PEXPIREAT millisecond-timestamp
687690
* Only rewrite the command arg if not already PEXPIREAT */
@@ -695,10 +698,6 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
695698
rewriteClientCommandArgument(c, 2, when_obj);
696699
decrRefCount(when_obj);
697700
}
698-
699-
signalModifiedKey(c, c->db, key);
700-
notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
701-
server.dirty++;
702701
return;
703702
}
704703
}

src/module.c

Lines changed: 83 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ typedef struct ValkeyModuleCtx ValkeyModuleCtx;
198198
#define VALKEYMODULE_CTX_NEW_CLIENT (1 << 7) /* Free client object when the \
199199
context is destroyed */
200200
#define VALKEYMODULE_CTX_CHANNELS_POS_REQUEST (1 << 8)
201-
#define VALKEYMODULE_CTX_COMMAND (1 << 9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */
202-
201+
#define VALKEYMODULE_CTX_COMMAND (1 << 9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */
202+
#define VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION (1 << 10) /* Context created a keyspace notification event */
203203

204204
/* This represents a key opened with VM_OpenKey(). */
205205
struct ValkeyModuleKey {
@@ -7795,6 +7795,8 @@ void unblockClientFromModule(client *c) {
77957795
* in that case the privdata argument is disregarded, because we pass the
77967796
* reply callback the privdata that is set here while blocking.
77977797
*
7798+
* For details on return values and error codes, see the comment block for
7799+
* VM_BlockClient.
77987800
*/
77997801
ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
78007802
ValkeyModuleCmdFunc reply_callback,
@@ -7807,8 +7809,27 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
78077809
void *privdata,
78087810
int flags) {
78097811
client *c = ctx->client;
7812+
if (c->flag.blocked || getClientType(c) != CLIENT_TYPE_NORMAL || c->flag.deny_blocking) {
7813+
/* Early return if duplicate block attempt or client is not normal or
7814+
* client is set to deny blocking. */
7815+
errno = ENOTSUP;
7816+
return NULL;
7817+
}
7818+
7819+
if (ctx->flags & (VALKEYMODULE_CTX_TEMP_CLIENT | VALKEYMODULE_CTX_NEW_CLIENT)) {
7820+
/* Temporary clients can't be blocked */
7821+
errno = EINVAL;
7822+
return NULL;
7823+
}
7824+
int is_keyspace_notification = ctx->flags & (VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION);
78107825
int islua = scriptIsRunning();
78117826
int ismulti = server.in_exec;
7827+
if ((islua || ismulti) && is_keyspace_notification) {
7828+
/* Avoid blocking within transactions when context initiated by
7829+
* keyspace notification. */
7830+
errno = EINVAL;
7831+
return NULL;
7832+
}
78127833
initClientBlockingState(c);
78137834

78147835
c->bstate->module_blocked_handle = zmalloc(sizeof(ValkeyModuleBlockedClient));
@@ -7864,6 +7885,11 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
78647885
c->bstate->timeout = timeout;
78657886
blockClient(c, BLOCKED_MODULE);
78667887
}
7888+
/* Defer response until after being unblocked for a context originated from
7889+
* keyspace notification events */
7890+
if (is_keyspace_notification) {
7891+
initDeferredReplyBuffer(c);
7892+
}
78677893
}
78687894
return bc;
78697895
}
@@ -8091,14 +8117,27 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
80918117
* free_privdata: called in order to free the private data that is passed
80928118
* by ValkeyModule_UnblockClient() call.
80938119
*
8094-
* Note: ValkeyModule_UnblockClient should be called for every blocked client,
8095-
* even if client was killed, timed-out or disconnected. Failing to do so
8096-
* will result in memory leaks.
8120+
* Notes:
8121+
* 1. ValkeyModule_UnblockClient should be called for every blocked client,
8122+
* even if client was killed, timed-out or disconnected. Failing to do so
8123+
* will result in memory leaks.
8124+
* 2. Attempting to block the client on keyspace event notification in versions
8125+
* prior to 8.1.1 leads to a crash.
80978126
*
80988127
* There are some cases where ValkeyModule_BlockClient() cannot be used:
80998128
*
81008129
* 1. If the client is a Lua script.
81018130
* 2. If the client is executing a MULTI block.
8131+
* 3. If the client is a temporary module client.
8132+
* 4. If the client is already blocked.
8133+
*
8134+
* In cases 1 and 2, a call to ValkeyModule_BlockClient() will **not** block the
8135+
* client, but instead produce a specific error reply. Note that if the
8136+
* BlockClient call originated from within a keyspace notification, no error
8137+
* reply is generated but nullptr is returned while the errno is set to EINVAL.
8138+
*
8139+
* In case 3 and 4, a call to ValkeyModule_BlockClient() are no-op, returning
8140+
* nullptr. errno is set to EINVAL for case 3 while ENOTSUP for case 4.
81028141
*
81038142
* In these cases, a call to ValkeyModule_BlockClient() will **not** block the
81048143
* client, but instead produce a specific error reply.
@@ -8290,6 +8329,12 @@ int moduleClientIsBlockedOnKeys(client *c) {
82908329
* needs to be passed to the client, included but not limited some slow
82918330
* to compute reply or some reply obtained via networking.
82928331
*
8332+
* Returns VALKEYMODULE_OK on success. On failure, VALKEYMODULE_ERR is returned
8333+
* and `errno` is set as follows:
8334+
*
8335+
* - EINVAL if bc is NULL.
8336+
* - ENOTSUP if bc contains `blocked on keys` but its timeout callback is NULL.
8337+
*
82938338
* Note 1: this function can be called from threads spawned by the module.
82948339
*
82958340
* Note 2: when we unblock a client that is blocked for keys using the API
@@ -8300,10 +8345,17 @@ int moduleClientIsBlockedOnKeys(client *c) {
83008345
* ValkeyModule_BlockClientOnKeys() is accessible from the timeout
83018346
* callback via VM_GetBlockedClientPrivateData). */
83028347
int VM_UnblockClient(ValkeyModuleBlockedClient *bc, void *privdata) {
8348+
if (!bc) {
8349+
errno = EINVAL;
8350+
return VALKEYMODULE_ERR;
8351+
}
83038352
if (bc->blocked_on_keys) {
83048353
/* In theory the user should always pass the timeout handler as an
83058354
* argument, but better to be safe than sorry. */
8306-
if (bc->timeout_callback == NULL) return VALKEYMODULE_ERR;
8355+
if (bc->timeout_callback == NULL) {
8356+
errno = ENOTSUP;
8357+
return VALKEYMODULE_ERR;
8358+
}
83078359
if (bc->unblocked) return VALKEYMODULE_OK;
83088360
if (bc->client) moduleBlockedClientTimedOut(bc->client, 1);
83098361
}
@@ -8392,11 +8444,17 @@ void moduleHandleBlockedClients(void) {
83928444
moduleInvokeFreePrivDataCallback(c, bc);
83938445
}
83948446

8395-
/* It is possible that this blocked client object accumulated
8396-
* replies to send to the client in a thread safe context.
8397-
* We need to glue such replies to the client output buffer and
8398-
* free the temporary client we just used for the replies. */
8399-
if (c) AddReplyFromClient(c, bc->reply_client);
8447+
if (c) {
8448+
/* Replies which were added after the client is blocked by a module
8449+
* are accumulated separately. We need to transmit those replies
8450+
* to the client. */
8451+
commitDeferredReplyBuffer(c, 0);
8452+
/* It is possible that this blocked client object accumulated
8453+
* replies to send to the client in a thread safe context.
8454+
* We need to glue such replies to the client output buffer and
8455+
* free the temporary client we just used for the replies. */
8456+
AddReplyFromClient(c, bc->reply_client);
8457+
}
84008458
moduleReleaseTempClient(bc->reply_client);
84018459
moduleReleaseTempClient(bc->thread_safe_ctx_client);
84028460

@@ -8492,9 +8550,10 @@ void moduleBlockedClientTimedOut(client *c, int from_module) {
84928550

84938551
moduleFreeContext(&ctx);
84948552

8495-
if (!from_module)
8553+
if (!from_module) {
84968554
updateStatsOnUnblock(c, bc->background_duration, 0,
84978555
((server.stat_total_error_replies != prev_error_replies) ? ERROR_COMMAND_FAILED : 0));
8556+
}
84988557

84998558
/* For timeout events, we do not want to call the disconnect callback,
85008559
* because the blocked client will be automatically disconnected in
@@ -8840,12 +8899,16 @@ int VM_NotifyKeyspaceEvent(ValkeyModuleCtx *ctx, int type, const char *event, Va
88408899
return VALKEYMODULE_OK;
88418900
}
88428901

8902+
unsigned long moduleNotifyKeyspaceSubscribersCnt(void) {
8903+
return listLength(moduleKeyspaceSubscribers);
8904+
}
8905+
88438906
/* Dispatcher for keyspace notifications to module subscriber functions.
88448907
* This gets called only if at least one module requested to be notified on
88458908
* keyspace notifications */
88468909
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
88478910
/* Don't do anything if there aren't any subscribers */
8848-
if (listLength(moduleKeyspaceSubscribers) == 0) return;
8911+
if (moduleNotifyKeyspaceSubscribersCnt() == 0) return;
88498912

88508913
/* Ugly hack to handle modules which use write commands from within
88518914
* notify_callback, which they should NOT do!
@@ -8880,8 +8943,14 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
88808943
if ((sub->event_mask & type) &&
88818944
(sub->active == 0 || (sub->module->options & VALKEYMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS))) {
88828945
ValkeyModuleCtx ctx;
8883-
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_TEMP_CLIENT);
8946+
if (server.executing_client == NULL) {
8947+
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_TEMP_CLIENT);
8948+
} else {
8949+
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_NONE);
8950+
ctx.client = server.executing_client;
8951+
}
88848952
selectDb(ctx.client, dbid);
8953+
ctx.flags |= VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION;
88858954

88868955
/* mark the handler as active to avoid reentrant loops.
88878956
* If the subscriber performs an action triggering itself,

src/module.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ void moduleAcquireGIL(void);
203203
int moduleTryAcquireGIL(void);
204204
void moduleReleaseGIL(void);
205205
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
206+
unsigned long moduleNotifyKeyspaceSubscribersCnt(void);
206207
void firePostExecutionUnitJobs(void);
207208
void moduleCallCommandFilters(client *c);
208209
void modulePostExecutionUnitOperations(void);

0 commit comments

Comments
 (0)