@@ -26,6 +26,10 @@ static void setProtocolError(const char *errstr, client *c);
2626static void pauseClientsByClient (mstime_t end , int isPauseClientAll );
2727char * getClientSockname (client * c );
2828static inline int clientTypeIsSlave (client * c );
29+ static inline int _clientHasPendingRepliesSlave (client * c );
30+ static inline int _clientHasPendingRepliesNonSlave (client * c );
31+ static inline int _writeToClientNonSlave (client * c , ssize_t * nwritten );
32+ static inline int _writeToClientSlave (client * c , ssize_t * nwritten );
2933int ProcessingEventsWhileBlocked = 0 ; /* See processEventsWhileBlocked(). */
3034__thread sds thread_reusable_qb = NULL ;
3135__thread int thread_reusable_qb_used = 0 ; /* Avoid multiple clients using reusable query
@@ -261,6 +265,40 @@ void putClientInPendingWriteQueue(client *c) {
261265 }
262266}
263267
268+ static inline int _prepareClientToWrite (client * c ) {
269+ const uint64_t _flags = c -> flags ;
270+ /* If it's the Lua client we always return ok without installing any
271+ * handler since there is no socket at all. */
272+ if (unlikely (_flags & (CLIENT_SCRIPT |CLIENT_MODULE ))) return C_OK ;
273+
274+ /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
275+ if (unlikely (_flags & CLIENT_CLOSE_ASAP )) return C_ERR ;
276+
277+ /* CLIENT REPLY OFF / SKIP handling: don't send replies.
278+ * CLIENT_PUSHING handling: disables the reply silencing flags. */
279+ if (unlikely ((_flags & (CLIENT_REPLY_OFF |CLIENT_REPLY_SKIP )) &&
280+ !(_flags & CLIENT_PUSHING ))) return C_ERR ;
281+
282+ /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
283+ * is set. */
284+ if (unlikely ((_flags & CLIENT_MASTER ) &&
285+ !(_flags & CLIENT_MASTER_FORCE_REPLY ))) return C_ERR ;
286+
287+ if (unlikely (!c -> conn )) return C_ERR ; /* Fake client for AOF loading. */
288+
289+ /* Schedule the client to write the output buffers to the socket, unless
290+ * it should already be setup to do so (it has already pending data).
291+ *
292+ * If the client runs in an IO thread, we should not put the client in the
293+ * pending write queue. Instead, we will install the write handler to the
294+ * corresponding IO thread’s event loop and let it handle the reply. */
295+ if (!clientHasPendingReplies (c ) && likely (c -> running_tid == IOTHREAD_MAIN_THREAD_ID ))
296+ putClientInPendingWriteQueue (c );
297+
298+ /* Authorize the caller to queue in the output buffer of this client. */
299+ return C_OK ;
300+ }
301+
264302/* This function is called every time we are going to transmit new data
265303 * to the client. The behavior is the following:
266304 *
@@ -284,32 +322,7 @@ void putClientInPendingWriteQueue(client *c) {
284322 * data to the clients output buffers. If the function returns C_ERR no
285323 * data should be appended to the output buffers. */
286324int prepareClientToWrite (client * c ) {
287- /* If it's the Lua client we always return ok without installing any
288- * handler since there is no socket at all. */
289- if (c -> flags & (CLIENT_SCRIPT |CLIENT_MODULE )) return C_OK ;
290-
291- /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
292- if (c -> flags & CLIENT_CLOSE_ASAP ) return C_ERR ;
293-
294- /* CLIENT REPLY OFF / SKIP handling: don't send replies.
295- * CLIENT_PUSHING handling: disables the reply silencing flags. */
296- if ((c -> flags & (CLIENT_REPLY_OFF |CLIENT_REPLY_SKIP )) &&
297- !(c -> flags & CLIENT_PUSHING )) return C_ERR ;
298-
299- /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
300- * is set. */
301- if ((c -> flags & CLIENT_MASTER ) &&
302- !(c -> flags & CLIENT_MASTER_FORCE_REPLY )) return C_ERR ;
303-
304- if (!c -> conn ) return C_ERR ; /* Fake client for AOF loading. */
305-
306- /* Schedule the client to write the output buffers to the socket, unless
307- * it should already be setup to do so (it has already pending data). */
308- if (!clientHasPendingReplies (c ) && likely (c -> running_tid == IOTHREAD_MAIN_THREAD_ID ))
309- putClientInPendingWriteQueue (c );
310-
311- /* Authorize the caller to queue in the output buffer of this client. */
312- return C_OK ;
325+ return _prepareClientToWrite (c );
313326}
314327
315328/* -----------------------------------------------------------------------------
@@ -419,7 +432,7 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
419432
420433/* Add the object 'obj' string representation to the client output buffer. */
421434void addReply (client * c , robj * obj ) {
422- if (prepareClientToWrite (c ) != C_OK ) return ;
435+ if (_prepareClientToWrite (c ) != C_OK ) return ;
423436
424437 if (sdsEncodedObject (obj )) {
425438 _addReplyToBufferOrList (c ,obj -> ptr ,sdslen (obj -> ptr ));
@@ -438,7 +451,7 @@ void addReply(client *c, robj *obj) {
438451/* Add the SDS 's' string to the client output buffer, as a side effect
439452 * the SDS string is freed. */
440453void addReplySds (client * c , sds s ) {
441- if (prepareClientToWrite (c ) != C_OK ) {
454+ if (_prepareClientToWrite (c ) != C_OK ) {
442455 /* The caller expects the sds to be free'd. */
443456 sdsfree (s );
444457 return ;
@@ -456,7 +469,7 @@ void addReplySds(client *c, sds s) {
456469 * _addReplyProtoToList() if we fail to extend the existing tail object
457470 * in the list of objects. */
458471void addReplyProto (client * c , const char * s , size_t len ) {
459- if (prepareClientToWrite (c ) != C_OK ) return ;
472+ if (_prepareClientToWrite (c ) != C_OK ) return ;
460473 _addReplyToBufferOrList (c ,s ,len );
461474}
462475
@@ -720,7 +733,7 @@ void *addReplyDeferredLen(client *c) {
720733 /* Note that we install the write event here even if the object is not
721734 * ready to be sent, since we are sure that before returning to the
722735 * event loop setDeferredAggregateLen() will be called. */
723- if (prepareClientToWrite (c ) != C_OK ) return NULL ;
736+ if (_prepareClientToWrite (c ) != C_OK ) return NULL ;
724737
725738 /* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the
726739 * replication link that caused a reply to be generated we'll simply disconnect it.
@@ -985,7 +998,7 @@ void addReplyLongLong(client *c, long long ll) {
985998 else if (ll == 1 )
986999 addReply (c , shared .cone );
9871000 else {
988- if (prepareClientToWrite (c ) != C_OK ) return ;
1001+ if (_prepareClientToWrite (c ) != C_OK ) return ;
9891002 _addReplyLongLongWithPrefix (c , ll , ':' );
9901003 }
9911004}
@@ -998,13 +1011,13 @@ void addReplyLongLongFromStr(client *c, robj *str) {
9981011
9991012void addReplyAggregateLen (client * c , long length , int prefix ) {
10001013 serverAssert (length >= 0 );
1001- if (prepareClientToWrite (c ) != C_OK ) return ;
1014+ if (_prepareClientToWrite (c ) != C_OK ) return ;
10021015 _addReplyLongLongWithPrefix (c , length , prefix );
10031016}
10041017
10051018void addReplyArrayLen (client * c , long length ) {
10061019 serverAssert (length >= 0 );
1007- if (prepareClientToWrite (c ) != C_OK ) return ;
1020+ if (_prepareClientToWrite (c ) != C_OK ) return ;
10081021 _addReplyLongLongMBulk (c , length );
10091022}
10101023
@@ -1061,13 +1074,13 @@ void addReplyNullArray(client *c) {
10611074/* Create the length prefix of a bulk reply, example: $2234 */
10621075void addReplyBulkLen (client * c , robj * obj ) {
10631076 size_t len = stringObjectLen (obj );
1064- if (prepareClientToWrite (c ) != C_OK ) return ;
1077+ if (_prepareClientToWrite (c ) != C_OK ) return ;
10651078 _addReplyLongLongBulk (c , len );
10661079}
10671080
10681081/* Add a Redis Object as a bulk reply */
10691082void addReplyBulk (client * c , robj * obj ) {
1070- if (prepareClientToWrite (c ) != C_OK ) return ;
1083+ if (_prepareClientToWrite (c ) != C_OK ) return ;
10711084
10721085 if (sdsEncodedObject (obj )) {
10731086 const size_t len = sdslen (obj -> ptr );
@@ -1091,15 +1104,15 @@ void addReplyBulk(client *c, robj *obj) {
10911104
10921105/* Add a C buffer as bulk reply */
10931106void addReplyBulkCBuffer (client * c , const void * p , size_t len ) {
1094- if (prepareClientToWrite (c ) != C_OK ) return ;
1107+ if (_prepareClientToWrite (c ) != C_OK ) return ;
10951108 _addReplyLongLongBulk (c , len );
10961109 _addReplyToBufferOrList (c , p , len );
10971110 _addReplyToBufferOrList (c , "\r\n" , 2 );
10981111}
10991112
11001113/* Add sds to reply (takes ownership of sds and frees it) */
11011114void addReplyBulkSds (client * c , sds s ) {
1102- if (prepareClientToWrite (c ) != C_OK ) {
1115+ if (_prepareClientToWrite (c ) != C_OK ) {
11031116 sdsfree (s );
11041117 return ;
11051118 }
@@ -1235,9 +1248,9 @@ void AddReplyFromClient(client *dst, client *src) {
12351248 /* First add the static buffer (either into the static buffer or reply list) */
12361249 addReplyProto (dst ,src -> buf , src -> bufpos );
12371250
1238- /* We need to check with prepareClientToWrite again (after addReplyProto)
1251+ /* We need to check with _prepareClientToWrite again (after addReplyProto)
12391252 * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
1240- if (prepareClientToWrite (dst ) != C_OK )
1253+ if (_prepareClientToWrite (dst ) != C_OK )
12411254 return ;
12421255
12431256 /* We're bypassing _addReplyProtoToList, so we need to add the pre/post
@@ -1284,26 +1297,32 @@ void copyReplicaOutputBuffer(client *dst, client *src) {
12841297 ((replBufBlock * )listNodeValue (dst -> ref_repl_buf_node ))-> refcount ++ ;
12851298}
12861299
1300+ static inline int _clientHasPendingRepliesNonSlave (client * c ) {
1301+ return c -> bufpos || listLength (c -> reply );
1302+ }
1303+
1304+ static inline int _clientHasPendingRepliesSlave (client * c ) {
1305+ /* Replicas use global shared replication buffer instead of
1306+ * private output buffer. */
1307+ serverAssert (c -> bufpos == 0 && listLength (c -> reply ) == 0 );
1308+ if (c -> ref_repl_buf_node == NULL ) return 0 ;
1309+
1310+ /* If the last replication buffer block content is totally sent,
1311+ * we have nothing to send. */
1312+ listNode * ln = listLast (server .repl_buffer_blocks );
1313+ replBufBlock * tail = listNodeValue (ln );
1314+ if (ln == c -> ref_repl_buf_node &&
1315+ c -> ref_block_pos == tail -> used ) return 0 ;
1316+ return 1 ;
1317+ }
1318+
12871319/* Return true if the specified client has pending reply buffers to write to
12881320 * the socket. */
12891321int clientHasPendingReplies (client * c ) {
12901322 if (unlikely (clientTypeIsSlave (c ))) {
1291- /* Replicas use global shared replication buffer instead of
1292- * private output buffer. */
1293- serverAssert (c -> bufpos == 0 && listLength (c -> reply ) == 0 );
1294- if (c -> ref_repl_buf_node == NULL ) return 0 ;
1295-
1296- /* If the last replication buffer block content is totally sent,
1297- * we have nothing to send. */
1298- listNode * ln = listLast (server .repl_buffer_blocks );
1299- replBufBlock * tail = listNodeValue (ln );
1300- if (ln == c -> ref_repl_buf_node &&
1301- c -> ref_block_pos == tail -> used ) return 0 ;
1302-
1303- return 1 ;
1304- } else {
1305- return c -> bufpos || listLength (c -> reply );
1323+ return _clientHasPendingRepliesSlave (c );
13061324 }
1325+ return _clientHasPendingRepliesNonSlave (c );
13071326}
13081327
13091328void clientAcceptHandler (connection * conn ) {
@@ -1992,38 +2011,13 @@ static int _writevToClient(client *c, ssize_t *nwritten) {
19922011 return C_OK ;
19932012}
19942013
1995- /* This function does actual writing output buffers to different types of
1996- * clients, it is called by writeToClient.
2014+ /* This function does the actual writing of output buffers for non-slave client types,
2015+ * it is called by writeToClient.
19972016 * If we write successfully, it returns C_OK, otherwise, C_ERR is returned,
19982017 * and 'nwritten' is an output parameter, it means how many bytes server write
19992018 * to client. */
2000- int _writeToClient (client * c , ssize_t * nwritten ) {
2019+ static inline int _writeToClientNonSlave (client * c , ssize_t * nwritten ) {
20012020 * nwritten = 0 ;
2002- if (unlikely (clientTypeIsSlave (c ))) {
2003- serverAssert (c -> bufpos == 0 && listLength (c -> reply ) == 0 );
2004-
2005- replBufBlock * o = listNodeValue (c -> ref_repl_buf_node );
2006- serverAssert (o -> used >= c -> ref_block_pos );
2007- /* Send current block if it is not fully sent. */
2008- if (o -> used > c -> ref_block_pos ) {
2009- * nwritten = connWrite (c -> conn , o -> buf + c -> ref_block_pos ,
2010- o -> used - c -> ref_block_pos );
2011- if (* nwritten <= 0 ) return C_ERR ;
2012- c -> ref_block_pos += * nwritten ;
2013- }
2014-
2015- /* If we fully sent the object on head, go to the next one. */
2016- listNode * next = listNextNode (c -> ref_repl_buf_node );
2017- if (next && c -> ref_block_pos == o -> used ) {
2018- o -> refcount -- ;
2019- ((replBufBlock * )(listNodeValue (next )))-> refcount ++ ;
2020- c -> ref_repl_buf_node = next ;
2021- c -> ref_block_pos = 0 ;
2022- incrementalTrimReplicationBacklog (REPL_BACKLOG_TRIM_BLOCKS_PER_CALL );
2023- }
2024- return C_OK ;
2025- }
2026-
20272021 /* When the reply list is not empty, it's better to use writev to save us some
20282022 * system calls and TCP packets. */
20292023 if (listLength (c -> reply ) > 0 ) {
@@ -2045,8 +2039,36 @@ int _writeToClient(client *c, ssize_t *nwritten) {
20452039 c -> bufpos = 0 ;
20462040 c -> sentlen = 0 ;
20472041 }
2048- }
2042+ }
2043+ return C_OK ;
2044+ }
20492045
2046+ /* This function does the actual writing of output buffers for slave client types,
2047+ * it is called by writeToClient.
2048+ * If we write successfully, it returns C_OK, otherwise, C_ERR is returned,
2049+ * and 'nwritten' is an output parameter, it means how many bytes server write
2050+ * to client. */
2051+ static inline int _writeToClientSlave (client * c , ssize_t * nwritten ) {
2052+ * nwritten = 0 ;
2053+ serverAssert (c -> bufpos == 0 && listLength (c -> reply ) == 0 );
2054+ replBufBlock * o = listNodeValue (c -> ref_repl_buf_node );
2055+ serverAssert (o -> used >= c -> ref_block_pos );
2056+ /* Send current block if it is not fully sent. */
2057+ if (o -> used > c -> ref_block_pos ) {
2058+ * nwritten = connWrite (c -> conn , o -> buf + c -> ref_block_pos ,
2059+ o -> used - c -> ref_block_pos );
2060+ if (* nwritten <= 0 ) return C_ERR ;
2061+ c -> ref_block_pos += * nwritten ;
2062+ }
2063+ /* If we fully sent the object on head, go to the next one. */
2064+ listNode * next = listNextNode (c -> ref_repl_buf_node );
2065+ if (next && c -> ref_block_pos == o -> used ) {
2066+ o -> refcount -- ;
2067+ ((replBufBlock * )(listNodeValue (next )))-> refcount ++ ;
2068+ c -> ref_repl_buf_node = next ;
2069+ c -> ref_block_pos = 0 ;
2070+ incrementalTrimReplicationBacklog (REPL_BACKLOG_TRIM_BLOCKS_PER_CALL );
2071+ }
20502072 return C_OK ;
20512073}
20522074
@@ -2064,32 +2086,45 @@ int writeToClient(client *c, int handler_installed) {
20642086 atomicIncr (server .stat_io_writes_processed [c -> running_tid ], 1 );
20652087
20662088 ssize_t nwritten = 0 , totwritten = 0 ;
2067-
2068- while (clientHasPendingReplies (c )) {
2069- int ret = _writeToClient (c , & nwritten );
2070- if (ret == C_ERR ) break ;
2071- totwritten += nwritten ;
2072- /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
2073- * bytes, in a single threaded server it's a good idea to serve
2074- * other clients as well, even if a very large request comes from
2075- * super fast link that is always able to accept data (in real world
2076- * scenario think about 'KEYS *' against the loopback interface).
2077- *
2078- * However if we are over the maxmemory limit we ignore that and
2079- * just deliver as much data as it is possible to deliver.
2080- *
2081- * Moreover, we also send as much as possible if the client is
2082- * a slave or a monitor (otherwise, on high-speed traffic, the
2083- * replication/output buffer will grow indefinitely) */
2084- if (totwritten > NET_MAX_WRITES_PER_EVENT &&
2085- (server .maxmemory == 0 ||
2086- zmalloc_used_memory () < server .maxmemory ) &&
2087- !(c -> flags & CLIENT_SLAVE )) break ;
2088- }
2089-
2090- if (unlikely (clientTypeIsSlave (c ))) {
2089+ const int is_slave = clientTypeIsSlave (c );
2090+
2091+ if (unlikely (is_slave )) {
2092+ /* We send as much as possible if the client is
2093+ * a slave (otherwise, on high-speed traffic, the
2094+ * replication buffer will grow indefinitely) */
2095+ while (_clientHasPendingRepliesSlave (c )) {
2096+ int ret = _writeToClientSlave (c , & nwritten );
2097+ if (ret == C_ERR ) break ;
2098+ totwritten += nwritten ;
2099+ }
20912100 atomicIncr (server .stat_net_repl_output_bytes , totwritten );
20922101 } else {
2102+ /* If we reach this block and the client is marked with the CLIENT_SLAVE
2103+ * flag, it is a MONITOR client: those are marked as replicas but are
2104+ * exposed as normal clients. */
2105+ const int is_normal_client = !(c -> flags & CLIENT_SLAVE );
2106+ while (_clientHasPendingRepliesNonSlave (c )) {
2107+ int ret = _writeToClientNonSlave (c , & nwritten );
2108+ if (ret == C_ERR ) break ;
2109+ totwritten += nwritten ;
2110+ /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
2111+ * bytes, in a single threaded server it's a good idea to serve
2112+ * other clients as well, even if a very large request comes from
2113+ * super fast link that is always able to accept data (in real world
2114+ * scenario think about 'KEYS *' against the loopback interface).
2115+ *
2116+ * However if we are over the maxmemory limit we ignore that and
2117+ * just deliver as much data as it is possible to deliver.
2118+ *
2119+ * Moreover, we also send as much as possible if the client is
2120+ * a slave (covered above) or a monitor (covered here);
2121+ * otherwise, on high-speed traffic, the
2122+ * output buffer will grow indefinitely. */
2123+ if (totwritten > NET_MAX_WRITES_PER_EVENT &&
2124+ (server .maxmemory == 0 ||
2125+ zmalloc_used_memory () < server .maxmemory ) &&
2126+ is_normal_client ) break ;
2127+ }
20932128 atomicIncr (server .stat_net_output_bytes , totwritten );
20942129 }
20952130
@@ -4127,6 +4162,8 @@ int getClientType(client *c) {
41274162}
41284163
41294164static inline int clientTypeIsSlave (client * c ) {
4165+ /* Even though MONITOR clients are marked as replicas, we
4166+ * want to expose them as normal clients. */
41304167 if (unlikely ((c -> flags & CLIENT_SLAVE ) && !(c -> flags & CLIENT_MONITOR )))
41314168 return 1 ;
41324169 return 0 ;
0 commit comments