Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8476.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix message duplication if something goes wrong after persisting the event.
9 changes: 6 additions & 3 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2966,17 +2966,20 @@ async def persist_events_and_notify(
return result["max_stream_id"]
else:
assert self.storage.persistence
max_stream_token = await self.storage.persistence.persist_events(

# Note that this returns the events that were persisted, which may not be
# the same as we passed in if some were deduplicated due to transaction IDs.
events, max_stream_token = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)

if self._ephemeral_messages_enabled:
for (event, context) in event_and_contexts:
for event in events:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
for event in events:
await self._notify_persisted_event(event, max_stream_token)

return max_stream_token.stream
Expand Down
59 changes: 43 additions & 16 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ async def send_nonmember_event(
context: EventContext,
ratelimit: bool = True,
ignore_shadow_ban: bool = False,
) -> int:
) -> Tuple[EventBase, int]:
"""
Persists and notifies local clients and federation of an event.

Expand All @@ -654,8 +654,10 @@ async def send_nonmember_event(
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.

Return:
The stream_id of the persisted event.
Returns:
The event and stream_id of the persisted event. The returned event
may not match the given event if it was deduplicated due to an
existing event matching the transaction ID.

Raises:
ShadowBanError if the requester has been shadow-banned.
Expand All @@ -682,7 +684,10 @@ async def send_nonmember_event(
event.event_id,
prev_event.event_id,
)
return await self.store.get_stream_id_for_event(prev_event.event_id)
return (
prev_event,
await self.store.get_stream_id_for_event(prev_event.event_id),
)

return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
Expand Down Expand Up @@ -738,6 +743,11 @@ async def create_and_send_nonmember_event(
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.

Returns:
The event and stream_id of the persisted event. The returned event
may not match the given event if it was deduplicated due to an
existing event matching the transaction ID.

Raises:
ShadowBanError if the requester has been shadow-banned.
"""
Expand All @@ -752,6 +762,14 @@ async def create_and_send_nonmember_event(
# extremities to pile up, which in turn leads to state resolution
# taking longer.
with (await self.limiter.queue(event_dict["room_id"])):
if txn_id and requester.access_token_id:
existing_event_id = await self.store.get_event_id_from_transaction_id(
requester.user.to_string(), requester.access_token_id, txn_id,
)
if existing_event_id:
event = await self.store.get_event(existing_event_id)
return event, event.internal_metadata.stream_ordering

event, context = await self.create_event(
requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
)
Expand All @@ -762,7 +780,7 @@ async def create_and_send_nonmember_event(
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)

stream_id = await self.send_nonmember_event(
event, stream_id = await self.send_nonmember_event(
requester,
event,
context,
Expand Down Expand Up @@ -843,7 +861,7 @@ async def handle_new_client_event(
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
) -> int:
) -> Tuple[EventBase, int]:
"""Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.

Expand All @@ -857,8 +875,10 @@ async def handle_new_client_event(
ratelimit
extra_users: Any extra users to notify about event

Return:
The stream_id of the persisted event.
Returns:
The event and stream_id of the persisted event. The returned event
may not match the given event if it was deduplicated due to an
existing event matching the transaction ID.
"""

if event.is_state() and (event.type, event.state_key) == (
Expand Down Expand Up @@ -914,14 +934,17 @@ async def handle_new_client_event(
extra_users=extra_users,
)
stream_id = result["stream_id"]
event_id = result["event_id"]
if event_id != event.event_id:
event = await self.store.get_event(event_id)
event.internal_metadata.stream_ordering = stream_id
return stream_id
return event, stream_id

stream_id = await self.persist_and_notify_client_event(
event, stream_id = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)

return stream_id
return event, stream_id
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
Expand Down Expand Up @@ -966,7 +989,7 @@ async def persist_and_notify_client_event(
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
) -> int:
) -> Tuple[EventBase, int]:
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.

Expand Down Expand Up @@ -1138,9 +1161,13 @@ def is_inviter_member_event(e):
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")

event_pos, max_stream_token = await self.storage.persistence.persist_event(
event, context=context
)
# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due to transaction IDs.
(
event,
event_pos,
max_stream_token,
) = await self.storage.persistence.persist_event(event, context=context)

if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
Expand All @@ -1161,7 +1188,7 @@ def _notify():
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)

return event_pos.stream
return event, event_pos.stream

async def _bump_active_time(self, user: UserID) -> None:
try:
Expand Down
14 changes: 11 additions & 3 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ async def _local_membership_update(
if requester.is_guest:
content["kind"] = "guest"

if txn_id and requester.access_token_id:
existing_event_id = await self.store.get_event_id_from_transaction_id(
requester.user.to_string(), requester.access_token_id, txn_id,
)
if existing_event_id:
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream

event, context = await self.event_creation_handler.create_event(
requester,
{
Expand Down Expand Up @@ -221,7 +229,7 @@ async def _local_membership_update(
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)

stream_id = await self.event_creation_handler.handle_new_client_event(
event, stream_id = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)

Expand Down Expand Up @@ -692,7 +700,7 @@ async def send_membership_event(
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")

await self.event_creation_handler.handle_new_client_event(
event, _ = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target_user], ratelimit=ratelimit
)

Expand Down Expand Up @@ -1185,7 +1193,7 @@ async def _locally_reject_invite(

context = await self.state_handler.compute_event_context(event)
context.app_service = requester.app_service
stream_id = await self.event_creation_handler.handle_new_client_event(
event, stream_id = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
return event.event_id, stream_id
Expand Down
7 changes: 5 additions & 2 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,14 @@ async def _handle_request(self, request, event_id):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)

stream_id = await self.event_creation_handler.persist_and_notify_client_event(
(
event,
stream_id,
) = await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)

return 200, {"stream_id": stream_id}
return 200, {"stream_id": stream_id, "event_id": event.event_id}


def register_servlets(hs, http_server):
Expand Down
30 changes: 30 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ def _persist_events_txn(

self._store_event_txn(txn, events_and_contexts=events_and_contexts)

self._persist_transaction_ids_txn(txn, events_and_contexts)

# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)

Expand Down Expand Up @@ -411,6 +413,34 @@ def _persist_events_txn(
# room_memberships, where applicable.
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

def _persist_transaction_ids_txn(
    self,
    txn: LoggingTransaction,
    events_and_contexts: List[Tuple[EventBase, EventContext]],
):
    """Record the (user, access token, txn_id) -> event_id mapping for each
    event that was sent with a client transaction ID, so later requests with
    the same transaction ID can be deduplicated.
    """

    rows = []
    for event, _context in events_and_contexts:
        # Only locally-sent events carry these internal-metadata fields.
        token_id = getattr(event.internal_metadata, "token_id", None)
        txn_id = getattr(event.internal_metadata, "txn_id", None)
        if not token_id or not txn_id:
            continue

        rows.append(
            {
                "event_id": event.event_id,
                "user_id": event.sender,
                "token_id": token_id,
                "txn_id": txn_id,
                # Timestamp lets old mappings be cleaned out periodically.
                "inserted_ts": self._clock.time_msec(),
            }
        )

    if rows:
        self.db_pool.simple_insert_many_txn(
            txn, table="event_txn_id", values=rows,
        )

def _update_current_state_txn(
self,
txn: LoggingTransaction,
Expand Down
62 changes: 61 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import logging
import threading
Expand Down Expand Up @@ -130,6 +129,15 @@ def __init__(self, database: DatabasePool, db_conn, hs):
db_conn, "events", "stream_ordering", step=-1
)

if not hs.config.worker.worker_app:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can probably be put on the background worker?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, yes but that work isn't in 1.21 :(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right. 🤦 I'll update it when this gets merged forward.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if we're retargetting this to develop, seems like we can make this change!

# We periodically clean out old transaction ID mappings
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
"_cleanup_old_transaction_ids",
self._cleanup_old_transaction_ids,
)

self._get_event_cache = Cache(
"*getEvent*",
keylen=3,
Expand Down Expand Up @@ -1287,3 +1295,55 @@ def get_next_event_to_expire_txn(txn):
return await self.db_pool.runInteraction(
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)

async def get_event_id_from_transaction_id(
    self, user_id: str, token_id: int, txn_id: str
) -> Optional[str]:
    """Look up whether an event has already been persisted for this
    client transaction.

    Args:
        user_id: The Matrix ID of the sender.
        token_id: The database ID of the access token used to send the
            request. Note: this is a BIGINT in the schema and callers pass
            ``requester.access_token_id``, so it is an int, not the token
            string itself (the previous ``str`` annotation was wrong).
        txn_id: The client-supplied transaction ID.

    Returns:
        The event ID of the already-persisted event, or None if this
        transaction has not been seen before.
    """
    return await self.db_pool.simple_select_one_onecol(
        table="event_txn_id",
        keyvalues={"user_id": user_id, "token_id": token_id, "txn_id": txn_id},
        retcol="event_id",
        allow_none=True,
        desc="get_event_id_from_transaction_id",
    )

async def get_already_persisted_events(
    self, events: Iterable[EventBase]
) -> Dict[str, str]:
    """Check which of the given events were already persisted under a
    matching transaction ID.

    Returns:
        A mapping from the event ID of each duplicate in ``events`` to the
        event ID of the existing persisted event.
    """

    mapping = {}

    for event in events:
        # Transaction metadata is only present on locally-sent events.
        token_id = getattr(event.internal_metadata, "token_id", None)
        txn_id = getattr(event.internal_metadata, "txn_id", None)
        if not token_id or not txn_id:
            continue

        # NOTE(review): lookups happen one event at a time; batching these
        # queries could be an optimisation later.
        existing = await self.get_event_id_from_transaction_id(
            event.sender, token_id, txn_id
        )
        if existing:
            mapping[event.event_id] = existing

    return mapping

async def _cleanup_old_transaction_ids(self):
    """Delete transaction ID -> event ID mappings older than 24 hours.

    Transaction IDs only come from local clients, which are not expected
    to retry a request after significant time has passed, so a day is a
    safe retention window.
    """

    def _do_cleanup_txn(txn):
        one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000
        sql = """
            DELETE FROM event_txn_id
            WHERE inserted_ts < ?
        """
        txn.execute(sql, (one_day_ago,))

    return await self.db_pool.runInteraction(
        "_cleanup_old_transaction_ids", _do_cleanup_txn,
    )
29 changes: 29 additions & 0 deletions synapse/storage/databases/main/schema/delta/58/19txn_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


-- A map of recent events persisted with transaction IDs. Used to deduplicate
-- send event requests with the same transaction ID.
Comment on lines +17 to +18
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to mention what these are expected to be deduplicated across (user, device, transaction).

I think most of the other tables we have use the device_id, not the access token ID? Are these mostly synonymous?

-- Deduplication is scoped to (user_id, token_id, txn_id): the same
-- transaction ID sent by a different user, or via a different access token,
-- maps to a different event.
-- NOTE(review): other tables reportedly key on device_id rather than the
-- access token ID -- confirm whether that distinction matters here.
CREATE TABLE event_txn_id (
event_id TEXT NOT NULL,      -- the event persisted for this transaction
user_id TEXT NOT NULL,       -- Matrix ID of the sender
token_id BIGINT NOT NULL,    -- database ID of the access token used
txn_id TEXT NOT NULL,        -- client-supplied transaction ID
inserted_ts BIGINT NOT NULL  -- insertion time in ms; rows older than 24h are cleaned out
);

-- Each event maps to at most one transaction, and each (user, token, txn)
-- triple maps to at most one event.
CREATE UNIQUE INDEX event_txn_id_event_id ON event_txn_id(event_id);
CREATE UNIQUE INDEX event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id);
-- Supports the periodic deletion of old mappings by insertion time.
CREATE INDEX event_txn_id_ts ON event_txn_id(inserted_ts);
Loading