Skip to content
Merged
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <math.h>
#include <ctype.h>
#include <stdatomic.h>
#include <stdbool.h>

/* This struct is used to encapsulate filtering criteria for operations on clients
* such as identifying specific clients to kill or retrieve. Each field in the struct
Expand Down Expand Up @@ -3280,13 +3281,17 @@ int processInputBuffer(client *c) {

/* This function can be called from the main-thread or from the IO-thread.
* The function allocates query-buf for the client if required and reads to it from the network.
* It will set c->nread to the bytes read from the network. */
void readToQueryBuf(client *c) {
* It will set c->nread to the bytes read from the network.
* Returns non-zero if the buffer was filled (more data may be available).
*/

static bool readToQueryBuf(client *c) {
int big_arg = 0;
size_t qblen, readlen;
int ret = 0;

/* If the replica RDB client is marked as closed ASAP, do not try to read from it */
if (c->flag.close_asap) return;
if (c->flag.close_asap) return ret;

int is_primary = c->read_flags & READ_FLAGS_PRIMARY;

Expand Down Expand Up @@ -3343,9 +3348,9 @@ void readToQueryBuf(client *c) {

c->nread = connRead(c->conn, c->querybuf + qblen, readlen);
if (c->nread <= 0) {
return;
return ret;
}

ret = (size_t)c->nread == readlen;
sdsIncrLen(c->querybuf, c->nread);
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
Expand All @@ -3360,21 +3365,30 @@ void readToQueryBuf(client *c) {
c->read_flags |= READ_FLAGS_QB_LIMIT_REACHED;
}
}
return ret;
}

#define REPL_MAX_READS_PER_IO_EVENT 25
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
/* Check if we can send the client to be handled by the IO-thread */
if (postponeClientRead(c)) return;

if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return;

readToQueryBuf(c);

if (handleReadResult(c) == C_OK) {
if (processInputBuffer(c) == C_ERR) return;
}
beforeNextClient(c);
bool repeat = false;
int iter = 0;
do {
bool full_read = readToQueryBuf(c);
if (handleReadResult(c) == C_OK) {
if (processInputBuffer(c) == C_ERR) return;
}
repeat = (c->flag.primary &&
!c->flag.close_asap &&
++iter < REPL_MAX_READS_PER_IO_EVENT &&
full_read);
beforeNextClient(c);
} while (repeat);
}

/* An "Address String" is a colon separated ip:port pair.
Expand Down
Loading