Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions polkadot/node/collation-generation/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Error {
#[error(transparent)]
Util(#[from] polkadot_node_subsystem_util::Error),
#[error(transparent)]
UtilRuntime(#[from] polkadot_node_subsystem_util::runtime::Error),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error("Parachain backing state not available in runtime.")]
MissingParaBackingState,
Expand Down
54 changes: 10 additions & 44 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,23 @@ use polkadot_node_primitives::{
SubmitCollationParams,
};
use polkadot_node_subsystem::{
messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest},
messages::{CollationGenerationMessage, CollatorProtocolMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
has_required_runtime, request_async_backing_params, request_availability_cores,
request_claim_queue, request_para_backing_state, request_persisted_validation_data,
request_validation_code, request_validation_code_hash, request_validators,
request_async_backing_params, request_availability_cores, request_para_backing_state,
request_persisted_validation_data, request_validation_code, request_validation_code_hash,
request_validators,
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core},
};
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, ScheduledCore, ValidationCodeHash,
PersistedValidationData, ValidationCodeHash,
};
use sp_core::crypto::Pair;
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
use std::sync::Arc;

mod error;

Expand Down Expand Up @@ -228,7 +226,9 @@ async fn handle_new_activations<Context>(
let availability_cores = availability_cores??;
let async_backing_params = async_backing_params?.ok();
let n_validators = validators??.len();
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent)
.await
.map_err(crate::error::Error::UtilRuntime)?;

// The loop below will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();
Expand Down Expand Up @@ -655,37 +655,3 @@ fn erasure_root(
let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
Ok(polkadot_erasure_coding::branches(&chunks).root())
}

// Queries the claim queue, provided the runtime at `relay_parent` exposes the API.
// Returns `Ok(None)` when the runtime is too old to support `request_claim_queue`;
// any `RuntimeApiError` is bubbled up to the caller through `crate::error::Result`.
async fn fetch_claim_queue(
	sender: &mut impl overseer::CollationGenerationSenderTrait,
	relay_parent: Hash,
) -> crate::error::Result<Option<BTreeMap<CoreIndex, VecDeque<ParaId>>>> {
	// Guard: bail out early when the claim queue runtime API is unavailable.
	let supports_claim_queue = has_required_runtime(
		sender,
		relay_parent,
		RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
	)
	.await;

	if !supports_claim_queue {
		gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
		return Ok(None)
	}

	// First `await` sends the request, second one resolves the response channel.
	let claim_queue = request_claim_queue(relay_parent, sender).await.await??;
	Ok(Some(claim_queue))
}

// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
// This function is supposed to be used in `handle_new_activations` hence the return type.
fn fetch_next_scheduled_on_core(
	claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
	core_idx: CoreIndex,
) -> Option<ScheduledCore> {
	// `?` short-circuits both on an unknown core and on an empty queue for it.
	let queued_paras = claim_queue.get(&core_idx)?;
	let para_id = queued_paras.front()?.clone();
	Some(ScheduledCore { para_id, collator: None })
}
3 changes: 3 additions & 0 deletions polkadot/node/network/statement-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub enum Error {
#[error("Fetching validator groups failed {0:?}")]
FetchValidatorGroups(RuntimeApiError),

#[error("Fetching claim queue failed {0:?}")]
FetchClaimQueue(runtime::Error),

#[error("Attempted to share statement when not a validator or not assigned")]
InvalidShare,

Expand Down
107 changes: 75 additions & 32 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
runtime::{request_min_backing_votes, ProspectiveParachainsMode},
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core, ClaimQueueSnapshot},
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
Expand Down Expand Up @@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
struct PerRelayParentState {
local_validator: Option<LocalValidatorState>,
statement_store: StatementStore,
availability_cores: Vec<CoreState>,
group_rotation_info: GroupRotationInfo,
seconding_limit: usize,
session: SessionIndex,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
}

impl PerRelayParentState {
Expand Down Expand Up @@ -693,15 +693,24 @@ pub(crate) async fn handle_active_leaves_update<Context>(
}
});

let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we pass per value (can't re-use the result), might make sense to move that call into determine_groups_per_para.

The result of request_availability_cores is used in find_active_validator_state, but wrongly. It uses para_id, a function we should remove, as it makes no sense to either return the ParaId of the occupying para or the scheduled one.*)

Regardless, I would move the fetching code into the function that need the data. handle_active_leaves_update is already pages of code.

*) Note: This is obviously unrelated, so can be a separate PR, but it should be fixed for Coretime asap. @tdimitrov

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.await
.map_err(JfyiError::FetchClaimQueue)?;

let groups_per_para = determine_groups_per_para(
&availability_cores,
&group_rotation_info,
&maybe_claim_queue,
seconding_limit - 1,
);
state.per_relay_parent.insert(
new_relay_parent,
PerRelayParentState {
local_validator,
statement_store: StatementStore::new(&per_session.groups),
availability_cores,
group_rotation_info,
seconding_limit,
session: session_index,
groups_per_para,
},
);
}
Expand Down Expand Up @@ -2126,17 +2135,54 @@ async fn provide_candidate_to_grid<Context>(
}
}

fn group_for_para(
// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
fn determine_groups_per_para(
availability_cores: &[CoreState],
group_rotation_info: &GroupRotationInfo,
para_id: ParaId,
) -> Option<GroupIndex> {
// Note: this won't work well for on-demand parachains as it assumes that core assignments are
// fixed across blocks.
let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id));
maybe_claim_queue: &Option<ClaimQueueSnapshot>,
max_candidate_depth: usize,
) -> HashMap<ParaId, Vec<GroupIndex>> {
// Determine the core indices occupied by each para at the current relay parent. To support
// on-demand parachains we also consider the core indices at next block if core has a candidate
// pending availability.
let para_core_indices = availability_cores.iter().enumerate().filter_map(|(index, core)| {
match core {
CoreState::Scheduled(scheduled_core) =>
Some((scheduled_core.para_id, CoreIndex(index as u32))),
CoreState::Occupied(occupied_core) => {
if max_candidate_depth >= 1 {
// Use claim queue if available, or fallback to `next_up_on_available`
let maybe_scheduled_core = match maybe_claim_queue {
Some(claim_queue) => {
// What's up next on this core?
fetch_next_scheduled_on_core(claim_queue, CoreIndex(index as u32))
},
None => {
// Runtime doesn't support claim queue runtime api. Fallback to
// `next_up_on_available`
occupied_core.next_up_on_available.clone()
},
};

maybe_scheduled_core
.filter(|scheduled_core| scheduled_core.para_id == occupied_core.para_id())
.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
} else {
None
}
},
CoreState::Free => None,
}
});

core_index
.map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()))
let mut groups_per_para = HashMap::new();
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
for (para, core_index) in para_core_indices {
let group_index = group_rotation_info.group_for_core(core_index, availability_cores.len());
groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index)
}

groups_per_para
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
Expand Down Expand Up @@ -2192,18 +2238,14 @@ async fn fragment_tree_update_inner<Context>(
let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent);
if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
let group_index = group_for_para(
&prs.availability_cores,
&prs.group_rotation_info,
receipt.descriptor().para_id,
);

let per_session = state.per_session.get(&prs.session);
if let (Some(per_session), Some(group_index)) = (per_session, group_index) {
// TODO(maybe for sanity): perform an extra check on the candidate backing group
// index all allowed
if let Some(per_session) = per_session {
send_backing_fresh_statements(
ctx,
candidate_hash,
group_index,
confirmed.group_index(),
&receipt.descriptor().relay_parent,
prs,
confirmed,
Expand Down Expand Up @@ -2311,13 +2353,14 @@ async fn handle_incoming_manifest_common<'a, Context>(
Some(x) => x,
};

let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para_id,
);
let expected_groups = relay_parent_state.groups_per_para.get(&para_id);

if expected_group != Some(manifest_summary.claimed_group_index) {
if expected_groups.is_none() ||
!expected_groups
.expect("checked is_some(); qed")
.iter()
.any(|g| g == &manifest_summary.claimed_group_index)
{
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
}
Expand Down Expand Up @@ -3037,13 +3080,13 @@ pub(crate) async fn handle_response<Context>(
relay_parent_state.session,
|v| per_session.session_info.validators.get(v).map(|x| x.clone()),
|para, g_index| {
let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para,
);
let expected_groups = relay_parent_state.groups_per_para.get(&para);

Some(g_index) == expected_group
expected_groups.is_some() &&
expected_groups
.expect("checked is_some(); qed")
.iter()
.any(|g| g == &g_index)
},
disabled_mask,
);
Expand Down
46 changes: 44 additions & 2 deletions polkadot/node/subsystem-util/src/vstaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@
//! This module is intended to contain common boiler plate code handling unreleased runtime API
//! calls.

use std::collections::{BTreeMap, VecDeque};

use polkadot_node_subsystem_types::messages::{RuntimeApiMessage, RuntimeApiRequest};
use polkadot_overseer::SubsystemSender;
use polkadot_primitives::{Hash, ValidatorIndex};
use polkadot_primitives::{CoreIndex, Hash, Id as ParaId, ScheduledCore, ValidatorIndex};

use crate::{has_required_runtime, request_disabled_validators, runtime};
use crate::{has_required_runtime, request_claim_queue, request_disabled_validators, runtime};

const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging";

/// A snapshot of the runtime claim queue at an arbitrary relay chain block.
pub type ClaimQueueSnapshot = BTreeMap<CoreIndex, VecDeque<ParaId>>;

// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
/// Returns disabled validators list if the runtime supports it. Otherwise logs a debug messages and
/// returns an empty vec.
Expand Down Expand Up @@ -54,3 +59,40 @@ pub async fn get_disabled_validators_with_fallback<Sender: SubsystemSender<Runti

Ok(disabled_validators)
}

/// Fetches the claim queue at `relay_parent`, provided the runtime supports the
/// `request_claim_queue` API. Returns a [`ClaimQueueSnapshot`], or `None` when the
/// runtime predates the claim queue API. Any [`RuntimeApiError`]s are bubbled up to
/// the caller as [`runtime::Error`].
pub async fn fetch_claim_queue(
	sender: &mut impl SubsystemSender<RuntimeApiMessage>,
	relay_parent: Hash,
) -> Result<Option<ClaimQueueSnapshot>, runtime::Error> {
	// Guard clause: old runtimes simply don't have the API — not an error.
	let api_available = has_required_runtime(
		sender,
		relay_parent,
		RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
	)
	.await;

	if !api_available {
		gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
		return Ok(None)
	}

	// First `await` dispatches the request; second resolves the oneshot response.
	// A dropped response channel is surfaced as `RuntimeRequestCanceled`, runtime
	// API failures are converted via `?`.
	let claim_queue = request_claim_queue(relay_parent, sender)
		.await
		.await
		.map_err(runtime::Error::RuntimeRequestCanceled)??;

	Ok(Some(claim_queue))
}

/// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
pub fn fetch_next_scheduled_on_core(
	claim_queue: &ClaimQueueSnapshot,
	core_idx: CoreIndex,
) -> Option<ScheduledCore> {
	// `?` yields `None` both when the core is absent and when its queue is empty.
	let next_para = claim_queue.get(&core_idx)?.front()?.clone();
	Some(ScheduledCore { para_id: next_para, collator: None })
}