@@ -26,6 +26,13 @@ def parse_str(entry_key: AnyStr) -> "StreamEntryKey":
         return StreamEntryKey(timestamp, sequence)


+class PelEntry(NamedTuple):
+    """Pending Entry List entry: tracks consumer ownership and delivery count"""
+    consumer_name: bytes
+    time_read: int
+    times_delivered: int
+
+
 class StreamRangeTest:
     """Argument converter for sorted set LEX endpoints."""

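For context on the change above: a `NamedTuple` subclass keeps full tuple compatibility, so any caller that still indexes positionally continues to work while new code can use the named fields. A minimal standalone sketch (not part of the patch):

```python
from typing import NamedTuple


class PelEntry(NamedTuple):
    """Pending Entry List entry: tracks consumer ownership and delivery count."""

    consumer_name: bytes
    time_read: int
    times_delivered: int


entry = PelEntry(b"consumer-1", time_read=1_000, times_delivered=1)

# Named access and positional access see the same values.
assert entry.consumer_name == entry[0] == b"consumer-1"
assert entry.time_read == entry[1] == 1_000

# It still unpacks like the old plain tuple did.
consumer_name, time_read, times_delivered = entry
assert times_delivered == 1
```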
@@ -92,8 +99,8 @@ def __init__(
         self.last_delivered_key = start_key
         self.last_ack_key = start_key
         # Pending entry List, see https://redis.io/commands/xreadgroup/
-        # msg_id -> (consumer_name, time_read, times_delivered)
-        self.pel: Dict[StreamEntryKey, Tuple[bytes, int, int]] = {}
+        # msg_id -> PelEntry(consumer_name, time_read, times_delivered)
+        self.pel: Dict[StreamEntryKey, PelEntry] = {}

     def set_id(self, last_delivered_str: bytes, entries_read: Optional[int]) -> None:
         """Set last_delivered_id for the group"""
@@ -152,7 +159,7 @@ def group_read(
         if not noack:
             for k in ids_read:
                 # Initialize with times_delivered=1 for new messages
-                self.pel[k] = (consumer_name, _time, 1)
+                self.pel[k] = PelEntry(consumer_name, _time, 1)
         if len(ids_read) > 0:
             self.last_delivered_key = max(self.last_delivered_key, ids_read[-1])
             self.entries_read = (self.entries_read or 0) + len(ids_read)
@@ -161,9 +168,9 @@ def group_read(
         return [self.stream.format_record(x) for x in ids_read]

     def _calc_consumer_last_time(self) -> None:
-        # pel values are now (consumer_name, time_read, times_delivered)
+        # pel values are PelEntry namedtuples
         # Extract just consumer_name and time_read for grouping
-        new_last_success_map = {k: min(v, key=lambda x: x[1])[1] for k, v in itertools.groupby(self.pel.values(), key=itemgetter(0))}
+        new_last_success_map = {k: min(v, key=lambda x: x.time_read).time_read for k, v in itertools.groupby(self.pel.values(), key=lambda x: x.consumer_name)}
         for consumer in new_last_success_map:
             if consumer not in self.consumers:
                 self.consumers[consumer] = StreamConsumerInfo(consumer)
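The dict comprehension above groups PEL entries by `consumer_name` and keeps each consumer's smallest `time_read`. As a standalone illustration of that pattern (hypothetical data; `PelEntry` redefined locally so the snippet runs on its own, and sorted by consumer first because `itertools.groupby` only merges consecutive items with equal keys):

```python
import itertools
from typing import NamedTuple


class PelEntry(NamedTuple):  # same shape as the class added in this patch
    consumer_name: bytes
    time_read: int
    times_delivered: int


pel_values = sorted(
    [
        PelEntry(b"alice", 120, 1),
        PelEntry(b"alice", 100, 2),
        PelEntry(b"bob", 90, 1),
    ],
    key=lambda e: e.consumer_name,
)

# For every consumer, keep the oldest delivery time among its pending entries.
oldest_read = {
    consumer: min(entries, key=lambda e: e.time_read).time_read
    for consumer, entries in itertools.groupby(pel_values, key=lambda e: e.consumer_name)
}
assert oldest_read == {b"alice": 100, b"bob": 90}
```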
@@ -178,7 +185,7 @@ def ack(self, args: Tuple[bytes]) -> int:
             except Exception:
                 continue
             if parsed in self.pel:
-                consumer_name = self.pel[parsed][0]
+                consumer_name = self.pel[parsed].consumer_name
                 self.consumers[consumer_name].pending -= 1
                 del self.pel[parsed]
                 res += 1
@@ -196,9 +203,9 @@ def pending(
         _time = current_time()
         relevant_ids = list(self.pel.keys())
         if consumer is not None:
-            relevant_ids = [k for k in relevant_ids if self.pel[k][0] == consumer]
+            relevant_ids = [k for k in relevant_ids if self.pel[k].consumer_name == consumer]
         if idle is not None:
-            relevant_ids = [k for k in relevant_ids if self.pel[k][1] + idle < _time]
+            relevant_ids = [k for k in relevant_ids if self.pel[k].time_read + idle < _time]
         if start is not None and end is not None:
             relevant_ids = [
                 k
@@ -212,10 +219,10 @@ def pending(
             relevant_ids = sorted(relevant_ids)[:count]

         # Return all 4 fields: message_id, consumer, time_since_delivered, times_delivered
-        return [[k.encode(), self.pel[k][0], _time - self.pel[k][1], self.pel[k][2]] for k in relevant_ids]
+        return [[k.encode(), self.pel[k].consumer_name, _time - self.pel[k].time_read, self.pel[k].times_delivered] for k in relevant_ids]

     def pending_summary(self) -> List[Any]:
-        counter = Counter([self.pel[k][0] for k in self.pel])
+        counter = Counter([self.pel[k].consumer_name for k in self.pel])
         data = [
             len(self.pel),
             min(self.pel).encode() if len(self.pel) > 0 else None,
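To make the four-field row layout returned above concrete, here is a rough sketch of how one XPENDING-style row could be derived from a single PEL entry (`pending_row` and the values are hypothetical; times are in milliseconds):

```python
from typing import Any, List, NamedTuple


class PelEntry(NamedTuple):  # same shape as the class added in this patch
    consumer_name: bytes
    time_read: int
    times_delivered: int


def pending_row(msg_id: bytes, entry: PelEntry, now_ms: int) -> List[Any]:
    # message id, owning consumer, idle time since last delivery, delivery count
    return [msg_id, entry.consumer_name, now_ms - entry.time_read, entry.times_delivered]


entry = PelEntry(b"consumer-1", time_read=1_000, times_delivered=3)
assert pending_row(b"1-0", entry, now_ms=4_500) == [b"1-0", b"consumer-1", 3_500, 3]
```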
@@ -245,18 +252,18 @@ def claim(
             if key not in self.pel:
                 if force:
                     # Force claim msg - initialize with times_delivered=1
-                    self.pel[key] = (consumer_name, _time, 1)
+                    self.pel[key] = PelEntry(consumer_name, _time, 1)
                     if key in self.stream:
                         claimed_msgs.append(key)
                     else:
                         deleted_msgs.append(key)
                         del self.pel[key]
                 continue
-            if curr_time - self.pel[key][1] < min_idle_ms:
+            if curr_time - self.pel[key].time_read < min_idle_ms:
                 continue  # Not idle enough time to be claimed
             # Increment times_delivered when claiming
-            old_times_delivered = self.pel[key][2]
-            self.pel[key] = (consumer_name, _time, old_times_delivered + 1)
+            old_times_delivered = self.pel[key].times_delivered
+            self.pel[key] = PelEntry(consumer_name, _time, old_times_delivered + 1)
             if key in self.stream:
                 claimed_msgs.append(key)
             else:
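Since a NamedTuple is immutable, re-claiming a message has to build a fresh entry instead of mutating fields in place; `_replace` is an equivalent alternative to calling the constructor as `claim` does above. A small sketch with hypothetical values:

```python
from typing import NamedTuple


class PelEntry(NamedTuple):  # same shape as the class added in this patch
    consumer_name: bytes
    time_read: int
    times_delivered: int


entry = PelEntry(b"old-consumer", time_read=1_000, times_delivered=1)

# Equivalent to PelEntry(new_consumer, now, old_times_delivered + 1):
claimed = entry._replace(
    consumer_name=b"new-consumer",
    time_read=2_000,
    times_delivered=entry.times_delivered + 1,
)
assert claimed == PelEntry(b"new-consumer", 2_000, 2)
assert entry.times_delivered == 1  # the original entry is unchanged
```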
@@ -268,7 +275,7 @@ def claim(
     def read_pel_msgs(self, min_idle_ms: int, start: bytes, count: int) -> List[StreamEntryKey]:
         start_key = StreamEntryKey.parse_str(start)
         curr_time = current_time()
-        msgs = sorted([k for k in self.pel if (curr_time - self.pel[k][1] >= min_idle_ms) and k >= start_key])
+        msgs = sorted([k for k in self.pel if (curr_time - self.pel[k].time_read >= min_idle_ms) and k >= start_key])
         count = min(count, len(msgs))
         return msgs[:count]

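Finally, a simplified sketch of the idle-time filter used in `read_pel_msgs` above (string message ids instead of `StreamEntryKey`, hypothetical timestamps in milliseconds):

```python
from typing import Dict, List, NamedTuple


class PelEntry(NamedTuple):  # same shape as the class added in this patch
    consumer_name: bytes
    time_read: int
    times_delivered: int


def idle_msg_ids(pel: Dict[str, PelEntry], now_ms: int, min_idle_ms: int) -> List[str]:
    # Keep only entries whose last delivery is at least min_idle_ms old, sorted by id.
    return sorted(k for k, v in pel.items() if now_ms - v.time_read >= min_idle_ms)


pel = {
    "1-0": PelEntry(b"alice", time_read=1_000, times_delivered=1),
    "2-0": PelEntry(b"bob", time_read=9_500, times_delivered=2),
}
# At now=10_000 with min_idle_ms=5_000, only "1-0" (idle for 9_000 ms) qualifies.
assert idle_msg_ids(pel, now_ms=10_000, min_idle_ms=5_000) == ["1-0"]
```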