diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 22c5ed8b66e2..c8ad21ad406e 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -124,12 +124,12 @@ struct AggressionConfig { } impl AggressionConfig { - /// Returns `true` if block is not too old depending on the aggression level - fn is_age_relevant(&self, block_age: BlockNumber) -> bool { + /// Returns `true` if lag is past threshold depending on the aggression level + fn should_trigger_aggression(&self, approval_checking_lag: BlockNumber) -> bool { if let Some(t) = self.l1_threshold { - block_age >= t + approval_checking_lag >= t } else if let Some(t) = self.resend_unfinalized_period { - block_age > 0 && block_age % t == 0 + approval_checking_lag > 0 && approval_checking_lag % t == 0 } else { false } @@ -184,6 +184,9 @@ struct State { /// HashMap from active leaves to spans spans: HashMap, + + /// Current approval checking finality lag. + approval_checking_lag: BlockNumber, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -1425,19 +1428,29 @@ impl State { resend: Resend, metrics: &Metrics, ) { - let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num); - let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num); let config = self.aggression_config.clone(); - let (min_age, max_age) = match (min_age, max_age) { - (Some(min), Some(max)) => (min, max), + if !self.aggression_config.should_trigger_aggression(self.approval_checking_lag) { + gum::trace!( + target: LOG_TARGET, + approval_checking_lag = self.approval_checking_lag, + "Aggression not enabled", + ); + return + } + + let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num); + + let max_age = match max_age { + Some(max) => *max, _ => return, // empty. 
}; - let diff = max_age - min_age; - if !self.aggression_config.is_age_relevant(diff) { - return - } + // Since we have the approval checking lag, we need to set the `min_age` accordingly to + // enable aggression for the oldest block that is not approved. + let min_age = max_age.saturating_sub(self.approval_checking_lag); + + gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",); adjust_required_routing_and_propagate( ctx, @@ -1476,20 +1489,21 @@ impl State { // its descendants from being finalized. Waste minimal bandwidth // this way. Also, disputes might prevent finality - again, nothing // to waste bandwidth on newer blocks for. - &block_entry.number == min_age + block_entry.number == min_age }, |required_routing, local, _| { // It's a bit surprising not to have a topology at this age. if *required_routing == RequiredRouting::PendingTopology { gum::debug!( target: LOG_TARGET, - age = ?diff, + lag = ?self.approval_checking_lag, "Encountered old block pending gossip topology", ); return } - if config.l1_threshold.as_ref().map_or(false, |t| &diff >= t) { + if config.l1_threshold.as_ref().map_or(false, |t| &self.approval_checking_lag >= t) + { // Message originator sends to everyone. if local && *required_routing != RequiredRouting::All { metrics.on_aggression_l1(); @@ -1497,7 +1511,8 @@ impl State { } } - if config.l2_threshold.as_ref().map_or(false, |t| &diff >= t) { + if config.l2_threshold.as_ref().map_or(false, |t| &self.approval_checking_lag >= t) + { // Message originator sends to everyone. Everyone else sends to XY. 
if !local && *required_routing != RequiredRouting::GridXY { metrics.on_aggression_l2(); @@ -1764,6 +1779,10 @@ impl ApprovalDistribution { ); } }, + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => { + gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`"); + state.approval_checking_lag = lag; + }, } } } diff --git a/node/network/approval-distribution/src/tests.rs b/node/network/approval-distribution/src/tests.rs index 31fe0a421d38..1b05acee62be 100644 --- a/node/network/approval-distribution/src/tests.rs +++ b/node/network/approval-distribution/src/tests.rs @@ -1817,6 +1817,9 @@ fn originator_aggression_l1() { session: 1, }; + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(level + 1); + overseer_send(overseer, msg).await; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -2071,6 +2074,9 @@ fn non_originator_aggression_l2() { session: 1, }; + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(level + 1); + overseer_send(overseer, msg).await; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -2097,6 +2103,10 @@ fn non_originator_aggression_l2() { session: 1, }; + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate( + aggression_l1_threshold + level + 1, + ); + overseer_send(overseer, msg).await; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -2241,6 +2251,8 @@ fn resends_messages_periodically() { session: 1, }; + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(2); + overseer_send(overseer, msg).await; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 86bae65a268b..71eebeafcbfe 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -763,6 +763,7 @@ where basics.backend.clone(), 
overseer_handle.clone(), metrics, + Some(basics.task_manager.spawn_handle()), ) } else { SelectRelayChain::new_longest_chain(basics.backend.clone()) diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 18d6866e049e..afc0ce320610 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -40,14 +40,16 @@ use consensus_common::{Error as ConsensusError, SelectChain}; use futures::channel::oneshot; use polkadot_node_primitives::MAX_FINALITY_LAG as PRIMITIVES_MAX_FINALITY_LAG; use polkadot_node_subsystem::messages::{ - ApprovalVotingMessage, ChainSelectionMessage, DisputeCoordinatorMessage, - HighestApprovedAncestorBlock, + ApprovalDistributionMessage, ApprovalVotingMessage, ChainSelectionMessage, + DisputeCoordinatorMessage, HighestApprovedAncestorBlock, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_overseer::{AllMessages, Handle}; use polkadot_primitives::{Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader}; use std::sync::Arc; +pub use service::SpawnTaskHandle; + /// The maximum amount of unfinalized blocks we are willing to allow due to approval checking /// or disputes. /// @@ -162,13 +164,21 @@ where /// Create a new [`SelectRelayChain`] wrapping the given chain backend /// and a handle to the overseer. 
- pub fn new_with_overseer(backend: Arc, overseer: Handle, metrics: Metrics) -> Self { + pub fn new_with_overseer( + backend: Arc, + overseer: Handle, + metrics: Metrics, + spawn_handle: Option, + ) -> Self { gum::debug!(target: LOG_TARGET, "Using dispute aware relay-chain selection algorithm",); SelectRelayChain { longest_chain: sc_consensus::LongestChain::new(backend.clone()), selection: IsDisputesAwareWithOverseer::Yes(SelectRelayChainInner::new( - backend, overseer, metrics, + backend, + overseer, + metrics, + spawn_handle, )), } } @@ -219,6 +229,7 @@ pub struct SelectRelayChainInner { backend: Arc, overseer: OH, metrics: Metrics, + spawn_handle: Option, } impl SelectRelayChainInner @@ -228,8 +239,13 @@ where { /// Create a new [`SelectRelayChainInner`] wrapping the given chain backend /// and a handle to the overseer. - pub fn new(backend: Arc, overseer: OH, metrics: Metrics) -> Self { - SelectRelayChainInner { backend, overseer, metrics } + pub fn new( + backend: Arc, + overseer: OH, + metrics: Metrics, + spawn_handle: Option, + ) -> Self { + SelectRelayChainInner { backend, overseer, metrics, spawn_handle } } fn block_header(&self, hash: Hash) -> Result { @@ -267,6 +283,7 @@ where backend: self.backend.clone(), overseer: self.overseer.clone(), metrics: self.metrics.clone(), + spawn_handle: self.spawn_handle.clone(), } } } @@ -307,7 +324,7 @@ impl OverseerHandleT for Handle { impl SelectRelayChainInner where B: HeaderProviderProvider, - OH: OverseerHandleT, + OH: OverseerHandleT + 'static, { /// Get all leaves of the chain, i.e. block hashes that are suitable to /// build upon and have no suitable children. @@ -455,6 +472,26 @@ where let lag = initial_leaf_number.saturating_sub(subchain_number); self.metrics.note_approval_checking_finality_lag(lag); + // Messages sent to `approval-distribution` are known to have high `ToF`, we need to spawn a task for sending + // the message to not block here and delay finality. 
+ if let Some(spawn_handle) = &self.spawn_handle { + let mut overseer_handle = self.overseer.clone(); + let lag_update_task = async move { + overseer_handle + .send_msg( + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), + std::any::type_name::(), + ) + .await; + }; + + spawn_handle.spawn( + "approval-checking-lag-update", + Some("relay-chain-selection"), + Box::pin(lag_update_task), + ); + } + let (lag, subchain_head) = { // Prevent sending flawed data to the dispute-coordinator. if Some(subchain_block_descriptions.len() as _) != diff --git a/node/service/src/tests.rs b/node/service/src/tests.rs index d28391a42e51..bd0926a98a2a 100644 --- a/node/service/src/tests.rs +++ b/node/service/src/tests.rs @@ -83,6 +83,7 @@ fn test_harness>( Arc::new(case_vars.chain.clone()), context.sender().clone(), Default::default(), + None, ); let target_hash = case_vars.target_block.clone(); @@ -99,7 +100,6 @@ fn test_harness>( futures::pin_mut!(test_fut); futures::pin_mut!(selection_process); - futures::executor::block_on(future::join( async move { let _overseer = test_fut.await; diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index ee0c4ebc4711..0784e397592d 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -824,6 +824,8 @@ pub enum ApprovalDistributionMessage { HashSet<(Hash, CandidateIndex)>, oneshot::Sender>, ), + /// Approval checking lag update measured in blocks. + ApprovalCheckingLagUpdate(BlockNumber), } /// Message to the Gossip Support subsystem.