Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19394.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Capped the `WorkerLock` timeout interval at a maximum of 15 minutes to prevent excessively long timeout values, and stopped incrementing (and logging about) the interval when a retry was triggered by a notification rather than an actual timeout. Contributed by Famedly.
56 changes: 32 additions & 24 deletions synapse/handlers/worker_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class WaitingLock:
write: bool | None
deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
_inner_lock: Lock | None = None
_retry_interval: float = 0.1
_timeout_interval: float = 0.1
_lock_span: "opentracing.Scope" = attr.Factory(
lambda: start_active_span("WaitingLock.lock")
)
Expand Down Expand Up @@ -240,19 +240,23 @@ async def __aenter__(self) -> None:
break

try:
# Wait until the we get notified the lock might have been
# Wait until the notification the lock might have been
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Wait until the notification the lock might have been
# Wait until the notification that the lock might have been

# released (by the deferred being resolved). We also
# periodically wake up in case the lock was released but we
# periodically wake up in case the lock was released, but we
# weren't notified.
with PreserveLoggingContext():
timeout = self._get_next_retry_interval()
await timeout_deferred(
deferred=self.deferred,
timeout=timeout,
timeout=self._timeout_interval,
clock=self.clock,
)
except Exception:
pass
except defer.TimeoutError:
# Only increment the timeout interval if this was an actual timeout
self._timeout_interval = self._increment_timeout_interval()
Comment on lines +254 to +255
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should explain why in the comments,

Adjusted that a timeout interval should only be increased if an actual timeout was reached

This allows that a normal notification of another lock being released should not increment a timeout, when a time out has not actually occurred. It should cut down on what may end up otherwise being excessive log spam about locks having a long timeout duration when such is not true.

-- @jason-famedly, #19394 (comment)

except Exception as e:
logger.warning(
"Caught an exception while waiting on WaitingLock: %r", e
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For better context, we can also add the self.lock_name and self.lock_key

Suggested change
"Caught an exception while waiting on WaitingLock: %r", e
"Caught an exception while waiting on WaitingLock(%s, %s): %r", self.lock_name, self.lock_key, e

)

return await self._inner_lock.__aenter__()

Expand All @@ -273,13 +277,13 @@ async def __aexit__(

return r

def _get_next_retry_interval(self) -> float:
next = self._retry_interval
self._retry_interval = max(5, next * 2)
if self._retry_interval > Duration(minutes=10).as_secs(): # >7 iterations
def _increment_timeout_interval(self) -> float:
next = self._timeout_interval
next = min(Duration(minutes=15).as_secs(), next * 2)
if next > Duration(minutes=10).as_secs(): # >12 iterations
logger.warning(
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
self._retry_interval,
next,
)
Comment on lines 284 to 287
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This already exists but we can improve this message:

Suggested change
logger.warning(
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
self._retry_interval,
next,
)
"WaitingLock(%s, %s): We are having to wait a long time for the lock. Wait timeout is getting excessive: %ss. There may be a deadlock.",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could do one step better and instead track the time when we start trying to acquire the lock in __aenter__ and be able to compare to the actual time we have been waiting overall. Then we can be more specific and say "We have been waiting over 10 minutes (excessive) to acquire the lock (%s, %s). There may be a deadlock."

Then it doesn't matter what's causing us to wait and we can get a more accurate picture.

Feel free to push this off to the future.

return next * random.uniform(0.9, 1.1)
Comment on lines +280 to 288
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably just keep track of the new self._timeout_interval in here instead of assigning outside of this.

I think this makes even more sense now that this is named _increment_timeout_interval

Comment on lines +280 to 288
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should explain this in the comments:

The jitter value is maintained for the timeout, to help avoid a "thundering herd" situation when all locks may time out at the same time.


Expand All @@ -297,7 +301,7 @@ class WaitingMultiLock:
deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)

_inner_lock_cm: AsyncContextManager | None = None
_retry_interval: float = 0.1
_timeout_interval: float = 0.1
_lock_span: "opentracing.Scope" = attr.Factory(
lambda: start_active_span("WaitingLock.lock")
)
Expand All @@ -324,19 +328,23 @@ async def __aenter__(self) -> None:
break

try:
# Wait until the we get notified the lock might have been
# Wait until the notification the lock might have been
# released (by the deferred being resolved). We also
# periodically wake up in case the lock was released but we
# periodically wake up in case the lock was released, but we
# weren't notified.
with PreserveLoggingContext():
timeout = self._get_next_retry_interval()
await timeout_deferred(
deferred=self.deferred,
timeout=timeout,
timeout=self._timeout_interval,
clock=self.clock,
)
except Exception:
pass
except defer.TimeoutError:
# Only increment the timeout interval if this was an actual timeout
self._timeout_interval = self._increment_timeout_interval()
except Exception as e:
logger.warning(
"Caught an exception while waiting on WaitingMultiLock: %r", e
)

assert self._inner_lock_cm
await self._inner_lock_cm.__aenter__()
Expand All @@ -360,12 +368,12 @@ async def __aexit__(

return r

def _get_next_retry_interval(self) -> float:
next = self._retry_interval
self._retry_interval = max(5, next * 2)
if self._retry_interval > Duration(minutes=10).as_secs(): # >7 iterations
def _increment_timeout_interval(self) -> float:
next = self._timeout_interval
next = min(Duration(minutes=15).as_secs(), next * 2)
if next > Duration(minutes=10).as_secs(): # >12 iterations
logger.warning(
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
self._retry_interval,
next,
)
return next * random.uniform(0.9, 1.1)
99 changes: 88 additions & 11 deletions tests/handlers/test_worker_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging
import platform
from unittest.mock import patch

from twisted.internet import defer
from twisted.internet.testing import MemoryReactor
Expand Down Expand Up @@ -48,13 +49,47 @@ def test_wait_for_lock_locally(self) -> None:
self.get_success(lock1.__aenter__())

lock2 = self.worker_lock_handler.acquire_lock("name", "key")
d2 = defer.ensureDeferred(lock2.__aenter__())
self.assertNoResult(d2)

self.get_success(lock1.__aexit__(None, None, None))
# Wrap the WaitingLock object, so we can detect if the timeouts are being hit
with patch.object(
lock2,
"_increment_timeout_interval",
wraps=lock2._increment_timeout_interval,
) as wrapped_lock2_increment_timeout_interval_method:
d2 = defer.ensureDeferred(lock2.__aenter__())
self.assertNoResult(d2)

# The lock should not time out here
wrapped_lock2_increment_timeout_interval_method.assert_not_called()
self.get_success(lock1.__aexit__(None, None, None))

self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))
Comment on lines +52 to +66
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous version of this test is only concerned that lock2 isn't acquired until lock1 releases.

I don't think we need to concern ourselves with internal details of timeouts here. We could have a separate test for this kind of thing but it doesn't have a ton of value.


def test_timeouts_for_lock_locally(self) -> None:
"""Test timeouts are incremented for a lock on a single worker"""
lock1 = self.worker_lock_handler.acquire_lock("name", "key")
self.get_success(lock1.__aenter__())

self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))
lock2 = self.worker_lock_handler.acquire_lock("name", "key")
# Wrap the WaitingLock object, so we can detect if the timeouts are being hit
with patch.object(
lock2,
"_increment_timeout_interval",
wraps=lock2._increment_timeout_interval,
) as wrapped_lock2_increment_timeout_interval_method:
d2 = defer.ensureDeferred(lock2.__aenter__())
self.assertNoResult(d2)

# Recall that pump() advances time by the given amount 100 times; here this
# amounts to about 10 seconds passing.
self.pump(0.1)

# Should be timed out 6 times, but do not fail on that exact count
wrapped_lock2_increment_timeout_interval_method.assert_called()
self.get_success(lock1.__aexit__(None, None, None))
Comment on lines +87 to +89
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better to test whether we try_acquire_lock multiple times (what we actually care about).

Testing the internals isn't that useful.

Even more ideally, we wouldn't patch anything and instead acquire and release a lock silently (without notification) and see whether the second lock is still able to acquire some time later.


self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))

def test_lock_contention(self) -> None:
"""Test lock contention when a lot of locks wait on a single worker"""
Expand Down Expand Up @@ -117,10 +152,52 @@ def test_wait_for_lock_worker(self) -> None:
self.get_success(lock1.__aenter__())

lock2 = worker_lock_handler.acquire_lock("name", "key")
d2 = defer.ensureDeferred(lock2.__aenter__())
self.assertNoResult(d2)
# Wrap the WaitingLock object, so we can detect if the timeouts are being hit
with patch.object(
lock2,
"_increment_timeout_interval",
wraps=lock2._increment_timeout_interval,
) as wrapped_lock2_increment_timeout_interval_method:
d2 = defer.ensureDeferred(lock2.__aenter__())
self.assertNoResult(d2)

# The lock should not time out here
wrapped_lock2_increment_timeout_interval_method.assert_not_called()
self.get_success(lock1.__aexit__(None, None, None))

self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))

def test_timeouts_for_lock_worker(self) -> None:
"""Test timeouts are incremented for a lock on another worker"""
worker = self.make_worker_hs(
"synapse.app.generic_worker",
extra_config={
"redis": {"enabled": True},
},
)
worker_lock_handler = worker.get_worker_locks_handler()

self.get_success(lock1.__aexit__(None, None, None))
lock1 = self.main_worker_lock_handler.acquire_lock("name", "key")
self.get_success(lock1.__aenter__())

self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))
lock2 = worker_lock_handler.acquire_lock("name", "key")
# Wrap the WaitingLock object, so we can detect if the timeouts are being hit
with patch.object(
lock2,
"_increment_timeout_interval",
wraps=lock2._increment_timeout_interval,
) as wrapped_lock2_increment_timeout_interval_method:
d2 = defer.ensureDeferred(lock2.__aenter__())
self.assertNoResult(d2)

# Recall that pump() advances time by the given amount 100 times; here this
# amounts to about 10 seconds passing.
self.pump(0.1)

# Should be timed out 6 times, but do not fail on that exact count
wrapped_lock2_increment_timeout_interval_method.assert_called()
self.get_success(lock1.__aexit__(None, None, None))

self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))
Loading