Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <math.h>
#include <ctype.h>
#include <stdatomic.h>
#include <stdbool.h>

/* This struct is used to encapsulate filtering criteria for operations on clients
* such as identifying specific clients to kill or retrieve. Each field in the struct
Expand Down Expand Up @@ -3345,7 +3346,7 @@ void readToQueryBuf(client *c) {
if (c->nread <= 0) {
return;
}

c->is_qb_full_read = (size_t)c->nread == readlen;
sdsIncrLen(c->querybuf, c->nread);
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
Expand All @@ -3362,19 +3363,42 @@ void readToQueryBuf(client *c) {
}
}

#define REPL_MAX_READS_PER_IO_EVENT 25
/** Keeps replica reading from the primary if recvq has data. */
static bool shouldRepeatReadFromPrimary(client *c, int iteration) {
// If the client is not a primary replica, is closing, or flow control is disabled, no more reads.
if (!(c->flag.primary) || c->flag.close_asap) {
return 0;
}

if (iteration < REPL_MAX_READS_PER_IO_EVENT &&
c->is_qb_full_read) {
return 1;
}

return 0;
}


void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
/* Check if we can send the client to be handled by the IO-thread */
if (postponeClientRead(c)) return;

if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return;

readToQueryBuf(c);
bool shouldRepeat = false;
int iter = 0;
do {
readToQueryBuf(c);

if (handleReadResult(c) == C_OK) {
if (processInputBuffer(c) == C_ERR) return;
}
beforeNextClient(c);
if (handleReadResult(c) == C_OK) {
if (processInputBuffer(c) == C_ERR) return;
}
iter++;
shouldRepeat = shouldRepeatReadFromPrimary(c, iter);
beforeNextClient(c);
} while (shouldRepeat);
}

/* An "Address String" is a colon separated ip:port pair.
Expand Down
22 changes: 12 additions & 10 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <netinet/in.h>
#include <sys/socket.h>
#include <signal.h>
#include <stdbool.h>

#ifdef HAVE_LIBSYSTEMD
#include <systemd/sd-daemon.h>
Expand Down Expand Up @@ -1187,16 +1188,17 @@ typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
/* Input buffer and command parsing fields */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
robj **argv; /* Arguments of current command. */
int argc; /* Num of arguments of current command. */
int argv_len; /* Size of argv array (may be more than argc) */
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
long long woff; /* Last write global replication offset. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
bool is_qb_full_read; /* True if the last read returned the maximum allowed bytes */
robj **argv; /* Arguments of current command. */
int argc; /* Num of arguments of current command. */
int argv_len; /* Size of argv array (may be more than argc) */
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
long long woff; /* Last write global replication offset. */
/* Command execution state and command information */
struct serverCommand *cmd; /* Current command. */
struct serverCommand *lastcmd; /* Last command executed. */
Expand Down
Loading