Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,10 @@ async def create_and_send_nonmember_event(
with (await self.limiter.queue(event_dict["room_id"])):
if txn_id and requester.access_token_id:
existing_event_id = await self.store.get_event_id_from_transaction_id(
requester.user.to_string(), requester.access_token_id, txn_id,
event_dict["room_id"],
requester.user.to_string(),
requester.access_token_id,
txn_id,
)
if existing_event_id:
event = await self.store.get_event(existing_event_id)
Expand Down Expand Up @@ -922,12 +925,18 @@ async def handle_new_client_event(
stream_id = result["stream_id"]
event_id = result["event_id"]
if event_id != event.event_id:
# If we get a different event back then it means that it has
# been de-duplicated, so we replace the given event with the
# one already persisted.
event = await self.store.get_event(event_id)
else:
# If we newly persisted the event then we need to update its
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
return event

event, stream_id = await self.persist_and_notify_client_event(
event = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)

Expand Down Expand Up @@ -976,11 +985,16 @@ async def persist_and_notify_client_event(
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
) -> Tuple[EventBase, int]:
) -> EventBase:
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.

This should only be run on the instance in charge of persisting events.

Returns:
The persisted event. This may be different from the given event if
it was de-duplicated (e.g. because we had already persisted an
event with the same transaction ID).
"""
assert self.storage.persistence is not None
assert self._events_shard_config.should_handle(
Expand Down Expand Up @@ -1175,7 +1189,7 @@ def _notify():
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)

return event, event_pos.stream
return event

async def _bump_active_time(self, user: UserID) -> None:
try:
Expand Down
5 changes: 4 additions & 1 deletion synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,12 @@ async def _local_membership_update(
if requester.is_guest:
content["kind"] = "guest"

# Check if we already have an event with a matching transaction ID. (We
# do this check just before we persist an event as well, but may as well
# do it up front for efficiency.)
if txn_id and requester.access_token_id:
existing_event_id = await self.store.get_event_id_from_transaction_id(
requester.user.to_string(), requester.access_token_id, txn_id,
room_id, requester.user.to_string(), requester.access_token_id, txn_id,
)
if existing_event_id:
event_pos = await self.store.get_position_for_event(existing_event_id)
Expand Down
19 changes: 14 additions & 5 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"ratelimit": true,
"extra_users": [],
}

200 OK

{ "stream_id": ..., "event_id": ... }

The returned event ID may not match the sent event if it was deduplicated.
"""

NAME = "send_event"
Expand Down Expand Up @@ -116,14 +122,17 @@ async def _handle_request(self, request, event_id):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)

(
event,
stream_id,
) = await self.event_creation_handler.persist_and_notify_client_event(
event = await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)

return 200, {"stream_id": stream_id, "event_id": event.event_id}
return (
200,
{
"stream_id": event.internal_metadata.stream_ordering,
"event_id": event.event_id,
},
)


def register_servlets(hs, http_server):
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ def _persist_transaction_ids_txn(
to_insert.append(
{
"event_id": event.event_id,
"room_id": event.room_id,
"user_id": event.sender,
"token_id": token_id,
"txn_id": txn_id,
Expand Down
11 changes: 8 additions & 3 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1314,14 +1314,19 @@ def get_next_event_to_expire_txn(txn):
)

async def get_event_id_from_transaction_id(
self, user_id: str, token_id: str, txn_id: str
self, room_id: str, user_id: str, token_id: int, txn_id: str
) -> Optional[str]:
"""Look up if we have already persisted an event for the transaction ID,
returning the event ID if so.
"""
return await self.db_pool.simple_select_one_onecol(
table="event_txn_id",
keyvalues={"user_id": user_id, "token_id": token_id, "txn_id": txn_id},
keyvalues={
"room_id": room_id,
"user_id": user_id,
"token_id": token_id,
"txn_id": txn_id,
},
retcol="event_id",
allow_none=True,
desc="get_event_id_from_transaction_id",
Expand All @@ -1342,7 +1347,7 @@ async def get_already_persisted_events(
txn_id = getattr(event.internal_metadata, "txn_id", None)
if token_id and txn_id:
Comment on lines +1341 to +1353
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slightly feeling that it shouldn't be the storage layer's responsibility to do this digging, but ymmv

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeeeah, though this only gets called from persist_events so we'd have to thread this through persist_events if we wanted to move the logic out of storage.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

existing = await self.get_event_id_from_transaction_id(
event.sender, token_id, txn_id
event.room_id, event.sender, token_id, txn_id
)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we want to try and batch these up a bit? The standard simple_select_many doesn't work due to it only supporting one column. There are psycopg2 helper functions that might help us though

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like something similar could be made which takes an iterable of tuples (or dicts)?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I'm just not sure how much I want to try and do that for something that is going in an RC

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

park it for now, optimise later?

if existing:
mapping[event.event_id] = existing
Expand Down
6 changes: 5 additions & 1 deletion synapse/storage/databases/main/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ async def add_access_token_to_user(
token: str,
device_id: Optional[str],
valid_until_ms: Optional[int],
) -> None:
) -> int:
"""Adds an access token for the given user.

Args:
Expand All @@ -949,6 +949,8 @@ async def add_access_token_to_user(
valid_until_ms: when the token is valid until. None for no expiry.
Raises:
StoreError if there was a problem adding this.
Returns:
The token ID
"""
next_id = self._access_tokens_id_gen.get_next()

Expand All @@ -964,6 +966,8 @@ async def add_access_token_to_user(
desc="add_access_token_to_user",
)

return next_id

def _set_device_for_access_token_txn(self, txn, token: str, device_id: str) -> str:
old_device_id = self.db_pool.simple_select_one_onecol_txn(
txn, "access_tokens", {"token": token}, "device_id"
Expand Down
22 changes: 15 additions & 7 deletions synapse/storage/databases/main/schema/delta/58/19txn_id.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@
-- A map of recent events persisted with transaction IDs. Used to deduplicate
-- send event requests with the same transaction ID.
Comment on lines +17 to +18
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to mention what these are expected to be deduplicated across (user, device, transaction).

I think most of the other tables we have use the device_id, not the access token ID? Are these mostly synonymous?

--
-- Note, transaction IDs are scoped to the user ID/access token that was used to
-- make the request.
CREATE TABLE event_txn_id (
-- Note: transaction IDs are scoped to the room ID/user ID/access token that was
-- used to make the request.
--
-- Note: The foreign key constraints are ON DELETE CASCADE, as if we delete the
-- events or access token we don't want to try to de-duplicate the event.
CREATE TABLE IF NOT EXISTS event_txn_id (
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
user_id TEXT NOT NULL,
token_id BIGINT NOT NULL,
txn_id TEXT NOT NULL,
inserted_ts BIGINT NOT NULL
inserted_ts BIGINT NOT NULL,
FOREIGN KEY (event_id)
REFERENCES events (event_id) ON DELETE CASCADE,
FOREIGN KEY (token_id)
REFERENCES access_tokens (id) ON DELETE CASCADE
);

CREATE UNIQUE INDEX event_txn_id_event_id ON event_txn_id(event_id);
CREATE UNIQUE INDEX event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id);
CREATE INDEX event_txn_id_ts ON event_txn_id(inserted_ts);
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id);
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id);
CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts);
9 changes: 6 additions & 3 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled):

Returns:
defer.Deferred: a deferred which will resolve once the events are
persisted. Runs its callbacks *without* a logcontext.
persisted. Runs its callbacks *without* a logcontext. The result
is the same as that returned by the callback passed to
`handle_queue`.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
Expand Down Expand Up @@ -229,7 +231,7 @@ async def persist_events(
for room_id in partitioned:
self._maybe_start_persisting(room_id)

# The deferred returns a map from event ID to existing event ID if the
# Each deferred returns a map from event ID to existing event ID if the
# event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events).
#
Expand Down Expand Up @@ -324,7 +326,8 @@ async def _persist_events(
#
# We should have checked this a long time before we get here, but it's
# possible that different send event requests race in such a way that
# they both pass the earlier checks.
# they both pass the earlier checks. Checking here isn't racy as we can
# have only one `_persist_events` per room being called at a time.
replaced_events = await self.main_store.get_already_persisted_events(
(event for event, _ in events_and_contexts)
)
Comment on lines +331 to +333
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I might have paged this out, but can we be sure that events_and_contexts doesn't itself contain duplicate txn ids?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, hmm, I don't think it should happen because we linearize based on txn ID elsewhere, but we should add a check anyway

Expand Down
9 changes: 6 additions & 3 deletions tests/handlers/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ def test_duplicated_txn_id(self):
access_token = self.login("tester", "foobar")
room_id = self.helper.create_room_as(user_id, tok=access_token)

# We make the IDs up here, which is fine.
token_id = 4957834
txn_id = "something_suitably_random"
info = self.get_success(
self.hs.get_datastore().get_user_by_access_token(access_token,)
)
token_id = info["token_id"]

requester = create_requester(user_id, access_token_id=token_id)

txn_id = "something_suitably_random"

def create_duplicate_event():
return self.get_success(
handler.create_event(
Expand Down
11 changes: 9 additions & 2 deletions tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,17 +254,24 @@ def setUp(self):
if hasattr(self, "user_id"):
if self.hijack_auth:

# We need a valid token ID to satisfy foreign key constraints.
token_id = self.get_success(
self.hs.get_datastore().add_access_token_to_user(
self.helper.auth_user_id, "some_fake_token", None, None,
)
)

async def get_user_by_access_token(token=None, allow_guest=False):
return {
"user": UserID.from_string(self.helper.auth_user_id),
"token_id": 1,
"token_id": token_id,
"is_guest": False,
}

async def get_user_by_req(request, allow_guest=False, rights="access"):
return create_requester(
UserID.from_string(self.helper.auth_user_id),
1,
token_id,
False,
False,
None,
Expand Down