-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor MSC3030 /timestamp_to_event to move away from our snowflake pull from destination pattern
#14096
Refactor MSC3030 /timestamp_to_event to move away from our snowflake pull from destination pattern
#14096
Changes from 7 commits
8867831
f03a2b6
7c82755
3ce3984
720788d
da87def
7332df1
482629b
a57fceb
7f86fef
79f2fea
25ce11c
64a907a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix [MSC3030](https://github.com/matrix-org/matrix-spec-proposals/pull/3030) `/timestamp_to_event` endpoint not backfilling found local event which could be an `outlier`. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -80,6 +80,18 @@ | |
| T = TypeVar("T") | ||
|
|
||
|
|
||
| @attr.s(frozen=True, slots=True, auto_attribs=True) | ||
| class PulledPduInfo: | ||
| """ | ||
| A result object that stores the PDU and info about it like which homeserver we | ||
| pulled it from (`pull_origin`) | ||
| """ | ||
|
|
||
| pdu: EventBase | ||
| # Which homeserver we pulled the PDU from | ||
| pull_origin: str | ||
|
|
||
|
|
||
| class InvalidResponseError(RuntimeError): | ||
| """Helper for _try_destination_list: indicates that the server returned a response | ||
| we couldn't parse | ||
|
|
@@ -114,7 +126,9 @@ def __init__(self, hs: "HomeServer"): | |
| self.hostname = hs.hostname | ||
| self.signing_key = hs.signing_key | ||
|
|
||
| self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache( | ||
| # Cache mapping `event_id` to a tuple of the event itself and the `pull_origin` | ||
| # (which server we pulled the event from) | ||
| self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache( | ||
| cache_name="get_pdu_cache", | ||
| clock=self._clock, | ||
| max_len=1000, | ||
|
|
@@ -352,11 +366,11 @@ async def _record_failure_callback( | |
| @tag_args | ||
| async def get_pdu( | ||
| self, | ||
| destinations: Iterable[str], | ||
| destinations: Collection[str], | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noticed that we iterate over |
||
| event_id: str, | ||
| room_version: RoomVersion, | ||
| timeout: Optional[int] = None, | ||
| ) -> Optional[EventBase]: | ||
| ) -> Optional[PulledPduInfo]: | ||
| """Requests the PDU with given origin and ID from the remote home | ||
| servers. | ||
|
|
||
|
|
@@ -371,11 +385,11 @@ async def get_pdu( | |
| moving to the next destination. None indicates no timeout. | ||
|
|
||
| Returns: | ||
| The requested PDU, or None if we were unable to find it. | ||
| The requested PDU wrapped in `PulledPduInfo`, or None if we were unable to find it. | ||
| """ | ||
|
|
||
| logger.debug( | ||
| "get_pdu: event_id=%s from destinations=%s", event_id, destinations | ||
| f"get_pdu(event_id={event_id}): from destinations=%s", destinations | ||
MadLittleMods marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) | ||
|
|
||
| # TODO: Rate limit the number of times we try and get the same event. | ||
|
|
@@ -384,23 +398,24 @@ async def get_pdu( | |
| # it gets persisted to the database), so we cache the results of the lookup. | ||
| # Note that this is separate to the regular get_event cache which caches | ||
| # events once they have been persisted. | ||
| event = self._get_pdu_cache.get(event_id) | ||
| get_pdu_cache_entry = self._get_pdu_cache.get(event_id) | ||
|
|
||
| event = None | ||
| pull_origin = None | ||
| if get_pdu_cache_entry: | ||
| event, pull_origin = get_pdu_cache_entry | ||
| # If we don't see the event in the cache, go try to fetch it from the | ||
| # provided remote federated destinations | ||
| if not event: | ||
| elif not get_pdu_cache_entry: | ||
MadLittleMods marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) | ||
|
|
||
| # TODO: We can probably refactor this to use `_try_destination_list` | ||
| for destination in destinations: | ||
| now = self._clock.time_msec() | ||
| last_attempt = pdu_attempts.get(destination, 0) | ||
| if last_attempt + PDU_RETRY_TIME_MS > now: | ||
| logger.debug( | ||
| "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", | ||
| destination, | ||
| last_attempt, | ||
| PDU_RETRY_TIME_MS, | ||
| now, | ||
| f"get_pdu(event_id={event_id}): skipping destination={destination} because we tried it recently last_attempt={last_attempt} and we only check every {PDU_RETRY_TIME_MS} (now={now})", | ||
MadLittleMods marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) | ||
| continue | ||
|
|
||
|
|
@@ -411,43 +426,40 @@ async def get_pdu( | |
| room_version=room_version, | ||
| timeout=timeout, | ||
| ) | ||
| pull_origin = destination | ||
|
|
||
| pdu_attempts[destination] = now | ||
|
|
||
| if event: | ||
| # Prime the cache | ||
| self._get_pdu_cache[event.event_id] = event | ||
| self._get_pdu_cache[event.event_id] = (event, pull_origin) | ||
|
|
||
| # Now that we have an event, we can break out of this | ||
| # loop and stop asking other destinations. | ||
| break | ||
|
|
||
| except SynapseError as e: | ||
| except NotRetryingDestination as e: | ||
| logger.info(f"get_pdu(event_id={event_id}): {e}") | ||
| continue | ||
| except FederationDeniedError: | ||
| logger.info( | ||
| "Failed to get PDU %s from %s because %s", | ||
| event_id, | ||
| destination, | ||
| e, | ||
| f"get_pdu(event_id={event_id}): Not attempting to fetch PDU from {destination} because the homeserver is not on our federation whitelist" | ||
|
||
| ) | ||
| continue | ||
| except NotRetryingDestination as e: | ||
| logger.info(str(e)) | ||
| continue | ||
| except FederationDeniedError as e: | ||
| logger.info(str(e)) | ||
| except SynapseError as e: | ||
| logger.info( | ||
| f"get_pdu(event_id={event_id}): Failed to get PDU from {destination} because {e}", | ||
| ) | ||
| continue | ||
| except Exception as e: | ||
| pdu_attempts[destination] = now | ||
|
|
||
| logger.info( | ||
| "Failed to get PDU %s from %s because %s", | ||
| event_id, | ||
| destination, | ||
| e, | ||
| f"get_pdu(event_id={event_id}): Failed to get PDU from {destination} because {e}", | ||
| ) | ||
| continue | ||
|
|
||
| if not event: | ||
| if not event or not pull_origin: | ||
| return None | ||
|
|
||
| # `event` now refers to an object stored in `get_pdu_cache`. Our | ||
|
|
@@ -459,7 +471,7 @@ async def get_pdu( | |
| event.room_version, | ||
| ) | ||
|
|
||
| return event_copy | ||
| return PulledPduInfo(event_copy, pull_origin) | ||
|
|
||
| @trace | ||
| @tag_args | ||
|
|
@@ -699,12 +711,14 @@ async def _check_sigs_and_hash_and_fetch_one( | |
| pdu_origin = get_domain_from_id(pdu.sender) | ||
| if not res and pdu_origin != origin: | ||
| try: | ||
| res = await self.get_pdu( | ||
| pulled_pdu_info = await self.get_pdu( | ||
| destinations=[pdu_origin], | ||
| event_id=pdu.event_id, | ||
| room_version=room_version, | ||
| timeout=10000, | ||
| ) | ||
| if pulled_pdu_info is not None: | ||
| res = pulled_pdu_info.pdu | ||
| except SynapseError: | ||
| pass | ||
|
|
||
|
|
@@ -806,6 +820,7 @@ async def _try_destination_list( | |
| ) | ||
|
|
||
| for destination in destinations: | ||
| # We don't want to ask our own server for information we don't have | ||
| if destination == self.server_name: | ||
| continue | ||
|
|
||
|
|
@@ -814,9 +829,18 @@ async def _try_destination_list( | |
| except ( | ||
| RequestSendFailed, | ||
| InvalidResponseError, | ||
| NotRetryingDestination, | ||
| ) as e: | ||
| logger.warning("Failed to %s via %s: %s", description, destination, e) | ||
| # Skip to the next homeserver in the list to try. | ||
| continue | ||
| except NotRetryingDestination as e: | ||
| logger.info(f"{description}: {e}") | ||
| continue | ||
| except FederationDeniedError: | ||
| logger.info( | ||
| f"{description}: Not attempting to {description} from {destination} because the homeserver is not on our federation whitelist" | ||
| ) | ||
| continue | ||
| except UnsupportedRoomVersionError: | ||
| raise | ||
| except HttpResponseException as e: | ||
|
|
@@ -1609,6 +1633,54 @@ async def send_request( | |
| return result | ||
|
|
||
| async def timestamp_to_event( | ||
| self, *, destinations: List[str], room_id: str, timestamp: int, direction: str | ||
| ) -> Optional["TimestampToEventResponse"]: | ||
| """ | ||
| Calls each remote federating server from `destinations` asking for their closest | ||
| event to the given timestamp in the given direction until we get a response. | ||
| Also validates the response to always return the expected keys or raises an | ||
| error. | ||
|
|
||
| Args: | ||
| destinations: The domains of homeservers to try fetching from | ||
| room_id: Room to fetch the event from | ||
| timestamp: The point in time (inclusive) we should navigate from in | ||
| the given direction to find the closest event. | ||
| direction: ["f"|"b"] to indicate whether we should navigate forward | ||
| or backward from the given timestamp to find the closest event. | ||
|
|
||
| Returns: | ||
| A parsed TimestampToEventResponse including the closest event_id | ||
| and origin_server_ts or None if no destination has a response. | ||
| """ | ||
|
|
||
| async def _timestamp_to_event_from_destination( | ||
| destination: str, | ||
| ) -> TimestampToEventResponse: | ||
| return await self._timestamp_to_event_from_destination( | ||
| destination, room_id, timestamp, direction | ||
| ) | ||
|
|
||
| try: | ||
| # Loop through each homeserver candidate until we get a succesful response | ||
| timestamp_to_event_response = await self._try_destination_list( | ||
|
Comment on lines
+1681
to
+1682
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved this logic from |
||
| "timestamp_to_event", | ||
| destinations, | ||
| # TODO: The requested timestamp may lie in a part of the | ||
| # event graph that the remote server *also* didn't have, | ||
| # in which case they will have returned another event | ||
| # which may be nowhere near the requested timestamp. In | ||
| # the future, we may need to reconcile that gap and ask | ||
| # other homeservers, and/or extend `/timestamp_to_event` | ||
| # to return events on *both* sides of the timestamp to | ||
| # help reconcile the gap faster. | ||
| _timestamp_to_event_from_destination, | ||
| ) | ||
| return timestamp_to_event_response | ||
| except SynapseError: | ||
| return None | ||
|
|
||
| async def _timestamp_to_event_from_destination( | ||
| self, destination: str, room_id: str, timestamp: int, direction: str | ||
| ) -> "TimestampToEventResponse": | ||
| """ | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.