-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor backfilled into specific behavior function arguments (_persist_events_and_state_updates)
#11417
Refactor backfilled into specific behavior function arguments (_persist_events_and_state_updates)
#11417
Changes from 2 commits
d6b3fee
c138f8c
67393b8
52e3121
c83148e
96f1fe6
53e5d0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -124,7 +124,9 @@ async def _persist_events_and_state_updates( | |
| current_state_for_room: Dict[str, StateMap[str]], | ||
| state_delta_for_room: Dict[str, DeltaState], | ||
| new_forward_extremeties: Dict[str, List[str]], | ||
| backfilled: bool = False, | ||
| *, | ||
| use_negative_stream_ordering: bool = False, | ||
| inhibit_local_membership_updates: bool = False, | ||
| ) -> None: | ||
| """Persist a set of events alongside updates to the current state and | ||
| forward extremities tables. | ||
|
|
@@ -137,7 +139,14 @@ async def _persist_events_and_state_updates( | |
| room state | ||
| new_forward_extremities: Map from room_id to list of event IDs | ||
| that are the new forward extremities of the room. | ||
| backfilled | ||
| use_negative_stream_ordering: Whether to start stream_ordering on | ||
| the negative side and decrement. This should be set as True | ||
| for backfilled events because backfilled events get a negative | ||
| stream ordering so they don't come down incremental `/sync`. | ||
| inhibit_local_membership_updates: Stop the local_current_membership | ||
| from being updated by these events. This should be set to True | ||
| for backfilled events because backfilled events in the past do | ||
| not affect the current local state. | ||
|
|
||
| Returns: | ||
| Resolves when the events have been persisted | ||
|
|
@@ -159,7 +168,7 @@ async def _persist_events_and_state_updates( | |
| # | ||
| # Note: Multiple instances of this function cannot be in flight at | ||
| # the same time for the same room. | ||
| if backfilled: | ||
| if use_negative_stream_ordering: | ||
| stream_ordering_manager = self._backfill_id_gen.get_next_mult( | ||
| len(events_and_contexts) | ||
| ) | ||
|
|
@@ -176,13 +185,14 @@ async def _persist_events_and_state_updates( | |
| "persist_events", | ||
| self._persist_events_txn, | ||
| events_and_contexts=events_and_contexts, | ||
| backfilled=backfilled, | ||
| inhibit_local_membership_updates=inhibit_local_membership_updates, | ||
| use_negative_stream_ordering=use_negative_stream_ordering, | ||
| state_delta_for_room=state_delta_for_room, | ||
| new_forward_extremeties=new_forward_extremeties, | ||
| ) | ||
| persist_event_counter.inc(len(events_and_contexts)) | ||
|
|
||
| if not backfilled: | ||
| if stream < 0: | ||
| # backfilled events have negative stream orderings, so we don't | ||
| # want to set the event_persisted_position to that. | ||
| synapse.metrics.event_persisted_position.set( | ||
|
|
@@ -316,8 +326,10 @@ def _get_prevs_before_rejected_txn(txn, batch): | |
| def _persist_events_txn( | ||
| self, | ||
| txn: LoggingTransaction, | ||
| *, | ||
| events_and_contexts: List[Tuple[EventBase, EventContext]], | ||
| backfilled: bool, | ||
| inhibit_local_membership_updates: bool = False, | ||
| use_negative_stream_ordering: bool = False, | ||
| state_delta_for_room: Optional[Dict[str, DeltaState]] = None, | ||
| new_forward_extremeties: Optional[Dict[str, List[str]]] = None, | ||
| ): | ||
|
|
@@ -330,7 +342,14 @@ def _persist_events_txn( | |
| Args: | ||
| txn | ||
| events_and_contexts: events to persist | ||
| backfilled: True if the events were backfilled | ||
| inhibit_local_membership_updates: Stop the local_current_membership | ||
| from being updated by these events. This should be set to True | ||
| for backfilled events because backfilled events in the past do | ||
| not affect the current local state. | ||
| use_negative_stream_ordering: Whether to start stream_ordering on | ||
| the negative side and decrement. This should be set as True | ||
| for backfilled events because backfilled events get a negative | ||
| stream ordering so they don't come down incremental `/sync`. | ||
| delete_existing True to purge existing table rows for the events | ||
| from the database. This is useful when retrying due to | ||
| IntegrityError. | ||
|
|
@@ -364,7 +383,9 @@ def _persist_events_txn( | |
| ) | ||
|
|
||
| self._update_room_depths_txn( | ||
| txn, events_and_contexts=events_and_contexts, backfilled=backfilled | ||
| txn, | ||
| events_and_contexts=events_and_contexts, | ||
| use_negative_stream_ordering=use_negative_stream_ordering, | ||
| ) | ||
|
|
||
| # _update_outliers_txn filters out any events which have already been | ||
|
|
@@ -398,7 +419,7 @@ def _persist_events_txn( | |
| txn, | ||
| events_and_contexts=events_and_contexts, | ||
| all_events_and_contexts=all_events_and_contexts, | ||
| backfilled=backfilled, | ||
| inhibit_local_membership_updates=inhibit_local_membership_updates, | ||
| ) | ||
|
|
||
| # We call this last as it assumes we've inserted the events into | ||
|
|
@@ -1200,21 +1221,30 @@ def _update_room_depths_txn( | |
| self, | ||
| txn, | ||
| events_and_contexts: List[Tuple[EventBase, EventContext]], | ||
| backfilled: bool, | ||
| *, | ||
| use_negative_stream_ordering: bool = False, | ||
| ): | ||
| """Update min_depth for each room | ||
|
|
||
| Args: | ||
| txn (twisted.enterprise.adbapi.Connection): db connection | ||
| events_and_contexts (list[(EventBase, EventContext)]): events | ||
| we are persisting | ||
| backfilled (bool): True if the events were backfilled | ||
| use_negative_stream_ordering: Whether to start stream_ordering on | ||
| the negative side and decrement. This should be set as True | ||
| for backfilled events because backfilled events get a negative | ||
| stream ordering so they don't come down incremental `/sync`. | ||
| """ | ||
| depth_updates: Dict[str, int] = {} | ||
| for event, context in events_and_contexts: | ||
| # Remove the any existing cache entries for the event_ids | ||
| txn.call_after(self.store._invalidate_get_event_cache, event.event_id) | ||
| if not backfilled: | ||
| # This will update the `stream_ordering` position to mark the latest | ||
| # event as the front of the room. This should not be done for | ||
| # backfilled events because backfilled events have negative | ||
| # stream_ordering and happened in the past so we know that we don't | ||
| # need to update the stream_ordering tip for the room. | ||
| if not use_negative_stream_ordering: | ||
| txn.call_after( | ||
| self.store._events_stream_cache.entity_has_changed, | ||
| event.room_id, | ||
|
|
@@ -1427,7 +1457,11 @@ def _store_rejected_events_txn(self, txn, events_and_contexts): | |
| return [ec for ec in events_and_contexts if ec[0] not in to_remove] | ||
|
|
||
| def _update_metadata_tables_txn( | ||
| self, txn, events_and_contexts, all_events_and_contexts, backfilled | ||
| self, | ||
| txn, | ||
| events_and_contexts, | ||
| all_events_and_contexts, | ||
|
Comment on lines
+1448
to
+1449
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. given that these don't have default values, does it necessarily make sense for them to be kw-only args? (Not necessarily asking for a change, just musing)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm leaning to keeping it. Keyword-only matches the sole usage already and seems decent to not mix-up the arguments because there isn't a straightforward order from the function name. |
||
| inhibit_local_membership_updates: bool = False, | ||
| ): | ||
| """Update all the miscellaneous tables for new events | ||
|
|
||
|
|
@@ -1439,7 +1473,10 @@ def _update_metadata_tables_txn( | |
| events that we were going to persist. This includes events | ||
| we've already persisted, etc, that wouldn't appear in | ||
| events_and_context. | ||
| backfilled (bool): True if the events were backfilled | ||
| inhibit_local_membership_updates: Stop the local_current_membership | ||
| from being updated by these events. This should be set to True | ||
| for backfilled events because backfilled events in the past do | ||
| not affect the current local state. | ||
| """ | ||
|
|
||
| # Insert all the push actions into the event_push_actions table. | ||
|
|
@@ -1513,7 +1550,7 @@ def _update_metadata_tables_txn( | |
| for event, _ in events_and_contexts | ||
| if event.type == EventTypes.Member | ||
| ], | ||
| backfilled=backfilled, | ||
| inhibit_local_membership_updates=inhibit_local_membership_updates, | ||
| ) | ||
|
|
||
| # Insert event_reference_hashes table. | ||
|
|
@@ -1638,8 +1675,19 @@ def _store_event_reference_hashes_txn(self, txn, events): | |
| txn, table="event_reference_hashes", values=vals | ||
| ) | ||
|
|
||
| def _store_room_members_txn(self, txn, events, backfilled): | ||
| """Store a room member in the database.""" | ||
| def _store_room_members_txn( | ||
| self, txn, events, *, inhibit_local_membership_updates: bool = False | ||
| ): | ||
| """ | ||
| Store a room member in the database. | ||
| Args: | ||
| txn: The transaction to use. | ||
| events: List of events to store. | ||
| inhibit_local_membership_updates: Stop the local_current_membership | ||
| from being updated by these events. This should be set to True | ||
| for backfilled events because backfilled events in the past do | ||
| not affect the current local state. | ||
| """ | ||
|
|
||
| def non_null_str_or_none(val: Any) -> Optional[str]: | ||
| return val if isinstance(val, str) and "\u0000" not in val else None | ||
|
|
@@ -1682,7 +1730,7 @@ def non_null_str_or_none(val: Any) -> Optional[str]: | |
| # band membership", like a remote invite or a rejection of a remote invite. | ||
| if ( | ||
| self.is_mine_id(event.state_key) | ||
| and not backfilled | ||
| and not inhibit_local_membership_updates | ||
| and event.internal_metadata.is_outlier() | ||
| and event.internal_metadata.is_out_of_band_membership() | ||
| ): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -583,7 +583,8 @@ async def _persist_event_batch( | |
| current_state_for_room=current_state_for_room, | ||
| state_delta_for_room=state_delta_for_room, | ||
| new_forward_extremeties=new_forward_extremeties, | ||
| backfilled=backfilled, | ||
| use_negative_stream_ordering=backfilled, | ||
| inhibit_local_membership_updates=backfilled, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Around that idea, there is a nicety around grouping up the behavior under the Somewhere at the multiple top-levels when we refactor this further, we're going to have to make sure the Wdyt about having a static lookup map: class BackfilledAttributes:
use_negative_stream_ordering = False
inhibit_local_membership_updates = False
other_more_condition = True
extraneous_more_condition = False
different_more_condition = TrueAnd it can be used like this. I don't know. I'm not convinced of this usage and haven't figured a good clean way for the await self.persist_events_store._persist_events_and_state_updates(
chunk,
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
use_negative_stream_ordering=BackfilledAttributes.use_negative_stream_ordering if backfilled else ...,
inhibit_local_membership_updates=BackfilledAttributes.inhibit_local_membership_updates if backfilled else ...,
other_more_condition=BackfilledAttributes.other_more_condition if backfilled else ...,
extraneous_more_condition=BackfilledAttributes.extraneous_more_condition if backfilled else ...,
different_more_condition=BackfilledAttributes.different_more_condition if backfilled else ...,
)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's something to be said for identifying different "classes" of event (in this case, 'backfilled' and, uh, 'normal') with a list of exactly how those classes differ in behaviour. In that case you can just pass the 'class' identifier around rather than a sea of booleans. We do something pretty similar with room versions (see https://github.com/matrix-org/synapse/blob/v1.47.0/synapse/api/room_versions.py#L50 etc) and I don't hate it. That said, I'm yet to be convinced that such a solution isn't over-engineered in this case - if we've only really got a few degrees of freedom here, then booleans for them might be easier. Can I reserve judgement on this?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great comparison to
Totally fine to push this to when we have more a need. Just wanted to throw out the inkling of an idea that the original |
||
| ) | ||
|
|
||
| await self._handle_potentially_left_users(potentially_left_users) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.