Skip to content
Merged
1 change: 1 addition & 0 deletions changelog.d/19479.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[MSC4140: Cancellable delayed events](https://github.com/matrix-org/matrix-spec-proposals/pull/4140): When persisting a delayed event to the timeline, include its `delay_id` in the event's `unsigned` section in `/sync` responses to the event sender.
22 changes: 22 additions & 0 deletions rust/src/events/internal_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ enum EventInternalMetadataData {
PolicyServerSpammy(bool),
Redacted(bool),
TxnId(Box<str>),
DelayId(Box<str>),
TokenId(i64),
DeviceId(Box<str>),
}
Expand Down Expand Up @@ -115,6 +116,10 @@ impl EventInternalMetadataData {
pyo3::intern!(py, "txn_id"),
o.into_pyobject(py).unwrap_infallible().into_any(),
),
EventInternalMetadataData::DelayId(o) => (
pyo3::intern!(py, "delay_id"),
o.into_pyobject(py).unwrap_infallible().into_any(),
),
EventInternalMetadataData::TokenId(o) => (
pyo3::intern!(py, "token_id"),
o.into_pyobject(py).unwrap_infallible().into_any(),
Expand Down Expand Up @@ -179,6 +184,12 @@ impl EventInternalMetadataData {
.map(String::into_boxed_str)
.with_context(|| format!("'{key_str}' has invalid type"))?,
),
"delay_id" => EventInternalMetadataData::DelayId(
value
.extract()
.map(String::into_boxed_str)
.with_context(|| format!("'{key_str}' has invalid type"))?,
),
"token_id" => EventInternalMetadataData::TokenId(
value
.extract()
Expand Down Expand Up @@ -472,6 +483,17 @@ impl EventInternalMetadata {
set_property!(self, TxnId, obj.into_boxed_str());
}

/// The delay ID, set only if the event was a delayed event.
#[getter]
fn get_delay_id(&self) -> PyResult<&str> {
let s = get_property!(self, DelayId)?;
Ok(s)
}
#[setter]
fn set_delay_id(&mut self, obj: String) {
set_property!(self, DelayId, obj.into_boxed_str());
}

/// The access token ID of the user who sent this event, if any.
#[getter]
fn get_token_id(&self) -> PyResult<i64> {
Expand Down
81 changes: 43 additions & 38 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ class SerializeEventConfig:
# Function to convert from federation format to client format
event_format: Callable[[JsonDict], JsonDict] = format_event_for_client_v1
# The entity that requested the event. This is used to determine whether to include
# the transaction_id in the unsigned section of the event.
# the transaction_id and delay_id in the unsigned section of the event.
requester: Requester | None = None
# List of event fields to include. If empty, all fields will be returned.
only_event_fields: list[str] | None = None
Expand Down Expand Up @@ -475,44 +475,49 @@ def serialize_event(
config=config,
)

# If we have a txn_id saved in the internal_metadata, we should include it in the
# unsigned section of the event if it was sent by the same session as the one
# requesting the event.
txn_id: str | None = getattr(e.internal_metadata, "txn_id", None)
if (
txn_id is not None
and config.requester is not None
and config.requester.user.to_string() == e.sender
):
# Some events do not have the device ID stored in the internal metadata,
# this includes old events as well as those created by appservice, guests,
# or with tokens minted with the admin API. For those events, fallback
# to using the access token instead.
event_device_id: str | None = getattr(e.internal_metadata, "device_id", None)
if event_device_id is not None:
if event_device_id == config.requester.device_id:
d["unsigned"]["transaction_id"] = txn_id

else:
# Fallback behaviour: only include the transaction ID if the event
# was sent from the same access token.
#
# For regular users, the access token ID can be used to determine this.
# This includes access tokens minted with the admin API.
#
# For guests and appservice users, we can't check the access token ID
# so assume it is the same session.
event_token_id: int | None = getattr(e.internal_metadata, "token_id", None)
if (
(
event_token_id is not None
and config.requester.access_token_id is not None
and event_token_id == config.requester.access_token_id
# If we have applicable fields saved in the internal_metadata, include them in the
# unsigned section of the event if the event was sent by the same session (or when
# appropriate, just the same sender) as the one requesting the event.
if config.requester is not None and config.requester.user.to_string() == e.sender:
txn_id: str | None = getattr(e.internal_metadata, "txn_id", None)
if txn_id is not None:
# Some events do not have the device ID stored in the internal metadata,
# this includes old events as well as those created by appservice, guests,
# or with tokens minted with the admin API. For those events, fallback
# to using the access token instead.
event_device_id: str | None = getattr(
e.internal_metadata, "device_id", None
)
if event_device_id is not None:
if event_device_id == config.requester.device_id:
d["unsigned"]["transaction_id"] = txn_id

else:
# Fallback behaviour: only include the transaction ID if the event
# was sent from the same access token.
#
# For regular users, the access token ID can be used to determine this.
# This includes access tokens minted with the admin API.
#
# For guests and appservice users, we can't check the access token ID
# so assume it is the same session.
event_token_id: int | None = getattr(
e.internal_metadata, "token_id", None
)
or config.requester.is_guest
or config.requester.app_service
):
d["unsigned"]["transaction_id"] = txn_id
if (
(
event_token_id is not None
and config.requester.access_token_id is not None
and event_token_id == config.requester.access_token_id
)
or config.requester.is_guest
or config.requester.app_service
):
d["unsigned"]["transaction_id"] = txn_id

delay_id: str | None = getattr(e.internal_metadata, "delay_id", None)
if delay_id is not None:
d["unsigned"]["org.matrix.msc4140.delay_id"] = delay_id

# invite_room_state and knock_room_state are a list of stripped room state events
# that are meant to provide metadata about a room to an invitee/knocker. They are
Expand Down
2 changes: 2 additions & 0 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ async def _send_event(
action=membership,
content=event.content,
origin_server_ts=event.origin_server_ts,
delay_id=event.delay_id,
)
else:
event_dict: JsonDict = {
Expand All @@ -585,6 +586,7 @@ async def _send_event(
requester,
event_dict,
txn_id=txn_id,
delay_id=event.delay_id,
)
event_id = sent_event.event_id
except ShadowBanError:
Expand Down
13 changes: 12 additions & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ async def create_event(
state_map: StateMap[str] | None = None,
for_batch: bool = False,
current_state_group: int | None = None,
delay_id: str | None = None,
) -> tuple[EventBase, UnpersistedEventContextBase]:
"""
Given a dict from a client, create a new event. If bool for_batch is true, will
Expand All @@ -600,7 +601,7 @@ async def create_event(
Args:
requester
event_dict: An entire event
txn_id
txn_id: The transaction ID.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
Expand Down Expand Up @@ -639,6 +640,8 @@ async def create_event(
current_state_group: the current state group, used only for creating events for
batch persisting

delay_id: The delay ID of this event, if it was a delayed event.

Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
Expand Down Expand Up @@ -726,6 +729,9 @@ async def create_event(
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id

if delay_id is not None:
builder.internal_metadata.delay_id = delay_id

builder.internal_metadata.outlier = outlier

event, unpersisted_context = await self.create_new_client_event(
Expand Down Expand Up @@ -966,6 +972,7 @@ async def create_and_send_nonmember_event(
ignore_shadow_ban: bool = False,
outlier: bool = False,
depth: int | None = None,
delay_id: str | None = None,
) -> tuple[EventBase, int]:
"""
Creates an event, then sends it.
Expand Down Expand Up @@ -994,6 +1001,7 @@ async def create_and_send_nonmember_event(
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
delay_id: The delay ID of this event, if it was a delayed event.

Returns:
The event, and its stream ordering (if deduplication happened,
Expand Down Expand Up @@ -1090,6 +1098,7 @@ async def create_and_send_nonmember_event(
ignore_shadow_ban=ignore_shadow_ban,
outlier=outlier,
depth=depth,
delay_id=delay_id,
)

async def _create_and_send_nonmember_event_locked(
Expand All @@ -1103,6 +1112,7 @@ async def _create_and_send_nonmember_event_locked(
ignore_shadow_ban: bool = False,
outlier: bool = False,
depth: int | None = None,
delay_id: str | None = None,
) -> tuple[EventBase, int]:
room_id = event_dict["room_id"]

Expand Down Expand Up @@ -1131,6 +1141,7 @@ async def _create_and_send_nonmember_event_locked(
state_event_ids=state_event_ids,
outlier=outlier,
depth=depth,
delay_id=delay_id,
)
context = await unpersisted_context.persist(event)

Expand Down
10 changes: 10 additions & 0 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ async def _local_membership_update(
require_consent: bool = True,
outlier: bool = False,
origin_server_ts: int | None = None,
delay_id: str | None = None,
) -> tuple[str, int]:
"""
Internal membership update function to get an existing event or create
Expand Down Expand Up @@ -440,6 +441,7 @@ async def _local_membership_update(
opposed to being inline with the current DAG.
origin_server_ts: The origin_server_ts to use if a new event is created. Uses
the current timestamp if set to None.
delay_id: The delay ID of this event, if it was a delayed event.

Returns:
Tuple of event ID and stream ordering position
Expand Down Expand Up @@ -492,6 +494,7 @@ async def _local_membership_update(
depth=depth,
require_consent=require_consent,
outlier=outlier,
delay_id=delay_id,
)
context = await unpersisted_context.persist(event)
prev_state_ids = await context.get_prev_state_ids(
Expand Down Expand Up @@ -587,6 +590,7 @@ async def update_membership(
state_event_ids: list[str] | None = None,
depth: int | None = None,
origin_server_ts: int | None = None,
delay_id: str | None = None,
) -> tuple[str, int]:
"""Update a user's membership in a room.

Expand Down Expand Up @@ -617,6 +621,7 @@ async def update_membership(
based on the prev_events.
origin_server_ts: The origin_server_ts to use if a new event is created. Uses
the current timestamp if set to None.
delay_id: The delay ID of this event, if it was a delayed event.

Returns:
A tuple of the new event ID and stream ID.
Expand Down Expand Up @@ -679,6 +684,7 @@ async def update_membership(
state_event_ids=state_event_ids,
depth=depth,
origin_server_ts=origin_server_ts,
delay_id=delay_id,
)

return result
Expand All @@ -701,6 +707,7 @@ async def update_membership_locked(
state_event_ids: list[str] | None = None,
depth: int | None = None,
origin_server_ts: int | None = None,
delay_id: str | None = None,
) -> tuple[str, int]:
"""Helper for update_membership.

Expand Down Expand Up @@ -733,6 +740,7 @@ async def update_membership_locked(
based on the prev_events.
origin_server_ts: The origin_server_ts to use if a new event is created. Uses
the current timestamp if set to None.
delay_id: The delay ID of this event, if it was a delayed event.

Returns:
A tuple of the new event ID and stream ID.
Expand Down Expand Up @@ -943,6 +951,7 @@ async def update_membership_locked(
require_consent=require_consent,
outlier=outlier,
origin_server_ts=origin_server_ts,
delay_id=delay_id,
)

latest_event_ids = await self.store.get_prev_events_for_room(room_id)
Expand Down Expand Up @@ -1201,6 +1210,7 @@ async def update_membership_locked(
require_consent=require_consent,
outlier=outlier,
origin_server_ts=origin_server_ts,
delay_id=delay_id,
)

async def check_for_any_membership_in_room(
Expand Down
2 changes: 2 additions & 0 deletions synapse/synapse_rust/events.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class EventInternalMetadata:

txn_id: str
"""The transaction ID, if it was set when the event was created."""
delay_id: str
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure everything else in this file does it correctly right now, but I would be in favour of making this optional

Suggested change
delay_id: str
delay_id: str | None

This will also involve changing the Rust part to return Nones when needed.


I'm not dead-set on this, so open to your thoughts? But it seems like it is more typechecker-friendly this way.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My vote is to leave this as it is for now, as much as I appreciate proper typing, because there appears to be some nuance to this that I don't fully grasp at the moment & am hesitant to muck around with.

From what I can gather from the Rust code, there are some fields in the metadata object that are always present but may set to None (like stream_ordering and instance_name), and others that may be absent but must always be non-None when present (like txn_id and device_id).

So, it looks like there is a functional difference between the not-typed-as-optional fields that Python code looks up with getattr, and ones that are typed as optional. Since delay_id is used in much the same way as txn_id, IMO it's sufficient for the time being to use the same code patterns of the latter for the former.

"""The delay ID, set only if the event was a delayed event."""
token_id: int
"""The access token ID of the user who sent this event, if any."""
device_id: str
Expand Down
Loading
Loading