Skip to content
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ Makefile.dep
compile_commands.json
redis.code-workspace
.cache
.cscope*
.swp
50 changes: 18 additions & 32 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ void queueClientForReprocessing(client *c) {
void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
} else if (c->bstate.btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
Expand All @@ -200,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has a pending command to process
* or a shutdown operation was canceled and we are still in the processCommand sequence. */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand All @@ -211,11 +209,11 @@ void unblockClient(client *c, int queue_for_reprocessing) {
resetClient(c);
}

/* We count blocked client stats on regular clients and not on module clients */
if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--;
server.blocked_clients_by_type[c->bstate.btype]--;
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
if (!(c->flags & CLIENT_MODULE))
server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[c->bstate.btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
Expand All @@ -231,15 +229,19 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyNullArray(c);
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_WAITAOF) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset));
if (c->cmd == shared.wait_cmd) {
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset));
} else if (c->cmd == shared.waitaof_cmd) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->cmd == shared.setslot_cmd) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown wait command %s in replyToBlockedClientTimedOut().", c->cmd->declared_name);
}
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down Expand Up @@ -585,29 +587,13 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}

/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) {
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
blockClient(c, btype);
}

/* block a client due to pre-replication */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
c->flags |= CLIENT_PENDING_COMMAND;
}

/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
}

/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
blockClient(c, BLOCKED_WAIT);
}

/* Postpone client from executing a command. For example the server might be busy
Expand Down
6 changes: 4 additions & 2 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6016,7 +6016,7 @@ void clusterCommandSetSlot(client *c) {
* This ensures that all replicas have the latest topology information, enabling
* a reliable slot ownership transfer even if the primary node went down during
* the process. */
if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_PREREPL_DONE) == 0) {
if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_REPLICATION_DONE) == 0) {
forceCommandPropagation(c, PROPAGATE_REPL);
/* We are a primary and this is the first time we see this `SETSLOT`
* command. Force-replicate the command to all of our replicas
Expand All @@ -6026,7 +6026,9 @@ void clusterCommandSetSlot(client *c) {
* 2. The repl offset target is set to the master's current repl offset + 1.
* There is no concern of partial replication because replicas always
* ack the repl offset at the command boundary. */
blockForPreReplication(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves);
blockClientForReplicaAck(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves, 0);
/* Mark the client as having a pending command, to be executed after replication to the replicas. */
c->flags |= CLIENT_PENDING_COMMAND;
replicationRequestAckFromSlaves();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,7 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE);
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_REPLICATION_DONE);

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down
11 changes: 5 additions & 6 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3457,7 +3457,7 @@ void waitCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
blockForReplication(c, timeout, offset, numreplicas);
blockClientForReplicaAck(c, timeout, offset, numreplicas, 0);

/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
Expand Down Expand Up @@ -3497,7 +3497,7 @@ void waitaofCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
blockForAofFsync(c, timeout, c->woff, numlocal, numreplicas);
blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal);

/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
Expand Down Expand Up @@ -3532,8 +3532,7 @@ void processClientsWaitingReplicas(void) {
int numreplicas = 0;

client *c = ln->value;
int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL;
int is_wait_aof = c->cmd == shared.waitaof_cmd;

if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
Expand Down Expand Up @@ -3580,8 +3579,8 @@ void processClientsWaitingReplicas(void) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, numlocal);
addReplyLongLong(c, numreplicas);
} else if (is_wait_prerepl) {
c->flags |= CLIENT_PREREPL_DONE;
} else if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags |= CLIENT_REPLICATION_DONE;
} else {
addReplyLongLong(c, numreplicas);
}
Expand Down
5 changes: 5 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,11 @@ void createSharedObjects(void) {
* string in string comparisons for the ZRANGEBYLEX command. */
shared.minstring = sdsnew("minstring");
shared.maxstring = sdsnew("maxstring");

/* Shared command objects */
shared.wait_cmd = lookupCommandByCString("WAIT");
shared.waitaof_cmd = lookupCommandByCString("WAITAOF");
shared.setslot_cmd = lookupCommandByCString("CLUSTER|SETSLOT");
}

void initServerClientMemUsageBuckets(void) {
Expand Down
29 changes: 13 additions & 16 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,23 +426,21 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL << 48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL << 49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL << 50) /* The client is re-processing the command. */
#define CLIENT_PREREPL_DONE (1ULL << 51) /* Indicate that pre-replication has been done on the client */
#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
typedef enum blocking_type {
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
BLOCKED_LIST, /* BLPOP & co. */
BLOCKED_WAIT, /* WAIT for synchronous replication. */
BLOCKED_WAITAOF, /* WAITAOF for AOF file fsync. */
BLOCKED_MODULE, /* Blocked by a loadable module. */
BLOCKED_STREAM, /* XREAD. */
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
BLOCKED_LIST, /* BLPOP & co. */
BLOCKED_WAIT, /* WAIT for synchronous replication. */
BLOCKED_MODULE, /* Blocked by a loadable module. */
BLOCKED_STREAM, /* XREAD. */
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
} blocking_type;

/* Client request types */
Expand Down Expand Up @@ -1377,6 +1375,7 @@ struct sharedObjectsStruct {
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
*sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~<value>\r\n" */
sds minstring, maxstring;
struct serverCommand *wait_cmd, *waitaof_cmd, *setslot_cmd;
};

/* ZSETs use a specialized version of Skiplists */
Expand Down Expand Up @@ -3498,9 +3497,7 @@ void signalKeyAsReady(serverDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey);
void blockClientShutdown(client *c);
void blockPostponeClient(client *c);
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal);
void replicationRequestAckFromSlaves(void);
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
Expand Down