Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 158 additions & 87 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,91 +311,130 @@ async fn run<Context>(
struct PrepareValidationState {
session_index: Option<SessionIndex>,
is_next_session_authority: bool,
// PVF host won't prepare the same code hash twice, so here we just avoid extra communication
already_prepared_code_hashes: HashSet<ValidationCodeHash>,
executor_params: Option<ExecutorParams>,
waiting: HashSet<ValidationCodeHash>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Let's add some comments what each collection represents.

pending: HashSet<ValidationCodeHash>,
processed: HashSet<ValidationCodeHash>,
// How many PVFs per block we take to prepare themselves for the next session validation
per_block_limit: usize,
}

impl PrepareValidationState {
fn waiting_to_vec(&self) -> Vec<ValidationCodeHash> {
self.waiting.iter().cloned().collect()
}

fn pending_chunk_to_vec(&self) -> Vec<ValidationCodeHash> {
self.pending.iter().cloned().take(self.per_block_limit).collect()
}

fn is_queued(&self, code_hash: &ValidationCodeHash) -> bool {
self.pending.contains(code_hash) || self.processed.contains(code_hash)
}
}

impl Default for PrepareValidationState {
fn default() -> Self {
Self {
session_index: None,
is_next_session_authority: false,
already_prepared_code_hashes: HashSet::new(),
per_block_limit: 1,
executor_params: None,
waiting: HashSet::new(),
pending: HashSet::new(),
processed: HashSet::new(),
}
}
}

async fn maybe_prepare_validation<Sender>(
sender: &mut Sender,
keystore: KeystorePtr,
validation_backend: impl ValidationBackend,
mut validation_backend: impl ValidationBackend,
update: ActiveLeavesUpdate,
state: &mut PrepareValidationState,
) where
Sender: SubsystemSender<RuntimeApiMessage>,
{
let Some(leaf) = update.activated else { return };
let new_session_index = new_session_index(sender, state.session_index, leaf.hash).await;
if new_session_index.is_some() {
state.session_index = new_session_index;
state.already_prepared_code_hashes.clear();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be fixed by simply not clearing this or clearing it more infrequent let's say every 24h ?

state.is_next_session_authority = check_next_session_authority(
sender,
keystore,
leaf.hash,
state.session_index.expect("qed: just checked above"),
)
.await;
let relay_parent = leaf.hash;

let Some(session_index) = session_index(sender, relay_parent).await else { return };
if state.session_index.map_or(true, |i| session_index > i) {
state.waiting.clear();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This if block is big enough, that it can be extracted out in a state.maybe_init_on_new_session().

state.pending.clear();
state.processed.clear();
state.session_index = Some(session_index);
state.is_next_session_authority =
is_next_session_authority(sender, keystore, relay_parent, session_index).await;
state.executor_params = if state.is_next_session_authority {
executor_params(sender, relay_parent).await
} else {
None
};
}

// On every active leaf check candidates and prepare PVFs our node doesn't have yet.
if state.is_next_session_authority {
let code_hashes = prepare_pvfs_for_backed_candidates(
sender,
validation_backend,
leaf.hash,
&state.already_prepared_code_hashes,
state.per_block_limit,
if !state.is_next_session_authority {
return
};
let Some(ref executor_params) = state.executor_params else { return };

for event in candidate_events(sender, relay_parent).await {
let CandidateEvent::CandidateBacked(receipt, ..) = event else { continue };
let code_hash = receipt.descriptor.validation_code_hash;
if !state.is_queued(&code_hash) {
let _ = state.waiting.insert(code_hash);
}
}

if state.pending.is_empty() {
let missing = missing_validation_code_hashes(
&mut validation_backend,
state.waiting_to_vec(),
executor_params.clone(),
)
.await;
state.already_prepared_code_hashes.extend(code_hashes.unwrap_or_default());
.await
.into_iter()
.collect();

state.processed.extend(state.waiting.difference(&missing));
state.pending = missing;
state.waiting.clear();
}

let code_hashes = prepare_pvfs_for_backed_candidates(
sender,
&mut validation_backend,
relay_parent,
state.pending_chunk_to_vec(),
executor_params.clone(),
)
.await;
for hash in code_hashes {
let _ = state.pending.remove(&hash);
let _ = state.processed.insert(hash);
}
}

// Returns the new session index if it is greater than the current one.
async fn new_session_index<Sender>(
sender: &mut Sender,
session_index: Option<SessionIndex>,
relay_parent: Hash,
) -> Option<SessionIndex>
async fn session_index<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<SessionIndex>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
let Ok(Ok(new_session_index)) =
util::request_session_index_for_child(relay_parent, sender).await.await
else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch session index from runtime API",
);
return None
};

session_index.map_or(Some(new_session_index), |index| {
if new_session_index > index {
Some(new_session_index)
} else {
match util::request_session_index_for_child(relay_parent, sender).await.await {
Ok(Ok(index)) => Some(index),
_ => {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch session index from runtime API",
);
None
}
})
},
}
}

// Returns true if the node is an authority in the next session.
async fn check_next_session_authority<Sender>(
async fn is_next_session_authority<Sender>(
sender: &mut Sender,
keystore: KeystorePtr,
relay_parent: Hash,
Expand Down Expand Up @@ -441,50 +480,64 @@ where
is_past_present_or_future_authority && !is_present_authority
}

// Sends PVF with unknown code hashes to the validation host returning the list of code hashes sent.
async fn prepare_pvfs_for_backed_candidates<Sender>(
sender: &mut Sender,
mut validation_backend: impl ValidationBackend,
relay_parent: Hash,
already_prepared: &HashSet<ValidationCodeHash>,
per_block_limit: usize,
) -> Option<Vec<ValidationCodeHash>>
async fn executor_params<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<ExecutorParams>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
let Ok(Ok(events)) = util::request_candidate_events(relay_parent, sender).await.await else {
if let Ok(executor_params) = util::executor_params_at_relay_parent(relay_parent, sender).await {
Some(executor_params)
} else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch candidate events from runtime API",
"cannot fetch executor params for the session",
);
return None
};
let code_hashes = events
.into_iter()
.filter_map(|e| match e {
CandidateEvent::CandidateBacked(receipt, ..) => {
let h = receipt.descriptor.validation_code_hash;
if already_prepared.contains(&h) {
None
} else {
Some(h)
}
},
_ => None,
None
}
}

async fn candidate_events<Sender>(sender: &mut Sender, relay_parent: Hash) -> Vec<CandidateEvent>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
match util::request_candidate_events(relay_parent, sender).await.await {
Ok(Ok(events)) => events,
_ => {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch candidate events from runtime API",
);
vec![]
},
}
}

async fn missing_validation_code_hashes(
validation_backend: &mut impl ValidationBackend,
code_hashes: Vec<ValidationCodeHash>,
executor_params: ExecutorParams,
) -> Vec<ValidationCodeHash> {
validation_backend
.ensure_pvf(code_hashes, executor_params)
.await
.unwrap_or_else(|err| {
gum::warn!(target: LOG_TARGET, ?err, "cannot ensure prepared PVF");
vec![]
})
.take(per_block_limit)
.collect::<Vec<_>>();
}

let Ok(executor_params) = util::executor_params_at_relay_parent(relay_parent, sender).await
else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch executor params for the session",
);
return None
};
// Sends PVF with unknown code hashes to the validation host returning the list of code hashes sent.
async fn prepare_pvfs_for_backed_candidates<Sender>(
sender: &mut Sender,
validation_backend: &mut impl ValidationBackend,
relay_parent: Hash,
code_hashes: Vec<ValidationCodeHash>,
executor_params: ExecutorParams,
) -> Vec<ValidationCodeHash>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);

let mut active_pvfs = vec![];
Expand Down Expand Up @@ -525,7 +578,7 @@ where
}

if active_pvfs.is_empty() {
return None
return vec![]
}

if let Err(err) = validation_backend.heads_up(active_pvfs).await {
Expand All @@ -535,7 +588,7 @@ where
?err,
"cannot prepare PVF for the next session",
);
return None
return vec![]
};

gum::debug!(
Expand All @@ -545,7 +598,7 @@ where
"Prepared PVF for the next session",
);

Some(processed_code_hashes)
processed_code_hashes
}

struct RuntimeRequestFailed;
Expand Down Expand Up @@ -1144,6 +1197,12 @@ trait ValidationBackend {
async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>;

async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String>;

async fn ensure_pvf(
&mut self,
code_hashes: Vec<ValidationCodeHash>,
executor_params: ExecutorParams,
) -> Result<Vec<ValidationCodeHash>, String>;
}

#[async_trait]
Expand Down Expand Up @@ -1190,6 +1249,18 @@ impl ValidationBackend for ValidationHost {
async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
self.heads_up(active_pvfs).await
}

async fn ensure_pvf(
&mut self,
code_hashes: Vec<ValidationCodeHash>,
executor_params: ExecutorParams,
) -> Result<Vec<ValidationCodeHash>, String> {
let (tx, rx) = oneshot::channel();
self.ensure_pvf(code_hashes, executor_params, tx).await?;
let result = rx.await.map_err(|err| err.to_string())?;

result
}
}

/// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks
Expand Down
Loading