-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Make candidate validation bounded again #2125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
092d6a3
7f1698c
4e64508
6e0f461
83a0fe4
7e88b15
0663ba0
047cc03
52b5c5e
a4cdcf8
b25eda0
77aec82
6bcc212
fb20432
ca97f47
a0e8065
4c6e73d
05e3b83
92bacd3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| #![deny(unused_crate_dependencies, unused_results)] | ||
| #![warn(missing_docs)] | ||
|
|
||
| use async_semaphore::Semaphore; | ||
| use polkadot_node_core_pvf::{ | ||
| InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError, | ||
| PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, | ||
|
|
@@ -55,10 +56,11 @@ use polkadot_primitives::{ | |
|
|
||
| use parity_scale_codec::Encode; | ||
|
|
||
| use futures::{channel::oneshot, prelude::*}; | ||
| use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; | ||
|
|
||
| use std::{ | ||
| path::PathBuf, | ||
| pin::Pin, | ||
| sync::Arc, | ||
| time::{Duration, Instant}, | ||
| }; | ||
|
|
@@ -130,6 +132,105 @@ impl<Context> CandidateValidationSubsystem { | |
| } | ||
| } | ||
|
|
||
| fn handle_validation_message<S>( | ||
| mut sender: S, | ||
| validation_host: ValidationHost, | ||
| metrics: Metrics, | ||
| msg: CandidateValidationMessage, | ||
| ) -> Pin<Box<dyn Future<Output = ()> + Send>> | ||
| where | ||
| S: SubsystemSender<RuntimeApiMessage>, | ||
| { | ||
| match msg { | ||
| CandidateValidationMessage::ValidateFromChainState { | ||
| candidate_receipt, | ||
| pov, | ||
| executor_params, | ||
| exec_kind, | ||
| response_sender, | ||
| .. | ||
| } => { | ||
| // let mut sender = ctx.sender().clone(); | ||
| // let metrics = metrics.clone(); | ||
| // let validation_host = validation_host.clone(); | ||
|
|
||
| // let guard = semaphore.acquire_arc().await; | ||
| async move { | ||
| // let _guard = guard; | ||
| let _timer = metrics.time_validate_from_chain_state(); | ||
| let res = validate_from_chain_state( | ||
| &mut sender, | ||
| validation_host, | ||
| candidate_receipt, | ||
| pov, | ||
| executor_params, | ||
| exec_kind, | ||
| &metrics, | ||
| ) | ||
| .await; | ||
|
|
||
| metrics.on_validation_event(&res); | ||
| let _ = response_sender.send(res); | ||
| } | ||
| .boxed() | ||
| }, | ||
| CandidateValidationMessage::ValidateFromExhaustive { | ||
| validation_data, | ||
| validation_code, | ||
| candidate_receipt, | ||
| pov, | ||
| executor_params, | ||
| exec_kind, | ||
| response_sender, | ||
| .. | ||
| } => { | ||
| // let metrics = metrics.clone(); | ||
| // let validation_host = validation_host.clone(); | ||
|
|
||
| // let guard = semaphore.acquire_arc().await; | ||
| async move { | ||
| // let _guard = guard; | ||
| let _timer = metrics.time_validate_from_exhaustive(); | ||
| let res = validate_candidate_exhaustive( | ||
| validation_host, | ||
| validation_data, | ||
| validation_code, | ||
| candidate_receipt, | ||
| pov, | ||
| executor_params, | ||
| exec_kind, | ||
| &metrics, | ||
| ) | ||
| .await; | ||
|
|
||
| metrics.on_validation_event(&res); | ||
| let _ = response_sender.send(res); | ||
| } | ||
| .boxed() | ||
| }, | ||
| CandidateValidationMessage::PreCheck { | ||
| relay_parent, | ||
| validation_code_hash, | ||
| response_sender, | ||
| .. | ||
| } => { | ||
| // let mut sender = ctx.sender().clone(); | ||
| // let validation_host = validation_host.clone(); | ||
|
|
||
| // let guard = semaphore.acquire_arc().await; | ||
| async move { | ||
| // let _guard = guard; | ||
| let precheck_result = | ||
| precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash) | ||
| .await; | ||
|
|
||
| let _ = response_sender.send(precheck_result); | ||
| } | ||
| .boxed() | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| #[overseer::contextbounds(CandidateValidation, prefix = self::overseer)] | ||
| async fn run<Context>( | ||
| mut ctx: Context, | ||
|
|
@@ -156,106 +257,27 @@ async fn run<Context>( | |
| .await?; | ||
| ctx.spawn_blocking("pvf-validation-host", task.boxed())?; | ||
|
|
||
| let mut tasks = FuturesUnordered::new(); | ||
| // The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size | ||
| // to allow exhaustive validation messages to fall through in case the tasks are clogged with | ||
| // `ValidateFromChainState` messages awaiting data from the runtime | ||
| let semaphore = Arc::new(Semaphore::new(polkadot_node_core_pvf::HOST_MESSAGE_QUEUE_SIZE * 2)); | ||
|
|
||
| loop { | ||
| match ctx.recv().await? { | ||
| FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {}, | ||
| FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}, | ||
| FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), | ||
| FromOrchestra::Communication { msg } => match msg { | ||
| CandidateValidationMessage::ValidateFromChainState { | ||
| candidate_receipt, | ||
| pov, | ||
| executor_params, | ||
| exec_kind, | ||
| response_sender, | ||
| .. | ||
| } => { | ||
| let bg = { | ||
| let mut sender = ctx.sender().clone(); | ||
| let metrics = metrics.clone(); | ||
| let validation_host = validation_host.clone(); | ||
|
|
||
| async move { | ||
| let _timer = metrics.time_validate_from_chain_state(); | ||
| let res = validate_from_chain_state( | ||
| &mut sender, | ||
| validation_host, | ||
| candidate_receipt, | ||
| pov, | ||
| executor_params, | ||
| exec_kind, | ||
| &metrics, | ||
| ) | ||
| .await; | ||
|
|
||
| metrics.on_validation_event(&res); | ||
| let _ = response_sender.send(res); | ||
| } | ||
| }; | ||
|
|
||
| ctx.spawn("validate-from-chain-state", bg.boxed())?; | ||
| }, | ||
| CandidateValidationMessage::ValidateFromExhaustive { | ||
| validation_data, | ||
| validation_code, | ||
| candidate_receipt, | ||
| pov, | ||
| executor_params, | ||
| exec_kind, | ||
| response_sender, | ||
| .. | ||
| } => { | ||
| let bg = { | ||
| let metrics = metrics.clone(); | ||
| let validation_host = validation_host.clone(); | ||
|
|
||
| async move { | ||
| let _timer = metrics.time_validate_from_exhaustive(); | ||
| let res = validate_candidate_exhaustive( | ||
| validation_host, | ||
| validation_data, | ||
| validation_code, | ||
| candidate_receipt, | ||
| pov, | ||
| executor_params, | ||
| exec_kind, | ||
| &metrics, | ||
| ) | ||
| .await; | ||
|
|
||
| metrics.on_validation_event(&res); | ||
| let _ = response_sender.send(res); | ||
| } | ||
| }; | ||
|
|
||
| ctx.spawn("validate-from-exhaustive", bg.boxed())?; | ||
| }, | ||
| CandidateValidationMessage::PreCheck { | ||
| relay_parent, | ||
| validation_code_hash, | ||
| response_sender, | ||
| .. | ||
| } => { | ||
| let bg = { | ||
| let mut sender = ctx.sender().clone(); | ||
| let validation_host = validation_host.clone(); | ||
|
|
||
| async move { | ||
| let precheck_result = precheck_pvf( | ||
| &mut sender, | ||
| validation_host, | ||
| relay_parent, | ||
| validation_code_hash, | ||
| ) | ||
| .await; | ||
|
|
||
| let _ = response_sender.send(precheck_result); | ||
| } | ||
| }; | ||
|
|
||
| ctx.spawn("candidate-validation-pre-check", bg.boxed())?; | ||
| }, | ||
| futures::select! { | ||
|
||
| comm = ctx.recv().fuse() => { | ||
| match comm { | ||
| Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {}, | ||
| Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, | ||
| Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), | ||
| Ok(FromOrchestra::Communication { msg }) => { | ||
| let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg); | ||
| tasks.push(task); | ||
| } | ||
| Err(e) => return Err(SubsystemError::from(e)) | ||
| } | ||
| }, | ||
| _ = tasks.select_next_some() => () | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.