@@ -64,7 +64,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c,
6464 streamConsumer * consumer );
6565int streamParseStrictIDOrReply (client * c , robj * o , streamID * id , uint64_t missing_seq , int * seq_given );
6666int streamParseIDOrReply (client * c , robj * o , streamID * id , uint64_t missing_seq );
67-
67+ static long long streamEstimateDistance ( stream * s , streamCG * cg , streamID * next_id );
6868/* -----------------------------------------------------------------------
6969 * Low level stream encoding: a radix tree of listpacks.
7070 * ----------------------------------------------------------------------- */
@@ -1389,11 +1389,6 @@ int streamIDEqZero(streamID *id) {
13891389int streamRangeHasTombstones (stream * s , streamID * start , streamID * end ) {
13901390 streamID start_id , end_id ;
13911391
1392- if (!s -> length || streamIDEqZero (& s -> max_deleted_entry_id )) {
1393- /* The stream is empty or has no tombstones. */
1394- return 0 ;
1395- }
1396-
13971392 if (start ) {
13981393 start_id = * start ;
13991394 } else {
@@ -1418,39 +1413,56 @@ int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
14181413 return 0 ;
14191414}
14201415
1421- /* Replies with a consumer group's current lag, that is the number of messages
1422- * in the stream that are yet to be delivered. In case that the lag isn't
1423- * available due to fragmentation, the reply to the client is a null. */
1424- void streamReplyWithCGLag (client * c , stream * s , streamCG * cg ) {
1416+ /* Replies with a consumer group's current lag, which is the number of messages in the stream
1417+ * that are yet to be delivered. Additionally, it includes an entries-read field that indicates
1418+ * the number of messages currently read. In case that the lag or entries-read isn't available
1419+ * due to fragmentation, the reply to the client is null. */
1420+ void streamReplyWithCGLagAndEntriesRead (client * c , stream * s , streamCG * cg ) {
14251421 int valid = 0 ;
14261422 long long lag = 0 ;
14271423
1428- if (!s -> entries_added ) {
1429- /* The lag of a newly-initialized stream is 0. */
1430- lag = 0 ;
1424+ /* Attempt to retrieve the group's last ID logical read counter. */
1425+ long long entries_read = streamEstimateDistance (s , cg , & cg -> last_id );
1426+ if (entries_read != SCG_INVALID_ENTRIES_READ ) {
1427+ /* A valid counter was obtained. */
1428+ lag = (long long )s -> entries_added - entries_read ;
14311429 valid = 1 ;
1432- } else if (cg -> entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones (s , & cg -> last_id , NULL )) {
1433- /* No fragmentation ahead means that the group's logical reads counter
1434- * is valid for performing the lag calculation. */
1435- lag = (long long )s -> entries_added - cg -> entries_read ;
1436- valid = 1 ;
1437- } else {
1438- /* Attempt to retrieve the group's last ID logical read counter. */
1439- long long entries_read = streamEstimateDistanceFromFirstEverEntry (s , & cg -> last_id );
1440- if (entries_read != SCG_INVALID_ENTRIES_READ ) {
1441- /* A valid counter was obtained. */
1442- lag = (long long )s -> entries_added - entries_read ;
1443- valid = 1 ;
1444- }
14451430 }
14461431
14471432 if (valid ) {
1433+ /* Read counter of the last delivered ID */
1434+ addReplyBulkCString (c , "entries-read" );
1435+ addReplyLongLong (c , entries_read );
1436+ /* Group lag */
1437+ addReplyBulkCString (c , "lag" );
14481438 addReplyLongLong (c , lag );
14491439 } else {
1440+ addReplyBulkCString (c , "entries-read" );
1441+ addReplyNull (c );
1442+ addReplyBulkCString (c , "lag" );
14501443 addReplyNull (c );
14511444 }
14521445}
14531446
1447+ /* The function returns the logical read counter corresponding to next_id
1448+ * based on the information of the group.
1449+ */
1450+ static long long streamEstimateDistance (stream * s , streamCG * cg , streamID * next_id ) {
1451+ /* If the values of next_id and last_id are the same,
1452+ * it is considered that only the current value needs to be returned,
1453+ * otherwise it is considered to be the calculated value.
1454+ * This is used to align with the streamEstimateDistanceFromFirstEverEntry method.
1455+ */
1456+ long long step = streamCompareID (& cg -> last_id , next_id ) == 0 ? 0 : 1 ;
1457+ if (cg -> entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones (s , & cg -> last_id , NULL )) {
1458+ /* A valid counter and no future tombstones mean we can
1459+ * increment the read counter to keep tracking the group's
1460+ * progress. */
1461+ return cg -> entries_read + step ;
1462+ }
1463+ return streamEstimateDistanceFromFirstEverEntry (s , next_id );
1464+ }
1465+
14541466/* This function returns a value that is the ID's logical read counter, or its
14551467 * distance (the number of entries) from the first entry ever to have been added
14561468 * to the stream.
@@ -1485,11 +1497,6 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
14851497 return s -> entries_added ;
14861498 }
14871499
1488- if (!streamIDEqZero (id ) && streamCompareID (id , & s -> max_deleted_entry_id ) < 0 ) {
1489- /* The ID is before the last tombstone, so the counter is unknown. */
1490- return SCG_INVALID_ENTRIES_READ ;
1491- }
1492-
14931500 int cmp_last = streamCompareID (id , & s -> last_id );
14941501 if (cmp_last == 0 ) {
14951502 /* Return the exact counter of the last entry in the stream. */
@@ -1678,16 +1685,7 @@ size_t streamReplyWithRange(client *c,
16781685 while (streamIteratorGetID (& si , & id , & numfields )) {
16791686 /* Update the group last_id if needed. */
16801687 if (group && streamCompareID (& id , & group -> last_id ) > 0 ) {
1681- if (group -> entries_read != SCG_INVALID_ENTRIES_READ &&
1682- streamCompareID (& group -> last_id , & s -> first_id ) >= 0 &&
1683- !streamRangeHasTombstones (s , & group -> last_id , NULL )) {
1684- /* A valid counter and no tombstones in the group's last-delivered-id and the stream's last-generated-id,
1685- * we can increment the read counter to keep tracking the group's progress. */
1686- group -> entries_read ++ ;
1687- } else if (s -> entries_added ) {
1688- /* The group's counter may be invalid, so we try to obtain it. */
1689- group -> entries_read = streamEstimateDistanceFromFirstEverEntry (s , & id );
1690- }
1688+ group -> entries_read = streamEstimateDistance (s , group , & id );
16911689 group -> last_id = id ;
16921690 /* In the past, we would only set it when NOACK was specified. And in
16931691 * #9127, XCLAIM did not propagate entries_read in ACK, which would
@@ -3691,16 +3689,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
36913689 addReplyStreamID (c , & cg -> last_id );
36923690
36933691 /* Read counter of the last delivered ID */
3694- addReplyBulkCString (c , "entries-read" );
3695- if (cg -> entries_read != SCG_INVALID_ENTRIES_READ ) {
3696- addReplyLongLong (c , cg -> entries_read );
3697- } else {
3698- addReplyNull (c );
3699- }
3700-
3701- /* Group lag */
3702- addReplyBulkCString (c , "lag" );
3703- streamReplyWithCGLag (c , s , cg );
3692+ streamReplyWithCGLagAndEntriesRead (c , s , cg );
37043693
37053694 /* Group PEL count */
37063695 addReplyBulkCString (c , "pel-count" );
@@ -3887,14 +3876,7 @@ void xinfoCommand(client *c) {
38873876 addReplyLongLong (c , raxSize (cg -> pel ));
38883877 addReplyBulkCString (c , "last-delivered-id" );
38893878 addReplyStreamID (c , & cg -> last_id );
3890- addReplyBulkCString (c , "entries-read" );
3891- if (cg -> entries_read != SCG_INVALID_ENTRIES_READ ) {
3892- addReplyLongLong (c , cg -> entries_read );
3893- } else {
3894- addReplyNull (c );
3895- }
3896- addReplyBulkCString (c , "lag" );
3897- streamReplyWithCGLag (c , s , cg );
3879+ streamReplyWithCGLagAndEntriesRead (c , s , cg );
38983880 }
38993881 raxStop (& ri );
39003882 } else if (!strcasecmp (opt , "STREAM" )) {
0 commit comments