Skip to content

Commit a188a46

Browse files
committed
Revert "Consolidate more blocked states"
This reverts commit 949d017.
1 parent 949d017 commit a188a46

File tree

6 files changed

+16
-12
lines changed

6 files changed

+16
-12
lines changed

src/blocked.c

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ void queueClientForReprocessing(client *c) {
181181
/* Unblock a client calling the right function depending on the kind
182182
* of operation the client is blocking for. */
183183
void unblockClient(client *c, int queue_for_reprocessing) {
184-
if (c->bstate.btype == BLOCKED_DATA) {
184+
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
185185
unblockClientWaitingData(c);
186186
} else if (c->bstate.btype == BLOCKED_WAIT) {
187187
unblockClientWaitingReplicas(c);
@@ -225,7 +225,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {
225225
* send it a reply of some kind. After this function is called,
226226
* unblockClient() will be called with the same client as argument. */
227227
void replyToBlockedClientTimedOut(client *c) {
228-
if (c->bstate.btype == BLOCKED_DATA) {
228+
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
229229
addReplyNullArray(c);
230230
updateStatsOnUnblock(c, 0, 0, 0);
231231
} else if (c->bstate.btype == BLOCKED_WAIT) {
@@ -434,10 +434,10 @@ static void unblockClientWaitingData(client *c) {
434434

435435
static blocking_type getBlockedTypeByType(int type) {
436436
switch (type) {
437-
case OBJ_LIST: return BLOCKED_DATA;
438-
case OBJ_ZSET: return BLOCKED_DATA;
439-
case OBJ_STREAM: return BLOCKED_DATA;
437+
case OBJ_LIST: return BLOCKED_LIST;
438+
case OBJ_ZSET: return BLOCKED_ZSET;
440439
case OBJ_MODULE: return BLOCKED_MODULE;
440+
case OBJ_STREAM: return BLOCKED_STREAM;
441441
default: return BLOCKED_NONE;
442442
}
443443
}
@@ -625,7 +625,8 @@ static void unblockClientOnKey(client *c, robj *key) {
625625

626626
/* Only in case of blocking API calls, we might be blocked on several keys.
627627
however we should force unblock the entire blocking keys */
628-
serverAssert(c->bstate.btype == BLOCKED_DATA);
628+
serverAssert(c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_LIST ||
629+
c->bstate.btype == BLOCKED_ZSET);
629630

630631
/* We need to unblock the client before calling processCommandAndResetClient
631632
* because it checks the CLIENT_BLOCKED flag */

src/cluster.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1213,7 +1213,8 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
12131213
* returns 1. Otherwise 0 is returned and no operation is performed. */
12141214
int clusterRedirectBlockedClientIfNeeded(client *c) {
12151215
clusterNode *myself = getMyClusterNode();
1216-
if (c->flags & CLIENT_BLOCKED && (c->bstate.btype == BLOCKED_DATA || c->bstate.btype == BLOCKED_MODULE)) {
1216+
if (c->flags & CLIENT_BLOCKED && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
1217+
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
12171218
dictEntry *de;
12181219
dictIterator *di;
12191220

src/server.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,9 +432,11 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
432432
* if CLIENT_BLOCKED flag is set. */
433433
typedef enum blocking_type {
434434
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
435-
BLOCKED_DATA, /* Block on data: BLPOP & co., XREAD, BZPOP et al. */
435+
BLOCKED_LIST, /* BLPOP & co. */
436436
BLOCKED_WAIT, /* WAIT for synchronous replication. */
437437
BLOCKED_MODULE, /* Blocked by a loadable module. */
438+
BLOCKED_STREAM, /* XREAD. */
439+
BLOCKED_ZSET, /* BZPOP et al. */
438440
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
439441
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
440442
BLOCKED_NUM, /* Number of blocked states. */

src/t_list.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,7 +1219,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
12191219
}
12201220

12211221
/* If the keys do not exist we must block */
1222-
blockForKeys(c, BLOCKED_DATA, keys, numkeys, timeout, 0);
1222+
blockForKeys(c, BLOCKED_LIST, keys, numkeys, timeout, 0);
12231223
}
12241224

12251225
/* BLPOP <key> [<key> ...] <timeout> */
@@ -1243,7 +1243,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
12431243
addReplyNull(c);
12441244
} else {
12451245
/* The list is empty and the client blocks. */
1246-
blockForKeys(c, BLOCKED_DATA, c->argv + 1, 1, timeout, 0);
1246+
blockForKeys(c, BLOCKED_LIST, c->argv + 1, 1, timeout, 0);
12471247
}
12481248
} else {
12491249
/* The list exists and has elements, so

src/t_stream.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2404,7 +2404,7 @@ void xreadCommand(client *c) {
24042404
decrRefCount(argv_streamid);
24052405
}
24062406
}
2407-
blockForKeys(c, BLOCKED_DATA, c->argv + streams_arg, streams_count, timeout, xreadgroup);
2407+
blockForKeys(c, BLOCKED_STREAM, c->argv + streams_arg, streams_count, timeout, xreadgroup);
24082408
goto cleanup;
24092409
}
24102410

src/t_zset.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4045,7 +4045,7 @@ void blockingGenericZpopCommand(client *c,
40454045
}
40464046

40474047
/* If the keys do not exist we must block */
4048-
blockForKeys(c, BLOCKED_DATA, keys, numkeys, timeout, 0);
4048+
blockForKeys(c, BLOCKED_ZSET, keys, numkeys, timeout, 0);
40494049
}
40504050

40514051
// BZPOPMIN key [key ...] timeout

0 commit comments

Comments
 (0)