Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### 2025-10-13


- Remove explicit cache-related options from RocksDB configuration and revert optimistic transactions to reduce RAM usage [#4853](https://github.com/lambdaclass/ethrex/pull/4853)
- Remove unnecessary mul in ecpairing [#4843](https://github.com/lambdaclass/ethrex/pull/4843)

### 2025-10-06
Expand Down
61 changes: 24 additions & 37 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use ethrex_common::{
};
use ethrex_trie::{Nibbles, Trie};
use rocksdb::{
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor, MultiThreaded,
OptimisticTransactionDB, Options, WriteBatchWithTransaction,
BlockBasedOptions, BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded,
Options, WriteBatch,
};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -111,7 +111,7 @@ const CF_FULLSYNC_HEADERS: &str = "fullsync_headers";

#[derive(Debug)]
pub struct Store {
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
trie_cache: Arc<RwLock<TrieLayerCache>>,
}

Expand All @@ -121,8 +121,6 @@ impl Store {
db_options.create_if_missing(true);
db_options.create_missing_column_families(true);

let cache = Cache::new_lru_cache(4 * 1024 * 1024 * 1024); // 4GB cache

db_options.set_max_open_files(-1);
db_options.set_max_file_opening_threads(16);

Expand Down Expand Up @@ -175,18 +173,17 @@ impl Store {
];

// Get existing column families to know which ones to drop later
let existing_cfs =
match OptimisticTransactionDB::<MultiThreaded>::list_cf(&db_options, path) {
Ok(cfs) => {
info!("Found existing column families: {:?}", cfs);
cfs
}
Err(_) => {
// Database doesn't exist yet
info!("Database doesn't exist, will create with expected column families");
vec!["default".to_string()]
}
};
let existing_cfs = match DBWithThreadMode::<MultiThreaded>::list_cf(&db_options, path) {
Ok(cfs) => {
info!("Found existing column families: {:?}", cfs);
cfs
}
Err(_) => {
// Database doesn't exist yet
info!("Database doesn't exist, will create with expected column families");
vec!["default".to_string()]
}
};

// Create descriptors for ALL existing CFs + expected ones (RocksDB requires opening all existing CFs)
let mut all_cfs_to_open = HashSet::new();
Expand Down Expand Up @@ -220,9 +217,7 @@ impl Store {
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(32 * 1024); // 32KB blocks
block_opts.set_cache_index_and_filter_blocks(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_CANONICAL_BLOCK_HASHES | CF_BLOCK_NUMBERS => {
Expand All @@ -232,10 +227,8 @@ impl Store {
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(16 * 1024); // 16KB
block_opts.set_bloom_filter(10.0, false);
block_opts.set_cache_index_and_filter_blocks(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_TRIE_NODES => {
Expand All @@ -248,10 +241,7 @@ impl Store {

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024); // 16KB
block_opts.set_block_cache(&cache);
block_opts.set_bloom_filter(10.0, false); // 10 bits per key
block_opts.set_cache_index_and_filter_blocks(true);
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_RECEIPTS | CF_ACCOUNT_CODES => {
Expand All @@ -261,9 +251,7 @@ impl Store {
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(32 * 1024); // 32KB
block_opts.set_block_cache(&cache);
cf_opts.set_block_based_table_factory(&block_opts);
}
_ => {
Expand All @@ -275,15 +263,14 @@ impl Store {

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024);
block_opts.set_block_cache(&cache);
cf_opts.set_block_based_table_factory(&block_opts);
}
}

cf_descriptors.push(ColumnFamilyDescriptor::new(cf_name, cf_opts));
}

let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
path,
cf_descriptors,
Expand Down Expand Up @@ -381,7 +368,7 @@ impl Store {
) -> Result<(), StoreError> {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

for (cf_name, key, value) in batch_ops {
let cf = db.cf_handle(&cf_name).ok_or_else(|| {
Expand Down Expand Up @@ -498,7 +485,7 @@ impl StoreEngine for Store {
)?;

let _span = tracing::trace_span!("Block DB update").entered();
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let mut trie = trie_cache.write().map_err(|_| StoreError::LockError)?;
if let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) {
Expand Down Expand Up @@ -580,7 +567,7 @@ impl StoreEngine for Store {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let [cf_headers, cf_bodies, cf_block_numbers, cf_tx_locations] = open_cfs(
&db,
Expand Down Expand Up @@ -689,7 +676,7 @@ impl StoreEngine for Store {
}

async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let Some(hash) = self.get_canonical_block_hash_sync(block_number)? else {
return Ok(());
Expand Down Expand Up @@ -939,7 +926,7 @@ impl StoreEngine for Store {
.ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;

let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start);
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

while let Some(Ok((key, _))) = iter.next() {
batch.delete_cf(&cf, key);
Expand Down Expand Up @@ -1203,7 +1190,7 @@ impl StoreEngine for Store {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let [cf_canonical, cf_chain_data] =
open_cfs(&db, [CF_CANONICAL_BLOCK_HASHES, CF_CHAIN_DATA])?;
Expand Down Expand Up @@ -1448,7 +1435,7 @@ impl StoreEngine for Store {
) -> Result<(), StoreError> {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();
let cf = db.cf_handle(CF_TRIE_NODES).ok_or_else(|| {
StoreError::Custom("Column family not found: CF_TRIE_NODES".to_string())
})?;
Expand Down Expand Up @@ -1525,7 +1512,7 @@ impl StoreEngine for Store {
.ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;

let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start);
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

while let Some(Ok((key, _))) = iter.next() {
batch.delete_cf(&cf, key);
Expand All @@ -1541,7 +1528,7 @@ impl StoreEngine for Store {

/// Open column families
fn open_cfs<'a, const N: usize>(
db: &'a Arc<OptimisticTransactionDB<MultiThreaded>>,
db: &'a Arc<DBWithThreadMode<MultiThreaded>>,
names: [&str; N],
) -> Result<[Arc<BoundColumnFamily<'a>>; N], StoreError> {
let mut handles = Vec::with_capacity(N);
Expand Down
16 changes: 8 additions & 8 deletions crates/storage/trie_db/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use ethrex_common::H256;
use ethrex_rlp::encode::RLPEncode;
use ethrex_trie::{Nibbles, Node, TrieDB, error::TrieError};
use rocksdb::{MultiThreaded, OptimisticTransactionDB};
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::sync::Arc;

use crate::trie_db::layering::apply_prefix;

/// RocksDB implementation for the TrieDB trait, with get and put operations.
pub struct RocksDBTrieDB {
/// RocksDB database
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
/// Column family name
cf_name: String,
/// Storage trie address prefix
Expand All @@ -18,7 +18,7 @@ pub struct RocksDBTrieDB {

impl RocksDBTrieDB {
pub fn new(
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
cf_name: &str,
address_prefix: Option<H256>,
) -> Result<Self, TrieError> {
Expand Down Expand Up @@ -64,7 +64,7 @@ impl TrieDB for RocksDBTrieDB {

fn put_batch(&self, key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
let cf = self.cf_handle()?;
let mut batch = rocksdb::WriteBatchWithTransaction::default();
let mut batch = rocksdb::WriteBatch::default();

for (key, value) in key_values {
let db_key = self.make_key(key);
Expand All @@ -82,7 +82,7 @@ impl TrieDB for RocksDBTrieDB {

fn put_batch_no_alloc(&self, key_values: &[(Nibbles, Node)]) -> Result<(), TrieError> {
let cf = self.cf_handle()?;
let mut batch = rocksdb::WriteBatchWithTransaction::default();
let mut batch = rocksdb::WriteBatch::default();
// 532 is the maximum size of an encoded branch node.
let mut buffer = Vec::with_capacity(532);

Expand Down Expand Up @@ -117,7 +117,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down Expand Up @@ -157,7 +157,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down Expand Up @@ -194,7 +194,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down
12 changes: 6 additions & 6 deletions crates/storage/trie_db/rocksdb_locked.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
use ethrex_common::H256;
use ethrex_trie::{Nibbles, TrieDB, error::TrieError};
use rocksdb::{MultiThreaded, OptimisticTransactionDB, SnapshotWithThreadMode};
use rocksdb::{DBWithThreadMode, MultiThreaded, SnapshotWithThreadMode};
use std::sync::Arc;

use crate::trie_db::layering::apply_prefix;

/// RocksDB locked implementation for the TrieDB trait, read-only with consistent snapshot.
pub struct RocksDBLockedTrieDB {
/// RocksDB database
db: &'static Arc<OptimisticTransactionDB<MultiThreaded>>,
db: &'static Arc<DBWithThreadMode<MultiThreaded>>,
/// Column family handle
cf: std::sync::Arc<rocksdb::BoundColumnFamily<'static>>,
/// Read-only snapshot for consistent reads
snapshot: SnapshotWithThreadMode<'static, OptimisticTransactionDB<MultiThreaded>>,
snapshot: SnapshotWithThreadMode<'static, DBWithThreadMode<MultiThreaded>>,
/// Storage trie address prefix
address_prefix: Option<H256>,
}

impl RocksDBLockedTrieDB {
pub fn new(
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
cf_name: &str,
address_prefix: Option<H256>,
) -> Result<Self, TrieError> {
Expand Down Expand Up @@ -54,8 +54,8 @@ impl Drop for RocksDBLockedTrieDB {
// Restore the leaked database reference
unsafe {
drop(Box::from_raw(
self.db as *const Arc<OptimisticTransactionDB<MultiThreaded>>
as *mut Arc<OptimisticTransactionDB<MultiThreaded>>,
self.db as *const Arc<DBWithThreadMode<MultiThreaded>>
as *mut Arc<DBWithThreadMode<MultiThreaded>>,
));
}
}
Expand Down