Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit c675a18

Browse files
authored
Track ongoing event fetches correctly (again) (#11376)
The previous fix for the ongoing event fetches counter (8eec25a) was both insufficient and incorrect. When the database is unreachable, `_do_fetch` never gets run and so `_event_fetch_ongoing` is never decremented. The previous fix also moved the `_event_fetch_ongoing` decrement outside of the `_event_fetch_lock` which allowed race conditions to corrupt the counter.
1 parent 7862f82 commit c675a18

File tree

3 files changed

+251
-43
lines changed

3 files changed

+251
-43
lines changed

changelog.d/11376.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection, for real this time. Also fix a race condition introduced in the previous insufficient fix in 1.47.0.

synapse/storage/databases/main/events_worker.py

Lines changed: 112 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
logger = logging.getLogger(__name__)
7373

7474

75-
# These values are used in the `enqueus_event` and `_do_fetch` methods to
75+
# These values are used in the `enqueue_event` and `_fetch_loop` methods to
7676
# control how we batch/bulk fetch events from the database.
7777
# The values are plucked out of thin air to make initial sync run faster
7878
# on jki.re
@@ -602,7 +602,7 @@ async def _get_events_from_cache_or_db(
602602
# already due to `_get_events_from_db`).
603603
fetching_deferred: ObservableDeferred[
604604
Dict[str, _EventCacheEntry]
605-
] = ObservableDeferred(defer.Deferred())
605+
] = ObservableDeferred(defer.Deferred(), consumeErrors=True)
606606
for event_id in missing_events_ids:
607607
self._current_event_fetches[event_id] = fetching_deferred
608608

@@ -736,35 +736,118 @@ async def get_stripped_room_state_from_event_context(
736736
for e in state_to_include.values()
737737
]
738738

739-
def _do_fetch(self, conn: Connection) -> None:
739+
def _maybe_start_fetch_thread(self) -> None:
740+
"""Starts an event fetch thread if we are not yet at the maximum number."""
741+
with self._event_fetch_lock:
742+
if (
743+
self._event_fetch_list
744+
and self._event_fetch_ongoing < EVENT_QUEUE_THREADS
745+
):
746+
self._event_fetch_ongoing += 1
747+
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
748+
# `_event_fetch_ongoing` is decremented in `_fetch_thread`.
749+
should_start = True
750+
else:
751+
should_start = False
752+
753+
if should_start:
754+
run_as_background_process("fetch_events", self._fetch_thread)
755+
756+
async def _fetch_thread(self) -> None:
757+
"""Services requests for events from `_event_fetch_list`."""
758+
exc = None
759+
try:
760+
await self.db_pool.runWithConnection(self._fetch_loop)
761+
except BaseException as e:
762+
exc = e
763+
raise
764+
finally:
765+
should_restart = False
766+
event_fetches_to_fail = []
767+
with self._event_fetch_lock:
768+
self._event_fetch_ongoing -= 1
769+
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
770+
771+
# There may still be work remaining in `_event_fetch_list` if we
772+
# failed, or it was added in between us deciding to exit and
773+
# decrementing `_event_fetch_ongoing`.
774+
if self._event_fetch_list:
775+
if exc is None:
776+
# We decided to exit, but then some more work was added
777+
# before `_event_fetch_ongoing` was decremented.
778+
# If a new event fetch thread was not started, we should
779+
# restart ourselves since the remaining event fetch threads
780+
# may take a while to get around to the new work.
781+
#
782+
# Unfortunately it is not possible to tell whether a new
783+
# event fetch thread was started, so we restart
784+
# unconditionally. If we are unlucky, we will end up with
785+
# an idle fetch thread, but it will time out after
786+
# `EVENT_QUEUE_ITERATIONS * EVENT_QUEUE_TIMEOUT_S` seconds
787+
# in any case.
788+
#
789+
# Note that multiple fetch threads may run down this path at
790+
# the same time.
791+
should_restart = True
792+
elif isinstance(exc, Exception):
793+
if self._event_fetch_ongoing == 0:
794+
# We were the last remaining fetcher and failed.
795+
# Fail any outstanding fetches since no one else will
796+
# handle them.
797+
event_fetches_to_fail = self._event_fetch_list
798+
self._event_fetch_list = []
799+
else:
800+
# We weren't the last remaining fetcher, so another
801+
# fetcher will pick up the work. This will either happen
802+
# after their existing work, however long that takes,
803+
# or after at most `EVENT_QUEUE_TIMEOUT_S` seconds if
804+
# they are idle.
805+
pass
806+
else:
807+
# The exception is a `SystemExit`, `KeyboardInterrupt` or
808+
# `GeneratorExit`. Don't try to do anything clever here.
809+
pass
810+
811+
if should_restart:
812+
# We exited cleanly but noticed more work.
813+
self._maybe_start_fetch_thread()
814+
815+
if event_fetches_to_fail:
816+
# We were the last remaining fetcher and failed.
817+
# Fail any outstanding fetches since no one else will handle them.
818+
assert exc is not None
819+
with PreserveLoggingContext():
820+
for _, deferred in event_fetches_to_fail:
821+
deferred.errback(exc)
822+
823+
def _fetch_loop(self, conn: Connection) -> None:
740824
"""Takes a database connection and waits for requests for events from
741825
the _event_fetch_list queue.
742826
"""
743-
try:
744-
i = 0
745-
while True:
746-
with self._event_fetch_lock:
747-
event_list = self._event_fetch_list
748-
self._event_fetch_list = []
749-
750-
if not event_list:
751-
single_threaded = self.database_engine.single_threaded
752-
if (
753-
not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
754-
or single_threaded
755-
or i > EVENT_QUEUE_ITERATIONS
756-
):
757-
break
758-
else:
759-
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
760-
i += 1
761-
continue
762-
i = 0
827+
i = 0
828+
while True:
829+
with self._event_fetch_lock:
830+
event_list = self._event_fetch_list
831+
self._event_fetch_list = []
832+
833+
if not event_list:
834+
# There are no requests waiting. If we haven't yet reached the
835+
# maximum iteration limit, wait for some more requests to turn up.
836+
# Otherwise, bail out.
837+
single_threaded = self.database_engine.single_threaded
838+
if (
839+
not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
840+
or single_threaded
841+
or i > EVENT_QUEUE_ITERATIONS
842+
):
843+
return
844+
845+
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
846+
i += 1
847+
continue
848+
i = 0
763849

764-
self._fetch_event_list(conn, event_list)
765-
finally:
766-
self._event_fetch_ongoing -= 1
767-
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
850+
self._fetch_event_list(conn, event_list)
768851

769852
def _fetch_event_list(
770853
self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]]
@@ -806,9 +889,7 @@ def fire():
806889
# We only want to resolve deferreds from the main thread
807890
def fire(evs, exc):
808891
for _, d in evs:
809-
if not d.called:
810-
with PreserveLoggingContext():
811-
d.errback(exc)
892+
d.errback(exc)
812893

813894
with PreserveLoggingContext():
814895
self.hs.get_reactor().callFromThread(fire, event_list, e)
@@ -983,20 +1064,9 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]:
9831064
events_d = defer.Deferred()
9841065
with self._event_fetch_lock:
9851066
self._event_fetch_list.append((events, events_d))
986-
9871067
self._event_fetch_lock.notify()
9881068

989-
if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
990-
self._event_fetch_ongoing += 1
991-
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
992-
should_start = True
993-
else:
994-
should_start = False
995-
996-
if should_start:
997-
run_as_background_process(
998-
"fetch_events", self.db_pool.runWithConnection, self._do_fetch
999-
)
1069+
self._maybe_start_fetch_thread()
10001070

10011071
logger.debug("Loading %d events: %s", len(events), events)
10021072
with PreserveLoggingContext():

tests/storage/databases/main/test_events_worker.py

Lines changed: 138 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,24 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import json
15+
from contextlib import contextmanager
16+
from typing import Generator
1517

18+
from twisted.enterprise.adbapi import ConnectionPool
19+
from twisted.internet.defer import ensureDeferred
20+
from twisted.test.proto_helpers import MemoryReactor
21+
22+
from synapse.api.room_versions import EventFormatVersions, RoomVersions
1623
from synapse.logging.context import LoggingContext
1724
from synapse.rest import admin
1825
from synapse.rest.client import login, room
19-
from synapse.storage.databases.main.events_worker import EventsWorkerStore
26+
from synapse.server import HomeServer
27+
from synapse.storage.databases.main.events_worker import (
28+
EVENT_QUEUE_THREADS,
29+
EventsWorkerStore,
30+
)
31+
from synapse.storage.types import Connection
32+
from synapse.util import Clock
2033
from synapse.util.async_helpers import yieldable_gather_results
2134

2235
from tests import unittest
@@ -144,3 +157,127 @@ def test_dedupe(self):
144157

145158
# We should have fetched the event from the DB
146159
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
160+
161+
162+
class DatabaseOutageTestCase(unittest.HomeserverTestCase):
163+
"""Test event fetching during a database outage."""
164+
165+
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
166+
self.store: EventsWorkerStore = hs.get_datastore()
167+
168+
self.room_id = f"!room:{hs.hostname}"
169+
self.event_ids = [f"event{i}" for i in range(20)]
170+
171+
self._populate_events()
172+
173+
def _populate_events(self) -> None:
174+
"""Ensure that there are test events in the database.
175+
176+
When testing with the in-memory SQLite database, all the events are lost during
177+
the simulated outage.
178+
179+
To ensure consistency between `room_id`s and `event_id`s before and after the
180+
outage, rows are built and inserted manually.
181+
182+
Upserts are used to handle the non-SQLite case where events are not lost.
183+
"""
184+
self.get_success(
185+
self.store.db_pool.simple_upsert(
186+
"rooms",
187+
{"room_id": self.room_id},
188+
{"room_version": RoomVersions.V4.identifier},
189+
)
190+
)
191+
192+
self.event_ids = [f"event{i}" for i in range(20)]
193+
for idx, event_id in enumerate(self.event_ids):
194+
self.get_success(
195+
self.store.db_pool.simple_upsert(
196+
"events",
197+
{"event_id": event_id},
198+
{
199+
"event_id": event_id,
200+
"room_id": self.room_id,
201+
"topological_ordering": idx,
202+
"stream_ordering": idx,
203+
"type": "test",
204+
"processed": True,
205+
"outlier": False,
206+
},
207+
)
208+
)
209+
self.get_success(
210+
self.store.db_pool.simple_upsert(
211+
"event_json",
212+
{"event_id": event_id},
213+
{
214+
"room_id": self.room_id,
215+
"json": json.dumps({"type": "test", "room_id": self.room_id}),
216+
"internal_metadata": "{}",
217+
"format_version": EventFormatVersions.V3,
218+
},
219+
)
220+
)
221+
222+
@contextmanager
223+
def _outage(self) -> Generator[None, None, None]:
224+
"""Simulate a database outage.
225+
226+
Returns:
227+
A context manager. While the context is active, any attempts to connect to
228+
the database will fail.
229+
"""
230+
connection_pool = self.store.db_pool._db_pool
231+
232+
# Close all connections and shut down the database `ThreadPool`.
233+
connection_pool.close()
234+
235+
# Restart the database `ThreadPool`.
236+
connection_pool.start()
237+
238+
original_connection_factory = connection_pool.connectionFactory
239+
240+
def connection_factory(_pool: ConnectionPool) -> Connection:
241+
raise Exception("Could not connect to the database.")
242+
243+
connection_pool.connectionFactory = connection_factory # type: ignore[assignment]
244+
try:
245+
yield
246+
finally:
247+
connection_pool.connectionFactory = original_connection_factory
248+
249+
# If the in-memory SQLite database is being used, all the events are gone.
250+
# Restore the test data.
251+
self._populate_events()
252+
253+
def test_failure(self) -> None:
254+
"""Test that event fetches do not get stuck during a database outage."""
255+
with self._outage():
256+
failure = self.get_failure(
257+
self.store.get_event(self.event_ids[0]), Exception
258+
)
259+
self.assertEqual(str(failure.value), "Could not connect to the database.")
260+
261+
def test_recovery(self) -> None:
262+
"""Test that event fetchers recover after a database outage."""
263+
with self._outage():
264+
# Kick off a bunch of event fetches but do not pump the reactor
265+
event_deferreds = []
266+
for event_id in self.event_ids:
267+
event_deferreds.append(ensureDeferred(self.store.get_event(event_id)))
268+
269+
# We should have maxed out on event fetcher threads
270+
self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS)
271+
272+
# All the event fetchers will fail
273+
self.pump()
274+
self.assertEqual(self.store._event_fetch_ongoing, 0)
275+
276+
for event_deferred in event_deferreds:
277+
failure = self.get_failure(event_deferred, Exception)
278+
self.assertEqual(
279+
str(failure.value), "Could not connect to the database."
280+
)
281+
282+
# This next event fetch should succeed
283+
self.get_success(self.store.get_event(self.event_ids[0]))

0 commit comments

Comments
 (0)