Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions prdoc/pr_10175.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
title: "revive-eth-rpc: fix blocks pruning"
doc:
- audience: Node Dev
description: |
Fixes the blocks pruning for the revive eth rpc in case of forks or chain reversions.

crates:
- name: pallet-revive-eth-rpc
bump: patch
95 changes: 71 additions & 24 deletions substrate/frame/revive/rpc/src/receipt_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,19 +256,26 @@ impl<B: BlockInfoProvider> ReceiptProvider<B> {
block_number: SubstrateBlockNumber,
block_map: &BlockHashMap,
) -> Result<(), ClientError> {
// Keep track of the latest block hashes, so we can prune older blocks.
if let Some(keep_latest_n_blocks) = self.keep_latest_n_blocks {
let mut block_number_to_hash = self.block_number_to_hashes.lock().await;

// Fork? - If inserting the same block number with a different hash, remove the old one
let mut to_remove = Vec::new();
match block_number_to_hash.insert(block_number, block_map.clone()) {
Some(old_block_map) if &old_block_map != block_map => {
let mut to_remove = Vec::new();
let mut block_number_to_hash = self.block_number_to_hashes.lock().await;

// Fork? - If inserting the same block number with a different hash, remove the old ones.
match block_number_to_hash.insert(block_number, block_map.clone()) {
Some(old_block_map) if &old_block_map != block_map => {
to_remove.push(old_block_map);

// Now loop through the blocks that were building on top of the old fork and remove
// them.
let mut next_block_number = block_number.saturating_add(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to use safe math here,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, but it doesn't hurt either

while let Some(old_block_map) = block_number_to_hash.remove(&next_block_number) {
to_remove.push(old_block_map);
},
_ => {},
}
next_block_number = next_block_number.saturating_add(1);
}
},
_ => {},
}

if let Some(keep_latest_n_blocks) = self.keep_latest_n_blocks {
// If we have more blocks than we should keep, remove the oldest ones by count
// (not by block number range, to handle gaps correctly)
while block_number_to_hash.len() > keep_latest_n_blocks {
Expand All @@ -277,10 +284,16 @@ impl<B: BlockInfoProvider> ReceiptProvider<B> {
to_remove.push(block_map);
}
}
}

// Release the lock.
drop(block_number_to_hash);

if !to_remove.is_empty() {
log::trace!(target: LOG_TARGET, "Pruning old blocks: {to_remove:?}");
self.remove(&to_remove).await?;
}

Ok(())
}

Expand All @@ -302,6 +315,8 @@ impl<B: BlockInfoProvider> ReceiptProvider<B> {

log::trace!(target: LOG_TARGET, "Insert receipts for substrate block #{block_number} {:?}", substrate_block_hash);

self.prune_blocks(block.number(), &block_map).await?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we prune even in archive mode?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we prune forks now irrespective of whether or not we have keep_latest_n_blocks configured, yes.

Keeping forks in the DB is not supported correctly, because we map the number to a single hash in block_number_to_hashes.
Moreover, this would cause conflicts in the DB (which is the whole reason for this PR), because we may try inserting the same tx hash twice (if present on two forks).

Making the eth rpc server fork-aware could be a goal, but it would probably imply a redesign. This PR continues the approach that was already taken and fixes a bug that we're encountering in Anvil.


// Check if mapping already exists (eg. added when processing best block and we are now
// processing finalized block)
let result = sqlx::query!(
Expand All @@ -310,8 +325,6 @@ impl<B: BlockInfoProvider> ReceiptProvider<B> {
.fetch_one(&self.pool)
.await?;

self.prune_blocks(block.number(), &block_map).await?;

// Assuming that if no mapping exists then no relevant entries in transaction_hashes and
// logs exist
if !result.exists {
Expand Down Expand Up @@ -688,9 +701,9 @@ mod tests {
async fn test_fork(pool: SqlitePool) -> anyhow::Result<()> {
let provider = setup_sqlite_provider(pool).await;

for i in [1u8, 2u8] {
let block = MockBlockInfo { hash: H256::from([i; 32]), number: 1 };
let transaction_hash = H256::from([i; 32]);
let build_block = |seed, number| {
let block = MockBlockInfo { hash: H256::from([seed; 32]), number };
let transaction_hash = H256::from([seed; 32]);
let receipts = vec![(
TransactionSigned::default(),
ReceiptInfo {
Expand All @@ -703,16 +716,50 @@ mod tests {
..Default::default()
},
)];
let ethereum_hash = H256::from([i + 1; 32]);
provider.insert(&block, &receipts, &ethereum_hash).await?;
}
assert_eq!(count(&provider.pool, "transaction_hashes", None).await, 1);
assert_eq!(count(&provider.pool, "logs", None).await, 1);
assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 1);
let ethereum_hash = H256::from([seed + 1; 32]);

(block, receipts, ethereum_hash)
};

// Build 4 blocks on consecutive heights: 0,1,2,3.
let (block0, receipts, ethereum_hash_0) = build_block(0, 0);
provider.insert(&block0, &receipts, &ethereum_hash_0).await?;
let (block1, receipts, ethereum_hash_1) = build_block(1, 1);
provider.insert(&block1, &receipts, &ethereum_hash_1).await?;
let (block2, receipts, ethereum_hash_2) = build_block(2, 2);
provider.insert(&block2, &receipts, &ethereum_hash_2).await?;
let (block3, receipts, ethereum_hash_3) = build_block(3, 3);
provider.insert(&block3, &receipts, &ethereum_hash_3).await?;

assert_eq!(count(&provider.pool, "transaction_hashes", None).await, 4);
assert_eq!(count(&provider.pool, "logs", None).await, 4);
assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 4);
assert_eq!(
provider.block_number_to_hashes.lock().await.clone(),
[
(0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
(1, BlockHashMap::new(block1.hash, ethereum_hash_1)),
(2, BlockHashMap::new(block2.hash, ethereum_hash_2)),
(3, BlockHashMap::new(block3.hash, ethereum_hash_3))
]
.into(),
);

// Now build another block on height 1.
let (fork_block, receipts, ethereum_hash_fork) = build_block(4, 1);
provider.insert(&fork_block, &receipts, &ethereum_hash_fork).await?;

assert_eq!(count(&provider.pool, "transaction_hashes", None).await, 2);
assert_eq!(count(&provider.pool, "logs", None).await, 2);
assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 2);

assert_eq!(
provider.block_number_to_hashes.lock().await.clone(),
[(1, BlockHashMap::new(H256::from([2u8; 32]), H256::from([3u8; 32])))].into(),
"New receipt for block #1 should replace the old one"
[
(0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
(1, BlockHashMap::new(fork_block.hash, ethereum_hash_fork))
]
.into(),
);

return Ok(());
Expand Down
Loading