diff --git a/polkadot/runtime/parachains/src/assigner_coretime/tests.rs b/polkadot/runtime/parachains/src/assigner_coretime/tests.rs index 41cf21e267e43..81a0988ea67cd 100644 --- a/polkadot/runtime/parachains/src/assigner_coretime/tests.rs +++ b/polkadot/runtime/parachains/src/assigner_coretime/tests.rs @@ -75,7 +75,7 @@ fn run_to_block( Scheduler::initializer_initialize(b + 1); // In the real runtime this is expected to be called by the `InclusionInherent` pallet. - Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1); + Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1); } } diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs b/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs index 8ac6ab77beee5..5747413e71478 100644 --- a/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs +++ b/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs @@ -77,7 +77,7 @@ fn run_to_block( OnDemandAssigner::on_initialize(b + 1); // In the real runtime this is expected to be called by the `InclusionInherent` pallet. - Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1); + Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1); } } diff --git a/polkadot/runtime/parachains/src/assigner_parachains/tests.rs b/polkadot/runtime/parachains/src/assigner_parachains/tests.rs index ebd24e89162ad..14cb1a8978602 100644 --- a/polkadot/runtime/parachains/src/assigner_parachains/tests.rs +++ b/polkadot/runtime/parachains/src/assigner_parachains/tests.rs @@ -71,7 +71,7 @@ fn run_to_block( Scheduler::initializer_initialize(b + 1); // In the real runtime this is expected to be called by the `InclusionInherent` pallet. 
- Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1); + Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1); } } diff --git a/polkadot/runtime/parachains/src/builder.rs b/polkadot/runtime/parachains/src/builder.rs index 5ed5a2b527c07..c046526ba372b 100644 --- a/polkadot/runtime/parachains/src/builder.rs +++ b/polkadot/runtime/parachains/src/builder.rs @@ -92,11 +92,17 @@ pub(crate) struct BenchBuilder { /// will correspond to core index 3. There must be one entry for each core with a dispute /// statement set. dispute_sessions: Vec, + /// Paras here will both be backed in the inherent data and already occupying a core (which is + /// freed via bitfields). + /// /// Map from para id to number of validity votes. Core indices are generated based on /// `elastic_paras` configuration. Each para id in `elastic_paras` gets the /// specified amount of consecutive cores assigned to it. If a para id is not present /// in `elastic_paras` it get assigned to a single core. backed_and_concluding_paras: BTreeMap, + + /// Paras which don't yet occupy a core, but will after the inherent has been processed. + backed_in_inherent_paras: BTreeMap, /// Map from para id (seed) to number of chained candidates. elastic_paras: BTreeMap, /// Make every candidate include a code upgrade by setting this to `Some` where the interior @@ -132,6 +138,7 @@ impl BenchBuilder { dispute_statements: BTreeMap::new(), dispute_sessions: Default::default(), backed_and_concluding_paras: Default::default(), + backed_in_inherent_paras: Default::default(), elastic_paras: Default::default(), code_upgrade: None, fill_claimqueue: true, @@ -167,6 +174,12 @@ impl BenchBuilder { self } + /// Set a map from para id seed to number of validity votes for votes in inherent data. + pub(crate) fn set_backed_in_inherent_paras(mut self, backed: BTreeMap) -> Self { + self.backed_in_inherent_paras = backed; + self + } + /// Set a map from para id seed to number of cores assigned to it. 
pub(crate) fn set_elastic_paras(mut self, elastic_paras: BTreeMap) -> Self { self.elastic_paras = elastic_paras; @@ -753,8 +766,8 @@ impl BenchBuilder { /// /// Note that this API only allows building scenarios where the `backed_and_concluding_paras` /// are mutually exclusive with the cores for disputes. So - /// `backed_and_concluding_paras.len() + dispute_sessions.len()` must be less than the max - /// number of cores. + /// `backed_and_concluding_paras.len() + dispute_sessions.len() + backed_in_inherent_paras` must + /// be less than the max number of cores. pub(crate) fn build(self) -> Bench { // Make sure relevant storage is cleared. This is just to get the asserts to work when // running tests because it seems the storage is not cleared in between. @@ -771,8 +784,10 @@ impl BenchBuilder { .sum::() .saturating_sub(self.elastic_paras.len() as usize); - let used_cores = - self.dispute_sessions.len() + self.backed_and_concluding_paras.len() + extra_cores; + let used_cores = self.dispute_sessions.len() + + self.backed_and_concluding_paras.len() + + self.backed_in_inherent_paras.len() + + extra_cores; assert!(used_cores <= max_cores); let fill_claimqueue = self.fill_claimqueue; @@ -793,8 +808,12 @@ impl BenchBuilder { &builder.elastic_paras, used_cores, ); + + let mut backed_in_inherent = BTreeMap::new(); + backed_in_inherent.append(&mut builder.backed_and_concluding_paras.clone()); + backed_in_inherent.append(&mut builder.backed_in_inherent_paras.clone()); let backed_candidates = builder.create_backed_candidates( - &builder.backed_and_concluding_paras, + &backed_in_inherent, &builder.elastic_paras, builder.code_upgrade, ); @@ -845,12 +864,16 @@ impl BenchBuilder { scheduler::AvailabilityCores::::set(cores); core_idx = 0u32; + + // We need entries in the claim queue for those: + all_cores.append(&mut builder.backed_in_inherent_paras.clone()); + if fill_claimqueue { let cores = all_cores .keys() .flat_map(|para_id| { 
(0..elastic_paras.get(¶_id).cloned().unwrap_or(1)) - .filter_map(|_para_local_core_idx| { + .map(|_para_local_core_idx| { let ttl = configuration::ActiveConfig::::get().scheduler_params.ttl; // Load an assignment into provider so that one is present to pop let assignment = @@ -859,17 +882,11 @@ impl BenchBuilder { ParaId::from(*para_id), ); - let entry = ( - CoreIndex(core_idx), - [ParasEntry::new(assignment, now + ttl)].into(), - ); - let res = if builder.unavailable_cores.contains(&core_idx) { - None - } else { - Some(entry) - }; core_idx += 1; - res + ( + CoreIndex(core_idx - 1), + [ParasEntry::new(assignment, now + ttl)].into(), + ) }) .collect::>)>>() }) diff --git a/polkadot/runtime/parachains/src/configuration.rs b/polkadot/runtime/parachains/src/configuration.rs index 10ecaa16a8469..bffeab4a0d21b 100644 --- a/polkadot/runtime/parachains/src/configuration.rs +++ b/polkadot/runtime/parachains/src/configuration.rs @@ -335,6 +335,8 @@ pub enum InconsistentError { InconsistentExecutorParams { inner: ExecutorParamError }, /// TTL should be bigger than lookahead LookaheadExceedsTTL, + /// Lookahead is zero, while it must be at least 1 for parachains to work. + LookaheadZero, /// Passed in queue size for on-demand was too large. OnDemandQueueSizeTooLarge, /// Number of delay tranches cannot be 0. @@ -432,6 +434,10 @@ where return Err(LookaheadExceedsTTL) } + if self.scheduler_params.lookahead == 0 { + return Err(LookaheadZero) + } + if self.scheduler_params.on_demand_queue_max_size > ON_DEMAND_MAX_QUEUE_MAX_SIZE { return Err(OnDemandQueueSizeTooLarge) } diff --git a/polkadot/runtime/parachains/src/paras_inherent/benchmarking.rs b/polkadot/runtime/parachains/src/paras_inherent/benchmarking.rs index 267a9781a1061..4c8b093451ed5 100644 --- a/polkadot/runtime/parachains/src/paras_inherent/benchmarking.rs +++ b/polkadot/runtime/parachains/src/paras_inherent/benchmarking.rs @@ -110,7 +110,7 @@ benchmarks! 
{ .collect(); let scenario = BenchBuilder::::new() - .set_backed_and_concluding_paras(cores_with_backed.clone()) + .set_backed_in_inherent_paras(cores_with_backed.clone()) .build(); let mut benchmark = scenario.data.clone(); @@ -161,7 +161,7 @@ benchmarks! { .collect(); let scenario = BenchBuilder::::new() - .set_backed_and_concluding_paras(cores_with_backed.clone()) + .set_backed_in_inherent_paras(cores_with_backed.clone()) .set_code_upgrade(v) .build(); diff --git a/polkadot/runtime/parachains/src/paras_inherent/mod.rs b/polkadot/runtime/parachains/src/paras_inherent/mod.rs index 386873aad4570..8b527c09490d6 100644 --- a/polkadot/runtime/parachains/src/paras_inherent/mod.rs +++ b/polkadot/runtime/parachains/src/paras_inherent/mod.rs @@ -560,7 +560,7 @@ impl Pallet { .chain(freed_disputed.into_iter().map(|core| (core, FreedReason::Concluded))) .chain(freed_timeout.into_iter().map(|c| (c, FreedReason::TimedOut))) .collect::>(); - scheduler::Pallet::::free_cores_and_fill_claimqueue(freed, now); + scheduler::Pallet::::free_cores_and_fill_claim_queue(freed, now); METRICS.on_candidates_processed_total(backed_candidates.len() as u64); @@ -570,12 +570,13 @@ impl Pallet { .map(|b| *b) .unwrap_or(false); - let mut scheduled: BTreeMap> = BTreeMap::new(); - let mut total_scheduled_cores = 0; + let mut eligible: BTreeMap> = BTreeMap::new(); + let mut total_eligible_cores = 0; - for (core_idx, para_id) in scheduler::Pallet::::scheduled_paras() { - total_scheduled_cores += 1; - scheduled.entry(para_id).or_default().insert(core_idx); + for (core_idx, para_id) in scheduler::Pallet::::eligible_paras() { + total_eligible_cores += 1; + log::trace!(target: LOG_TARGET, "Found eligible para {:?} on core {:?}", para_id, core_idx); + eligible.entry(para_id).or_default().insert(core_idx); } let initial_candidate_count = backed_candidates.len(); @@ -583,12 +584,12 @@ impl Pallet { backed_candidates, &allowed_relay_parents, concluded_invalid_hashes, - scheduled, + eligible, 
core_index_enabled, ); let count = count_backed_candidates(&backed_candidates_with_core); - ensure!(count <= total_scheduled_cores, Error::::UnscheduledCandidate); + ensure!(count <= total_eligible_cores, Error::::UnscheduledCandidate); METRICS.on_candidates_sanitized(count as u64); @@ -1422,7 +1423,7 @@ fn map_candidates_to_cores::claimqueue_is_empty()); + assert!(scheduler::Pallet::::claim_queue_is_empty()); // Nothing is filtered out (including the backed candidates.) assert_eq!( @@ -257,7 +257,7 @@ mod enter { .unwrap(); // The current schedule is empty prior to calling `create_inherent_enter`. - assert!(scheduler::Pallet::::claimqueue_is_empty()); + assert!(scheduler::Pallet::::claim_queue_is_empty()); assert!(pallet::OnChainVotes::::get().is_none()); @@ -372,7 +372,7 @@ mod enter { let mut inherent_data = InherentData::new(); inherent_data.put_data(PARACHAINS_INHERENT_IDENTIFIER, &scenario.data).unwrap(); - assert!(!scheduler::Pallet::::claimqueue_is_empty()); + assert!(!scheduler::Pallet::::claim_queue_is_empty()); // The right candidates have been filtered out (the ones for cores 0,4,5) assert_eq!( @@ -618,7 +618,7 @@ mod enter { .unwrap(); // The current schedule is empty prior to calling `create_inherent_enter`. - assert!(scheduler::Pallet::::claimqueue_is_empty()); + assert!(scheduler::Pallet::::claim_queue_is_empty()); let multi_dispute_inherent_data = Pallet::::create_inherent_inner(&inherent_data.clone()).unwrap(); @@ -690,7 +690,7 @@ mod enter { .unwrap(); // The current schedule is empty prior to calling `create_inherent_enter`. - assert!(scheduler::Pallet::::claimqueue_is_empty()); + assert!(scheduler::Pallet::::claim_queue_is_empty()); let limit_inherent_data = Pallet::::create_inherent_inner(&inherent_data.clone()).unwrap(); @@ -762,7 +762,7 @@ mod enter { .unwrap(); // The current schedule is empty prior to calling `create_inherent_enter`. 
- assert!(scheduler::Pallet::::claimqueue_is_empty()); + assert!(scheduler::Pallet::::claim_queue_is_empty()); // Nothing is filtered out (including the backed candidates.) let limit_inherent_data = @@ -849,7 +849,7 @@ mod enter { .unwrap(); // The current schedule is empty prior to calling `create_inherent_enter`. - assert!(scheduler::Pallet::::claimqueue_is_empty()); + assert!(scheduler::Pallet::::claim_queue_is_empty()); // Nothing is filtered out (including the backed candidates.) let limit_inherent_data = @@ -1818,7 +1818,7 @@ mod sanitizers { ]); // Update scheduler's claimqueue with the parachains - scheduler::Pallet::::set_claimqueue(BTreeMap::from([ + scheduler::Pallet::::set_claim_queue(BTreeMap::from([ ( CoreIndex::from(0), VecDeque::from([ParasEntry::new( @@ -2001,7 +2001,7 @@ mod sanitizers { ]); // Update scheduler's claimqueue with the parachains - scheduler::Pallet::::set_claimqueue(BTreeMap::from([ + scheduler::Pallet::::set_claim_queue(BTreeMap::from([ ( CoreIndex::from(0), VecDeque::from([ParasEntry::new( @@ -2542,7 +2542,7 @@ mod sanitizers { ]); // Update scheduler's claimqueue with the parachains - scheduler::Pallet::::set_claimqueue(BTreeMap::from([ + scheduler::Pallet::::set_claim_queue(BTreeMap::from([ ( CoreIndex::from(0), VecDeque::from([ParasEntry::new( diff --git a/polkadot/runtime/parachains/src/runtime_api_impl/v10.rs b/polkadot/runtime/parachains/src/runtime_api_impl/v10.rs index dbb79b86c56cc..4417ec75abd67 100644 --- a/polkadot/runtime/parachains/src/runtime_api_impl/v10.rs +++ b/polkadot/runtime/parachains/src/runtime_api_impl/v10.rs @@ -66,7 +66,7 @@ pub fn availability_cores() -> Vec::free_cores_and_fill_claimqueue(Vec::new(), now); + scheduler::Pallet::::free_cores_and_fill_claim_queue(Vec::new(), now); let time_out_for = scheduler::Pallet::::availability_timeout_predicate(); @@ -305,7 +305,7 @@ pub fn validation_code( /// Implementation for the `candidate_pending_availability` function of the runtime API. 
#[deprecated( - note = "`candidate_pending_availability` will be removed. Use `candidates_pending_availability` to query + note = "`candidate_pending_availability` will be removed. Use `candidates_pending_availability` to query all candidates pending availability" )] pub fn candidate_pending_availability( diff --git a/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs b/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs index 8c239dc207f65..62e96e9fbb051 100644 --- a/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs +++ b/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs @@ -16,7 +16,7 @@ //! Put implementations of functions from staging APIs here. -use crate::{inclusion, initializer, scheduler}; +use crate::{configuration, inclusion, initializer, scheduler}; use polkadot_primitives::{CommittedCandidateReceipt, CoreIndex, Id as ParaId}; use sp_runtime::traits::One; use sp_std::{ @@ -32,12 +32,21 @@ pub fn claim_queue() -> BTreeMap>::free_cores_and_fill_claimqueue(Vec::new(), now); + >::free_cores_and_fill_claim_queue(Vec::new(), now); + let config = configuration::ActiveConfig::::get(); + // Extra sanity, config should already never be smaller than 1: + let n_lookahead = config.scheduler_params.lookahead.max(1); scheduler::ClaimQueue::::get() .into_iter() .map(|(core_index, entries)| { - (core_index, entries.into_iter().map(|e| e.para_id()).collect()) + // on cores timing out internal claim queue size may be temporarily longer than it + // should be as the timed out assignment might got pushed back to an already full claim + // queue: + ( + core_index, + entries.into_iter().map(|e| e.para_id()).take(n_lookahead as usize).collect(), + ) }) .collect() } diff --git a/polkadot/runtime/parachains/src/scheduler.rs b/polkadot/runtime/parachains/src/scheduler.rs index 0442301a32fff..33b4d849c490f 100644 --- a/polkadot/runtime/parachains/src/scheduler.rs +++ b/polkadot/runtime/parachains/src/scheduler.rs @@ -36,6 +36,8 @@ //! 
number of groups as availability cores. Validator groups will be assigned to different //! availability cores over time. +use core::iter::Peekable; + use crate::{configuration, initializer::SessionChangeNotification, paras}; use frame_support::{pallet_prelude::*, traits::Defensive}; use frame_system::pallet_prelude::BlockNumberFor; @@ -45,7 +47,10 @@ use polkadot_primitives::{ }; use sp_runtime::traits::One; use sp_std::{ - collections::{btree_map::BTreeMap, vec_deque::VecDeque}, + collections::{ + btree_map::{self, BTreeMap}, + vec_deque::VecDeque, + }, prelude::*, }; @@ -190,7 +195,29 @@ pub mod pallet { } } -type PositionInClaimqueue = u32; +type PositionInClaimQueue = u32; + +struct ClaimQueueIterator { + next_idx: u32, + queue: Peekable>>, +} + +impl Iterator for ClaimQueueIterator { + type Item = (CoreIndex, VecDeque); + + fn next(&mut self) -> Option { + let (idx, _) = self.queue.peek()?; + let val = if idx != &CoreIndex(self.next_idx) { + log::trace!(target: LOG_TARGET, "idx did not match claim queue idx: {:?} vs {:?}", idx, self.next_idx); + (CoreIndex(self.next_idx), VecDeque::new()) + } else { + let (idx, q) = self.queue.next()?; + (idx, q) + }; + self.next_idx += 1; + Some(val) + } +} impl Pallet { /// Called by the initializer to initialize the scheduler pallet. @@ -203,7 +230,7 @@ impl Pallet { /// Called before the initializer notifies of a new session. pub(crate) fn pre_new_session() { - Self::push_claimqueue_items_to_assignment_provider(); + Self::push_claim_queue_items_to_assignment_provider(); Self::push_occupied_cores_to_assignment_provider(); } @@ -309,37 +336,51 @@ impl Pallet { (concluded_paras, timedout_paras) } - /// Note that the given cores have become occupied. Update the claimqueue accordingly. + /// Get an iterator into the claim queues. + /// + /// This iterator will have an item for each and every core index up to the maximum core index + /// found in the claim queue. 
In other words there will be no holes/missing core indices, + /// between core 0 and the maximum, even if the claim queue was missing entries for particular + /// indices in between. (The iterator will return an empty `VecDeque` for those indices. + fn claim_queue_iterator() -> impl Iterator>)> { + let queues = ClaimQueue::::get(); + return ClaimQueueIterator::> { + next_idx: 0, + queue: queues.into_iter().peekable(), + } + } + + /// Note that the given cores have become occupied. Update the claim queue accordingly. pub(crate) fn occupied( now_occupied: BTreeMap, - ) -> BTreeMap { + ) -> BTreeMap { let mut availability_cores = AvailabilityCores::::get(); log::debug!(target: LOG_TARGET, "[occupied] now_occupied {:?}", now_occupied); - let pos_mapping: BTreeMap = now_occupied + let pos_mapping: BTreeMap = now_occupied .iter() .flat_map(|(core_idx, para_id)| { - match Self::remove_from_claimqueue(*core_idx, *para_id) { + match Self::remove_from_claim_queue(*core_idx, *para_id) { Err(e) => { log::debug!( target: LOG_TARGET, - "[occupied] error on remove_from_claimqueue {}", + "[occupied] error on remove_from_claim queue {}", e ); None }, - Ok((pos_in_claimqueue, pe)) => { + Ok((pos_in_claim_queue, pe)) => { availability_cores[core_idx.0 as usize] = CoreOccupied::Paras(pe); - Some((*core_idx, pos_in_claimqueue)) + Some((*core_idx, pos_in_claim_queue)) }, } }) .collect(); // Drop expired claims after processing now_occupied. - Self::drop_expired_claims_from_claimqueue(); + Self::drop_expired_claims_from_claim_queue(); AvailabilityCores::::set(availability_cores); @@ -349,7 +390,7 @@ impl Pallet { /// Iterates through every element in all claim queues and tries to add new assignments from the /// `AssignmentProvider`. A claim is considered expired if it's `ttl` field is lower than the /// current block height. 
- fn drop_expired_claims_from_claimqueue() { + fn drop_expired_claims_from_claim_queue() { let now = frame_system::Pallet::::block_number(); let availability_cores = AvailabilityCores::::get(); let ttl = configuration::ActiveConfig::::get().scheduler_params.ttl; @@ -357,13 +398,13 @@ impl Pallet { ClaimQueue::::mutate(|cq| { for (idx, _) in (0u32..).zip(availability_cores) { let core_idx = CoreIndex(idx); - if let Some(core_claimqueue) = cq.get_mut(&core_idx) { + if let Some(core_claim_queue) = cq.get_mut(&core_idx) { let mut i = 0; let mut num_dropped = 0; - while i < core_claimqueue.len() { - let maybe_dropped = if let Some(entry) = core_claimqueue.get(i) { + while i < core_claim_queue.len() { + let maybe_dropped = if let Some(entry) = core_claim_queue.get(i) { if entry.ttl < now { - core_claimqueue.remove(i) + core_claim_queue.remove(i) } else { None } @@ -381,11 +422,11 @@ impl Pallet { for _ in 0..num_dropped { // For all claims dropped due to TTL, attempt to pop a new entry to - // the back of the claimqueue. + // the back of the claim queue. if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) { - core_claimqueue.push_back(ParasEntry::new(assignment, now + ttl)); + core_claim_queue.push_back(ParasEntry::new(assignment, now + ttl)); } } } @@ -536,10 +577,10 @@ impl Pallet { } // on new session - fn push_claimqueue_items_to_assignment_provider() { + fn push_claim_queue_items_to_assignment_provider() { for (_, claim_queue) in ClaimQueue::::take() { // Push back in reverse order so that when we pop from the provider again, - // the entries in the claimqueue are in the same order as they are right now. + // the entries in the claim queue are in the same order as they are right now. 
for para_entry in claim_queue.into_iter().rev() { Self::maybe_push_assignment(para_entry); } @@ -554,15 +595,8 @@ impl Pallet { } } - // - // ClaimQueue related functions - // - fn claimqueue_lookahead() -> u32 { - configuration::ActiveConfig::::get().scheduler_params.lookahead - } - - /// Frees cores and fills the free claimqueue spots by popping from the `AssignmentProvider`. - pub fn free_cores_and_fill_claimqueue( + /// Frees cores and fills the free claim queue spots by popping from the `AssignmentProvider`. + pub fn free_cores_and_fill_claim_queue( just_freed_cores: impl IntoIterator, now: BlockNumberFor, ) { @@ -573,26 +607,33 @@ impl Pallet { if ValidatorGroups::::decode_len().map_or(true, |l| l == 0) { return } - // If there exists a core, ensure we schedule at least one job onto it. - let n_lookahead = Self::claimqueue_lookahead().max(1); let n_session_cores = T::AssignmentProvider::session_core_count(); let cq = ClaimQueue::::get(); let config = configuration::ActiveConfig::::get(); + // Extra sanity, config should already never be smaller than 1: + let n_lookahead = config.scheduler_params.lookahead.max(1); let max_availability_timeouts = config.scheduler_params.max_availability_timeouts; let ttl = config.scheduler_params.ttl; for core_idx in 0..n_session_cores { let core_idx = CoreIndex::from(core_idx); + let n_lookahead_used = cq.get(&core_idx).map_or(0, |v| v.len() as u32); + // add previously timedout paras back into the queue if let Some(mut entry) = timedout_paras.remove(&core_idx) { if entry.availability_timeouts < max_availability_timeouts { // Increment the timeout counter. entry.availability_timeouts += 1; - // Reset the ttl so that a timed out assignment. - entry.ttl = now + ttl; - Self::add_to_claimqueue(core_idx, entry); - // The claim has been added back into the claimqueue. 
+ if n_lookahead_used < n_lookahead { + entry.ttl = now + ttl; + } else { + // Over max capacity, we need to bump ttl (we exceeded the claim queue + // size, so otherwise the entry might get dropped before reaching the top): + entry.ttl = now + ttl + One::one(); + } + Self::add_to_claim_queue(core_idx, entry); + // The claim has been added back into the claim queue. // Do not pop another assignment for the core. continue } else { @@ -606,12 +647,9 @@ impl Pallet { if let Some(concluded_para) = concluded_paras.remove(&core_idx) { T::AssignmentProvider::report_processed(concluded_para); } - // We consider occupied cores to be part of the claimqueue - let n_lookahead_used = cq.get(&core_idx).map_or(0, |v| v.len() as u32) + - if Self::is_core_occupied(core_idx) { 1 } else { 0 }; for _ in n_lookahead_used..n_lookahead { if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) { - Self::add_to_claimqueue(core_idx, ParasEntry::new(assignment, now + ttl)); + Self::add_to_claim_queue(core_idx, ParasEntry::new(assignment, now + ttl)); } } } @@ -620,24 +658,17 @@ impl Pallet { debug_assert!(concluded_paras.is_empty()); } - fn is_core_occupied(core_idx: CoreIndex) -> bool { - match AvailabilityCores::::get().get(core_idx.0 as usize) { - None | Some(CoreOccupied::Free) => false, - Some(CoreOccupied::Paras(_)) => true, - } - } - - fn add_to_claimqueue(core_idx: CoreIndex, pe: ParasEntryType) { + fn add_to_claim_queue(core_idx: CoreIndex, pe: ParasEntryType) { ClaimQueue::::mutate(|la| { la.entry(core_idx).or_default().push_back(pe); }); } /// Returns `ParasEntry` with `para_id` at `core_idx` if found. 
- fn remove_from_claimqueue( + fn remove_from_claim_queue( core_idx: CoreIndex, para_id: ParaId, - ) -> Result<(PositionInClaimqueue, ParasEntryType), &'static str> { + ) -> Result<(PositionInClaimQueue, ParasEntryType), &'static str> { ClaimQueue::::mutate(|cq| { let core_claims = cq.get_mut(&core_idx).ok_or("core_idx not found in lookahead")?; @@ -654,20 +685,38 @@ impl Pallet { /// Paras scheduled next in the claim queue. pub(crate) fn scheduled_paras() -> impl Iterator { - let claimqueue = ClaimQueue::::get(); - claimqueue + let claim_queue = ClaimQueue::::get(); + claim_queue .into_iter() .filter_map(|(core_idx, v)| v.front().map(|e| (core_idx, e.assignment.para_id()))) } + /// Paras that may get backed on cores. + /// + /// 1. The para must be scheduled on core. + /// 2. Core needs to be free, otherwise backing is not possible. + pub(crate) fn eligible_paras() -> impl Iterator { + let availability_cores = AvailabilityCores::::get(); + + Self::claim_queue_iterator().zip(availability_cores.into_iter()).filter_map( + |((core_idx, queue), core)| { + if core != CoreOccupied::Free { + return None + } + let next_scheduled = queue.front()?; + Some((core_idx, next_scheduled.assignment.para_id())) + }, + ) + } + #[cfg(any(feature = "try-runtime", test))] - fn claimqueue_len() -> usize { + fn claim_queue_len() -> usize { ClaimQueue::::get().iter().map(|la_vec| la_vec.1.len()).sum() } #[cfg(all(not(feature = "runtime-benchmarks"), test))] - pub(crate) fn claimqueue_is_empty() -> bool { - Self::claimqueue_len() == 0 + pub(crate) fn claim_queue_is_empty() -> bool { + Self::claim_queue_len() == 0 } #[cfg(test)] @@ -676,7 +725,7 @@ impl Pallet { } #[cfg(test)] - pub(crate) fn set_claimqueue(claimqueue: BTreeMap>>) { - ClaimQueue::::set(claimqueue); + pub(crate) fn set_claim_queue(claim_queue: BTreeMap>>) { + ClaimQueue::::set(claim_queue); } } diff --git a/polkadot/runtime/parachains/src/scheduler/migration.rs b/polkadot/runtime/parachains/src/scheduler/migration.rs index 
57f4fd670fbe2..84d7d4b567102 100644 --- a/polkadot/runtime/parachains/src/scheduler/migration.rs +++ b/polkadot/runtime/parachains/src/scheduler/migration.rs @@ -248,7 +248,7 @@ mod v1 { .count(); ensure!( - Pallet::::claimqueue_len() as u32 + availability_cores_waiting as u32 == + Pallet::::claim_queue_len() as u32 + availability_cores_waiting as u32 == expected_len, "ClaimQueue and AvailabilityCores should have the correct length", ); diff --git a/polkadot/runtime/parachains/src/scheduler/tests.rs b/polkadot/runtime/parachains/src/scheduler/tests.rs index 74ad8adf00c4c..32811241e171c 100644 --- a/polkadot/runtime/parachains/src/scheduler/tests.rs +++ b/polkadot/runtime/parachains/src/scheduler/tests.rs @@ -80,7 +80,7 @@ fn run_to_block( Scheduler::initializer_initialize(b + 1); // In the real runtime this is expected to be called by the `InclusionInherent` pallet. - Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1); + Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1); } } @@ -158,6 +158,37 @@ fn scheduled_entries() -> impl Iterator(vec![para_id])); // Claim is dropped post call. - Scheduler::drop_expired_claims_from_claimqueue(); + Scheduler::drop_expired_claims_from_claim_queue(); assert!(!claimqueue_contains_para_ids::(vec![para_id])); // Add a claim on core 0 with a ttl in the future (15). let paras_entry = ParasEntry::new(Assignment::Bulk(para_id), now + 5); - Scheduler::add_to_claimqueue(core_idx, paras_entry.clone()); + Scheduler::add_to_claim_queue(core_idx, paras_entry.clone()); // Claim is in queue post call. 
- Scheduler::drop_expired_claims_from_claimqueue(); + Scheduler::drop_expired_claims_from_claim_queue(); assert!(claimqueue_contains_para_ids::(vec![para_id])); now = now + 6; run_to_block(now, |_| None); // Claim is dropped - Scheduler::drop_expired_claims_from_claimqueue(); + Scheduler::drop_expired_claims_from_claim_queue(); assert!(!claimqueue_contains_para_ids::(vec![para_id])); // Add a claim on core 0 with a ttl == now (16) let paras_entry = ParasEntry::new(Assignment::Bulk(para_id), now); - Scheduler::add_to_claimqueue(core_idx, paras_entry.clone()); + Scheduler::add_to_claim_queue(core_idx, paras_entry.clone()); // Claim is in queue post call. - Scheduler::drop_expired_claims_from_claimqueue(); + Scheduler::drop_expired_claims_from_claim_queue(); assert!(claimqueue_contains_para_ids::(vec![para_id])); now = now + 1; run_to_block(now, |_| None); // Drop expired claim. - Scheduler::drop_expired_claims_from_claimqueue(); + Scheduler::drop_expired_claims_from_claim_queue(); assert!(!claimqueue_contains_para_ids::(vec![para_id])); // Add a claim on core 0 with a ttl == now (17) let paras_entry_non_expired = ParasEntry::new(Assignment::Bulk(para_id), now); let paras_entry_expired = ParasEntry::new(Assignment::Bulk(para_id), now - 2); // ttls = [17, 15, 17] - Scheduler::add_to_claimqueue(core_idx, paras_entry_non_expired.clone()); - Scheduler::add_to_claimqueue(core_idx, paras_entry_expired.clone()); - Scheduler::add_to_claimqueue(core_idx, paras_entry_non_expired.clone()); + Scheduler::add_to_claim_queue(core_idx, paras_entry_non_expired.clone()); + Scheduler::add_to_claim_queue(core_idx, paras_entry_expired.clone()); + Scheduler::add_to_claim_queue(core_idx, paras_entry_non_expired.clone()); let cq = scheduler::ClaimQueue::::get(); assert_eq!(cq.get(&core_idx).unwrap().len(), 3); @@ -231,7 +262,7 @@ fn claimqueue_ttl_drop_fn_works() { MockAssigner::add_test_assignment(assignment.clone()); // Drop expired claim. 
- Scheduler::drop_expired_claims_from_claimqueue(); + Scheduler::drop_expired_claims_from_claim_queue(); let cq = scheduler::ClaimQueue::::get(); let cqc = cq.get(&core_idx).unwrap(); @@ -378,7 +409,7 @@ fn fill_claimqueue_fills() { run_to_block(2, |_| None); { - assert_eq!(Scheduler::claimqueue_len(), 3); + assert_eq!(Scheduler::claim_queue_len(), 3); let scheduled: BTreeMap<_, _> = scheduled_entries().collect(); // Was added a block later, note the TTL. @@ -488,9 +519,8 @@ fn schedule_schedules_including_just_freed() { .for_each(|(_core_idx, core_queue)| assert_eq!(core_queue.len(), 0)) } - // add a couple more para claims - the claim on `b` will go to the 3rd core - // (2) and the claim on `d` will go back to the 1st para core (0). The claim on `e` - // then will go for core `1`. + MockAssigner::add_test_assignment(assignment_a.clone()); + MockAssigner::add_test_assignment(assignment_c.clone()); MockAssigner::add_test_assignment(assignment_b.clone()); MockAssigner::add_test_assignment(assignment_d.clone()); MockAssigner::add_test_assignment(assignment_e.clone()); @@ -500,8 +530,7 @@ fn schedule_schedules_including_just_freed() { { let scheduled: BTreeMap<_, _> = scheduled_entries().collect(); - // cores 0 and 1 are occupied by claims. core 2 was free. 
- assert_eq!(scheduled.len(), 1); + assert_eq!(scheduled.len(), 3); assert_eq!( scheduled.get(&CoreIndex(2)).unwrap(), &ParasEntry { @@ -519,7 +548,7 @@ fn schedule_schedules_including_just_freed() { ] .into_iter() .collect(); - Scheduler::free_cores_and_fill_claimqueue(just_updated, now); + Scheduler::free_cores_and_fill_claim_queue(just_updated, now); { let scheduled: BTreeMap<_, _> = scheduled_entries().collect(); @@ -529,17 +558,28 @@ fn schedule_schedules_including_just_freed() { assert_eq!( scheduled.get(&CoreIndex(0)).unwrap(), &ParasEntry { - assignment: Assignment::Bulk(para_d), + // Next entry in queue is `a` again: + assignment: Assignment::Bulk(para_a), availability_timeouts: 0, ttl: 8 }, ); // Although C was descheduled, the core `2` was occupied so C goes back to the queue. + assert_eq!( + scheduler::ClaimQueue::::get()[&CoreIndex(1)][1], + ParasEntry { + assignment: Assignment::Bulk(para_c), + // End of the queue should be the pushed back entry: + availability_timeouts: 1, + // ttl 1 higher: + ttl: 9 + }, + ); assert_eq!( scheduled.get(&CoreIndex(1)).unwrap(), &ParasEntry { assignment: Assignment::Bulk(para_c), - availability_timeouts: 1, + availability_timeouts: 0, ttl: 8 }, ); @@ -552,8 +592,6 @@ fn schedule_schedules_including_just_freed() { }, ); - // Para A claim should have been wiped, but para C claim should remain. - assert!(!claimqueue_contains_para_ids::(vec![para_a])); assert!(claimqueue_contains_para_ids::(vec![para_c])); assert!(!availability_cores_contains_para_ids::(vec![para_a, para_c])); } @@ -627,12 +665,13 @@ fn schedule_clears_availability_cores() { // Add more assignments MockAssigner::add_test_assignment(assignment_a.clone()); + MockAssigner::add_test_assignment(assignment_b.clone()); MockAssigner::add_test_assignment(assignment_c.clone()); run_to_block(3, |_| None); // now note that cores 0 and 2 were freed. 
- Scheduler::free_cores_and_fill_claimqueue( + Scheduler::free_cores_and_fill_claim_queue( vec![(CoreIndex(0), FreedReason::Concluded), (CoreIndex(2), FreedReason::Concluded)] .into_iter() .collect::>(), @@ -807,7 +846,7 @@ fn on_demand_claims_are_pruned_after_timing_out() { ] .into_iter() .collect(); - Scheduler::free_cores_and_fill_claimqueue(just_updated, now); + Scheduler::free_cores_and_fill_claim_queue(just_updated, now); // ParaId a exists in the claim queue until max_retries is reached. if n < max_timeouts + now { @@ -854,7 +893,7 @@ fn on_demand_claims_are_pruned_after_timing_out() { } } - Scheduler::free_cores_and_fill_claimqueue(just_updated, now); + Scheduler::free_cores_and_fill_claim_queue(just_updated, now); // ParaId a exists in the claim queue until groups are rotated. if n < 31 { @@ -943,12 +982,12 @@ fn next_up_on_available_uses_next_scheduled_or_none() { ttl: 5 as u32, }; - Scheduler::add_to_claimqueue(CoreIndex(0), entry_a.clone()); + Scheduler::add_to_claim_queue(CoreIndex(0), entry_a.clone()); run_to_block(2, |_| None); { - assert_eq!(Scheduler::claimqueue_len(), 1); + assert_eq!(Scheduler::claim_queue_len(), 1); assert_eq!(scheduler::AvailabilityCores::::get().len(), 1); let mut map = BTreeMap::new(); @@ -963,7 +1002,7 @@ fn next_up_on_available_uses_next_scheduled_or_none() { assert!(Scheduler::next_up_on_available(CoreIndex(0)).is_none()); - Scheduler::add_to_claimqueue(CoreIndex(0), entry_b); + Scheduler::add_to_claim_queue(CoreIndex(0), entry_b); assert_eq!( Scheduler::next_up_on_available(CoreIndex(0)).unwrap(), @@ -1032,7 +1071,7 @@ fn next_up_on_time_out_reuses_claim_if_nothing_queued() { MockAssigner::add_test_assignment(assignment_b.clone()); // Pop assignment_b into the claimqueue - Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), 2); + Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), 2); //// Now that there is an earlier next-up, we use that. 
assert_eq!( @@ -1113,7 +1152,7 @@ fn session_change_requires_reschedule_dropping_removed_paras() { _ => None, }); - Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), 3); + Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), 3); assert_eq!( scheduler::ClaimQueue::::get(), @@ -1161,7 +1200,7 @@ fn session_change_requires_reschedule_dropping_removed_paras() { let groups = ValidatorGroups::::get(); assert_eq!(groups.len(), 5); - Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), 4); + Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), 4); assert_eq!( scheduler::ClaimQueue::::get(), diff --git a/prdoc/pr_4691.prdoc b/prdoc/pr_4691.prdoc new file mode 100644 index 0000000000000..18cbb2296d43b --- /dev/null +++ b/prdoc/pr_4691.prdoc @@ -0,0 +1,14 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Fix claim queue size + +doc: + - audience: Runtime User + description: | + Ensure claim queue size is always the number configured by `scheduler_params.lookahead`. Previously, the claim queue of a core was shortened by 1 if the core was occupied. + + +crates: + - name: polkadot-runtime-parachains + bump: minor