-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
WIP: Dmr/unblock catchup #15228
WIP: Dmr/unblock catchup #15228
Changes from 3 commits
6b70d44
bebd7d2
c813f89
0aa0201
7f97783
1cb55e9
f139247
31f2b15
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,7 @@ | |
| # limitations under the License. | ||
| import logging | ||
| from enum import Enum, auto | ||
| from typing import Collection, Dict, FrozenSet, List, Optional, Tuple | ||
| from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Sequence, Tuple | ||
|
|
||
| import attr | ||
| from typing_extensions import Final | ||
|
|
@@ -565,29 +565,40 @@ async def filter_events_for_server( | |
| storage: StorageControllers, | ||
| target_server_name: str, | ||
| local_server_name: str, | ||
| events: List[EventBase], | ||
| events: Sequence[EventBase], | ||
| redact: bool = True, | ||
| check_history_visibility_only: bool = False, | ||
| filter_out_erased_senders: bool = True, | ||
| ) -> List[EventBase]: | ||
| """Filter a list of events based on whether given server is allowed to | ||
| """Filter a list of events based on whether the target server is allowed to | ||
| see them. | ||
|
|
||
| For a fully stated room, the target server is allowed to see an event E if: | ||
| - the state at E has world readable or shared history vis, OR | ||
| - the state at E says that the target server is in the room. | ||
|
|
||
| For a partially stated room, the target server is allowed to see E if: | ||
| - E was created by this homeserver, AND: | ||
| - the partial state at E has world readable or shared history vis, OR | ||
| - the partial state at E says that the target server is in the room. | ||
|
|
||
| TODO: state before or state after? | ||
|
|
||
| Args: | ||
| storage | ||
| server_name | ||
| target_server_name | ||
| local_server_name | ||
| events | ||
| redact: Whether to return a redacted version of the event, or | ||
| to filter them out entirely. | ||
| check_history_visibility_only: Whether to only check the | ||
| history visibility, rather than things like if the sender has been | ||
| redact: Controls what to do with events which have been filtered out. | ||
| If True, include their redacted forms; if False, omit them entirely. | ||
| filter_out_erased_senders: If true, also filter out events whose sender has been | ||
| erased. This is used e.g. during pagination to decide whether to | ||
| backfill or not. | ||
|
|
||
| Returns | ||
| The filtered events. | ||
| """ | ||
|
|
||
| def is_sender_erased(event: EventBase, erased_senders: Dict[str, bool]) -> bool: | ||
| def is_sender_erased(event: EventBase, erased_senders: Mapping[str, bool]) -> bool: | ||
DMRobertson marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if erased_senders and erased_senders[event.sender]: | ||
| logger.info("Sender of %s has been erased, redacting", event.event_id) | ||
| return True | ||
|
|
@@ -616,7 +627,7 @@ def check_event_is_visible( | |
| # server has no users in the room: redact | ||
| return False | ||
|
|
||
| if not check_history_visibility_only: | ||
| if filter_out_erased_senders: | ||
| erased_senders = await storage.main.are_users_erased(e.sender for e in events) | ||
| else: | ||
| # We don't want to check whether users are erased, which is equivalent | ||
|
|
@@ -632,7 +643,7 @@ def check_event_is_visible( | |
| # this check but would base the filtering on an outdated view of the membership events. | ||
|
|
||
| partial_state_invisible_events = set() | ||
| if not check_history_visibility_only: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to some failing tests we need to keep a way to shortcut this code for the backfill use case. |
||
| if filter_out_erased_senders: | ||
| for e in events: | ||
| sender_domain = get_domain_from_id(e.sender) | ||
| if ( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| from typing import Callable, List, Optional, Tuple | ||
| from typing import Callable, Collection, List, Optional, Tuple | ||
| from unittest import mock | ||
| from unittest.mock import Mock | ||
|
|
||
| from twisted.test.proto_helpers import MemoryReactor | ||
|
|
@@ -500,3 +501,77 @@ def test_not_latest_event(self) -> None: | |
| self.assertEqual(len(sent_pdus), 1) | ||
| self.assertEqual(sent_pdus[0].event_id, event_2.event_id) | ||
| self.assertFalse(per_dest_queue._catching_up) | ||
|
|
||
| def test_catch_up_is_not_blocked_by_partial_state_room(self) -> None: | ||
| """Detects (part of?) https://github.com/matrix-org/synapse/issues/15220.""" | ||
| # ARRANGE: | ||
| # - a local user (u1) | ||
| # - a room with which contains u1 and two remote users, @u2:host2 and @u3:other | ||
| # - events in that room such that | ||
| # - history visibility is restricted | ||
| # - u1 sent message events | ||
| # - afterwards, u3 sent a remote event | ||
| # - catchup to begin for host2 | ||
| per_dest_queue, sent_pdus = self.make_fake_destination_queue() | ||
|
|
||
| self.register_user("u1", "you the one") | ||
| u1_token = self.login("u1", "you the one") | ||
| room = self.helper.create_room_as("u1", tok=u1_token) | ||
| self.helper.send_state( | ||
| room_id=room, | ||
| event_type="m.room.history_visibility", | ||
| body={"history_visibility": "joined"}, | ||
| tok=u1_token, | ||
| ) | ||
| self.get_success( | ||
| event_injection.inject_member_event(self.hs, room, "@u2:host2", "join") | ||
| ) | ||
| self.get_success( | ||
| event_injection.inject_member_event(self.hs, room, "@u3:other", "join") | ||
| ) | ||
|
|
||
| # create some events | ||
| event_id_1 = self.helper.send(room, "hello", tok=u1_token)["event_id"] | ||
| event_id_2 = self.helper.send(room, "world", tok=u1_token)["event_id"] | ||
| # pretend that u3 changes their displayname | ||
| event_id_3 = self.get_success( | ||
| event_injection.inject_member_event(self.hs, room, "@u3:other", "join") | ||
| ).event_id | ||
|
|
||
| # destination_rooms should already be populated, but let us pretend that we already | ||
| # sent (successfully) up to and including event id 1 | ||
| event_1 = self.get_success(self.hs.get_datastores().main.get_event(event_id_1)) | ||
| assert event_1.internal_metadata.stream_ordering is not None | ||
| self.get_success( | ||
| self.hs.get_datastores().main.set_destination_last_successful_stream_ordering( | ||
| "host2", event_1.internal_metadata.stream_ordering | ||
| ) | ||
| ) | ||
|
|
||
| # Mock event 3 as having partial state | ||
| self.get_success( | ||
| event_injection.mark_event_as_partial_state(self.hs, event_id_3, room) | ||
| ) | ||
|
|
||
| # Fail the test if we block on full state for event 3. | ||
| async def mock_await_full_state(event_ids: Collection[str]) -> None: | ||
| if event_id_3 in event_ids: | ||
| raise AssertionError("Tried to await full state for event_id_3") | ||
|
|
||
| # ACT | ||
| with mock.patch.object( | ||
| self.hs.get_storage_controllers().state._partial_state_events_tracker, | ||
| "await_full_state", | ||
| mock_await_full_state, | ||
| ): | ||
| self.get_success(per_dest_queue._catch_up_transmission_loop()) | ||
|
|
||
| # ASSERT | ||
| # We should have: | ||
| # - not sent event 3: it's not ours, and the room is partial stated | ||
| # - fallen back to sending event 2: it's the most recent event in the room | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a little dodgy. In this situation, BUT: suppose that the order of event was
then host 2 wouldn't be privvy to event 2. I think we'll either
I need to probably make another test case and stare at this some more. |
||
| # we tried to send to host2 | ||
| # - completed catch-up | ||
| self.assertEqual(len(sent_pdus), 1) | ||
| self.assertEqual(sent_pdus[0].event_id, event_id_2) | ||
| self.assertFalse(per_dest_queue._catching_up) | ||
Uh oh!
There was an error while loading. Please reload this page.