Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ use futures::{
};

use std::{
collections::{btree_map::Entry, BTreeMap, HashMap, HashSet},
collections::{
btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
},
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -400,7 +402,7 @@ impl Wakeups {
}

// we are replacing previous wakeup with an earlier one.
if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) {
if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
if let Some(pos) =
entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
{
Expand Down Expand Up @@ -436,7 +438,7 @@ impl Wakeups {
});

for (tick, pruned) in pruned_wakeups {
if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
if entry.get().is_empty() {
let _ = entry.remove();
Expand All @@ -457,10 +459,10 @@ impl Wakeups {
Some(tick) => {
clock.wait(tick).await;
match self.wakeups.entry(tick) {
Entry::Vacant(_) => {
BTMEntry::Vacant(_) => {
panic!("entry is known to exist since `first` was `Some`; qed")
},
Entry::Occupied(mut entry) => {
BTMEntry::Occupied(mut entry) => {
let (hash, candidate_hash) = entry.get_mut().pop()
.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");

Expand Down Expand Up @@ -509,7 +511,7 @@ impl ApprovalState {
struct CurrentlyCheckingSet {
/// Invariant: The contained `Vec` needs to stay sorted as we are using `binary_search_by_key`
/// on it.
candidate_hash_map: HashMap<CandidateHash, Vec<Hash>>,
candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
}

Expand All @@ -529,21 +531,26 @@ impl CurrentlyCheckingSet {
relay_block: Hash,
launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
) -> SubsystemResult<()> {
let val = self.candidate_hash_map.entry(candidate_hash).or_insert(Default::default());

if let Err(k) = val.binary_search_by_key(&relay_block, |v| *v) {
let _ = val.insert(k, relay_block);
let work = launch_work.await?;
self.currently_checking.push(Box::pin(async move {
match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
None => ApprovalState {
candidate_hash,
validator_index,
approval_outcome: ApprovalOutcome::TimedOut,
},
Some(approval_state) => approval_state,
}
}));
match self.candidate_hash_map.entry(candidate_hash) {
HMEntry::Occupied(mut entry) => {
// validation already undergoing. just add the relay hash if unknown.
entry.get_mut().insert(relay_block);
},
HMEntry::Vacant(entry) => {
// validation not ongoing. launch work and time out the remote handle.
entry.insert(HashSet::new()).insert(relay_block);
let work = launch_work.await?;
self.currently_checking.push(Box::pin(async move {
match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
None => ApprovalState {
candidate_hash,
validator_index,
approval_outcome: ApprovalOutcome::TimedOut,
},
Some(approval_state) => approval_state,
}
}));
},
}

Ok(())
Expand All @@ -552,7 +559,7 @@ impl CurrentlyCheckingSet {
pub async fn next(
&mut self,
approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
) -> (Vec<Hash>, ApprovalState) {
) -> (HashSet<Hash>, ApprovalState) {
if !self.currently_checking.is_empty() {
if let Some(approval_state) = self.currently_checking.next().await {
let out = self
Expand Down