Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 0244ef3

Browse files
author
Sean Quah
committed
Track ongoing event fetches correctly (again)
The previous fix for the ongoing event fetches counter (8eec25a) was both insufficient and incorrect. When the database is unreachable, `_do_fetch` never gets run and so `_event_fetch_ongoing` is never decremented. The previous fix also moved the `_event_fetch_ongoing` decrement outside of the `_event_fetch_lock` which allowed race conditions to corrupt the counter.
1 parent 319dcb9 commit 0244ef3

File tree

3 files changed: +168 additions, -30 deletions

changelog.d/11376.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection, for real this time. Also fix a race condition introduced in the previous insufficient fix in 1.47.0.

synapse/storage/databases/main/events_worker.py

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ async def _get_events_from_cache_or_db(
602602
# already due to `_get_events_from_db`).
603603
fetching_deferred: ObservableDeferred[
604604
Dict[str, _EventCacheEntry]
605-
] = ObservableDeferred(defer.Deferred())
605+
] = ObservableDeferred(defer.Deferred(), consumeErrors=True)
606606
for event_id in missing_events_ids:
607607
self._current_event_fetches[event_id] = fetching_deferred
608608

@@ -736,35 +736,56 @@ async def get_stripped_room_state_from_event_context(
736736
for e in state_to_include.values()
737737
]
738738

739-
def _do_fetch(self, conn: Connection) -> None:
739+
async def _do_fetch(self) -> None:
740+
"""Services requests for events from the `_event_fetch_list` queue."""
741+
try:
742+
await self.db_pool.runWithConnection(self._do_fetch_txn)
743+
except BaseException as e:
744+
with self._event_fetch_lock:
745+
self._event_fetch_ongoing -= 1
746+
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
747+
748+
if self._event_fetch_ongoing == 0 and self._event_fetch_list:
749+
# We are the last remaining fetcher and we have just failed.
750+
# Fail any outstanding event fetches, since no one else will process
751+
# them.
752+
failed_event_list = self._event_fetch_list
753+
self._event_fetch_list = []
754+
else:
755+
failed_event_list = []
756+
757+
for _, deferred in failed_event_list:
758+
if not deferred.called:
759+
with PreserveLoggingContext():
760+
deferred.errback(e)
761+
762+
def _do_fetch_txn(self, conn: Connection) -> None:
740763
"""Takes a database connection and waits for requests for events from
741764
the _event_fetch_list queue.
742765
"""
743-
try:
744-
i = 0
745-
while True:
746-
with self._event_fetch_lock:
747-
event_list = self._event_fetch_list
748-
self._event_fetch_list = []
766+
i = 0
767+
while True:
768+
with self._event_fetch_lock:
769+
event_list = self._event_fetch_list
770+
self._event_fetch_list = []
771+
772+
if not event_list:
773+
single_threaded = self.database_engine.single_threaded
774+
if (
775+
not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
776+
or single_threaded
777+
or i > EVENT_QUEUE_ITERATIONS
778+
):
779+
self._event_fetch_ongoing -= 1
780+
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
781+
return
782+
else:
783+
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
784+
i += 1
785+
continue
786+
i = 0
749787

750-
if not event_list:
751-
single_threaded = self.database_engine.single_threaded
752-
if (
753-
not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
754-
or single_threaded
755-
or i > EVENT_QUEUE_ITERATIONS
756-
):
757-
break
758-
else:
759-
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
760-
i += 1
761-
continue
762-
i = 0
763-
764-
self._fetch_event_list(conn, event_list)
765-
finally:
766-
self._event_fetch_ongoing -= 1
767-
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
788+
self._fetch_event_list(conn, event_list)
768789

769790
def _fetch_event_list(
770791
self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]]
@@ -994,9 +1015,7 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]:
9941015
should_start = False
9951016

9961017
if should_start:
997-
run_as_background_process(
998-
"fetch_events", self.db_pool.runWithConnection, self._do_fetch
999-
)
1018+
run_as_background_process("fetch_events", self._do_fetch)
10001019

10011020
logger.debug("Loading %d events: %s", len(events), events)
10021021
with PreserveLoggingContext():

tests/storage/databases/main/test_events_worker.py

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,24 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import json
15+
from contextlib import contextmanager
16+
from typing import Generator
1517

18+
from twisted.enterprise.adbapi import ConnectionPool
19+
from twisted.internet.defer import ensureDeferred
20+
from twisted.test.proto_helpers import MemoryReactor
21+
22+
from synapse.api.room_versions import EventFormatVersions, RoomVersions
1623
from synapse.logging.context import LoggingContext
1724
from synapse.rest import admin
1825
from synapse.rest.client import login, room
19-
from synapse.storage.databases.main.events_worker import EventsWorkerStore
26+
from synapse.server import HomeServer
27+
from synapse.storage.databases.main.events_worker import (
28+
EVENT_QUEUE_THREADS,
29+
EventsWorkerStore,
30+
)
31+
from synapse.storage.types import Connection
32+
from synapse.util import Clock
2033
from synapse.util.async_helpers import yieldable_gather_results
2134

2235
from tests import unittest
@@ -144,3 +157,108 @@ def test_dedupe(self):
144157

145158
# We should have fetched the event from the DB
146159
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
160+
161+
162+
class DatabaseOutageTestCase(unittest.HomeserverTestCase):
163+
"""Test event fetching during a database outage."""
164+
165+
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
166+
self.store: EventsWorkerStore = hs.get_datastore()
167+
168+
self.room_id = f"!room:{hs.hostname}"
169+
self.event_ids = [f"event{i}" for i in range(20)]
170+
171+
self._populate_events()
172+
173+
def _populate_events(self) -> None:
174+
"""Ensure that there are test events in the database."""
175+
self.get_success(
176+
self.store.db_pool.simple_upsert(
177+
"rooms",
178+
{"room_id": self.room_id},
179+
{"room_version": RoomVersions.V4.identifier},
180+
)
181+
)
182+
183+
self.event_ids = [f"event{i}" for i in range(20)]
184+
for idx, event_id in enumerate(self.event_ids):
185+
self.get_success(
186+
self.store.db_pool.simple_upsert(
187+
"events",
188+
{"event_id": event_id},
189+
{
190+
"event_id": event_id,
191+
"room_id": self.room_id,
192+
"topological_ordering": idx,
193+
"stream_ordering": idx,
194+
"type": "test",
195+
"processed": True,
196+
"outlier": False,
197+
},
198+
)
199+
)
200+
self.get_success(
201+
self.store.db_pool.simple_upsert(
202+
"event_json",
203+
{"event_id": event_id},
204+
{
205+
"room_id": self.room_id,
206+
"json": json.dumps({"type": "test", "room_id": self.room_id}),
207+
"internal_metadata": "{}",
208+
"format_version": EventFormatVersions.V3,
209+
},
210+
)
211+
)
212+
213+
@contextmanager
214+
def _outage(self) -> Generator[None, None, None]:
215+
"""Simulate a database outage."""
216+
connection_pool = self.store.db_pool._db_pool
217+
connection_pool.close()
218+
connection_pool.start()
219+
original_connection_factory = connection_pool.connectionFactory
220+
221+
def connection_factory(_pool: ConnectionPool) -> Connection:
222+
raise Exception("Could not connect to the database.")
223+
224+
connection_pool.connectionFactory = connection_factory # type: ignore[assignment]
225+
try:
226+
yield
227+
finally:
228+
connection_pool.connectionFactory = original_connection_factory
229+
230+
# If the in-memory SQLite database is being used, all the events are gone.
231+
# Restore the test data.
232+
self._populate_events()
233+
234+
def test_failure(self) -> None:
235+
"""Test that event fetches do not get stuck during a database outage."""
236+
with self._outage():
237+
failure = self.get_failure(
238+
self.store.get_event(self.event_ids[0]), Exception
239+
)
240+
self.assertEqual(str(failure.value), "Could not connect to the database.")
241+
242+
def test_recovery(self) -> None:
243+
"""Test that event fetchers recover after a database outage."""
244+
with self._outage():
245+
# Kick off a bunch of event fetches but do not pump the reactor
246+
event_deferreds = []
247+
for event_id in self.event_ids:
248+
event_deferreds.append(ensureDeferred(self.store.get_event(event_id)))
249+
250+
# We should have maxed out on event fetcher threads
251+
self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS)
252+
253+
# All the event fetchers will fail
254+
self.pump()
255+
self.assertEqual(self.store._event_fetch_ongoing, 0)
256+
257+
for event_deferred in event_deferreds:
258+
failure = self.get_failure(event_deferred, Exception)
259+
self.assertEqual(
260+
str(failure.value), "Could not connect to the database."
261+
)
262+
263+
# This next event fetch should succeed
264+
self.get_success(self.store.get_event(self.event_ids[0]))

0 commit comments

Comments (0)