Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19495.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[MSC4140: Cancellable delayed events](https://github.com/matrix-org/matrix-spec-proposals/pull/4140): No longer cancel a user's delayed state events due to a more recent state event sent from a different user.
217 changes: 2 additions & 215 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
from twisted.internet.interfaces import IDelayedCall

from synapse.api.constants import EventTypes, StickyEvent, StickyEventField
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.errors import ShadowBanError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag
from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
from synapse.replication.http.delayed_events import (
ReplicationAddedDelayedEventRestServlet,
)
Expand All @@ -34,7 +33,6 @@
StateKey,
Timestamp,
)
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
JsonDict,
Requester,
Expand All @@ -44,8 +42,6 @@
)
from synapse.util.duration import Duration
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -76,28 +72,11 @@ def __init__(self, hs: "HomeServer"):

self._next_delayed_event_call: Optional[IDelayedCall] = None

# The current position in the current_state_delta stream
self._event_pos: int | None = None

# Guard to ensure we only process event deltas one at a time
self._event_processing = False

if hs.config.worker.worker_app is None:
self._repl_client = None

async def _schedule_db_events() -> None:
# We kick this off to pick up outstanding work from before the last restart.
# Block until we're up to date.
await self._unsafe_process_new_event()
hs.get_notifier().add_replication_callback(self.notify_new_event)
# Kick off again (without blocking) to catch any missed notifications
# that may have fired before the callback was added.
self._clock.call_later(
Duration(seconds=0),
self.notify_new_event,
)

# Now process any delayed events that are due to be sent.
# Process any delayed events that are due to be sent.
#
# We set `reprocess_events` to True in case any events had been
# marked as processed, but had not yet actually been sent,
Expand Down Expand Up @@ -131,198 +110,6 @@ async def _schedule_db_events() -> None:
def _is_master(self) -> bool:
    """Return True when running on the main process.

    On the main process no replication client is configured
    (`self._repl_client` is None); workers instead forward work to the
    main process via the replication client.
    """
    return self._repl_client is None

def notify_new_event(self) -> None:
    """Kick off background processing of new state event deltas.

    Called when there may be more state event deltas to process, which
    should cancel pending delayed events for the same state. If a
    processing pass is already in flight this is a no-op; the guard flag
    ensures only one pass runs at a time.
    """
    # Bail out early if another pass is already running.
    if self._event_processing:
        return
    self._event_processing = True

    async def _run() -> None:
        # Always release the guard, even if processing raises.
        try:
            await self._unsafe_process_new_event()
        finally:
            self._event_processing = False

    self.hs.run_as_background_process("delayed_events.notify_new_event", _run)

async def _unsafe_process_new_event(self) -> None:
    """Advance the delayed-events processor over new state deltas.

    Reads state deltas from the current position up to the room max
    stream ordering, handing each batch to `_handle_state_deltas`, and
    persists the new position as it goes.

    "Unsafe" because it must not run concurrently with itself; callers
    are expected to hold the `self._event_processing` guard (see
    `notify_new_event`).
    """
    # We purposefully fetch the current max room stream ordering before
    # doing anything else, as it could increment during processing of state
    # deltas. We want to avoid updating `delayed_events_stream_pos` past
    # the stream ordering of the state deltas we've processed. Otherwise
    # we'll leave gaps in our processing.
    room_max_stream_ordering = self._store.get_room_max_stream_ordering()

    # Check that there are actually any delayed events to process. If not, bail early.
    delayed_events_count = await self._store.get_count_of_delayed_events()
    if delayed_events_count == 0:
        # There are no delayed events to process. Update the
        # `delayed_events_stream_pos` to the latest `events` stream pos and
        # exit early.
        self._event_pos = room_max_stream_ordering

        logger.debug(
            "No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
            room_max_stream_ordering,
        )

        await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)

        event_processing_positions.labels(
            name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
        ).set(room_max_stream_ordering)

        return

    # If self._event_pos is None, it means we haven't fetched it from the DB yet.
    if self._event_pos is None:
        self._event_pos = await self._store.get_delayed_events_stream_pos()
        if self._event_pos > room_max_stream_ordering:
            # Apparently, we've processed more events than exist in the database!
            # This can happen if events are removed with history purge or similar.
            logger.warning(
                "Event stream ordering appears to have gone backwards (%i -> %i): "
                "rewinding delayed events processor",
                self._event_pos,
                room_max_stream_ordering,
            )
            self._event_pos = room_max_stream_ordering

    # Loop round handling deltas until we're up to date.
    while True:
        with Measure(
            self._clock, name="delayed_events_delta", server_name=self.server_name
        ):
            # Re-fetch each iteration: new events may have arrived while
            # we were handling the previous batch.
            room_max_stream_ordering = self._store.get_room_max_stream_ordering()
            if self._event_pos >= room_max_stream_ordering:
                return

            logger.debug(
                "Processing delayed events %s->%s",
                self._event_pos,
                room_max_stream_ordering,
            )
            (
                max_pos,
                deltas,
            ) = await self._storage_controllers.state.get_current_state_deltas(
                self._event_pos, room_max_stream_ordering
            )

            logger.debug(
                "Handling %d state deltas for delayed events processing",
                len(deltas),
            )
            await self._handle_state_deltas(deltas)

            self._event_pos = max_pos

            # Expose current event processing position to prometheus.
            event_processing_positions.labels(
                name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
            ).set(max_pos)

            await self._store.update_delayed_events_stream_pos(max_pos)

async def _handle_state_deltas(self, deltas: list[StateDelta]) -> None:
    """
    Process current state deltas to cancel other users' pending delayed events
    that target the same state.

    Args:
        deltas: The current-state deltas to process. Deltas without an
            event ID, or whose event cannot be found or parsed, are
            skipped (with logging) rather than causing a failure.
    """
    # Get the senders of each delta's state event (as sender information is
    # not currently stored in the `current_state_deltas` table).
    event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
        [delta.event_id for delta in deltas if delta.event_id is not None]
    )

    # Note: No need to batch as `get_current_state_deltas` will only ever
    # return 100 rows at a time.
    for delta in deltas:
        logger.debug(
            "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
        )

        # `delta.event_id` and `delta.sender` can be `None` in a few valid
        # cases (see the docstring of
        # `get_current_state_delta_membership_changes_for_user` for details).
        if delta.event_id is None:
            # TODO: Differentiate between this being caused by a state reset
            # which removed a user from a room, or the homeserver
            # purposefully having left the room. We can do so by checking
            # whether there are any local memberships still left in the
            # room. If so, then this is the result of a state reset.
            #
            # If it is a state reset, we should avoid cancelling new,
            # delayed state events due to old state resurfacing. So we
            # should skip and log a warning in this case.
            #
            # If the homeserver has left the room, then we should cancel all
            # delayed state events intended for this room, as there is no
            # need to try and send a delayed event into a room we've left.
            logger.warning(
                "Skipping state delta (%r, %r) without corresponding event ID. "
                "This can happen if the homeserver has left the room (in which "
                "case this can be ignored), or if there has been a state reset "
                "which has caused the sender to be kicked out of the room",
                delta.event_type,
                delta.state_key,
            )
            continue

        # The sentinel distinguishes "event not found" (key missing from
        # the dict) from "event found but its sender was NULL".
        sender_str = event_id_and_sender_dict.get(
            delta.event_id, Sentinel.UNSET_SENTINEL
        )
        if sender_str is None:
            # An event exists, but the `sender` field was "null" and Synapse
            # incorrectly accepted the event. This is not expected.
            logger.error(
                "Skipping state delta with event ID '%s' as 'sender' was None. "
                "This is unexpected - please report it as a bug!",
                delta.event_id,
            )
            continue
        if sender_str is Sentinel.UNSET_SENTINEL:
            # We have an event ID, but the event was not found in the
            # datastore. This can happen if a room, or its history, is
            # purged. State deltas related to the room are left behind, but
            # the event no longer exists.
            #
            # As we cannot get the sender of this event, we can't calculate
            # whether to cancel delayed events related to this one. So we skip.
            logger.debug(
                "Skipping state delta with event ID '%s' - the room, or its history, may have been purged",
                delta.event_id,
            )
            continue

        try:
            sender = UserID.from_string(sender_str)
        except SynapseError as e:
            logger.error(
                "Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
                sender_str,
                e,
            )
            continue

        # Cancel delayed events targeting the same (room, type, state_key),
        # except those belonging to the sender themselves when the sender
        # is a local user (an empty localpart matches no local user, so
        # remote senders cancel everyone's).
        next_send_ts = await self._store.cancel_delayed_state_events(
            room_id=delta.room_id,
            event_type=delta.event_type,
            state_key=delta.state_key,
            not_from_localpart=(
                sender.localpart
                if sender.domain == self._config.server.server_name
                else ""
            ),
        )

        # Cancellations may have changed which delayed event is due next.
        if self._next_send_ts_changed(next_send_ts):
            self._schedule_next_at_or_none(next_send_ts)

async def add(
self,
requester: Requester,
Expand Down
70 changes: 0 additions & 70 deletions synapse/storage/databases/main/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,32 +85,6 @@ def __init__(
unique=True,
)

async def get_delayed_events_stream_pos(self) -> int:
    """Return the stream position of the delayed-events background process.

    This is the position up to which the background process has watched
    for state events that target the same piece of state as any pending
    delayed events.
    """
    stream_pos: int = await self.db_pool.simple_select_one_onecol(
        table="delayed_events_stream_pos",
        keyvalues={},
        retcol="stream_id",
        desc="get_delayed_events_stream_pos",
    )
    return stream_pos

async def update_delayed_events_stream_pos(self, stream_id: int) -> None:
    """
    Updates the stream position of the background process to watch for state events
    that target the same piece of state as any pending delayed events.

    Must only be used by the worker running the background process.

    Args:
        stream_id: The new stream position. Must not be None: the
            `delayed_events_stream_pos.stream_id` column is declared
            `BIGINT NOT NULL`, so a None value would fail at the
            database layer. (The annotation was previously `int | None`,
            which wrongly suggested None was acceptable.)
    """
    await self.db_pool.simple_update_one(
        table="delayed_events_stream_pos",
        keyvalues={},
        updatevalues={"stream_id": stream_id},
        desc="update_delayed_events_stream_pos",
    )

async def add_delayed_event(
self,
*,
Expand Down Expand Up @@ -458,50 +432,6 @@ def cancel_delayed_event_txn(
"cancel_delayed_event", cancel_delayed_event_txn
)

async def cancel_delayed_state_events(
    self,
    *,
    room_id: str,
    event_type: str,
    state_key: str,
    not_from_localpart: str,
) -> Timestamp | None:
    """Remove all matching delayed state events that are still unprocessed.

    Args:
        room_id: The room ID to match against.
        event_type: The event type to match against.
        state_key: The state key to match against.
        not_from_localpart: The localpart of a user whose delayed events
            should be left alone. Pass the empty string to allow any
            user's delayed events to be cancelled.

    Returns: The send time of the next delayed event to be sent, if any.
    """

    def _cancel_matching_txn(
        txn: LoggingTransaction,
    ) -> Timestamp | None:
        txn.execute(
            """
            DELETE FROM delayed_events
            WHERE room_id = ? AND event_type = ? AND state_key = ?
            AND user_localpart <> ?
            AND NOT is_processed
            """,
            (room_id, event_type, state_key, not_from_localpart),
        )
        # The deletion may have changed which delayed event is due next,
        # so recompute it within the same transaction.
        return self._get_next_delayed_event_send_ts_txn(txn)

    return await self.db_pool.runInteraction(
        "cancel_delayed_state_events", _cancel_matching_txn
    )

async def delete_processed_delayed_event(self, delay_id: DelayID) -> None:
"""
Delete the matching delayed event, as long as it has been marked as processed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ CREATE INDEX delayed_events_send_ts ON delayed_events (send_ts);
CREATE INDEX delayed_events_is_processed ON delayed_events (is_processed);
CREATE INDEX delayed_events_room_state_event_idx ON delayed_events (room_id, event_type, state_key) WHERE state_key IS NOT NULL;

-- TODO: delete this table in the next Synapse release that can handle a breaking DB change
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is possibly better tracked as a GitHub issue instead of a comment here.

More concretely, we need to study https://element-hq.github.io/synapse/latest/development/database_schema.html#synapse-schema-versions

There's a worked example with a table removal. Since I think it's fine to have degraded rollback support for this experimental feature (as in, I think it's probably OK to stop reading and writing to this table at the same time), I think for us this is something along the lines of:

  • SCHEMA_VERSION needs to be increased now, with a comment on the version saying that in the new SCHEMA_VERSION we no longer use this table.

  • In a later version, we can make SCHEMA_COMPAT_VERSION match that as we drop the table, preventing rollback to the older SCHEMA_VERSION.


Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • SCHEMA_VERSION needs to be increased now, with a comment on the version saying that in the new SCHEMA_VERSION we no longer use this table.

Since the schema isn't actually changed by this PR, would it be acceptable to put that comment on the latest SCHEMA_VERSION (which is a newer version than when the table was created) instead of creating an "empty" bump?

CREATE TABLE delayed_events_stream_pos (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT NOT NULL,
Expand Down
Loading
Loading