Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ int trySendReadToIOThreads(client *c) {
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* With Lua debug client we may call connWrite directly in the main thread */
if (c->flag.lua_debug) return C_ERR;
/* For simplicity let the main-thread handle the blocked clients */
Expand All @@ -345,6 +345,7 @@ int trySendReadToIOThreads(client *c) {
c->cur_tid = tid;
c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE;
c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0;
c->read_flags |= c->flag.primary ? READ_FLAGS_PRIMARY : 0;

c->io_read_state = CLIENT_PENDING_IO;
connSetPostponeUpdateState(c->conn, 1);
Expand All @@ -364,7 +365,7 @@ int trySendWriteToIOThreads(client *c) {
/* Nothing to write */
if (!clientHasPendingReplies(c)) return C_ERR;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* We can't offload debugged clients as the main-thread may read at the same time */
if (c->flag.lua_debug) return C_ERR;

Expand Down
4 changes: 3 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4954,7 +4954,9 @@ void ioThreadReadQueryFromClient(void *data) {
}

done:
trimClientQueryBuffer(c);
if (!(c->read_flags & READ_FLAGS_PRIMARY)) {
trimClientQueryBuffer(c);
}
atomic_thread_fence(memory_order_release);
c->io_read_state = CLIENT_COMPLETED_IO;
}
Expand Down
9 changes: 9 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4136,6 +4136,8 @@ void replicationCachePrimary(client *c) {
serverAssert(server.primary != NULL && server.cached_primary == NULL);
serverLog(LL_NOTICE, "Caching the disconnected primary state.");

/* Wait for IO operations to be done before proceeding */
waitForClientIO(c);
/* Unlink the client from the server structures. */
unlinkClient(c);

Expand All @@ -4154,6 +4156,13 @@ void replicationCachePrimary(client *c) {
c->bufpos = 0;
resetClient(c);

/* Reset the primary IO state. */
c->nwritten = 0;
c->nread = 0;
c->io_read_state = c->io_write_state = CLIENT_IDLE;
c->io_parsed_cmd = NULL;
c->flag.pending_command = 0;

/* Save the primary. Server.primary will be set to null later by
* replicationHandlePrimaryDisconnection(). */
server.cached_primary = server.primary;
Expand Down
Loading