-
Notifications
You must be signed in to change notification settings - Fork 1.5k
collator-protocol: short-term fixes for connectivity #4640
Changes from 6 commits
3ca17f2
45040a5
6c87745
1a6cae1
4cc4ae0
0ccc46b
b552244
503e467
048e7c3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -321,15 +321,14 @@ impl State { | |
|
|
||
| /// Distribute a collation. | ||
| /// | ||
| /// Figure out the core our para is assigned to and the relevant validators. | ||
| /// Issue a connection request to these validators. | ||
| /// If the para is not scheduled or next up on any core, at the relay-parent, | ||
| /// or the relay-parent isn't in the active-leaves set, we ignore the message | ||
| /// as it must be invalid in that case - although this indicates a logic error | ||
| /// elsewhere in the node. | ||
| /// If the para is not scheduled on any core, at the relay parent, | ||
| /// or the relay parent isn't in our view or we already collated on the relay parent, | ||
| /// we ignore the message as it must be invalid in that case - | ||
| /// although this indicates a logic error elsewhere in the node. | ||
| /// | ||
| /// Otherwise, start advertising the collation to interested peers. | ||
| async fn distribute_collation<Context>( | ||
| ctx: &mut Context, | ||
| runtime: &mut RuntimeInfo, | ||
| state: &mut State, | ||
| id: ParaId, | ||
| receipt: CandidateReceipt, | ||
|
|
@@ -358,32 +357,8 @@ where | |
| return Ok(()) | ||
| } | ||
|
|
||
| // Determine which core the para collated-on is assigned to. | ||
| // If it is not scheduled then ignore the message. | ||
| let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? { | ||
| Some(core) => core, | ||
| None => { | ||
| tracing::warn!( | ||
| target: LOG_TARGET, | ||
| para_id = %id, | ||
| ?relay_parent, | ||
| "looks like no core is assigned to {} at {}", id, relay_parent, | ||
| ); | ||
|
|
||
| return Ok(()) | ||
| }, | ||
| }; | ||
|
|
||
| // Determine the group on that core. | ||
| let current_validators = | ||
| determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?; | ||
|
|
||
| if current_validators.validators.is_empty() { | ||
| tracing::warn!( | ||
| target: LOG_TARGET, | ||
| core = ?our_core, | ||
| "there are no validators assigned to core", | ||
| ); | ||
| if !state.our_validators_groups.contains_key(&relay_parent) { | ||
| tracing::warn!(target: LOG_TARGET, "There are no validators assigned to the core.",); | ||
|
|
||
| return Ok(()) | ||
| } | ||
|
|
@@ -394,16 +369,9 @@ where | |
| relay_parent = %relay_parent, | ||
| candidate_hash = ?receipt.hash(), | ||
| pov_hash = ?pov.hash(), | ||
| core = ?our_core, | ||
| ?current_validators, | ||
| "Accepted collation, connecting to validators." | ||
| "Accepted collation", | ||
| ); | ||
|
|
||
| // Issue a discovery request for the validators of the current group: | ||
| connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await; | ||
|
|
||
| state.our_validators_groups.insert(relay_parent, ValidatorGroup::new()); | ||
|
|
||
| if let Some(result_sender) = result_sender { | ||
| state.collation_result_senders.insert(receipt.hash(), result_sender); | ||
| } | ||
|
|
@@ -522,7 +490,7 @@ where | |
| Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>, | ||
| { | ||
| // ignore address resolution failure | ||
| // will reissue a new request on new collation | ||
| // will reissue a new request on new relay parent | ||
| let (failed, _) = oneshot::channel(); | ||
| ctx.send_message(NetworkBridgeMessage::ConnectToValidators { | ||
| validator_ids, | ||
|
|
@@ -633,8 +601,7 @@ where | |
| ); | ||
| }, | ||
| Some(id) => { | ||
| distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender) | ||
| .await?; | ||
| distribute_collation(ctx, state, id, receipt, pov, result_sender).await?; | ||
| }, | ||
| None => { | ||
| tracing::warn!( | ||
|
|
@@ -919,7 +886,7 @@ where | |
| }, | ||
| OurViewChange(view) => { | ||
| tracing::trace!(target: LOG_TARGET, ?view, "Own view change"); | ||
| handle_our_view_change(state, view).await?; | ||
| handle_our_view_change(ctx, runtime, state, view).await?; | ||
| }, | ||
| PeerMessage(remote, msg) => { | ||
| handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; | ||
|
|
@@ -933,7 +900,16 @@ where | |
| } | ||
|
|
||
| /// Handles our view changes. | ||
| async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> { | ||
| async fn handle_our_view_change<Context>( | ||
| ctx: &mut Context, | ||
| runtime: &mut RuntimeInfo, | ||
| state: &mut State, | ||
| view: OurView, | ||
| ) -> Result<()> | ||
| where | ||
| Context: SubsystemContext<Message = CollatorProtocolMessage>, | ||
| Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>, | ||
| { | ||
| for removed in state.view.difference(&view) { | ||
| tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed."); | ||
|
|
||
|
|
@@ -966,8 +942,54 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> | |
| state.waiting_collation_fetches.remove(removed); | ||
| } | ||
|
|
||
| let new_leaves: Vec<_> = view.difference(&state.view).cloned().collect(); | ||
| state.view = view; | ||
|
|
||
| let id = match state.collating_on { | ||
| Some(id) => id, | ||
| None => return Ok(()), | ||
| }; | ||
|
|
||
| for relay_parent in new_leaves { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| ?relay_parent, | ||
| para_id = ?id, | ||
| "Processing new relay parent.", | ||
| ); | ||
|
|
||
| // Determine our assigned core. | ||
| // If it is not scheduled then ignore the relay parent. | ||
| let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? { | ||
| Some(core) => core, | ||
| None => continue, | ||
| }; | ||
|
|
||
| // Determine the group on that core. | ||
| let current_validators = | ||
| determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?; | ||
|
|
||
| let validators = current_validators.validators; | ||
| let no_one_is_assigned = validators.is_empty(); | ||
|
|
||
| if no_one_is_assigned { | ||
| continue | ||
| } | ||
|
|
||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| ?relay_parent, | ||
| ?validators, | ||
| para_id = ?id, | ||
| "Connecting to validators.", | ||
| ); | ||
|
|
||
| // Add the current validator group to the reserved peers | ||
| connect_to_validators(ctx, validators).await; | ||
|
||
|
|
||
| state.our_validators_groups.insert(relay_parent, ValidatorGroup::new()); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,7 +58,7 @@ pub struct CollatorEvictionPolicy { | |
| impl Default for CollatorEvictionPolicy { | ||
| fn default() -> Self { | ||
| CollatorEvictionPolicy { | ||
| inactive_collator: Duration::from_secs(24), | ||
| inactive_collator: Duration::from_secs(5), | ||
| undeclared: Duration::from_secs(1), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.