From c13fa40fcc06b13356d60a2e5186e04c794eb1c1 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 20 Oct 2023 16:13:29 +0300 Subject: [PATCH 01/11] Handling of disabled validators in backing subsystem (#1259) Handles validator disabling in the backing subsystem. The PR introduces two changes: 1. Don't import statements from disabled validators. 2. Don't validate and second if the local validator is disabled. The purpose of this change is first to ignore statements from disabled validators on the node side. This is an optimisation as these statements are supposed to be cleared in the runtime too (still not implemented at the moment). Additionally if the local node is a validator which is disabled - don't process any new candidates. --------- Co-authored-by: ordian Co-authored-by: ordian --- polkadot/node/core/backing/src/lib.rs | 87 ++++- polkadot/node/core/backing/src/tests/mod.rs | 327 +++++++++++++++++- .../src/tests/prospective_parachains.rs | 20 ++ polkadot/node/core/provisioner/src/lib.rs | 59 +--- polkadot/node/subsystem-util/src/lib.rs | 80 ++++- polkadot/node/subsystem-util/src/vstaging.rs | 56 +++ 6 files changed, 556 insertions(+), 73 deletions(-) create mode 100644 polkadot/node/subsystem-util/src/vstaging.rs diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 434051f1b00f4..ba25ebb192e23 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -118,6 +118,7 @@ use statement_table::{ }, Config as TableConfig, Context as TableContextTrait, Table, }; +use util::vstaging::get_disabled_validators_with_fallback; mod error; @@ -383,6 +384,21 @@ struct TableContext { validator: Option, groups: HashMap>, validators: Vec, + disabled_validators: Vec, +} + +impl TableContext { + // Returns `true` if the provided `ValidatorIndex` is in the disabled validators list + pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool { + self.disabled_validators + .iter() + .any(|disabled_val_idx| *disabled_val_idx == *validator_idx) + } + + // Returns `true` if the local validator is in the disabled validators list + pub fn local_validator_is_disabled(&self) -> Option { + self.validator.as_ref().map(|v| v.disabled()) + } } impl TableContextTrait for TableContext { @@ -1010,21 +1026,33 @@ async fn construct_per_relay_parent_state( let minimum_backing_votes = try_runtime_api!(request_min_backing_votes(parent, session_index, ctx.sender()).await); + // TODO: https://github.com/paritytech/polkadot-sdk/issues/1940 + // Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to + // `get_disabled_validators_with_fallback`, add `request_disabled_validators` call to the + // `try_join!` above and use `try_runtime_api!` to get `disabled_validators` + let disabled_validators = get_disabled_validators_with_fallback(ctx.sender(), parent) + .await + .map_err(Error::UtilError)?; + let signing_context = SigningContext { parent_hash: parent, session_index }; - let validator = - match Validator::construct(&validators, signing_context.clone(), keystore.clone()) { - Ok(v) => Some(v), - Err(util::Error::NotAValidator) => None, - Err(e) => { - gum::warn!( - target: LOG_TARGET, - err = ?e, - "Cannot participate in candidate backing", - ); + let validator = match Validator::construct( + &validators, + &disabled_validators, + signing_context.clone(), + keystore.clone(), + ) { + Ok(v) => Some(v), + Err(util::Error::NotAValidator) => None, + Err(e) => { + gum::warn!( + target: LOG_TARGET, + err = ?e, + "Cannot participate in candidate backing", + ); - return Ok(None) - }, - }; + return Ok(None) + }, + }; let mut groups = HashMap::new(); let n_cores = cores.len(); @@ -1054,7 +1082,7 @@ async fn construct_per_relay_parent_state( } } - let table_context = TableContext { groups, validators, validator }; + let table_context = TableContext { validator, groups, validators, disabled_validators }; let table_config = TableConfig { allow_multiple_seconded: match mode { ProspectiveParachainsMode::Enabled { .. } => true, @@ -1726,6 +1754,19 @@ async fn kick_off_validation_work( background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>, attesting: AttestingData, ) -> Result<(), Error> { + // Do nothing if the local validator is disabled or not a validator at all + match rp_state.table_context.local_validator_is_disabled() { + Some(true) => { + gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation"); + return Ok(()) + }, + Some(false) => {}, // we are not disabled - move on + None => { + gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation"); + return Ok(()) + }, + } + let candidate_hash = attesting.candidate.hash(); if rp_state.issued_statements.contains(&candidate_hash) { return Ok(()) @@ -1783,6 +1824,16 @@ async fn maybe_validate_and_import( }, }; + // Don't import statement if the sender is disabled + if rp_state.table_context.validator_is_disabled(&statement.validator_index()) { + gum::debug!( + target: LOG_TARGET, + sender_validator_idx = ?statement.validator_index(), + "Not importing statement because the sender is disabled" + ); + return Ok(()) + } + let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await; // if we get an Error::RejectedByProspectiveParachains, @@ -1944,6 +1995,13 @@ async fn handle_second_message( Some(r) => r, }; + // Just return if the local validator is disabled. If we are here the local node should be a + // validator but defensively use `unwrap_or(false)` to continue processing in this case. + if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) { + gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second"); + return Ok(()) + } + // Sanity check that candidate is from our assignment. if Some(candidate.descriptor().para_id) != rp_state.assignment { gum::debug!( @@ -1990,6 +2048,7 @@ async fn handle_statement_message( ) -> Result<(), Error> { let _timer = metrics.time_process_statement(); + // Validator disabling is handled in `maybe_validate_and_import` match maybe_validate_and_import(ctx, state, relay_parent, statement).await { Err(Error::ValidationFailed(_)) => Ok(()), Err(e) => Err(e), diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs index c12be72556e36..fc262aed77612 100644 --- a/polkadot/node/core/backing/src/tests/mod.rs +++ b/polkadot/node/core/backing/src/tests/mod.rs @@ -41,7 +41,7 @@ use sp_keyring::Sr25519Keyring; use sp_keystore::Keystore; use sp_tracing as _; use statement_table::v2::Misbehavior; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; mod prospective_parachains; @@ -77,6 +77,7 @@ struct TestState { signing_context: SigningContext, relay_parent: Hash, minimum_backing_votes: u32, + disabled_validators: Vec, } impl TestState { @@ -148,6 +149,7 @@ impl Default for TestState { signing_context, relay_parent, minimum_backing_votes: LEGACY_MIN_BACKING_VOTES, + disabled_validators: Vec::new(), } } } @@ -293,6 +295,26 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS tx.send(Ok(test_state.minimum_backing_votes)).unwrap(); } ); + + // Check that subsystem job issues a request for the runtime version. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx)) + ) if parent == test_state.relay_parent => { + tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap(); + } + ); + + // Check that subsystem job issues a request for the disabled validators. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::DisabledValidators(tx)) + ) if parent == test_state.relay_parent => { + tx.send(Ok(test_state.disabled_validators.clone())).unwrap(); + } + ); } async fn assert_validation_requests( @@ -1420,6 +1442,7 @@ fn candidate_backing_reorders_votes() { let table_context = TableContext { validator: None, + disabled_validators: Vec::new(), groups: validator_groups, validators: validator_public.clone(), }; @@ -1957,3 +1980,305 @@ fn new_leaf_view_doesnt_clobber_old() { virtual_overseer }); } + +// Test that a disabled local validator doesn't do any work on `CandidateBackingMessage::Second` +#[test] +fn disabled_validator_doesnt_distribute_statement_on_receiving_second() { + let mut test_state = TestState::default(); + test_state.disabled_validators.push(ValidatorIndex(0)); + + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { + test_startup(&mut virtual_overseer, &test_state).await; + + let pov = PoV { block_data: BlockData(vec![42, 43, 44]) }; + let pvd = dummy_pvd(); + let validation_code = ValidationCode(vec![1, 2, 3]); + + let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); + + let pov_hash = pov.hash(); + let candidate = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash, + head_data: expected_head_data.clone(), + erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()), + persisted_validation_data_hash: pvd.hash(), + validation_code: validation_code.0.clone(), + } + .build(); + + let second = CandidateBackingMessage::Second( + test_state.relay_parent, + candidate.to_plain(), + pvd.clone(), + pov.clone(), + ); + + virtual_overseer.send(FromOrchestra::Communication { msg: second }).await; + + // Ensure backing subsystem is not doing any work + assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None); + + virtual_overseer + .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::stop_work(test_state.relay_parent), + ))) + .await; + virtual_overseer + }); +} + +// Test that a disabled local validator doesn't do any work on `CandidateBackingMessage::Statement` +#[test] +fn disabled_validator_doesnt_distribute_statement_on_receiving_statement() { + let mut test_state = TestState::default(); + test_state.disabled_validators.push(ValidatorIndex(0)); + + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { + test_startup(&mut virtual_overseer, &test_state).await; + + let pov = PoV { block_data: BlockData(vec![42, 43, 44]) }; + let pvd = dummy_pvd(); + let validation_code = ValidationCode(vec![1, 2, 3]); + + let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); + + let pov_hash = pov.hash(); + let candidate = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash, + head_data: expected_head_data.clone(), + erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()), + persisted_validation_data_hash: pvd.hash(), + validation_code: validation_code.0.clone(), + } + .build(); + + let public2 = Keystore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[2].to_seed()), + ) + .expect("Insert key into keystore"); + + let signed = SignedFullStatementWithPVD::sign( + &test_state.keystore, + StatementWithPVD::Seconded(candidate.clone(), pvd.clone()), + &test_state.signing_context, + ValidatorIndex(2), + &public2.into(), + ) + .ok() + .flatten() + .expect("should be signed"); + + let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed.clone()); + + virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await; + + // Ensure backing subsystem is not doing any work + assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None); + + virtual_overseer + .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::stop_work(test_state.relay_parent), + ))) + .await; + virtual_overseer + }); +} + +// Test that a validator doesn't do any work on receiving a `CandidateBackingMessage::Statement` +// from a disabled validator +#[test] +fn validator_ignores_statements_from_disabled_validators() { + let mut test_state = TestState::default(); + test_state.disabled_validators.push(ValidatorIndex(2)); + + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { + test_startup(&mut virtual_overseer, &test_state).await; + + let pov = PoV { block_data: BlockData(vec![42, 43, 44]) }; + let pvd = dummy_pvd(); + let validation_code = ValidationCode(vec![1, 2, 3]); + + let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); + + let pov_hash = pov.hash(); + let candidate = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash, + head_data: expected_head_data.clone(), + erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()), + persisted_validation_data_hash: pvd.hash(), + validation_code: validation_code.0.clone(), + } + .build(); + let candidate_commitments_hash = candidate.commitments.hash(); + + let public2 = Keystore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[2].to_seed()), + ) + .expect("Insert key into keystore"); + + let signed_2 = SignedFullStatementWithPVD::sign( + &test_state.keystore, + StatementWithPVD::Seconded(candidate.clone(), pvd.clone()), + &test_state.signing_context, + ValidatorIndex(2), + &public2.into(), + ) + .ok() + .flatten() + .expect("should be signed"); + + let statement_2 = + CandidateBackingMessage::Statement(test_state.relay_parent, signed_2.clone()); + + virtual_overseer.send(FromOrchestra::Communication { msg: statement_2 }).await; + + // Ensure backing subsystem is not doing any work + assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None); + + // Now send a statement from a honest validator and make sure it gets processed + let public3 = Keystore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[3].to_seed()), + ) + .expect("Insert key into keystore"); + + let signed_3 = SignedFullStatementWithPVD::sign( + &test_state.keystore, + StatementWithPVD::Seconded(candidate.clone(), pvd.clone()), + &test_state.signing_context, + ValidatorIndex(3), + &public3.into(), + ) + .ok() + .flatten() + .expect("should be signed"); + + let statement_3 = + CandidateBackingMessage::Statement(test_state.relay_parent, signed_3.clone()); + + virtual_overseer.send(FromOrchestra::Communication { msg: statement_3 }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::ValidationCodeByHash(hash, tx)) + ) if hash == validation_code.hash() => { + tx.send(Ok(Some(validation_code.clone()))).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + + // Sending a `Statement::Seconded` for our assignment will start + // validation process. The first thing requested is the PoV. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityDistribution( + AvailabilityDistributionMessage::FetchPoV { + relay_parent, + tx, + .. + } + ) if relay_parent == test_state.relay_parent => { + tx.send(pov.clone()).unwrap(); + } + ); + + // The next step is the actual request to Validation subsystem + // to validate the `Seconded` candidate. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::CandidateValidation( + CandidateValidationMessage::ValidateFromExhaustive( + _pvd, + _validation_code, + candidate_receipt, + _pov, + _, + timeout, + tx, + ), + ) if _pvd == pvd && + _validation_code == validation_code && + *_pov == pov && &candidate_receipt.descriptor == candidate.descriptor() && + timeout == PvfExecTimeoutKind::Backing && + candidate_commitments_hash == candidate_receipt.commitments_hash => + { + tx.send(Ok( + ValidationResult::Valid(CandidateCommitments { + head_data: expected_head_data.clone(), + upward_messages: Default::default(), + horizontal_messages: Default::default(), + new_validation_code: None, + processed_downward_messages: 0, + hrmp_watermark: 0, + }, test_state.validation_data.clone()), + )).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. } + ) if candidate_hash == candidate.hash() => { + tx.send(Ok(())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::Share(hash, _stmt) + ) => { + assert_eq!(test_state.relay_parent, hash); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::Provisioner( + ProvisionerMessage::ProvisionableData( + _, + ProvisionableData::BackedCandidate(candidate_receipt) + ) + ) => { + assert_eq!(candidate_receipt, candidate.to_plain()); + } + ); + + virtual_overseer + .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::stop_work(test_state.relay_parent), + ))) + .await; + virtual_overseer + }); +} diff --git a/polkadot/node/core/backing/src/tests/prospective_parachains.rs b/polkadot/node/core/backing/src/tests/prospective_parachains.rs index e7c29e11bb470..578f21bef6651 100644 --- a/polkadot/node/core/backing/src/tests/prospective_parachains.rs +++ b/polkadot/node/core/backing/src/tests/prospective_parachains.rs @@ -195,6 +195,26 @@ async fn activate_leaf( tx.send(Ok(test_state.minimum_backing_votes)).unwrap(); } ); + + // Check that subsystem job issues a request for the runtime version. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx)) + ) if parent == hash => { + tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap(); + } + ); + + // Check that the subsystem job issues a request for the disabled validators. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::DisabledValidators(tx)) + ) if parent == hash => { + tx.send(Ok(Vec::new())).unwrap(); + } + ); } } diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 8893bdc6549d2..3970b8572612d 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -29,13 +29,13 @@ use polkadot_node_subsystem::{ jaeger, messages::{ CandidateBackingMessage, ChainApiMessage, ProspectiveParachainsMessage, ProvisionableData, - ProvisionerInherentData, ProvisionerMessage, RuntimeApiMessage, RuntimeApiRequest, + ProvisionerInherentData, ProvisionerMessage, RuntimeApiRequest, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, PerLeafSpan, - RuntimeApiError, SpawnedSubsystem, SubsystemError, + SpawnedSubsystem, SubsystemError, }; use polkadot_node_subsystem_util::{ - request_availability_cores, request_persisted_validation_data, + has_required_runtime, request_availability_cores, request_persisted_validation_data, runtime::{prospective_parachains_mode, ProspectiveParachainsMode}, TimeoutExt, }; @@ -856,56 +856,3 @@ fn bitfields_indicate_availability( 3 * availability.count_ones() >= 2 * availability.len() } - -// If we have to be absolutely precise here, this method gets the version of the `ParachainHost` -// api. For brevity we'll just call it 'runtime version'. -async fn has_required_runtime( - sender: &mut impl overseer::ProvisionerSenderTrait, - relay_parent: Hash, - required_runtime_version: u32, -) -> bool { - gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version"); - - let (tx, rx) = oneshot::channel(); - sender - .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx))) - .await; - - match rx.await { - Result::Ok(Ok(runtime_version)) => { - gum::trace!( - target: LOG_TARGET, - ?relay_parent, - ?runtime_version, - ?required_runtime_version, - "Fetched ParachainHost runtime api version" - ); - runtime_version >= required_runtime_version - }, - Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => { - gum::trace!( - target: LOG_TARGET, - ?relay_parent, - ?error, - "Execution error while fetching ParachainHost runtime api version" - ); - false - }, - Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => { - gum::trace!( - target: LOG_TARGET, - ?relay_parent, - "NotSupported error while fetching ParachainHost runtime api version" - ); - false - }, - Result::Err(_) => { - gum::trace!( - target: LOG_TARGET, - ?relay_parent, - "Cancelled error while fetching ParachainHost runtime api version" - ); - false - }, - } -} diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index a5f3e9d4a0c0e..1fe52767df61c 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -55,6 +55,7 @@ use sp_core::ByteArray; use sp_keystore::{Error as KeystoreError, KeystorePtr}; use std::time::Duration; use thiserror::Error; +use vstaging::get_disabled_validators_with_fallback; pub use metered; pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS; @@ -79,6 +80,9 @@ pub mod inclusion_emulator; /// Convenient and efficient runtime info access. pub mod runtime; +/// Helpers for working with unreleased runtime calls +pub mod vstaging; + /// Nested message sending /// /// Useful for having mostly synchronous code, with submodules spawning short lived asynchronous @@ -92,6 +96,8 @@ mod determine_new_blocks; #[cfg(test)] mod tests; +const LOG_TARGET: &'static str = "parachain::subsystem-util"; + /// Duration a job will wait after sending a stop signal before hard-aborting. pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1); /// Capacity of channels to and from individual jobs @@ -157,6 +163,62 @@ where rx } +/// Verifies if `ParachainHost` runtime api is at least at version `required_runtime_version`. This +/// method is used to determine if a given runtime call is supported by the runtime. +pub async fn has_required_runtime( + sender: &mut Sender, + relay_parent: Hash, + required_runtime_version: u32, +) -> bool +where + Sender: SubsystemSender, +{ + gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version"); + + let (tx, rx) = oneshot::channel(); + sender + .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx))) + .await; + + match rx.await { + Result::Ok(Ok(runtime_version)) => { + gum::trace!( + target: LOG_TARGET, + ?relay_parent, + ?runtime_version, + ?required_runtime_version, + "Fetched ParachainHost runtime api version" + ); + runtime_version >= required_runtime_version + }, + Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => { + gum::trace!( + target: LOG_TARGET, + ?relay_parent, + ?error, + "Execution error while fetching ParachainHost runtime api version" + ); + false + }, + Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => { + gum::trace!( + target: LOG_TARGET, + ?relay_parent, + "NotSupported error while fetching ParachainHost runtime api version" + ); + false + }, + Result::Err(_) => { + gum::trace!( + target: LOG_TARGET, + ?relay_parent, + "Cancelled error while fetching ParachainHost runtime api version" + ); + false + }, + } +} + /// Construct specialized request functions for the runtime. /// /// These would otherwise get pretty repetitive. @@ -378,6 +440,7 @@ pub struct Validator { signing_context: SigningContext, key: ValidatorId, index: ValidatorIndex, + disabled: bool, } impl Validator { @@ -399,7 +462,12 @@ impl Validator { let validators = validators?; - Self::construct(&validators, signing_context, keystore) + // TODO: https://github.com/paritytech/polkadot-sdk/issues/1940 + // When `DisabledValidators` is released remove this and add a + // `request_disabled_validators` call here + let disabled_validators = get_disabled_validators_with_fallback(sender, parent).await?; + + Self::construct(&validators, &disabled_validators, signing_context, keystore) } /// Construct a validator instance without performing runtime fetches. @@ -407,13 +475,16 @@ impl Validator { /// This can be useful if external code also needs the same data. pub fn construct( validators: &[ValidatorId], + disabled_validators: &[ValidatorIndex], signing_context: SigningContext, keystore: KeystorePtr, ) -> Result { let (key, index) = signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?; - Ok(Validator { signing_context, key, index }) + let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index); + + Ok(Validator { signing_context, key, index, disabled }) } /// Get this validator's id. @@ -426,6 +497,11 @@ impl Validator { self.index } + /// Get the enabled/disabled state of this validator + pub fn disabled(&self) -> bool { + self.disabled + } + /// Get the current signing context. pub fn signing_context(&self) -> &SigningContext { &self.signing_context diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs new file mode 100644 index 0000000000000..3efd3b61f93cd --- /dev/null +++ b/polkadot/node/subsystem-util/src/vstaging.rs @@ -0,0 +1,56 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Contains helpers for staging runtime calls. +//! +//! This module is intended to contain common boiler plate code handling unreleased runtime API +//! calls. + +use polkadot_node_subsystem_types::messages::{RuntimeApiMessage, RuntimeApiRequest}; +use polkadot_overseer::SubsystemSender; +use polkadot_primitives::{Hash, ValidatorIndex}; + +use crate::{has_required_runtime, request_disabled_validators, Error}; + +const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging"; + +// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940 +/// Returns disabled validators list if the runtime supports it. Otherwise logs a debug messages and +/// returns an empty vec. +/// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this function and +/// replace all usages with `request_disabled_validators` +pub async fn get_disabled_validators_with_fallback>( + sender: &mut Sender, + relay_parent: Hash, +) -> Result, Error> { + let disabled_validators = if has_required_runtime( + sender, + relay_parent, + RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT, + ) + .await + { + request_disabled_validators(relay_parent, sender) + .await + .await + .map_err(Error::Oneshot)?? + } else { + gum::debug!(target: LOG_TARGET, "Runtime doesn't support `DisabledValidators` - continuing with an empty disabled validators set"); + vec![] + }; + + Ok(disabled_validators) +} From 13eec78a381c5be505386877f0abfea790da05e9 Mon Sep 17 00:00:00 2001 From: ordian Date: Wed, 20 Dec 2023 16:12:02 +0100 Subject: [PATCH 02/11] fix tests --- polkadot/node/core/backing/src/tests/mod.rs | 28 +++++++++++---------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs index fc262aed77612..1957f4e19c54b 100644 --- a/polkadot/node/core/backing/src/tests/mod.rs +++ b/polkadot/node/core/backing/src/tests/mod.rs @@ -2213,25 +2213,27 @@ fn validator_ignores_statements_from_disabled_validators() { // The next step is the actual request to Validation subsystem // to validate the `Seconded` candidate. + let expected_pov = pov; + let expected_validation_code = validation_code; assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive( - _pvd, - _validation_code, + CandidateValidationMessage::ValidateFromExhaustive { + validation_data, + validation_code, candidate_receipt, - _pov, - _, - timeout, - tx, - ), - ) if _pvd == pvd && - _validation_code == validation_code && - *_pov == pov && &candidate_receipt.descriptor == candidate.descriptor() && - timeout == PvfExecTimeoutKind::Backing && + pov, + executor_params: _, + exec_kind, + response_sender, + } + ) if validation_data == pvd && + validation_code == expected_validation_code && + *pov == expected_pov && &candidate_receipt.descriptor == candidate.descriptor() && + exec_kind == PvfExecKind::Backing && candidate_commitments_hash == candidate_receipt.commitments_hash => { - tx.send(Ok( + response_sender.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), upward_messages: Default::default(), From e3f267c15927912c22fba4405fe922548b7922bc Mon Sep 17 00:00:00 2001 From: ordian Date: Wed, 20 Dec 2023 16:12:22 +0100 Subject: [PATCH 03/11] prdoc --- prdoc/pr_2764.prdoc | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 prdoc/pr_2764.prdoc diff --git a/prdoc/pr_2764.prdoc b/prdoc/pr_2764.prdoc new file mode 100644 index 0000000000000..2c6839e91cea2 --- /dev/null +++ b/prdoc/pr_2764.prdoc @@ -0,0 +1,16 @@ +title: Validator disabling in Backing. + +doc: + - audience: Validator + description: | + Once a validator has been disabled for misbehavior, it will no longer + sign backing statements in the current era. + +migrations: + db: [] + runtime: [] + +crates: + - name: polkadot-node-core-backing + +host_functions: [] From bbf9df5f5a90c197a0889579b82c4347859051ad Mon Sep 17 00:00:00 2001 From: ordian Date: Wed, 20 Dec 2023 16:13:57 +0100 Subject: [PATCH 04/11] prdoc: fix audience --- prdoc/pr_2764.prdoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prdoc/pr_2764.prdoc b/prdoc/pr_2764.prdoc index 2c6839e91cea2..adfa4f47c93d8 100644 --- a/prdoc/pr_2764.prdoc +++ b/prdoc/pr_2764.prdoc @@ -1,7 +1,7 @@ title: Validator disabling in Backing. doc: - - audience: Validator + - audience: Node Operator description: | Once a validator has been disabled for misbehavior, it will no longer sign backing statements in the current era. From 41efe9dc7c192ff3c262f55c4f2fb2ad67485ab8 Mon Sep 17 00:00:00 2001 From: ordian Date: Wed, 20 Dec 2023 14:58:20 +0100 Subject: [PATCH 05/11] statement-distribution: validator disabling --- .gitlab/pipeline/zombienet/polkadot.yml | 8 + .../statement-distribution/src/error.rs | 5 +- .../statement-distribution/src/v2/grid.rs | 4 +- .../statement-distribution/src/v2/mod.rs | 242 +++- .../statement-distribution/src/v2/requests.rs | 29 +- .../src/v2/statement_store.rs | 13 +- .../src/v2/tests/cluster.rs | 145 +-- .../src/v2/tests/grid.rs | 198 +-- .../src/v2/tests/mod.rs | 180 ++- .../src/v2/tests/requests.rs | 1149 +++++++++++++---- .../node/backing/statement-distribution.md | 25 + .../functional/0010-validator-disabling.toml | 39 + .../functional/0010-validator-disabling.zndsl | 21 + substrate/frame/system/src/tests.rs | 2 +- .../utils/wasm-builder/src/wasm_project.rs | 2 +- 15 files changed, 1396 insertions(+), 666 deletions(-) create mode 100644 polkadot/zombienet_tests/functional/0010-validator-disabling.toml create mode 100644 polkadot/zombienet_tests/functional/0010-validator-disabling.zndsl diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml index 6b89648c4e36e..bb261f91e4533 100644 --- a/.gitlab/pipeline/zombienet/polkadot.yml +++ b/.gitlab/pipeline/zombienet/polkadot.yml @@ -139,6 +139,14 @@ zombienet-polkadot-functional-0009-approval-voting-coalescing: --local-dir="${LOCAL_DIR}/functional" --test="0009-approval-voting-coalescing.zndsl" +zombienet-polkadot-functional-0010-validator-disabling: + extends: + - .zombienet-polkadot-common + script: + - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh + --local-dir="${LOCAL_DIR}/functional" + --test="0010-validator-disabling.zndsl" + zombienet-polkadot-smoke-0001-parachains-smoke-test: extends: - .zombienet-polkadot-common diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs index b676e5b6a2235..4f589245eddd3 100644 --- a/polkadot/node/network/statement-distribution/src/error.rs +++ b/polkadot/node/network/statement-distribution/src/error.rs @@ -20,7 +20,7 @@ use polkadot_node_network_protocol::PeerId; use polkadot_node_subsystem::{RuntimeApiError, SubsystemError}; use polkadot_node_subsystem_util::{ - backing_implicit_view::FetchError as ImplicitViewFetchError, runtime, + self as util, backing_implicit_view::FetchError as ImplicitViewFetchError, runtime, }; use polkadot_primitives::{CandidateHash, Hash, Id as ParaId}; @@ -75,6 +75,9 @@ pub enum Error { #[error("Fetching availability cores failed {0:?}")] FetchAvailabilityCores(RuntimeApiError), + #[error("Fetching disabled validators failed {0:?}")] + FetchDisabledValidators(util::Error), + #[error("Fetching validator groups failed {0:?}")] FetchValidatorGroups(RuntimeApiError), diff --git a/polkadot/node/network/statement-distribution/src/v2/grid.rs b/polkadot/node/network/statement-distribution/src/v2/grid.rs index 19bad34c44ff9..19f23053192c1 100644 --- a/polkadot/node/network/statement-distribution/src/v2/grid.rs +++ b/polkadot/node/network/statement-distribution/src/v2/grid.rs @@ -253,7 +253,9 @@ impl GridTracker { /// This checks whether the peer is allowed to send us manifests /// about this group at this relay-parent. This also does sanity /// checks on the format of the manifest and the amount of votes - /// it contains. It has effects on the stored state only when successful. + /// it contains. It assumes that the votes from disabled validators + /// are already filtered out. + /// It has effects on the stored state only when successful. /// /// This returns a `bool` on success, which if true indicates that an acknowledgement is /// to be sent in response to the received manifest. This only occurs when the diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 2f06d3685b814..02fdecdd9bb32 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -17,11 +17,11 @@ //! Implementation of the v2 statement distribution protocol, //! designed for asynchronous backing. -use net_protocol::{filter_by_peer_version, peer_set::ProtocolVersion}; +use bitvec::prelude::{BitVec, Lsb0}; use polkadot_node_network_protocol::{ - self as net_protocol, + self as net_protocol, filter_by_peer_version, grid_topology::SessionGridTopology, - peer_set::ValidationVersion, + peer_set::{ProtocolVersion, ValidationVersion}, request_response::{ incoming::OutgoingResponse, v2::{AttestedCandidateRequest, AttestedCandidateResponse}, @@ -64,7 +64,7 @@ use futures::{ use std::{ collections::{ hash_map::{Entry, HashMap}, - HashSet, + BTreeSet, HashSet, }, time::{Duration, Instant}, }; @@ -96,6 +96,7 @@ const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement"); const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep = Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent"); const COST_EXCESSIVE_SECONDED: Rep = Rep::CostMinor("Sent Excessive `Seconded` Statements"); +const COST_DISABLED_VALIDATOR: Rep = Rep::CostMinor("Sent a statement from a disabled validator"); const COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE: Rep = Rep::CostMinor("Unexpected Manifest, missing knowlege for relay parent"); @@ -189,6 +190,8 @@ struct PerSessionState { // getting the topology from the gossip-support subsystem grid_view: Option, local_validator: Option, + // We store the latest state here based on union of leaves. + disabled_validators: BTreeSet, } impl PerSessionState { @@ -205,7 +208,16 @@ impl PerSessionState { ) .map(|(_, index)| LocalValidatorIndex::Active(index)); - PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator } + let disabled_validators = BTreeSet::new(); + + PerSessionState { + session_info, + groups, + authority_lookup, + grid_view: None, + local_validator, + disabled_validators, + } } fn supply_topology( @@ -234,6 +246,33 @@ impl PerSessionState { fn is_not_validator(&self) -> bool { self.grid_view.is_some() && self.local_validator.is_none() } + + /// A convenience function to generate a disabled bitmask for the given backing group. + /// The output bits are set to `true` for validators that are disabled. + /// Returns `None` if the group index is out of bounds. + pub fn disabled_bitmask(&self, group: GroupIndex) -> Option> { + let group = self.groups.get(group)?; + let mask = BitVec::from_iter(group.iter().map(|v| self.is_disabled(v))); + Some(mask) + } + + /// Returns `true` if the given validator is disabled in the current session. + pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool { + self.disabled_validators.contains(validator_index) + } + + /// Extend the list of disabled validators. + pub fn extend_disabled_validators( + &mut self, + disabled: impl IntoIterator, + ) { + self.disabled_validators.extend(disabled); + } + + /// Clear the list of disabled validators. + pub fn clear_disabled_validators(&mut self) { + self.disabled_validators.clear(); + } } pub(crate) struct State { @@ -510,13 +549,20 @@ pub(crate) async fn handle_active_leaves_update( let new_relay_parents = state.implicit_view.all_allowed_relay_parents().cloned().collect::>(); - for new_relay_parent in new_relay_parents.iter().cloned() { - if state.per_relay_parent.contains_key(&new_relay_parent) { - continue - } - // New leaf: fetch info from runtime API and initialize - // `per_relay_parent`. + // We clear the list of disabled validators to reset it properly based on union of leaves. + let mut cleared_disabled_validators: BTreeSet = BTreeSet::new(); + + for new_relay_parent in new_relay_parents.iter().cloned() { + // Even if we processed this relay parent before, we need to fetch the list of disabled + // validators based on union of active leaves. + let disabled_validators = + polkadot_node_subsystem_util::vstaging::get_disabled_validators_with_fallback( + ctx.sender(), + new_relay_parent, + ) + .await + .map_err(JfyiError::FetchDisabledValidators)?; let session_index = polkadot_node_subsystem_util::request_session_index_for_child( new_relay_parent, @@ -527,23 +573,6 @@ pub(crate) async fn handle_active_leaves_update( .map_err(JfyiError::RuntimeApiUnavailable)? .map_err(JfyiError::FetchSessionIndex)?; - let availability_cores = polkadot_node_subsystem_util::request_availability_cores( - new_relay_parent, - ctx.sender(), - ) - .await - .await - .map_err(JfyiError::RuntimeApiUnavailable)? - .map_err(JfyiError::FetchAvailabilityCores)?; - - let group_rotation_info = - polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender()) - .await - .await - .map_err(JfyiError::RuntimeApiUnavailable)? - .map_err(JfyiError::FetchValidatorGroups)? - .1; - if !state.per_session.contains_key(&session_index) { let session_info = polkadot_node_subsystem_util::request_session_info( new_relay_parent, @@ -579,9 +608,49 @@ pub(crate) async fn handle_active_leaves_update( let per_session = state .per_session - .get(&session_index) + .get_mut(&session_index) .expect("either existed or just inserted; qed"); + if cleared_disabled_validators.insert(session_index) { + per_session.clear_disabled_validators(); + } + + if !disabled_validators.is_empty() { + gum::debug!( + target: LOG_TARGET, + relay_parent = ?new_relay_parent, + ?session_index, + ?disabled_validators, + "Disabled validators detected" + ); + + per_session.extend_disabled_validators(disabled_validators); + } + + if state.per_relay_parent.contains_key(&new_relay_parent) { + continue + } + + // New leaf: fetch info from runtime API and initialize + // `per_relay_parent`. + + let availability_cores = polkadot_node_subsystem_util::request_availability_cores( + new_relay_parent, + ctx.sender(), + ) + .await + .await + .map_err(JfyiError::RuntimeApiUnavailable)? + .map_err(JfyiError::FetchAvailabilityCores)?; + + let group_rotation_info = + polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender()) + .await + .await + .map_err(JfyiError::RuntimeApiUnavailable)? + .map_err(JfyiError::FetchValidatorGroups)? + .1; + let local_validator = per_session.local_validator.and_then(|v| { if let LocalValidatorIndex::Active(idx) = v { find_active_validator_state( @@ -1452,6 +1521,17 @@ async fn handle_incoming_statement( }, }; + if per_session.is_disabled(&statement.unchecked_validator_index()) { + gum::debug!( + target: LOG_TARGET, + ?relay_parent, + validator_index = ?statement.unchecked_validator_index(), + "Ignoring a statement from disabled validator." + ); + modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await; + return + } + let (active, cluster_sender_index) = { // This block of code only returns `Some` when both the originator and // the sending peer are in the cluster. @@ -1572,7 +1652,7 @@ async fn handle_incoming_statement( checked_statement.clone(), StatementOrigin::Remote, ) { - Err(statement_store::ValidatorUnknown) => { + Err(statement_store::Error::ValidatorUnknown) => { // sanity: should never happen. gum::warn!( target: LOG_TARGET, @@ -2110,7 +2190,7 @@ async fn handle_incoming_manifest_common<'a, Context>( candidate_hash: CandidateHash, relay_parent: Hash, para_id: ParaId, - manifest_summary: grid::ManifestSummary, + mut manifest_summary: grid::ManifestSummary, manifest_kind: grid::ManifestKind, reputation: &mut ReputationAggregator, ) -> Option> { @@ -2195,6 +2275,12 @@ async fn handle_incoming_manifest_common<'a, Context>( // 2. sanity checks: peer is validator, bitvec size, import into grid tracker let group_index = manifest_summary.claimed_group_index; let claimed_parent_hash = manifest_summary.claimed_parent_hash; + + // Ignore votes from disabled validators when counting towards the threshold. + let disabled_mask = per_session.disabled_bitmask(group_index).unwrap_or_default(); + manifest_summary.statement_knowledge.mask_seconded(&disabled_mask); + manifest_summary.statement_knowledge.mask_valid(&disabled_mask); + let acknowledge = match local_validator.grid_tracker.import_manifest( grid_topology, &per_session.groups, @@ -2770,6 +2856,13 @@ pub(crate) async fn dispatch_requests(ctx: &mut Context, state: &mut St } } + // Add disabled validators to the unwanted mask. + let disabled_mask = per_session + .disabled_bitmask(group_index) + .expect("group existence checked above; qed"); + unwanted_mask.seconded_in_group |= &disabled_mask; + unwanted_mask.validated_in_group |= &disabled_mask; + // don't require a backing threshold for cluster candidates. let local_validator = relay_parent_state.local_validator.as_ref()?; let require_backing = local_validator @@ -2777,14 +2870,14 @@ pub(crate) async fn dispatch_requests(ctx: &mut Context, state: &mut St .as_ref() .map_or(true, |active| active.group != group_index); - Some(RequestProperties { - unwanted_mask, - backing_threshold: if require_backing { - Some(per_session.groups.get_size_and_backing_threshold(group_index)?.1) - } else { - None - }, - }) + let backing_threshold = if require_backing { + let threshold = per_session.groups.get_size_and_backing_threshold(group_index)?.1; + Some(threshold) + } else { + None + }; + + Some(RequestProperties { unwanted_mask, backing_threshold }) }; while let Some(request) = state.request_manager.next_request( @@ -2857,6 +2950,10 @@ pub(crate) async fn handle_response( Some(g) => g, }; + let disabled_mask = per_session + .disabled_bitmask(group_index) + .expect("group_index checked above; qed"); + let res = response.validate_response( &mut state.request_manager, group, @@ -2871,6 +2968,7 @@ pub(crate) async fn handle_response( Some(g_index) == expected_group }, + disabled_mask, ); for (peer, rep) in res.reputation_changes { @@ -2968,6 +3066,14 @@ pub(crate) async fn handle_response( // includable. } +/// Returns true if the statement filter meets the backing threshold for grid requests. +pub(crate) fn seconded_and_sufficient( + filter: &StatementFilter, + backing_threshold: Option, +) -> bool { + backing_threshold.map_or(true, |t| filter.has_seconded() && filter.backing_validators() >= t) +} + /// Answer an incoming request for a candidate. pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) { let ResponderMessage { request, sent_feedback } = message; @@ -3008,11 +3114,13 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) { Some(d) => d, }; - let group_size = per_session + let group_index = confirmed.group_index(); + let group = per_session .groups - .get(confirmed.group_index()) - .expect("group from session's candidate always known; qed") - .len(); + .get(group_index) + .expect("group from session's candidate always known; qed"); + + let group_size = group.len(); // check request bitfields are right size. if mask.seconded_in_group.len() != group_size || mask.validated_in_group.len() != group_size { @@ -3065,17 +3173,59 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) { // Transform mask with 'OR' semantics into one with 'AND' semantics for the API used // below. - let and_mask = StatementFilter { + let mut and_mask = StatementFilter { seconded_in_group: !mask.seconded_in_group.clone(), validated_in_group: !mask.validated_in_group.clone(), }; + // Ignore disabled validators from the latest state when sending the response. + let disabled_mask = + per_session.disabled_bitmask(group_index).expect("group existence checked; qed"); + and_mask.mask_seconded(&disabled_mask); + and_mask.mask_valid(&disabled_mask); + + let mut sent_filter = StatementFilter::blank(group_size); let statements: Vec<_> = relay_parent_state .statement_store - .group_statements(&per_session.groups, confirmed.group_index(), *candidate_hash, &and_mask) - .map(|s| s.as_unchecked().clone()) + .group_statements(&per_session.groups, group_index, *candidate_hash, &and_mask) + .map(|s| { + let s = s.as_unchecked().clone(); + let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x); + let Some(i) = index_in_group(s.unchecked_validator_index()) else { return s }; + + match s.unchecked_payload() { + CompactStatement::Seconded(_) => { + sent_filter.seconded_in_group.set(i, true); + }, + CompactStatement::Valid(_) => { + sent_filter.validated_in_group.set(i, true); + }, + } + s + }) .collect(); + // There should be no response at all for grid requests when the + // backing threshold is no longer met as a result of disabled validators. + if !is_cluster { + let threshold = per_session + .groups + .get_size_and_backing_threshold(group_index) + .expect("group existence checked above; qed") + .1; + + if !seconded_and_sufficient(&sent_filter, Some(threshold)) { + gum::info!( + target: LOG_TARGET, + ?candidate_hash, + relay_parent = ?confirmed.relay_parent(), + ?group_index, + "Dropping a request from a grid peer because the backing threshold is no longer met." + ); + return + } + } + // Update bookkeeping about which statements peers have received. for statement in &statements { if is_cluster { diff --git a/polkadot/node/network/statement-distribution/src/v2/requests.rs b/polkadot/node/network/statement-distribution/src/v2/requests.rs index 8507a4b827690..d6b522735e660 100644 --- a/polkadot/node/network/statement-distribution/src/v2/requests.rs +++ b/polkadot/node/network/statement-distribution/src/v2/requests.rs @@ -30,12 +30,13 @@ //! (which requires state not owned by the request manager). use super::{ - BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE, - COST_INVALID_RESPONSE, COST_INVALID_SIGNATURE, COST_UNREQUESTED_RESPONSE_STATEMENT, - REQUEST_RETRY_DELAY, + seconded_and_sufficient, BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, + COST_IMPROPERLY_DECODED_RESPONSE, COST_INVALID_RESPONSE, COST_INVALID_SIGNATURE, + COST_UNREQUESTED_RESPONSE_STATEMENT, REQUEST_RETRY_DELAY, }; use crate::LOG_TARGET; +use bitvec::prelude::{BitVec, Lsb0}; use polkadot_node_network_protocol::{ request_response::{ outgoing::{Recipient as RequestRecipient, RequestError}, @@ -495,10 +496,6 @@ fn find_request_target_with_update( } } -fn seconded_and_sufficient(filter: &StatementFilter, backing_threshold: Option) -> bool { - backing_threshold.map_or(true, |t| filter.has_seconded() && filter.backing_validators() >= t) -} - /// A response to a request, which has not yet been handled. pub struct UnhandledResponse { response: TaggedResponse, @@ -542,6 +539,7 @@ impl UnhandledResponse { session: SessionIndex, validator_key_lookup: impl Fn(ValidatorIndex) -> Option, allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool, + disabled_mask: BitVec, ) -> ResponseValidationOutput { let UnhandledResponse { response: TaggedResponse { identifier, requested_peer, props, response }, @@ -625,6 +623,7 @@ impl UnhandledResponse { session, validator_key_lookup, allowed_para_lookup, + disabled_mask, ); if let CandidateRequestStatus::Complete { .. } = output.request_status { @@ -644,6 +643,7 @@ fn validate_complete_response( session: SessionIndex, validator_key_lookup: impl Fn(ValidatorIndex) -> Option, allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool, + disabled_mask: BitVec, ) -> ResponseValidationOutput { let RequestProperties { backing_threshold, mut unwanted_mask } = props; @@ -737,6 +737,10 @@ fn validate_complete_response( rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT)); continue } + + if disabled_mask.get(i).map_or(false, |x| *x) { + continue + } }, CompactStatement::Valid(_) => { if unwanted_mask.validated_in_group[i] { @@ -748,6 +752,10 @@ fn validate_complete_response( rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT)); continue } + + if disabled_mask.get(i).map_or(false, |x| *x) { + continue + } }, } @@ -1013,6 +1021,7 @@ mod tests { let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]; let unwanted_mask = StatementFilter::blank(group_size); + let disabled_mask: BitVec = Default::default(); let request_properties = RequestProperties { unwanted_mask, backing_threshold: None }; // Get requests. @@ -1056,6 +1065,7 @@ mod tests { 0, validator_key_lookup, allowed_para_lookup, + disabled_mask.clone(), ); assert_eq!( output, @@ -1094,6 +1104,7 @@ mod tests { 0, validator_key_lookup, allowed_para_lookup, + disabled_mask, ); assert_eq!( output, @@ -1167,12 +1178,14 @@ mod tests { }; let validator_key_lookup = |_v| None; let allowed_para_lookup = |_para, _g_index| true; + let disabled_mask: BitVec = Default::default(); let output = response.validate_response( &mut request_manager, group, 0, validator_key_lookup, allowed_para_lookup, + disabled_mask, ); assert_eq!( output, @@ -1245,12 +1258,14 @@ mod tests { let validator_key_lookup = |_v| None; let allowed_para_lookup = |_para, _g_index| true; let statements = vec![]; + let disabled_mask: BitVec = Default::default(); let output = response.validate_response( &mut request_manager, group, 0, validator_key_lookup, allowed_para_lookup, + disabled_mask, ); assert_eq!( output, diff --git a/polkadot/node/network/statement-distribution/src/v2/statement_store.rs b/polkadot/node/network/statement-distribution/src/v2/statement_store.rs index 96d976e22cd51..022461e55511c 100644 --- a/polkadot/node/network/statement-distribution/src/v2/statement_store.rs +++ b/polkadot/node/network/statement-distribution/src/v2/statement_store.rs @@ -97,10 +97,10 @@ impl StatementStore { groups: &Groups, statement: SignedStatement, origin: StatementOrigin, - ) -> Result { + ) -> Result { let validator_index = statement.validator_index(); let validator_meta = match self.validator_meta.get_mut(&validator_index) { - None => return Err(ValidatorUnknown), + None => return Err(Error::ValidatorUnknown), Some(m) => m, }; @@ -134,7 +134,7 @@ impl StatementStore { "groups passed into `insert` differ from those used at store creation" ); - return Err(ValidatorUnknown) + return Err(Error::ValidatorUnknown) }, }; @@ -251,9 +251,12 @@ impl StatementStore { } } -/// Error indicating that the validator was unknown. +/// Error when inserting a statement into the statement store. #[derive(Debug)] -pub struct ValidatorUnknown; +pub enum Error { + /// The validator was unknown. + ValidatorUnknown, +} type Fingerprint = (ValidatorIndex, CompactStatement); diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs index a9f5b537b3238..7ffed9d47d4bd 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs @@ -75,15 +75,7 @@ fn share_seconded_circulated_to_cluster() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; let full_signed = state .sign_statement( @@ -120,7 +112,7 @@ fn share_seconded_circulated_to_cluster() { // sharing a `Seconded` message confirms a candidate, which leads to new // fragment tree updates. - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; overseer }); @@ -156,15 +148,7 @@ fn cluster_valid_statement_before_seconded_ignored() { .await; send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; let signed_valid = state.sign_statement( v_a, @@ -226,15 +210,7 @@ fn cluster_statement_bad_signature() { .await; send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // sign statements with wrong signing context, leading to bad signature. let statements = vec![ @@ -308,15 +284,7 @@ fn useful_cluster_statement_from_non_cluster_peer_rejected() { .await; send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; let statement = state .sign_statement( @@ -370,15 +338,7 @@ fn statement_from_non_cluster_originator_unexpected() { connect_peer(&mut overseer, peer_a.clone(), None).await; send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; let statement = state .sign_statement( @@ -448,15 +408,7 @@ fn seconded_statement_leads_to_request() { .await; send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; let statement = state .sign_statement( @@ -497,7 +449,7 @@ fn seconded_statement_leads_to_request() { if p == peer_a && r == BENEFIT_VALID_RESPONSE.into() => { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; overseer }); @@ -544,15 +496,7 @@ fn cluster_statements_shared_seconded_first() { .await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; let full_signed = state .sign_statement( @@ -579,7 +523,7 @@ fn cluster_statements_shared_seconded_first() { .await; // result of new confirmed candidate. - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; overseer .send(FromOrchestra::Communication { @@ -677,15 +621,7 @@ fn cluster_accounts_for_implicit_view() { send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; let full_signed = state .sign_statement( @@ -722,7 +658,7 @@ fn cluster_accounts_for_implicit_view() { // sharing a `Seconded` message confirms a candidate, which leads to new // fragment tree updates. - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; // activate new leaf, which has relay-parent in implicit view. let next_relay_parent = Hash::repeat_byte(2); @@ -730,15 +666,7 @@ fn cluster_accounts_for_implicit_view() { next_test_leaf.parent_hash = relay_parent; next_test_leaf.number = 2; - activate_leaf(&mut overseer, &next_test_leaf, &state, false).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(next_relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &next_test_leaf, &state, false, vec![]).await; send_peer_view_change(&mut overseer, peer_a.clone(), view![next_relay_parent]).await; send_peer_view_change(&mut overseer, peer_b.clone(), view![next_relay_parent]).await; @@ -820,15 +748,7 @@ fn cluster_messages_imported_after_confirmed_candidate_importable_check() { send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Peer sends `Seconded` statement. { @@ -885,8 +805,6 @@ fn cluster_messages_imported_after_confirmed_candidate_importable_check() { }, vec![(relay_parent, vec![0])], )], - None, - false, ) .await; @@ -953,15 +871,7 @@ fn cluster_messages_imported_after_new_leaf_importable_check() { send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Peer sends `Seconded` statement. { @@ -1008,17 +918,18 @@ fn cluster_messages_imported_after_new_leaf_importable_check() { ); } - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; let next_relay_parent = Hash::repeat_byte(2); let mut next_test_leaf = state.make_dummy_leaf(next_relay_parent); next_test_leaf.parent_hash = relay_parent; next_test_leaf.number = 2; - activate_leaf(&mut overseer, &next_test_leaf, &state, false).await; - - answer_expected_hypothetical_depth_request( + activate_leaf( &mut overseer, + &next_test_leaf, + &state, + false, vec![( HypotheticalCandidate::Complete { candidate_hash, @@ -1027,8 +938,6 @@ fn cluster_messages_imported_after_new_leaf_importable_check() { }, vec![(relay_parent, vec![0])], )], - Some(next_relay_parent), - false, ) .await; @@ -1117,15 +1026,7 @@ fn ensure_seconding_limit_is_respected() { send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Confirm the candidates locally so that we don't send out requests. @@ -1152,7 +1053,7 @@ fn ensure_seconding_limit_is_respected() { AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Candidate 2. @@ -1178,7 +1079,7 @@ fn ensure_seconding_limit_is_respected() { AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Send first statement from peer A. diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs index aa1a473b833f4..1ac9f4d45e714 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs @@ -102,15 +102,7 @@ fn backed_candidate_leads_to_advertisement() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; @@ -137,7 +129,7 @@ fn backed_candidate_leads_to_advertisement() { AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Send enough statements to make candidate backable, make sure announcements are sent. @@ -232,7 +224,7 @@ fn backed_candidate_leads_to_advertisement() { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } overseer @@ -320,15 +312,7 @@ fn received_advertisement_before_confirmation_leads_to_request() { send_peer_view_change(&mut overseer, peer_d.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; @@ -400,7 +384,7 @@ fn received_advertisement_before_confirmation_leads_to_request() { if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() => { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } overseer @@ -530,7 +514,7 @@ fn received_advertisement_after_backing_leads_to_acknowledgement() { assert_peer_reported!(&mut overseer, peer_c, BENEFIT_VALID_STATEMENT); assert_peer_reported!(&mut overseer, peer_c, BENEFIT_VALID_RESPONSE); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Receive Backed message. @@ -561,7 +545,7 @@ fn received_advertisement_after_backing_leads_to_acknowledgement() { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Receive a manifest about the same candidate from peer D. @@ -733,7 +717,7 @@ fn received_acknowledgements_for_locally_confirmed() { AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Receive an unexpected acknowledgement from peer D. @@ -798,7 +782,7 @@ fn received_acknowledgements_for_locally_confirmed() { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Receive an unexpected acknowledgement from peer D. @@ -930,7 +914,7 @@ fn received_acknowledgements_for_externally_confirmed() { assert_peer_reported!(&mut overseer, peer_c, BENEFIT_VALID_STATEMENT); assert_peer_reported!(&mut overseer, peer_c, BENEFIT_VALID_RESPONSE); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } let ack = BackedCandidateAcknowledgement { @@ -1022,15 +1006,7 @@ fn received_advertisement_after_confirmation_before_backing() { send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; @@ -1121,7 +1097,7 @@ fn received_advertisement_after_confirmation_before_backing() { if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Receive advertisement from peer D (after confirmation but before backing). @@ -1208,15 +1184,7 @@ fn additional_statements_are_shared_after_manifest_exchange() { send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; @@ -1301,13 +1269,8 @@ fn additional_statements_are_shared_after_manifest_exchange() { persisted_validation_data: pvd.clone(), }; let membership = vec![(relay_parent, vec![0])]; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![(hypothetical, membership)], - None, - false, - ) - .await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![(hypothetical, membership)]) + .await; // Statements are sent to the Backing subsystem. { @@ -1371,7 +1334,7 @@ fn additional_statements_are_shared_after_manifest_exchange() { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Receive a manifest about the same candidate from peer D. Contains different statements. @@ -1514,17 +1477,8 @@ fn advertisement_sent_when_peer_enters_relay_parent_view() { send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; - - // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; // Confirm the candidate locally so that we don't send out requests. @@ -1549,7 +1503,7 @@ fn advertisement_sent_when_peer_enters_relay_parent_view() { AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Send enough statements to make candidate backable, make sure announcements are sent. @@ -1616,7 +1570,7 @@ fn advertisement_sent_when_peer_enters_relay_parent_view() { }) .await; - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; // Relay parent enters view of peer C. { @@ -1737,17 +1691,8 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; // Confirm the candidate locally so that we don't send out requests. @@ -1772,7 +1717,7 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() { AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Send enough statements to make candidate backable, make sure announcements are sent. @@ -1867,7 +1812,7 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Peer leaves view. @@ -1949,17 +1894,8 @@ fn grid_statements_imported_to_backing() { send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; - - // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; // Receive an advertisement from C. @@ -2042,13 +1978,8 @@ fn grid_statements_imported_to_backing() { persisted_validation_data: pvd.clone(), }; let membership = vec![(relay_parent, vec![0])]; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![(hypothetical, membership)], - None, - false, - ) - .await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![(hypothetical, membership)]) + .await; // Receive messages from Backing subsystem. { @@ -2165,17 +2096,8 @@ fn advertisements_rejected_from_incorrect_peers() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; - - // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; let manifest = BackedCandidateManifest { @@ -2289,17 +2211,8 @@ fn manifest_rejected_with_unknown_relay_parent() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; let manifest = BackedCandidateManifest { @@ -2391,17 +2304,8 @@ fn manifest_rejected_when_not_a_validator() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; - - // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; let manifest = BackedCandidateManifest { @@ -2498,17 +2402,8 @@ fn manifest_rejected_when_group_does_not_match_para() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; - - // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; let manifest = BackedCandidateManifest { @@ -2613,17 +2508,8 @@ fn peer_reported_for_advertisement_conflicting_with_confirmed_candidate() { send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; let manifest = BackedCandidateManifest { @@ -2713,7 +2599,7 @@ fn peer_reported_for_advertisement_conflicting_with_confirmed_candidate() { if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Receive conflicting advertisement from peer C after confirmation. @@ -2755,7 +2641,6 @@ fn inactive_local_participates_in_grid() { async_backing_params: None, }; - let dummy_relay_parent = Hash::repeat_byte(2); let relay_parent = Hash::repeat_byte(1); let peer_a = PeerId::random(); @@ -2795,25 +2680,10 @@ fn inactive_local_participates_in_grid() { send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &dummy_leaf, &state, true).await; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(dummy_relay_parent), - false, - ) - .await; - + activate_leaf(&mut overseer, &dummy_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; - activate_leaf(&mut overseer, &test_leaf, &state, false).await; - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, false, vec![]).await; // Receive an advertisement from A. let manifest = BackedCandidateManifest { @@ -2876,7 +2746,7 @@ fn inactive_local_participates_in_grid() { AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) if p == peer_a && r == BENEFIT_VALID_RESPONSE.into() => { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; overseer }); diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs index c34cf20d716ca..3ce43202b9542 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs @@ -187,12 +187,30 @@ impl TestState { collator: None, }) }), + disabled_validators: Default::default(), para_data: (0..self.session_info.validator_groups.len()) .map(|i| (ParaId::from(i as u32), PerParaData::new(1, vec![1, 2, 3].into()))) .collect(), + minimum_backing_votes: 2, } } + fn make_dummy_leaf_with_disabled_validators( + &self, + relay_parent: Hash, + disabled_validators: Vec, + ) -> TestLeaf { + TestLeaf { disabled_validators, ..self.make_dummy_leaf(relay_parent) } + } + + fn make_dummy_leaf_with_min_backing_votes( + &self, + relay_parent: Hash, + minimum_backing_votes: u32, + ) -> TestLeaf { + TestLeaf { minimum_backing_votes, ..self.make_dummy_leaf(relay_parent) } + } + fn make_availability_cores(&self, f: impl Fn(usize) -> CoreState) -> Vec { (0..self.session_info.validator_groups.len()).map(f).collect() } @@ -240,6 +258,19 @@ impl TestState { .collect() } + fn index_within_group( + &self, + group_index: GroupIndex, + validator_index: ValidatorIndex, + ) -> Option { + self.session_info + .validator_groups + .get(group_index) + .unwrap() + .iter() + .position(|&i| i == validator_index) + } + fn discovery_id(&self, validator_index: ValidatorIndex) -> AuthorityDiscoveryId { self.session_info.discovery_keys[validator_index.0 as usize].clone() } @@ -284,7 +315,7 @@ impl TestState { &mut self, peer: PeerId, request: AttestedCandidateRequest, - ) -> impl Future { + ) -> impl Future> { let (tx, rx) = futures::channel::oneshot::channel(); let req = sc_network::config::IncomingRequest { peer, @@ -293,7 +324,7 @@ impl TestState { }; self.req_sender.send(req).await.unwrap(); - rx.map(|r| r.unwrap()) + rx.map(|r| r.ok()) } } @@ -366,7 +397,9 @@ struct TestLeaf { parent_hash: Hash, session: SessionIndex, availability_cores: Vec, + disabled_validators: Vec, para_data: Vec<(ParaId, PerParaData)>, + minimum_backing_votes: u32, } impl TestLeaf { @@ -447,9 +480,7 @@ async fn setup_test_and_connect_peers( } } - activate_leaf(overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request(overseer, vec![], Some(relay_parent), false).await; + activate_leaf(overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(overseer, state.make_dummy_topology()).await; @@ -472,6 +503,7 @@ async fn activate_leaf( leaf: &TestLeaf, test_state: &TestState, is_new_session: bool, + hypothetical_frontier: Vec<(HypotheticalCandidate, FragmentTreeMembership)>, ) { let activated = new_leaf(leaf.hash, leaf.number); @@ -481,7 +513,14 @@ async fn activate_leaf( )))) .await; - handle_leaf_activation(virtual_overseer, leaf, test_state, is_new_session).await; + handle_leaf_activation( + virtual_overseer, + leaf, + test_state, + is_new_session, + hypothetical_frontier, + ) + .await; } async fn handle_leaf_activation( @@ -489,8 +528,18 @@ async fn handle_leaf_activation( leaf: &TestLeaf, test_state: &TestState, is_new_session: bool, + hypothetical_frontier: Vec<(HypotheticalCandidate, FragmentTreeMembership)>, ) { - let TestLeaf { number, hash, parent_hash, para_data, session, availability_cores } = leaf; + let TestLeaf { + number, + hash, + parent_hash, + para_data, + session, + availability_cores, + disabled_validators, + minimum_backing_votes, + } = leaf; assert_matches!( virtual_overseer.recv().await, @@ -530,51 +579,82 @@ async fn handle_leaf_activation( } ); - assert_matches!( - virtual_overseer.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(parent, RuntimeApiRequest::SessionIndexForChild(tx))) if parent == *hash => { - tx.send(Ok(*session)).unwrap(); - } - ); - - assert_matches!( - virtual_overseer.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx))) if parent == *hash => { - tx.send(Ok(availability_cores.clone())).unwrap(); - } - ); - - let validator_groups = test_state.session_info.validator_groups.to_vec(); - let group_rotation_info = - GroupRotationInfo { session_start_block: 1, group_rotation_frequency: 12, now: 1 }; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(parent, RuntimeApiRequest::ValidatorGroups(tx))) if parent == *hash => { - tx.send(Ok((validator_groups, group_rotation_info))).unwrap(); - } - ); - - if is_new_session { - assert_matches!( - virtual_overseer.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(parent, RuntimeApiRequest::SessionInfo(s, tx))) if parent == *hash && s == *session => { + loop { + match virtual_overseer.recv().await { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _parent, + RuntimeApiRequest::Version(tx), + )) => { + tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + parent, + RuntimeApiRequest::DisabledValidators(tx), + )) if parent == *hash => { + tx.send(Ok(disabled_validators.clone())).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _parent, + RuntimeApiRequest::DisabledValidators(tx), + )) => { + tx.send(Ok(Vec::new())).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _parent, // assume all active leaves are in the same session + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + tx.send(Ok(*session)).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) if parent == *hash && s == *session => { + assert!(is_new_session, "only expecting this call in a new session"); tx.send(Ok(Some(test_state.session_info.clone()))).unwrap(); - } - ); - - assert_matches!( - virtual_overseer.recv().await, + }, AllMessages::RuntimeApi(RuntimeApiMessage::Request( parent, RuntimeApiRequest::MinimumBackingVotes(session_index, tx), )) if parent == *hash && session_index == *session => { - tx.send(Ok(2)).unwrap(); - } - ); + assert!(is_new_session, "only expecting this call in a new session"); + tx.send(Ok(*minimum_backing_votes)).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + parent, + RuntimeApiRequest::AvailabilityCores(tx), + )) if parent == *hash => { + tx.send(Ok(availability_cores.clone())).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + parent, + RuntimeApiRequest::ValidatorGroups(tx), + )) if parent == *hash => { + let validator_groups = test_state.session_info.validator_groups.to_vec(); + let group_rotation_info = GroupRotationInfo { + session_start_block: 1, + group_rotation_frequency: 12, + now: 1, + }; + tx.send(Ok((validator_groups, group_rotation_info))).unwrap(); + }, + AllMessages::ProspectiveParachains( + ProspectiveParachainsMessage::GetHypotheticalFrontier(req, tx), + ) => { + assert_eq!(req.fragment_tree_relay_parent, Some(*hash)); + assert!(!req.backed_in_path_only); + for (i, (candidate, _)) in hypothetical_frontier.iter().enumerate() { + assert!( + req.candidates.iter().any(|c| &c == &candidate), + "did not receive request for hypothetical candidate {}", + i, + ); + } + tx.send(hypothetical_frontier).unwrap(); + // this is the last expected runtime api call + break + }, + msg => panic!("unexpected runtime API call: {msg:?}"), + } } } @@ -614,16 +694,14 @@ async fn handle_sent_request( async fn answer_expected_hypothetical_depth_request( virtual_overseer: &mut VirtualOverseer, responses: Vec<(HypotheticalCandidate, FragmentTreeMembership)>, - expected_leaf_hash: Option, - expected_backed_in_path_only: bool, ) { assert_matches!( virtual_overseer.recv().await, AllMessages::ProspectiveParachains( ProspectiveParachainsMessage::GetHypotheticalFrontier(req, tx) ) => { - assert_eq!(req.fragment_tree_relay_parent, expected_leaf_hash); - assert_eq!(req.backed_in_path_only, expected_backed_in_path_only); + assert_eq!(req.fragment_tree_relay_parent, None); + assert!(!req.backed_in_path_only); for (i, (candidate, _)) in responses.iter().enumerate() { assert!( req.candidates.iter().any(|c| &c == &candidate), diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs index 1eec8290fabae..04934b31482ed 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs @@ -86,15 +86,7 @@ fn cluster_peer_allowed_to_send_incomplete_statements() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Peer in cluster sends a statement, triggering a request. { @@ -176,7 +168,7 @@ fn cluster_peer_allowed_to_send_incomplete_statements() { ); } - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; overseer }); @@ -272,15 +264,7 @@ fn peer_reported_for_providing_statements_meant_to_be_masked_out() { send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; @@ -354,7 +338,7 @@ fn peer_reported_for_providing_statements_meant_to_be_masked_out() { if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Peer C advertises candidate 2. @@ -426,7 +410,7 @@ fn peer_reported_for_providing_statements_meant_to_be_masked_out() { if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Peer C sends an announcement for candidate 3. Should hit seconding limit for validator 1. @@ -537,15 +521,7 @@ fn peer_reported_for_not_enough_statements() { send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; @@ -657,7 +633,7 @@ fn peer_reported_for_not_enough_statements() { if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } overseer @@ -725,15 +701,7 @@ fn peer_reported_for_duplicate_statements() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Peer in cluster sends a statement, triggering a request. { @@ -820,7 +788,7 @@ fn peer_reported_for_duplicate_statements() { ); } - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; overseer }); @@ -887,15 +855,7 @@ fn peer_reported_for_providing_statements_with_invalid_signatures() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Peer in cluster sends a statement, triggering a request. { @@ -958,7 +918,7 @@ fn peer_reported_for_providing_statements_with_invalid_signatures() { if p == peer_a && r == BENEFIT_VALID_RESPONSE.into() => { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } overseer @@ -1026,15 +986,7 @@ fn peer_reported_for_providing_statements_with_wrong_validator_id() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Peer in cluster sends a statement, triggering a request. { @@ -1096,7 +1048,7 @@ fn peer_reported_for_providing_statements_with_wrong_validator_id() { if p == peer_a && r == BENEFIT_VALID_RESPONSE.into() => { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } overseer @@ -1104,26 +1056,31 @@ fn peer_reported_for_providing_statements_with_wrong_validator_id() { } #[test] -fn local_node_sanity_checks_incoming_requests() { +fn disabled_validators_added_to_unwanted_mask() { + let group_size = 3; let config = TestConfig { validator_count: 20, - group_size: 3, + group_size, local_validator: LocalRole::Validator, async_backing_params: None, }; let relay_parent = Hash::repeat_byte(1); - let peer_a = PeerId::random(); + let peer_disabled = PeerId::random(); let peer_b = PeerId::random(); - let peer_c = PeerId::random(); - let peer_d = PeerId::random(); - test_harness(config, |mut state, mut overseer| async move { + test_harness(config, |state, mut overseer| async move { let local_validator = state.local.clone().unwrap(); let local_group_index = local_validator.group_index.unwrap(); let local_para = ParaId::from(local_group_index.0); + let other_group_validators = state.group_validators(local_group_index, true); + let index_disabled = other_group_validators[0]; + let index_within_group = state.index_within_group(local_group_index, index_disabled); + let index_b = other_group_validators[1]; - let test_leaf = state.make_dummy_leaf(relay_parent); + let disabled_validators = vec![index_disabled]; + let test_leaf = + state.make_dummy_leaf_with_disabled_validators(relay_parent, disabled_validators); let (candidate, pvd) = make_candidate( relay_parent, @@ -1135,200 +1092,164 @@ fn local_node_sanity_checks_incoming_requests() { ); let candidate_hash = candidate.hash(); - // peer A is in group, has relay parent in view. - // peer B is in group, has no relay parent in view. - // peer C is not in group, has relay parent in view. + // peer A is in group, has relay parent in view and disabled. + // peer B is in group, has relay parent in view. { - let other_group_validators = state.group_validators(local_group_index, true); - connect_peer( &mut overseer, - peer_a.clone(), - Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()), + peer_disabled.clone(), + Some(vec![state.discovery_id(index_disabled)].into_iter().collect()), ) .await; - connect_peer( &mut overseer, peer_b.clone(), - Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()), + Some(vec![state.discovery_id(index_b)].into_iter().collect()), ) .await; - - connect_peer(&mut overseer, peer_c.clone(), None).await; - - send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; - send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; + send_peer_view_change(&mut overseer, peer_disabled.clone(), view![relay_parent]).await; + send_peer_view_change(&mut overseer, peer_b.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - let mask = StatementFilter::blank(state.config.group_size); + let seconded_disabled = state + .sign_statement( + index_disabled, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); - // Should drop requests for unknown candidates. + let seconded_b = state + .sign_statement( + index_b, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); { - let (pending_response, rx) = oneshot::channel(); - state - .req_sender - .send(RawIncomingRequest { - // Request from peer that received manifest. - peer: peer_c, - payload: request_v2::AttestedCandidateRequest { - candidate_hash: candidate.hash(), - mask: mask.clone(), - } - .encode(), - pending_response, - }) - .await - .unwrap(); + send_peer_message( + &mut overseer, + peer_disabled.clone(), + protocol_v2::StatementDistributionMessage::Statement( + relay_parent, + seconded_disabled.clone(), + ), + ) + .await; - assert_matches!(rx.await, Err(oneshot::Canceled)); + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_disabled && r == COST_DISABLED_VALIDATOR.into() => { } + ); } - // Confirm candidate. { - let full_signed = state - .sign_statement( - local_validator.validator_index, - CompactStatement::Seconded(candidate_hash), - &SigningContext { session_index: 1, parent_hash: relay_parent }, - ) - .convert_to_superpayload(StatementWithPVD::Seconded(candidate.clone(), pvd.clone())) - .unwrap(); - - overseer - .send(FromOrchestra::Communication { - msg: StatementDistributionMessage::Share(relay_parent, full_signed), - }) - .await; + send_peer_message( + &mut overseer, + peer_b.clone(), + protocol_v2::StatementDistributionMessage::Statement( + relay_parent, + seconded_b.clone(), + ), + ) + .await; assert_matches!( overseer.recv().await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( - peers, - Versioned::V2(protocol_v2::ValidationProtocol::StatementDistribution( - protocol_v2::StatementDistributionMessage::Statement( - r, - s, - ) - )) - )) => { - assert_eq!(peers, vec![peer_a.clone()]); - assert_eq!(r, relay_parent); - assert_eq!(s.unchecked_payload(), &CompactStatement::Seconded(candidate_hash)); - assert_eq!(s.unchecked_validator_index(), local_validator.validator_index); - } + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { } ); - - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; } - // Should drop requests from unknown peers. + // Send a request to peer and mock its response with a statement from disabled validator. { - let (pending_response, rx) = oneshot::channel(); - state - .req_sender - .send(RawIncomingRequest { - // Request from peer that received manifest. - peer: peer_d, - payload: request_v2::AttestedCandidateRequest { - candidate_hash: candidate.hash(), - mask: mask.clone(), - } - .encode(), - pending_response, - }) - .await - .unwrap(); - - assert_matches!(rx.await, Err(oneshot::Canceled)); - } + let statements = vec![seconded_disabled]; + let mut mask = StatementFilter::blank(group_size); + let i = index_within_group.unwrap(); + mask.seconded_in_group.set(i, true); + mask.validated_in_group.set(i, true); - // Should drop requests with bitfields of the wrong size. - { - let mask = StatementFilter::blank(state.config.group_size + 1); - let response = state - .send_request( - peer_c, - request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask }, - ) - .await - .await; + handle_sent_request( + &mut overseer, + peer_b, + candidate_hash, + mask, + candidate.clone(), + pvd.clone(), + statements, + ) + .await; assert_matches!( - response, - RawOutgoingResponse { - result, - reputation_changes, - sent_feedback - } => { - assert_matches!(result, Err(())); - assert_eq!(reputation_changes, vec![COST_INVALID_REQUEST_BITFIELD_SIZE.into()]); - assert_matches!(sent_feedback, None); - } + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_b && r == COST_UNREQUESTED_RESPONSE_STATEMENT.into() => { } ); - } - // Local node should reject requests if we did not send a manifest to that peer. - { - let response = state - .send_request( - peer_c, - request_v2::AttestedCandidateRequest { - candidate_hash: candidate.hash(), - mask: mask.clone(), - }, - ) - .await - .await; + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_b && r == BENEFIT_VALID_RESPONSE.into() => { } + ); - // Should get `COST_UNEXPECTED_REQUEST` response. assert_matches!( - response, - RawOutgoingResponse { - result, - reputation_changes, - sent_feedback - } => { - assert_matches!(result, Err(())); - assert_eq!(reputation_changes, vec![COST_UNEXPECTED_REQUEST.into()]); - assert_matches!(sent_feedback, None); + overseer.recv().await, + AllMessages:: NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V2( + protocol_v2::ValidationProtocol::StatementDistribution( + protocol_v2::StatementDistributionMessage::Statement(hash, statement), + ), + ), + ) + ) => { + assert_eq!(peers, vec![peer_disabled]); + assert_eq!(hash, relay_parent); + assert_eq!(statement, seconded_b); } ); + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } overseer }); } +// We send a request to a peer and after receiving the response +// we learn about a validator being disabled. We should filter out +// the statement from the disabled validator when receiving it. #[test] -fn local_node_checks_that_peer_can_request_before_responding() { +fn when_validator_disabled_after_sending_the_request() { + let group_size = 3; let config = TestConfig { validator_count: 20, - group_size: 3, + group_size, local_validator: LocalRole::Validator, async_backing_params: None, }; let relay_parent = Hash::repeat_byte(1); - let peer_a = PeerId::random(); + let another_relay_parent = Hash::repeat_byte(2); + let peer_disabled_later = PeerId::random(); let peer_b = PeerId::random(); - test_harness(config, |mut state, mut overseer| async move { + test_harness(config, |state, mut overseer| async move { let local_validator = state.local.clone().unwrap(); let local_group_index = local_validator.group_index.unwrap(); let local_para = ParaId::from(local_group_index.0); + let other_group_validators = state.group_validators(local_group_index, true); + let index_disabled = other_group_validators[0]; + let index_b = other_group_validators[1]; - let test_leaf = state.make_dummy_leaf(relay_parent); + let test_leaf = state.make_dummy_leaf_with_disabled_validators(relay_parent, vec![]); + let test_leaf_disabled = state + .make_dummy_leaf_with_disabled_validators(another_relay_parent, vec![index_disabled]); let (candidate, pvd) = make_candidate( relay_parent, @@ -1340,20 +1261,733 @@ fn local_node_checks_that_peer_can_request_before_responding() { ); let candidate_hash = candidate.hash(); - // Peers A and B are in group and have relay parent in view. - let other_group_validators = state.group_validators(local_group_index, true); + // peer A is in group, has relay parent in view and disabled later. + // peer B is in group, has relay parent in view. + { + connect_peer( + &mut overseer, + peer_disabled_later.clone(), + Some(vec![state.discovery_id(index_disabled)].into_iter().collect()), + ) + .await; + connect_peer( + &mut overseer, + peer_b.clone(), + Some(vec![state.discovery_id(index_b)].into_iter().collect()), + ) + .await; + send_peer_view_change(&mut overseer, peer_disabled_later.clone(), view![relay_parent]) + .await; + send_peer_view_change(&mut overseer, peer_b.clone(), view![relay_parent]).await; + } - connect_peer( - &mut overseer, - peer_a.clone(), - Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()), - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; - connect_peer( - &mut overseer, - peer_b.clone(), - Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()), + let seconded_disabled = state + .sign_statement( + index_disabled, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); + + let seconded_b = state + .sign_statement( + index_b, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); + { + send_peer_message( + &mut overseer, + peer_b.clone(), + protocol_v2::StatementDistributionMessage::Statement( + relay_parent, + seconded_b.clone(), + ), + ) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { } + ); + } + + // Send a request to peer and activate leaf when a validator is disabled; + // mock the response with a statement from disabled validator. + { + let statements = vec![seconded_disabled]; + let mask = StatementFilter::blank(group_size); + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(mut requests, IfDisconnected::ImmediateError)) => { + assert_eq!(requests.len(), 1); + assert_matches!( + requests.pop().unwrap(), + Requests::AttestedCandidateV2(outgoing) => { + assert_eq!(outgoing.peer, Recipient::Peer(peer_b)); + assert_eq!(outgoing.payload.candidate_hash, candidate_hash); + assert_eq!(outgoing.payload.mask, mask); + + activate_leaf(&mut overseer, &test_leaf_disabled, &state, false, vec![]).await; + + let res = AttestedCandidateResponse { + candidate_receipt: candidate, + persisted_validation_data: pvd, + statements, + }; + outgoing.pending_response.send(Ok(res.encode())).unwrap(); + } + ); + } + ); + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_b && r == BENEFIT_VALID_RESPONSE.into() => { } + ); + + assert_matches!( + overseer.recv().await, + AllMessages:: NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V2( + protocol_v2::ValidationProtocol::StatementDistribution( + protocol_v2::StatementDistributionMessage::Statement(hash, statement), + ), + ), + ) + ) => { + assert_eq!(peers, vec![peer_disabled_later]); + assert_eq!(hash, relay_parent); + assert_eq!(statement, seconded_b); + } + ); + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; + } + + overseer + }); +} + +#[test] +fn no_response_for_grid_request_not_meeting_quorum() { + let validator_count = 6; + let group_size = 3; + let config = TestConfig { + validator_count, + group_size, + local_validator: LocalRole::Validator, + async_backing_params: None, + }; + + let relay_parent = Hash::repeat_byte(1); + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + test_harness(config, |mut state, mut overseer| async move { + let local_validator = state.local.clone().unwrap(); + let local_group_index = local_validator.group_index.unwrap(); + let local_para = ParaId::from(local_group_index.0); + + let test_leaf = state.make_dummy_leaf_with_min_backing_votes(relay_parent, 2); + + let (candidate, pvd) = make_candidate( + relay_parent, + 1, + local_para, + test_leaf.para_data(local_para).head_data.clone(), + vec![4, 5, 6].into(), + Hash::repeat_byte(42).into(), + ); + let candidate_hash = candidate.hash(); + + let other_group_validators = state.group_validators(local_group_index, true); + let target_group_validators = + state.group_validators((local_group_index.0 + 1).into(), true); + let v_a = other_group_validators[0]; + let v_b = other_group_validators[1]; + let v_c = target_group_validators[0]; + + // peer A is in group, has relay parent in view. + // peer B is in group, has no relay parent in view. + // peer C is not in group, has relay parent in view. + { + connect_peer( + &mut overseer, + peer_a.clone(), + Some(vec![state.discovery_id(v_a)].into_iter().collect()), + ) + .await; + + connect_peer( + &mut overseer, + peer_b.clone(), + Some(vec![state.discovery_id(v_b)].into_iter().collect()), + ) + .await; + + connect_peer( + &mut overseer, + peer_c.clone(), + Some(vec![state.discovery_id(v_c)].into_iter().collect()), + ) + .await; + + send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; + send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; + } + + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; + + // Send gossip topology. + send_new_topology(&mut overseer, state.make_dummy_topology()).await; + + // Confirm the candidate locally so that we don't send out requests. + { + let statement = state + .sign_full_statement( + local_validator.validator_index, + Statement::Seconded(candidate.clone()), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + pvd.clone(), + ) + .clone(); + + overseer + .send(FromOrchestra::Communication { + msg: StatementDistributionMessage::Share(relay_parent, statement), + }) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] + ); + + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; + } + + // Send enough statements to make candidate backable, make sure announcements are sent. + + // Send statement from peer A. + { + let statement = state + .sign_statement( + v_a, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); + + send_peer_message( + &mut overseer, + peer_a.clone(), + protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement), + ) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { } + ); + } + + // Send statement from peer B. + let statement_b = state + .sign_statement( + v_b, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); + { + send_peer_message( + &mut overseer, + peer_b.clone(), + protocol_v2::StatementDistributionMessage::Statement( + relay_parent, + statement_b.clone(), + ), + ) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { } + ); + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] + ); + } + + // Send Backed notification. + { + overseer + .send(FromOrchestra::Communication { + msg: StatementDistributionMessage::Backed(candidate_hash), + }) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages:: NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V2( + protocol_v2::ValidationProtocol::StatementDistribution( + protocol_v2::StatementDistributionMessage::BackedCandidateManifest(manifest), + ), + ), + ) + ) => { + assert_eq!(peers, vec![peer_c]); + assert_eq!(manifest, BackedCandidateManifest { + relay_parent, + candidate_hash, + group_index: local_validator.group_index.unwrap(), + para_id: local_para, + parent_head_data_hash: pvd.parent_head.hash(), + statement_knowledge: StatementFilter { + seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1], + validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], + }, + }); + } + ); + + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; + } + + let mask = StatementFilter { + seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], + validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], + }; + + let relay_2 = Hash::repeat_byte(2); + let disabled_validators = vec![v_a]; + let leaf_2 = state.make_dummy_leaf_with_disabled_validators(relay_2, disabled_validators); + activate_leaf(&mut overseer, &leaf_2, &state, false, vec![]).await; + + // Incoming request to local node. Local node should not send the response as v_a is + // disabled and hence the quorum is not reached. + { + let response = state + .send_request( + peer_c, + request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask }, + ) + .await + .await; + + assert!( + response.is_none(), + "We should not send a response as the quorum is not reached yet" + ); + } + + overseer + }); +} + +#[test] +fn disabling_works_from_the_latest_state_not_relay_parent() { + let group_size = 3; + let config = TestConfig { + validator_count: 20, + group_size, + local_validator: LocalRole::Validator, + async_backing_params: None, + }; + + let relay_1 = Hash::repeat_byte(1); + let relay_2 = Hash::repeat_byte(2); + let peer_disabled = PeerId::random(); + + test_harness(config, |state, mut overseer| async move { + let local_validator = state.local.clone().unwrap(); + let local_group_index = local_validator.group_index.unwrap(); + let local_para = ParaId::from(local_group_index.0); + + let other_group_validators = state.group_validators(local_group_index, true); + let index_disabled = other_group_validators[0]; + + let leaf_1 = state.make_dummy_leaf(relay_1); + let disabled_validators = vec![index_disabled]; + let leaf_2 = state.make_dummy_leaf_with_disabled_validators(relay_2, disabled_validators); + + let (candidate_1, pvd_1) = make_candidate( + relay_1, + 1, + local_para, + leaf_1.para_data(local_para).head_data.clone(), + vec![4, 5, 6].into(), + Hash::repeat_byte(42).into(), + ); + let candidate_1_hash = candidate_1.hash(); + + let (candidate_2, _) = make_candidate( + relay_1, + 1, + local_para, + leaf_1.para_data(local_para).head_data.clone(), + vec![4, 5, 6, 7].into(), + Hash::repeat_byte(42).into(), + ); + let candidate_2_hash = candidate_2.hash(); + + { + connect_peer( + &mut overseer, + peer_disabled.clone(), + Some(vec![state.discovery_id(index_disabled)].into_iter().collect()), + ) + .await; + send_peer_view_change(&mut overseer, peer_disabled.clone(), view![relay_1]).await; + } + + activate_leaf(&mut overseer, &leaf_1, &state, true, vec![]).await; + + let seconded_1 = state + .sign_statement( + index_disabled, + CompactStatement::Seconded(candidate_1_hash), + &SigningContext { parent_hash: relay_1, session_index: 1 }, + ) + .as_unchecked() + .clone(); + + let seconded_2 = state + .sign_statement( + index_disabled, + CompactStatement::Seconded(candidate_2_hash), + &SigningContext { parent_hash: relay_1, session_index: 1 }, + ) + .as_unchecked() + .clone(); + { + send_peer_message( + &mut overseer, + peer_disabled.clone(), + protocol_v2::StatementDistributionMessage::Statement(relay_1, seconded_1.clone()), + ) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_disabled && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { } + ); + } + + { + handle_sent_request( + &mut overseer, + peer_disabled, + candidate_1_hash, + StatementFilter::blank(group_size), + candidate_1.clone(), + pvd_1.clone(), + vec![seconded_1.clone()], + ) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_disabled && r == BENEFIT_VALID_STATEMENT.into() => { } + ); + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_disabled && r == BENEFIT_VALID_RESPONSE.into() => { } + ); + + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; + } + + activate_leaf(&mut overseer, &leaf_2, &state, false, vec![]).await; + + { + send_peer_message( + &mut overseer, + peer_disabled.clone(), + protocol_v2::StatementDistributionMessage::Statement(relay_1, seconded_2.clone()), + ) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_disabled && r == COST_DISABLED_VALIDATOR.into() => { } + ); + } + + overseer + }); +} + +#[test] +fn local_node_sanity_checks_incoming_requests() { + let config = TestConfig { + validator_count: 20, + group_size: 3, + local_validator: LocalRole::Validator, + async_backing_params: None, + }; + + let relay_parent = Hash::repeat_byte(1); + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + let peer_d = PeerId::random(); + + test_harness(config, |mut state, mut overseer| async move { + let local_validator = state.local.clone().unwrap(); + let local_group_index = local_validator.group_index.unwrap(); + let local_para = ParaId::from(local_group_index.0); + + let test_leaf = state.make_dummy_leaf(relay_parent); + + let (candidate, pvd) = make_candidate( + relay_parent, + 1, + local_para, + test_leaf.para_data(local_para).head_data.clone(), + vec![4, 5, 6].into(), + Hash::repeat_byte(42).into(), + ); + let candidate_hash = candidate.hash(); + + // peer A is in group, has relay parent in view. + // peer B is in group, has no relay parent in view. + // peer C is not in group, has relay parent in view. + { + let other_group_validators = state.group_validators(local_group_index, true); + + connect_peer( + &mut overseer, + peer_a.clone(), + Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()), + ) + .await; + + connect_peer( + &mut overseer, + peer_b.clone(), + Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()), + ) + .await; + + connect_peer(&mut overseer, peer_c.clone(), None).await; + + send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; + send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; + } + + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; + + let mask = StatementFilter::blank(state.config.group_size); + + // Should drop requests for unknown candidates. + { + let (pending_response, rx) = oneshot::channel(); + state + .req_sender + .send(RawIncomingRequest { + // Request from peer that received manifest. + peer: peer_c, + payload: request_v2::AttestedCandidateRequest { + candidate_hash: candidate.hash(), + mask: mask.clone(), + } + .encode(), + pending_response, + }) + .await + .unwrap(); + + assert_matches!(rx.await, Err(oneshot::Canceled)); + } + + // Confirm candidate. + { + let full_signed = state + .sign_statement( + local_validator.validator_index, + CompactStatement::Seconded(candidate_hash), + &SigningContext { session_index: 1, parent_hash: relay_parent }, + ) + .convert_to_superpayload(StatementWithPVD::Seconded(candidate.clone(), pvd.clone())) + .unwrap(); + + overseer + .send(FromOrchestra::Communication { + msg: StatementDistributionMessage::Share(relay_parent, full_signed), + }) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V2(protocol_v2::ValidationProtocol::StatementDistribution( + protocol_v2::StatementDistributionMessage::Statement( + r, + s, + ) + )) + )) => { + assert_eq!(peers, vec![peer_a.clone()]); + assert_eq!(r, relay_parent); + assert_eq!(s.unchecked_payload(), &CompactStatement::Seconded(candidate_hash)); + assert_eq!(s.unchecked_validator_index(), local_validator.validator_index); + } + ); + + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; + } + + // Should drop requests from unknown peers. + { + let (pending_response, rx) = oneshot::channel(); + state + .req_sender + .send(RawIncomingRequest { + // Request from peer that received manifest. + peer: peer_d, + payload: request_v2::AttestedCandidateRequest { + candidate_hash: candidate.hash(), + mask: mask.clone(), + } + .encode(), + pending_response, + }) + .await + .unwrap(); + + assert_matches!(rx.await, Err(oneshot::Canceled)); + } + + // Should drop requests with bitfields of the wrong size. + { + let mask = StatementFilter::blank(state.config.group_size + 1); + let response = state + .send_request( + peer_c, + request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask }, + ) + .await + .await + .unwrap(); + + assert_matches!( + response, + RawOutgoingResponse { + result, + reputation_changes, + sent_feedback + } => { + assert_matches!(result, Err(())); + assert_eq!(reputation_changes, vec![COST_INVALID_REQUEST_BITFIELD_SIZE.into()]); + assert_matches!(sent_feedback, None); + } + ); + } + + // Local node should reject requests if we did not send a manifest to that peer. + { + let response = state + .send_request( + peer_c, + request_v2::AttestedCandidateRequest { + candidate_hash: candidate.hash(), + mask: mask.clone(), + }, + ) + .await + .await + .unwrap(); + + // Should get `COST_UNEXPECTED_REQUEST` response. + assert_matches!( + response, + RawOutgoingResponse { + result, + reputation_changes, + sent_feedback + } => { + assert_matches!(result, Err(())); + assert_eq!(reputation_changes, vec![COST_UNEXPECTED_REQUEST.into()]); + assert_matches!(sent_feedback, None); + } + ); + } + + overseer + }); +} + +#[test] +fn local_node_checks_that_peer_can_request_before_responding() { + let config = TestConfig { + validator_count: 20, + group_size: 3, + local_validator: LocalRole::Validator, + async_backing_params: None, + }; + + let relay_parent = Hash::repeat_byte(1); + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + test_harness(config, |mut state, mut overseer| async move { + let local_validator = state.local.clone().unwrap(); + let local_group_index = local_validator.group_index.unwrap(); + let local_para = ParaId::from(local_group_index.0); + + let test_leaf = state.make_dummy_leaf(relay_parent); + + let (candidate, pvd) = make_candidate( + relay_parent, + 1, + local_para, + test_leaf.para_data(local_para).head_data.clone(), + vec![4, 5, 6].into(), + Hash::repeat_byte(42).into(), + ); + let candidate_hash = candidate.hash(); + + // Peers A and B are in group and have relay parent in view. + let other_group_validators = state.group_validators(local_group_index, true); + + connect_peer( + &mut overseer, + peer_a.clone(), + Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()), + ) + .await; + + connect_peer( + &mut overseer, + peer_b.clone(), + Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()), ) .await; let peer_b_index = other_group_validators[1]; @@ -1362,15 +1996,7 @@ fn local_node_checks_that_peer_can_request_before_responding() { send_peer_view_change(&mut overseer, peer_b.clone(), view![relay_parent]).await; // Finish setup - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; let mask = StatementFilter::blank(state.config.group_size); @@ -1409,7 +2035,7 @@ fn local_node_checks_that_peer_can_request_before_responding() { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; // Local node should respond to requests from peers in the same group // which appear to not have already seen the candidate @@ -1424,7 +2050,8 @@ fn local_node_checks_that_peer_can_request_before_responding() { }, ) .await - .await; + .await + .unwrap(); let expected_statements = vec![signed.into_unchecked()]; assert_matches!(response, full_response => { @@ -1473,7 +2100,8 @@ fn local_node_checks_that_peer_can_request_before_responding() { }, ) .await - .await; + .await + .unwrap(); // Peer already knows about this candidate. Should reject. assert_matches!( @@ -1535,7 +2163,7 @@ fn local_node_respects_statement_mask() { let local_group_index = local_validator.group_index.unwrap(); let local_para = ParaId::from(local_group_index.0); - let test_leaf = state.make_dummy_leaf(relay_parent); + let test_leaf = state.make_dummy_leaf_with_min_backing_votes(relay_parent, 2); let (candidate, pvd) = make_candidate( relay_parent, @@ -1592,15 +2220,7 @@ fn local_node_respects_statement_mask() { send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; @@ -1627,26 +2247,28 @@ fn local_node_respects_statement_mask() { AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a] ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Send enough statements to make candidate backable, make sure announcements are sent. // Send statement from peer A. + let statement_a = state + .sign_statement( + v_a, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); { - let statement = state - .sign_statement( - v_a, - CompactStatement::Seconded(candidate_hash), - &SigningContext { parent_hash: relay_parent, session_index: 1 }, - ) - .as_unchecked() - .clone(); - send_peer_message( &mut overseer, peer_a.clone(), - protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement), + protocol_v2::StatementDistributionMessage::Statement( + relay_parent, + statement_a.clone(), + ), ) .await; @@ -1724,12 +2346,12 @@ fn local_node_respects_statement_mask() { } ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // `1` indicates statements NOT to request. let mask = StatementFilter { - seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], + seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], }; @@ -1741,9 +2363,10 @@ fn local_node_respects_statement_mask() { request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask }, ) .await - .await; + .await + .unwrap(); - let expected_statements = vec![statement_b]; + let expected_statements = vec![statement_a, statement_b]; assert_matches!(response, full_response => { // Response is the same for v2. let request_v2::AttestedCandidateResponse { candidate_receipt, persisted_validation_data, statements } = @@ -1837,15 +2460,7 @@ fn should_delay_before_retrying_dropped_requests() { send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await; } - activate_leaf(&mut overseer, &test_leaf, &state, true).await; - - answer_expected_hypothetical_depth_request( - &mut overseer, - vec![], - Some(relay_parent), - false, - ) - .await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; // Send gossip topology. send_new_topology(&mut overseer, state.make_dummy_topology()).await; @@ -1984,7 +2599,7 @@ fn should_delay_before_retrying_dropped_requests() { if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } // Sleep for the given amount of time. This should reset the delay for the first candidate. @@ -2051,7 +2666,7 @@ fn should_delay_before_retrying_dropped_requests() { if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() ); - answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await; + answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await; } overseer diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md b/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md index 86a1bf1214134..e6e597c531787 100644 --- a/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md @@ -123,6 +123,31 @@ only send "importable" statements to the backing subsystem itself. backable and part of the hypothetical frontier. - Note that requesting is not an implicit acknowledgement, and an explicit acknowledgement must be sent upon receipt. +### Disabled validators + +After a validator is disabled in the runtime, other validators should no longer +accept statements from it. Filtering out of statements from disabled validators +on the node side is purely an optimization, as it will be done in the runtime +as well. + +Because we use the state of the active leaves to +check whether a validator is disabled instead of the relay parent, the notion +of being disabled is inherently racy: +- the responder has learned about the disabled validator before the requester +- the receiver has witnessed the disabled validator after sending the request + +We could have sent a manifest to a peer, then received information about +disabling, and then receive a request. This can break an invariant of the grid +mode: +- the response is required to indicate quorum + +Due to the above, there should be no response at all for grid requests when +the backing threshold is no longer met as a result of disabled validators. +In addition to that, we add disabled validators to the request's unwanted +mask. This ensures that the sender will not send statements from disabled +validators (at least from the perspective of the receiver at the moment of the +request). This doesn't fully avoid race conditions, but tries to minimize them. + ## Messages ### Incoming diff --git a/polkadot/zombienet_tests/functional/0010-validator-disabling.toml b/polkadot/zombienet_tests/functional/0010-validator-disabling.toml new file mode 100644 index 0000000000000..6701d60d74d14 --- /dev/null +++ b/polkadot/zombienet_tests/functional/0010-validator-disabling.toml @@ -0,0 +1,39 @@ +[settings] +timeout = 1000 +bootnode = true + +[relaychain.genesis.runtimeGenesis.patch.configuration.config] + max_validators_per_core = 1 + needed_approvals = 2 + group_rotation_frequency = 10 + +[relaychain] +default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}" +chain = "westend-local" # for the disabling to take an effect +default_command = "polkadot" + +[relaychain.default_resources] +limits = { memory = "4G", cpu = "2" } +requests = { memory = "2G", cpu = "1" } + + [[relaychain.node_groups]] + name = "honest-validator" + count = 3 + args = ["-lparachain=debug"] + + [[relaychain.node_groups]] + image = "{{MALUS_IMAGE}}" + name = "malus-validator" + command = "malus suggest-garbage-candidate" + args = ["-lMALUS=trace"] + count = 1 + +[[parachains]] +id = 1000 +cumulus_based = true + + [parachains.collator] + name = "alice" + command = "polkadot-parachain" + image = "{{CUMULUS_IMAGE}}" + args = ["-lparachain=debug"] diff --git a/polkadot/zombienet_tests/functional/0010-validator-disabling.zndsl b/polkadot/zombienet_tests/functional/0010-validator-disabling.zndsl new file mode 100644 index 0000000000000..c810266102061 --- /dev/null +++ b/polkadot/zombienet_tests/functional/0010-validator-disabling.zndsl @@ -0,0 +1,21 @@ +Description: Test validator disabling effects +Network: ./0010-validator-disabling.toml +Creds: config + +# Ensure nodes are up and running +honest-validator: reports node_roles is 4 + +# Ensure parachain is registered +honest-validator: parachain 1000 is registered within 100 seconds + +# Ensure parachain made progress +honest-validator: parachain 1000 block height is at least 1 within 300 seconds + +# Wait for the dispute +honest-validator-1: reports parachain_candidate_disputes_total is at least 1 within 600 seconds + +# Disputes should conclude +honest-validator: reports polkadot_parachain_candidate_dispute_concluded{validity="invalid"} is at least 1 within 200 seconds + +# Wait for a few blocks for the disabling to take place. +honest-validator: log line contains "Disabled validators detected" within 180 seconds diff --git a/substrate/frame/system/src/tests.rs b/substrate/frame/system/src/tests.rs index 6fbddaaf22940..e1432725d5c8f 100644 --- a/substrate/frame/system/src/tests.rs +++ b/substrate/frame/system/src/tests.rs @@ -788,7 +788,7 @@ fn last_runtime_upgrade_spec_version_usage() { // a runtime upgrade in the pipeline of being applied, you should use the spec version // of this upgrade. if System::last_runtime_upgrade_spec_version() > 1337 { - return Weight::zero(); + return Weight::zero() } // Do the migration. diff --git a/substrate/utils/wasm-builder/src/wasm_project.rs b/substrate/utils/wasm-builder/src/wasm_project.rs index 5bf44c2c9b20e..2126a49bd7ffe 100644 --- a/substrate/utils/wasm-builder/src/wasm_project.rs +++ b/substrate/utils/wasm-builder/src/wasm_project.rs @@ -935,7 +935,7 @@ fn generate_rerun_if_changed_instructions( while let Some(dependency) = dependencies.pop() { // Ignore all dev dependencies if dependency.kind == DependencyKind::Development { - continue; + continue } let path_or_git_dep = From 551219b6c7221593a2ee3e652e71c58b377995b9 Mon Sep 17 00:00:00 2001 From: ordian Date: Wed, 20 Dec 2023 16:19:46 +0100 Subject: [PATCH 06/11] prdoc --- prdoc/pr_1841.prdoc | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 prdoc/pr_1841.prdoc diff --git a/prdoc/pr_1841.prdoc b/prdoc/pr_1841.prdoc new file mode 100644 index 0000000000000..c99583e6dc309 --- /dev/null +++ b/prdoc/pr_1841.prdoc @@ -0,0 +1,18 @@ +title: Validator disabling in Statement Distribution. + +doc: + - audience: Node Operator + description: | + Once a validator has been disabled for misbehavior, other validators + should no longer gossip its backing statements in the current era. + If they do, it might result in disconnects from the network due to low + reputation. + +migrations: + db: [] + runtime: [] + +crates: + - name: polkadot-statement-distribution + +host_functions: [] From f637437227552d162f8887ced006203f7fa6b33d Mon Sep 17 00:00:00 2001 From: ordian Date: Wed, 20 Dec 2023 18:08:09 +0100 Subject: [PATCH 07/11] empty commit to trigger CI From 3a88190177a81241fa4deb6d65cf886a1d36df45 Mon Sep 17 00:00:00 2001 From: ordian Date: Wed, 20 Dec 2023 18:09:41 +0100 Subject: [PATCH 08/11] revert unrelated fmt changes --- substrate/frame/system/src/tests.rs | 2 +- substrate/utils/wasm-builder/src/wasm_project.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/frame/system/src/tests.rs b/substrate/frame/system/src/tests.rs index e1432725d5c8f..6fbddaaf22940 100644 --- a/substrate/frame/system/src/tests.rs +++ b/substrate/frame/system/src/tests.rs @@ -788,7 +788,7 @@ fn last_runtime_upgrade_spec_version_usage() { // a runtime upgrade in the pipeline of being applied, you should use the spec version // of this upgrade. if System::last_runtime_upgrade_spec_version() > 1337 { - return Weight::zero() + return Weight::zero(); } // Do the migration. diff --git a/substrate/utils/wasm-builder/src/wasm_project.rs b/substrate/utils/wasm-builder/src/wasm_project.rs index 2126a49bd7ffe..5bf44c2c9b20e 100644 --- a/substrate/utils/wasm-builder/src/wasm_project.rs +++ b/substrate/utils/wasm-builder/src/wasm_project.rs @@ -935,7 +935,7 @@ fn generate_rerun_if_changed_instructions( while let Some(dependency) = dependencies.pop() { // Ignore all dev dependencies if dependency.kind == DependencyKind::Development { - continue + continue; } let path_or_git_dep = From 4af496f3eff8e1aeb1cd1d7eb98957067def8a9b Mon Sep 17 00:00:00 2001 From: ordian Date: Thu, 21 Dec 2023 17:18:32 +0100 Subject: [PATCH 09/11] address nit: move the check out of the match --- .../statement-distribution/src/v2/requests.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/requests.rs b/polkadot/node/network/statement-distribution/src/v2/requests.rs index d6b522735e660..bed3d5c18ae2b 100644 --- a/polkadot/node/network/statement-distribution/src/v2/requests.rs +++ b/polkadot/node/network/statement-distribution/src/v2/requests.rs @@ -737,10 +737,6 @@ fn validate_complete_response( rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT)); continue } - - if disabled_mask.get(i).map_or(false, |x| *x) { - continue - } }, CompactStatement::Valid(_) => { if unwanted_mask.validated_in_group[i] { @@ -752,13 +748,13 @@ fn validate_complete_response( rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT)); continue } - - if disabled_mask.get(i).map_or(false, |x| *x) { - continue - } }, } + if disabled_mask.get(i).map_or(false, |x| *x) { + continue + } + let validator_public = match validator_key_lookup(unchecked_statement.unchecked_validator_index()) { None => { From 2644aab76b801ca5d48c55ccf96faec59a3af8d6 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 9 Jan 2024 10:47:30 +0200 Subject: [PATCH 10/11] Fix a compilation error after merge --- polkadot/node/network/statement-distribution/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs index 4f589245eddd3..a49a6aa7414cc 100644 --- a/polkadot/node/network/statement-distribution/src/error.rs +++ b/polkadot/node/network/statement-distribution/src/error.rs @@ -76,7 +76,7 @@ pub enum Error { FetchAvailabilityCores(RuntimeApiError), #[error("Fetching disabled validators failed {0:?}")] - FetchDisabledValidators(util::Error), + FetchDisabledValidators(runtime::Error), #[error("Fetching validator groups failed {0:?}")] FetchValidatorGroups(RuntimeApiError), From 0797fd282b84844b163ecac1b71b481d1464c0bf Mon Sep 17 00:00:00 2001 From: ordian Date: Tue, 9 Jan 2024 11:20:20 +0000 Subject: [PATCH 11/11] fix warning --- polkadot/node/network/statement-distribution/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs index a49a6aa7414cc..a712ab6da436f 100644 --- a/polkadot/node/network/statement-distribution/src/error.rs +++ b/polkadot/node/network/statement-distribution/src/error.rs @@ -20,7 +20,7 @@ use polkadot_node_network_protocol::PeerId; use polkadot_node_subsystem::{RuntimeApiError, SubsystemError}; use polkadot_node_subsystem_util::{ - self as util, backing_implicit_view::FetchError as ImplicitViewFetchError, runtime, + backing_implicit_view::FetchError as ImplicitViewFetchError, runtime, }; use polkadot_primitives::{CandidateHash, Hash, Id as ParaId};