diff --git a/changelog.d/19394.bugfix b/changelog.d/19394.bugfix new file mode 100644 index 00000000000..02131d89baf --- /dev/null +++ b/changelog.d/19394.bugfix @@ -0,0 +1 @@ +Capped the `WorkerLock` timeout interval to a maximum of 15 minutes to prevent excessively long wait intervals and to avoid logging when the retry is not an actual timeout. Contributed by Famedly. diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py index 1537a18cc05..88ecfd6318a 100644 --- a/synapse/handlers/worker_lock.py +++ b/synapse/handlers/worker_lock.py @@ -208,7 +208,7 @@ class WaitingLock: write: bool | None deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred) _inner_lock: Lock | None = None - _retry_interval: float = 0.1 + _timeout_interval: float = 0.1 _lock_span: "opentracing.Scope" = attr.Factory( lambda: start_active_span("WaitingLock.lock") ) @@ -240,19 +240,23 @@ async def __aenter__(self) -> None: break try: - # Wait until the we get notified the lock might have been + # Wait until we get notified that the lock might have been # released (by the deferred being resolved). We also - # periodically wake up in case the lock was released but we + # periodically wake up in case the lock was released, but we # weren't notified. 
with PreserveLoggingContext(): - timeout = self._get_next_retry_interval() await timeout_deferred( deferred=self.deferred, - timeout=timeout, + timeout=self._timeout_interval, clock=self.clock, ) - except Exception: - pass + except defer.TimeoutError: + # Only increment the timeout interval if this was an actual timeout + self._timeout_interval = self._increment_timeout_interval() + except Exception as e: + logger.warning( + "Caught an exception while waiting on WaitingLock: %r", e + ) return await self._inner_lock.__aenter__() @@ -273,13 +277,13 @@ async def __aexit__( return r - def _get_next_retry_interval(self) -> float: - next = self._retry_interval - self._retry_interval = max(5, next * 2) - if self._retry_interval > Duration(minutes=10).as_secs(): # >7 iterations + def _increment_timeout_interval(self) -> float: + next = self._timeout_interval + next = min(Duration(minutes=15).as_secs(), next * 2) + if next > Duration(minutes=10).as_secs(): # >12 iterations logger.warning( "Lock timeout is getting excessive: %ss. There may be a deadlock.", - self._retry_interval, + next, ) return next * random.uniform(0.9, 1.1) @@ -297,7 +301,7 @@ class WaitingMultiLock: deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred) _inner_lock_cm: AsyncContextManager | None = None - _retry_interval: float = 0.1 + _timeout_interval: float = 0.1 _lock_span: "opentracing.Scope" = attr.Factory( lambda: start_active_span("WaitingLock.lock") ) @@ -324,19 +328,23 @@ async def __aenter__(self) -> None: break try: - # Wait until the we get notified the lock might have been + # Wait until the notification the lock might have been # released (by the deferred being resolved). We also - # periodically wake up in case the lock was released but we + # periodically wake up in case the lock was released, but we # weren't notified. 
with PreserveLoggingContext(): - timeout = self._get_next_retry_interval() await timeout_deferred( deferred=self.deferred, - timeout=timeout, + timeout=self._timeout_interval, clock=self.clock, ) - except Exception: - pass + except defer.TimeoutError: + # Only increment the timeout interval if this was an actual timeout + self._timeout_interval = self._increment_timeout_interval() + except Exception as e: + logger.warning( + "Caught an exception while waiting on WaitingMultiLock: %r", e + ) assert self._inner_lock_cm await self._inner_lock_cm.__aenter__() @@ -360,12 +368,12 @@ async def __aexit__( return r - def _get_next_retry_interval(self) -> float: - next = self._retry_interval - self._retry_interval = max(5, next * 2) - if self._retry_interval > Duration(minutes=10).as_secs(): # >7 iterations + def _increment_timeout_interval(self) -> float: + next = self._timeout_interval + next = min(Duration(minutes=15).as_secs(), next * 2) + if next > Duration(minutes=10).as_secs(): # >12 iterations logger.warning( "Lock timeout is getting excessive: %ss. 
There may be a deadlock.", - self._retry_interval, + next, ) return next * random.uniform(0.9, 1.1) diff --git a/tests/handlers/test_worker_lock.py b/tests/handlers/test_worker_lock.py index 61ff51ff923..c5c3ce22efd 100644 --- a/tests/handlers/test_worker_lock.py +++ b/tests/handlers/test_worker_lock.py @@ -21,6 +21,7 @@ import logging import platform +from unittest.mock import patch from twisted.internet import defer from twisted.internet.testing import MemoryReactor @@ -48,13 +49,47 @@ def test_wait_for_lock_locally(self) -> None: self.get_success(lock1.__aenter__()) lock2 = self.worker_lock_handler.acquire_lock("name", "key") - d2 = defer.ensureDeferred(lock2.__aenter__()) - self.assertNoResult(d2) - - self.get_success(lock1.__aexit__(None, None, None)) + # Wrap the WaitingLock object, so we can detect if the timeouts are being hit + with patch.object( + lock2, + "_increment_timeout_interval", + wraps=lock2._increment_timeout_interval, + ) as wrapped_lock2_increment_timeout_interval_method: + d2 = defer.ensureDeferred(lock2.__aenter__()) + self.assertNoResult(d2) + + # The lock should not time out here + wrapped_lock2_increment_timeout_interval_method.assert_not_called() + self.get_success(lock1.__aexit__(None, None, None)) + + self.get_success(d2) + self.get_success(lock2.__aexit__(None, None, None)) + + def test_timeouts_for_lock_locally(self) -> None: + """Test timeouts are incremented for a lock on a single worker""" + lock1 = self.worker_lock_handler.acquire_lock("name", "key") + self.get_success(lock1.__aenter__()) - self.get_success(d2) - self.get_success(lock2.__aexit__(None, None, None)) + lock2 = self.worker_lock_handler.acquire_lock("name", "key") + # Wrap the WaitingLock object, so we can detect if the timeouts are being hit + with patch.object( + lock2, + "_increment_timeout_interval", + wraps=lock2._increment_timeout_interval, + ) as wrapped_lock2_increment_timeout_interval_method: + d2 = defer.ensureDeferred(lock2.__aenter__()) + 
self.assertNoResult(d2) + + # Recall that pump() will advance time by the given amount 100 times; this + # amounts to about 10 seconds passing + self.pump(0.1) + + # Should be timed out 6 times, but do not fail on that exact count + wrapped_lock2_increment_timeout_interval_method.assert_called() + self.get_success(lock1.__aexit__(None, None, None)) + + self.get_success(d2) + self.get_success(lock2.__aexit__(None, None, None)) def test_lock_contention(self) -> None: """Test lock contention when a lot of locks wait on a single worker""" @@ -117,10 +152,52 @@ def test_wait_for_lock_worker(self) -> None: self.get_success(lock1.__aenter__()) lock2 = worker_lock_handler.acquire_lock("name", "key") - d2 = defer.ensureDeferred(lock2.__aenter__()) - self.assertNoResult(d2) + # Wrap the WaitingLock object, so we can detect if the timeouts are being hit + with patch.object( + lock2, + "_increment_timeout_interval", + wraps=lock2._increment_timeout_interval, + ) as wrapped_lock2_increment_timeout_interval_method: + d2 = defer.ensureDeferred(lock2.__aenter__()) + self.assertNoResult(d2) + + # The lock should not time out here + wrapped_lock2_increment_timeout_interval_method.assert_not_called() + self.get_success(lock1.__aexit__(None, None, None)) + + self.get_success(d2) + self.get_success(lock2.__aexit__(None, None, None)) + + def test_timeouts_for_lock_worker(self) -> None: + """Test timeouts are incremented for a lock on another worker""" + worker = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "redis": {"enabled": True}, + }, + ) + worker_lock_handler = worker.get_worker_locks_handler() - self.get_success(lock1.__aexit__(None, None, None)) + lock1 = self.main_worker_lock_handler.acquire_lock("name", "key") + self.get_success(lock1.__aenter__()) - self.get_success(d2) - self.get_success(lock2.__aexit__(None, None, None)) + lock2 = worker_lock_handler.acquire_lock("name", "key") + # Wrap the WaitingLock object, so we can detect if the timeouts are 
being hit + with patch.object( + lock2, + "_increment_timeout_interval", + wraps=lock2._increment_timeout_interval, + ) as wrapped_lock2_increment_timeout_interval_method: + d2 = defer.ensureDeferred(lock2.__aenter__()) + self.assertNoResult(d2) + + # Recall that pump() will advance time by the given amount 100 times; this + # amounts to about 10 seconds passing + self.pump(0.1) + + # Should be timed out 6 times, but do not fail on that exact count + wrapped_lock2_increment_timeout_interval_method.assert_called() + self.get_success(lock1.__aexit__(None, None, None)) + + self.get_success(d2) + self.get_success(lock2.__aexit__(None, None, None))