Skip to content

Commit 5dc6632

Browse files
authored
Only mark the client reprocessing flag when unblocked on keys (#2… (#2231)
NOTE - this is a backport of #2109 When we refactored the blocking framework we introduced the client reprocessing infrastructure. In cases the client was blocked on keys, it will attempt to reprocess the command. One challenge was to keep track of the command timeout, since we are reprocessing and do not want to re-register the client with a fresh timeout each time. The solution was to consider the client reprocessing flag when the client is blockedOnKeys: ``` if (!c->flag.reprocessing_command) { /* If the client is re-processing the command, we do not set the timeout * because we need to retain the client's original timeout. */ c->bstate->timeout = timeout; } ``` However, this introduced a new issue. There are cases where the client will consecutive blocking of different types for example: ``` CLIENT PAUSE 10000 ALL BZPOPMAX zset 1 ``` would have the client blocked on the zset endlessly if nothing will be written to it. **Credits to @uriyage for locating this with his fuzzer testing** The suggested solution is to only flag the client when it is specifically unblocked on keys. Signed-off-by: Ran Shidlansik <[email protected]>
1 parent a9b02f1 commit 5dc6632

File tree

4 files changed

+28
-12
lines changed

4 files changed

+28
-12
lines changed

src/blocked.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ static void unblockClientOnKey(client *c, robj *key) {
657657
* we need to re process the command again */
658658
if (c->flags & CLIENT_PENDING_COMMAND) {
659659
c->flags &= ~CLIENT_PENDING_COMMAND;
660+
c->flags |= CLIENT_REPROCESSING_COMMAND;
660661
/* We want the command processing and the unblock handler (see RM_Call 'K' option)
661662
* to run atomically, this is why we must enter the execution unit here before
662663
* running the command, and exit the execution unit after calling the unblock handler (if exists).
@@ -675,6 +676,8 @@ static void unblockClientOnKey(client *c, robj *key) {
675676
}
676677
exitExecutionUnit();
677678
afterCommand(c);
679+
/* Clear the reprocessing command flag after the proc is executed. */
680+
c->flags &= ~CLIENT_REPROCESSING_COMMAND;
678681
server.current_client = old_client;
679682
}
680683
}

src/server.c

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3486,7 +3486,7 @@ void call(client *c, int flags) {
34863486
* and a client which is reprocessing command again (after being unblocked).
34873487
* Blocked clients can be blocked in different places and not always it means the call() function has been
34883488
* called. For example this is required for avoiding double logging to monitors.*/
3489-
int reprocessing_command = flags & CMD_CALL_REPROCESSING;
3489+
int reprocessing_command = (c->flags & CLIENT_REPROCESSING_COMMAND) ? 1 : 0;
34903490

34913491
/* Initialization: clear the flags that must be set by the command on
34923492
* demand, and initialize the array for additional commands propagation. */
@@ -3513,20 +3513,12 @@ void call(client *c, int flags) {
35133513
* re-processing and unblock the client.*/
35143514
c->flags |= CLIENT_EXECUTING_COMMAND;
35153515

3516-
/* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual
3517-
* processing of the command proc, the client is aware that it is being
3518-
* re-processed. */
3519-
if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND;
3520-
35213516
monotime monotonic_start = 0;
35223517
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
35233518
monotonic_start = getMonotonicUs();
35243519

35253520
c->cmd->proc(c);
35263521

3527-
/* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */
3528-
if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND;
3529-
35303522
exitExecutionUnit();
35313523

35323524
/* In case client is blocked after trying to execute the command,
@@ -4166,7 +4158,6 @@ int processCommand(client *c) {
41664158
addReply(c,shared.queued);
41674159
} else {
41684160
int flags = CMD_CALL_FULL;
4169-
if (client_reprocessing_command) flags |= CMD_CALL_REPROCESSING;
41704161
call(c,flags);
41714162
if (listLength(server.ready_keys) && !isInsideYieldingLongCommand())
41724163
handleClientsBlockedOnKeys();

src/server.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,7 @@ typedef enum {
593593
#define CMD_CALL_NONE 0
594594
#define CMD_CALL_PROPAGATE_AOF (1<<0)
595595
#define CMD_CALL_PROPAGATE_REPL (1<<1)
596-
#define CMD_CALL_REPROCESSING (1<<2)
597-
#define CMD_CALL_FROM_MODULE (1<<3) /* From RM_Call */
596+
#define CMD_CALL_FROM_MODULE (1<<2) /* From RM_Call */
598597
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
599598
#define CMD_CALL_FULL (CMD_CALL_PROPAGATE)
600599

tests/unit/type/list.tcl

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2447,4 +2447,27 @@ foreach {pop} {BLPOP BLMPOP_RIGHT} {
24472447
close_replication_stream $repl
24482448
} {} {needs:repl}
24492449

2450+
test "Blocking timeout following PAUSE should honor the timeout" {
2451+
# cleanup first
2452+
r del mylist
2453+
2454+
# create a test client
2455+
set rd [redis_deferring_client]
2456+
2457+
# first PAUSE all writes for a very long time
2458+
r client pause 10000000000000 write
2459+
2460+
# block a client on the list
2461+
$rd BLPOP mylist 1
2462+
wait_for_blocked_clients_count 1
2463+
2464+
# now unpause the writes
2465+
r client unpause
2466+
2467+
# client should time-out
2468+
wait_for_blocked_clients_count 0
2469+
2470+
$rd close
2471+
}
2472+
24502473
} ;# stop servers

0 commit comments

Comments
 (0)