From 3a23be11ff50bfe0fc4b4a082232052c3d49b84f Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Sat, 8 Apr 2023 14:47:27 +0000 Subject: [PATCH 1/8] Send lag update message Signed-off-by: Andrei Sandu --- node/network/approval-distribution/src/lib.rs | 1 + node/service/src/lib.rs | 1 + node/service/src/relay_chain_selection.rs | 49 ++++++++++++++++--- node/subsystem-types/src/messages.rs | 2 + 4 files changed, 46 insertions(+), 7 deletions(-) diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index f0df22b559e6..e660eed10a31 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -1764,6 +1764,7 @@ impl ApprovalDistribution { ); } }, + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(_) => {}, } } } diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 913c30b46c4e..358d28715e78 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -763,6 +763,7 @@ where basics.backend.clone(), overseer_handle.clone(), metrics, + basics.task_manager.spawn_handle(), ) } else { SelectRelayChain::new_longest_chain(basics.backend.clone()) diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 41bd08e611de..cac8f0175016 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -40,14 +40,16 @@ use consensus_common::{Error as ConsensusError, SelectChain}; use futures::channel::oneshot; use polkadot_node_primitives::MAX_FINALITY_LAG as PRIMITIVES_MAX_FINALITY_LAG; use polkadot_node_subsystem::messages::{ - ApprovalVotingMessage, ChainSelectionMessage, DisputeCoordinatorMessage, - HighestApprovedAncestorBlock, + ApprovalDistributionMessage, ApprovalVotingMessage, ChainSelectionMessage, + DisputeCoordinatorMessage, HighestApprovedAncestorBlock, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_overseer::{AllMessages, 
Handle}; use polkadot_primitives::{Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader}; use std::sync::Arc; +pub use service::SpawnTaskHandle; + /// The maximum amount of unfinalized blocks we are willing to allow due to approval checking /// or disputes. /// @@ -162,13 +164,21 @@ where /// Create a new [`SelectRelayChain`] wrapping the given chain backend /// and a handle to the overseer. - pub fn new_with_overseer(backend: Arc, overseer: Handle, metrics: Metrics) -> Self { + pub fn new_with_overseer( + backend: Arc, + overseer: Handle, + metrics: Metrics, + spawn_handle: SpawnTaskHandle, + ) -> Self { gum::debug!(target: LOG_TARGET, "Using dispute aware relay-chain selection algorithm",); SelectRelayChain { longest_chain: sc_consensus::LongestChain::new(backend.clone()), selection: IsDisputesAwareWithOverseer::Yes(SelectRelayChainInner::new( - backend, overseer, metrics, + backend, + overseer, + metrics, + spawn_handle, )), } } @@ -219,6 +229,7 @@ pub struct SelectRelayChainInner { backend: Arc, overseer: OH, metrics: Metrics, + spawn_handle: SpawnTaskHandle, } impl SelectRelayChainInner @@ -228,8 +239,13 @@ where { /// Create a new [`SelectRelayChainInner`] wrapping the given chain backend /// and a handle to the overseer. - pub fn new(backend: Arc, overseer: OH, metrics: Metrics) -> Self { - SelectRelayChainInner { backend, overseer, metrics } + pub fn new( + backend: Arc, + overseer: OH, + metrics: Metrics, + spawn_handle: SpawnTaskHandle, + ) -> Self { + SelectRelayChainInner { backend, overseer, metrics, spawn_handle } } fn block_header(&self, hash: Hash) -> Result { @@ -267,6 +283,7 @@ where backend: self.backend.clone(), overseer: self.overseer.clone(), metrics: self.metrics.clone(), + spawn_handle: self.spawn_handle.clone(), } } } @@ -307,7 +324,7 @@ impl OverseerHandleT for Handle { impl SelectRelayChainInner where B: HeaderProviderProvider, - OH: OverseerHandleT, + OH: OverseerHandleT + 'static, { /// Get all leaves of the chain, i.e. 
block hashes that are suitable to /// build upon and have no suitable children. @@ -455,6 +472,24 @@ where let lag = initial_leaf_number.saturating_sub(subchain_number); self.metrics.note_approval_checking_finality_lag(lag); + let mut overseer_handle = self.overseer.clone(); + let lag_update_task = async move { + overseer_handle + .send_msg( + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), + std::any::type_name::(), + ) + .await; + }; + + // TODO: Send feedback to approval distribution regarding approval checking lag. + // Spawn task for sending the message. + self.spawn_handle.spawn( + "approval-checking-lag-update", + Some("relay-chain-selection"), + Box::pin(lag_update_task), + ); + let (lag, subchain_head) = { // Prevent sending flawed data to the dispute-coordinator. if Some(subchain_block_descriptions.len() as _) != diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 95895a5b0aec..c21fb71819ec 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -824,6 +824,8 @@ pub enum ApprovalDistributionMessage { HashSet<(Hash, CandidateIndex)>, oneshot::Sender>, ), + /// Approval checking lag update measured in blocks. + ApprovalCheckingLagUpdate(BlockNumber), } /// Message to the Gossip Support subsystem. 
From 3341a35d4911f6f54f1581d2f4d4888b40c0339c Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 10 Apr 2023 09:27:21 +0000 Subject: [PATCH 2/8] Process ApprovalCheckingLagUpdate Signed-off-by: Andrei Sandu --- node/network/approval-distribution/src/lib.rs | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index e660eed10a31..f66bb5db6e30 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -124,12 +124,12 @@ struct AggressionConfig { } impl AggressionConfig { - /// Returns `true` if block is not too old depending on the aggression level - fn is_age_relevant(&self, block_age: BlockNumber) -> bool { + /// Returns `true` if lag is past threshold depending on the aggression level + fn should_trigger_aggression(&self, approval_checking_lag: BlockNumber) -> bool { if let Some(t) = self.l1_threshold { - block_age >= t + approval_checking_lag >= t } else if let Some(t) = self.resend_unfinalized_period { - block_age > 0 && block_age % t == 0 + approval_checking_lag > 0 && approval_checking_lag % t == 0 } else { false } @@ -184,6 +184,9 @@ struct State { /// HashMap from active leaves to spans spans: HashMap, + + /// Current approval checking finality lag. + approval_checking_lag: BlockNumber, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -1425,20 +1428,20 @@ impl State { resend: Resend, metrics: &Metrics, ) { + let config = self.aggression_config.clone(); + + if !self.aggression_config.should_trigger_aggression(self.approval_checking_lag) { + return + } + let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num); let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num); - let config = self.aggression_config.clone(); let (min_age, max_age) = match (min_age, max_age) { (Some(min), Some(max)) => (min, max), _ => return, // empty. 
}; - let diff = max_age - min_age; - if !self.aggression_config.is_age_relevant(diff) { - return - } - adjust_required_routing_and_propagate( ctx, &mut self.blocks, @@ -1483,13 +1486,14 @@ impl State { if *required_routing == RequiredRouting::PendingTopology { gum::debug!( target: LOG_TARGET, - age = ?diff, + lag = ?self.approval_checking_lag, "Encountered old block pending gossip topology", ); return } - if config.l1_threshold.as_ref().map_or(false, |t| &diff >= t) { + if config.l1_threshold.as_ref().map_or(false, |t| &self.approval_checking_lag >= t) + { // Message originator sends to everyone. if local && *required_routing != RequiredRouting::All { metrics.on_aggression_l1(); @@ -1497,7 +1501,8 @@ impl State { } } - if config.l2_threshold.as_ref().map_or(false, |t| &diff >= t) { + if config.l2_threshold.as_ref().map_or(false, |t| &self.approval_checking_lag >= t) + { // Message originator sends to everyone. Everyone else sends to XY. if !local && *required_routing != RequiredRouting::GridXY { metrics.on_aggression_l2(); @@ -1764,7 +1769,10 @@ impl ApprovalDistribution { ); } }, - ApprovalDistributionMessage::ApprovalCheckingLagUpdate(_) => {}, + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => { + gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`"); + state.approval_checking_lag = lag; + }, } } } From 7ea7ce6bef3e29c9afbcfef95191d34f2227dc90 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 10 Apr 2023 09:45:50 +0000 Subject: [PATCH 3/8] Compute min age based on lag Signed-off-by: Andrei Sandu --- node/network/approval-distribution/src/lib.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index f66bb5db6e30..775bfb35fca7 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -1434,14 +1434,19 @@ impl State { return } - let min_age = 
self.blocks_by_number.iter().next().map(|(num, _)| num); let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num); - let (min_age, max_age) = match (min_age, max_age) { - (Some(min), Some(max)) => (min, max), + let max_age = match max_age { + Some(max) => *max, _ => return, // empty. }; + // Since we have the approval checking lag, we need to set the `min_age` accordingly to + // enable aggresion for the oldest block that is not approved. + // + // Alternatively we could remove blocks that have been approved based on the `approval_checking_lag` value. + let min_age = max_age.saturating_sub(self.approval_checking_lag); + adjust_required_routing_and_propagate( ctx, &mut self.blocks, @@ -1479,7 +1484,7 @@ impl State { // its descendants from being finalized. Waste minimal bandwidth // this way. Also, disputes might prevent finality - again, nothing // to waste bandwidth on newer blocks for. - &block_entry.number == min_age + block_entry.number == min_age }, |required_routing, local, _| { // It's a bit surprising not to have a topology at this age. From e6bb4ac2e379bc2729045f0e14e07c719bf9291c Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 10 Apr 2023 13:21:54 +0000 Subject: [PATCH 4/8] fix comment Signed-off-by: Andrei Sandu --- node/service/src/relay_chain_selection.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index cac8f0175016..db506f028365 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -482,8 +482,8 @@ where .await; }; - // TODO: Send feedback to approval distribution regarding approval checking lag. - // Spawn task for sending the message. + // Messages sent to `approval-distrbution` are known to have high `ToF`, we need to spawn a task for sending + // the message to not block here and delay finality. 
self.spawn_handle.spawn( "approval-checking-lag-update", Some("relay-chain-selection"), From 533eaefb3b8d55230fd6130e469a51aa9f404fd7 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 11 Apr 2023 13:10:34 +0000 Subject: [PATCH 5/8] Fix tests Signed-off-by: Andrei Sandu --- node/network/approval-distribution/src/lib.rs | 9 +++++++-- node/network/approval-distribution/src/tests.rs | 12 ++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 775bfb35fca7..c411e92c4605 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -1431,6 +1431,11 @@ impl State { let config = self.aggression_config.clone(); if !self.aggression_config.should_trigger_aggression(self.approval_checking_lag) { + gum::trace!( + target: LOG_TARGET, + approval_checking_lag = self.approval_checking_lag, + "Aggression not enabled", + ); return } @@ -1443,10 +1448,10 @@ impl State { // Since we have the approval checking lag, we need to set the `min_age` accordingly to // enable aggresion for the oldest block that is not approved. - // - // Alternatively we could remove blocks that have been approved based on the `approval_checking_lag` value. 
let min_age = max_age.saturating_sub(self.approval_checking_lag); + gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",); + adjust_required_routing_and_propagate( ctx, &mut self.blocks, diff --git a/node/network/approval-distribution/src/tests.rs b/node/network/approval-distribution/src/tests.rs index 459b9d4899fb..b347e7451969 100644 --- a/node/network/approval-distribution/src/tests.rs +++ b/node/network/approval-distribution/src/tests.rs @@ -1817,6 +1817,9 @@ fn originator_aggression_l1() { session: 1, }; + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(level + 1); + overseer_send(overseer, msg).await; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -2071,6 +2074,9 @@ fn non_originator_aggression_l2() { session: 1, }; + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(level + 1); + overseer_send(overseer, msg).await; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -2097,6 +2103,10 @@ fn non_originator_aggression_l2() { session: 1, }; + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate( + aggression_l1_threshold + level + 1, + ); + overseer_send(overseer, msg).await; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -2241,6 +2251,8 @@ fn resends_messages_periodically() { session: 1, }; + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(2); + overseer_send(overseer, msg).await; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; From dd275143d37de431dbd24c463202ba999ab89503 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 12 Apr 2023 14:35:25 +0000 Subject: [PATCH 6/8] Fix test build Signed-off-by: Andrei Sandu --- Cargo.lock | 1 + node/service/Cargo.toml | 1 + node/service/src/tests.rs | 3 +++ 3 files changed, 5 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 
aee481b80e33..b27d7b7cf3a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7883,6 +7883,7 @@ dependencies = [ "substrate-prometheus-endpoint", "tempfile", "thiserror", + "tokio", "tracing-gum", "westend-runtime", "westend-runtime-constants", diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index abcea3cc033c..5e85387dff37 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -136,6 +136,7 @@ polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } env_logger = "0.9.0" assert_matches = "1.5.0" tempfile = "3.2" +tokio = "1.24.2" [features] default = ["db", "full-node", "polkadot-native"] diff --git a/node/service/src/tests.rs b/node/service/src/tests.rs index 1a2997fe6723..3f578bf7f56f 100644 --- a/node/service/src/tests.rs +++ b/node/service/src/tests.rs @@ -33,6 +33,7 @@ use std::{ }; use assert_matches::assert_matches; +use service::TaskManager; use std::{sync::Arc, time::Duration}; use futures::{channel::oneshot, prelude::*}; @@ -75,6 +76,7 @@ fn test_harness>( .try_init(); let pool = sp_core::testing::TaskExecutor::new(); + let task_manager = TaskManager::new(tokio::runtime::Handle::current(), None).unwrap(); let (mut context, virtual_overseer) = test_helpers::make_subsystem_context(pool); let (finality_target_tx, finality_target_rx) = oneshot::channel::>(); @@ -83,6 +85,7 @@ fn test_harness>( Arc::new(case_vars.chain.clone()), context.sender().clone(), Default::default(), + task_manager.spawn_handle(), ); let target_hash = case_vars.target_block.clone(); From 215bd4694e1accecbf0b450081c61fa79973c8da Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 12 Apr 2023 15:58:38 +0000 Subject: [PATCH 7/8] Make the spawnhandle optional Signed-off-by: Andrei Sandu --- Cargo.lock | 1 - node/service/Cargo.toml | 1 - node/service/src/lib.rs | 2 +- node/service/src/relay_chain_selection.rs | 38 ++++++++++++----------- node/service/src/tests.rs | 4 +-- 5 files changed, 22 insertions(+), 24 deletions(-) diff --git a/Cargo.lock 
b/Cargo.lock index b27d7b7cf3a1..aee481b80e33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7883,7 +7883,6 @@ dependencies = [ "substrate-prometheus-endpoint", "tempfile", "thiserror", - "tokio", "tracing-gum", "westend-runtime", "westend-runtime-constants", diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 5e85387dff37..abcea3cc033c 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -136,7 +136,6 @@ polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } env_logger = "0.9.0" assert_matches = "1.5.0" tempfile = "3.2" -tokio = "1.24.2" [features] default = ["db", "full-node", "polkadot-native"] diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 358d28715e78..20546348acdb 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -763,7 +763,7 @@ where basics.backend.clone(), overseer_handle.clone(), metrics, - basics.task_manager.spawn_handle(), + Some(basics.task_manager.spawn_handle()), ) } else { SelectRelayChain::new_longest_chain(basics.backend.clone()) diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index db506f028365..00d7044fbeb1 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -168,7 +168,7 @@ where backend: Arc, overseer: Handle, metrics: Metrics, - spawn_handle: SpawnTaskHandle, + spawn_handle: Option, ) -> Self { gum::debug!(target: LOG_TARGET, "Using dispute aware relay-chain selection algorithm",); @@ -229,7 +229,7 @@ pub struct SelectRelayChainInner { backend: Arc, overseer: OH, metrics: Metrics, - spawn_handle: SpawnTaskHandle, + spawn_handle: Option, } impl SelectRelayChainInner @@ -243,7 +243,7 @@ where backend: Arc, overseer: OH, metrics: Metrics, - spawn_handle: SpawnTaskHandle, + spawn_handle: Option, ) -> Self { SelectRelayChainInner { backend, overseer, metrics, spawn_handle } } @@ -472,23 +472,25 @@ where let lag = 
initial_leaf_number.saturating_sub(subchain_number); self.metrics.note_approval_checking_finality_lag(lag); - let mut overseer_handle = self.overseer.clone(); - let lag_update_task = async move { - overseer_handle - .send_msg( - ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), - std::any::type_name::(), - ) - .await; - }; - // Messages sent to `approval-distrbution` are known to have high `ToF`, we need to spawn a task for sending // the message to not block here and delay finality. - self.spawn_handle.spawn( - "approval-checking-lag-update", - Some("relay-chain-selection"), - Box::pin(lag_update_task), - ); + if let Some(spawn_handle) = &self.spawn_handle { + let mut overseer_handle = self.overseer.clone(); + let lag_update_task = async move { + overseer_handle + .send_msg( + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), + std::any::type_name::(), + ) + .await; + }; + + spawn_handle.spawn( + "approval-checking-lag-update", + Some("relay-chain-selection"), + Box::pin(lag_update_task), + ); + } let (lag, subchain_head) = { // Prevent sending flawed data to the dispute-coordinator. 
diff --git a/node/service/src/tests.rs b/node/service/src/tests.rs index 3f578bf7f56f..8439bea95090 100644 --- a/node/service/src/tests.rs +++ b/node/service/src/tests.rs @@ -76,7 +76,6 @@ fn test_harness>( .try_init(); let pool = sp_core::testing::TaskExecutor::new(); - let task_manager = TaskManager::new(tokio::runtime::Handle::current(), None).unwrap(); let (mut context, virtual_overseer) = test_helpers::make_subsystem_context(pool); let (finality_target_tx, finality_target_rx) = oneshot::channel::>(); @@ -85,7 +84,7 @@ fn test_harness>( Arc::new(case_vars.chain.clone()), context.sender().clone(), Default::default(), - task_manager.spawn_handle(), + None, ); let target_hash = case_vars.target_block.clone(); @@ -102,7 +101,6 @@ fn test_harness>( futures::pin_mut!(test_fut); futures::pin_mut!(selection_process); - futures::executor::block_on(future::join( async move { let _overseer = test_fut.await; From 6ddf786b3e898df64c7370482d9502b82015a6c0 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 12 Apr 2023 18:55:32 +0000 Subject: [PATCH 8/8] remove unused Signed-off-by: Andrei Sandu --- node/service/src/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/node/service/src/tests.rs b/node/service/src/tests.rs index 8439bea95090..5cd5c5817043 100644 --- a/node/service/src/tests.rs +++ b/node/service/src/tests.rs @@ -33,7 +33,6 @@ use std::{ }; use assert_matches::assert_matches; -use service::TaskManager; use std::{sync::Arc, time::Duration}; use futures::{channel::oneshot, prelude::*};