diff --git a/polkadot/runtime/parachains/src/assigner.rs b/polkadot/runtime/parachains/src/assigner.rs index b21e857a47137..9e408df61dc18 100644 --- a/polkadot/runtime/parachains/src/assigner.rs +++ b/polkadot/runtime/parachains/src/assigner.rs @@ -53,7 +53,8 @@ impl Pallet { fn is_bulk_core(core_idx: &CoreIndex) -> bool { let parachain_cores = as AssignmentProvider>>::session_core_count(); - (0..parachain_cores).contains(&core_idx.0) + + core_idx.0 < parachain_cores } } diff --git a/polkadot/runtime/parachains/src/assigner_parachains.rs b/polkadot/runtime/parachains/src/assigner_parachains.rs index d605d86605151..866e8290052a8 100644 --- a/polkadot/runtime/parachains/src/assigner_parachains.rs +++ b/polkadot/runtime/parachains/src/assigner_parachains.rs @@ -39,7 +39,7 @@ pub mod pallet { impl AssignmentProvider> for Pallet { fn session_core_count() -> u32 { - >::parachains().len() as u32 + paras::Parachains::::decode_len().unwrap_or(0) as u32 } fn pop_assignment_for_core( @@ -62,7 +62,7 @@ impl AssignmentProvider> for Pallet { max_availability_timeouts: 0, // The next assignment already goes to the same [`ParaId`], this can be any number // that's high enough to clear the time it takes to clear backing/availability. - ttl: BlockNumberFor::::from(10u32), + ttl: 10u32.into(), } } } diff --git a/polkadot/runtime/parachains/src/scheduler.rs b/polkadot/runtime/parachains/src/scheduler.rs index 60b2a9254600e..af48019124de2 100644 --- a/polkadot/runtime/parachains/src/scheduler.rs +++ b/polkadot/runtime/parachains/src/scheduler.rs @@ -605,14 +605,10 @@ impl Pallet { /// Moves all elements in the claimqueue forward. fn move_claimqueue_forward() { let mut cq = ClaimQueue::::get(); - for (_, core_queue) in cq.iter_mut() { + for core_queue in cq.values_mut() { // First pop the finished claims from the front. - match core_queue.front() { - None => {}, - Some(None) => { - core_queue.pop_front(); - }, - Some(_) => {}, + if let Some(None) = core_queue.front() { + core_queue.pop_front(); } } @@ -628,9 +624,10 @@ impl Pallet { // This can only happen on new sessions at which we move all assignments back to the // provider. Hence, there's nothing we need to do here. - if ValidatorGroups::::get().is_empty() { + if ValidatorGroups::::decode_len().map_or(true, |l| l == 0) { return } + let n_lookahead = Self::claimqueue_lookahead(); let n_session_cores = T::AssignmentProvider::session_core_count(); let cq = ClaimQueue::::get(); @@ -686,8 +683,7 @@ impl Pallet { fn add_to_claimqueue(core_idx: CoreIndex, pe: ParasEntry>) { ClaimQueue::::mutate(|la| { - let la_deque = la.entry(core_idx).or_insert_with(|| VecDeque::new()); - la_deque.push_back(Some(pe)); + la.entry(core_idx).or_default().push_back(Some(pe)); }); } diff --git a/polkadot/runtime/parachains/src/scheduler/migration.rs b/polkadot/runtime/parachains/src/scheduler/migration.rs index c1ce95b10e3cb..bb9a647e955ca 100644 --- a/polkadot/runtime/parachains/src/scheduler/migration.rs +++ b/polkadot/runtime/parachains/src/scheduler/migration.rs @@ -25,36 +25,45 @@ use frame_support::{ mod v0 { use super::*; - use primitives::CollatorId; + use primitives::{CollatorId, Id}; + #[storage_alias] pub(super) type Scheduled = StorageValue, Vec, ValueQuery>; - #[derive(Encode, Decode)] - pub struct QueuedParathread { - claim: primitives::ParathreadEntry, - core_offset: u32, - } + #[derive(Clone, Encode, Decode)] + #[cfg_attr(feature = "std", derive(PartialEq))] + pub struct ParathreadClaim(pub Id, pub CollatorId); - #[derive(Encode, Decode, Default)] - pub struct ParathreadClaimQueue { - queue: Vec, - next_core_offset: u32, + #[derive(Clone, Encode, Decode)] + #[cfg_attr(feature = "std", derive(PartialEq))] + pub struct ParathreadEntry { + /// The claim. + pub claim: ParathreadClaim, + /// Number of retries. + pub retries: u32, } - // Only here to facilitate the migration. - impl ParathreadClaimQueue { - pub fn len(self) -> usize { - self.queue.len() - } + /// What is occupying a specific availability core. + #[derive(Clone, Encode, Decode)] + #[cfg_attr(feature = "std", derive(PartialEq))] + pub enum CoreOccupied { + /// A parathread. + Parathread(ParathreadEntry), + /// A parachain. + Parachain, } + /// The actual type isn't important, as we only delete the key in the state. #[storage_alias] - pub(super) type ParathreadQueue = - StorageValue, ParathreadClaimQueue, ValueQuery>; + pub(crate) type AvailabilityCores = + StorageValue, Vec>, ValueQuery>; + /// The actual type isn't important, as we only delete the key in the state. #[storage_alias] - pub(super) type ParathreadClaimIndex = - StorageValue, Vec, ValueQuery>; + pub(super) type ParathreadQueue = StorageValue, (), ValueQuery>; + + #[storage_alias] + pub(super) type ParathreadClaimIndex = StorageValue, (), ValueQuery>; /// The assignment type. #[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)] @@ -108,30 +117,36 @@ pub mod v1 { #[cfg(feature = "try-runtime")] fn pre_upgrade() -> Result, sp_runtime::DispatchError> { - log::trace!( + let n: u32 = v0::Scheduled::::get().len() as u32 + + v0::AvailabilityCores::::get().iter().filter(|c| c.is_some()).count() as u32; + + log::info!( target: crate::scheduler::LOG_TARGET, - "Scheduled before migration: {}", - v0::Scheduled::::get().len() + "Number of scheduled and waiting for availability before: {n}", ); - let bytes = u32::to_be_bytes(v0::Scheduled::::get().len() as u32); - - Ok(bytes.to_vec()) + Ok(n.encode()) } #[cfg(feature = "try-runtime")] fn post_upgrade(state: Vec) -> Result<(), sp_runtime::DispatchError> { - log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()"); + log::info!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()"); ensure!( - v0::Scheduled::::get().len() == 0, + v0::Scheduled::::get().is_empty(), "Scheduled should be empty after the migration" ); - let sched_len = u32::from_be_bytes(state.try_into().unwrap()); + let expected_len = u32::decode(&mut &state[..]).unwrap(); + let availability_cores_waiting = super::AvailabilityCores::::get() + .iter() + .filter(|c| !matches!(c, CoreOccupied::Free)) + .count(); + ensure!( - Pallet::::claimqueue_len() as u32 == sched_len, - "Scheduled completely moved to ClaimQueue after migration" + Pallet::::claimqueue_len() as u32 + availability_cores_waiting as u32 == + expected_len, + "ClaimQueue and AvailabilityCores should have the correct length", ); Ok(()) @@ -142,11 +157,8 @@ pub mod v1 { pub fn migrate_to_v1() -> Weight { let mut weight: Weight = Weight::zero(); - let pq = v0::ParathreadQueue::::take(); - let pq_len = pq.len() as u64; - - let pci = v0::ParathreadClaimIndex::::take(); - let pci_len = pci.len() as u64; + v0::ParathreadQueue::::kill(); + v0::ParathreadClaimIndex::::kill(); let now = >::block_number(); let scheduled = v0::Scheduled::::take(); @@ -158,10 +170,35 @@ pub fn migrate_to_v1() -> Weight { Pallet::::add_to_claimqueue(core_idx, pe); } + let parachains = paras::Pallet::::parachains(); + let availability_cores = v0::AvailabilityCores::::take(); + let mut new_availability_cores = Vec::new(); + + for (core_index, core) in availability_cores.into_iter().enumerate() { + let new_core = if let Some(core) = core { + match core { + v0::CoreOccupied::Parachain => CoreOccupied::Paras(ParasEntry::new( + Assignment::new(parachains[core_index]), + now, + )), + v0::CoreOccupied::Parathread(entry) => + CoreOccupied::Paras(ParasEntry::new(Assignment::new(entry.claim.0), now)), + } + } else { + CoreOccupied::Free + }; + + new_availability_cores.push(new_core); + } + + super::AvailabilityCores::::set(new_availability_cores); + // 2x as once for Scheduled and once for Claimqueue weight = weight.saturating_add(T::DbWeight::get().reads_writes(2 * sched_len, 2 * sched_len)); - weight = weight.saturating_add(T::DbWeight::get().reads_writes(pq_len, pq_len)); - weight = weight.saturating_add(T::DbWeight::get().reads_writes(pci_len, pci_len)); + // reading parachains + availability_cores, writing AvailabilityCores + weight = weight.saturating_add(T::DbWeight::get().reads_writes(2, 1)); + // 2x kill + weight = weight.saturating_add(T::DbWeight::get().writes(2)); weight }