From fe0f38ea1ea20d06e1b6d89437c0a33ca5a235a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Apr 2023 21:15:33 +0100 Subject: [PATCH 1/5] Don't keep old stream_ordering_to_exterm around --- synapse/storage/databases/main/event_federation.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index a19ba88bf8af..014ae4228c41 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1664,19 +1664,12 @@ async def get_successor_events(self, event_id: str) -> List[str]: @wrap_as_background_process("delete_old_forward_extrem_cache") async def _delete_old_forward_extrem_cache(self) -> None: def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None: - # Delete entries older than a month, while making sure we don't delete - # the only entries for a room. sql = """ DELETE FROM stream_ordering_to_exterm - WHERE - room_id IN ( - SELECT room_id - FROM stream_ordering_to_exterm - WHERE stream_ordering > ? - ) AND stream_ordering < ? + WHERE stream_ordering < ? """ txn.execute( - sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) # type: ignore[attr-defined] + sql, (self.stream_ordering_month_ago,) # type: ignore[attr-defined] ) await self.db_pool.runInteraction( From 81a27daf0d64fee0b9705af33e9478a8ce8224be Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Apr 2023 21:20:02 +0100 Subject: [PATCH 2/5] Newsfile --- changelog.d/15382.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15382.misc diff --git a/changelog.d/15382.misc b/changelog.d/15382.misc new file mode 100644 index 000000000000..c5b054d19e72 --- /dev/null +++ b/changelog.d/15382.misc @@ -0,0 +1 @@ +Improve DB performance of clearing out old data from `stream_ordering_to_exterm`. 
From 6665a7f9b8f7e5a0fea6a0db85c7e7f356142917 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Apr 2023 21:33:56 +0100 Subject: [PATCH 3/5] Handle edge case --- synapse/storage/databases/main/event_federation.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 014ae4228c41..a5d6a4e6c1b1 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1232,10 +1232,17 @@ def get_forward_extremeties_for_room_txn(txn: LoggingTransaction) -> List[str]: txn.execute(sql, (stream_ordering, room_id)) return [event_id for event_id, in txn] - return await self.db_pool.runInteraction( + event_ids = await self.db_pool.runInteraction( "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn ) + # If we didn't find any IDs, then we must have cleared out the + # associated `stream_ordering_to_exterm`. + if not event_ids: + raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,)) + + return event_ids + def _get_connected_batch_event_backfill_results_txn( self, txn: LoggingTransaction, insertion_event_id: str, limit: int ) -> List[BackfillQueueNavigationItem]: From d5ccd2df6c92597f705ecfe0de1a00564880fb0c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2023 08:50:18 +0100 Subject: [PATCH 4/5] Check if extremities have changed --- synapse/handlers/device.py | 10 +++++++ .../databases/main/event_federation.py | 29 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9ded6389acdb..95011c12abc8 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -215,6 +215,16 @@ async def get_user_ids_changed( possibly_changed = set(changed) possibly_left = set() for room_id in rooms_changed: + # Check if the forward extremities have changed. 
If not then we know + # the current state won't have changed, and so we can skip this room. + try: + if not await self.store.has_room_extremities_changed_since( + room_id, stream_ordering + ): + continue + except errors.StoreError: + pass + current_state_ids = await self._state_storage.get_current_state_ids( room_id, await_full_state=False ) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index a5d6a4e6c1b1..e0d105e9a3bf 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1171,6 +1171,35 @@ def _get_min_depth_interaction( return int(min_depth) if min_depth is not None else None + async def has_room_extremities_changed_since( + self, + room_id: str, + stream_ordering: int, + ) -> bool: + """Check if the forward extremities in a room have changed since the + given stream ordering + + Throws a StoreError if we have since purged the index for + stream_orderings from that point. + """ + + if stream_ordering <= self.stream_ordering_month_ago: # type: ignore[attr-defined] + raise StoreError(400, f"stream_ordering too old {stream_ordering}") + + sql = """ + SELECT 1 FROM stream_ordering_to_exterm + WHERE stream_ordering > ? AND room_id = ? 
+ LIMIT 1 + """ + + def has_room_extremities_changed_since_txn(txn: LoggingTransaction) -> bool: + txn.execute(sql, (stream_ordering, room_id)) + return txn.fetchone() is not None + + return await self.db_pool.runInteraction( + "has_room_extremities_changed_since", has_room_extremities_changed_since_txn + ) + @cancellable async def get_forward_extremities_for_room_at_stream_ordering( self, room_id: str, stream_ordering: int From c9ee7617c549951def0755c1b85d2ccbcd90f96a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2023 12:42:33 +0100 Subject: [PATCH 5/5] Rename func --- synapse/handlers/device.py | 2 +- synapse/storage/databases/main/event_federation.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 95011c12abc8..d2063d443558 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -218,7 +218,7 @@ async def get_user_ids_changed( # Check if the forward extremities have changed. If not then we know # the current state won't have changed, and so we can skip this room. 
try: - if not await self.store.has_room_extremities_changed_since( + if not await self.store.have_room_forward_extremities_changed_since( room_id, stream_ordering ): continue diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index e0d105e9a3bf..9e6011e8ea18 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1171,7 +1171,7 @@ def _get_min_depth_interaction( return int(min_depth) if min_depth is not None else None - async def has_room_extremities_changed_since( + async def have_room_forward_extremities_changed_since( self, room_id: str, stream_ordering: int, @@ -1192,12 +1192,15 @@ async def has_room_extremities_changed_since( LIMIT 1 """ - def has_room_extremities_changed_since_txn(txn: LoggingTransaction) -> bool: + def have_room_forward_extremities_changed_since_txn( + txn: LoggingTransaction, + ) -> bool: txn.execute(sql, (stream_ordering, room_id)) return txn.fetchone() is not None return await self.db_pool.runInteraction( - "has_room_extremities_changed_since", has_room_extremities_changed_since_txn + "have_room_forward_extremities_changed_since", + have_room_forward_extremities_changed_since_txn, ) @cancellable