Skip to content
47 changes: 11 additions & 36 deletions crates/storage/trie_db/layering.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use ethrex_common::H256;
use rayon::iter::{ParallelBridge, ParallelIterator};
use rustc_hash::FxHashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -135,45 +134,21 @@ impl TrieLayerCache {
self.layers.insert(state_root, Arc::new(entry));
}

/// Rebuilds the global bloom filter accruing all current existing layers.
/// Rebuilds the global bloom filter by inserting all keys from all layers.
pub fn rebuild_bloom(&mut self) {
let mut blooms: Vec<_> = self
.layers
.values()
.par_bridge()
.map(|entry| {
let Ok(mut bloom) = Self::create_filter() else {
tracing::warn!("TrieLayerCache: rebuild_bloom could not create filter");
return None;
};
for (p, _) in entry.nodes.iter() {
if let Err(qfilter::Error::CapacityExceeded) = bloom.insert(p) {
tracing::warn!("TrieLayerCache: rebuild_bloom capacity exceeded");
return None;
}
let mut new_global_filter = Self::create_filter().unwrap();

self.layers.iter().for_each(|(_, layer)| {
layer.nodes.iter().for_each(|(path, _)| {
if let Err(qfilter::Error::CapacityExceeded) = new_global_filter.insert(path) {
tracing::warn!("TrieLayerCache: rebuild_bloom capacity exceeded");
self.bloom = None;
return;
}
Some(bloom)
})
.collect();
});

let Some(mut ret) = blooms.pop().flatten() else {
tracing::warn!("TrieLayerCache: rebuild_bloom no valid bloom found");
self.bloom = None;
return;
};
for bloom in blooms.iter() {
let Some(bloom) = bloom else {
tracing::warn!("TrieLayerCache: rebuild_bloom no valid bloom found");
self.bloom = None;
return;
};
if let Err(qfilter::Error::CapacityExceeded) = ret.merge(false, bloom) {
tracing::warn!("TrieLayerCache: rebuild_bloom capacity exceeded");
self.bloom = None;
return;
}
}
self.bloom = Some(ret);
self.bloom = Some(new_global_filter);
}

pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
Expand Down
Loading