Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
244d111
Improve "latest" block resolution on pruned nodes
arturgontijo Mar 7, 2026
490a555
Use config.state_pruning as state_pruning_blocks in node template
arturgontijo Mar 7, 2026
70cafb4
Coderabbit: better logs + use finalized block
arturgontijo Mar 9, 2026
1d82875
Retain in-window tips when skipping pruned blocks
arturgontijo Mar 9, 2026
38b7abd
Skip target lookup failure explicitly
arturgontijo Mar 9, 2026
d0682bd
Backfill BLOCK_NUMBER_MAPPING for the full live window
arturgontijo Mar 9, 2026
542407c
Add max_blocks to backfill_number_mappings()
arturgontijo Mar 9, 2026
9b0a7c2
Add pruning_skip_retains_in_window_tips() test
arturgontijo Mar 9, 2026
320b632
Use find_post_log() + test-pruning-skip.ts
arturgontijo Mar 9, 2026
523b3e9
Make latest_block_hash() read-only + rpc fast path
arturgontijo Mar 11, 2026
d054b4d
Validate eth hash from digest before writing BLOCK_MAPPING
arturgontijo Mar 11, 2026
9200578
Repair missing TRANSACTION_MAPPING when state becomes readable
arturgontijo Mar 11, 2026
a6aab75
Add integration test for block and tx consistency after a reorg
arturgontijo Mar 12, 2026
1e562a2
Check all txns when deciding tx mapping repair
arturgontijo Mar 12, 2026
911345c
Restore a background cursor-based sweep
arturgontijo Mar 12, 2026
bacee58
Remove unnecessary comment/report
arturgontijo Mar 12, 2026
17bcb64
coderabbit review
arturgontijo Mar 12, 2026
53a88f5
coderabbit: propagate client.hash() failures
arturgontijo Mar 12, 2026
ecf10e1
coderabbit: no more ok().flatten()
arturgontijo Mar 12, 2026
53f82ef
Remove unnecessary reconcile #0..#0
arturgontijo Mar 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions client/mapping-sync/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use sc_client_api::backend::{Backend, StorageProvider};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_blockchain::{Backend as _, HeaderBackend};
use sp_consensus::SyncOracle;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, SaturatedConversion, UniqueSaturatedInto, Zero,
};
// Frontier
use fc_storage::StorageOverride;
use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
Expand All @@ -44,14 +46,27 @@ use worker::BestBlockInfo;

pub const CANONICAL_NUMBER_REPAIR_BATCH_SIZE: u64 = 2048;

/// Sync a single block's Ethereum mapping from its consensus digest into the Frontier DB.
pub fn sync_block<Block: BlockT, C: HeaderBackend<Block>>(
client: &C,
storage_override: Arc<dyn StorageOverride<Block>>,
backend: &fc_db::kv::Backend<Block, C>,
header: &Block::Header,
) -> Result<(), String> {
let substrate_block_hash = header.hash();
let block_number: u64 = (*header.number()).unique_saturated_into();
let number_mapping_write = fc_db::kv::NumberMappingWrite::Skip;

// Write BLOCK_NUMBER_MAPPING when this block is canonical at this number, so
// latest_block_hash() / indexed_canonical_hash_at() find it during catch-up.
// Uses only HeaderBackend::hash() — no state access, pruning-safe.
// If the hash lookup fails or returns None, use Skip so we don't abort the
// sync (best-effort; avoids stalling on intermittent or edge-condition failures).
let canonical_hash_at_number = client.hash(*header.number()).ok().flatten();
let number_mapping_write = if canonical_hash_at_number == Some(substrate_block_hash) {
fc_db::kv::NumberMappingWrite::Write
} else {
fc_db::kv::NumberMappingWrite::Skip
};

match fp_consensus::find_log(header.digest()) {
Ok(log) => {
Expand Down Expand Up @@ -113,7 +128,16 @@ pub fn sync_block<Block: BlockT, C: HeaderBackend<Block>>(
)
}
}
None => backend.mapping().write_none(substrate_block_hash),
None => {
// State is unavailable — likely pruned.
log::warn!(
target: "mapping-sync",
"State unavailable for block #{block_number} ({substrate_block_hash:?}); \
marking as synced with no ethereum mapping. \
This may indicate the pruning window is too narrow.",
);
backend.mapping().write_none(substrate_block_hash)
}
}
}
},
Expand Down Expand Up @@ -207,6 +231,7 @@ pub fn sync_one_block<Block: BlockT, C, BE>(
storage_override: Arc<dyn StorageOverride<Block>>,
frontier_backend: &fc_db::kv::Backend<Block, C>,
sync_from: <Block::Header as HeaderT>::Number,
state_pruning_blocks: Option<u64>,
strategy: SyncStrategy,
sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
pubsub_notification_sinks: Arc<
Expand Down Expand Up @@ -267,7 +292,82 @@ where
{
return Ok(false);
}

// On pruned nodes: if this block is behind the live state window
// (finalized_number - pruning_blocks), jump the syncing tip forward
// to the oldest block we can still access state for, retaining any
// other queued tips that are already within the live window.
// This prevents the worker stalling permanently on blocks whose
// state has been pruned.
if let Some(pruning_blocks) = state_pruning_blocks {
let finalized_number_u64: u64 = client.info().finalized_number.unique_saturated_into();
let live_window_start_u64 = finalized_number_u64.saturating_sub(pruning_blocks);
let sync_from_u64: u64 = sync_from.unique_saturated_into();
let skip_to_u64 = live_window_start_u64.max(sync_from_u64);
let current_number_u64: u64 = (*operating_header.number()).unique_saturated_into();

if current_number_u64 < skip_to_u64 {
let skip_to_number =
skip_to_u64.saturated_into::<<Block::Header as HeaderT>::Number>();
match client.hash(skip_to_number) {
Ok(Some(skip_hash)) => {
log::warn!(
target: "mapping-sync",
"Pruned node: skipping blocks #{}..#{} (outside live state window), \
jumping tip to #{}",
current_number_u64,
skip_to_u64.saturating_sub(1),
skip_to_u64,
);
// Retain any tips still within the live window rather than
// discarding them — they may be unsynced fork branches that
// need indexing. Replace only the out-of-window tip with
// the skip target.
current_syncing_tips.retain(|tip| {
substrate_backend
.blockchain()
.header(*tip)
.ok()
.flatten()
.map(|h| {
let n: u64 = (*h.number()).unique_saturated_into();
n >= skip_to_u64
})
.unwrap_or(false)
});
current_syncing_tips.push(skip_hash);
frontier_backend
.meta()
.write_current_syncing_tips(current_syncing_tips)?;
return Ok(true);
}
Ok(None) => {
// Target block not yet known to the client (e.g. node still
// syncing headers). Return false to back off and retry later
// rather than falling through to sync a pruned block.
frontier_backend
.meta()
.write_current_syncing_tips(current_syncing_tips)?;
return Ok(false);
}
Err(e) => {
// Transient client error. Back off and retry rather than
// falling through to sync a pruned block.
log::warn!(
target: "mapping-sync",
"Pruned node: failed to resolve skip target #{skip_to_u64}: {e:?}; will retry.",
);
frontier_backend
.meta()
.write_current_syncing_tips(current_syncing_tips)?;
return Ok(false);
}
}
}
}

sync_block(
client,
storage_override.clone(),
frontier_backend,
&operating_header,
Expand Down Expand Up @@ -323,6 +423,7 @@ pub fn sync_blocks<Block: BlockT, C, BE>(
frontier_backend: &fc_db::kv::Backend<Block, C>,
limit: usize,
sync_from: <Block::Header as HeaderT>::Number,
state_pruning_blocks: Option<u64>,
strategy: SyncStrategy,
sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
pubsub_notification_sinks: Arc<
Expand All @@ -346,6 +447,7 @@ where
storage_override.clone(),
frontier_backend,
sync_from,
state_pruning_blocks,
strategy,
sync_oracle.clone(),
pubsub_notification_sinks.clone(),
Expand Down
8 changes: 8 additions & 0 deletions client/mapping-sync/src/kv/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub struct MappingSyncWorker<Block: BlockT, C, BE> {
have_next: bool,
retry_times: usize,
sync_from: <Block::Header as HeaderT>::Number,
/// If set, blocks older than (best - state_pruning_blocks) are skipped during catch-up
/// so the sync tip does not get stuck behind pruned state.
state_pruning_blocks: Option<u64>,
strategy: SyncStrategy,

sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
Expand All @@ -86,6 +89,7 @@ impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {
frontier_backend: Arc<fc_db::kv::Backend<Block, C>>,
retry_times: usize,
sync_from: <Block::Header as HeaderT>::Number,
state_pruning_blocks: Option<u64>,
strategy: SyncStrategy,
sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
pubsub_notification_sinks: Arc<
Expand All @@ -105,6 +109,7 @@ impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {
have_next: true,
retry_times,
sync_from,
state_pruning_blocks,
strategy,

sync_oracle,
Expand Down Expand Up @@ -183,6 +188,7 @@ where
self.frontier_backend.as_ref(),
self.retry_times,
self.sync_from,
self.state_pruning_blocks,
self.strategy,
self.sync_oracle.clone(),
self.pubsub_notification_sinks.clone(),
Expand Down Expand Up @@ -346,6 +352,7 @@ mod tests {
frontier_backend,
3,
0,
None,
SyncStrategy::Normal,
Arc::new(test_sync_oracle),
pubsub_notification_sinks_inner,
Expand Down Expand Up @@ -493,6 +500,7 @@ mod tests {
frontier_backend,
3,
0,
None,
SyncStrategy::Normal,
Arc::new(test_sync_oracle),
pubsub_notification_sinks_inner,
Expand Down
34 changes: 21 additions & 13 deletions client/mapping-sync/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,19 +1084,27 @@ mod test {
futures_timer::Delay::new(Duration::from_millis(100)).await;
}

// Test the reorged chain is correctly indexed.
let res = sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
.fetch_all(&pool)
.await
.expect("test query result")
.iter()
.map(|row| {
let substrate_block_hash = H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]);
let is_canon = row.get::<i32, _>(1);
let block_number = row.get::<i32, _>(2);
(substrate_block_hash, is_canon, block_number)
})
.collect::<Vec<(H256, i32, i32)>>();
// Wait for the indexer to process all 20 blocks (async worker may lag).
let timeout = std::time::Instant::now() + Duration::from_secs(5);
let res = loop {
let rows =
sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
.fetch_all(&pool)
.await
.expect("test query result")
.iter()
.map(|row| {
let substrate_block_hash = H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]);
let is_canon = row.get::<i32, _>(1);
let block_number = row.get::<i32, _>(2);
(substrate_block_hash, is_canon, block_number)
})
.collect::<Vec<(H256, i32, i32)>>();
if rows.len() == 20 || std::time::Instant::now() >= timeout {
break rows;
}
futures_timer::Delay::new(Duration::from_millis(50)).await;
};

// 20 blocks in total
assert_eq!(res.len(), 20);
Expand Down
2 changes: 2 additions & 0 deletions template/node/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub async fn spawn_frontier_tasks<B, RA, HF>(
storage_override: Arc<dyn StorageOverride<B>>,
fee_history_cache: FeeHistoryCache,
fee_history_cache_limit: FeeHistoryCacheLimit,
state_pruning_blocks: Option<u64>,
sync: Arc<SyncingService<B>>,
pubsub_notification_sinks: Arc<
fc_mapping_sync::EthereumBlockNotificationSinks<
Expand Down Expand Up @@ -174,6 +175,7 @@ pub async fn spawn_frontier_tasks<B, RA, HF>(
b.clone(),
3,
0u32.into(),
state_pruning_blocks,
fc_mapping_sync::SyncStrategy::Normal,
sync,
pubsub_notification_sinks,
Expand Down
11 changes: 11 additions & 0 deletions template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,16 @@ where
})
};

// Derive state_pruning_blocks from the node's --state-pruning so the mapping-sync
// worker can skip past pruned blocks during catch-up (KV backend only).
let state_pruning_blocks = config.state_pruning.as_ref().and_then(|mode| {
if let sc_service::PruningMode::Constrained(c) = mode {
c.max_blocks.map(u64::from)
} else {
None
}
});

let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
config,
client: client.clone(),
Expand All @@ -526,6 +536,7 @@ where
storage_override,
fee_history_cache,
fee_history_cache_limit,
state_pruning_blocks,
sync_service.clone(),
pubsub_notification_sinks,
)
Expand Down
Loading