Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 97 additions & 20 deletions client/db/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ const DB_HASH_LEN: usize = 32;
/// Hash type that this backend uses for the database.
pub type DbHash = [u8; DB_HASH_LEN];

/// Maximum number of blocks to walk back when searching for an indexed canonical block.
/// This limits the search depth when the cached `LATEST_CANONICAL_INDEXED_BLOCK` is stale
/// (e.g., after a reorg or if it points to an unindexed block).
const MAX_WALKBACK_DEPTH: u64 = 16;

/// Database settings.
pub struct DatabaseSettings {
/// Where to find the database.
Expand All @@ -60,6 +65,7 @@ pub(crate) mod columns {

pub mod static_keys {
pub const CURRENT_SYNCING_TIPS: &[u8] = b"CURRENT_SYNCING_TIPS";
pub const LATEST_CANONICAL_INDEXED_BLOCK: &[u8] = b"LATEST_CANONICAL_INDEXED_BLOCK";
}

#[derive(Clone)]
Expand Down Expand Up @@ -101,37 +107,42 @@ impl<Block: BlockT, C: HeaderBackend<Block>> fc_api::Backend<Block> for Backend<

async fn latest_block_hash(&self) -> Result<Block::Hash, String> {
// Return the latest block hash that is both indexed AND on the canonical chain.
// This prevents returning stale data during reorgs.
// The canonical indexed block is tracked by mapping-sync when blocks are synced.
//
// Note: During initial sync or after restart while mapping-sync catches up,
// this returns the genesis block hash. This is consistent with Geth's behavior
// where eth_getBlockByNumber("latest") returns block 0 during initial sync.
// Users can check sync status via eth_syncing to determine if the node is
// still catching up.
let best_number: u64 = self.client.info().best_number.unique_saturated_into();

// Get the canonical hash for verification.
let canonical_hash = self
.client
.hash(best_number.unique_saturated_into())
.map_err(|e| format!("{e:?}"))?;

// Query mapping-sync for the ethereum block hash at best_number
if let Some(eth_hash) = self.mapping.block_hash_by_number(best_number)? {
// Get the substrate block hash(es) for this ethereum block hash
if let Some(substrate_hashes) = self.mapping.block_hash(&eth_hash)? {
// Verify the mapped hash is on the canonical chain.
// During a reorg, the mapping may point to a reorged-out block.
if let Some(canonical) = canonical_hash {
if substrate_hashes.contains(&canonical) {
return Ok(canonical);
}
let (block_number, should_persist_on_hit) =
match self.mapping.latest_canonical_indexed_block_number()? {
Some(cached) => {
let clamped = cached.min(best_number);
(clamped, clamped != cached)
}
// Mapping exists but is stale (reorg happened) - treat as not indexed
None => (best_number, true),
};

if let Some(canonical_hash) = self.indexed_canonical_hash_at(block_number)? {
if should_persist_on_hit {
self.mapping
.set_latest_canonical_indexed_block(block_number)?;
}
return Ok(canonical_hash);
}

// Cached canonical block is stale (reorg happened), or meta key was absent
// and best block is not indexed yet. Walk back to the latest indexed
// canonical block and persist the recovered pointer.
if let Some((recovered_number, recovered_hash)) =
self.find_latest_indexed_canonical_block(block_number.saturating_sub(1))?
{
self.mapping
.set_latest_canonical_indexed_block(recovered_number)?;
return Ok(recovered_hash);
}

// Block not indexed yet or stale - return genesis
Ok(self.client.info().genesis_hash)
}
}
Expand Down Expand Up @@ -219,6 +230,48 @@ impl<Block: BlockT, C: HeaderBackend<Block>> Backend<Block, C> {
pub fn meta(&self) -> &Arc<MetaDb<Block>> {
&self.meta
}

/// Returns the canonical hash at `block_number` if it is indexed.
fn indexed_canonical_hash_at(&self, block_number: u64) -> Result<Option<Block::Hash>, String> {
let Some(eth_hash) = self.mapping.block_hash_by_number(block_number)? else {
return Ok(None);
};

let Some(substrate_hashes) = self.mapping.block_hash(&eth_hash)? else {
return Ok(None);
};

let Some(canonical_hash) = self
.client
.hash(block_number.unique_saturated_into())
.map_err(|e| format!("{e:?}"))?
else {
return Ok(None);
};

if substrate_hashes.contains(&canonical_hash) {
return Ok(Some(canonical_hash));
}

Ok(None)
}

/// Finds the latest indexed block that is on the canonical chain by walking
/// backwards from `start_block`. Returns `None` if no indexed canonical block
/// is found within `MAX_WALKBACK_DEPTH` blocks.
fn find_latest_indexed_canonical_block(
&self,
start_block: u64,
) -> Result<Option<(u64, Block::Hash)>, String> {
let min_block = start_block.saturating_sub(MAX_WALKBACK_DEPTH);
for block_number in (min_block..=start_block).rev() {
if let Some(canonical_hash) = self.indexed_canonical_hash_at(block_number)? {
return Ok(Some((block_number, canonical_hash)));
}
}

Ok(None)
}
}

pub struct MetaDb<Block> {
Expand Down Expand Up @@ -425,4 +478,28 @@ impl<Block: BlockT> MappingDb<Block> {
None => Ok(None),
}
}

/// Returns the latest canonical indexed block number, or None if not set.
pub fn latest_canonical_indexed_block_number(&self) -> Result<Option<u64>, String> {
match self
.db
.get(columns::META, static_keys::LATEST_CANONICAL_INDEXED_BLOCK)
{
Some(raw) => Ok(Some(
u64::decode(&mut &raw[..]).map_err(|e| format!("{e:?}"))?,
)),
None => Ok(None),
}
}

/// Sets the latest canonical indexed block number.
pub fn set_latest_canonical_indexed_block(&self, block_number: u64) -> Result<(), String> {
let mut transaction = sp_database::Transaction::new();
transaction.set(
columns::META,
static_keys::LATEST_CANONICAL_INDEXED_BLOCK,
&block_number.encode(),
);
self.db.commit(transaction).map_err(|e| e.to_string())
}
}
10 changes: 10 additions & 0 deletions client/mapping-sync/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ where
let is_new_best = best_info.is_some() || client.info().best_hash == hash;
let reorg_info = best_info.and_then(|info| info.reorg_info);

// Update the latest canonical indexed block when this block is the new best.
// This is the authoritative place to track canonical blocks since we know
// at sync time whether the block is on the canonical chain.
if is_new_best {
let block_number: u64 = (*operating_header.number()).unique_saturated_into();
frontier_backend
.mapping()
.set_latest_canonical_indexed_block(block_number)?;
}

emit_block_notification(
pubsub_notification_sinks.as_ref(),
sync_oracle.as_ref(),
Expand Down
4 changes: 2 additions & 2 deletions client/rpc-core/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ pub trait EthApi {

/// Returns block author.
#[method(name = "eth_coinbase")]
fn author(&self) -> RpcResult<H160>;
async fn author(&self) -> RpcResult<H160>;

/// Returns accounts list.
#[method(name = "eth_accounts")]
fn accounts(&self) -> RpcResult<Vec<H160>>;

/// Returns highest block number.
#[method(name = "eth_blockNumber")]
fn block_number(&self) -> RpcResult<U256>;
async fn block_number(&self) -> RpcResult<U256>;

/// Returns the chain ID used for transaction signing at the
/// current best block. None is returned if not
Expand Down
31 changes: 25 additions & 6 deletions client/rpc/src/eth/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,15 @@ where
}
}

pub fn author(&self) -> RpcResult<H160> {
let hash = self.client.info().best_hash;
pub async fn author(&self) -> RpcResult<H160> {
// Use the latest indexed block hash to ensure consistency with other RPCs.
// This avoids returning data from a block that isn't visible via
// eth_getBlockByNumber("latest").
let hash = self
.backend
.latest_block_hash()
.await
.map_err(|err| internal_err(format!("{err:?}")))?;
let current_block = self
.storage_override
.current_block(hash)
Expand All @@ -85,10 +92,22 @@ where
.collect::<Vec<_>>())
}

pub fn block_number(&self) -> RpcResult<U256> {
let best_number = self.client.info().best_number;
let best_number = UniqueSaturatedInto::<u128>::unique_saturated_into(best_number);
Ok(U256::from(best_number))
pub async fn block_number(&self) -> RpcResult<U256> {
// Use the latest indexed block hash to ensure consistency with other RPCs.
// This avoids returning a block number that isn't yet visible via
// eth_getBlockByNumber("latest").
let hash = self
.backend
.latest_block_hash()
.await
.map_err(|err| internal_err(format!("{err:?}")))?;
let number = self
.client
.number(hash)
.map_err(|err| internal_err(format!("{err:?}")))?
.ok_or_else(|| internal_err("Block number not found for latest indexed block"))?;
let number = UniqueSaturatedInto::<u128>::unique_saturated_into(number);
Ok(U256::from(number))
}

pub fn chain_id(&self) -> RpcResult<Option<U64>> {
Expand Down
81 changes: 55 additions & 26 deletions client/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@ where
C: HeaderBackend<B> + 'static,
P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
{
/// Returns the latest indexed block number.
/// This ensures consistency with other RPCs that use mapping-sync.
async fn latest_indexed_block_number(&self) -> RpcResult<NumberFor<B>> {
let hash = self
.backend
.latest_block_hash()
.await
.map_err(|err| internal_err(format!("{err:?}")))?;
let Some(number) = self
.client
.number(hash)
.map_err(|err| internal_err(format!("{err:?}")))?
else {
return Err(internal_err(
"Block number not found for latest indexed block",
));
};
Ok(number)
}

fn create_filter(&self, filter_type: FilterType) -> RpcResult<U256> {
let info = self.client.info();
let best_hash = info.best_hash;
Expand Down Expand Up @@ -197,6 +217,9 @@ where
let info = self.client.info();
let best_hash = info.best_hash;
let best_number = UniqueSaturatedInto::<u64>::unique_saturated_into(info.best_number);
// Get latest indexed block number before acquiring the lock to avoid
// holding the lock across an await point.
let latest_indexed_number = self.latest_indexed_block_number().await?;
let pool = self.filter_pool.clone();
// Try to lock.
let path = if let Ok(locked) = &mut pool.lock() {
Expand Down Expand Up @@ -258,27 +281,16 @@ where
}
// For each event since last poll, get a vector of ethereum logs.
FilterType::Log(filter) => {
// Update filter `last_poll`.
locked.insert(
key,
FilterPoolItem {
last_poll: BlockNumberOrHash::Num(best_number + 1),
filter_type: pool_item.filter_type.clone(),
at_block: pool_item.at_block,
pending_transaction_hashes: HashSet::new(),
},
);

// Either the filter-specific `to` block or best block.
let best_number = self.client.info().best_number;
// Either the filter-specific `to` block or latest indexed block.
// Use latest indexed block to ensure consistency with other RPCs.
let mut current_number = filter
.to_block
.and_then(|v| v.to_min_block_num())
.map(|s| s.unique_saturated_into())
.unwrap_or(best_number);
.unwrap_or(latest_indexed_number);

if current_number > best_number {
current_number = best_number;
if current_number > latest_indexed_number {
current_number = latest_indexed_number;
}

// The from clause is the max(last_poll, filter_from).
Expand All @@ -296,6 +308,21 @@ where

let from_number = std::cmp::max(last_poll, filter_from);

// Update filter `last_poll` based on the same capped head we query.
// This avoids skipping blocks when best_number is ahead of indexed data.
let next_last_poll =
UniqueSaturatedInto::<u64>::unique_saturated_into(current_number)
.saturating_add(1);
locked.insert(
key,
FilterPoolItem {
last_poll: BlockNumberOrHash::Num(next_last_poll),
filter_type: pool_item.filter_type.clone(),
at_block: pool_item.at_block,
pending_transaction_hashes: HashSet::new(),
},
);

// Build the response.
FuturePath::Log {
filter: filter.clone(),
Expand Down Expand Up @@ -397,22 +424,23 @@ where

let filter = filter_result?;

let best_number = client.info().best_number;
// Use latest indexed block to ensure consistency with other RPCs.
let latest_number = self.latest_indexed_block_number().await?;
let mut current_number = filter
.to_block
.and_then(|v| v.to_min_block_num())
.map(|s| s.unique_saturated_into())
.unwrap_or(best_number);
.unwrap_or(latest_number);

if current_number > best_number {
current_number = best_number;
if current_number > latest_number {
current_number = latest_number;
}

let from_number = filter
.from_block
.and_then(|v| v.to_min_block_num())
.map(|s| s.unique_saturated_into())
.unwrap_or(best_number);
.unwrap_or(latest_number);

let logs = if backend.is_indexed() {
filter_range_logs_indexed(
Expand Down Expand Up @@ -483,22 +511,23 @@ where
logs = filter_block_logs(&filter, block, statuses);
}
} else {
let best_number = client.info().best_number;
// Use latest indexed block to ensure consistency with other RPCs.
let latest_number = self.latest_indexed_block_number().await?;
let mut current_number = filter
.to_block
.and_then(|v| v.to_min_block_num())
.map(|s| s.unique_saturated_into())
.unwrap_or(best_number);
.unwrap_or(latest_number);

if current_number > best_number {
current_number = best_number;
if current_number > latest_number {
current_number = latest_number;
}

let from_number = filter
.from_block
.and_then(|v| v.to_min_block_num())
.map(|s| s.unique_saturated_into())
.unwrap_or(best_number);
.unwrap_or(latest_number);

let block_range = current_number.saturating_sub(from_number);
if block_range > self.max_block_range.into() {
Expand Down
Loading
Loading