-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix assertions being thrown by the EventsStream update function #7337
Changes from all commits
23b2826
9cbdfb3
ce428a1
3655eaf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix a bug where event updates might not be sent over replication to worker processes after the stream falls behind. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,11 +15,12 @@ | |
| # limitations under the License. | ||
|
|
||
| import heapq | ||
| from typing import Iterable, Tuple, Type | ||
| from collections import Iterable | ||
| from typing import List, Tuple, Type | ||
|
|
||
| import attr | ||
|
|
||
| from ._base import Stream, Token, db_query_to_update_function | ||
| from ._base import Stream, StreamUpdateResult, Token | ||
|
|
||
|
|
||
| """Handling of the 'events' replication stream | ||
|
|
@@ -117,30 +118,106 @@ class EventsStream(Stream): | |
| def __init__(self, hs): | ||
| self._store = hs.get_datastore() | ||
| super().__init__( | ||
| self._store.get_current_events_token, | ||
| db_query_to_update_function(self._update_function), | ||
| self._store.get_current_events_token, self._update_function, | ||
| ) | ||
|
|
||
| async def _update_function( | ||
| self, from_token: Token, current_token: Token, limit: int | ||
| ) -> Iterable[tuple]: | ||
| self, from_token: Token, current_token: Token, target_row_count: int | ||
| ) -> StreamUpdateResult: | ||
|
|
||
| # the events stream merges together three separate sources: | ||
| # * new events | ||
| # * current_state changes | ||
| # * events which were previously outliers, but have now been de-outliered. | ||
| # | ||
| # The merge operation is complicated by the fact that we only have a single | ||
| # "stream token" which is supposed to indicate how far we have got through | ||
| # all three streams. It's therefore no good to return rows 1-1000 from the | ||
| # "new events" table if the state_deltas are limited to rows 1-100 by the | ||
| # target_row_count. | ||
| # | ||
| # In other words: we must pick a new upper limit, and must return *all* rows | ||
| # up to that point for each of the three sources. | ||
| # | ||
| # Start by trying to split the target_row_count up. We expect to have a | ||
| # negligible number of ex-outliers, and a rough approximation based on recent | ||
| # traffic on sw1v.org shows that there are approximately the same number of | ||
| # event rows between a given pair of stream ids as there are state | ||
| # updates, so let's split our target_row_count among those two types. The target | ||
| # is only an approximation - it doesn't matter if we end up going a bit over it. | ||
|
|
||
| target_row_count //= 2 | ||
|
|
||
| # now we fetch up to that many rows from the events table | ||
|
|
||
| event_rows = await self._store.get_all_new_forward_event_rows( | ||
| from_token, current_token, limit | ||
| ) | ||
| event_updates = ( | ||
| (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows | ||
| ) | ||
| from_token, current_token, target_row_count | ||
| ) # type: List[Tuple] | ||
|
|
||
| # we rely on get_all_new_forward_event_rows strictly honouring the limit, so | ||
| # that we know it is safe to just take upper_limit = event_rows[-1][0]. | ||
| assert ( | ||
| len(event_rows) <= target_row_count | ||
| ), "get_all_new_forward_event_rows did not honour row limit" | ||
|
|
||
| # if we hit the limit on event_updates, there's no point in going beyond the | ||
| # last stream_id in the batch for the other sources. | ||
|
|
||
| if len(event_rows) == target_row_count: | ||
| limited = True | ||
| upper_limit = event_rows[-1][0] # type: int | ||
| else: | ||
| limited = False | ||
| upper_limit = current_token | ||
|
|
||
| # next up is the state delta table | ||
|
|
||
| state_rows = await self._store.get_all_updated_current_state_deltas( | ||
| from_token, current_token, limit | ||
| ) | ||
| state_updates = ( | ||
| (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows | ||
| ) | ||
| from_token, upper_limit, target_row_count | ||
| ) # type: List[Tuple] | ||
|
|
||
| # again, if we've hit the limit there, we'll need to limit the other sources | ||
| assert len(state_rows) < target_row_count | ||
| if len(state_rows) == target_row_count: | ||
| assert state_rows[-1][0] <= upper_limit | ||
| upper_limit = state_rows[-1][0] | ||
| limited = True | ||
|
||
|
|
||
| # FIXME: is it a given that there is only one row per stream_id in the | ||
| # state_deltas table (so that we can be sure that we have got all of the | ||
| # rows for upper_limit)? | ||
|
||
|
|
||
| # finally, fetch the ex-outliers rows. We assume there are few enough of these | ||
| # not to bother with the limit. | ||
|
|
||
| all_updates = heapq.merge(event_updates, state_updates) | ||
| ex_outliers_rows = await self._store.get_ex_outlier_stream_rows( | ||
| from_token, upper_limit | ||
| ) # type: List[Tuple] | ||
|
|
||
| return all_updates | ||
| # we now need to turn the raw database rows returned into tuples suitable | ||
| # for the replication protocol (basically, we add an identifier to | ||
| # distinguish the row type). At the same time, we can limit the event_rows | ||
| # to the max stream_id from state_rows. | ||
|
|
||
| event_updates = ( | ||
| (stream_id, (EventsStreamEventRow.TypeId, rest)) | ||
| for (stream_id, *rest) in event_rows | ||
| if stream_id <= upper_limit | ||
| ) # type: Iterable[Tuple[int, Tuple]] | ||
|
|
||
| state_updates = ( | ||
| (stream_id, (EventsStreamCurrentStateRow.TypeId, rest)) | ||
| for (stream_id, *rest) in state_rows | ||
| ) # type: Iterable[Tuple[int, Tuple]] | ||
|
|
||
| ex_outliers_updates = ( | ||
| (stream_id, (EventsStreamEventRow.TypeId, rest)) | ||
| for (stream_id, *rest) in ex_outliers_rows | ||
| ) # type: Iterable[Tuple[int, Tuple]] | ||
|
|
||
| # we need to return a sorted list, so merge them together. | ||
| updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates)) | ||
| return updates, upper_limit, limited | ||
|
|
||
| @classmethod | ||
| def parse_row(cls, row): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -973,8 +973,18 @@ def get_current_events_token(self): | |
| return self._stream_id_gen.get_current_token() | ||
|
|
||
| def get_all_new_forward_event_rows(self, last_id, current_id, limit): | ||
| if last_id == current_id: | ||
| return defer.succeed([]) | ||
|
||
| """Returns new events, for the Events replication stream | ||
|
|
||
| Args: | ||
| last_id: the last stream_id from the previous batch. | ||
| current_id: the maximum stream_id to return up to | ||
| limit: the maximum number of rows to return | ||
|
|
||
| Returns: Deferred[List[Tuple]] | ||
| a list of events stream rows. Each tuple consists of a stream id as | ||
| the first element, followed by fields suitable for casting into an | ||
| EventsStreamRow. | ||
| """ | ||
|
|
||
| def get_all_new_forward_event_rows(txn): | ||
| sql = ( | ||
|
|
@@ -989,13 +999,26 @@ def get_all_new_forward_event_rows(txn): | |
| " LIMIT ?" | ||
| ) | ||
| txn.execute(sql, (last_id, current_id, limit)) | ||
| new_event_updates = txn.fetchall() | ||
| return txn.fetchall() | ||
|
|
||
| if len(new_event_updates) == limit: | ||
| upper_bound = new_event_updates[-1][0] | ||
| else: | ||
| upper_bound = current_id | ||
| return self.db.runInteraction( | ||
| "get_all_new_forward_event_rows", get_all_new_forward_event_rows | ||
| ) | ||
|
|
||
| def get_ex_outlier_stream_rows(self, last_id, current_id): | ||
| """Returns de-outliered events, for the Events replication stream | ||
|
|
||
| Args: | ||
| last_id: the last stream_id from the previous batch. | ||
| current_id: the maximum stream_id to return up to | ||
|
|
||
| Returns: Deferred[List[Tuple]] | ||
| a list of events stream rows. Each tuple consists of a stream id as | ||
| the first element, followed by fields suitable for casting into an | ||
| EventsStreamRow. | ||
| """ | ||
|
|
||
| def get_ex_outlier_stream_rows_txn(txn): | ||
| sql = ( | ||
| "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," | ||
| " state_key, redacts, relates_to_id" | ||
|
|
@@ -1006,15 +1029,14 @@ def get_all_new_forward_event_rows(txn): | |
| " LEFT JOIN event_relations USING (event_id)" | ||
| " WHERE ? < event_stream_ordering" | ||
| " AND event_stream_ordering <= ?" | ||
| " ORDER BY event_stream_ordering DESC" | ||
| " ORDER BY event_stream_ordering ASC" | ||
| ) | ||
| txn.execute(sql, (last_id, upper_bound)) | ||
| new_event_updates.extend(txn) | ||
|
|
||
| return new_event_updates | ||
| txn.execute(sql, (last_id, current_id)) | ||
| return txn.fetchall() | ||
|
|
||
| return self.db.runInteraction( | ||
| "get_all_new_forward_event_rows", get_all_new_forward_event_rows | ||
| "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn | ||
| ) | ||
|
|
||
| def get_all_new_backfill_event_rows(self, last_id, current_id, limit): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It took me a while to realise the crux here is that: "for each source we must return all rows up to the same token."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair. Will add another comment.