120120]
121121
122122
123+ @attr .s (slots = True , auto_attribs = True )
124+ class _RoomReceipt :
125+ """
126+ HttpPushAction instances include the information used to generate HTTP
127+ requests to a push gateway.
128+ """
129+
130+ unthreaded_stream_ordering : int = 0
131+ # threaded_stream_ordering includes the main pseudo-thread.
132+ threaded_stream_ordering : Dict [str , int ] = attr .Factory (dict )
133+
134+ def is_unread (self , thread_id : str , stream_ordering : int ) -> bool :
135+ """Returns True if the stream ordering is unread according to the receipt information."""
136+
137+ # Only include push actions with a stream ordering after both the unthreaded
138+ # and threaded receipt. Properly handles a user without any receipts present.
139+ return (
140+ self .unthreaded_stream_ordering < stream_ordering
141+ and self .threaded_stream_ordering .get (thread_id , 0 ) < stream_ordering
142+ )
143+
144+
145+ # A _RoomReceipt with no receipts in it.
146+ MISSING_ROOM_RECEIPT = _RoomReceipt ()
147+
148+
123149@attr .s (slots = True , frozen = True , auto_attribs = True )
124150class HttpPushAction :
125151 """
@@ -705,7 +731,7 @@ def f(txn: LoggingTransaction) -> List[str]:
705731
706732 def _get_receipts_by_room_txn (
707733 self , txn : LoggingTransaction , user_id : str
708- ) -> Dict [str , int ]:
734+ ) -> Dict [str , _RoomReceipt ]:
709735 """
710736 Generate a map of room ID to the latest stream ordering that has been
711737 read by the given user.
@@ -715,7 +741,8 @@ def _get_receipts_by_room_txn(
715741 user_id: The user to fetch receipts for.
716742
717743 Returns:
718- A map of room ID to stream ordering for all rooms the user has a receipt in.
744+ A map including all rooms the user is in with a receipt. It maps
745+ room IDs to _RoomReceipt instances
719746 """
720747 receipt_types_clause , args = make_in_list_sql_clause (
721748 self .database_engine ,
@@ -727,20 +754,26 @@ def _get_receipts_by_room_txn(
727754 )
728755
729756 sql = f"""
730- SELECT room_id, MAX(stream_ordering)
757+ SELECT room_id, thread_id, MAX(stream_ordering)
731758 FROM receipts_linearized
732759 INNER JOIN events USING (room_id, event_id)
733760 WHERE { receipt_types_clause }
734761 AND user_id = ?
735- GROUP BY room_id
762+ GROUP BY room_id, thread_id
736763 """
737764
738765 args .extend ((user_id ,))
739766 txn .execute (sql , args )
740- return {
741- room_id : latest_stream_ordering
742- for room_id , latest_stream_ordering in txn .fetchall ()
743- }
767+
768+ result : Dict [str , _RoomReceipt ] = {}
769+ for room_id , thread_id , stream_ordering in txn :
770+ room_receipt = result .setdefault (room_id , _RoomReceipt ())
771+ if thread_id is None :
772+ room_receipt .unthreaded_stream_ordering = stream_ordering
773+ else :
774+ room_receipt .threaded_stream_ordering [thread_id ] = stream_ordering
775+
776+ return result
744777
745778 async def get_unread_push_actions_for_user_in_range_for_http (
746779 self ,
@@ -773,9 +806,10 @@ async def get_unread_push_actions_for_user_in_range_for_http(
773806
774807 def get_push_actions_txn (
775808 txn : LoggingTransaction ,
776- ) -> List [Tuple [str , str , int , str , bool ]]:
809+ ) -> List [Tuple [str , str , str , int , str , bool ]]:
777810 sql = """
778- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
811+ SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
812+ ep.actions, ep.highlight
779813 FROM event_push_actions AS ep
780814 WHERE
781815 ep.user_id = ?
@@ -785,7 +819,7 @@ def get_push_actions_txn(
785819 ORDER BY ep.stream_ordering ASC LIMIT ?
786820 """
787821 txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
788- return cast (List [Tuple [str , str , int , str , bool ]], txn .fetchall ())
822+ return cast (List [Tuple [str , str , str , int , str , bool ]], txn .fetchall ())
789823
790824 push_actions = await self .db_pool .runInteraction (
791825 "get_unread_push_actions_for_user_in_range_http" , get_push_actions_txn
@@ -798,10 +832,10 @@ def get_push_actions_txn(
798832 stream_ordering = stream_ordering ,
799833 actions = _deserialize_action (actions , highlight ),
800834 )
801- for event_id , room_id , stream_ordering , actions , highlight in push_actions
802- # Only include push actions with a stream ordering after any receipt, or without any
803- # receipt present (invited to but never read rooms).
804- if stream_ordering > receipts_by_room . get ( room_id , 0 )
835+ for event_id , room_id , thread_id , stream_ordering , actions , highlight in push_actions
836+ if receipts_by_room . get ( room_id , MISSING_ROOM_RECEIPT ). is_unread (
837+ thread_id , stream_ordering
838+ )
805839 ]
806840
807841 # Now sort it so it's ordered correctly, since currently it will
@@ -845,10 +879,10 @@ async def get_unread_push_actions_for_user_in_range_for_email(
845879
846880 def get_push_actions_txn (
847881 txn : LoggingTransaction ,
848- ) -> List [Tuple [str , str , int , str , bool , int ]]:
882+ ) -> List [Tuple [str , str , str , int , str , bool , int ]]:
849883 sql = """
850- SELECT ep.event_id, ep.room_id, ep.stream_ordering , ep.actions ,
851- ep.highlight, e.received_ts
884+ SELECT ep.event_id, ep.room_id, ep.thread_id , ep.stream_ordering ,
885+ ep.actions, ep. highlight, e.received_ts
852886 FROM event_push_actions AS ep
853887 INNER JOIN events AS e USING (room_id, event_id)
854888 WHERE
@@ -859,7 +893,7 @@ def get_push_actions_txn(
859893 ORDER BY ep.stream_ordering DESC LIMIT ?
860894 """
861895 txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
862- return cast (List [Tuple [str , str , int , str , bool , int ]], txn .fetchall ())
896+ return cast (List [Tuple [str , str , str , int , str , bool , int ]], txn .fetchall ())
863897
864898 push_actions = await self .db_pool .runInteraction (
865899 "get_unread_push_actions_for_user_in_range_email" , get_push_actions_txn
@@ -874,10 +908,10 @@ def get_push_actions_txn(
874908 actions = _deserialize_action (actions , highlight ),
875909 received_ts = received_ts ,
876910 )
877- for event_id , room_id , stream_ordering , actions , highlight , received_ts in push_actions
878- # Only include push actions with a stream ordering after any receipt, or without any
879- # receipt present (invited to but never read rooms).
880- if stream_ordering > receipts_by_room . get ( room_id , 0 )
911+ for event_id , room_id , thread_id , stream_ordering , actions , highlight , received_ts in push_actions
912+ if receipts_by_room . get ( room_id , MISSING_ROOM_RECEIPT ). is_unread (
913+ thread_id , stream_ordering
914+ )
881915 ]
882916
883917 # Now sort it so it's ordered correctly, since currently it will
0 commit comments