Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17128.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve DB usage when fetching related events.
1 change: 1 addition & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def _invalidate_state_caches_all(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
Expand Down
53 changes: 31 additions & 22 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import StrCollection
from synapse.util.caches.descriptors import CachedFunction
from synapse.util.iterutils import batch_iter

Expand Down Expand Up @@ -233,8 +234,8 @@ def process_replication_rows(
)

room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id)
self._invalidate_caches_for_room(room_id)
server_joined = bool(row.keys.get(1, "true"))
self._invalidate_caches_for_room(room_id, server_joined)
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

Expand Down Expand Up @@ -388,7 +389,6 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._invalidate_local_get_event_cache_room_id(room_id) # type: ignore[attr-defined]

self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
Expand All @@ -398,11 +398,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_applicable_edit", None)
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)

self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)
Expand All @@ -417,17 +413,28 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)

def _invalidate_caches_for_room_and_stream(
self, txn: LoggingTransaction, room_id: str
self,
txn: LoggingTransaction,
room_id: str,
server_in_room: bool,
) -> None:
"""Invalidate caches associated with rooms, and stream to replication.

Used when we delete rooms.

Args:
txn
room_id
server_in_room: Whether the server was joined or invited to the
room when we deleted it.
Comment on lines +428 to +429
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care if the server has knocked on the room?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, now I can't remember why I included invite. It's basically whether we need to invalidate stuff due to current_state_events table changing, which should be empty unless the server is joined

"""

self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
txn.call_after(self._invalidate_caches_for_room, room_id)
self._send_invalidation_to_replication(
txn, DELETE_ROOM_CACHE_NAME, [room_id, "true" if server_in_room else ""]
)
txn.call_after(self._invalidate_caches_for_room, room_id, server_in_room)

def _invalidate_caches_for_room(self, room_id: str) -> None:
def _invalidate_caches_for_room(self, room_id: str, server_in_room: bool) -> None:
"""Invalidate caches associated with rooms.

Used when we delete rooms.
Expand All @@ -439,8 +446,16 @@ def _invalidate_caches_for_room(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_account_data_for_room", None)
self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)

if server_in_room:
self._attempt_to_invalidate_cache(
"get_latest_event_ids_in_room", (room_id,)
)
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)

# And delete state caches.
self._invalidate_state_caches_all(room_id)

self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
Expand All @@ -453,19 +468,13 @@ def _invalidate_caches_for_room(self, room_id: str) -> None:
"_get_partial_state_servers_at_join", (room_id,)
)
self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_current_hosts_in_room_ordered", (room_id,)
)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))

# And delete state caches.

self._invalidate_state_caches_all(room_id)

async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
Expand Down Expand Up @@ -560,7 +569,7 @@ def _invalidate_state_caches_and_stream(
for chunk in batch_iter(members_changed, 50):
keys = itertools.chain([room_id], chunk)
self._send_invalidation_to_replication(
txn, CURRENT_STATE_CACHE_NAME, keys
txn, CURRENT_STATE_CACHE_NAME, list(keys)
)
else:
# if no members changed, we still need to invalidate the other caches.
Expand All @@ -579,7 +588,7 @@ async def send_invalidation_to_replication(
)

def _send_invalidation_to_replication(
self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[Any]]
self, txn: LoggingTransaction, cache_name: str, keys: Optional[StrCollection]
) -> None:
"""Notifies replication that given cache has been invalidated.

Expand Down
17 changes: 16 additions & 1 deletion synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
from typing import Any, List, Set, Tuple, cast

from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main import CacheInvalidationWorkerStore
Expand Down Expand Up @@ -376,6 +377,20 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
(room_id,),
)

server_in_room = self._check_host_room_membership_txn( # type: ignore[attr-defined]
txn,
room_id,
host=self.hs.hostname,
membership=Membership.JOIN,
)
if not server_in_room:
server_in_room = self._check_host_room_membership_txn( # type: ignore[attr-defined]
txn,
room_id,
host=self.hs.hostname,
membership=Membership.INVITE,
)

# First, fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
Expand Down Expand Up @@ -503,6 +518,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (https://github.com/matrix-org/synapse/issues/5888)

self._invalidate_caches_for_room_and_stream(txn, room_id)
self._invalidate_caches_for_room_and_stream(txn, room_id, server_in_room)

return state_groups
21 changes: 16 additions & 5 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,17 @@ async def is_host_invited(self, room_id: str, host: str) -> bool:

async def _check_host_room_membership(
self, room_id: str, host: str, membership: str
) -> bool:
return await self.db_pool.runInteraction(
"is_host_joined",
self._check_host_room_membership_txn,
room_id,
host,
membership,
)

def _check_host_room_membership_txn(
self, txn: LoggingTransaction, room_id: str, host: str, membership: str
) -> bool:
if "%" in host or "_" in host:
raise Exception("Invalid host name")
Expand All @@ -980,14 +991,14 @@ async def _check_host_room_membership(
# the returned user actually has the correct domain.
like_clause = "%:" + host

rows = await self.db_pool.execute(
"is_host_joined", sql, membership, room_id, like_clause
)
txn.execute(sql, (membership, room_id, like_clause))

row = txn.fetchone()

if not rows:
if not row:
return False

user_id = rows[0][0]
user_id = row[0]
if get_domain_from_id(user_id) != host:
# This can only happen if the host name has something funky in it
raise Exception("Invalid host name")
Expand Down