Skip to content
16 changes: 5 additions & 11 deletions polkadot/runtime/parachains/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
.keys()
.flat_map(|para_id| {
(0..elastic_paras.get(&para_id).cloned().unwrap_or(1))
.filter_map(|_para_local_core_idx| {
.map(|_para_local_core_idx| {
let ttl = configuration::ActiveConfig::<T>::get().scheduler_params.ttl;
// Load an assignment into provider so that one is present to pop
let assignment =
Expand All @@ -859,17 +859,11 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
ParaId::from(*para_id),
);

let entry = (
CoreIndex(core_idx),
[ParasEntry::new(assignment, now + ttl)].into(),
);
let res = if builder.unavailable_cores.contains(&core_idx) {
None
} else {
Some(entry)
};
core_idx += 1;
res
(
CoreIndex(core_idx - 1),
[ParasEntry::new(assignment, now + ttl)].into(),
)
})
.collect::<Vec<(CoreIndex, VecDeque<ParasEntry<_>>)>>()
})
Expand Down
6 changes: 6 additions & 0 deletions polkadot/runtime/parachains/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ pub enum InconsistentError<BlockNumber> {
InconsistentExecutorParams { inner: ExecutorParamError },
/// TTL should be bigger than lookahead
LookaheadExceedsTTL,
/// Lookahead is zero, while it must be at least 1 for parachains to work.
LookaheadZero,
/// Passed in queue size for on-demand was too large.
OnDemandQueueSizeTooLarge,
/// Number of delay tranches cannot be 0.
Expand Down Expand Up @@ -432,6 +434,10 @@ where
return Err(LookaheadExceedsTTL)
}

if self.scheduler_params.lookahead == 0 {
return Err(LookaheadZero)
}

if self.scheduler_params.on_demand_queue_max_size > ON_DEMAND_MAX_QUEUE_MAX_SIZE {
return Err(OnDemandQueueSizeTooLarge)
}
Expand Down
14 changes: 7 additions & 7 deletions polkadot/runtime/parachains/src/paras_inherent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,25 +570,25 @@ impl<T: Config> Pallet<T> {
.map(|b| *b)
.unwrap_or(false);

let mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
let mut total_scheduled_cores = 0;
let mut eligible: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
let mut total_eligible_cores = 0;

for (core_idx, para_id) in scheduler::Pallet::<T>::scheduled_paras() {
total_scheduled_cores += 1;
scheduled.entry(para_id).or_default().insert(core_idx);
for (core_idx, para_id) in scheduler::Pallet::<T>::eligible_paras() {
total_eligible_cores += 1;
eligible.entry(para_id).or_default().insert(core_idx);
}

let initial_candidate_count = backed_candidates.len();
let backed_candidates_with_core = sanitize_backed_candidates::<T>(
backed_candidates,
&allowed_relay_parents,
concluded_invalid_hashes,
scheduled,
eligible,
core_index_enabled,
);
let count = count_backed_candidates(&backed_candidates_with_core);

ensure!(count <= total_scheduled_cores, Error::<T>::UnscheduledCandidate);
ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate);

METRICS.on_candidates_sanitized(count as u64);

Expand Down
12 changes: 10 additions & 2 deletions polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Put implementations of functions from staging APIs here.

use crate::{inclusion, initializer, scheduler};
use crate::{configuration, inclusion, initializer, scheduler};
use primitives::{CommittedCandidateReceipt, CoreIndex, Id as ParaId};
use sp_runtime::traits::One;
use sp_std::{
Expand All @@ -33,11 +33,19 @@ pub fn claim_queue<T: scheduler::Config>() -> BTreeMap<CoreIndex, VecDeque<ParaI
// At the end of a session we clear the claim queues: Without this update call, nothing would be
// scheduled to the client.
<scheduler::Pallet<T>>::free_cores_and_fill_claimqueue(Vec::new(), now);
let config = configuration::ActiveConfig::<T>::get();
// Extra sanity, config should already never be smaller than 1:
let n_lookahead = config.scheduler_params.lookahead.max(1);

scheduler::ClaimQueue::<T>::get()
.into_iter()
.map(|(core_index, entries)| {
(core_index, entries.into_iter().map(|e| e.para_id()).collect())
// on cores timing out internal claim queue size may be temporarily longer than it
// should be:
(
core_index,
entries.into_iter().map(|e| e.para_id()).take(n_lookahead as usize).collect(),
)
})
.collect()
}
Expand Down
88 changes: 62 additions & 26 deletions polkadot/runtime/parachains/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ use primitives::{
};
use sp_runtime::traits::One;
use sp_std::{
collections::{btree_map::BTreeMap, vec_deque::VecDeque},
collections::{
btree_map::{self, BTreeMap},
vec_deque::VecDeque,
},
prelude::*,
};

Expand Down Expand Up @@ -190,7 +193,7 @@ pub mod pallet {
}
}

type PositionInClaimqueue = u32;
type PositionInClaimQueue = u32;

impl<T: Config> Pallet<T> {
/// Called by the initializer to initialize the scheduler pallet.
Expand Down Expand Up @@ -309,15 +312,41 @@ impl<T: Config> Pallet<T> {
(concluded_paras, timedout_paras)
}

/// Get an iterator into the claim queues.
///
/// This iterator will have an item for each and every core index (no holes).
fn claim_queue_iterator() -> impl Iterator<Item = (CoreIndex, VecDeque<ParasEntryType<T>>)> {
let queues = ClaimQueue::<T>::get();
struct ClaimQueueIterator<T: Config> {
next_idx: u32,
queue: btree_map::IntoIter<CoreIndex, VecDeque<ParasEntryType<T>>>,
}
impl<T: Config> Iterator for ClaimQueueIterator<T> {
type Item = (CoreIndex, VecDeque<ParasEntryType<T>>);

fn next(&mut self) -> Option<Self::Item> {
let (idx, q) = self.queue.next()?;
let val = if idx != CoreIndex(self.next_idx) {
(CoreIndex(self.next_idx), VecDeque::new())
} else {
(idx, q)
};
self.next_idx += 1;
Some(val)
}
}
return ClaimQueueIterator::<T> { next_idx: 0, queue: queues.into_iter() }
}

/// Note that the given cores have become occupied. Update the claimqueue accordingly.
pub(crate) fn occupied(
now_occupied: BTreeMap<CoreIndex, ParaId>,
) -> BTreeMap<CoreIndex, PositionInClaimqueue> {
) -> BTreeMap<CoreIndex, PositionInClaimQueue> {
let mut availability_cores = AvailabilityCores::<T>::get();

log::debug!(target: LOG_TARGET, "[occupied] now_occupied {:?}", now_occupied);

let pos_mapping: BTreeMap<CoreIndex, PositionInClaimqueue> = now_occupied
let pos_mapping: BTreeMap<CoreIndex, PositionInClaimQueue> = now_occupied
.iter()
.flat_map(|(core_idx, para_id)| {
match Self::remove_from_claimqueue(*core_idx, *para_id) {
Expand Down Expand Up @@ -554,13 +583,6 @@ impl<T: Config> Pallet<T> {
}
}

//
// ClaimQueue related functions
//
fn claimqueue_lookahead() -> u32 {
configuration::ActiveConfig::<T>::get().scheduler_params.lookahead
}

/// Frees cores and fills the free claimqueue spots by popping from the `AssignmentProvider`.
pub fn free_cores_and_fill_claimqueue(
just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
Expand All @@ -573,24 +595,30 @@ impl<T: Config> Pallet<T> {
if ValidatorGroups::<T>::decode_len().map_or(true, |l| l == 0) {
return
}
// If there exists a core, ensure we schedule at least one job onto it.
let n_lookahead = Self::claimqueue_lookahead().max(1);
let n_session_cores = T::AssignmentProvider::session_core_count();
let cq = ClaimQueue::<T>::get();
let config = configuration::ActiveConfig::<T>::get();
// Extra sanity, config should already never be smaller than 1:
let n_lookahead = config.scheduler_params.lookahead.max(1);
let max_availability_timeouts = config.scheduler_params.max_availability_timeouts;
let ttl = config.scheduler_params.ttl;

for core_idx in 0..n_session_cores {
let core_idx = CoreIndex::from(core_idx);

let n_lookahead_used = cq.get(&core_idx).map_or(0, |v| v.len() as u32);

// add previously timedout paras back into the queue
if let Some(mut entry) = timedout_paras.remove(&core_idx) {
if entry.availability_timeouts < max_availability_timeouts {
// Increment the timeout counter.
entry.availability_timeouts += 1;
// Reset the ttl so that a timed out assignment.
entry.ttl = now + ttl;
if n_lookahead_used < n_lookahead {
entry.ttl = now + ttl;
} else {
// Over max capacity, we need to bump ttl:
entry.ttl = now + ttl + One::one();
}
Self::add_to_claimqueue(core_idx, entry);
// The claim has been added back into the claimqueue.
// Do not pop another assignment for the core.
Expand All @@ -606,9 +634,6 @@ impl<T: Config> Pallet<T> {
if let Some(concluded_para) = concluded_paras.remove(&core_idx) {
T::AssignmentProvider::report_processed(concluded_para);
}
// We consider occupied cores to be part of the claimqueue
let n_lookahead_used = cq.get(&core_idx).map_or(0, |v| v.len() as u32) +
if Self::is_core_occupied(core_idx) { 1 } else { 0 };
for _ in n_lookahead_used..n_lookahead {
if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
Self::add_to_claimqueue(core_idx, ParasEntry::new(assignment, now + ttl));
Expand All @@ -620,13 +645,6 @@ impl<T: Config> Pallet<T> {
debug_assert!(concluded_paras.is_empty());
}

fn is_core_occupied(core_idx: CoreIndex) -> bool {
match AvailabilityCores::<T>::get().get(core_idx.0 as usize) {
None | Some(CoreOccupied::Free) => false,
Some(CoreOccupied::Paras(_)) => true,
}
}

fn add_to_claimqueue(core_idx: CoreIndex, pe: ParasEntryType<T>) {
ClaimQueue::<T>::mutate(|la| {
la.entry(core_idx).or_default().push_back(pe);
Expand All @@ -637,7 +655,7 @@ impl<T: Config> Pallet<T> {
fn remove_from_claimqueue(
core_idx: CoreIndex,
para_id: ParaId,
) -> Result<(PositionInClaimqueue, ParasEntryType<T>), &'static str> {
) -> Result<(PositionInClaimQueue, ParasEntryType<T>), &'static str> {
ClaimQueue::<T>::mutate(|cq| {
let core_claims = cq.get_mut(&core_idx).ok_or("core_idx not found in lookahead")?;

Expand All @@ -660,6 +678,24 @@ impl<T: Config> Pallet<T> {
.filter_map(|(core_idx, v)| v.front().map(|e| (core_idx, e.assignment.para_id())))
}

/// Paras that can be backed.
///
/// 1. Must be scheduled on core.
/// 2. Core must not be occupied.
pub(crate) fn eligible_paras() -> impl Iterator<Item = (CoreIndex, ParaId)> {
let availability_cores = AvailabilityCores::<T>::get();

Self::claim_queue_iterator().zip(availability_cores.into_iter()).filter_map(
|((core_idx, queue), core)| {
if core != CoreOccupied::Free {
return None
}
let next_scheduled = queue.front()?;
Some((core_idx, next_scheduled.assignment.para_id()))
},
)
}

#[cfg(any(feature = "try-runtime", test))]
fn claimqueue_len() -> usize {
ClaimQueue::<T>::get().iter().map(|la_vec| la_vec.1.len()).sum()
Expand Down
26 changes: 17 additions & 9 deletions polkadot/runtime/parachains/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,8 @@ fn schedule_schedules_including_just_freed() {
.for_each(|(_core_idx, core_queue)| assert_eq!(core_queue.len(), 0))
}

// add a couple more para claims - the claim on `b` will go to the 3rd core
// (2) and the claim on `d` will go back to the 1st para core (0). The claim on `e`
// then will go for core `1`.
MockAssigner::add_test_assignment(assignment_a.clone());
MockAssigner::add_test_assignment(assignment_c.clone());
MockAssigner::add_test_assignment(assignment_b.clone());
MockAssigner::add_test_assignment(assignment_d.clone());
MockAssigner::add_test_assignment(assignment_e.clone());
Expand All @@ -500,8 +499,7 @@ fn schedule_schedules_including_just_freed() {
{
let scheduled: BTreeMap<_, _> = scheduled_entries().collect();

// cores 0 and 1 are occupied by claims. core 2 was free.
assert_eq!(scheduled.len(), 1);
assert_eq!(scheduled.len(), 3);
assert_eq!(
scheduled.get(&CoreIndex(2)).unwrap(),
&ParasEntry {
Expand Down Expand Up @@ -529,17 +527,28 @@ fn schedule_schedules_including_just_freed() {
assert_eq!(
scheduled.get(&CoreIndex(0)).unwrap(),
&ParasEntry {
assignment: Assignment::Bulk(para_d),
// Next entry in queue is `a` again:
assignment: Assignment::Bulk(para_a),
availability_timeouts: 0,
ttl: 8
},
);
// Although C was descheduled, the core `2` was occupied so C goes back to the queue.
assert_eq!(
scheduler::ClaimQueue::<Test>::get()[&CoreIndex(1)][1],
ParasEntry {
assignment: Assignment::Bulk(para_c),
// End of the queue should be the pushed back entry:
availability_timeouts: 1,
// ttl 1 higher:
ttl: 9
},
);
assert_eq!(
scheduled.get(&CoreIndex(1)).unwrap(),
&ParasEntry {
assignment: Assignment::Bulk(para_c),
availability_timeouts: 1,
availability_timeouts: 0,
ttl: 8
},
);
Expand All @@ -552,8 +561,6 @@ fn schedule_schedules_including_just_freed() {
},
);

// Para A claim should have been wiped, but para C claim should remain.
assert!(!claimqueue_contains_para_ids::<Test>(vec![para_a]));
assert!(claimqueue_contains_para_ids::<Test>(vec![para_c]));
assert!(!availability_cores_contains_para_ids::<Test>(vec![para_a, para_c]));
}
Expand Down Expand Up @@ -627,6 +634,7 @@ fn schedule_clears_availability_cores() {

// Add more assignments
MockAssigner::add_test_assignment(assignment_a.clone());
MockAssigner::add_test_assignment(assignment_b.clone());
MockAssigner::add_test_assignment(assignment_c.clone());

run_to_block(3, |_| None);
Expand Down