Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### 2025-10-06

- Remove explicit cache-related options from the RocksDB configuration and revert to non-optimistic transactions to reduce RAM usage [#4853](https://github.com/lambdaclass/ethrex/pull/4853)

### 2025-10-06

- Improve block headers vec handling in syncer [#4771](https://github.com/lambdaclass/ethrex/pull/4771)
- Refactor current_step sync metric from a `Mutex<String>` to a simple atomic. [#4772](https://github.com/lambdaclass/ethrex/pull/4772)

Expand Down
57 changes: 22 additions & 35 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use ethrex_common::{
};
use ethrex_trie::{Nibbles, NodeHash, Trie};
use rocksdb::{
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor, MultiThreaded,
OptimisticTransactionDB, Options, WriteBatchWithTransaction,
BlockBasedOptions, BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded,
Options, WriteBatch,
};
use std::{collections::HashSet, path::Path, sync::Arc};
use tracing::info;
Expand Down Expand Up @@ -103,7 +103,7 @@ const CF_INVALID_ANCESTORS: &str = "invalid_ancestors";

#[derive(Debug)]
pub struct Store {
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
}

impl Store {
Expand All @@ -112,8 +112,6 @@ impl Store {
db_options.create_if_missing(true);
db_options.create_missing_column_families(true);

let cache = Cache::new_lru_cache(4 * 1024 * 1024 * 1024); // 4GB cache

db_options.set_max_open_files(-1);
db_options.set_max_file_opening_threads(16);

Expand Down Expand Up @@ -166,18 +164,17 @@ impl Store {
];

// Get existing column families to know which ones to drop later
let existing_cfs =
match OptimisticTransactionDB::<MultiThreaded>::list_cf(&db_options, path) {
Ok(cfs) => {
info!("Found existing column families: {:?}", cfs);
cfs
}
Err(_) => {
// Database doesn't exist yet
info!("Database doesn't exist, will create with expected column families");
vec!["default".to_string()]
}
};
let existing_cfs = match DBWithThreadMode::<MultiThreaded>::list_cf(&db_options, path) {
Ok(cfs) => {
info!("Found existing column families: {:?}", cfs);
cfs
}
Err(_) => {
// Database doesn't exist yet
info!("Database doesn't exist, will create with expected column families");
vec!["default".to_string()]
}
};

// Create descriptors for ALL existing CFs + expected ones (RocksDB requires opening all existing CFs)
let mut all_cfs_to_open = HashSet::new();
Expand Down Expand Up @@ -211,9 +208,7 @@ impl Store {
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(32 * 1024); // 32KB blocks
block_opts.set_cache_index_and_filter_blocks(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_CANONICAL_BLOCK_HASHES | CF_BLOCK_NUMBERS => {
Expand All @@ -223,10 +218,8 @@ impl Store {
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(16 * 1024); // 16KB
block_opts.set_bloom_filter(10.0, false);
block_opts.set_cache_index_and_filter_blocks(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_STATE_TRIE_NODES | CF_STORAGE_TRIES_NODES => {
Expand All @@ -239,10 +232,7 @@ impl Store {

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024); // 16KB
block_opts.set_block_cache(&cache);
block_opts.set_bloom_filter(10.0, false); // 10 bits per key
block_opts.set_cache_index_and_filter_blocks(true);
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_RECEIPTS | CF_ACCOUNT_CODES => {
Expand All @@ -252,9 +242,7 @@ impl Store {
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(32 * 1024); // 32KB
block_opts.set_block_cache(&cache);
cf_opts.set_block_based_table_factory(&block_opts);
}
_ => {
Expand All @@ -266,15 +254,14 @@ impl Store {

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024);
block_opts.set_block_cache(&cache);
cf_opts.set_block_based_table_factory(&block_opts);
}
}

cf_descriptors.push(ColumnFamilyDescriptor::new(cf_name, cf_opts));
}

let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
path,
cf_descriptors,
Expand Down Expand Up @@ -370,7 +357,7 @@ impl Store {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

for (cf_name, key, value) in batch_ops {
let cf = db.cf_handle(&cf_name).ok_or_else(|| {
Expand Down Expand Up @@ -467,7 +454,7 @@ impl StoreEngine for Store {
)?;

let _span = tracing::trace_span!("Block DB update").entered();
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

for (node_hash, node_data) in update_batch.account_updates {
batch.put_cf(&cf_state, node_hash.as_ref(), node_data);
Expand Down Expand Up @@ -537,7 +524,7 @@ impl StoreEngine for Store {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let [cf_headers, cf_bodies, cf_block_numbers, cf_tx_locations] = open_cfs(
&db,
Expand Down Expand Up @@ -646,7 +633,7 @@ impl StoreEngine for Store {
}

async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let Some(hash) = self.get_canonical_block_hash_sync(block_number)? else {
return Ok(());
Expand Down Expand Up @@ -896,7 +883,7 @@ impl StoreEngine for Store {
.ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;

let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start);
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

while let Some(Ok((key, _))) = iter.next() {
batch.delete_cf(&cf, key);
Expand Down Expand Up @@ -1120,7 +1107,7 @@ impl StoreEngine for Store {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let [cf_canonical, cf_chain_data] =
open_cfs(&db, [CF_CANONICAL_BLOCK_HASHES, CF_CHAIN_DATA])?;
Expand Down Expand Up @@ -1396,7 +1383,7 @@ impl StoreEngine for Store {

/// Open column families
fn open_cfs<'a, const N: usize>(
db: &'a Arc<OptimisticTransactionDB<MultiThreaded>>,
db: &'a Arc<DBWithThreadMode<MultiThreaded>>,
names: [&str; N],
) -> Result<[Arc<BoundColumnFamily<'a>>; N], StoreError> {
let mut handles = Vec::with_capacity(N);
Expand Down
16 changes: 8 additions & 8 deletions crates/storage/trie_db/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use ethrex_common::H256;
use ethrex_rlp::encode::RLPEncode;
use ethrex_trie::{Node, NodeHash, TrieDB, error::TrieError};
use rocksdb::{MultiThreaded, OptimisticTransactionDB};
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::sync::Arc;

/// RocksDB implementation for the TrieDB trait, with get and put operations.
pub struct RocksDBTrieDB {
/// RocksDB database
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
/// Column family name
cf_name: String,
/// Storage trie address prefix
Expand All @@ -16,7 +16,7 @@ pub struct RocksDBTrieDB {

impl RocksDBTrieDB {
pub fn new(
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
cf_name: &str,
address_prefix: Option<H256>,
) -> Result<Self, TrieError> {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl TrieDB for RocksDBTrieDB {

fn put_batch(&self, key_values: Vec<(NodeHash, Vec<u8>)>) -> Result<(), TrieError> {
let cf = self.cf_handle()?;
let mut batch = rocksdb::WriteBatchWithTransaction::default();
let mut batch = rocksdb::WriteBatch::default();

for (key, value) in key_values {
let db_key = self.make_key(&key);
Expand Down Expand Up @@ -104,7 +104,7 @@ impl TrieDB for RocksDBTrieDB {
mod tests {
use super::*;
use ethrex_trie::NodeHash;
use rocksdb::{ColumnFamilyDescriptor, MultiThreaded, Options};
use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options};
use tempfile::TempDir;

#[test]
Expand All @@ -118,7 +118,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down Expand Up @@ -158,7 +158,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down Expand Up @@ -195,7 +195,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down
12 changes: 6 additions & 6 deletions crates/storage/trie_db/rocksdb_locked.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use ethrex_common::H256;
use ethrex_trie::{NodeHash, TrieDB, error::TrieError};
use rocksdb::{MultiThreaded, OptimisticTransactionDB, SnapshotWithThreadMode};
use rocksdb::{DBWithThreadMode, MultiThreaded, SnapshotWithThreadMode};
use std::sync::Arc;

/// RocksDB locked implementation for the TrieDB trait, read-only with consistent snapshot.
pub struct RocksDBLockedTrieDB {
/// RocksDB database
db: &'static Arc<OptimisticTransactionDB<MultiThreaded>>,
db: &'static Arc<DBWithThreadMode<MultiThreaded>>,
/// Column family handle
cf: std::sync::Arc<rocksdb::BoundColumnFamily<'static>>,
/// Read-only snapshot for consistent reads
snapshot: SnapshotWithThreadMode<'static, OptimisticTransactionDB<MultiThreaded>>,
snapshot: SnapshotWithThreadMode<'static, DBWithThreadMode<MultiThreaded>>,
/// Storage trie address prefix
address_prefix: Option<H256>,
}

impl RocksDBLockedTrieDB {
pub fn new(
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
cf_name: &str,
address_prefix: Option<H256>,
) -> Result<Self, TrieError> {
Expand Down Expand Up @@ -61,8 +61,8 @@ impl Drop for RocksDBLockedTrieDB {
// Restore the leaked database reference
unsafe {
drop(Box::from_raw(
self.db as *const Arc<OptimisticTransactionDB<MultiThreaded>>
as *mut Arc<OptimisticTransactionDB<MultiThreaded>>,
self.db as *const Arc<DBWithThreadMode<MultiThreaded>>
as *mut Arc<DBWithThreadMode<MultiThreaded>>,
));
}
}
Expand Down
Loading