From 08387276cd23794d82d19c7d2a726b7cd276926f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Aug 2021 16:14:19 +0100 Subject: [PATCH 1/3] Fix perf of fetching the same events many times. The code to deduplicate repeated fetches of the same set of events was N^2 (over the number of events requested), which could lead to a process being completely wedged. The main fix is to deduplicate the returned deferreds so we only await on a deferred once rather than many times. Seperately, when handling the returned events from the defrered we only add the events we care about to the event map to be returned (so that we don't pay the price of inserting extraneous events into the dict). --- .../storage/databases/main/events_worker.py | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 375463e4e979..b30a382f1ce9 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -520,16 +520,26 @@ async def _get_events_from_cache_or_db( # We now look up if we're already fetching some of the events in the DB, # if so we wait for those lookups to finish instead of pulling the same # events out of the DB multiple times. - already_fetching: Dict[str, defer.Deferred] = {} + # + # Note: we might get the same `ObservableDeferred` back for multiple + # events we're already fetching, so we deduplicate the deferreds to + # avoid extraneous work (if we don't do this we can end up in a n^2 mode + # when we wait on the same Deferred N times, then try and merge the + # same dict into itself N times). + already_fetching_ids: Set[str] = set() + already_fetching_deferreds: Set[ + ObservableDeferred[Dict[str, _EventCacheEntry]] + ] = set() for event_id in missing_events_ids: deferred = self._current_event_fetches.get(event_id) if deferred is not None: # We're already pulling the event out of the DB. Add the deferred # to the collection of deferreds to wait on. - already_fetching[event_id] = deferred.observe() + already_fetching_ids.add(event_id) + already_fetching_deferreds.add(deferred) - missing_events_ids.difference_update(already_fetching) + missing_events_ids.difference_update(already_fetching_ids) if missing_events_ids: log_ctx = current_context() @@ -569,18 +579,25 @@ async def _get_events_from_cache_or_db( with PreserveLoggingContext(): fetching_deferred.callback(missing_events) - if already_fetching: + if already_fetching_deferreds: # Wait for the other event requests to finish and add their results # to ours. results = await make_deferred_yieldable( defer.gatherResults( - already_fetching.values(), + (d.observe() for d in already_fetching_deferreds), consumeErrors=True, ) ).addErrback(unwrapFirstError) for result in results: - event_entry_map.update(result) + # We filter out events that we haven't asked for as we might get + # a *lot* of superfluous events back, and there is not point + # going through and inserting them all (which can take time). + event_entry_map.update( + (event_id, entry) + for event_id, entry in result.items() + if event_id in already_fetching_ids + ) if not allow_rejected: event_entry_map = { From 95838378d0ab6ed25131d0839825992757bafc1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Aug 2021 16:21:33 +0100 Subject: [PATCH 2/3] Newsfile --- changelog.d/10703.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10703.bugfix diff --git a/changelog.d/10703.bugfix b/changelog.d/10703.bugfix new file mode 100644 index 000000000000..874181cd23f9 --- /dev/null +++ b/changelog.d/10703.bugfix @@ -0,0 +1 @@ +Fix performance of fetching the same large set of events repeatedly at the same time, which in extreme cases could wedge the process. Introduced in v1.41.0. From a59fe072de757a4533bfdb4c43a01b3db948ce3a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Aug 2021 09:50:39 +0100 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/10703.bugfix | 2 +- synapse/storage/databases/main/events_worker.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changelog.d/10703.bugfix b/changelog.d/10703.bugfix index 874181cd23f9..a5a4ecf8eedf 100644 --- a/changelog.d/10703.bugfix +++ b/changelog.d/10703.bugfix @@ -1 +1 @@ -Fix performance of fetching the same large set of events repeatedly at the same time, which in extreme cases could wedge the process. Introduced in v1.41.0. +Fix a regression introduced in v1.41.0 which affected the performance of concurrent fetches of large sets of events, in extreme cases causing the process to hang. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b30a382f1ce9..9501f00f3bb3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -591,7 +591,7 @@ async def _get_events_from_cache_or_db( for result in results: # We filter out events that we haven't asked for as we might get - # a *lot* of superfluous events back, and there is not point + # a *lot* of superfluous events back, and there is no point # going through and inserting them all (which can take time). event_entry_map.update( (event_id, entry)