diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index e6be86f2b9a4..2ec9aa651564 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -80,8 +80,8 @@ use futures::{ use error::{Error, FatalResult}; use polkadot_node_primitives::{ - AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD, - ValidationResult, BACKING_EXECUTION_TIMEOUT, + minimum_votes, AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, + StatementWithPVD, ValidationResult, BACKING_EXECUTION_TIMEOUT, }; use polkadot_node_subsystem::{ messages::{ @@ -374,14 +374,6 @@ struct AttestingData { backing: Vec, } -/// How many votes we need to consider a candidate backed. -/// -/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and -/// statement distribution. -fn minimum_votes(n_validators: usize) -> usize { - std::cmp::min(2, n_validators) -} - #[derive(Default)] struct TableContext { validator: Option, diff --git a/node/network/statement-distribution/src/vstaging/cluster.rs b/node/network/statement-distribution/src/vstaging/cluster.rs index 9c17e3e17e14..c79ebf08b663 100644 --- a/node/network/statement-distribution/src/vstaging/cluster.rs +++ b/node/network/statement-distribution/src/vstaging/cluster.rs @@ -172,6 +172,7 @@ impl ClusterTracker { } /// Note that we accepted an incoming statement. This updates internal structures. + /// /// Should only be called after a successful `can_receive` call. pub fn note_received( &mut self, @@ -748,5 +749,55 @@ mod tests { ); } - // TODO [now] ensure statements received with prejudice don't prevent sending later + // Ensure statements received with prejudice don't prevent sending later. + #[test] + fn can_send_statements_received_with_prejudice() { + let group = + vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)]; + + let seconding_limit = 1; + + let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty"); + let hash_a = CandidateHash(Hash::repeat_byte(1)); + let hash_b = CandidateHash(Hash::repeat_byte(2)); + + assert_eq!( + tracker.can_receive( + ValidatorIndex(200), + ValidatorIndex(5), + CompactStatement::Seconded(hash_a), + ), + Ok(Accept::Ok), + ); + + tracker.note_received( + ValidatorIndex(200), + ValidatorIndex(5), + CompactStatement::Seconded(hash_a), + ); + + assert_eq!( + tracker.can_receive( + ValidatorIndex(24), + ValidatorIndex(5), + CompactStatement::Seconded(hash_b), + ), + Ok(Accept::WithPrejudice), + ); + + tracker.note_received( + ValidatorIndex(24), + ValidatorIndex(5), + CompactStatement::Seconded(hash_b), + ); + + assert_eq!( + tracker.can_send( + ValidatorIndex(24), + ValidatorIndex(5), + CompactStatement::Seconded(hash_a), + ), + Ok(()), + ); + } } diff --git a/node/network/statement-distribution/src/vstaging/groups.rs b/node/network/statement-distribution/src/vstaging/groups.rs index 94f6c1e219b0..e03fa63bc3f1 100644 --- a/node/network/statement-distribution/src/vstaging/groups.rs +++ b/node/network/statement-distribution/src/vstaging/groups.rs @@ -16,6 +16,7 @@ //! A utility for tracking groups and their members within a session. +use polkadot_node_primitives::minimum_votes; use polkadot_primitives::vstaging::{AuthorityDiscoveryId, GroupIndex, IndexedVec, ValidatorIndex}; use std::collections::HashMap; @@ -68,7 +69,7 @@ impl Groups { &self, group_index: GroupIndex, ) -> Option<(usize, usize)> { - self.get(group_index).map(|g| (g.len(), super::minimum_votes(g.len()))) + self.get(group_index).map(|g| (g.len(), minimum_votes(g.len()))) } /// Get the group index for a validator by index. diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs index e3215822877e..60c9b99ea877 100644 --- a/node/network/statement-distribution/src/vstaging/mod.rs +++ b/node/network/statement-distribution/src/vstaging/mod.rs @@ -75,7 +75,7 @@ mod statement_store; const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement"); const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep = - Rep::CostMinor("Unexpected Statement, missing knowlege for relay parent"); + Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent"); const COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE: Rep = Rep::CostMinor("Unexpected Statement, unknown candidate"); const COST_UNEXPECTED_STATEMENT_REMOTE: Rep = @@ -202,8 +202,8 @@ pub(crate) struct State { request_manager: RequestManager, } -// For the provided validator index, if there is a connected peer -// controlling the given authority ID, +// For the provided validator index, if there is a connected peer controlling the given authority +// ID, then return that peer's `PeerId`. fn connected_validator_peer( authorities: &HashMap, per_session: &PerSessionState, @@ -278,14 +278,6 @@ impl PeerState { } } -/// How many votes we need to consider a candidate backed. -/// -/// WARNING: This has to be kept in sync with the runtime check in the inclusion module. -// TODO [now]: extract to shared primitives -fn minimum_votes(n_validators: usize) -> usize { - std::cmp::min(2, n_validators) -} - #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] pub(crate) async fn handle_network_update( ctx: &mut Context, @@ -1118,11 +1110,11 @@ async fn handle_incoming_statement( .request_manager .get_or_insert(relay_parent, candidate_hash, originator_group); - request_entry.get_mut().add_peer(peer); + request_entry.add_peer(peer); // We only successfully accept statements from the grid on confirmed // candidates, therefore this check only passes if the statement is from the cluster - request_entry.get_mut().set_cluster_priority(); + request_entry.set_cluster_priority(); } let was_fresh = match per_relay_parent.statement_store.insert( @@ -1835,7 +1827,6 @@ async fn handle_incoming_manifest( state .request_manager .get_or_insert(manifest.relay_parent, manifest.candidate_hash, manifest.group_index) - .get_mut() .add_peer(peer); } } diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs index 096b78380200..df55e9afc4f6 100644 --- a/node/network/statement-distribution/src/vstaging/requests.rs +++ b/node/network/statement-distribution/src/vstaging/requests.rs @@ -13,7 +13,18 @@ //! A requester for full information on candidates. //! -// TODO [now]: some module docs. +//! 1. We use `RequestManager::get_or_insert().get_mut()` to add and mutate [`RequestedCandidate`]s, either setting the +//! priority or adding a peer we know has the candidate. We currently prioritize "cluster" candidates (those from our +//! own group, although the cluster mechanism could be made to include multiple groups in the future) over "grid" +//! candidates (those from other groups). +//! +//! 2. The main loop of the module will invoke [`RequestManager::next_request`] in a loop until it returns `None`, +//! dispatching all requests with the `NetworkBridgeTxMessage`. The receiving half of the channel is owned by the +//! [`RequestManager`]. +//! +//! 3. The main loop of the module will also select over [`RequestManager::await_incoming`] to receive +//! [`UnhandledResponse`]s, which it then validates using [`UnhandledResponse::validate_response`] (which requires state +//! not owned by the request manager). use super::{ BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE, @@ -78,20 +89,6 @@ pub struct RequestedCandidate { in_flight: bool, } -impl RequestedCandidate { - /// Add a peer to the set of known peers. - pub fn add_peer(&mut self, peer: PeerId) { - if !self.known_by.contains(&peer) { - self.known_by.push_back(peer); - } - } - - /// Note that the candidate is required for the cluster. - pub fn set_cluster_priority(&mut self) { - self.priority.origin = Origin::Cluster; - } -} - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] enum Origin { Cluster = 0, @@ -113,14 +110,17 @@ pub struct Entry<'a> { } impl<'a> Entry<'a> { - /// Access the underlying requested candidate. - pub fn get_mut(&mut self) -> &mut RequestedCandidate { - &mut self.requested + /// Add a peer to the set of known peers. + pub fn add_peer(&mut self, peer: PeerId) { + if !self.requested.known_by.contains(&peer) { + self.requested.known_by.push_back(peer); + } } -} -impl<'a> Drop for Entry<'a> { - fn drop(&mut self) { + /// Note that the candidate is required for the cluster. + pub fn set_cluster_priority(&mut self) { + self.requested.priority.origin = Origin::Cluster; + insert_or_update_priority( &mut *self.by_priority, Some(self.prev_index), @@ -213,7 +213,35 @@ impl RequestManager { } } - // TODO [now]: removal based on relay-parent. + /// Remove based on relay-parent. + pub fn remove_by_relay_parent(&mut self, relay_parent: Hash) { + let mut candidate_hashes = HashSet::new(); + + // Remove from `by_priority` and `requests`. + self.by_priority.retain(|(_priority, id)| { + let retain = relay_parent != id.relay_parent; + if !retain { + self.requests.remove(id); + candidate_hashes.insert(id.candidate_hash); + } + retain + }); + + // Remove from `unique_identifiers`. + for candidate_hash in candidate_hashes { + match self.unique_identifiers.entry(candidate_hash) { + HEntry::Occupied(mut entry) => { + entry.get_mut().retain(|id| relay_parent != id.relay_parent); + if entry.get().is_empty() { + entry.remove(); + } + }, + // We can expect to encounter vacant entries, but only if nodes are misbehaving and + // we don't use a deduplicating collection; there are no issues from ignoring it. + HEntry::Vacant(entry) => (), + } + } + } /// Yields the next request to dispatch, if there is any. /// @@ -622,5 +650,106 @@ fn insert_or_update_priority( mod tests { use super::*; - // TODO [now]: test priority ordering. + #[test] + fn test_remove_by_relay_parent() { + let parent_a = Hash::from_low_u64_le(1); + let parent_b = Hash::from_low_u64_le(2); + let parent_c = Hash::from_low_u64_le(3); + + let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11)); + let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12)); + let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21)); + let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22)); + let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31)); + let duplicate_hash = CandidateHash(Hash::from_low_u64_le(31)); + + let mut request_manager = RequestManager::new(); + request_manager.get_or_insert(parent_a, candidate_a1, 1.into()); + request_manager.get_or_insert(parent_a, candidate_a2, 1.into()); + request_manager.get_or_insert(parent_b, candidate_b1, 1.into()); + request_manager.get_or_insert(parent_b, candidate_b2, 2.into()); + request_manager.get_or_insert(parent_c, candidate_c1, 2.into()); + request_manager.get_or_insert(parent_a, duplicate_hash, 1.into()); + + assert_eq!(request_manager.requests.len(), 6); + assert_eq!(request_manager.by_priority.len(), 6); + assert_eq!(request_manager.unique_identifiers.len(), 5); + + request_manager.remove_by_relay_parent(parent_a); + + assert_eq!(request_manager.requests.len(), 3); + assert_eq!(request_manager.by_priority.len(), 3); + assert_eq!(request_manager.unique_identifiers.len(), 3); + + assert!(!request_manager.unique_identifiers.contains_key(&candidate_a1)); + assert!(!request_manager.unique_identifiers.contains_key(&candidate_a2)); + // Duplicate hash should still be there (under a different parent). + assert!(request_manager.unique_identifiers.contains_key(&duplicate_hash)); + + request_manager.remove_by_relay_parent(parent_b); + + assert_eq!(request_manager.requests.len(), 1); + assert_eq!(request_manager.by_priority.len(), 1); + assert_eq!(request_manager.unique_identifiers.len(), 1); + + assert!(!request_manager.unique_identifiers.contains_key(&candidate_b1)); + assert!(!request_manager.unique_identifiers.contains_key(&candidate_b2)); + + request_manager.remove_by_relay_parent(parent_c); + + assert!(request_manager.requests.is_empty()); + assert!(request_manager.by_priority.is_empty()); + assert!(request_manager.unique_identifiers.is_empty()); + } + + #[test] + fn test_priority_ordering() { + let parent_a = Hash::from_low_u64_le(1); + let parent_b = Hash::from_low_u64_le(2); + let parent_c = Hash::from_low_u64_le(3); + + let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11)); + let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12)); + let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21)); + let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22)); + let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31)); + + let mut request_manager = RequestManager::new(); + + // Add some entries, set a couple of them to cluster (high) priority. + let identifier_a1 = request_manager + .get_or_insert(parent_a, candidate_a1, 1.into()) + .identifier + .clone(); + let identifier_a2 = { + let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into()); + entry.set_cluster_priority(); + entry.identifier.clone() + }; + let identifier_b1 = request_manager + .get_or_insert(parent_b, candidate_b1, 1.into()) + .identifier + .clone(); + let identifier_b2 = request_manager + .get_or_insert(parent_b, candidate_b2, 2.into()) + .identifier + .clone(); + let identifier_c1 = { + let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into()); + entry.set_cluster_priority(); + entry.identifier.clone() + }; + + let attempts = 0; + assert_eq!( + request_manager.by_priority, + vec![ + (Priority { origin: Origin::Cluster, attempts }, identifier_a2), + (Priority { origin: Origin::Cluster, attempts }, identifier_c1), + (Priority { origin: Origin::Unspecified, attempts }, identifier_a1), + (Priority { origin: Origin::Unspecified, attempts }, identifier_b1), + (Priority { origin: Origin::Unspecified, attempts }, identifier_b2), + ] + ); + } } diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index dcbb509b298c..ca98a32b7cb0 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -623,3 +623,10 @@ pub fn maybe_compress_pov(pov: PoV) -> PoV { let pov = PoV { block_data: BlockData(raw) }; pov } + +/// How many votes we need to consider a candidate backed. +/// +/// WARNING: This has to be kept in sync with the runtime check in the inclusion module. +pub fn minimum_votes(n_validators: usize) -> usize { + std::cmp::min(2, n_validators) +} diff --git a/runtime/parachains/src/inclusion/mod.rs b/runtime/parachains/src/inclusion/mod.rs index 2c7435cad1db..95ad1da2599d 100644 --- a/runtime/parachains/src/inclusion/mod.rs +++ b/runtime/parachains/src/inclusion/mod.rs @@ -172,8 +172,7 @@ impl Default for ProcessedCandidates { /// Number of backing votes we need for a valid backing. /// -/// WARNING: This check has to be kept in sync with the node side check in the backing -/// subsystem. +/// WARNING: This check has to be kept in sync with the node side checks. pub fn minimum_backing_votes(n_validators: usize) -> usize { // For considerations on this value see: // https://github.com/paritytech/polkadot/pull/1656#issuecomment-999734650