diff --git a/src/networking.c b/src/networking.c index 5f57050971..cb16234add 100644 --- a/src/networking.c +++ b/src/networking.c @@ -43,6 +43,7 @@ #include #include #include +#include /* This struct is used to encapsulate filtering criteria for operations on clients * such as identifying specific clients to kill or retrieve. Each field in the struct @@ -3280,13 +3281,14 @@ int processInputBuffer(client *c) { /* This function can be called from the main-thread or from the IO-thread. * The function allocates query-buf for the client if required and reads to it from the network. - * It will set c->nread to the bytes read from the network. */ -void readToQueryBuf(client *c) { + * It will set c->nread to the bytes read from the network. + * Returns true if the buffer was filled (more data may be available). */ +static bool readToQueryBuf(client *c) { int big_arg = 0; size_t qblen, readlen; /* If the replica RDB client is marked as closed ASAP, do not try to read from it */ - if (c->flag.close_asap) return; + if (c->flag.close_asap) return false; int is_primary = c->read_flags & READ_FLAGS_PRIMARY; @@ -3343,7 +3345,7 @@ void readToQueryBuf(client *c) { c->nread = connRead(c->conn, c->querybuf + qblen, readlen); if (c->nread <= 0) { - return; + return false; } sdsIncrLen(c->querybuf, c->nread); @@ -3360,8 +3362,10 @@ void readToQueryBuf(client *c) { c->read_flags |= READ_FLAGS_QB_LIMIT_REACHED; } } + return (size_t)c->nread == readlen; } +#define REPL_MAX_READS_PER_IO_EVENT 25 void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); /* Check if we can send the client to be handled by the IO-thread */ @@ -3369,12 +3373,19 @@ void readQueryFromClient(connection *conn) { if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return; - readToQueryBuf(c); - - if (handleReadResult(c) == C_OK) { - if (processInputBuffer(c) == C_ERR) return; - } - beforeNextClient(c); + bool repeat = false; + int iter = 0; + do { + bool full_read = readToQueryBuf(c); + if (handleReadResult(c) == C_OK) { + if (processInputBuffer(c) == C_ERR) return; + } + repeat = (c->flag.primary && + !c->flag.close_asap && + ++iter < REPL_MAX_READS_PER_IO_EVENT && + full_read); + beforeNextClient(c); + } while (repeat); } /* An "Address String" is a colon separated ip:port pair.