-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Safe async event cache #13308
Safe async event cache #13308
Changes from 6 commits
6beae64
65b47e1
87001f8
c170748
be84644
9ef18e4
e90f402
b8ea213
d39d9bf
a2d2312
bd174cc
e71ac4a
22f114d
771c29d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix async get event cache invalidation logic. Contributed by Nick @ Beeper (@fizzadar). | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| Awaitable, | ||
| Callable, | ||
| Collection, | ||
| Dict, | ||
|
|
@@ -33,6 +34,7 @@ | |
| Tuple, | ||
| Type, | ||
| TypeVar, | ||
| Union, | ||
| cast, | ||
| overload, | ||
| ) | ||
|
|
@@ -57,7 +59,7 @@ | |
| from synapse.storage.background_updates import BackgroundUpdater | ||
| from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine | ||
| from synapse.storage.types import Connection, Cursor | ||
| from synapse.util.async_helpers import delay_cancellation, maybe_awaitable | ||
| from synapse.util.async_helpers import delay_cancellation | ||
| from synapse.util.iterutils import batch_iter | ||
|
|
||
| if TYPE_CHECKING: | ||
|
|
@@ -208,7 +210,9 @@ def __getattr__(self, name: str) -> Any: | |
|
|
||
|
|
||
| # The type of entry which goes on our after_callbacks and exception_callbacks lists. | ||
| _CallbackListEntry = Tuple[Callable[..., object], Tuple[object, ...], Dict[str, object]] | ||
| _CallbackListEntry = Tuple[ | ||
| Callable[..., Union[object, Awaitable]], Tuple[object, ...], Dict[str, object] | ||
| ] | ||
|
|
||
| P = ParamSpec("P") | ||
| R = TypeVar("R") | ||
|
|
@@ -796,6 +800,20 @@ async def runInteraction( | |
| The result of func | ||
| """ | ||
|
|
||
| async def _run_callbacks(callbacks: List[_CallbackListEntry]): | ||
| sync_callbacks: List[_CallbackListEntry] = [] | ||
|
|
||
| for cb, args, kwargs in callbacks: | ||
| if inspect.iscoroutinefunction(cb): | ||
|
||
| awaitable = cb(*args, **kwargs) | ||
| assert isinstance(awaitable, Awaitable) | ||
| await awaitable | ||
| else: | ||
| sync_callbacks.append((cb, args, kwargs)) | ||
|
|
||
| for cb, args, kwargs in sync_callbacks: | ||
| cb(*args, **kwargs) | ||
|
||
|
|
||
| async def _runInteraction() -> R: | ||
| after_callbacks: List[_CallbackListEntry] = [] | ||
| exception_callbacks: List[_CallbackListEntry] = [] | ||
|
|
@@ -817,15 +835,10 @@ async def _runInteraction() -> R: | |
| **kwargs, | ||
| ) | ||
|
|
||
| for after_callback, after_args, after_kwargs in after_callbacks: | ||
| await maybe_awaitable(after_callback(*after_args, **after_kwargs)) | ||
|
|
||
| await _run_callbacks(after_callbacks) | ||
| return cast(R, result) | ||
| except Exception: | ||
| for exception_callback, after_args, after_kwargs in exception_callbacks: | ||
| await maybe_awaitable( | ||
| exception_callback(*after_args, **after_kwargs) | ||
| ) | ||
| await _run_callbacks(exception_callbacks) | ||
| raise | ||
|
|
||
| # To handle cancellation, we ensure that `after_callback`s and | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.