Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/mapping-sync/src/kv/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct BestBlockInfo<Block: BlockT> {
/// The block number (for pruning purposes).
pub block_number: <Block::Header as HeaderT>::Number,
/// Reorg info if this block became best as part of a reorganization.
pub reorg_info: Option<ReorgInfo<Block>>,
pub reorg_info: Option<Arc<ReorgInfo<Block>>>,
}

pub struct MappingSyncWorker<Block: BlockT, C, BE> {
Expand Down Expand Up @@ -139,7 +139,7 @@ where
if notification.is_new_best {
// For notification: include new_best_hash per Ethereum spec.
let reorg_info = notification.tree_route.as_ref().map(|tree_route| {
ReorgInfo::from_tree_route(tree_route, notification.hash)
Arc::new(ReorgInfo::from_tree_route(tree_route, notification.hash))
});
self.best_at_import.insert(
notification.hash,
Expand Down
32 changes: 30 additions & 2 deletions client/mapping-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub mod sql;
use sp_blockchain::TreeRoute;
use sp_consensus::SyncOracle;
use sp_runtime::traits::Block as BlockT;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};

#[derive(Copy, Clone, Eq, PartialEq)]
pub enum SyncStrategy {
Expand All @@ -36,6 +40,18 @@ pub enum SyncStrategy {
pub type EthereumBlockNotificationSinks<T> =
parking_lot::Mutex<Vec<sc_utils::mpsc::TracingUnboundedSender<T>>>;

/// Default hard cap for pending notifications per subscriber channel.
/// Subscribers above this threshold are considered lagging and are dropped.
const DEFAULT_MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER: usize = 512;

static MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER: AtomicUsize =
AtomicUsize::new(DEFAULT_MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER);

/// Configure the hard cap for pending notifications per subscriber channel.
pub fn set_max_pending_notifications_per_subscriber(max_pending: usize) {
MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER.store(max_pending.max(1), Ordering::Relaxed);
}

/// Information about a chain reorganization.
///
/// When a reorg occurs, this struct contains the blocks that were removed from
Expand Down Expand Up @@ -87,7 +103,7 @@ pub struct EthereumBlockNotification<Block: BlockT> {
pub is_new_best: bool,
pub hash: Block::Hash,
/// Optional reorg information. Present when this block became best as part of a reorg.
pub reorg_info: Option<ReorgInfo<Block>>,
pub reorg_info: Option<Arc<ReorgInfo<Block>>>,
}

/// Context for emitting block notifications.
Expand All @@ -99,7 +115,7 @@ pub struct BlockNotificationContext<Block: BlockT> {
/// Whether this block is the new best block.
pub is_new_best: bool,
/// Optional reorg information if this block became best as part of a reorg.
pub reorg_info: Option<ReorgInfo<Block>>,
pub reorg_info: Option<Arc<ReorgInfo<Block>>>,
}

/// Emit block notification to all registered sinks.
Expand All @@ -124,6 +140,18 @@ pub fn emit_block_notification<Block: BlockT>(
}

sinks.retain(|sink| {
let max_pending = MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER.load(Ordering::Relaxed);
if sink.len() >= max_pending {
log::debug!(
target: "mapping-sync",
"Dropping lagging pubsub subscriber (pending={}, max={})",
sink.len(),
max_pending,
);
let _ = sink.close();
return false;
}

sink.unbounded_send(EthereumBlockNotification {
is_new_best: context.is_new_best,
hash: context.hash,
Expand Down
4 changes: 2 additions & 2 deletions client/mapping-sync/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub enum WorkerCommand<Block: BlockT<Hash = H256>> {
block_hash: H256,
/// Whether this block was the new best at import time.
is_new_best: bool,
reorg_info: Option<ReorgInfo<Block>>,
reorg_info: Option<Arc<ReorgInfo<Block>>>,
},
/// Canonicalize the enacted and retracted blocks reported via import notifications.
Canonicalize {
Expand Down Expand Up @@ -251,7 +251,7 @@ where
"🔀 Re-org happened at new best {}, proceeding to canonicalize db",
notification.hash
);
let info = ReorgInfo::from_tree_route(tree_route, notification.hash);
let info = Arc::new(ReorgInfo::from_tree_route(tree_route, notification.hash));
// Note: new_best is handled separately by IndexBestBlock.
tx.send(WorkerCommand::Canonicalize {
common: info.common_ancestor,
Expand Down
107 changes: 52 additions & 55 deletions client/rpc/src/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::{marker::PhantomData, sync::Arc};

use ethereum::TransactionV3 as EthereumTransaction;
use futures::{future, FutureExt as _, StreamExt as _};
use futures::{future, stream::BoxStream, FutureExt as _, StreamExt as _};
use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink};
use log::debug;
// Substrate
Expand Down Expand Up @@ -119,21 +119,43 @@ where
}
}

/// Get headers for enacted blocks during a reorg, including the new best block.
///
/// Per Ethereum spec (https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads):
/// "When a chain reorganization occurs, this subscription will emit an event
/// containing all new headers (blocks) for the new chain. This means that you
/// may see multiple headers emitted with the same height (block number)."
///
/// Returns headers in ascending order (oldest first), with `new_best` last.
fn get_reorg_headers(&self, enacted: &[B::Hash], new_best: B::Hash) -> Vec<PubSubResult> {
enacted
.iter()
.chain(std::iter::once(&new_best))
.filter_map(|hash| self.storage_override.current_block(*hash))
.map(PubSubResult::header)
.collect()
/// Convert a block notification into a stream of `newHeads` items.
/// For reorgs this emits enacted headers followed by the new best block.
fn new_heads_from_notification(
&self,
notification: EthereumBlockNotification<B>,
) -> BoxStream<'static, PubSubResult> {
if !notification.is_new_best {
return futures::stream::empty().boxed();
}

if let Some(reorg_info) = notification.reorg_info {
debug!(
target: "eth-pubsub",
"Reorg detected: new_best={:?}, {} blocks retracted, {} blocks enacted",
reorg_info.new_best,
reorg_info.retracted.len(),
reorg_info.enacted.len()
);

let pubsub = self.clone();
let enacted = reorg_info.enacted.clone();
let new_best = reorg_info.new_best;
return futures::stream::iter(
enacted
.into_iter()
.chain(std::iter::once(new_best))
.filter_map(move |hash| pubsub.storage_override.current_block(hash))
.map(PubSubResult::header),
)
.boxed();
}

let maybe_header = self
.storage_override
.current_block(notification.hash)
.map(PubSubResult::header);
futures::stream::iter(maybe_header).boxed()
}

fn notify_logs(
Expand Down Expand Up @@ -261,43 +283,10 @@ where
// Per Ethereum spec, when a reorg occurs, we must emit all headers
// for the new canonical chain. The reorg_info field in the notification
// contains the enacted blocks when a reorg occurred.
let stream = block_notification_stream.filter_map(move |notification| {
if !notification.is_new_best {
return future::ready(None);
}

// Check if this block came from a reorg
let headers = if let Some(ref reorg_info) = notification.reorg_info {
debug!(
target: "eth-pubsub",
"Reorg detected: new_best={:?}, {} blocks retracted, {} blocks enacted",
reorg_info.new_best,
reorg_info.retracted.len(),
reorg_info.enacted.len()
);
// Emit all enacted blocks followed by the new best block
pubsub.get_reorg_headers(&reorg_info.enacted, reorg_info.new_best)
} else {
// Normal case: just emit the new block
if let Some(block) =
pubsub.storage_override.current_block(notification.hash)
{
vec![PubSubResult::header(block)]
} else {
return future::ready(None);
}
};

if headers.is_empty() {
return future::ready(None);
}

future::ready(Some(headers))
let flat_stream = block_notification_stream.flat_map(move |notification| {
pubsub.new_heads_from_notification(notification)
});

// Flatten the Vec<PubSubResult> into individual PubSubResult items
let flat_stream = stream.flat_map(futures::stream::iter);

PendingSubscription::from(pending)
.pipe_from_stream(flat_stream, BoundedVecDeque::new(16))
.await
Expand Down Expand Up @@ -330,9 +319,13 @@ where
// in case of reorg, the first event is emitted right away.
let syncing_status = pubsub.syncing_status().await;
let subscription = Subscription::from(sink);
let _ = subscription
if subscription
.send(&PubSubResult::SyncingStatus(syncing_status))
.await;
.await
.is_err()
{
return;
}

// When the node is not under a major syncing (i.e. from genesis), react
// normally to import notifications.
Expand All @@ -344,9 +337,13 @@ where
let syncing_status = pubsub.sync.is_major_syncing();
if syncing_status != last_syncing_status {
let syncing_status = pubsub.syncing_status().await;
let _ = subscription
if subscription
.send(&PubSubResult::SyncingStatus(syncing_status))
.await;
.await
.is_err()
{
break;
}
}
last_syncing_status = syncing_status;
}
Expand Down
4 changes: 4 additions & 0 deletions template/node/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ pub struct EthConfiguration {
/// Default value is 200MB.
#[arg(long, default_value = "209715200")]
pub frontier_sql_backend_cache_size: u64,

/// Maximum pending pubsub notifications per subscriber before it is dropped.
#[arg(long, default_value = "512")]
pub pubsub_max_pending_notifications: usize,
}

pub struct FrontierPartialComponents {
Expand Down
3 changes: 3 additions & 0 deletions template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ where
// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
// The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel.
// This way we avoid race conditions when using native substrate block import notification stream.
fc_mapping_sync::set_max_pending_notifications_per_subscriber(
eth_config.pubsub_max_pending_notifications,
);
let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
fc_mapping_sync::EthereumBlockNotification<B>,
> = Default::default();
Expand Down
Loading