From 8e81f2e8a9f73d478bb12a1ebbe142d152adabe0 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 22 Dec 2022 08:21:13 -0500 Subject: [PATCH 01/14] Implement `remove_by_relay_parent` --- .../src/vstaging/requests.rs | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs index 2214c9ba4b4a..ffac6bd2e038 100644 --- a/node/network/statement-distribution/src/vstaging/requests.rs +++ b/node/network/statement-distribution/src/vstaging/requests.rs @@ -213,7 +213,24 @@ impl RequestManager { } } - // TODO [now]: removal based on relay-parent. + /// Remove based on relay-parent. + pub fn remove_by_relay_parent(&mut self, relay_parent: Hash) { + // Remove from `by_priority` and `requests`. + self.by_priority.retain(|(_priority, id)| { + let retain = relay_parent != id.relay_parent; + if !retain { + self.requests.remove(id); + } + retain + }); + // Remove from `unique_identifiers`. + for (_, candidate_identifier_set) in self.unique_identifiers.iter_mut() { + candidate_identifier_set.retain(|id| relay_parent != id.relay_parent); + } + // TODO: is this necessary? + // If any candidates don't have unique identifiers, remove them. + self.unique_identifiers.retain(|_, set| !set.is_empty()); + } /// Yields the next request to dispatch, if there is any. /// @@ -615,5 +632,56 @@ fn insert_or_update_priority( mod tests { use super::*; + #[test] + fn test_remove_by_relay_parent() { + let parent_a = Hash::from_low_u64_le(10); + let parent_b = Hash::from_low_u64_le(11); + let parent_c = Hash::from_low_u64_le(12); + + let candidate_a1 = CandidateHash(Hash::from_low_u64_le(101)); + let candidate_a2 = CandidateHash(Hash::from_low_u64_le(102)); + let candidate_b1 = CandidateHash(Hash::from_low_u64_le(111)); + let candidate_b2 = CandidateHash(Hash::from_low_u64_le(112)); + let candidate_c1 = CandidateHash(Hash::from_low_u64_le(121)); + let duplicate_hash = CandidateHash(Hash::from_low_u64_le(121)); + + let mut request_manager = RequestManager::new(); + request_manager.get_or_insert(parent_a, candidate_a1, 1.into()); + request_manager.get_or_insert(parent_a, candidate_a2, 1.into()); + request_manager.get_or_insert(parent_b, candidate_b1, 1.into()); + request_manager.get_or_insert(parent_b, candidate_b2, 2.into()); + request_manager.get_or_insert(parent_c, candidate_c1, 2.into()); + request_manager.get_or_insert(parent_a, duplicate_hash, 1.into()); + + assert_eq!(request_manager.requests.len(), 6); + assert_eq!(request_manager.by_priority.len(), 6); + assert_eq!(request_manager.unique_identifiers.len(), 5); + + request_manager.remove_by_relay_parent(parent_a); + + assert_eq!(request_manager.requests.len(), 3); + assert_eq!(request_manager.by_priority.len(), 3); + assert_eq!(request_manager.unique_identifiers.len(), 3); + + assert!(!request_manager.unique_identifiers.contains_key(&candidate_a1)); + assert!(!request_manager.unique_identifiers.contains_key(&candidate_a2)); + assert!(!request_manager.unique_identifiers.contains_key(&duplicate_hash)); + + request_manager.remove_by_relay_parent(parent_b); + + assert_eq!(request_manager.requests.len(), 1); + assert_eq!(request_manager.by_priority.len(), 1); + assert_eq!(request_manager.unique_identifiers.len(), 1); + + assert!(!request_manager.unique_identifiers.contains_key(&candidate_b1)); + assert!(!request_manager.unique_identifiers.contains_key(&candidate_b2)); + + request_manager.remove_by_relay_parent(parent_c); + + assert!(request_manager.requests.is_empty()); + assert!(request_manager.requests.is_empty()); + assert!(request_manager.requests.is_empty()); + } + // TODO [now]: test priority ordering. } From cf1a5513c1c5809ed3bd4c86523b06852b156314 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 22 Dec 2022 09:48:43 -0500 Subject: [PATCH 02/14] Extract `minimum_votes` to shared primitives. --- node/core/backing/src/lib.rs | 12 ++---------- .../statement-distribution/src/vstaging/groups.rs | 10 +--------- .../statement-distribution/src/vstaging/mod.rs | 8 +------- node/primitives/src/lib.rs | 8 ++++++++ 4 files changed, 12 insertions(+), 26 deletions(-) diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 1688732ac0d6..9742a3f1983d 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -80,8 +80,8 @@ use futures::{ use error::{Error, FatalResult}; use polkadot_node_primitives::{ - AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD, - ValidationResult, BACKING_EXECUTION_TIMEOUT, + minimum_votes, AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, + StatementWithPVD, ValidationResult, BACKING_EXECUTION_TIMEOUT, }; use polkadot_node_subsystem::{ messages::{ @@ -385,14 +385,6 @@ struct AttestingData { backing: Vec, } -/// How many votes we need to consider a candidate backed. -/// -/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and -/// statement distribution. -fn minimum_votes(n_validators: usize) -> usize { - std::cmp::min(2, n_validators) -} - #[derive(Default)] struct TableContext { validator: Option, diff --git a/node/network/statement-distribution/src/vstaging/groups.rs b/node/network/statement-distribution/src/vstaging/groups.rs index 5f3618cd7946..7edca96c64d1 100644 --- a/node/network/statement-distribution/src/vstaging/groups.rs +++ b/node/network/statement-distribution/src/vstaging/groups.rs @@ -16,6 +16,7 @@ //! A utility for tracking groups and their members within a session. +use polkadot_node_primitives::minimum_votes; use polkadot_primitives::vstaging::{AuthorityDiscoveryId, GroupIndex, ValidatorIndex}; use std::collections::HashMap; @@ -78,12 +79,3 @@ impl Groups { self.by_discovery_key.get(&discovery_key).map(|x| *x) } } - -/// How many votes we need to consider a candidate backed. -/// -/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and -/// the backing subsystem. -// TODO [now]: extract to shared primitives. -fn minimum_votes(n_validators: usize) -> usize { - std::cmp::min(2, n_validators) -} diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs index 90ffc9b00aab..2d4629b324df 100644 --- a/node/network/statement-distribution/src/vstaging/mod.rs +++ b/node/network/statement-distribution/src/vstaging/mod.rs @@ -180,6 +180,7 @@ pub(crate) struct State { // For the provided validator index, if there is a connected peer // controlling the given authority ID, +// TODO [now]: finish above doc. fn connected_validator_peer( authorities: &HashMap, per_session: &PerSessionState, @@ -214,13 +215,6 @@ impl PeerState { } } -/// How many votes we need to consider a candidate backed. -/// -/// WARNING: This has to be kept in sync with the runtime check in the inclusion module. -fn minimum_votes(n_validators: usize) -> usize { - std::cmp::min(2, n_validators) -} - #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] pub(crate) async fn handle_network_update( ctx: &mut Context, diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 908d05bd128b..8b8584e0434a 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -605,3 +605,11 @@ pub fn maybe_compress_pov(pov: PoV) -> PoV { let pov = PoV { block_data: BlockData(raw) }; pov } + +/// How many votes we need to consider a candidate backed. +/// +/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and +/// the backing subsystem. +pub fn minimum_votes(n_validators: usize) -> usize { + std::cmp::min(2, n_validators) +} From 5bb7d204ff5cadc9159bff9453d497dbd7dda544 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 26 Dec 2022 16:10:33 -0500 Subject: [PATCH 03/14] Add `can_send_statements_received_with_prejudice` test --- .../src/vstaging/cluster.rs | 53 ++++++++++++++++++- .../src/vstaging/mod.rs | 2 +- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/node/network/statement-distribution/src/vstaging/cluster.rs b/node/network/statement-distribution/src/vstaging/cluster.rs index 9c17e3e17e14..c79ebf08b663 100644 --- a/node/network/statement-distribution/src/vstaging/cluster.rs +++ b/node/network/statement-distribution/src/vstaging/cluster.rs @@ -172,6 +172,7 @@ impl ClusterTracker { } /// Note that we accepted an incoming statement. This updates internal structures. + /// /// Should only be called after a successful `can_receive` call. pub fn note_received( &mut self, @@ -748,5 +749,55 @@ mod tests { ); } - // TODO [now] ensure statements received with prejudice don't prevent sending later + // Ensure statements received with prejudice don't prevent sending later. + #[test] + fn can_send_statements_received_with_prejudice() { + let group = + vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)]; + + let seconding_limit = 1; + + let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty"); + let hash_a = CandidateHash(Hash::repeat_byte(1)); + let hash_b = CandidateHash(Hash::repeat_byte(2)); + + assert_eq!( + tracker.can_receive( + ValidatorIndex(200), + ValidatorIndex(5), + CompactStatement::Seconded(hash_a), + ), + Ok(Accept::Ok), + ); + + tracker.note_received( + ValidatorIndex(200), + ValidatorIndex(5), + CompactStatement::Seconded(hash_a), + ); + + assert_eq!( + tracker.can_receive( + ValidatorIndex(24), + ValidatorIndex(5), + CompactStatement::Seconded(hash_b), + ), + Ok(Accept::WithPrejudice), + ); + + tracker.note_received( + ValidatorIndex(24), + ValidatorIndex(5), + CompactStatement::Seconded(hash_b), + ); + + assert_eq!( + tracker.can_send( + ValidatorIndex(24), + ValidatorIndex(5), + CompactStatement::Seconded(hash_a), + ), + Ok(()), + ); + } } diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs index 2d4629b324df..719e5f4986a4 100644 --- a/node/network/statement-distribution/src/vstaging/mod.rs +++ b/node/network/statement-distribution/src/vstaging/mod.rs @@ -67,7 +67,7 @@ mod statement_store; const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement"); const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep = - Rep::CostMinor("Unexpected Statement, missing knowlege for relay parent"); + Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent"); const COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE: Rep = Rep::CostMinor("Unexpected Statement, unknown candidate"); const COST_UNEXPECTED_STATEMENT_REMOTE: Rep = From 80e49e6db30e1457c399865cb945415e674f021b Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 26 Dec 2022 18:40:26 -0500 Subject: [PATCH 04/14] Fix test --- .../statement-distribution/src/vstaging/requests.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs index ffac6bd2e038..9140c4ed1824 100644 --- a/node/network/statement-distribution/src/vstaging/requests.rs +++ b/node/network/statement-distribution/src/vstaging/requests.rs @@ -665,7 +665,8 @@ mod tests { assert!(!request_manager.unique_identifiers.contains_key(&candidate_a1)); assert!(!request_manager.unique_identifiers.contains_key(&candidate_a2)); - assert!(!request_manager.unique_identifiers.contains_key(&duplicate_hash)); + // Duplicate hash should still be there (under a different parent). + assert!(request_manager.unique_identifiers.contains_key(&duplicate_hash)); request_manager.remove_by_relay_parent(parent_b); @@ -679,8 +680,8 @@ mod tests { request_manager.remove_by_relay_parent(parent_c); assert!(request_manager.requests.is_empty()); - assert!(request_manager.requests.is_empty()); - assert!(request_manager.requests.is_empty()); + assert!(request_manager.by_priority.is_empty()); + assert!(request_manager.unique_identifiers.is_empty()); } // TODO [now]: test priority ordering. From 2e26e76e47bd4c928942f37737b87316b6379330 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 9 Jan 2023 08:28:48 -0500 Subject: [PATCH 05/14] Update docstrings --- node/primitives/src/lib.rs | 3 +-- runtime/parachains/src/inclusion/mod.rs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 8b8584e0434a..c57668e361bc 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -608,8 +608,7 @@ pub fn maybe_compress_pov(pov: PoV) -> PoV { /// How many votes we need to consider a candidate backed. /// -/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and -/// the backing subsystem. +/// WARNING: This has to be kept in sync with the runtime check in the inclusion module. pub fn minimum_votes(n_validators: usize) -> usize { std::cmp::min(2, n_validators) } diff --git a/runtime/parachains/src/inclusion/mod.rs b/runtime/parachains/src/inclusion/mod.rs index e885efc2b6e1..f635c19cb0b2 100644 --- a/runtime/parachains/src/inclusion/mod.rs +++ b/runtime/parachains/src/inclusion/mod.rs @@ -172,8 +172,7 @@ impl Default for ProcessedCandidates { /// Number of backing votes we need for a valid backing. /// -/// WARNING: This check has to be kept in sync with the node side check in the backing -/// subsystem. +/// WARNING: This check has to be kept in sync with the node side checks. pub fn minimum_backing_votes(n_validators: usize) -> usize { // For considerations on this value see: // https://github.com/paritytech/polkadot/pull/1656#issuecomment-999734650 From 4218ad29125abfd3847d7a75267e84ceeaab5d28 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 23 Jan 2023 15:39:10 +0100 Subject: [PATCH 06/14] Cargo fmt --- node/core/backing/src/lib.rs | 6 ++-- .../network/statement-distribution/src/lib.rs | 3 +- .../src/vstaging/candidates.rs | 4 ++- .../src/vstaging/grid.rs | 32 ++++++++++--------- .../src/vstaging/mod.rs | 20 ++++++------ 5 files changed, 35 insertions(+), 30 deletions(-) diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 9355d3d7c466..2ec9aa651564 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -1571,9 +1571,9 @@ async fn import_statement( }) .await; - // TODO [now]: notify statement distribution of backed - // candidate. alter control flow so "Share" is always sent - // first. + // TODO [now]: notify statement distribution of backed + // candidate. alter control flow so "Share" is always sent + // first. } else { // The provisioner waits on candidate-backing, which means // that we need to send unbounded messages to avoid cycles. diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index e910bc410830..829f3a8554a3 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -271,7 +271,8 @@ impl StatementDistributionSubsystem { ctx, unimplemented!(), // TODO [now] state candidate_hash, - ).await; + ) + .await; }, }, } diff --git a/node/network/statement-distribution/src/vstaging/candidates.rs b/node/network/statement-distribution/src/vstaging/candidates.rs index 37d41868a071..ef8cf9907fa4 100644 --- a/node/network/statement-distribution/src/vstaging/candidates.rs +++ b/node/network/statement-distribution/src/vstaging/candidates.rs @@ -263,7 +263,9 @@ impl Candidates { /// Note that a candidate is backed. No-op if the candidate is not confirmed. pub fn note_backed(&mut self, candidate_hash: &CandidateHash) { - if let Some(&mut CandidateState::Confirmed(ref mut c)) = self.candidates.get_mut(candidate_hash) { + if let Some(&mut CandidateState::Confirmed(ref mut c)) = + self.candidates.get_mut(candidate_hash) + { c.backed = true; } } diff --git a/node/network/statement-distribution/src/vstaging/grid.rs b/node/network/statement-distribution/src/vstaging/grid.rs index cebeb079f1a0..5d936e0de83e 100644 --- a/node/network/statement-distribution/src/vstaging/grid.rs +++ b/node/network/statement-distribution/src/vstaging/grid.rs @@ -213,9 +213,11 @@ impl GridTracker { // * They are in the sending set for the group AND we have sent them // a manifest AND the received manifest is partial. ManifestKind::Full => is_receiver, - ManifestKind::Acknowledgement => is_sender && self.confirmed_backed - .get(&candidate_hash) - .map_or(false, |c| c.has_sent_manifest_to(sender)), + ManifestKind::Acknowledgement => + is_sender && + self.confirmed_backed + .get(&candidate_hash) + .map_or(false, |c| c.has_sent_manifest_to(sender)), }; if !manifest_allowed { @@ -267,7 +269,8 @@ impl GridTracker { if let Some(confirmed) = self.confirmed_backed.get_mut(&candidate_hash) { // TODO [now]: send statements they need if this is an ack if is_receiver && !confirmed.has_sent_manifest_to(sender) { - self.pending_communication.entry(sender) + self.pending_communication + .entry(sender) .or_default() .insert(candidate_hash, ManifestKind::Acknowledgement); @@ -317,7 +320,6 @@ impl GridTracker { .and_then(|r| r.candidate_statement_filter(&candidate_hash)) .expect("unconfirmed is only populated by validators who have sent manifest; qed"); - // No need to send direct statements, because our local knowledge is `None` c.manifest_received_from(v, statement_filter); } @@ -329,25 +331,25 @@ impl GridTracker { // advertise onwards ad accept received advertisements - let sending_group_manifests = group_topology - .sending - .iter() - .map(|v| (*v, ManifestKind::Full)); + let sending_group_manifests = + group_topology.sending.iter().map(|v| (*v, ManifestKind::Full)); - let receiving_group_manifests = group_topology - .receiving - .iter() - .filter_map(|v| if c.has_received_manifest_from(*v) { + let receiving_group_manifests = group_topology.receiving.iter().filter_map(|v| { + if c.has_received_manifest_from(*v) { Some((*v, ManifestKind::Acknowledgement)) } else { None - }); + } + }); // Note that order is important: if a validator is part of both the sending // and receiving groups, we may overwrite a `Full` manifest with a `Acknowledgement` // one. for (v, manifest_mode) in sending_group_manifests.chain(receiving_group_manifests) { - self.pending_communication.entry(v).or_default().insert(candidate_hash, manifest_mode); + self.pending_communication + .entry(v) + .or_default() + .insert(candidate_hash, manifest_mode); } self.pending_communication diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs index 7084e119e274..fe01ba72e4e9 100644 --- a/node/network/statement-distribution/src/vstaging/mod.rs +++ b/node/network/statement-distribution/src/vstaging/mod.rs @@ -1320,13 +1320,10 @@ fn group_for_para( ) -> Option { // Note: this won't work well for parathreads as it assumes that core assignments are fixed // across blocks. - let core_index = availability_cores - .iter() - .position(|c| c.para_id() == Some(para_id)); + let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id)); - core_index.map(|c| { - group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()) - }) + core_index + .map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len())) } #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] @@ -1371,7 +1368,8 @@ async fn fragment_tree_update_inner( candidate_hash, receipt, persisted_validation_data, - } = hypo { + } = hypo + { let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash); let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent); if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) { @@ -1391,7 +1389,8 @@ async fn fragment_tree_update_inner( prs, confirmed, per_session, - ).await; + ) + .await; } } } @@ -1453,7 +1452,7 @@ pub(crate) async fn handle_backed_candidate_message( ); return - } + }, Some(c) => c, }; @@ -1475,7 +1474,8 @@ pub(crate) async fn handle_backed_candidate_message( per_session, &state.authorities, &state.peers, - ).await; + ) + .await; } #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] From a0c80c87cb3e18904dfcb184339746ef585e7a04 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 23 Jan 2023 17:47:23 +0100 Subject: [PATCH 07/14] Fix compile error --- node/network/statement-distribution/src/vstaging/groups.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/network/statement-distribution/src/vstaging/groups.rs b/node/network/statement-distribution/src/vstaging/groups.rs index b99c77b0eba1..e03fa63bc3f1 100644 --- a/node/network/statement-distribution/src/vstaging/groups.rs +++ b/node/network/statement-distribution/src/vstaging/groups.rs @@ -69,7 +69,7 @@ impl Groups { &self, group_index: GroupIndex, ) -> Option<(usize, usize)> { - self.get(group_index).map(|g| (g.len(), super::minimum_votes(g.len()))) + self.get(group_index).map(|g| (g.len(), minimum_votes(g.len()))) } /// Get the group index for a validator by index. From 394080115f064075f46cf71d53de354e4f54ae4a Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 24 Jan 2023 13:00:52 +0100 Subject: [PATCH 08/14] Fix compile errors in tests --- .../src/vstaging/grid.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/node/network/statement-distribution/src/vstaging/grid.rs b/node/network/statement-distribution/src/vstaging/grid.rs index cebeb079f1a0..484a8d7bfc0f 100644 --- a/node/network/statement-distribution/src/vstaging/grid.rs +++ b/node/network/statement-distribution/src/vstaging/grid.rs @@ -1064,7 +1064,7 @@ mod tests { }; let groups = Groups::new( - vec![vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]], + vec![vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]].into(), &[ AuthorityDiscoveryPair::generate().0.public(), AuthorityDiscoveryPair::generate().0.public(), @@ -1090,6 +1090,7 @@ mod tests { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, + ManifestKind::Full, ValidatorIndex(1), ), Err(ManifestImportError::Disallowed) @@ -1109,6 +1110,7 @@ mod tests { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, + ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Disallowed) @@ -1131,7 +1133,7 @@ mod tests { }; let groups = Groups::new( - vec![vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]], + vec![vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]].into(), &[ AuthorityDiscoveryPair::generate().0.public(), AuthorityDiscoveryPair::generate().0.public(), @@ -1155,6 +1157,7 @@ mod tests { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, + ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Malformed) @@ -1172,6 +1175,7 @@ mod tests { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1, 0], }, + ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Malformed) @@ -1194,7 +1198,7 @@ mod tests { }; let groups = Groups::new( - vec![vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]], + vec![vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]].into(), &[ AuthorityDiscoveryPair::generate().0.public(), AuthorityDiscoveryPair::generate().0.public(), @@ -1218,6 +1222,7 @@ mod tests { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1], }, + ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Malformed) @@ -1240,7 +1245,7 @@ mod tests { }; let groups = Groups::new( - vec![vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]], + vec![vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]].into(), &[ AuthorityDiscoveryPair::generate().0.public(), AuthorityDiscoveryPair::generate().0.public(), @@ -1266,6 +1271,7 @@ mod tests { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], }, + ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Malformed) @@ -1285,6 +1291,7 @@ mod tests { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], }, + ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Malformed) @@ -1304,9 +1311,10 @@ mod tests { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], }, + ManifestKind::Full, ValidatorIndex(0), ), - Ok(()) + Ok(None) ); } From 6ec6996549111ed586866fb3ac886244ed884198 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 24 Jan 2023 13:01:55 +0100 Subject: [PATCH 09/14] Cargo fmt --- node/core/backing/src/lib.rs | 6 ++-- .../network/statement-distribution/src/lib.rs | 3 +- .../src/vstaging/candidates.rs | 4 ++- .../src/vstaging/grid.rs | 32 ++++++++++--------- .../src/vstaging/mod.rs | 20 ++++++------ 5 files changed, 35 insertions(+), 30 deletions(-) diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 5473ba128914..e6be86f2b9a4 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -1579,9 +1579,9 @@ async fn import_statement( }) .await; - // TODO [now]: notify statement distribution of backed - // candidate. alter control flow so "Share" is always sent - // first. + // TODO [now]: notify statement distribution of backed + // candidate. alter control flow so "Share" is always sent + // first. } else { // The provisioner waits on candidate-backing, which means // that we need to send unbounded messages to avoid cycles. diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index e910bc410830..829f3a8554a3 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -271,7 +271,8 @@ impl StatementDistributionSubsystem { ctx, unimplemented!(), // TODO [now] state candidate_hash, - ).await; + ) + .await; }, }, } diff --git a/node/network/statement-distribution/src/vstaging/candidates.rs b/node/network/statement-distribution/src/vstaging/candidates.rs index 37d41868a071..ef8cf9907fa4 100644 --- a/node/network/statement-distribution/src/vstaging/candidates.rs +++ b/node/network/statement-distribution/src/vstaging/candidates.rs @@ -263,7 +263,9 @@ impl Candidates { /// Note that a candidate is backed. No-op if the candidate is not confirmed. pub fn note_backed(&mut self, candidate_hash: &CandidateHash) { - if let Some(&mut CandidateState::Confirmed(ref mut c)) = self.candidates.get_mut(candidate_hash) { + if let Some(&mut CandidateState::Confirmed(ref mut c)) = + self.candidates.get_mut(candidate_hash) + { c.backed = true; } } diff --git a/node/network/statement-distribution/src/vstaging/grid.rs b/node/network/statement-distribution/src/vstaging/grid.rs index 484a8d7bfc0f..f7016ab39ec1 100644 --- a/node/network/statement-distribution/src/vstaging/grid.rs +++ b/node/network/statement-distribution/src/vstaging/grid.rs @@ -213,9 +213,11 @@ impl GridTracker { // * They are in the sending set for the group AND we have sent them // a manifest AND the received manifest is partial. ManifestKind::Full => is_receiver, - ManifestKind::Acknowledgement => is_sender && self.confirmed_backed - .get(&candidate_hash) - .map_or(false, |c| c.has_sent_manifest_to(sender)), + ManifestKind::Acknowledgement => + is_sender && + self.confirmed_backed + .get(&candidate_hash) + .map_or(false, |c| c.has_sent_manifest_to(sender)), }; if !manifest_allowed { @@ -267,7 +269,8 @@ impl GridTracker { if let Some(confirmed) = self.confirmed_backed.get_mut(&candidate_hash) { // TODO [now]: send statements they need if this is an ack if is_receiver && !confirmed.has_sent_manifest_to(sender) { - self.pending_communication.entry(sender) + self.pending_communication + .entry(sender) .or_default() .insert(candidate_hash, ManifestKind::Acknowledgement); @@ -317,7 +320,6 @@ impl GridTracker { .and_then(|r| r.candidate_statement_filter(&candidate_hash)) .expect("unconfirmed is only populated by validators who have sent manifest; qed"); - // No need to send direct statements, because our local knowledge is `None` c.manifest_received_from(v, statement_filter); } @@ -329,25 +331,25 @@ impl GridTracker { // advertise onwards ad accept received advertisements - let sending_group_manifests = group_topology - .sending - .iter() - .map(|v| (*v, ManifestKind::Full)); + let sending_group_manifests = + group_topology.sending.iter().map(|v| (*v, ManifestKind::Full)); - let receiving_group_manifests = group_topology - .receiving - .iter() - .filter_map(|v| if c.has_received_manifest_from(*v) { + let receiving_group_manifests = group_topology.receiving.iter().filter_map(|v| { + if c.has_received_manifest_from(*v) { Some((*v, ManifestKind::Acknowledgement)) } else { None - }); + } + }); // Note that order is important: if a validator is part of both the sending // and receiving groups, we may overwrite a `Full` manifest with a `Acknowledgement` // one. for (v, manifest_mode) in sending_group_manifests.chain(receiving_group_manifests) { - self.pending_communication.entry(v).or_default().insert(candidate_hash, manifest_mode); + self.pending_communication + .entry(v) + .or_default() + .insert(candidate_hash, manifest_mode); } self.pending_communication diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs index acdee49c3c96..64e18f04cd69 100644 --- a/node/network/statement-distribution/src/vstaging/mod.rs +++ b/node/network/statement-distribution/src/vstaging/mod.rs @@ -1327,13 +1327,10 @@ fn group_for_para( ) -> Option { // Note: this won't work well for parathreads as it assumes that core assignments are fixed // across blocks. - let core_index = availability_cores - .iter() - .position(|c| c.para_id() == Some(para_id)); + let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id)); - core_index.map(|c| { - group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()) - }) + core_index + .map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len())) } #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] @@ -1378,7 +1375,8 @@ async fn fragment_tree_update_inner( candidate_hash, receipt, persisted_validation_data, - } = hypo { + } = hypo + { let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash); let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent); if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) { @@ -1398,7 +1396,8 @@ async fn fragment_tree_update_inner( prs, confirmed, per_session, - ).await; + ) + .await; } } } @@ -1460,7 +1459,7 @@ pub(crate) async fn handle_backed_candidate_message( ); return - } + }, Some(c) => c, }; @@ -1482,7 +1481,8 @@ pub(crate) async fn handle_backed_candidate_message( per_session, &state.authorities, &state.peers, - ).await; + ) + .await; } #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] From ad9c8edf8324ab3c47f72b7684b37cf2e4d5adfe Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 24 Jan 2023 13:12:42 +0100 Subject: [PATCH 10/14] Add module docs; write `test_priority_ordering` (first draft) --- .../src/vstaging/requests.rs | 72 ++++++++++++++++--- 1 file changed, 61 insertions(+), 11 deletions(-) diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs index 39733607d5be..59b6978cfaad 100644 --- a/node/network/statement-distribution/src/vstaging/requests.rs +++ b/node/network/statement-distribution/src/vstaging/requests.rs @@ -13,7 +13,18 @@ //! A requester for full information on candidates. //! -// TODO [now]: some module docs. +//! 1. We use `RequestManager::get_or_insert().get_mut()` to add and mutate [`RequestedCandidate`]s, either setting the +//! priority or adding a peer we know has the candidate. We currently prioritize "cluster" candidates (those from our +//! own group, although the cluster mechanism could be made to include multiple groups in the future) over "grid" +//! candidates (those from other groups). +//! +//! 2. The main loop of the module will invoke [`RequestManager::next_request`] in a loop until it returns `None`, +//! dispatching all requests with the `NetworkBridgeTxMessage`. The receiving half of the channel is owned by the +//! [`RequestManager`]. +//! +//! 3. The main loop of the module will also select over [`RequestManager::await_incoming`] to receive +//! [`UnhandledResponse`]s, which it then validates using [`UnhandledResponse::validate_response`] (which requires state +//! not owned by the request manager). use super::{ BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE, @@ -641,16 +652,16 @@ mod tests { #[test] fn test_remove_by_relay_parent() { - let parent_a = Hash::from_low_u64_le(10); - let parent_b = Hash::from_low_u64_le(11); - let parent_c = Hash::from_low_u64_le(12); + let parent_a = Hash::from_low_u64_le(1); + let parent_b = Hash::from_low_u64_le(2); + let parent_c = Hash::from_low_u64_le(3); - let candidate_a1 = CandidateHash(Hash::from_low_u64_le(101)); - let candidate_a2 = CandidateHash(Hash::from_low_u64_le(102)); - let candidate_b1 = CandidateHash(Hash::from_low_u64_le(111)); - let candidate_b2 = CandidateHash(Hash::from_low_u64_le(112)); - let candidate_c1 = CandidateHash(Hash::from_low_u64_le(121)); - let duplicate_hash = CandidateHash(Hash::from_low_u64_le(121)); + let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11)); + let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12)); + let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21)); + let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22)); + let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31)); + let duplicate_hash = CandidateHash(Hash::from_low_u64_le(31)); let mut request_manager = RequestManager::new(); request_manager.get_or_insert(parent_a, candidate_a1, 1.into()); @@ -691,5 +702,44 @@ mod tests { assert!(request_manager.unique_identifiers.is_empty()); } - // TODO [now]: test priority ordering. + #[test] + fn test_priority_ordering() { + let parent_a = Hash::from_low_u64_le(1); + let parent_b = Hash::from_low_u64_le(2); + let parent_c = Hash::from_low_u64_le(3); + + let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11)); + let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12)); + let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21)); + let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22)); + let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31)); + + let mut request_manager = RequestManager::new(); + + // Add some entries, set a couple of them to cluster (high) priority. + let entry = request_manager.get_or_insert(parent_a, candidate_a1, 1.into()); + let identifier_a1 = entry.identifier; + let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into()); + let identifier_a2 = entry.identifier; + entry.get_mut().set_cluster_priority(); + let entry = request_manager.get_or_insert(parent_b, candidate_b1, 1.into()); + let identifier_b1 = entry.identifier; + let entry = request_manager.get_or_insert(parent_b, candidate_b2, 2.into()); + let identifier_b2 = entry.identifier; + let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into()); + let identifier_c1 = entry.identifier; + entry.get_mut().set_cluster_priority(); + + let attempts = 0; + assert_eq!( + request_manager.by_priority, + vec![ + (Priority { origin: Origin::Cluster, attempts }, identifier_a2), + (Priority { origin: Origin::Cluster, attempts }, identifier_c1), + (Priority { origin: Origin::Unspecified, attempts }, identifier_a1), + (Priority { origin: Origin::Unspecified, attempts }, identifier_b1), + (Priority { origin: Origin::Unspecified, attempts }, identifier_b2), + ] + ); + } } From f978c63fa2c9979648462b3558982e55fe8db3a4 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 24 Jan 2023 13:21:14 +0100 Subject: [PATCH 11/14] Fix `test_priority_ordering` --- .../src/vstaging/requests.rs | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs index 59b6978cfaad..c2102cb782f9 100644 --- a/node/network/statement-distribution/src/vstaging/requests.rs +++ b/node/network/statement-distribution/src/vstaging/requests.rs @@ -717,18 +717,28 @@ mod tests { let mut request_manager = RequestManager::new(); // Add some entries, set a couple of them to cluster (high) priority. - let entry = request_manager.get_or_insert(parent_a, candidate_a1, 1.into()); - let identifier_a1 = entry.identifier; - let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into()); - let identifier_a2 = entry.identifier; - entry.get_mut().set_cluster_priority(); - let entry = request_manager.get_or_insert(parent_b, candidate_b1, 1.into()); - let identifier_b1 = entry.identifier; - let entry = request_manager.get_or_insert(parent_b, candidate_b2, 2.into()); - let identifier_b2 = entry.identifier; - let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into()); - let identifier_c1 = entry.identifier; - entry.get_mut().set_cluster_priority(); + let identifier_a1 = request_manager + .get_or_insert(parent_a, candidate_a1, 1.into()) + .identifier + .clone(); + let identifier_a2 = { + let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into()); + entry.get_mut().set_cluster_priority(); + entry.identifier.clone() + }; + let identifier_b1 = request_manager + .get_or_insert(parent_b, candidate_b1, 1.into()) + .identifier + .clone(); + let identifier_b2 = request_manager + .get_or_insert(parent_b, candidate_b2, 2.into()) + .identifier + .clone(); + let identifier_c1 = { + let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into()); + entry.get_mut().set_cluster_priority(); + entry.identifier.clone() + }; let attempts = 0; assert_eq!( From 42b9a3464390b01ccf38535bb8874f6948aa15d8 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 24 Jan 2023 13:25:45 +0100 Subject: [PATCH 12/14] Move `insert_or_update_priority`: `Drop` -> `set_cluster_priority` --- .../statement-distribution/src/vstaging/mod.rs | 2 +- .../src/vstaging/requests.rs | 16 ++++++---------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs index fe01ba72e4e9..91004adc94a3 100644 --- a/node/network/statement-distribution/src/vstaging/mod.rs +++ b/node/network/statement-distribution/src/vstaging/mod.rs @@ -1020,7 +1020,7 @@ async fn handle_incoming_statement( // We only successfully accept statements from the grid on unconfirmed // candidates, therefore this check only passes if the statement is from the cluster - request_entry.get_mut().set_cluster_priority(); + request_entry.set_cluster_priority(); } let was_fresh = match per_relay_parent.statement_store.insert( diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs index c2102cb782f9..00ce000cd02d 100644 --- a/node/network/statement-distribution/src/vstaging/requests.rs +++ b/node/network/statement-distribution/src/vstaging/requests.rs @@ -96,11 +96,6 @@ impl RequestedCandidate { self.known_by.push_back(peer); } } - - /// Note that the candidate is required for the cluster. - pub fn set_cluster_priority(&mut self) { - self.priority.origin = Origin::Cluster; - } } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -128,10 +123,11 @@ impl<'a> Entry<'a> { pub fn get_mut(&mut self) -> &mut RequestedCandidate { &mut self.requested } -} -impl<'a> Drop for Entry<'a> { - fn drop(&mut self) { + /// Note that the candidate is required for the cluster. + pub fn set_cluster_priority(&mut self) { + self.requested.priority.origin = Origin::Cluster; + insert_or_update_priority( &mut *self.by_priority, Some(self.prev_index), @@ -723,7 +719,7 @@ mod tests { .clone(); let identifier_a2 = { let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into()); - entry.get_mut().set_cluster_priority(); + entry.set_cluster_priority(); entry.identifier.clone() }; let identifier_b1 = request_manager @@ -736,7 +732,7 @@ mod tests { .clone(); let identifier_c1 = { let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into()); - entry.get_mut().set_cluster_priority(); + entry.set_cluster_priority(); entry.identifier.clone() }; From 2e6adf7d19e89970fbd64e2b0b43ccf0df4351c3 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Wed, 25 Jan 2023 16:58:35 +0100 Subject: [PATCH 13/14] Address review comments --- .../src/vstaging/mod.rs | 5 ++--- .../src/vstaging/requests.rs | 21 ++++++++++++++----- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs index 91004adc94a3..5094f860aa7e 100644 --- a/node/network/statement-distribution/src/vstaging/mod.rs +++ b/node/network/statement-distribution/src/vstaging/mod.rs @@ -190,9 +190,8 @@ pub(crate) struct State { request_manager: RequestManager, } -// For the provided validator index, if there is a connected peer -// controlling the given authority ID, -// TODO [now]: finish above doc. +// For the provided validator index, if there is a connected peer controlling the given authority +// ID, then return that peer's `PeerId`. fn connected_validator_peer( authorities: &HashMap, per_session: &PerSessionState, diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs index 00ce000cd02d..9564ac155143 100644 --- a/node/network/statement-distribution/src/vstaging/requests.rs +++ b/node/network/statement-distribution/src/vstaging/requests.rs @@ -222,21 +222,32 @@ impl RequestManager { /// Remove based on relay-parent. pub fn remove_by_relay_parent(&mut self, relay_parent: Hash) { + let mut candidate_hashes = HashSet::new(); + // Remove from `by_priority` and `requests`. self.by_priority.retain(|(_priority, id)| { let retain = relay_parent != id.relay_parent; if !retain { self.requests.remove(id); + candidate_hashes.insert(id.candidate_hash); } retain }); + // Remove from `unique_identifiers`. - for (_, candidate_identifier_set) in self.unique_identifiers.iter_mut() { - candidate_identifier_set.retain(|id| relay_parent != id.relay_parent); + for candidate_hash in candidate_hashes { + match self.unique_identifiers.entry(candidate_hash) { + HEntry::Occupied(mut entry) => { + entry.get_mut().retain(|id| relay_parent != id.relay_parent); + if entry.get().is_empty() { + entry.remove(); + } + }, + // We can expect to encounter vacant entries, but only if nodes are misbehaving and + // we don't use a deduplicating collection; there are no issues from ignoring it. + HEntry::Vacant(entry) => (), + } } - // TODO: is this necessary? - // If any candidates don't have unique identifiers, remove them. - self.unique_identifiers.retain(|_, set| !set.is_empty()); } /// Yields the next request to dispatch, if there is any. From 2aaa46da80ec4cb1fc1553eb4350b0573b428014 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Wed, 25 Jan 2023 17:32:43 +0100 Subject: [PATCH 14/14] Remove `Entry::get_mut` --- .../statement-distribution/src/vstaging/mod.rs | 3 +-- .../src/vstaging/requests.rs | 17 +++++------------ 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs index 1b65f3a43cb9..60c9b99ea877 100644 --- a/node/network/statement-distribution/src/vstaging/mod.rs +++ b/node/network/statement-distribution/src/vstaging/mod.rs @@ -1110,7 +1110,7 @@ async fn handle_incoming_statement( .request_manager .get_or_insert(relay_parent, candidate_hash, originator_group); - request_entry.get_mut().add_peer(peer); + request_entry.add_peer(peer); // We only successfully accept statements from the grid on confirmed // candidates, therefore this check only passes if the statement is from the cluster @@ -1827,7 +1827,6 @@ async fn handle_incoming_manifest( state .request_manager .get_or_insert(manifest.relay_parent, manifest.candidate_hash, manifest.group_index) - .get_mut() .add_peer(peer); } } diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs index 9564ac155143..df55e9afc4f6 100644 --- a/node/network/statement-distribution/src/vstaging/requests.rs +++ b/node/network/statement-distribution/src/vstaging/requests.rs @@ -89,15 +89,6 @@ pub struct RequestedCandidate { in_flight: bool, } -impl RequestedCandidate { - /// Add a peer to the set of known peers. - pub fn add_peer(&mut self, peer: PeerId) { - if !self.known_by.contains(&peer) { - self.known_by.push_back(peer); - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] enum Origin { Cluster = 0, @@ -119,9 +110,11 @@ pub struct Entry<'a> { } impl<'a> Entry<'a> { - /// Access the underlying requested candidate. - pub fn get_mut(&mut self) -> &mut RequestedCandidate { - &mut self.requested + /// Add a peer to the set of known peers. + pub fn add_peer(&mut self, peer: PeerId) { + if !self.requested.known_by.contains(&peer) { + self.requested.known_by.push_back(peer); + } } /// Note that the candidate is required for the cluster.