@@ -3281,13 +3281,17 @@ int processInputBuffer(client *c) {
32813281
32823282/* This function can be called from the main-thread or from the IO-thread.
32833283 * The function allocates query-buf for the client if required and reads to it from the network.
3284- * It will set c->nread to the bytes read from the network. */
3285- void readToQueryBuf (client * c ) {
3284+ * It will set c->nread to the bytes read from the network.
3285+ * Returns non-zero if the buffer was filled (more data may be available).
3286+ */
3287+
3288+ static bool readToQueryBuf (client * c ) {
32863289 int big_arg = 0 ;
32873290 size_t qblen , readlen ;
3291+ int ret = 0 ;
32883292
32893293 /* If the replica RDB client is marked as closed ASAP, do not try to read from it */
3290- if (c -> flag .close_asap ) return ;
3294+ if (c -> flag .close_asap ) return ret ;
32913295
32923296 int is_primary = c -> read_flags & READ_FLAGS_PRIMARY ;
32933297
@@ -3344,9 +3348,9 @@ void readToQueryBuf(client *c) {
33443348
33453349 c -> nread = connRead (c -> conn , c -> querybuf + qblen , readlen );
33463350 if (c -> nread <= 0 ) {
3347- return ;
3351+ return ret ;
33483352 }
3349- c -> is_qb_full_read = (size_t )c -> nread == readlen ;
3353+ ret = (size_t )c -> nread == readlen ;
33503354 sdsIncrLen (c -> querybuf , c -> nread );
33513355 qblen = sdslen (c -> querybuf );
33523356 if (c -> querybuf_peak < qblen ) c -> querybuf_peak = qblen ;
@@ -3361,25 +3365,10 @@ void readToQueryBuf(client *c) {
33613365 c -> read_flags |= READ_FLAGS_QB_LIMIT_REACHED ;
33623366 }
33633367 }
3368+ return ret ;
33643369}
33653370
33663371#define REPL_MAX_READS_PER_IO_EVENT 25
3367- /** Keeps replica reading from the primary if recvq has data. */
3368- static bool shouldRepeatReadFromPrimary (client * c , int iteration ) {
3369- // If the client is not a primary replica, is closing, or flow control is disabled, no more reads.
3370- if (!(c -> flag .primary ) || c -> flag .close_asap ) {
3371- return 0 ;
3372- }
3373-
3374- if (iteration < REPL_MAX_READS_PER_IO_EVENT &&
3375- c -> is_qb_full_read ) {
3376- return 1 ;
3377- }
3378-
3379- return 0 ;
3380- }
3381-
3382-
33833372void readQueryFromClient (connection * conn ) {
33843373 client * c = connGetPrivateData (conn );
33853374 /* Check if we can send the client to be handled by the IO-thread */
@@ -3390,13 +3379,14 @@ void readQueryFromClient(connection *conn) {
33903379 bool repeat = false;
33913380 int iter = 0 ;
33923381 do {
3393- readToQueryBuf (c );
3394-
3382+ bool full_read = readToQueryBuf (c );
33953383 if (handleReadResult (c ) == C_OK ) {
33963384 if (processInputBuffer (c ) == C_ERR ) return ;
33973385 }
3398- iter ++ ;
3399- repeat = shouldRepeatReadFromPrimary (c , iter );
3386+ repeat = (c -> flag .primary &&
3387+ !c -> flag .close_asap &&
3388+ ++ iter < REPL_MAX_READS_PER_IO_EVENT &&
3389+ full_read );
34003390 beforeNextClient (c );
34013391 } while (repeat );
34023392}
0 commit comments