From 7baf9441bd77c901d8dc5297b7f68464e974013f Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Tue, 2 Nov 2021 00:00:08 +0100 Subject: [PATCH 1/8] minor changes --- node/core/av-store/src/lib.rs | 135 +--------------- node/core/av-store/src/metrics.rs | 148 ++++++++++++++++++ node/network/availability-recovery/src/lib.rs | 8 +- 3 files changed, 157 insertions(+), 134 deletions(-) create mode 100644 node/core/av-store/src/metrics.rs diff --git a/node/core/av-store/src/lib.rs b/node/core/av-store/src/lib.rs index 2227442eb9ac..c4ed4ed7e419 100644 --- a/node/core/av-store/src/lib.rs +++ b/node/core/av-store/src/lib.rs @@ -33,10 +33,7 @@ use parity_scale_codec::{Decode, Encode, Error as CodecError, Input}; use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec}; use polkadot_node_primitives::{AvailableData, ErasureChunk}; -use polkadot_node_subsystem_util::{ - self as util, - metrics::{self, prometheus}, -}; +use polkadot_node_subsystem_util as util; use polkadot_primitives::v1::{ BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash, Header, ValidatorIndex, }; @@ -47,6 +44,8 @@ use polkadot_subsystem::{ SubsystemError, }; +mod metrics; + #[cfg(test)] mod tests; @@ -1273,131 +1272,3 @@ fn prune_all(db: &Arc, config: &Config, clock: &dyn Clock) -> Re db.write(tx)?; Ok(()) } - -#[derive(Clone)] -struct MetricsInner { - received_availability_chunks_total: prometheus::Counter, - pruning: prometheus::Histogram, - process_block_finalized: prometheus::Histogram, - block_activated: prometheus::Histogram, - process_message: prometheus::Histogram, - store_available_data: prometheus::Histogram, - store_chunk: prometheus::Histogram, - get_chunk: prometheus::Histogram, -} - -/// Availability metrics. -#[derive(Default, Clone)] -pub struct Metrics(Option); - -impl Metrics { - fn on_chunks_received(&self, count: usize) { - if let Some(metrics) = &self.0 { - use core::convert::TryFrom as _; - // assume usize fits into u64 - let by = u64::try_from(count).unwrap_or_default(); - metrics.received_availability_chunks_total.inc_by(by); - } - } - - /// Provide a timer for `prune_povs` which observes on drop. - fn time_pruning(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.pruning.start_timer()) - } - - /// Provide a timer for `process_block_finalized` which observes on drop. - fn time_process_block_finalized( - &self, - ) -> Option { - self.0.as_ref().map(|metrics| metrics.process_block_finalized.start_timer()) - } - - /// Provide a timer for `block_activated` which observes on drop. - fn time_block_activated(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.block_activated.start_timer()) - } - - /// Provide a timer for `process_message` which observes on drop. - fn time_process_message(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.process_message.start_timer()) - } - - /// Provide a timer for `store_available_data` which observes on drop. - fn time_store_available_data(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.store_available_data.start_timer()) - } - - /// Provide a timer for `store_chunk` which observes on drop. - fn time_store_chunk(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer()) - } - - /// Provide a timer for `get_chunk` which observes on drop. - fn time_get_chunk(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.get_chunk.start_timer()) - } -} - -impl metrics::Metrics for Metrics { - fn try_register(registry: &prometheus::Registry) -> Result { - let metrics = MetricsInner { - received_availability_chunks_total: prometheus::register( - prometheus::Counter::new( - "parachain_received_availability_chunks_total", - "Number of availability chunks received.", - )?, - registry, - )?, - pruning: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_pruning", - "Time spent within `av_store::prune_all`", - ))?, - registry, - )?, - process_block_finalized: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_process_block_finalized", - "Time spent within `av_store::process_block_finalized`", - ))?, - registry, - )?, - block_activated: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_block_activated", - "Time spent within `av_store::process_block_activated`", - ))?, - registry, - )?, - process_message: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_process_message", - "Time spent within `av_store::process_message`", - ))?, - registry, - )?, - store_available_data: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_store_available_data", - "Time spent within `av_store::store_available_data`", - ))?, - registry, - )?, - store_chunk: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_store_chunk", - "Time spent within `av_store::store_chunk`", - ))?, - registry, - )?, - get_chunk: prometheus::register( - prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( - "parachain_av_store_get_chunk", - "Time spent fetching requested chunks.`", - ))?, - registry, - )?, - }; - Ok(Metrics(Some(metrics))) - } -} diff --git a/node/core/av-store/src/metrics.rs b/node/core/av-store/src/metrics.rs new file mode 100644 index 000000000000..23aafa9ce752 --- /dev/null +++ b/node/core/av-store/src/metrics.rs @@ -0,0 +1,148 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use polkadot_node_subsystem_util::{ + self as util, + metrics::{self, prometheus}, +}; + +#[derive(Clone)] +pub(crate) struct MetricsInner { + received_availability_chunks_total: prometheus::Counter, + pruning: prometheus::Histogram, + process_block_finalized: prometheus::Histogram, + block_activated: prometheus::Histogram, + process_message: prometheus::Histogram, + store_available_data: prometheus::Histogram, + store_chunk: prometheus::Histogram, + get_chunk: prometheus::Histogram, +} + +/// Availability metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + pub(crate) fn on_chunks_received(&self, count: usize) { + if let Some(metrics) = &self.0 { + use core::convert::TryFrom as _; + // assume usize fits into u64 + let by = u64::try_from(count).unwrap_or_default(); + metrics.received_availability_chunks_total.inc_by(by); + } + } + + /// Provide a timer for `prune_povs` which observes on drop. + pub(crate) fn time_pruning(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.pruning.start_timer()) + } + + /// Provide a timer for `process_block_finalized` which observes on drop. + pub(crate) fn time_process_block_finalized( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.process_block_finalized.start_timer()) + } + + /// Provide a timer for `block_activated` which observes on drop. + pub(crate) fn time_block_activated(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.block_activated.start_timer()) + } + + /// Provide a timer for `process_message` which observes on drop. + pub(crate) fn time_process_message(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.process_message.start_timer()) + } + + /// Provide a timer for `store_available_data` which observes on drop. + pub(crate) fn time_store_available_data(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.store_available_data.start_timer()) + } + + /// Provide a timer for `store_chunk` which observes on drop. + pub(crate) fn time_store_chunk(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer()) + } + + /// Provide a timer for `get_chunk` which observes on drop. + pub(crate) fn time_get_chunk(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.get_chunk.start_timer()) + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + received_availability_chunks_total: prometheus::register( + prometheus::Counter::new( + "parachain_received_availability_chunks_total", + "Number of availability chunks received.", + )?, + registry, + )?, + pruning: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_pruning", + "Time spent within `av_store::prune_all`", + ))?, + registry, + )?, + process_block_finalized: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_process_block_finalized", + "Time spent within `av_store::process_block_finalized`", + ))?, + registry, + )?, + block_activated: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_block_activated", + "Time spent within `av_store::process_block_activated`", + ))?, + registry, + )?, + process_message: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_process_message", + "Time spent within `av_store::process_message`", + ))?, + registry, + )?, + store_available_data: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_store_available_data", + "Time spent within `av_store::store_available_data`", + ))?, + registry, + )?, + store_chunk: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_store_chunk", + "Time spent within `av_store::store_chunk`", + ))?, + registry, + )?, + get_chunk: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_av_store_get_chunk", + "Time spent fetching requested chunks.`", + ))?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 0b973e03bd72..010b4d8a55c8 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -186,7 +186,7 @@ impl RequestFromBackersPhase { self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?; // Request data. - let (req, res) = OutgoingRequest::new( + let (req, response) = OutgoingRequest::new( Recipient::Authority( params.validator_authority_keys[validator_index.0 as usize].clone(), ), @@ -203,7 +203,7 @@ impl RequestFromBackersPhase { ) .await; - match res.await { + match response.await { Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { if reconstructed_data_matches_root( params.validators.len(), @@ -346,6 +346,7 @@ impl RequestChunksPhase { .await; } + /// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`. async fn wait_for_chunks(&mut self, params: &InteractionParams) { let metrics = ¶ms.metrics; @@ -559,6 +560,9 @@ const fn is_unavailable( received_chunks + requesting_chunks + unrequested_validators < threshold } +/// Re-encode the data into erasure chunks in order to verify +/// the root hash of the provided merkle tree, which is built +/// on-top of the encoded chunks. fn reconstructed_data_matches_root( n_validators: usize, expected_root: &Hash, From 47b40931d09173854423c533a361c5b60aedcfce Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Tue, 2 Nov 2021 00:12:47 +0100 Subject: [PATCH 2/8] fmt --- node/core/av-store/src/metrics.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/node/core/av-store/src/metrics.rs b/node/core/av-store/src/metrics.rs index 23aafa9ce752..bf252e648b73 100644 --- a/node/core/av-store/src/metrics.rs +++ b/node/core/av-store/src/metrics.rs @@ -58,22 +58,30 @@ impl Metrics { } /// Provide a timer for `block_activated` which observes on drop. - pub(crate) fn time_block_activated(&self) -> Option { + pub(crate) fn time_block_activated( + &self, + ) -> Option { self.0.as_ref().map(|metrics| metrics.block_activated.start_timer()) } /// Provide a timer for `process_message` which observes on drop. - pub(crate) fn time_process_message(&self) -> Option { + pub(crate) fn time_process_message( + &self, + ) -> Option { self.0.as_ref().map(|metrics| metrics.process_message.start_timer()) } /// Provide a timer for `store_available_data` which observes on drop. - pub(crate) fn time_store_available_data(&self) -> Option { + pub(crate) fn time_store_available_data( + &self, + ) -> Option { self.0.as_ref().map(|metrics| metrics.store_available_data.start_timer()) } /// Provide a timer for `store_chunk` which observes on drop. - pub(crate) fn time_store_chunk(&self) -> Option { + pub(crate) fn time_store_chunk( + &self, + ) -> Option { self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer()) } From 7efc505701096690ea3f67792ec3498d9dea0478 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Tue, 2 Nov 2021 00:30:00 +0100 Subject: [PATCH 3/8] rename to expressive types --- node/network/availability-recovery/src/lib.rs | 96 ++++++++++--------- .../availability-recovery/src/tests.rs | 2 +- .../availability/availability-recovery.md | 48 ++++------ 3 files changed, 69 insertions(+), 77 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 010b4d8a55c8..cac474ac7e71 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -104,13 +104,13 @@ pub struct AvailabilityRecoverySubsystem { metrics: Metrics, } -struct RequestFromBackersPhase { +struct RequestFromBackersSourcer { // a random shuffling of the validators from the backing group which indicates the order // in which we connect to them and request the chunk. shuffled_backers: Vec, } -struct RequestChunksPhase { +struct RequestChunksSourcer { /// How many request have been unsuccessful so far. error_count: usize, /// Total number of responses that have been received. @@ -125,11 +125,11 @@ struct RequestChunksPhase { requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, } -struct InteractionParams { +struct DataRecoveryParams { /// Discovery ids of `validators`. validator_authority_keys: Vec, - /// Validators relevant to this `Interaction`. + /// Validators relevant to this `RecoveryTask`. validators: Vec, /// The number of pieces needed. @@ -145,33 +145,37 @@ struct InteractionParams { metrics: Metrics, } -enum InteractionPhase { - RequestFromBackers(RequestFromBackersPhase), - RequestChunks(RequestChunksPhase), +/// Source the availability data either by means +/// of direct request response protocol to +/// backers (a.k.a. fast-path), or recover from chunks. +enum Sourcer { + RequestFromBackers(RequestFromBackersSourcer), + RequestChunks(RequestChunksSourcer), } -/// A state of a single interaction reconstructing an available data. -struct Interaction { +/// A stateful reconstruction of availability data in reference to +/// a candidate hash. +struct DataRecoveryTask { sender: S, - /// The parameters of the interaction. - params: InteractionParams, + /// The parameters of the recovery process. + params: DataRecoveryParams, - /// The phase of the interaction. - phase: InteractionPhase, + /// The sourcer to obtain the availbility data. + sourcer: Sourcer, } -impl RequestFromBackersPhase { +impl RequestFromBackersSourcer { fn new(mut backers: Vec) -> Self { backers.shuffle(&mut rand::thread_rng()); - RequestFromBackersPhase { shuffled_backers: backers } + RequestFromBackersSourcer { shuffled_backers: backers } } // Run this phase to completion. async fn run( &mut self, - params: &InteractionParams, + params: &DataRecoveryParams, sender: &mut impl SubsystemSender, ) -> Result { tracing::trace!( @@ -241,12 +245,12 @@ impl RequestFromBackersPhase { } } -impl RequestChunksPhase { +impl RequestChunksSourcer { fn new(n_validators: u32) -> Self { let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect(); shuffling.shuffle(&mut rand::thread_rng()); - RequestChunksPhase { + RequestChunksSourcer { error_count: 0, total_received_responses: 0, shuffling: shuffling.into(), @@ -255,7 +259,7 @@ impl RequestChunksPhase { } } - fn is_unavailable(&self, params: &InteractionParams) -> bool { + fn is_unavailable(&self, params: &DataRecoveryParams) -> bool { is_unavailable( self.received_chunks.len(), self.requesting_chunks.total_len(), @@ -264,7 +268,7 @@ impl RequestChunksPhase { ) } - fn can_conclude(&self, params: &InteractionParams) -> bool { + fn can_conclude(&self, params: &DataRecoveryParams) -> bool { self.received_chunks.len() >= params.threshold || self.is_unavailable(params) } @@ -295,7 +299,7 @@ impl RequestChunksPhase { async fn launch_parallel_requests( &mut self, - params: &InteractionParams, + params: &DataRecoveryParams, sender: &mut impl SubsystemSender, ) { let num_requests = self.get_desired_request_count(params.threshold); @@ -347,7 +351,7 @@ impl RequestChunksPhase { } /// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`. - async fn wait_for_chunks(&mut self, params: &InteractionParams) { + async fn wait_for_chunks(&mut self, params: &DataRecoveryParams) { let metrics = ¶ms.metrics; // Wait for all current requests to conclude or time-out, or until we reach enough chunks. @@ -449,7 +453,7 @@ impl RequestChunksPhase { async fn run( &mut self, - params: &InteractionParams, + params: &DataRecoveryParams, sender: &mut impl SubsystemSender, ) -> Result { // First query the store for any chunks we've got. @@ -585,7 +589,7 @@ fn reconstructed_data_matches_root( branches.root() == *expected_root } -impl Interaction { +impl DataRecoveryTask { async fn run(mut self) -> Result { // First just see if we have the data available locally. { @@ -613,18 +617,18 @@ impl Interaction { loop { // These only fail if we cannot reach the underlying subsystem, which case there is nothing // meaningful we can do. - match self.phase { - InteractionPhase::RequestFromBackers(ref mut from_backers) => { + match self.sourcer { + Sourcer::RequestFromBackers(ref mut from_backers) => { match from_backers.run(&self.params, &mut self.sender).await { Ok(data) => break Ok(data), Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid), Err(RecoveryError::Unavailable) => - self.phase = InteractionPhase::RequestChunks(RequestChunksPhase::new( + self.sourcer = Sourcer::RequestChunks(RequestChunksSourcer::new( self.params.validators.len() as _, )), } }, - InteractionPhase::RequestChunks(ref mut from_all) => + Sourcer::RequestChunks(ref mut from_all) => break from_all.run(&self.params, &mut self.sender).await, } } @@ -632,13 +636,13 @@ impl Interaction { } /// Accumulate all awaiting sides for some particular `AvailableData`. -struct InteractionHandle { +struct RecoveryHandle { candidate_hash: CandidateHash, remote: RemoteHandle>, awaiting: Vec>>, } -impl Future for InteractionHandle { +impl Future for RecoveryHandle { type Output = Option<(CandidateHash, Result)>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -683,9 +687,9 @@ impl Future for InteractionHandle { } struct State { - /// Each interaction is implemented as its own async task, + /// Each recovery task is implemented as its own async task, /// and these handles are for communicating with them. - interactions: FuturesUnordered, + ongoing_recoveries: FuturesUnordered, /// A recent block hash for which state should be available. live_block: (BlockNumber, Hash), @@ -697,7 +701,7 @@ struct State { impl Default for State { fn default() -> Self { Self { - interactions: FuturesUnordered::new(), + ongoing_recoveries: FuturesUnordered::new(), live_block: (0, Hash::default()), availability_lru: LruCache::new(LRU_SIZE), } @@ -737,7 +741,7 @@ async fn handle_signal(state: &mut State, signal: OverseerSignal) -> SubsystemRe } /// Machinery around launching interactions into the background. -async fn launch_interaction( +async fn launch_recovery_task( state: &mut State, ctx: &mut Context, session_info: SessionInfo, @@ -752,7 +756,7 @@ where { let candidate_hash = receipt.hash(); - let params = InteractionParams { + let params = DataRecoveryParams { validator_authority_keys: session_info.discovery_keys.clone(), validators: session_info.validators.clone(), threshold: recovery_threshold(session_info.validators.len())?, @@ -763,28 +767,26 @@ where let phase = backing_group .and_then(|g| session_info.validator_groups.get(g.0 as usize)) - .map(|group| { - InteractionPhase::RequestFromBackers(RequestFromBackersPhase::new(group.clone())) - }) + .map(|group| Sourcer::RequestFromBackers(RequestFromBackersSourcer::new(group.clone()))) .unwrap_or_else(|| { - InteractionPhase::RequestChunks(RequestChunksPhase::new(params.validators.len() as _)) + Sourcer::RequestChunks(RequestChunksSourcer::new(params.validators.len() as _)) }); - let interaction = Interaction { sender: ctx.sender().clone(), params, phase }; + let recovery_task = DataRecoveryTask { sender: ctx.sender().clone(), params, sourcer: phase }; - let (remote, remote_handle) = interaction.run().remote_handle(); + let (remote, remote_handle) = recovery_task.run().remote_handle(); - state.interactions.push(InteractionHandle { + state.ongoing_recoveries.push(RecoveryHandle { candidate_hash, remote: remote_handle, awaiting: vec![response_sender], }); - if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)) { + if let Err(e) = ctx.spawn("recovery task", Box::pin(remote)) { tracing::warn!( target: LOG_TARGET, err = ?e, - "Failed to spawn a recovery interaction task", + "Failed to spawn a recovery task", ); } @@ -821,7 +823,7 @@ where return Ok(()) } - if let Some(i) = state.interactions.iter_mut().find(|i| i.candidate_hash == candidate_hash) { + if let Some(i) = state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash) { i.awaiting.push(response_sender); return Ok(()) } @@ -835,7 +837,7 @@ where let _span = span.child("session-info-ctx-received"); match session_info { Some(session_info) => - launch_interaction( + launch_recovery_task( state, ctx, session_info, @@ -966,7 +968,7 @@ impl AvailabilityRecoverySubsystem { } } } - output = state.interactions.select_next_some() => { + output = state.ongoing_recoveries.select_next_some() => { if let Some((candidate_hash, result)) = output { state.availability_lru.put(candidate_hash, result); } diff --git a/node/network/availability-recovery/src/tests.rs b/node/network/availability-recovery/src/tests.rs index a918b8c615d8..49e3afb74a93 100644 --- a/node/network/availability-recovery/src/tests.rs +++ b/node/network/availability-recovery/src/tests.rs @@ -1273,7 +1273,7 @@ fn does_not_query_local_validator() { fn parallel_request_calculation_works_as_expected() { let num_validators = 100; let threshold = recovery_threshold(num_validators).unwrap(); - let mut phase = RequestChunksPhase::new(100); + let mut phase = RequestChunksSourcer::new(100); assert_eq!(phase.get_desired_request_count(threshold), threshold); phase.error_count = 1; phase.total_received_responses = 1; diff --git a/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 2c9da3fbb5f8..cef4d0c40635 100644 --- a/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -21,15 +21,12 @@ Output: ## Functionality -We hold a state which tracks the current recovery interactions we have live, as well as which request IDs correspond to which interactions. An interaction is a structure encapsulating all interaction with the network necessary to recover the available data. +We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which taks. An recovery task is a structure encapsulating all interaction with the network necessary to recover the available data in respect to one candidate. ```rust struct State { - /// Each interaction is implemented as its own remote async task, and these handles are remote - /// for it. - interactions: FuturesUnordered, - /// A multiplexer over receivers from live interactions. - interaction_receivers: FuturesUnordered>, + /// Each recovery is implemented as an independent async task, and the handles only supply information about the result. + ongoing_recoveries: FuturesUnordered, /// A recent block hash for which state should be available. live_block_hash: Hash, // An LRU cache of recently recovered data. @@ -38,7 +35,7 @@ struct State { /// This is a future, which concludes either when a response is received from the interaction, /// or all the `awaiting` channels have closed. -struct InteractionHandle { +struct RecoveryHandle { candidate_hash: CandidateHash, interaction_response: RemoteHandle, awaiting: Vec>>, @@ -47,7 +44,7 @@ struct InteractionHandle { struct Unavailable; struct Concluded(CandidateHash, Result); -struct InteractionParams { +struct RecoveryTaskParams { validator_authority_keys: Vec, validators: Vec, // The number of pieces needed. @@ -56,7 +53,7 @@ struct InteractionParams { erasure_root: Hash, } -enum InteractionPhase { +enum RecoveryTask { RequestFromBackers { // a random shuffling of the validators from the backing group which indicates the order // in which we connect to them and request the chunk. @@ -71,10 +68,10 @@ enum InteractionPhase { } } -struct Interaction { +struct DataRecoveryTask { to_subsystems: SubsystemSender, - params: InteractionParams, - phase: InteractionPhase, + params: RecoveryTaskParams, + sourcer: Sourcer, } ``` @@ -89,31 +86,24 @@ On `Conclude`, shut down the subsystem. #### `AvailabilityRecoveryMessage::RecoverAvailableData(receipt, session, Option, response)` 1. Check the `availability_lru` for the candidate and return the data if so. -1. Check if there is already an interaction handle for the request. If so, add the response handle to it. -1. Otherwise, load the session info for the given session under the state of `live_block_hash`, and initiate an interaction with *`launch_interaction`*. Add an interaction handle to the state and add the response channel to it. +1. Check if there is already an recovery handle for the request. If so, add the response handle to it. +1. Otherwise, load the session info for the given session under the state of `live_block_hash`, and initiate a recovery task with *`launch_recovery_task`*. Add a recovery handle to the state and add the response channel to it. 1. If the session info is not available, return `RecoveryError::Unavailable` on the response channel. -### From-interaction logic +### Recovery logic -#### `FromInteraction::Concluded` - -1. Load the entry from the `interactions` map. It should always exist, if not for logic errors. Send the result to each member of `awaiting`. -1. Add the entry to the `availability_lru`. - -### Interaction logic - -#### `launch_interaction(session_index, session_info, candidate_receipt, candidate_hash, Option)` +#### `launch_recovery_task(session_index, session_info, candidate_receipt, candidate_hash, Option)` 1. Compute the threshold from the session info. It should be `f + 1`, where `n = 3f + k`, where `k in {1, 2, 3}`, and `n` is the number of validators. -1. Set the various fields of `InteractionParams` based on the validator lists in `session_info` and information about the candidate. +1. Set the various fields of `DataRecoveryParams` based on the validator lists in `session_info` and information about the candidate. 1. If the `backing_group_index` is `Some`, start in the `RequestFromBackers` phase with a shuffling of the backing group validator indices and a `None` requesting value. 1. Otherwise, start in the `RequestChunks` phase with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty. 1. Set the `to_subsystems` sender to be equal to a clone of the `SubsystemContext`'s sender. 1. Initialize `received_chunks` to an empty set, as well as `requesting_chunks`. -Launch the interaction as a background task running `interaction_loop(interaction)`. +Launch the sourcer as a background task running `run(interaction)`. -#### `interaction_loop(interaction) -> Result` +#### `run(interaction) -> Result` ```rust // How many parallel requests to have going at once. @@ -121,7 +111,7 @@ const N_PARALLEL: usize = 50; ``` * Request `AvailabilityStoreMessage::QueryAvailableData`. If it exists, return that. -* If the phase is `InteractionPhase::RequestFromBackers` +* If the phase is `DataRecoveryTask::RequestFromBackers` * Loop: * If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`. * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`. @@ -130,9 +120,9 @@ const N_PARALLEL: usize = 50; * If it concludes with available data, attempt a re-encoding. * If it has the correct erasure-root, break and issue a `Ok(available_data)`. * If it has an incorrect erasure-root, return to beginning. - * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop. + * If the backer is `None`, set the phase to `DataRecoveryTask::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop. -* If the phase is `InteractionPhase::RequestChunks`: +* If the phase is `DataRecoveryTask::RequestChunks`: * Request `AvailabilityStoreMessage::QueryAllChunks`. For each chunk that exists, add it to `received_chunks` and remote the validator from `shuffling`. * Loop: * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`. From 6997e01fd6c8b1d4e19a921c3ec45587914c6ecf Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Thu, 4 Nov 2021 09:37:27 +0100 Subject: [PATCH 4/8] chore: fixup --- node/core/av-store/src/lib.rs | 1 + node/core/av-store/src/metrics.rs | 5 +---- node/network/availability-recovery/src/lib.rs | 4 +++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/node/core/av-store/src/lib.rs b/node/core/av-store/src/lib.rs index c4ed4ed7e419..6dc2684847cf 100644 --- a/node/core/av-store/src/lib.rs +++ b/node/core/av-store/src/lib.rs @@ -45,6 +45,7 @@ use polkadot_subsystem::{ }; mod metrics; +pub use self::metrics::*; #[cfg(test)] mod tests; diff --git a/node/core/av-store/src/metrics.rs b/node/core/av-store/src/metrics.rs index bf252e648b73..fddacca6626e 100644 --- a/node/core/av-store/src/metrics.rs +++ b/node/core/av-store/src/metrics.rs @@ -14,10 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use polkadot_node_subsystem_util::{ - self as util, - metrics::{self, prometheus}, -}; +use polkadot_node_subsystem_util::metrics::{self, prometheus}; #[derive(Clone)] pub(crate) struct MetricsInner { diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index cac474ac7e71..69788564b142 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -823,7 +823,9 @@ where return Ok(()) } - if let Some(i) = state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash) { + if let Some(i) = + state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash) + { i.awaiting.push(response_sender); return Ok(()) } From 4650de1096ec7ccd12982323c888772da15c82c6 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Thu, 4 Nov 2021 14:11:11 +0100 Subject: [PATCH 5/8] chore: remove `Data` prefixes --- node/network/availability-recovery/src/lib.rs | 24 +++++++++---------- .../availability/availability-recovery.md | 12 +++++----- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 69788564b142..bb4226e4a45f 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -125,7 +125,7 @@ struct RequestChunksSourcer { requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, } -struct DataRecoveryParams { +struct RecoveryParams { /// Discovery ids of `validators`. validator_authority_keys: Vec, @@ -155,11 +155,11 @@ enum Sourcer { /// A stateful reconstruction of availability data in reference to /// a candidate hash. -struct DataRecoveryTask { +struct RecoveryTask { sender: S, /// The parameters of the recovery process. - params: DataRecoveryParams, + params: RecoveryParams, /// The sourcer to obtain the availbility data. sourcer: Sourcer, @@ -175,7 +175,7 @@ impl RequestFromBackersSourcer { // Run this phase to completion. async fn run( &mut self, - params: &DataRecoveryParams, + params: &RecoveryParams, sender: &mut impl SubsystemSender, ) -> Result { tracing::trace!( @@ -259,7 +259,7 @@ impl RequestChunksSourcer { } } - fn is_unavailable(&self, params: &DataRecoveryParams) -> bool { + fn is_unavailable(&self, params: &RecoveryParams) -> bool { is_unavailable( self.received_chunks.len(), self.requesting_chunks.total_len(), @@ -268,7 +268,7 @@ impl RequestChunksSourcer { ) } - fn can_conclude(&self, params: &DataRecoveryParams) -> bool { + fn can_conclude(&self, params: &RecoveryParams) -> bool { self.received_chunks.len() >= params.threshold || self.is_unavailable(params) } @@ -299,7 +299,7 @@ impl RequestChunksSourcer { async fn launch_parallel_requests( &mut self, - params: &DataRecoveryParams, + params: &RecoveryParams, sender: &mut impl SubsystemSender, ) { let num_requests = self.get_desired_request_count(params.threshold); @@ -351,7 +351,7 @@ impl RequestChunksSourcer { } /// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`. - async fn wait_for_chunks(&mut self, params: &DataRecoveryParams) { + async fn wait_for_chunks(&mut self, params: &RecoveryParams) { let metrics = ¶ms.metrics; // Wait for all current requests to conclude or time-out, or until we reach enough chunks. @@ -453,7 +453,7 @@ impl RequestChunksSourcer { async fn run( &mut self, - params: &DataRecoveryParams, + params: &RecoveryParams, sender: &mut impl SubsystemSender, ) -> Result { // First query the store for any chunks we've got. @@ -589,7 +589,7 @@ fn reconstructed_data_matches_root( branches.root() == *expected_root } -impl DataRecoveryTask { +impl RecoveryTask { async fn run(mut self) -> Result { // First just see if we have the data available locally. { @@ -756,7 +756,7 @@ where { let candidate_hash = receipt.hash(); - let params = DataRecoveryParams { + let params = RecoveryParams { validator_authority_keys: session_info.discovery_keys.clone(), validators: session_info.validators.clone(), threshold: recovery_threshold(session_info.validators.len())?, @@ -772,7 +772,7 @@ where Sourcer::RequestChunks(RequestChunksSourcer::new(params.validators.len() as _)) }); - let recovery_task = DataRecoveryTask { sender: ctx.sender().clone(), params, sourcer: phase }; + let recovery_task = RecoveryTask { sender: ctx.sender().clone(), params, sourcer: phase }; let (remote, remote_handle) = recovery_task.run().remote_handle(); diff --git a/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/roadmap/implementers-guide/src/node/availability/availability-recovery.md index cef4d0c40635..62d89ad85dbf 100644 --- a/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -21,7 +21,7 @@ Output: ## Functionality -We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which taks. An recovery task is a structure encapsulating all interaction with the network necessary to recover the available data in respect to one candidate. +We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which taks. A recovery task is a structure encapsulating all interaction with the network necessary to recover the available data in respect to one candidate. ```rust struct State { @@ -68,7 +68,7 @@ enum RecoveryTask { } } -struct DataRecoveryTask { +struct RecoveryTask { to_subsystems: SubsystemSender, params: RecoveryTaskParams, sourcer: Sourcer, @@ -95,7 +95,7 @@ On `Conclude`, shut down the subsystem. #### `launch_recovery_task(session_index, session_info, candidate_receipt, candidate_hash, Option)` 1. Compute the threshold from the session info. It should be `f + 1`, where `n = 3f + k`, where `k in {1, 2, 3}`, and `n` is the number of validators. -1. Set the various fields of `DataRecoveryParams` based on the validator lists in `session_info` and information about the candidate. +1. Set the various fields of `RecoveryParams` based on the validator lists in `session_info` and information about the candidate. 1. If the `backing_group_index` is `Some`, start in the `RequestFromBackers` phase with a shuffling of the backing group validator indices and a `None` requesting value. 1. Otherwise, start in the `RequestChunks` phase with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty. 1. Set the `to_subsystems` sender to be equal to a clone of the `SubsystemContext`'s sender. @@ -111,7 +111,7 @@ const N_PARALLEL: usize = 50; ``` * Request `AvailabilityStoreMessage::QueryAvailableData`. If it exists, return that. -* If the phase is `DataRecoveryTask::RequestFromBackers` +* If the phase is `RecoveryTask::RequestFromBackers` * Loop: * If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`. * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`. @@ -120,9 +120,9 @@ const N_PARALLEL: usize = 50; * If it concludes with available data, attempt a re-encoding. * If it has the correct erasure-root, break and issue a `Ok(available_data)`. * If it has an incorrect erasure-root, return to beginning. - * If the backer is `None`, set the phase to `DataRecoveryTask::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop. + * If the backer is `None`, set the phase to `RecoveryTask::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop. -* If the phase is `DataRecoveryTask::RequestChunks`: +* If the phase is `RecoveryTask::RequestChunks`: * Request `AvailabilityStoreMessage::QueryAllChunks`. For each chunk that exists, add it to `received_chunks` and remote the validator from `shuffling`. * Loop: * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`. From e570a0b805065109e15eb3210be261e0365001b0 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Thu, 4 Nov 2021 14:31:05 +0100 Subject: [PATCH 6/8] address review comments --- node/network/availability-recovery/src/lib.rs | 6 +++--- .../src/node/availability/availability-recovery.md | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index bb4226e4a45f..cce9b5029b5f 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -73,7 +73,7 @@ mod tests; const LOG_TARGET: &str = "parachain::availability-recovery"; -// How many parallel requests interaction should have going at once. +// How many parallel recovery tasks should be running at once. const N_PARALLEL: usize = 50; // Size of the LRU cache where we keep recovered data. @@ -161,7 +161,7 @@ struct RecoveryTask { /// The parameters of the recovery process. params: RecoveryParams, - /// The sourcer to obtain the availbility data. + /// The sourcer to obtain the availability data. sourcer: Sourcer, } @@ -740,7 +740,7 @@ async fn handle_signal(state: &mut State, signal: OverseerSignal) -> SubsystemRe } } -/// Machinery around launching interactions into the background. +/// Machinery around launching recovery tasks into the background. async fn launch_recovery_task( state: &mut State, ctx: &mut Context, diff --git a/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 62d89ad85dbf..25fdde2b4500 100644 --- a/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -21,7 +21,7 @@ Output: ## Functionality -We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which taks. A recovery task is a structure encapsulating all interaction with the network necessary to recover the available data in respect to one candidate. +We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which taks. A recovery task is a structure encapsulating all recovery tasks with the network necessary to recover the available data in respect to one candidate. ```rust struct State { @@ -33,7 +33,7 @@ struct State { availability_lru: LruCache>, } -/// This is a future, which concludes either when a response is received from the interaction, +/// This is a future, which concludes either when a response is received from the recovery tasks, /// or all the `awaiting` channels have closed. struct RecoveryHandle { candidate_hash: CandidateHash, @@ -101,9 +101,9 @@ On `Conclude`, shut down the subsystem. 1. Set the `to_subsystems` sender to be equal to a clone of the `SubsystemContext`'s sender. 1. Initialize `received_chunks` to an empty set, as well as `requesting_chunks`. -Launch the sourcer as a background task running `run(interaction)`. +Launch the sourcer as a background task running `run(recovery_task)`. -#### `run(interaction) -> Result` +#### `run(recovery_task) -> Result` ```rust // How many parallel requests to have going at once. From ba5e43cdb7a0d9707ed33569199a4a9262b83db3 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Thu, 4 Nov 2021 14:52:59 +0100 Subject: [PATCH 7/8] guide items --- .../node/availability/availability-recovery.md | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 25fdde2b4500..3cd5f3ea2576 100644 --- a/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -21,7 +21,7 @@ Output: ## Functionality -We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which taks. A recovery task is a structure encapsulating all recovery tasks with the network necessary to recover the available data in respect to one candidate. +We hold a state which tracks the currently ongoing recovery tasks, as well as which request IDs correspond to which task. A recovery task is a structure encapsulating all recovery tasks with the network necessary to recover the available data in respect to one candidate. ```rust struct State { @@ -111,7 +111,7 @@ const N_PARALLEL: usize = 50; ``` * Request `AvailabilityStoreMessage::QueryAvailableData`. If it exists, return that. -* If the phase is `RecoveryTask::RequestFromBackers` +* If the task contains `RequestFromBackers` * Loop: * If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`. * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`. @@ -120,14 +120,20 @@ const N_PARALLEL: usize = 50; * If it concludes with available data, attempt a re-encoding. * If it has the correct erasure-root, break and issue a `Ok(available_data)`. * If it has an incorrect erasure-root, return to beginning. - * If the backer is `None`, set the phase to `RecoveryTask::RequestChunks` with a random shuffling of validators and empty `next_shuffling`, `received_chunks`, and `requesting_chunks` and break the loop. + * Send the result to each member of `awaiting`. + * If the backer is `None`, set the sourcer to `RecoveryTask::RequestChunks` with a random shuffling of validators and empty `received_chunks`, and `requesting_chunks` and break the loop. -* If the phase is `RecoveryTask::RequestChunks`: +* If the task contains `RequestChunks`: * Request `AvailabilityStoreMessage::QueryAllChunks`. For each chunk that exists, add it to `received_chunks` and remote the validator from `shuffling`. * Loop: * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`. * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, insert into the front of `shuffling` to be retried. - * If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. If correct, break and issue `Ok(available_data)`. + * If `received_chunks` has more than `threshold` entries, attempt to recover the data. + * If that fails, return `Err(RecoveryError::Invalid)` + * If correct: + * If re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. + * break and issue `Ok(available_data)` + * Send the result to each member of `awaiting`. * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`, * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`. * Issue a `NetworkBridgeMessage::Requests` and wait for the response in `requesting_chunks`. From 7c5ce14c2fee3182ecc174e3c96622e249e88461 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Thu, 4 Nov 2021 14:59:24 +0100 Subject: [PATCH 8/8] sourcer -> source, add `FromValdiators` suffix --- node/network/availability-recovery/src/lib.rs | 36 +++++++++---------- .../availability-recovery/src/tests.rs | 2 +- .../availability/availability-recovery.md | 12 +++---- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index cce9b5029b5f..98785b6d39ea 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -104,13 +104,13 @@ pub struct AvailabilityRecoverySubsystem { metrics: Metrics, } -struct RequestFromBackersSourcer { +struct RequestFromBackers { // a random shuffling of the validators from the backing group which indicates the order // in which we connect to them and request the chunk. shuffled_backers: Vec, } -struct RequestChunksSourcer { +struct RequestChunksFromValidators { /// How many request have been unsuccessful so far. error_count: usize, /// Total number of responses that have been received. @@ -148,9 +148,9 @@ struct RecoveryParams { /// Source the availability data either by means /// of direct request response protocol to /// backers (a.k.a. fast-path), or recover from chunks. -enum Sourcer { - RequestFromBackers(RequestFromBackersSourcer), - RequestChunks(RequestChunksSourcer), +enum Source { + RequestFromBackers(RequestFromBackers), + RequestChunks(RequestChunksFromValidators), } /// A stateful reconstruction of availability data in reference to @@ -161,15 +161,15 @@ struct RecoveryTask { /// The parameters of the recovery process. params: RecoveryParams, - /// The sourcer to obtain the availability data. - sourcer: Sourcer, + /// The source to obtain the availability data from. + source: Source, } -impl RequestFromBackersSourcer { +impl RequestFromBackers { fn new(mut backers: Vec) -> Self { backers.shuffle(&mut rand::thread_rng()); - RequestFromBackersSourcer { shuffled_backers: backers } + RequestFromBackers { shuffled_backers: backers } } // Run this phase to completion. @@ -245,12 +245,12 @@ impl RequestFromBackersSourcer { } } -impl RequestChunksSourcer { +impl RequestChunksFromValidators { fn new(n_validators: u32) -> Self { let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect(); shuffling.shuffle(&mut rand::thread_rng()); - RequestChunksSourcer { + RequestChunksFromValidators { error_count: 0, total_received_responses: 0, shuffling: shuffling.into(), @@ -617,18 +617,18 @@ impl RecoveryTask { loop { // These only fail if we cannot reach the underlying subsystem, which case there is nothing // meaningful we can do. - match self.sourcer { - Sourcer::RequestFromBackers(ref mut from_backers) => { + match self.source { + Source::RequestFromBackers(ref mut from_backers) => { match from_backers.run(&self.params, &mut self.sender).await { Ok(data) => break Ok(data), Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid), Err(RecoveryError::Unavailable) => - self.sourcer = Sourcer::RequestChunks(RequestChunksSourcer::new( + self.source = Source::RequestChunks(RequestChunksFromValidators::new( self.params.validators.len() as _, )), } }, - Sourcer::RequestChunks(ref mut from_all) => + Source::RequestChunks(ref mut from_all) => break from_all.run(&self.params, &mut self.sender).await, } } @@ -767,12 +767,12 @@ where let phase = backing_group .and_then(|g| session_info.validator_groups.get(g.0 as usize)) - .map(|group| Sourcer::RequestFromBackers(RequestFromBackersSourcer::new(group.clone()))) + .map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone()))) .unwrap_or_else(|| { - Sourcer::RequestChunks(RequestChunksSourcer::new(params.validators.len() as _)) + Source::RequestChunks(RequestChunksFromValidators::new(params.validators.len() as _)) }); - let recovery_task = RecoveryTask { sender: ctx.sender().clone(), params, sourcer: phase }; + let recovery_task = RecoveryTask { sender: ctx.sender().clone(), params, source: phase }; let (remote, remote_handle) = recovery_task.run().remote_handle(); diff --git a/node/network/availability-recovery/src/tests.rs b/node/network/availability-recovery/src/tests.rs index 49e3afb74a93..190507ae4d0e 100644 --- a/node/network/availability-recovery/src/tests.rs +++ b/node/network/availability-recovery/src/tests.rs @@ -1273,7 +1273,7 @@ fn does_not_query_local_validator() { fn parallel_request_calculation_works_as_expected() { let num_validators = 100; let threshold = recovery_threshold(num_validators).unwrap(); - let mut phase = RequestChunksSourcer::new(100); + let mut phase = RequestChunksFromValidators::new(100); assert_eq!(phase.get_desired_request_count(threshold), threshold); phase.error_count = 1; phase.total_received_responses = 1; diff --git a/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/roadmap/implementers-guide/src/node/availability/availability-recovery.md index 3cd5f3ea2576..d7d822188ccb 100644 --- a/roadmap/implementers-guide/src/node/availability/availability-recovery.md +++ b/roadmap/implementers-guide/src/node/availability/availability-recovery.md @@ -59,7 +59,7 @@ enum RecoveryTask { // in which we connect to them and request the chunk. shuffled_backers: Vec, } - RequestChunks { + RequestChunksFromValidators { // a random shuffling of the validators which indicates the order in which we connect to the validators and // request the chunk from them. shuffling: Vec, @@ -71,7 +71,7 @@ enum RecoveryTask { struct RecoveryTask { to_subsystems: SubsystemSender, params: RecoveryTaskParams, - sourcer: Sourcer, + source: Source, } ``` @@ -97,11 +97,11 @@ On `Conclude`, shut down the subsystem. 1. Compute the threshold from the session info. It should be `f + 1`, where `n = 3f + k`, where `k in {1, 2, 3}`, and `n` is the number of validators. 1. Set the various fields of `RecoveryParams` based on the validator lists in `session_info` and information about the candidate. 1. If the `backing_group_index` is `Some`, start in the `RequestFromBackers` phase with a shuffling of the backing group validator indices and a `None` requesting value. -1. Otherwise, start in the `RequestChunks` phase with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty. +1. Otherwise, start in the `RequestChunksFromValidators` source with `received_chunks`,`requesting_chunks`, and `next_shuffling` all empty. 1. Set the `to_subsystems` sender to be equal to a clone of the `SubsystemContext`'s sender. 1. Initialize `received_chunks` to an empty set, as well as `requesting_chunks`. -Launch the sourcer as a background task running `run(recovery_task)`. +Launch the source as a background task running `run(recovery_task)`. #### `run(recovery_task) -> Result` @@ -121,9 +121,9 @@ const N_PARALLEL: usize = 50; * If it has the correct erasure-root, break and issue a `Ok(available_data)`. * If it has an incorrect erasure-root, return to beginning. * Send the result to each member of `awaiting`. - * If the backer is `None`, set the sourcer to `RecoveryTask::RequestChunks` with a random shuffling of validators and empty `received_chunks`, and `requesting_chunks` and break the loop. + * If the backer is `None`, set the source to `RequestChunksFromValidators` with a random shuffling of validators and empty `received_chunks`, and `requesting_chunks` and break the loop. -* If the task contains `RequestChunks`: +* If the task contains `RequestChunksFromValidators`: * Request `AvailabilityStoreMessage::QueryAllChunks`. For each chunk that exists, add it to `received_chunks` and remote the validator from `shuffling`. * Loop: * If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`.