Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19507.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Try to reduce reactor tick times when under heavy load.
27 changes: 24 additions & 3 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ def __init__(
EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events


# Number of iterations in a loop before we yield to the reactor to allow other
# things to be processed; otherwise we can end up tight-looping.
ITERATIONS_BEFORE_YIELDING = 500


event_fetch_ongoing_gauge = Gauge(
"synapse_event_fetch_ongoing",
"The number of event fetchers that are running",
Expand Down Expand Up @@ -817,7 +822,7 @@ async def get_unredacted_events_from_cache_or_db(
# may be called repeatedly for the same event so at this point we cannot reach
# out to any external cache for performance reasons. The external cache is
# checked later on in the `get_missing_events_from_cache_or_db` function below.
event_entry_map = self._get_events_from_local_cache(
event_entry_map = await self._get_events_from_local_cache(
event_ids,
)

Expand Down Expand Up @@ -1004,7 +1009,7 @@ async def _get_events_from_cache(
events: list of event_ids to fetch
update_metrics: Whether to update the cache hit ratio metrics
"""
event_map = self._get_events_from_local_cache(
event_map = await self._get_events_from_local_cache(
events, update_metrics=update_metrics
)

Expand Down Expand Up @@ -1045,7 +1050,7 @@ async def _get_events_from_external_cache(

return event_map

def _get_events_from_local_cache(
async def _get_events_from_local_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> dict[str, EventCacheEntry]:
"""Fetch events from the local, in memory, caches.
Expand All @@ -1058,7 +1063,15 @@ def _get_events_from_local_cache(
"""
event_map = {}

i = 0
for event_id in events:
i += 1

# Yield to the reactor to allow other things to be processed,
# otherwise we can end up tight looping.
if i % ITERATIONS_BEFORE_YIELDING == 0:
await self.clock.sleep(Duration(seconds=0))

# First check if it's in the event cache
ret = self._get_event_cache.get_local(
(event_id,), None, update_metrics=update_metrics
Expand Down Expand Up @@ -1375,7 +1388,15 @@ async def _fetch_event_ids_and_get_outstanding_redactions(

# build a map from event_id to EventBase
event_map: dict[str, EventBase] = {}
i = 0
for event_id, row in fetched_events.items():
i += 1

# Yield to the reactor to allow other things to be processed,
# otherwise we can end up tight looping.
if i % ITERATIONS_BEFORE_YIELDING == 0:
await self.clock.sleep(Duration(seconds=0))

assert row.event_id == event_id

rejected_reason = row.rejected_reason
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ async def get_joined_user_ids_from_state(
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_local_cache(
event_map = await self._get_events_from_local_cache(
member_event_ids, update_metrics=False
)

Expand Down
13 changes: 13 additions & 0 deletions synapse/util/background_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
T = TypeVar("T")


# Number of iterations in a loop before we yield to the reactor to allow other
# things to be processed; otherwise we can end up tight-looping.
ITERATIONS_BEFORE_YIELDING = 500


class BackgroundQueue(Generic[T]):
"""A single-producer single-consumer async queue processing items in the
background.
Expand Down Expand Up @@ -65,6 +70,7 @@ def __init__(
timeout_ms: int = 1000,
) -> None:
self._hs = hs
self._clock = hs.get_clock()
self._name = name
self._callback = callback
self._timeout_ms = Duration(milliseconds=timeout_ms)
Expand Down Expand Up @@ -107,7 +113,14 @@ async def _process_queue(self) -> None:
# single threaded nature, but let's be a bit defensive anyway.)
self._wakeup_event.clear()

iterations = 0
while self._queue:
iterations += 1
if iterations % ITERATIONS_BEFORE_YIELDING == 0:
# Yield to the reactor to allow other things to be processed,
# otherwise we can end up tight looping.
await self._clock.sleep(Duration(seconds=0))

item = self._queue.popleft()
try:
await self._callback(item)
Expand Down
Loading