-
Notifications
You must be signed in to change notification settings - Fork 955
Slot migration improvement #245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
c11b65d
03196a4
9b95b1e
f4e84d0
fac9a06
2d004b3
4cef2b4
10944e6
907de8c
58108ea
844c3a9
1aa0585
f910041
dc97cc3
ecfd3b0
aa54049
abcf439
f7e7339
b6799bd
76f5eff
2446994
5bd8e05
758ecd8
a51b146
f36330a
4f55dc2
4ddf884
e42cb94
ad5c7f8
11fc143
a9fe543
8afc34c
6f459da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,3 +41,4 @@ Makefile.dep | |
| compile_commands.json | ||
| redis.code-workspace | ||
| .cache | ||
| .cscope.* | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -186,7 +186,8 @@ void unblockClient(client *c, int queue_for_reprocessing) { | |
| c->bstate.btype == BLOCKED_ZSET || | ||
| c->bstate.btype == BLOCKED_STREAM) { | ||
| unblockClientWaitingData(c); | ||
| } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) { | ||
| } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF || | ||
| c->bstate.btype == BLOCKED_WAIT_PREREPL) { | ||
| unblockClientWaitingReplicas(c); | ||
| } else if (c->bstate.btype == BLOCKED_MODULE) { | ||
| if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); | ||
|
|
@@ -202,7 +203,8 @@ void unblockClient(client *c, int queue_for_reprocessing) { | |
|
|
||
| /* Reset the client for a new query, unless the client has pending command to process | ||
| * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ | ||
| if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) { | ||
| if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN && | ||
| c->bstate.btype != BLOCKED_WAIT_PREREPL) { | ||
| freeClientOriginalArgv(c); | ||
| /* Clients that are not blocked on keys are not reprocessed so we must | ||
| * call reqresAppendResponse here (for clients blocked on key, | ||
|
|
@@ -240,6 +242,8 @@ void replyToBlockedClientTimedOut(client *c) { | |
| addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset)); | ||
| } else if (c->bstate.btype == BLOCKED_MODULE) { | ||
| moduleBlockedClientTimedOut(c, 0); | ||
| } else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) { | ||
| addReplyErrorObject(c, shared.noreplicaserr); | ||
| } else { | ||
| serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); | ||
| } | ||
|
|
@@ -597,23 +601,30 @@ static void handleClientsBlockedOnKey(readyList *rl) { | |
| } | ||
| } | ||
|
|
||
| /* block a client due to wait command */ | ||
| void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { | ||
| /* block a client for replica acknowledgement */ | ||
| void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) { | ||
| c->bstate.timeout = timeout; | ||
| c->bstate.reploffset = offset; | ||
| c->bstate.numreplicas = numreplicas; | ||
| listAddNodeHead(server.clients_waiting_acks,c); | ||
| blockClient(c,BLOCKED_WAIT); | ||
| c->bstate.numlocal = numlocal; | ||
| listAddNodeHead(server.clients_waiting_acks, c); | ||
| blockClient(c, btype); | ||
| } | ||
|
|
||
| /* block a client due to pre-replication */ | ||
| void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { | ||
| blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0); | ||
| c->flags |= CLIENT_PENDING_COMMAND; | ||
| } | ||
|
|
||
| /* block a client due to wait command */ | ||
| void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { | ||
| blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need two separate blocked states, can we instead just use the CLIENT_PENDING_COMMAND flags?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CLIENT_PENDING_COMMAND is not a blocked state. that said, I do feel that we have a proliferation of blocked/wait states, like
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. I'm saying we already have the notion that a client is blocked with a pending command, that can get re-executed once the blocking is done. (That is how the normal command blocking works). The blocking mechanic is functionally the same as the other wait (wait for a replack from k replicas then unblock).
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. there is definitely room for improvement. filed #427 to track this improvement. |
||
| } | ||
|
|
||
| /* block a client due to waitaof command */ | ||
| void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) { | ||
| c->bstate.timeout = timeout; | ||
| c->bstate.reploffset = offset; | ||
| c->bstate.numreplicas = numreplicas; | ||
| c->bstate.numlocal = numlocal; | ||
| listAddNodeHead(server.clients_waiting_acks,c); | ||
| blockClient(c,BLOCKED_WAITAOF); | ||
| blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal); | ||
| } | ||
|
|
||
| /* Postpone client from executing a command. For example the server might be busy | ||
|
|
||
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -402,6 +402,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; | |
| #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */ | ||
| #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ | ||
| #define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ | ||
| #define CLIENT_PREREPL_DONE (1ULL<<51) /* Indicate that pre-replication has been done on the client */ | ||
|
|
||
| /* Client block type (btype field in client structure) | ||
| * if CLIENT_BLOCKED flag is set. */ | ||
|
|
@@ -415,6 +416,7 @@ typedef enum blocking_type { | |
| BLOCKED_ZSET, /* BZPOP et al. */ | ||
| BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ | ||
| BLOCKED_SHUTDOWN, /* SHUTDOWN. */ | ||
| BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */ | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found the words pre-replication is shown in several places, but I do not understand what it is (sorry missed your previous pr), Can you add a more detail explanations here so everyone can know its meaning after some time later, Thanks
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will address in the follow up PR |
||
| BLOCKED_NUM, /* Number of blocked states. */ | ||
| BLOCKED_END /* End of enumeration */ | ||
| } blocking_type; | ||
|
|
@@ -1331,7 +1333,7 @@ struct sharedObjectsStruct { | |
| *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, | ||
| *lastid, *ping, *setid, *keepttl, *load, *createconsumer, | ||
| *getack, *special_asterick, *special_equals, *default_username, *redacted, | ||
| *ssubscribebulk,*sunsubscribebulk, *smessagebulk, | ||
| *ssubscribebulk,*sunsubscribebulk, *smessagebulk, *cluster, *setslot, *importing, *migrating, | ||
| *select[PROTO_SHARED_SELECT_CMDS], | ||
| *integers[OBJ_SHARED_INTEGERS], | ||
| *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */ | ||
|
|
@@ -2817,7 +2819,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout); | |
| ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); | ||
|
|
||
| /* Replication */ | ||
| void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); | ||
| void replicationFeedSlaves(int dictid, robj **argv, int argc); | ||
| void replicationFeedStreamFromMasterStream(char *buf, size_t buflen); | ||
| void resetReplicationBuffer(void); | ||
| void feedReplicationBuffer(char *buf, size_t len); | ||
|
|
@@ -3430,7 +3432,9 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo | |
| void blockClientShutdown(client *c); | ||
| void blockPostponeClient(client *c); | ||
| void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas); | ||
| void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas); | ||
| void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas); | ||
| void replicationRequestAckFromSlaves(void); | ||
| void signalDeletedKeyAsReady(serverDb *db, robj *key, int type); | ||
| void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); | ||
| void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.