Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 132 additions & 19 deletions node/core/dispute-coordinator/src/real/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

//! Dispute coordinator subsystem in initialized state (after first active leaf is received).

use std::{collections::HashSet, sync::Arc};
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};

use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use lru::LruCache;

use sc_keystore::LocalKeystore;

Expand All @@ -37,7 +41,7 @@ use polkadot_node_subsystem::{
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext,
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
};
use polkadot_primitives::{
v1::{
Expand All @@ -48,11 +52,12 @@ use polkadot_primitives::{
v2::SessionInfo,
};

use crate::{metrics::Metrics, real::DisputeCoordinatorSubsystem, LOG_TARGET};

use crate::{
error::{log_error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
metrics::Metrics,
real::DisputeCoordinatorSubsystem,
status::{get_active_with_status, Clock, DisputeStatus, Timestamp},
LOG_TARGET,
};

use super::{
Expand All @@ -66,6 +71,9 @@ use super::{
OverlayedBackend,
};

const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 100;
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 20;

/// After the first active leaves update we transition to `Initialized` state.
///
/// Before the first active leaves update we can't really do much. We cannot check incoming
Expand All @@ -80,6 +88,11 @@ pub struct Initialized {
ordering_provider: OrderingProvider,
participation_receiver: WorkerMessageReceiver,
metrics: Metrics,
// This tracks only rolling session window failures.
// It can be a `Vec` if the need to track more arises.
error: Option<SessionsUnavailable>,
/// Latest relay blocks that have been successfully scraped.
last_scraped_blocks: LruCache<Hash, ()>,
}

impl Initialized {
Expand All @@ -105,6 +118,8 @@ impl Initialized {
participation,
participation_receiver,
metrics,
error: None,
last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY),
}
}

Expand Down Expand Up @@ -245,22 +260,26 @@ impl Initialized {
.await?;
self.participation.process_active_leaves_update(ctx, &update).await?;

let new_activations = update.activated.into_iter().map(|a| a.hash);
for new_leaf in new_activations {
match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await {
if let Some(new_leaf) = update.activated {
match self
.rolling_session_window
.cache_session_info_for_head(ctx, new_leaf.hash)
.await
{
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
);
continue
self.error = Some(e);
},
Ok(SessionWindowUpdate::Advanced {
new_window_end: window_end,
new_window_start,
..
}) => {
self.error = None;
let session = window_end;
if self.highest_session < session {
tracing::trace!(
Expand All @@ -277,7 +296,57 @@ impl Initialized {
},
Ok(SessionWindowUpdate::Unchanged) => {},
};
self.scrape_on_chain_votes(ctx, overlay_db, new_leaf, now).await?;

// Scrape the head if above rolling session update went well.
if self.error.is_none() {
let _ = self
.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now)
.await
.map_err(|err| {
tracing::warn!(
target: LOG_TARGET,
"Skipping scraping block #{}({}) due to error: {}",
new_leaf.number,
new_leaf.hash,
err
);
});
}

// Try to scrape any blocks for which we could not get the current session or did not receive an
// active leaves update.
let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS);
let ancestors = OrderingProvider::get_block_ancestors(
ctx.sender(),
new_leaf.hash,
new_leaf.number,
target_block,
&mut self.last_scraped_blocks,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
"Skipping leaf ancestors scraping due to error: {}",
err
);
Vec::new()
});

// We could do this in parallel, but we don't want to overindex on the wasm instances
// usage.
for ancestor in ancestors {
let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err(
|err| {
tracing::warn!(
target: LOG_TARGET,
"Skipping scraping block {} due to error: {}",
ancestor,
err
);
},
);
}
}

Ok(())
Expand All @@ -293,6 +362,11 @@ impl Initialized {
new_leaf: Hash,
now: u64,
) -> Result<()> {
// Avoid scraping twice.
if self.last_scraped_blocks.get(&new_leaf).is_some() {
return Ok(())
}

// obtain the concluded disputes as well as the candidate backing votes
// from the new leaf
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
Expand Down Expand Up @@ -331,6 +405,9 @@ impl Initialized {
};

if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
// This block is not interesting as it doesn't contain any backing votes or disputes. We'll
// mark it here as scraped to prevent further processing.
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}

Expand Down Expand Up @@ -413,6 +490,7 @@ impl Initialized {
}

if disputes.is_empty() {
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}

Expand Down Expand Up @@ -490,6 +568,8 @@ impl Initialized {
"Attempted import of on-chain statement of concluded dispute failed"),
}
}

self.last_scraped_blocks.put(new_leaf, ());
Ok(())
}

Expand Down Expand Up @@ -533,18 +613,39 @@ impl Initialized {
}
},
DisputeCoordinatorMessage::RecentDisputes(tx) => {
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
// Return error if session information is missing.
self.ensure_available_session_info()?;

let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
BTreeMap::new()
};

let _ = tx.send(recent_disputes.keys().cloned().collect());
},
DisputeCoordinatorMessage::ActiveDisputes(tx) => {
let recent_disputes =
overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter();
let _ =
tx.send(get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect());
// Return error if session information is missing.
self.ensure_available_session_info()?;

let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
BTreeMap::new()
};

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.collect(),
);
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
// Return error if session information is missing.
self.ensure_available_session_info()?;

let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
for (session_index, candidate_hash) in query {
if let Some(v) =
overlay_db.load_candidate_votes(session_index, &candidate_hash)?
{
Expand Down Expand Up @@ -581,6 +682,9 @@ impl Initialized {
block_descriptions,
tx,
} => {
// Return error if session information is missing.
self.ensure_available_session_info()?;

let undisputed_chain = determine_undisputed_chain(
overlay_db,
base_number,
Expand All @@ -595,6 +699,15 @@ impl Initialized {
Ok(Box::new(|| Ok(())))
}

// Helper function for checking subsystem errors in message processing.
fn ensure_available_session_info(&self) -> Result<()> {
if let Some(subsystem_error) = self.error.clone() {
return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error)))
}

Ok(())
}

async fn handle_import_statements(
&mut self,
ctx: &mut impl SubsystemContext,
Expand Down
13 changes: 12 additions & 1 deletion node/core/dispute-coordinator/src/real/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
println!("|-------------------------|");
let future = async {
let backend = DbBackend::new(self.store.clone(), self.config.column_config());
self.run(ctx, backend, Box::new(SystemClock))
Expand All @@ -144,6 +145,8 @@ impl DisputeCoordinatorSubsystem {
keystore: Arc<LocalKeystore>,
metrics: Metrics,
) -> Self {
println!("*** |-------------------------|");

Self { store, config, keystore, metrics }
}

Expand All @@ -159,7 +162,15 @@ impl DisputeCoordinatorSubsystem {
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
B: Backend + 'static,
{
let res = self.initialize(&mut ctx, backend, &*clock).await?;
println!("|-----------run-----------|");
let res = match self.initialize(&mut ctx, backend, &*clock).await {
Ok(res) => res,
Err(err) => {
println!("|-----------{:?}-----------|", err);
return Err(err)
},
};
println!("|-------initialized-------|");

let (participations, first_leaf, initialized, backend) = match res {
// Concluded:
Expand Down
Loading