Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17338.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Do not block event sending/receiving while calculating large event auth chains.
12 changes: 12 additions & 0 deletions synapse/storage/controllers/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,13 +617,25 @@ async def _persist_event_batch(
room_id, chunk
)

with Measure(self._clock, "calculate_chain_cover_index_for_events"):
# We now calculate chain ID/sequence numbers for any state events we're
# persisting. We ignore out of band memberships as we're not in the room
# and won't have their auth chain (we'll fix it up later if we join the
# room).
#
# See: docs/auth_chain_difference_algorithm.md
new_event_links = await self.persist_events_store.calculate_chain_cover_index_for_events(
room_id, [e for e, _ in chunk]
)

await self.persist_events_store._persist_events_and_state_updates(
room_id,
chunk,
state_delta_for_room=state_delta_for_room,
new_forward_extremities=new_forward_extremities,
use_negative_stream_ordering=backfilled,
inhibit_local_membership_updates=backfilled,
new_event_links=new_event_links,
)

return replaced_events
Expand Down
20 changes: 14 additions & 6 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ def __init__(
500000, "_event_auth_cache", size_callback=len
)

# Flag used by unit tests to disable fallback when there is no chain cover
# index.
self.tests_allow_no_chain_cover_index = True

self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)

if isinstance(self.database_engine, PostgresEngine):
Expand Down Expand Up @@ -220,8 +224,10 @@ async def get_auth_chain_ids(
)
except _NoChainCoverIndex:
# For whatever reason we don't actually have a chain cover index
# for the events in question, so we fall back to the old method.
pass
# for the events in question, so we fall back to the old method
# (except in tests)
if not self.tests_allow_no_chain_cover_index:
raise

return await self.db_pool.runInteraction(
"get_auth_chain_ids",
Expand Down Expand Up @@ -271,7 +277,7 @@ def _get_auth_chain_ids_using_cover_index_txn(
if events_missing_chain_info:
# This can happen due to e.g. downgrade/upgrade of the server. We
# raise an exception and fall back to the previous algorithm.
logger.info(
logger.error(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be seeing these anymore, except while old servers are upgrading (with the background update still running).

"Unexpectedly found that events don't have chain IDs in room %s: %s",
room_id,
events_missing_chain_info,
Expand Down Expand Up @@ -482,8 +488,10 @@ async def get_auth_chain_difference(
)
except _NoChainCoverIndex:
# For whatever reason we don't actually have a chain cover index
# for the events in question, so we fall back to the old method.
pass
# for the events in question, so we fall back to the old method
# (except in tests)
if not self.tests_allow_no_chain_cover_index:
raise

return await self.db_pool.runInteraction(
"get_auth_chain_difference",
Expand Down Expand Up @@ -710,7 +718,7 @@ def _fixup_auth_chain_difference_sets(
if events_missing_chain_info - event_to_auth_ids.keys():
# Uh oh, we somehow haven't correctly done the chain cover index,
# bail and fall back to the old method.
logger.info(
logger.error(
"Unexpectedly found that events don't have chain IDs in room %s: %s",
room_id,
events_missing_chain_info - event_to_auth_ids.keys(),
Expand Down
Loading