Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10703.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance of fetching the same large set of events repeatedly at the same time, which in extreme cases could wedge the process. Introduced in v1.41.0.
29 changes: 23 additions & 6 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,16 +520,26 @@ async def _get_events_from_cache_or_db(
# We now look up if we're already fetching some of the events in the DB,
# if so we wait for those lookups to finish instead of pulling the same
# events out of the DB multiple times.
already_fetching: Dict[str, defer.Deferred] = {}
#
# Note: we might get the same `ObservableDeferred` back for multiple
# events we're already fetching, so we deduplicate the deferreds to
# avoid extraneous work (if we don't do this we can end up in an n^2 mode
# when we wait on the same Deferred N times, then try and merge the
# same dict into itself N times).
already_fetching_ids: Set[str] = set()
already_fetching_deferreds: Set[
ObservableDeferred[Dict[str, _EventCacheEntry]]
] = set()

for event_id in missing_events_ids:
deferred = self._current_event_fetches.get(event_id)
if deferred is not None:
# We're already pulling the event out of the DB. Add the deferred
# to the collection of deferreds to wait on.
already_fetching[event_id] = deferred.observe()
already_fetching_ids.add(event_id)
already_fetching_deferreds.add(deferred)

missing_events_ids.difference_update(already_fetching)
missing_events_ids.difference_update(already_fetching_ids)

if missing_events_ids:
log_ctx = current_context()
Expand Down Expand Up @@ -569,18 +579,25 @@ async def _get_events_from_cache_or_db(
with PreserveLoggingContext():
fetching_deferred.callback(missing_events)

if already_fetching:
if already_fetching_deferreds:
# Wait for the other event requests to finish and add their results
# to ours.
results = await make_deferred_yieldable(
defer.gatherResults(
already_fetching.values(),
(d.observe() for d in already_fetching_deferreds),
consumeErrors=True,
)
).addErrback(unwrapFirstError)

for result in results:
event_entry_map.update(result)
# We filter out events that we haven't asked for as we might get
# a *lot* of superfluous events back, and there is no point
# going through and inserting them all (which can take time).
event_entry_map.update(
(event_id, entry)
for event_id, entry in result.items()
if event_id in already_fetching_ids
)

if not allow_rejected:
event_entry_map = {
Expand Down