Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19211.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Expire sliding sync connections that are too old or have too much pending data.
Comment thread
erikjohnston marked this conversation as resolved.
34 changes: 32 additions & 2 deletions synapse/handlers/sliding_sync/room_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
EventTypes,
Membership,
)
from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event
Expand Down Expand Up @@ -77,6 +78,15 @@
logger = logging.getLogger(__name__)


# Minimum time in milliseconds since the last sync before we consider expiring
# the connection due to too many rooms to send. This stops us from getting into
# tight loops with clients that request lots of data at once.
MINIMUM_NOT_USED_AGE_EXPIRY_MS = 60 * 60 * 1000 # 1 hour
Comment thread
erikjohnston marked this conversation as resolved.
Outdated
Comment thread
erikjohnston marked this conversation as resolved.
Outdated

# How many rooms with updates we allow before we consider the connection
# expired due to too many rooms to send.
NUM_ROOMS_THRESHOLD = 100
Comment thread
erikjohnston marked this conversation as resolved.
Outdated
Comment thread
MadLittleMods marked this conversation as resolved.
Outdated

# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = RoomsForUserStateReset | RoomsForUser | RoomsForUserSlidingSync
Expand Down Expand Up @@ -176,6 +186,7 @@ def __init__(self, hs: "HomeServer"):
self.storage_controllers = hs.get_storage_controllers()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.is_mine_id = hs.is_mine_id
self._clock = hs.get_clock()

async def compute_interested_rooms(
self,
Expand Down Expand Up @@ -857,11 +868,30 @@ async def _filter_relevant_rooms_to_send(

# We only need to check for new events since any state changes
# will also come down as new events.
rooms_that_have_updates = (
self.store.get_rooms_that_might_have_updates(

rooms_that_have_updates = await (
self.store.get_rooms_that_have_updates_since_sliding_sync_table(
Comment thread
erikjohnston marked this conversation as resolved.
relevant_room_map.keys(), from_token.room_key
)
)

# Check if we have lots of updates to send, if so then it's
# better for us to tell the client to do a full resync
# instead.
Comment thread
erikjohnston marked this conversation as resolved.
Outdated
#
# We only do this if the last sync was over
# `MINIMUM_NOT_USED_AGE_EXPIRY_MS` to ensure we don't get
# into tight loops with clients that keep requesting large
# sliding sync windows.
if len(rooms_that_have_updates) > NUM_ROOMS_THRESHOLD:
last_sync_ts = previous_connection_state.last_used_ts
if (
last_sync_ts is not None
and (self._clock.time_msec() - last_sync_ts)
> MINIMUM_NOT_USED_AGE_EXPIRY_MS
):
raise SlidingSyncUnknownPosition()

rooms_should_send.update(rooms_that_have_updates)
relevant_rooms_to_send_map = {
room_id: room_sync_config
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/sliding_sync/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def get_and_clear_connection_positions(
"""
# If this is our first request, there is no previous connection state to fetch out of the database
if from_token is None or from_token.connection_position == 0:
return PerConnectionState()
return PerConnectionState(last_used_ts=None)

conn_id = sync_config.conn_id or ""

Expand Down
66 changes: 64 additions & 2 deletions synapse/storage/databases/main/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.logging.opentracing import log_kv
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
Expand All @@ -45,6 +46,19 @@
logger = logging.getLogger(__name__)


# How often to update the `last_used_ts` column on
# `sliding_sync_connections` when the client uses a connection
# position. We don't want to update it on every use to avoid excessive
# writes, but we want it to be reasonably up-to-date to help with
# cleaning up old connection positions.
UPDATE_INTERVAL_LAST_USED_TS_MS = 5 * 60_000
Comment thread
erikjohnston marked this conversation as resolved.
Outdated
Comment thread
erikjohnston marked this conversation as resolved.
Outdated


# Time in milliseconds the connection hasn't been used before we consider it
# expired and delete it.
CONNECTION_EXPIRY_MS = 7 * 24 * 60 * 60 * 1000 # 7 days
Comment thread
erikjohnston marked this conversation as resolved.
Outdated


class SlidingSyncStore(SQLBaseStore):
def __init__(
self,
Expand Down Expand Up @@ -76,6 +90,12 @@ def __init__(
replaces_index="sliding_sync_membership_snapshots_user_id",
)

if self.hs.config.worker.run_background_tasks:
self.clock.looping_call(
self.delete_old_sliding_sync_connections,
1 * 60 * 60 * 1000, # every hour
Comment thread
erikjohnston marked this conversation as resolved.
Outdated
Comment thread
erikjohnston marked this conversation as resolved.
Outdated
)

async def get_latest_bump_stamp_for_room(
self,
room_id: str,
Expand Down Expand Up @@ -162,13 +182,19 @@ def persist_per_connection_state_txn(
if previous_connection_position is not None:
# The `previous_connection_position` is a user-supplied value, so we
# need to make sure that the one they supplied is actually theirs.
#
# We take out a `FOR UPDATE` lock on the row to prevent races with
# the connection deletion. If the connection gets deleted underneath
# then the query will return no rows and we raise
# `SlidingSyncUnknownPosition` exception.
sql = """
SELECT connection_key
FROM sliding_sync_connection_positions
INNER JOIN sliding_sync_connections USING (connection_key)
WHERE
connection_position = ?
AND user_id = ? AND effective_device_id = ? AND conn_id = ?
FOR UPDATE
"""
txn.execute(
sql, (previous_connection_position, user_id, device_id, conn_id)
Expand Down Expand Up @@ -202,6 +228,7 @@ def persist_per_connection_state_txn(
"effective_device_id": device_id,
"conn_id": conn_id,
"created_ts": self.clock.time_msec(),
"last_used_ts": self.clock.time_msec(),
},
returning=("connection_key",),
)
Expand Down Expand Up @@ -384,7 +411,7 @@ def _get_and_clear_connection_positions_txn(
# The `previous_connection_position` is a user-supplied value, so we
# need to make sure that the one they supplied is actually theirs.
sql = """
SELECT connection_key
SELECT connection_key, last_used_ts
FROM sliding_sync_connection_positions
INNER JOIN sliding_sync_connections USING (connection_key)
WHERE
Expand All @@ -396,7 +423,20 @@ def _get_and_clear_connection_positions_txn(
if row is None:
raise SlidingSyncUnknownPosition()

(connection_key,) = row
(connection_key, last_used_ts) = row

# Update the `last_used_ts` if it's due to be updated. We don't update
# every time to avoid excessive writes.
now = self.clock.time_msec()
if last_used_ts is None or now - last_used_ts > UPDATE_INTERVAL_LAST_USED_TS_MS:
self.db_pool.simple_update_txn(
txn,
table="sliding_sync_connections",
keyvalues={
"connection_key": connection_key,
},
updatevalues={"last_used_ts": now},
)

# Now that we have seen the client has received and used the connection
# position, we can delete all the other connection positions.
Expand Down Expand Up @@ -480,12 +520,30 @@ def _get_and_clear_connection_positions_txn(
logger.warning("Unrecognized sliding sync stream in DB %r", stream)

return PerConnectionStateDB(
last_used_ts=last_used_ts,
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
room_configs=room_configs,
)

@wrap_as_background_process("delete_old_sliding_sync_connections")
async def delete_old_sliding_sync_connections(self) -> None:
    """Delete sliding sync connections that have not been used for a long time.

    Runs periodically as a background process (scheduled via a looping
    call on the background-tasks worker). Removes rows from
    `sliding_sync_connections` whose `last_used_ts` is older than
    `CONNECTION_EXPIRY_MS`. Rows with a NULL `last_used_ts` (created
    before the column existed) are deliberately left untouched.
    """
    # Anything last used before this timestamp is considered expired.
    cutoff_ts = self.clock.time_msec() - CONNECTION_EXPIRY_MS

    def delete_old_sliding_sync_connections_txn(txn: LoggingTransaction) -> None:
        # NOTE: there is intentionally no index on `last_used_ts` (to keep
        # HOT updates cheap on PostgreSQL), so this is a sequential scan.
        # That's acceptable as this runs infrequently.
        sql = """
            DELETE FROM sliding_sync_connections
            WHERE last_used_ts IS NOT NULL AND last_used_ts < ?
        """
        txn.execute(sql, (cutoff_ts,))

    await self.db_pool.runInteraction(
        "delete_old_sliding_sync_connections",
        delete_old_sliding_sync_connections_txn,
    )


@attr.s(auto_attribs=True, frozen=True)
class PerConnectionStateDB:
Expand All @@ -498,6 +556,8 @@ class PerConnectionStateDB:
When persisting this *only* contains updates to the state.
"""

last_used_ts: int | None

rooms: "RoomStatusMap[str]"
receipts: "RoomStatusMap[str]"
account_data: "RoomStatusMap[str]"
Expand Down Expand Up @@ -553,6 +613,7 @@ async def from_state(
)

return PerConnectionStateDB(
last_used_ts=per_connection_state.last_used_ts,
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
Expand Down Expand Up @@ -596,6 +657,7 @@ async def to_state(self, store: "DataStore") -> "PerConnectionState":
}

return PerConnectionState(
last_used_ts=self.last_used_ts,
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
Expand Down
16 changes: 16 additions & 0 deletions synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 Element Creations, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

-- Add a timestamp for when the sliding sync connection position was last used,
-- only updated with a small granularity.
ALTER TABLE sliding_sync_connections ADD COLUMN last_used_ts BIGINT;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does last_used_ts need an index to be efficient with the delete query?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have added a comment:

-- Note: We don't add an index on this column to allow HOT updates on PostgreSQL
-- to reduce the cost of the updates to the column. c.f.
-- https://www.postgresql.org/docs/current/storage-hot.html
--
-- We do query this column directly to find expired connections, but we expect
-- that to be an infrequent operation and a sequential scan should be fine.

Comment thread
erikjohnston marked this conversation as resolved.
8 changes: 8 additions & 0 deletions synapse/types/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,12 +850,16 @@ class PerConnectionState:
since the last time you made a sync request.

Attributes:
last_used_ts: The time this connection was last used, in milliseconds.
This is only accurate to `UPDATE_INTERVAL_LAST_USED_TS_MS`.
rooms: The status of each room for the events stream.
receipts: The status of each room for the receipts stream.
room_configs: Map from room_id to the `RoomSyncConfig` of all
rooms that we have previously sent down.
"""

last_used_ts: int | None = None

rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
account_data: RoomStatusMap[int] = attr.Factory(RoomStatusMap)
Expand All @@ -867,6 +871,7 @@ def get_mutable(self) -> "MutablePerConnectionState":
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)

return MutablePerConnectionState(
last_used_ts=self.last_used_ts,
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
account_data=self.account_data.get_mutable(),
Expand All @@ -875,6 +880,7 @@ def get_mutable(self) -> "MutablePerConnectionState":

def copy(self) -> "PerConnectionState":
return PerConnectionState(
last_used_ts=self.last_used_ts,
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
account_data=self.account_data.copy(),
Expand All @@ -889,6 +895,8 @@ def __len__(self) -> int:
class MutablePerConnectionState(PerConnectionState):
"""A mutable version of `PerConnectionState`"""

last_used_ts: int | None

rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
account_data: MutableRoomStatusMap[int]
Expand Down
Loading
Loading