diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b62aaffed1..951b12ba32f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,12 @@ - Change remaining_gas to i64, improving performance in gas cost calculations [#4684](https://github.com/lambdaclass/ethrex/pull/4684) +### 2025-09-30 + +- Downloading all slots of big accounts during the initial leaves download step of snap sync [#4689](https://github.com/lambdaclass/ethrex/pull/4689) +- Downloading and inserting intelligently accounts with the same state root and few (<= slots) [#4689](https://github.com/lambdaclass/ethrex/pull/4689) +- Improving the performance of state trie through an ordered insertion algorithm [#4689](https://github.com/lambdaclass/ethrex/pull/4689) + ### 2025-09-29 - Remove `OpcodeResult` to improve tight loops of lightweight opcodes [#4650](https://github.com/lambdaclass/ethrex/pull/4650) diff --git a/Cargo.lock b/Cargo.lock index fb3b661e75d..dc39775af1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1032,7 +1032,7 @@ dependencies = [ "bitflags 2.9.4", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -1319,18 +1319,18 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.23.2" +version = "1.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3995eaeebcdf32f91f980d360f78732ddc061097ab4e39991ae7a6ace9194677" +checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" dependencies = [ "bytemuck_derive", ] [[package]] name = "bytemuck_derive" -version = "1.10.1" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f154e572231cb6ba2bd1176980827e3d5dc04cc183a75dea38109fbdd672d29" +checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" dependencies = [ "proc-macro2", "quote", @@ -1484,9 +1484,9 @@ 
dependencies = [ [[package]] name = "cc" -version = "1.2.39" +version = "1.2.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1354349954c6fc9cb0deab020f27f783cf0b604e8bb754dc4658ecf0d29c35f" +checksum = "e1d05d92f4b1fd76aad469d46cdd858ca761576082cd37df81416691e50199fb" dependencies = [ "find-msvc-tools", "jobserver", @@ -1630,7 +1630,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8c3d2abadaa28e0d277f9f6d07a2052544f045d929cd4d6f7bcfb43567c9767" dependencies = [ "hasher", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "rlp 0.5.2", ] @@ -1878,9 +1878,9 @@ checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" [[package]] name = "const_format" -version = "0.2.34" +version = "0.2.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "126f97965c8ad46d6d9163268ff28432e8f6a1196a55578867832e3049df63dd" +checksum = "7faa7469a93a566e9ccc1c73fe783b4a65c274c5ace346038dca9c39fe0030ad" dependencies = [ "const_format_proc_macros", ] @@ -2044,10 +2044,23 @@ dependencies = [ "crossbeam-channel 0.4.4", "crossbeam-deque 0.7.4", "crossbeam-epoch 0.8.2", - "crossbeam-queue", + "crossbeam-queue 0.2.3", "crossbeam-utils 0.7.2", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel 0.5.15", + "crossbeam-deque 0.8.6", + "crossbeam-epoch 0.9.18", + "crossbeam-queue 0.3.12", + "crossbeam-utils 0.8.21", +] + [[package]] name = "crossbeam-channel" version = "0.4.4" @@ -2123,6 +2136,15 @@ dependencies = [ "maybe-uninit", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils 0.8.21", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -2149,7 +2171,7 @@ dependencies = [ "bitflags 2.9.4", "crossterm_winapi", "mio", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "rustix 0.38.44", "signal-hook", "signal-hook-mio", @@ -2168,7 +2190,7 @@ dependencies = [ "document-features", "futures-core", "mio", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "rustix 1.1.2", "signal-hook", "signal-hook-mio", @@ -2391,7 +2413,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core 0.9.11", + "parking_lot_core 0.9.12", ] [[package]] @@ -3806,6 +3828,7 @@ dependencies = [ "async-trait", "bytes", "concat-kdf", + "crossbeam 0.8.4", "ctr", "ethereum-types 0.15.1", "ethrex-blockchain", @@ -3813,6 +3836,7 @@ dependencies = [ "ethrex-rlp 0.1.0", "ethrex-storage 0.1.0", "ethrex-storage-rollup", + "ethrex-threadpool", "ethrex-trie 0.1.0", "futures", "hex", @@ -3822,6 +3846,7 @@ dependencies = [ "prometheus 0.14.0", "rand 0.8.5", "rayon", + "rocksdb", "secp256k1", "serde", "serde_json", @@ -4039,6 +4064,13 @@ dependencies = [ "tracing", ] +[[package]] +name = "ethrex-threadpool" +version = "0.1.0" +dependencies = [ + "crossbeam 0.8.4", +] + [[package]] name = "ethrex-trie" version = "0.1.0" @@ -4047,9 +4079,11 @@ dependencies = [ "bytes", "cita_trie", "criterion", + "crossbeam 0.8.4", "digest 0.10.7", "ethereum-types 0.15.1", "ethrex-rlp 0.1.0", + "ethrex-threadpool", "hasher", "hex", "hex-literal", @@ -4228,9 +4262,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959" +checksum = "0399f9d26e5191ce32c498bebd31e7a3ceabc2745f0ac54af3f335126c3f24b3" [[package]] name = "find_cuda_helper" @@ -4267,9 +4301,9 @@ checksum = 
"1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flate2" -version = "1.1.2" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" +checksum = "dc5a4e564e38c699f2880d3fda590bedc2e69f3f84cd48b457bd892ce61d0aa9" dependencies = [ "crc32fast", "miniz_oxide", @@ -6066,7 +6100,7 @@ dependencies = [ "indexmap 2.11.4", "libc", "mdbx-sys", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "sealed", "tempfile", "thiserror 2.0.17", @@ -6121,7 +6155,7 @@ dependencies = [ "libsql-sqlite3-parser", "libsql-sys", "libsql_replication", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "serde", "serde_json", "thiserror 1.0.69", @@ -6220,7 +6254,7 @@ dependencies = [ "cbc", "libsql-rusqlite", "libsql-sys", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "prost 0.12.6", "serde", "thiserror 1.0.69", @@ -6328,11 +6362,10 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" dependencies = [ - "autocfg", "scopeguard", ] @@ -6657,6 +6690,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" dependencies = [ "adler2", + "simd-adler32", ] [[package]] @@ -7487,12 +7521,12 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" dependencies = [ "lock_api", - "parking_lot_core 0.9.11", 
+ "parking_lot_core 0.9.12", ] [[package]] @@ -7511,15 +7545,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.11" +version = "0.9.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if 1.0.3", "libc", - "redox_syscall 0.5.17", + "redox_syscall 0.5.18", "smallvec", - "windows-targets 0.52.6", + "windows-link 0.2.0", ] [[package]] @@ -7654,20 +7688,19 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pest" -version = "2.8.2" +version = "2.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21e0a3a33733faeaf8651dfee72dd0f388f0c8e5ad496a3478fa5a922f49cfa8" +checksum = "989e7521a040efde50c3ab6bbadafbe15ab6dc042686926be59ac35d74607df4" dependencies = [ "memchr", - "thiserror 2.0.17", "ucd-trie", ] [[package]] name = "pest_derive" -version = "2.8.2" +version = "2.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc58706f770acb1dbd0973e6530a3cff4746fb721207feb3a8a6064cd0b6c663" +checksum = "187da9a3030dbafabbbfb20cb323b976dc7b7ce91fcd84f2f74d6e31d378e2de" dependencies = [ "pest", "pest_generator", @@ -7675,9 +7708,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.8.2" +version = "2.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d4f36811dfe07f7b8573462465d5cb8965fffc2e71ae377a33aecf14c2c9a2f" +checksum = "49b401d98f5757ebe97a26085998d6c0eecec4995cad6ab7fc30ffdf4b052843" dependencies = [ "pest", "pest_meta", @@ -7688,9 +7721,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.8.2" +version = "2.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"42919b05089acbd0a5dcd5405fb304d17d1053847b81163d09c4ad18ce8e8420" +checksum = "72f27a2cfee9f9039c4d86faa5af122a0ac3851441a34865b8a043b46be0065a" dependencies = [ "pest", "sha2", @@ -8080,7 +8113,7 @@ dependencies = [ "lazy_static", "libc", "memchr", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "procfs", "protobuf 2.28.0", "thiserror 1.0.69", @@ -8096,7 +8129,7 @@ dependencies = [ "fnv", "lazy_static", "memchr", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "protobuf 3.7.2", "thiserror 2.0.17", ] @@ -8253,7 +8286,7 @@ dependencies = [ "cfg-if 1.0.3", "itertools 0.10.5", "once_cell", - "parking_lot 0.12.4", + "parking_lot 0.12.5", ] [[package]] @@ -8505,9 +8538,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.17" +version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ "bitflags 2.9.4", ] @@ -9078,7 +9111,7 @@ dependencies = [ "hex-literal", "metal", "ndarray", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "paste", "rand 0.9.2", "rand_core 0.9.3", @@ -9559,9 +9592,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "rusty-fork" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb3dcc6e454c328bb824492db107ab7c0ae8fcffe4ad210136ef014458c1bc4f" +checksum = "cc6bf79ff24e648f6da1f8d1f011e9cac26491b619e6b9280f2b47f1774e6ee2" dependencies = [ "fnv", "quick-error", @@ -9970,9 +10003,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.14.1" +version = "3.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c522100790450cf78eeac1507263d0a350d4d5b30df0c8e1fe051a10c22b376e" +checksum = 
"6093cd8c01b25262b84927e0f7151692158fab02d961e04c979d3903eba7ecc5" dependencies = [ "base64 0.22.1", "chrono", @@ -9981,8 +10014,7 @@ dependencies = [ "indexmap 2.11.4", "schemars 0.9.0", "schemars 1.0.4", - "serde", - "serde_derive", + "serde_core", "serde_json", "serde_with_macros", "time", @@ -9990,9 +10022,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.14.1" +version = "3.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327ada00f7d64abaac1e55a6911e90cf665aa051b9a561c7006c157f4633135e" +checksum = "a7e6c180db0816026a61afa1cff5344fb7ebded7e4d3062772179f2501481c27" dependencies = [ "darling 0.21.3", "proc-macro2", @@ -10019,7 +10051,7 @@ dependencies = [ "futures", "log", "once_cell", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "scc", "serial_test_derive", ] @@ -10708,11 +10740,12 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92e46fce4cdece99f2f07c125e902629ee25e4487d80cae6e3d6e891d5906b00" +checksum = "aa00b753ef7c942c13ee953f13609746a41c0fb8cf221849bbf3f654811a6669" dependencies = [ "futures", + "pin-project-lite", "spawned-rt", "thiserror 2.0.17", "tracing", @@ -10720,11 +10753,11 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76347472cc448d47dbf9f67541fde19dbb054793e8e0546ce8917bfb695e1b56" +checksum = "42396ff1bc8bfdcad31f099a3af74b4830fb7bdc09a70d843dcfa8bab74ecea4" dependencies = [ - "crossbeam", + "crossbeam 0.7.3", "tokio", "tokio-stream", "tokio-util", @@ -10813,7 +10846,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f" dependencies = [ "new_debug_unreachable", - "parking_lot 
0.12.4", + "parking_lot 0.12.5", "phf_shared", "precomputed-hash", ] @@ -11310,7 +11343,7 @@ dependencies = [ "io-uring", "libc", "mio", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "pin-project-lite", "signal-hook-registry", "slab", @@ -11837,7 +11870,7 @@ dependencies = [ "env_filter", "lazy_static", "log", - "parking_lot 0.12.4", + "parking_lot 0.12.5", "ratatui", "tracing", "tracing-subscriber 0.3.20", @@ -11922,7 +11955,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.3", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 34c17436ab1..9fa8dd76c34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "tooling/archive_sync", "crates/common/config", "tooling/reorgs", + "crates/concurrency", "tooling/migrations", ] exclude = ["crates/vm/levm/bench/revm_comparison", "tooling/ef_tests/state"] @@ -74,6 +75,7 @@ ethrex-prover = { path = "./crates/l2/prover" } ethrex-storage-rollup = { path = "./crates/l2/storage" } ethrex = { path = "./cmd/ethrex" } ethrex-l2-rpc = { path = "./crates/l2/networking/rpc" } +ethrex-threadpool = { path = "./crates/concurrency" } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } @@ -115,6 +117,7 @@ spawned-concurrency = "0.4.0" spawned-rt = "0.4.0" lambdaworks-crypto = "0.11.0" tui-logger = { version = "0.17.3", features = ["tracing-support"] } +crossbeam = "0.8.4" rayon = "1.10.0" rkyv = { version = "0.8.10", features = ["std", "unaligned"] } tempfile = "3.8" diff --git a/cmd/ethrex/Cargo.toml b/cmd/ethrex/Cargo.toml index 407bd2dab5c..bf33fdb179e 100644 --- a/cmd/ethrex/Cargo.toml +++ b/cmd/ethrex/Cargo.toml @@ -72,7 +72,7 @@ c-kzg = [ "ethrex-p2p/c-kzg", ] metrics = ["ethrex-blockchain/metrics", "ethrex-l2/metrics"] -rocksdb = ["ethrex-storage/rocksdb"] 
+rocksdb = ["ethrex-storage/rocksdb", "ethrex-p2p/rocksdb"] jemalloc = ["dep:tikv-jemallocator"] jemalloc_profiling = [ "jemalloc", diff --git a/crates/common/trie/Cargo.toml b/crates/common/trie/Cargo.toml index b50b55e0b03..b8f4e46a787 100644 --- a/crates/common/trie/Cargo.toml +++ b/crates/common/trie/Cargo.toml @@ -7,6 +7,7 @@ documentation.workspace = true [dependencies] ethrex-rlp.workspace = true +ethrex-threadpool.workspace = true ethereum-types.workspace = true anyhow = "1.0.86" @@ -21,6 +22,7 @@ rocksdb = { workspace = true, optional = true } smallvec = { version = "1.10.0", features = ["const_generics", "union"] } digest = "0.10.6" lazy_static.workspace = true +crossbeam.workspace = true [features] default = [] diff --git a/crates/common/trie/db.rs b/crates/common/trie/db.rs index 0222b22dbca..0de923571e8 100644 --- a/crates/common/trie/db.rs +++ b/crates/common/trie/db.rs @@ -1,6 +1,7 @@ use ethereum_types::H256; +use ethrex_rlp::encode::RLPEncode; -use crate::{NodeHash, NodeRLP, Trie, error::TrieError}; +use crate::{Node, NodeHash, NodeRLP, Trie, error::TrieError}; use std::{ collections::BTreeMap, sync::{Arc, Mutex}, @@ -9,6 +10,15 @@ use std::{ pub trait TrieDB: Send + Sync { fn get(&self, key: NodeHash) -> Result>, TrieError>; fn put_batch(&self, key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError>; + // TODO: replace putbatch with this function. 
+ fn put_batch_no_alloc(&self, key_values: &[(NodeHash, Node)]) -> Result<(), TrieError> { + self.put_batch( + key_values + .iter() + .map(|node| (node.0, node.1.encode_to_vec())) + .collect(), + ) + } fn put(&self, key: NodeHash, value: Vec) -> Result<(), TrieError> { self.put_batch(vec![(key, value)]) } diff --git a/crates/common/trie/nibbles.rs b/crates/common/trie/nibbles.rs index cc3aaf61504..4ca0bff05b5 100644 --- a/crates/common/trie/nibbles.rs +++ b/crates/common/trie/nibbles.rs @@ -10,7 +10,7 @@ use ethrex_rlp::{ /// Struct representing a list of nibbles (half-bytes) #[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct Nibbles { - data: Vec, + pub(crate) data: Vec, } impl std::hash::Hash for Nibbles { diff --git a/crates/common/trie/trie.rs b/crates/common/trie/trie.rs index cda570fcad5..6148f802345 100644 --- a/crates/common/trie/trie.rs +++ b/crates/common/trie/trie.rs @@ -8,6 +8,7 @@ mod rlp; #[cfg(test)] mod test_utils; mod trie_iter; +pub mod trie_sorted; mod verify_range; use ethereum_types::H256; use ethrex_rlp::constants::RLP_NULL; diff --git a/crates/common/trie/trie_sorted.rs b/crates/common/trie/trie_sorted.rs new file mode 100644 index 00000000000..0e0859596f7 --- /dev/null +++ b/crates/common/trie/trie_sorted.rs @@ -0,0 +1,476 @@ +use crate::{ + EMPTY_TRIE_HASH, Nibbles, Node, NodeHash, TrieDB, TrieError, + node::{BranchNode, ExtensionNode, LeafNode}, +}; +use crossbeam::channel::{Receiver, Sender, bounded}; +use ethereum_types::H256; +use ethrex_threadpool::ThreadPool; +use std::{sync::Arc, thread::scope}; +use tracing::debug; + +#[derive(Debug, Default, Clone)] +struct StackElement { + path: Nibbles, + element: BranchNode, +} + +// The large size isn't a performance problem because we use a single instance of this +// struct +#[allow(clippy::large_enum_variant)] +#[derive(Debug, Clone)] +enum CenterSideElement { + Branch { node: BranchNode }, + Leaf { value: Vec }, +} + +#[derive(Debug, Clone)] +struct CenterSide { + 
path: Nibbles, + element: CenterSideElement, +} + +#[derive(Debug, thiserror::Error)] +pub enum TrieGenerationError { + #[error("When creating a child node, the nibbles diff was empty. Child Node {0:x?}")] + IndexNotFound(Nibbles), + #[error("When popping from the trie stack it was empty. Current position: {0:x?}")] + TrieStackEmpty(Nibbles), + #[error(transparent)] + FlushToDbError(TrieError), + #[error("When joining the write threads, error")] + ThreadJoinError(), +} + +pub const SIZE_TO_WRITE_DB: u64 = 20_000; +pub const BUFFER_COUNT: u64 = 32; + +impl CenterSide { + fn from_value(tuple: (H256, Vec)) -> CenterSide { + CenterSide { + path: Nibbles::from_raw(&tuple.0.0, true), + element: CenterSideElement::Leaf { value: tuple.1 }, + } + } + fn from_stack_element(element: StackElement) -> CenterSide { + CenterSide { + path: element.path, + element: CenterSideElement::Branch { + node: element.element, + }, + } + } +} + +fn is_child(this: &Nibbles, other: &StackElement) -> bool { + this.count_prefix(&other.path) == other.path.len() +} + +fn create_parent(center_side: &CenterSide, closest_nibbles: &Nibbles) -> StackElement { + let new_parent_nibbles = center_side + .path + .slice(0, center_side.path.count_prefix(closest_nibbles)); + StackElement { + path: new_parent_nibbles, + element: BranchNode { + choices: BranchNode::EMPTY_CHOICES, + value: vec![], + }, + } +} + +fn add_center_to_parent_and_write_queue( + nodes_to_write: &mut Vec<(NodeHash, Node)>, + center_side: &CenterSide, + parent_element: &mut StackElement, +) -> Result<(), TrieGenerationError> { + debug!("{:x?}", center_side.path); + debug!("{:x?}", parent_element.path); + let mut path = center_side.path.clone(); + path.skip_prefix(&parent_element.path); + let index = path + .next() + .ok_or(TrieGenerationError::IndexNotFound(center_side.path.clone()))?; + let node: Node = match ¢er_side.element { + CenterSideElement::Branch { node } => { + if path.is_empty() { + node.clone().into() + } else { + let hash = 
node.compute_hash(); + nodes_to_write.push((hash, node.clone().into())); + ExtensionNode { + prefix: path, + child: hash.into(), + } + .into() + } + } + CenterSideElement::Leaf { value } => LeafNode { + partial: path, + value: value.clone(), + } + .into(), + }; + parent_element.element.choices[index as usize] = node.compute_hash().into(); + debug!( + "branch {:x?}", + parent_element + .element + .choices + .iter() + .enumerate() + .filter_map(|(index, child)| child.is_valid().then_some(index)) + .collect::>() + ); + nodes_to_write.push((node.compute_hash(), node)); + Ok(()) +} + +fn flush_nodes_to_write( + mut nodes_to_write: Vec<(NodeHash, Node)>, + db: &dyn TrieDB, + sender: Sender>, +) -> Result<(), TrieGenerationError> { + db.put_batch_no_alloc(&nodes_to_write) + .map_err(TrieGenerationError::FlushToDbError)?; + nodes_to_write.clear(); + let _ = sender.send(nodes_to_write); + Ok(()) +} + +#[inline(never)] +pub fn trie_from_sorted_accounts<'scope, T>( + db: &'scope dyn TrieDB, + data_iter: &mut T, + scope: Arc>, + buffer_sender: Sender>, + buffer_receiver: Receiver>, +) -> Result +where + T: Iterator)> + Send, +{ + let Some(initial_value) = data_iter.next() else { + return Ok(*EMPTY_TRIE_HASH); + }; + let mut nodes_to_write: Vec<(NodeHash, Node)> = buffer_receiver + .recv() + .expect("This channel shouldn't close"); + let mut trie_stack: Vec = Vec::with_capacity(64); // Optimized for H256 + + let mut left_side = StackElement::default(); + let mut center_side: CenterSide = CenterSide::from_value(initial_value.clone()); + let mut right_side_opt: Option<(H256, Vec)> = data_iter.next(); + + while let Some(right_side) = right_side_opt { + if nodes_to_write.len() as u64 > SIZE_TO_WRITE_DB { + let buffer_sender = buffer_sender.clone(); + scope.execute_priority(Box::new(move || { + let _ = flush_nodes_to_write(nodes_to_write, db, buffer_sender); + })); + nodes_to_write = buffer_receiver + .recv() + .expect("This channel shouldn't close"); + } + + let right_side_path = 
Nibbles::from_bytes(right_side.0.as_bytes()); + while !is_child(&right_side_path, &left_side) { + add_center_to_parent_and_write_queue( + &mut nodes_to_write, + ¢er_side, + &mut left_side, + )?; + let temp = CenterSide::from_stack_element(left_side); + left_side = trie_stack.pop().ok_or(TrieGenerationError::TrieStackEmpty( + center_side.path.clone(), + ))?; + center_side = temp; + } + + if center_side.path.count_prefix(&left_side.path) + >= center_side.path.count_prefix(&right_side_path) + { + add_center_to_parent_and_write_queue( + &mut nodes_to_write, + ¢er_side, + &mut left_side, + )?; + } else { + let mut element = create_parent(¢er_side, &right_side_path); + add_center_to_parent_and_write_queue(&mut nodes_to_write, ¢er_side, &mut element)?; + trie_stack.push(left_side); + left_side = element; + } + center_side = CenterSide::from_value(right_side); + right_side_opt = data_iter.next(); + } + + while !is_child(¢er_side.path, &left_side) { + let temp = CenterSide::from_stack_element(left_side); + left_side = trie_stack.pop().ok_or(TrieGenerationError::TrieStackEmpty( + center_side.path.clone(), + ))?; + add_center_to_parent_and_write_queue(&mut nodes_to_write, &temp, &mut left_side)?; + } + + add_center_to_parent_and_write_queue(&mut nodes_to_write, ¢er_side, &mut left_side)?; + + while let Some(mut parent_node) = trie_stack.pop() { + add_center_to_parent_and_write_queue( + &mut nodes_to_write, + &CenterSide::from_stack_element(left_side), + &mut parent_node, + )?; + left_side = parent_node; + } + + let hash = if left_side + .element + .choices + .iter() + .filter(|choice| choice.is_valid()) + .count() + == 1 + { + let (index, child) = left_side + .element + .choices + .into_iter() + .enumerate() + .find(|(_, child)| child.is_valid()) + .unwrap(); + + debug_assert!(nodes_to_write.last().unwrap().0 == child.compute_hash()); + let (node_hash, node_hash_ref) = nodes_to_write.iter_mut().last().unwrap(); + match node_hash_ref { + Node::Branch(_) => { + let node: Node = 
ExtensionNode { + prefix: Nibbles::from_hex(vec![index as u8]), + child, + } + .into(); + nodes_to_write.push((node.compute_hash(), node)); + nodes_to_write + .last() + .expect("we just inserted") + .0 + .finalize() + } + Node::Extension(extension_node) => { + extension_node.prefix.data.insert(0, index as u8); + *node_hash = extension_node.compute_hash(); + node_hash.finalize() + } + Node::Leaf(leaf_node) => { + leaf_node.partial.data.insert(0, index as u8); + *node_hash = leaf_node.compute_hash(); + node_hash.finalize() + } + } + } else { + let node: Node = left_side.element.into(); + nodes_to_write.push((node.compute_hash(), node)); + nodes_to_write + .last() + .expect("we just inserted") + .0 + .finalize() + }; + + let _ = flush_nodes_to_write(nodes_to_write, db, buffer_sender); + Ok(hash) +} + +pub fn trie_from_sorted_accounts_wrap( + db: &dyn TrieDB, + accounts_iter: &mut T, +) -> Result +where + T: Iterator)> + Send, +{ + let (buffer_sender, buffer_receiver) = bounded::>(BUFFER_COUNT as usize); + for _ in 0..BUFFER_COUNT { + let _ = buffer_sender.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize)); + } + scope(|s| { + let pool = ThreadPool::new(12, s); + trie_from_sorted_accounts( + db, + accounts_iter, + Arc::new(pool), + buffer_sender, + buffer_receiver, + ) + }) +} + +#[cfg(test)] +mod test { + use ethereum_types::U256; + use ethrex_rlp::encode::RLPEncode; + + use crate::{InMemoryTrieDB, Trie}; + + use super::*; + use std::{collections::BTreeMap, str::FromStr, sync::Mutex}; + + fn generate_input_1() -> BTreeMap> { + let mut accounts: BTreeMap> = BTreeMap::new(); + for string in [ + "68521f7430502aef983fd7568ea179ed0f8d12d5b68883c90573781ae0778ec2", + "68db10f720d5972738df0d841d64c7117439a1a2ca9ba247e7239b19eb187414", + "6b7c1458952b903dbe3717bc7579f18e5cb1136be1b11b113cdac0f0791c07d3", + ] { + accounts.insert(H256::from_str(string).unwrap(), vec![0, 1]); + } + accounts + } + + fn generate_input_2() -> BTreeMap> { + let mut accounts: BTreeMap> = 
BTreeMap::new(); + for string in [ + "0532f23d3bd5277790ece5a6cb6fc684bc473a91ffe3a0334049527c4f6987e9", + "14d5df819167b77851220ee266178aee165daada67ca865e9d50faed6b4fdbe3", + "6908aa86b715fcf221f208a28bb84bf6359ba9c41da04b7e17a925cdb22bf704", + "90bbe47533cd80b5d9cef6c283415edd90296bf4ac4ede6d2a6b42bb3d5e7d0e", + "90c2fdad333366cf0f18f0dded9b478590c0563e4c847c79aee0b733b5a9104f", + "af9e3efce873619102dfdb0504abd44179191bccfb624608961e71492a1ba5b7", + "b723d5841dc4d6d3fe7de03ad74dd83798c3b68f752bba29c906ec7f5a469452", + "c2c6fd64de59489f0c27e75443c24327cef6415f1d3ee1659646abefab212113", + "ca0d791e7a3e0f25d775034acecbaaf9219939288e6282d8291e181b9c3c24b0", + "f0dcaaa40dfc67925d6e172e48b8f83954ba46cfb1bb522c809f3b93b49205ee", + ] { + accounts.insert(H256::from_str(string).unwrap(), vec![0, 1]); + } + accounts + } + + fn generate_input_3() -> BTreeMap> { + let mut accounts: BTreeMap> = BTreeMap::new(); + for string in [ + "0532f23d3bd5277790ece5a6cb6fc684bc473a91ffe3a0334049527c4f6987e9", + "0542f23d3bd5277790ece5a6cb6fc684bc473a91ffe3a0334049527c4f6987e9", + "0552f23d3bd5277790ece5a6cb6fc684bc473a91ffe3a0334049527c4f6987e9", + ] { + accounts.insert(H256::from_str(string).unwrap(), vec![0, 1]); + } + accounts + } + + fn generate_input_4() -> BTreeMap> { + let mut accounts: BTreeMap> = BTreeMap::new(); + let string = "0532f23d3bd5277790ece5a6cb6fc684bc473a91ffe3a0334049527c4f6987e9"; + accounts.insert(H256::from_str(string).unwrap(), vec![0, 1]); + accounts + } + + fn generate_input_5() -> BTreeMap> { + let mut accounts: BTreeMap> = BTreeMap::new(); + for (string, value) in [ + ( + "290decd9548b62a8d60345a988386fc84ba6bc95484008f6362f93160ef3e563", + U256::from_str("1191240792495687806002885977912460542139236513636").unwrap(), + ), + ( + "295841a49a1089f4b560f91cfbb0133326654dcbb1041861fc5dde96c724a22f", + U256::from(480), + ), + ] { + accounts.insert(H256::from_str(string).unwrap(), value.encode_to_vec()); + } + accounts + } + + fn generate_input_slots_1() -> 
BTreeMap { + let mut slots: BTreeMap = BTreeMap::new(); + for string in [ + "0532f23d3bd5277790ece5a6cb6fc684bc473a91ffe3a0334049527c4f6987e8", + "0532f23d3bd5277790ece5a6cb6fc684bc473a91ffe3a0334049527c4f6987e9", + "0552f23d3bd5277790ece5a6cb6fc684bc473a91ffe3a0334049527c4f6987e9", + ] { + slots.insert(H256::from_str(string).unwrap(), U256::zero()); + } + slots + } + + pub fn run_test_account_state(accounts: BTreeMap>) { + let computed_data = Arc::new(Mutex::new(BTreeMap::new())); + let trie = Trie::new(Box::new(InMemoryTrieDB::new(computed_data.clone()))); + let db = trie.db(); + let tested_trie_hash: H256 = trie_from_sorted_accounts_wrap( + db, + &mut accounts + .clone() + .into_iter() + .map(|(hash, state)| (hash, state.encode_to_vec())), + ) + .expect("Shouldn't have errors"); + + let expected_data = Arc::new(Mutex::new(BTreeMap::new())); + let mut trie = Trie::new(Box::new(InMemoryTrieDB::new(expected_data.clone()))); + for account in accounts.iter() { + trie.insert(account.0.as_bytes().to_vec(), account.1.encode_to_vec()) + .unwrap(); + } + + assert_eq!(tested_trie_hash, trie.hash().unwrap()); + + let computed_data = computed_data.lock().unwrap(); + let expected_data = expected_data.lock().unwrap(); + for (k, v) in computed_data.iter() { + assert!(expected_data.contains_key(k)); + assert_eq!(*v, expected_data[k]); + } + } + + pub fn run_test_storage_slots(slots: BTreeMap) { + let trie = Trie::stateless(); + let db = trie.db(); + let tested_trie_hash: H256 = trie_from_sorted_accounts_wrap( + db, + &mut slots + .clone() + .into_iter() + .map(|(hash, state)| (hash, state.encode_to_vec())), + ) + .expect("Shouldn't have errors"); + + let mut trie: Trie = Trie::empty_in_memory(); + for account in slots.iter() { + trie.insert(account.0.as_bytes().to_vec(), account.1.encode_to_vec()) + .unwrap(); + } + + let trie_hash = trie.hash_no_commit(); + + assert!(tested_trie_hash == trie_hash) + } + + #[test] + fn test_1() { + run_test_account_state(generate_input_1()); + } 
+ + #[test] + fn test_2() { + run_test_account_state(generate_input_2()); + } + + #[test] + fn test_3() { + run_test_account_state(generate_input_3()); + } + + #[test] + fn test_4() { + run_test_account_state(generate_input_4()); + } + + #[test] + fn test_5() { + run_test_account_state(generate_input_5()); + } + + #[test] + fn test_slots_1() { + run_test_storage_slots(generate_input_slots_1()); + } +} diff --git a/crates/concurrency/Cargo.lock b/crates/concurrency/Cargo.lock new file mode 100644 index 00000000000..8658b491916 --- /dev/null +++ b/crates/concurrency/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "pool_test" +version = "0.1.0" diff --git a/crates/concurrency/Cargo.toml b/crates/concurrency/Cargo.toml new file mode 100644 index 00000000000..0ab63655ef9 --- /dev/null +++ b/crates/concurrency/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "ethrex-threadpool" +version = "0.1.0" +edition = "2024" + +[lib] +path = "concurrency.rs" + +[dependencies] +crossbeam.workspace = true diff --git a/crates/concurrency/concurrency.rs b/crates/concurrency/concurrency.rs new file mode 100644 index 00000000000..fa263cc91b5 --- /dev/null +++ b/crates/concurrency/concurrency.rs @@ -0,0 +1,51 @@ +use crossbeam::channel::{Sender, select_biased, unbounded}; +use std::marker::Send; +use std::thread::{Builder, Scope}; + +pub struct ThreadPool<'scope> { + priority_sender: Sender>, // Implictly our threads in the thread pool have the receiver + nice_sender: Sender>, // Implictly our threads in the thread pool have the receiver +} + +impl<'scope> ThreadPool<'scope> { + pub fn new(thread_count: usize, scope: &'scope Scope<'scope, '_>) -> Self { + let (priority_sender, priority_receiver) = unbounded::>(); + let (nice_sender, nice_receiver) = unbounded::>(); + + for i in 0..thread_count { + let priority_receiver = priority_receiver.clone(); + let nice_receiver = 
nice_receiver.clone(); + let _ = Builder::new() + .name(format!("ThreadPool {i}")) + .spawn_scoped(scope, move || { + // Thread work goes here + while let Ok(task) = select_biased! { + recv(priority_receiver) -> msg => msg, + recv(nice_receiver) -> msg => msg, + } { + task(); + } + // If one of the senders closes because the threadpool is dropped, the other + // channel may still exist and have data + while let Ok(task) = priority_receiver.recv() { + task(); + } + while let Ok(task) = nice_receiver.recv() { + task(); + } + }); + } + ThreadPool { + priority_sender, + nice_sender, + } + } + + pub fn execute(&self, task: Box) { + self.nice_sender.send(task).unwrap(); + } + + pub fn execute_priority(&self, task: Box) { + self.priority_sender.send(task).unwrap(); + } +} diff --git a/crates/l2/prover/src/guest_program/src/risc0/Cargo.lock b/crates/l2/prover/src/guest_program/src/risc0/Cargo.lock index be08d505d0c..ef01818a245 100644 --- a/crates/l2/prover/src/guest_program/src/risc0/Cargo.lock +++ b/crates/l2/prover/src/guest_program/src/risc0/Cargo.lock @@ -811,6 +811,28 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -830,6 +852,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum =
"0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1298,15 +1329,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "ethrex-threadpool" +version = "0.1.0" +dependencies = [ + "crossbeam", +] + [[package]] name = "ethrex-trie" version = "0.1.0" dependencies = [ "anyhow", "bytes", + "crossbeam", "digest", "ethereum-types", "ethrex-rlp", + "ethrex-threadpool", "hex", "lazy_static", "serde", diff --git a/crates/l2/prover/src/guest_program/src/sp1/Cargo.lock b/crates/l2/prover/src/guest_program/src/sp1/Cargo.lock index 9533705a041..72e1c4e41ac 100644 --- a/crates/l2/prover/src/guest_program/src/sp1/Cargo.lock +++ b/crates/l2/prover/src/guest_program/src/sp1/Cargo.lock @@ -621,6 +621,28 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -640,6 +662,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] 
name = "crossbeam-utils" version = "0.8.21" @@ -1061,15 +1092,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "ethrex-threadpool" +version = "0.1.0" +dependencies = [ + "crossbeam", +] + [[package]] name = "ethrex-trie" version = "0.1.0" dependencies = [ "anyhow", "bytes", + "crossbeam", "digest", "ethereum-types", "ethrex-rlp", + "ethrex-threadpool", "hex", "lazy_static", "serde", diff --git a/crates/l2/tee/quote-gen/Cargo.lock b/crates/l2/tee/quote-gen/Cargo.lock index 18f512f792d..6d5e9f4a051 100644 --- a/crates/l2/tee/quote-gen/Cargo.lock +++ b/crates/l2/tee/quote-gen/Cargo.lock @@ -1107,13 +1107,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" dependencies = [ "cfg-if 0.1.10", - "crossbeam-channel", + "crossbeam-channel 0.4.4", "crossbeam-deque 0.7.4", "crossbeam-epoch 0.8.2", - "crossbeam-queue", + "crossbeam-queue 0.2.3", "crossbeam-utils 0.7.2", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel 0.5.15", + "crossbeam-deque 0.8.6", + "crossbeam-epoch 0.9.18", + "crossbeam-queue 0.3.12", + "crossbeam-utils 0.8.21", +] + [[package]] name = "crossbeam-channel" version = "0.4.4" @@ -1124,6 +1137,15 @@ dependencies = [ "maybe-uninit", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils 0.8.21", +] + [[package]] name = "crossbeam-deque" version = "0.7.4" @@ -1180,6 +1202,15 @@ dependencies = [ "maybe-uninit", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils 0.8.21", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -2269,6 +2300,7 @@ dependencies = [ "async-trait", "bytes", "concat-kdf", + "crossbeam 0.8.4", "ctr", "ethereum-types 0.15.1", "ethrex-blockchain", @@ -2276,6 +2308,7 @@ dependencies = [ "ethrex-rlp", "ethrex-storage", "ethrex-storage-rollup", + "ethrex-threadpool", "ethrex-trie", "futures", "hex", @@ -2418,15 +2451,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "ethrex-threadpool" +version = "0.1.0" +dependencies = [ + "crossbeam 0.8.4", +] + [[package]] name = "ethrex-trie" version = "0.1.0" dependencies = [ "anyhow", "bytes", + "crossbeam 0.8.4", "digest", "ethereum-types 0.15.1", "ethrex-rlp", + "ethrex-threadpool", "hex", "lazy_static", "serde", @@ -5814,7 +5856,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76347472cc448d47dbf9f67541fde19dbb054793e8e0546ce8917bfb695e1b56" dependencies = [ - "crossbeam", + "crossbeam 0.7.3", "tokio", "tokio-stream", "tokio-util", diff --git a/crates/networking/p2p/Cargo.toml b/crates/networking/p2p/Cargo.toml index 43ad99f4d9d..9ffda225263 100644 --- a/crates/networking/p2p/Cargo.toml +++ b/crates/networking/p2p/Cargo.toml @@ -14,6 +14,7 @@ ethrex-rlp.workspace = true ethrex-storage.workspace = true ethrex-trie.workspace = true ethrex-storage-rollup.workspace = true +ethrex-threadpool.workspace = true ethereum-types.workspace = true async-trait.workspace = true @@ -31,6 +32,7 @@ spawned-rt.workspace = true spawned-concurrency.workspace = true sha2.workspace = true futures.workspace = true +rocksdb = { workspace = true, optional = true } prometheus = "0.14.0" tokio-stream = "0.1.17" @@ -46,6 +48,7 @@ ctr = "0.9.2" rand = "0.8.5" rayon = "1.10.0" +crossbeam.workspace = true [dev-dependencies] 
hex-literal = "0.4.1" @@ -57,6 +60,7 @@ path = "./p2p.rs" default = ["c-kzg"] c-kzg = ["ethrex-blockchain/c-kzg", "ethrex-common/c-kzg"] sync-test = [] +rocksdb = ["dep:rocksdb"] [lints.clippy] unwrap_used = "deny" diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 2aa0bfda830..1ef70c87ae9 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -252,6 +252,7 @@ pub async fn periodically_show_peer_stats_during_syncing( // Storage leaves metrics let storage_leaves_downloaded = METRICS.downloaded_storage_slots.load(Ordering::Relaxed); + let storage_accounts_inserted = METRICS.storage_tries_state_roots_computed.get(); let storage_accounts = METRICS.storage_accounts_initial.load(Ordering::Relaxed); let storage_accounts_healed = METRICS.storage_accounts_healed.load(Ordering::Relaxed); let storage_leaves_time = format_duration({ @@ -355,8 +356,8 @@ Current Header Hash: {current_header_hash:x} headers progress: {headers_download_progress} (total: {headers_to_download}, downloaded: {headers_downloaded}, remaining: {headers_remaining}) account leaves download: {account_leaves_downloaded}, elapsed: {account_leaves_time} account leaves insertion: {account_leaves_inserted_percentage:.2}%, elapsed: {account_leaves_inserted_time} -storage leaves download: {storage_leaves_downloaded}, elapsed: {storage_leaves_time}, initially accounts with storage {storage_accounts}, healed accounts {storage_accounts_healed} -storage leaves insertion: {storage_leaves_inserted_time} +storage leaves download: {storage_leaves_downloaded}, elapsed: {storage_leaves_time}, initially accounts with storage {storage_accounts}, healed accounts {storage_accounts_healed} +storage leaves insertion: {storage_accounts_inserted}, {storage_leaves_inserted_time} healing: global accounts healed {healed_accounts} global storage slots healed {healed_storages}, elapsed: {heal_time}, current throttle {heal_current_throttle} bytecodes progress: 
downloaded: {bytecodes_downloaded}, elapsed: {bytecodes_download_time})" ); diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 887381ef817..93282297053 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashSet, VecDeque}, + collections::{BTreeMap, HashMap, HashSet, VecDeque}, io::ErrorKind, path::{Path, PathBuf}, sync::atomic::Ordering, @@ -12,6 +12,7 @@ use ethrex_common::{ types::{AccountState, BlockBody, BlockHeader, Receipt, validate_block_body}, }; use ethrex_rlp::encode::RLPEncode; +use ethrex_storage::Store; use ethrex_trie::Nibbles; use ethrex_trie::{Node, verify_range}; @@ -37,8 +38,8 @@ use crate::{ snap::encodable_to_proof, sync::{AccountStorageRoots, BlockSyncState, block_is_stale, update_pivot}, utils::{ - SendMessageError, dump_to_file, get_account_state_snapshot_file, - get_account_storages_snapshot_file, + AccountsWithStorage, SendMessageError, dump_accounts_to_file, dump_storages_to_file, + get_account_state_snapshot_file, get_account_storages_snapshot_file, }, }; use tracing::{debug, error, info, trace, warn}; @@ -774,10 +775,6 @@ impl PeerHandler { let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<(Vec, H256, Option<(H256, H256)>)>(1000); - // channel to send the result of dumping accounts - let (dump_account_result_sender, mut dump_account_result_receiver) = - tokio::sync::mpsc::channel::>(1000); - info!("Starting to download account ranges from peers"); *METRICS.account_tries_download_start_time.lock().await = Some(SystemTime::now()); @@ -785,6 +782,7 @@ impl PeerHandler { let mut completed_tasks = 0; let mut chunk_file = 0; let mut last_update: SystemTime = SystemTime::now(); + let mut write_set = tokio::task::JoinSet::new(); loop { if all_accounts_state.len() * size_of::() >= RANGE_FILE_CHUNK_SIZE { @@ -794,8 +792,7 @@ impl PeerHandler { let account_state_chunk = current_account_hashes 
.into_iter() .zip(current_account_states) - .collect::>() - .encode_to_vec(); + .collect::>(); if !std::fs::exists(account_state_snapshots_dir) .map_err(|_| PeerHandlerError::NoStateSnapshotsDir)? @@ -805,22 +802,13 @@ impl PeerHandler { } let account_state_snapshots_dir_cloned = account_state_snapshots_dir.to_path_buf(); - let dump_account_result_sender_cloned = dump_account_result_sender.clone(); - tokio::task::spawn(async move { + write_set.spawn(async move { let path = get_account_state_snapshot_file( &account_state_snapshots_dir_cloned, chunk_file, ); // TODO: check the error type and handle it properly - let result = dump_to_file(&path, account_state_chunk); - dump_account_result_sender_cloned - .send(result) - .await - .inspect_err(|err| { - error!( - "Failed to send account dump result through channel. Error: {err}" - ) - }) + dump_accounts_to_file(&path, account_state_chunk) }); chunk_file += 1; @@ -871,30 +859,6 @@ impl PeerHandler { ); } - // Check if any dump account task finished - // TODO: consider tracking in-flight (dump) tasks - if let Ok(Err(dump_account_data)) = dump_account_result_receiver.try_recv() { - if dump_account_data.error == ErrorKind::StorageFull { - return Err(PeerHandlerError::StorageFull); - } - // If the dumping failed, retry it - let dump_account_result_sender_cloned = dump_account_result_sender.clone(); - tokio::task::spawn(async move { - let DumpError { path, contents, .. } = dump_account_data; - // Dump the account data - let result = dump_to_file(&path, contents); - // Send the result through the channel - dump_account_result_sender_cloned - .send(result) - .await - .inspect_err(|err| { - error!( - "Failed to send account dump result through channel. 
Error: {err}" - ) - }) - }); - } - let Some((peer_id, peer_channel)) = self .peer_table .use_best_peer(&SUPPORTED_ETH_CAPABILITIES) @@ -939,6 +903,13 @@ impl PeerHandler { )); } + write_set + .join_all() + .await + .into_iter() + .collect::, DumpError>>() + .map_err(PeerHandlerError::DumpError)?; + // TODO: This is repeated code, consider refactoring { let current_account_hashes = std::mem::take(&mut all_account_hashes); @@ -947,8 +918,7 @@ impl PeerHandler { let account_state_chunk = current_account_hashes .into_iter() .zip(current_account_states) - .collect::>() - .encode_to_vec(); + .collect::>(); if !std::fs::exists(account_state_snapshots_dir) .map_err(|_| PeerHandlerError::NoStateSnapshotsDir)? @@ -958,7 +928,13 @@ impl PeerHandler { } let path = get_account_state_snapshot_file(account_state_snapshots_dir, chunk_file); - std::fs::write(path, account_state_chunk) + dump_accounts_to_file(&path, account_state_chunk) + .inspect_err(|err| { + error!( + "We had an error dumping the last accounts to disk {}", + err.error + ) + }) .map_err(|_| PeerHandlerError::WriteStateSnapshotsDir(chunk_file))?; } @@ -1296,23 +1272,49 @@ impl PeerHandler { account_storages_snapshots_dir: &Path, mut chunk_index: u64, pivot_header: &mut BlockHeader, + store: Store, ) -> Result { METRICS .current_step .set(CurrentStepValue::RequestingStorageRanges); debug!("Starting request_storage_ranges function"); // 1) split the range in chunks of same length + let mut accounts_by_root_hash: BTreeMap<_, Vec<_>> = BTreeMap::new(); + for (account, (maybe_root_hash, _)) in &account_storage_roots.accounts_with_storage_root { + match maybe_root_hash { + Some(root) => { + accounts_by_root_hash + .entry(*root) + .or_default() + .push(*account); + } + None => { + let root = store + .get_account_state_by_acc_hash(pivot_header.hash(), *account) + .expect("Failed to get account in state trie") + .expect("Could not find account that should have been downloaded or healed") + .storage_root; + 
accounts_by_root_hash + .entry(root) + .or_default() + .push(*account); + } + } + } + let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash); + // TODO: Turn this into a stable sort for binary search. + accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len()); let chunk_size = 300; - let chunk_count = (account_storage_roots.accounts_with_storage_root.len() / chunk_size) + 1; + let chunk_count = (accounts_by_root_hash.len() / chunk_size) + 1; // list of tasks to be executed // Types are (start_index, end_index, starting_hash) // NOTE: end_index is NOT inclusive + let mut tasks_queue_not_started = VecDeque::::new(); for i in 0..chunk_count { let chunk_start = chunk_size * i; - let chunk_end = (chunk_start + chunk_size) - .min(account_storage_roots.accounts_with_storage_root.len()); + let chunk_end = (chunk_start + chunk_size).min(accounts_by_root_hash.len()); tasks_queue_not_started.push_back(StorageTask { start_index: chunk_start, end_index: chunk_end, @@ -1321,10 +1323,6 @@ impl PeerHandler { }); } - // 2) request the chunks from peers - let mut all_account_storages = - vec![vec![]; account_storage_roots.accounts_with_storage_root.len()]; - // channel to send the tasks to the peers let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::(1000); @@ -1337,28 +1335,21 @@ impl PeerHandler { let mut completed_tasks = 0; // TODO: in a refactor, delete this replace with a structure that can handle removes - let mut accounts_done: Vec = Vec::new(); - let current_account_hashes = account_storage_roots - .accounts_with_storage_root - .iter() - .map(|a| *a.0) - .collect::>(); + let mut accounts_done: HashMap> = HashMap::new(); + // Maps storage root to vector of hashed addresses matching that root and + // vector of hashed storage keys and storage values. 
+ let mut current_account_storages: BTreeMap = BTreeMap::new(); debug!("Starting request_storage_ranges loop"); loop { - if all_account_storages.iter().map(Vec::len).sum::() * 64 > RANGE_FILE_CHUNK_SIZE + if current_account_storages + .values() + .map(|accounts| 32 * accounts.accounts.len() + 64 * accounts.storages.len()) + .sum::() + > RANGE_FILE_CHUNK_SIZE { - let current_account_storages = std::mem::take(&mut all_account_storages); - all_account_storages = - vec![vec![]; account_storage_roots.accounts_with_storage_root.len()]; - - let snapshot = current_account_hashes - .clone() - .into_iter() - .zip(current_account_storages) - .filter(|(_, storages)| !storages.is_empty()) - .collect::>() - .encode_to_vec(); + let current_account_storages = std::mem::take(&mut current_account_storages); + let snapshot = current_account_storages.into_values().collect::>(); if !std::fs::exists(account_storages_snapshots_dir) .map_err(|_| PeerHandlerError::NoStorageSnapshotsDir)? @@ -1385,7 +1376,7 @@ impl PeerHandler { &account_storages_snapshots_dir_cloned, chunk_index, ); - dump_to_file(&path, snapshot) + dump_storages_to_file(&path, snapshot) }); chunk_index += 1; @@ -1404,8 +1395,12 @@ impl PeerHandler { self.peer_table.free_peer(&peer_id).await?; - for account in ¤t_account_hashes[start_index..remaining_start] { - accounts_done.push(*account); + for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() { + for account in accounts { + if !accounts_done.contains_key(account) { + accounts_done.insert(*account, vec![]); + } + } } if remaining_start < remaining_end { @@ -1431,10 +1426,58 @@ impl PeerHandler { }; tasks_queue_not_started.push_back(task); task_count += 1; - accounts_done.push(current_account_hashes[remaining_start]); + + let acc_hash = accounts_by_root_hash[remaining_start].1[0]; + let (_, old_intervals) = account_storage_roots + .accounts_with_storage_root + .get_mut(&acc_hash).ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old 
download intervals for an account but did not find them".to_owned()))?; + for (old_start, end) in old_intervals { + if end == &hash_end { + *old_start = hash_start; + } + } account_storage_roots .healed_accounts - .insert(current_account_hashes[start_index]); + .extend(accounts_by_root_hash[start_index].1.iter().copied()); + } else { + let mut acc_hash: H256 = H256::zero(); + // This search could potentially be expensive, but it's something that should happen very + // infrequently (only when we encounter an account we think it's big but it's not). In + // normal cases the vec we are iterating over just has one element (the big account). + for account in accounts_by_root_hash[remaining_start].1.iter() { + if let Some((_, old_intervals)) = account_storage_roots + .accounts_with_storage_root + .get(account) + { + if !old_intervals.is_empty() { + acc_hash = *account; + } + } else { + continue; + } + } + if acc_hash.is_zero() { + panic!("Should have found the account hash"); + } + let (_, old_intervals) = account_storage_roots + .accounts_with_storage_root + .get_mut(&acc_hash) + .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; + old_intervals.remove( + old_intervals + .iter() + .position(|(_old_start, end)| end == &hash_end) + .ok_or(PeerHandlerError::UnrecoverableError( + "Could not find an old interval that we were tracking" + .to_owned(), + ))?, + ); + if old_intervals.is_empty() { + for account in accounts_by_root_hash[remaining_start].1.iter() { + accounts_done.insert(*account, vec![]); + account_storage_roots.healed_accounts.insert(*account); + } + } } } else { if remaining_start + 1 < remaining_end { @@ -1465,27 +1508,96 @@ impl PeerHandler { let chunk_count = (missing_storage_range / chunk_size).as_usize().max(1); - for i in 0..chunk_count { - let start_hash_u256 = start_hash_u256 + chunk_size * i; - let start_hash = H256::from_uint(&start_hash_u256); - let end_hash = if i 
== chunk_count - 1 { - H256::repeat_byte(0xff) + let maybe_old_intervals = account_storage_roots + .accounts_with_storage_root + .get(&accounts_by_root_hash[remaining_start].1[0]); + + if let Some((_, old_intervals)) = maybe_old_intervals { + if !old_intervals.is_empty() { + for (start_hash, end_hash) in old_intervals { + let task = StorageTask { + start_index: remaining_start, + end_index: remaining_start + 1, + start_hash: *start_hash, + end_hash: Some(*end_hash), + }; + + tasks_queue_not_started.push_back(task); + task_count += 1; + } } else { - let end_hash_u256 = - start_hash_u256.checked_add(chunk_size).unwrap_or(U256::MAX); - H256::from_uint(&end_hash_u256) - }; - - let task = StorageTask { - start_index: remaining_start, - end_index: remaining_start + 1, - start_hash, - end_hash: Some(end_hash), - }; - tasks_queue_not_started.push_back(task); - task_count += 1; + // TODO: DRY + account_storage_roots.accounts_with_storage_root.insert( + accounts_by_root_hash[remaining_start].1[0], + (None, vec![]), + ); + let (_, intervals) = account_storage_roots + .accounts_with_storage_root + .get_mut(&accounts_by_root_hash[remaining_start].1[0]) + .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; + + for i in 0..chunk_count { + let start_hash_u256 = start_hash_u256 + chunk_size * i; + let start_hash = H256::from_uint(&start_hash_u256); + let end_hash = if i == chunk_count - 1 { + H256::repeat_byte(0xff) + } else { + let end_hash_u256 = start_hash_u256 + .checked_add(chunk_size) + .unwrap_or(U256::MAX); + H256::from_uint(&end_hash_u256) + }; + + let task = StorageTask { + start_index: remaining_start, + end_index: remaining_start + 1, + start_hash, + end_hash: Some(end_hash), + }; + + intervals.push((start_hash, end_hash)); + + tasks_queue_not_started.push_back(task); + task_count += 1; + } + debug!("Split big storage account into {chunk_count} chunks."); + } + } else { + 
account_storage_roots.accounts_with_storage_root.insert( + accounts_by_root_hash[remaining_start].1[0], + (None, vec![]), + ); + let (_, intervals) = account_storage_roots + .accounts_with_storage_root + .get_mut(&accounts_by_root_hash[remaining_start].1[0]) + .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; + + for i in 0..chunk_count { + let start_hash_u256 = start_hash_u256 + chunk_size * i; + let start_hash = H256::from_uint(&start_hash_u256); + let end_hash = if i == chunk_count - 1 { + H256::repeat_byte(0xff) + } else { + let end_hash_u256 = start_hash_u256 + .checked_add(chunk_size) + .unwrap_or(U256::MAX); + H256::from_uint(&end_hash_u256) + }; + + let task = StorageTask { + start_index: remaining_start, + end_index: remaining_start + 1, + start_hash, + end_hash: Some(end_hash), + }; + + intervals.push((start_hash, end_hash)); + + tasks_queue_not_started.push_back(task); + task_count += 1; + } + debug!("Split big storage account into {chunk_count} chunks."); } - debug!("Split big storage account into {chunk_count} chunks."); } } @@ -1517,12 +1629,29 @@ impl PeerHandler { "Total tasks: {task_count}, completed tasks: {completed_tasks}, queued tasks: {}", tasks_queue_not_started.len() ); + // THEN: update insert to read with the correct structure and reuse + // tries, only changing the prefix for insertion.
if account_storages.len() == 1 { + let (root_hash, accounts) = &accounts_by_root_hash[start_index]; // We downloaded a big storage account - all_account_storages[start_index].extend(account_storages.remove(0)); + current_account_storages + .entry(*root_hash) + .or_insert_with(|| AccountsWithStorage { + accounts: accounts.clone(), + storages: Vec::new(), + }) + .storages + .extend(account_storages.remove(0)); } else { - for (i, storage) in account_storages.into_iter().enumerate() { - all_account_storages[start_index + i] = storage; + for (i, storages) in account_storages.into_iter().enumerate() { + let (root_hash, accounts) = &accounts_by_root_hash[start_index + i]; + current_account_storages.insert( + *root_hash, + AccountsWithStorage { + accounts: accounts.clone(), + storages, + }, + ); } } } @@ -1550,13 +1679,11 @@ impl PeerHandler { let tx = task_sender.clone(); + // FIXME: this unzip is probably pointless and takes up unnecessary memory. let (chunk_account_hashes, chunk_storage_roots): (Vec<_>, Vec<_>) = - account_storage_roots - .accounts_with_storage_root + accounts_by_root_hash[task.start_index..task.end_index] .iter() - .skip(task.start_index) - .take(task.end_index - task.start_index) - .map(|(hash, root)| (*hash, *root)) + .map(|(root, storages)| (storages[0], *root)) .unzip(); if task_count - completed_tasks < 30 { @@ -1579,19 +1706,7 @@ impl PeerHandler { } { - let current_account_hashes = account_storage_roots - .accounts_with_storage_root - .iter() - .map(|a| *a.0) - .collect::>(); - let current_account_storages = std::mem::take(&mut all_account_storages); - - let snapshot = current_account_hashes - .into_iter() - .zip(current_account_storages) - .filter(|(_, storages)| !storages.is_empty()) - .collect::>() - .encode_to_vec(); + let snapshot = current_account_storages.into_values().collect::>(); if !std::fs::exists(account_storages_snapshots_dir) .map_err(|_| PeerHandlerError::NoStorageSnapshotsDir)? 
@@ -1601,7 +1716,7 @@ impl PeerHandler { } let path = get_account_storages_snapshot_file(account_storages_snapshots_dir, chunk_index); - std::fs::write(path, snapshot) + dump_storages_to_file(&path, snapshot) .map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(chunk_index))?; } disk_joinset @@ -1615,10 +1730,12 @@ impl PeerHandler { .collect::, DumpError>>() .map_err(PeerHandlerError::DumpError)?; - for account_done in accounts_done { - account_storage_roots - .accounts_with_storage_root - .remove(&account_done); + for (account_done, intervals) in accounts_done { + if intervals.is_empty() { + account_storage_roots + .accounts_with_storage_root + .remove(&account_done); + } } // Dropping the task sender so that the recv returns None @@ -2005,6 +2122,8 @@ pub enum PeerHandlerError { NoResponseFromPeer, #[error("Dumping snapshots to disk failed {0:?}")] DumpError(DumpError), + #[error("Encountered an unexpected error. This is a bug {0}")] + UnrecoverableError(String), #[error("Error in Peer Table: {0}")] PeerTableError(#[from] PeerTableError), } diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 98509958e82..07ba3232b33 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -24,17 +24,22 @@ use ethrex_common::{ }; use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode, error::RLPDecodeError}; use ethrex_storage::{EngineType, STATE_TRIE_SEGMENTS, Store, error::StoreError}; -use ethrex_trie::{NodeHash, Trie, TrieError}; -use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; -use std::collections::{BTreeMap, HashSet}; -use std::path::PathBuf; +use ethrex_trie::trie_sorted::TrieGenerationError; +use ethrex_trie::{Trie, TrieError}; +use rayon::iter::{ParallelBridge, ParallelIterator}; +#[cfg(not(feature = "rocksdb"))] +use std::collections::hash_map::Entry; +use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::path::{Path, PathBuf}; +#[cfg(not(feature = "rocksdb"))] +use 
std::sync::Mutex; use std::time::SystemTime; use std::{ array, cmp::min, - collections::{HashMap, hash_map::Entry}, + collections::HashMap, sync::{ - Arc, Mutex, + Arc, atomic::{AtomicBool, Ordering}, }, }; @@ -863,7 +868,8 @@ impl Syncer { std::fs::create_dir_all(&code_hashes_snapshot_dir).map_err(|_| SyncError::CorruptPath)?; // Create collector to store code hashes in files - let mut code_hash_collector = CodeHashCollector::new(code_hashes_snapshot_dir.clone()); + let mut code_hash_collector: CodeHashCollector = + CodeHashCollector::new(code_hashes_snapshot_dir.clone()); let mut storage_accounts = AccountStorageRoots::default(); if !std::env::var("SKIP_START_SNAP_SYNC").is_ok_and(|var| !var.is_empty()) { @@ -887,67 +893,17 @@ impl Syncer { *METRICS.account_tries_insert_start_time.lock().await = Some(SystemTime::now()); // We read the account leafs from the files in account_state_snapshots_dir, write it into // the trie to compute the nodes and stores the accounts with storages for later use - let mut computed_state_root = *EMPTY_TRIE_HASH; - for entry in std::fs::read_dir(&account_state_snapshots_dir) - .map_err(|_| SyncError::AccountStateSnapshotsDirNotFound)? 
- { - METRICS - .current_step - .set(crate::metrics::CurrentStepValue::InsertingAccountRangesNoDb); - - let entry = entry.map_err(|err| { - SyncError::SnapshotReadError(account_state_snapshots_dir.clone(), err) - })?; - info!("Reading account file from entry {entry:?}"); - let snapshot_path = entry.path(); - let snapshot_contents = std::fs::read(&snapshot_path) - .map_err(|err| SyncError::SnapshotReadError(snapshot_path.clone(), err))?; - let account_states_snapshot: Vec<(H256, AccountState)> = - RLPDecode::decode(&snapshot_contents) - .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?; - - storage_accounts.accounts_with_storage_root.extend( - account_states_snapshot.iter().filter_map(|(hash, state)| { - (state.storage_root != *EMPTY_TRIE_HASH) - .then_some((*hash, state.storage_root)) - }), - ); - - info!("Inserting accounts into the state trie"); - - // Collect valid code hashes from current account snapshot - let code_hashes_from_snapshot: Vec = account_states_snapshot - .iter() - .filter_map(|(_, state)| { - (state.code_hash != *EMPTY_KECCACK_HASH).then_some(state.code_hash) - }) - .collect(); - - code_hash_collector.extend(code_hashes_from_snapshot); - code_hash_collector.flush_if_needed().await?; - - let store_clone = store.clone(); - let current_state_root = - tokio::task::spawn_blocking(move || -> Result { - let mut trie = store_clone.open_state_trie(computed_state_root)?; - - for (account_hash, account) in account_states_snapshot { - METRICS - .account_tries_inserted - .fetch_add(1, Ordering::Relaxed); - trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?; - } - METRICS - .current_step - .set(crate::metrics::CurrentStepValue::InsertingAccountRanges); - let current_state_root = trie.hash()?; - Ok(current_state_root) - }) - .await??; - - computed_state_root = current_state_root; - } + // Variable `accounts_with_storage` unused if not in rocksdb + #[allow(unused_variables)] + let (computed_state_root, accounts_with_storage) = 
insert_accounts( + store.clone(), + &mut storage_accounts, + &account_state_snapshots_dir, + &self.datadir, + &mut code_hash_collector, + ) + .await?; info!( "Finished inserting account ranges, total storage accounts: {}", storage_accounts.accounts_with_storage_root.len() @@ -966,6 +922,7 @@ impl Syncer { // is correct. To do so, we always heal the state trie before requesting storage rates let mut chunk_index = 0_u64; let mut state_leafs_healed = 0_u64; + let mut storage_range_request_attempts = 0; loop { while block_is_stale(&pivot_header) { pivot_header = update_pivot( @@ -997,16 +954,39 @@ impl Syncer { "Started request_storage_ranges with {} accounts with storage root unchanged", storage_accounts.accounts_with_storage_root.len() ); - chunk_index = self - .peers - .request_storage_ranges( - &mut storage_accounts, - account_storages_snapshots_dir.as_ref(), - chunk_index, - &mut pivot_header, - ) - .await - .map_err(SyncError::PeerHandler)?; + storage_range_request_attempts += 1; + if storage_range_request_attempts < 3 { + chunk_index = self + .peers + .request_storage_ranges( + &mut storage_accounts, + account_storages_snapshots_dir.as_ref(), + chunk_index, + &mut pivot_header, + store.clone(), + ) + .await + .map_err(SyncError::PeerHandler)?; + } else { + for (acc_hash, (maybe_root, old_intervals)) in + storage_accounts.accounts_with_storage_root.iter() + { + // When we fall into this case what happened is there are certain accounts for which + // the storage root went back to a previous value we already had, and thus could not download + // their storage leaves because we were using an old value for their storage root. + // The fallback is to ensure we mark it for storage healing. + storage_accounts.healed_accounts.insert(*acc_hash); + debug!( + "We couldn't download these accounts on request_storage_ranges. Falling back to storage healing for it. + Account hash: {:x?}, {:x?}. 
Number of intervals {}", + acc_hash, + maybe_root, + old_intervals.len() + ); + } + + storage_accounts.accounts_with_storage_root.clear(); + } free_peers_and_log_if_not_empty(&mut self.peers).await?; info!( @@ -1028,62 +1008,21 @@ impl Syncer { ); *METRICS.storage_tries_download_end_time.lock().await = Some(SystemTime::now()); - let maybe_big_account_storage_state_roots: Arc>> = - Arc::new(Mutex::new(HashMap::new())); - *METRICS.storage_tries_insert_start_time.lock().await = Some(SystemTime::now()); METRICS .current_step .set(crate::metrics::CurrentStepValue::InsertingStorageRanges); let account_storages_snapshots_dir = get_account_storages_snapshots_dir(&self.datadir); - for entry in std::fs::read_dir(&account_storages_snapshots_dir) - .map_err(|_| SyncError::AccountStoragesSnapshotsDirNotFound)? - { - let entry = entry.map_err(|err| { - SyncError::SnapshotReadError(account_storages_snapshots_dir.clone(), err) - })?; - info!("Reading account storage file from entry {entry:?}"); - - let snapshot_path = entry.path(); - - let snapshot_contents = std::fs::read(&snapshot_path) - .map_err(|err| SyncError::SnapshotReadError(snapshot_path.clone(), err))?; - - let account_storages_snapshot: Vec<(H256, Vec<(H256, U256)>)> = - RLPDecode::decode(&snapshot_contents) - .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?; - let maybe_big_account_storage_state_roots_clone = - maybe_big_account_storage_state_roots.clone(); - let store_clone = store.clone(); - let pivot_hash_moved = pivot_header.hash(); - info!("Starting compute of account_storages_snapshot"); - let storage_trie_node_changes = tokio::task::spawn_blocking(move || { - let store: Store = store_clone; - - // TODO: Here we are filtering again the account with empty storage because we are adding empty accounts on purpose (it was the easiest thing to do) - // We need to fix this issue in request_storage_ranges and remove this filter. 
- account_storages_snapshot - .into_par_iter() - .filter(|(_account_hash, storage)| !storage.is_empty()) - .map(|(account_hash, key_value_pairs)| { - compute_storage_roots( - maybe_big_account_storage_state_roots_clone.clone(), - store.clone(), - account_hash, - key_value_pairs, - pivot_hash_moved, - ) - }) - .collect::, SyncError>>() - }) - .await??; - info!("Writing to db"); + insert_storages( + store.clone(), + accounts_with_storage, + &account_storages_snapshots_dir, + &self.datadir, + &pivot_header, + ) + .await?; - store - .write_storage_trie_nodes_batch(storage_trie_node_changes) - .await?; - } *METRICS.storage_tries_insert_end_time.lock().await = Some(SystemTime::now()); info!("Finished storing storage tries"); @@ -1235,13 +1174,15 @@ impl Syncer { } } -type StorageRoots = (H256, Vec<(NodeHash, Vec)>); +#[cfg(not(feature = "rocksdb"))] +type StorageRoots = (H256, Vec<(ethrex_trie::NodeHash, Vec)>); +#[cfg(not(feature = "rocksdb"))] fn compute_storage_roots( maybe_big_account_storage_state_roots: Arc>>, store: Store, account_hash: H256, - key_value_pairs: Vec<(H256, U256)>, + key_value_pairs: &[(H256, U256)], pivot_hash: H256, ) -> Result { let account_storage_root = match maybe_big_account_storage_state_roots @@ -1346,11 +1287,12 @@ pub fn calculate_staleness_timestamp(timestamp: u64) -> u64 { timestamp + (SNAP_LIMIT as u64 * 12) } #[derive(Debug, Default)] +#[allow(clippy::type_complexity)] /// We store for optimization the accounts that need to heal storage pub struct AccountStorageRoots { /// The accounts that have not been healed are guaranteed to have the original storage root /// we can read this storage root - pub accounts_with_storage_root: BTreeMap, + pub accounts_with_storage_root: BTreeMap, Vec<(H256, H256)>)>, /// If an account has been healed, it may return to a previous state, so we just store the account /// in a hashset pub healed_accounts: HashSet, @@ -1406,12 +1348,22 @@ pub enum SyncError { NoPeers, #[error("Failed to get block headers")] 
NoBlockHeaders, + #[error("Couldn't create a thread")] + ThreadCreationError, #[error("Called update_pivot outside snapsync mode")] NotInSnapSync, #[error("Peer handler error: {0}")] PeerHandler(#[from] PeerHandlerError), #[error("Corrupt Path")] CorruptPath, + #[error("Sorted Trie Generation Error: {0}")] + TrieGenerationError(#[from] TrieGenerationError), + #[error("Failed to get account temp db directory")] + AccountTempDBDirNotFound, + #[error("Failed to get storage temp db directory")] + StorageTempDBDirNotFound, + #[error("RocksDB Error: {0}")] + RocksDBError(String), #[error("Bytecode file error")] BytecodeFileError, #[error("Error in Peer Table: {0}")] @@ -1508,3 +1460,336 @@ pub async fn validate_bytecodes(store: Store, state_root: H256) -> bool { } is_valid } + +#[cfg(not(feature = "rocksdb"))] +async fn insert_accounts( + store: Store, + storage_accounts: &mut AccountStorageRoots, + account_state_snapshots_dir: &Path, + _: &Path, + code_hash_collector: &mut CodeHashCollector, +) -> Result<(H256, BTreeSet), SyncError> { + let mut computed_state_root = *EMPTY_TRIE_HASH; + for entry in std::fs::read_dir(account_state_snapshots_dir) + .map_err(|_| SyncError::AccountStateSnapshotsDirNotFound)? 
+ { + let entry = entry + .map_err(|err| SyncError::SnapshotReadError(account_state_snapshots_dir.into(), err))?; + info!("Reading account file from entry {entry:?}"); + let snapshot_path = entry.path(); + let snapshot_contents = std::fs::read(&snapshot_path) + .map_err(|err| SyncError::SnapshotReadError(snapshot_path.clone(), err))?; + let account_states_snapshot: Vec<(H256, AccountState)> = + RLPDecode::decode(&snapshot_contents) + .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?; + + storage_accounts.accounts_with_storage_root.extend( + account_states_snapshot.iter().filter_map(|(hash, state)| { + (state.storage_root != *EMPTY_TRIE_HASH) + .then_some((*hash, (Some(state.storage_root), Vec::new()))) + }), + ); + + // Collect valid code hashes from current account snapshot + let code_hashes_from_snapshot: Vec = account_states_snapshot + .iter() + .filter_map(|(_, state)| { + (state.code_hash != *EMPTY_KECCACK_HASH).then_some(state.code_hash) + }) + .collect(); + + code_hash_collector.extend(code_hashes_from_snapshot); + code_hash_collector.flush_if_needed().await?; + + info!("Inserting accounts into the state trie"); + + let store_clone = store.clone(); + let current_state_root: Result = + tokio::task::spawn_blocking(move || -> Result { + let mut trie = store_clone.open_state_trie(computed_state_root)?; + + for (account_hash, account) in account_states_snapshot { + trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?; + } + info!("Comitting to disk"); + let current_state_root = trie.hash()?; + Ok(current_state_root) + }) + .await?; + + computed_state_root = current_state_root?; + } + info!("computed_state_root {computed_state_root}"); + Ok((computed_state_root, BTreeSet::new())) +} + +#[cfg(not(feature = "rocksdb"))] +async fn insert_storages( + store: Store, + _: BTreeSet, + account_storages_snapshots_dir: &Path, + _: &Path, + pivot_header: &BlockHeader, +) -> Result<(), SyncError> { + use rayon::iter::IntoParallelIterator; + let 
maybe_big_account_storage_state_roots: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + + for entry in std::fs::read_dir(account_storages_snapshots_dir) + .map_err(|_| SyncError::AccountStoragesSnapshotsDirNotFound)? + { + use crate::utils::AccountsWithStorage; + + let entry = entry.map_err(|err| { + SyncError::SnapshotReadError(account_storages_snapshots_dir.into(), err) + })?; + info!("Reading account storage file from entry {entry:?}"); + + let snapshot_path = entry.path(); + + let snapshot_contents = std::fs::read(&snapshot_path) + .map_err(|err| SyncError::SnapshotReadError(snapshot_path.clone(), err))?; + + #[expect(clippy::type_complexity)] + let account_storages_snapshot: Vec = + RLPDecode::decode(&snapshot_contents) + .map(|all_accounts: Vec<(Vec, Vec<(H256, U256)>)>| { + all_accounts + .into_iter() + .map(|(accounts, storages)| AccountsWithStorage { accounts, storages }) + .collect() + }) + .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?; + + let maybe_big_account_storage_state_roots_clone = + maybe_big_account_storage_state_roots.clone(); + let store_clone = store.clone(); + let pivot_hash_moved = pivot_header.hash(); + info!("Starting compute of account_storages_snapshot"); + let storage_trie_node_changes = tokio::task::spawn_blocking(move || { + let store: Store = store_clone; + + account_storages_snapshot + .into_par_iter() + .flat_map(|account_storages| { + let storages: Arc<[_]> = account_storages.storages.into(); + account_storages + .accounts + .into_par_iter() + // FIXME: we probably want to make storages an Arc + .map(move |account| (account, storages.clone())) + }) + .map(|(account, storages)| { + compute_storage_roots( + maybe_big_account_storage_state_roots_clone.clone(), + store.clone(), + account, + &storages, + pivot_hash_moved, + ) + }) + .collect::, SyncError>>() + }) + .await??; + info!("Writing to db"); + + store + .write_storage_trie_nodes_batch(storage_trie_node_changes) + .await?; + } + Ok(()) +} + 
+#[cfg(feature = "rocksdb")] +async fn insert_accounts( + store: Store, + storage_accounts: &mut AccountStorageRoots, + account_state_snapshots_dir: &Path, + datadir: &Path, + code_hash_collector: &mut CodeHashCollector, +) -> Result<(H256, BTreeSet), SyncError> { + use crate::utils::get_rocksdb_temp_accounts_dir; + use ethrex_trie::trie_sorted::trie_from_sorted_accounts_wrap; + + let trie = store.open_state_trie(*EMPTY_TRIE_HASH)?; + let mut db_options = rocksdb::Options::default(); + db_options.create_if_missing(true); + let db = rocksdb::DB::open(&db_options, get_rocksdb_temp_accounts_dir(datadir)) + .map_err(|_| SyncError::AccountTempDBDirNotFound)?; + let file_paths: Vec = std::fs::read_dir(account_state_snapshots_dir) + .map_err(|_| SyncError::AccountStateSnapshotsDirNotFound)? + .collect::, _>>() + .map_err(|_| SyncError::AccountStateSnapshotsDirNotFound)? + .into_iter() + .map(|res| res.path()) + .collect(); + db.ingest_external_file(file_paths) + .map_err(|err| SyncError::RocksDBError(err.into_string()))?; + let iter = db.full_iterator(rocksdb::IteratorMode::Start); + for account in iter { + let account = account.map_err(|err| SyncError::RocksDBError(err.into_string()))?; + let account_state = AccountState::decode(&account.1).map_err(SyncError::Rlp)?; + if account_state.code_hash != *EMPTY_KECCACK_HASH { + code_hash_collector.add(account_state.code_hash); + code_hash_collector.flush_if_needed().await?; + } + } + + let iter = db.full_iterator(rocksdb::IteratorMode::Start); + let compute_state_root = trie_from_sorted_accounts_wrap( + trie.db(), + &mut iter + .map(|k| k.expect("We shouldn't have a rocksdb error here")) // TODO: remove unwrap + .inspect(|(k, v)| { + METRICS + .account_tries_inserted + .fetch_add(1, Ordering::Relaxed); + let account_state = AccountState::decode(v).expect("We should have accounts here"); + if account_state.storage_root != *EMPTY_TRIE_HASH { + storage_accounts.accounts_with_storage_root.insert( + H256::from_slice(k), + 
(Some(account_state.storage_root), Vec::new()), + ); + } + }) + .map(|(k, v)| (H256::from_slice(&k), v.to_vec())), + ) + .map_err(SyncError::TrieGenerationError)?; + + let accounts_with_storage = + BTreeSet::from_iter(storage_accounts.accounts_with_storage_root.keys().copied()); + Ok((compute_state_root, accounts_with_storage)) +} + +#[cfg(feature = "rocksdb")] +async fn insert_storages( + store: Store, + accounts_with_storage: BTreeSet, + account_storages_snapshots_dir: &Path, + datadir: &Path, + _: &BlockHeader, +) -> Result<(), SyncError> { + use crate::utils::get_rocksdb_temp_storage_dir; + use crossbeam::channel::{bounded, unbounded}; + use ethrex_threadpool::ThreadPool; + use ethrex_trie::{ + Node, NodeHash, + trie_sorted::{BUFFER_COUNT, SIZE_TO_WRITE_DB, trie_from_sorted_accounts}, + }; + use std::thread::scope; + + struct RocksDBIterator<'a> { + iter: rocksdb::DBRawIterator<'a>, + limit: H256, + } + + impl<'a> Iterator for RocksDBIterator<'a> { + type Item = (H256, Vec); + + fn next(&mut self) -> Option { + if !self.iter.valid() { + return None; + } + let return_value = { + let key = self.iter.key(); + let value = self.iter.value(); + match (key, value) { + (Some(key), Some(value)) => { + let hash = H256::from_slice(&key[0..32]); + let key = H256::from_slice(&key[32..]); + let value = value.to_vec(); + if hash != self.limit { + None + } else { + Some((key, value)) + } + } + _ => None, + } + }; + self.iter.next(); + return_value + } + } + + let mut db_options = rocksdb::Options::default(); + db_options.create_if_missing(true); + let db = rocksdb::DB::open(&db_options, get_rocksdb_temp_storage_dir(datadir)) + .map_err(|_| SyncError::StorageTempDBDirNotFound)?; + let file_paths: Vec = std::fs::read_dir(account_storages_snapshots_dir) + .map_err(|_| SyncError::AccountStoragesSnapshotsDirNotFound)? + .collect::, _>>() + .map_err(|_| SyncError::AccountStoragesSnapshotsDirNotFound)? 
+ .into_iter() + .map(|res| res.path()) + .collect(); + db.ingest_external_file(file_paths) + .map_err(|err| SyncError::RocksDBError(err.into_string()))?; + let snapshot = db.snapshot(); + + let account_with_storage_and_tries = accounts_with_storage + .into_iter() + .map(|account_hash| { + ( + account_hash, + store + .open_storage_trie(account_hash, *EMPTY_TRIE_HASH) + .expect("Should be able to open trie"), + ) + }) + .collect::>(); + + let (sender, receiver) = unbounded::<()>(); + let mut counter = 0; + let thread_count = std::thread::available_parallelism() + .map(|num| num.into()) + .unwrap_or(8); + + let (buffer_sender, buffer_receiver) = bounded::>(BUFFER_COUNT as usize); + for _ in 0..BUFFER_COUNT { + let _ = buffer_sender.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize)); + } + + scope(|scope| { + let pool: Arc> = Arc::new(ThreadPool::new(thread_count, scope)); + for (account_hash, trie) in account_with_storage_and_tries.iter() { + let sender = sender.clone(); + let buffer_sender = buffer_sender.clone(); + let buffer_receiver = buffer_receiver.clone(); + if counter >= thread_count - 1 { + let _ = receiver.recv(); + counter -= 1; + } + counter += 1; + let pool_clone = pool.clone(); + let mut iter = snapshot.raw_iterator(); + let task = Box::new(move || { + let mut buffer: [u8; 64] = [0_u8; 64]; + buffer[..32].copy_from_slice(&account_hash.0); + iter.seek(buffer); + let mut iter = RocksDBIterator { + iter, + limit: *account_hash, + }; + + let _ = trie_from_sorted_accounts( + trie.db(), + &mut iter, + pool_clone, + buffer_sender, + buffer_receiver, + ) + .inspect_err(|err: &TrieGenerationError| { + error!( + "we found an error while inserting the storage trie for the account {account_hash:x}, err {err}" + ); + }) + .map_err(SyncError::TrieGenerationError); + METRICS.storage_tries_state_roots_computed.inc(); + let _ = sender.send(()); + }); + pool.execute(task); + } + }); + Ok(()) +} diff --git a/crates/networking/p2p/sync/code_collector.rs 
b/crates/networking/p2p/sync/code_collector.rs index 7cd3873fc94..0038bc8c935 100644 --- a/crates/networking/p2p/sync/code_collector.rs +++ b/crates/networking/p2p/sync/code_collector.rs @@ -39,6 +39,8 @@ impl CodeHashCollector { self.buffer.insert(hash); } + // The optimization for rocksdb database doesn't use this method + #[cfg(not(feature = "rocksdb"))] /// Extends the buffer with a list of code hashes pub fn extend(&mut self, hashes: impl IntoIterator) { self.buffer.extend(hashes); diff --git a/crates/networking/p2p/sync/state_healing.rs b/crates/networking/p2p/sync/state_healing.rs index ff744263763..8a302a96b00 100644 --- a/crates/networking/p2p/sync/state_healing.rs +++ b/crates/networking/p2p/sync/state_healing.rs @@ -181,9 +181,12 @@ async fn heal_state_trie( } storage_accounts.healed_accounts.insert(account_hash); - storage_accounts + let old_value = storage_accounts .accounts_with_storage_root - .remove(&account_hash); + .get_mut(&account_hash); + if let Some((old_root, _)) = old_value { + *old_root = None; + } } } leafs_healed += nodes diff --git a/crates/networking/p2p/sync/storage_healing.rs b/crates/networking/p2p/sync/storage_healing.rs index bdda8cb0b37..dccf25dab7d 100644 --- a/crates/networking/p2p/sync/storage_healing.rs +++ b/crates/networking/p2p/sync/storage_healing.rs @@ -547,26 +547,6 @@ fn get_initial_downloads( }) .collect::>(), ); - initial_requests.extend( - account_paths - .accounts_with_storage_root - .par_iter() - .filter_map(|(acc_path, storage_root)| { - if store - .contains_storage_node(*acc_path, *storage_root) - .expect("We should be able to open the store") - { - return None; - } - Some(NodeRequest { - acc_path: Nibbles::from_bytes(&acc_path.0), - storage_path: Nibbles::default(), // We need to be careful, the root parent is a special case - parent: Nibbles::default(), - hash: *storage_root, - }) - }) - .collect::>(), - ); initial_requests } diff --git a/crates/networking/p2p/utils.rs b/crates/networking/p2p/utils.rs index 
c5b1a21aa44..fe903edaec2 100644 --- a/crates/networking/p2p/utils.rs +++ b/crates/networking/p2p/utils.rs @@ -4,18 +4,18 @@ use std::{ }; use ethrex_common::utils::keccak; -use ethrex_common::{H256, H512}; -use ethrex_rlp::error::RLPDecodeError; +use ethrex_common::{H256, H512, U256, types::AccountState}; +use ethrex_rlp::{encode::RLPEncode, error::RLPDecodeError}; use ethrex_trie::Node; use secp256k1::{PublicKey, SecretKey}; use spawned_concurrency::error::GenServerError; +use crate::peer_handler::DumpError; use crate::{ discv4::peer_table::PeerChannels, rlpx::{Message, connection::server::CastMessage, snap::TrieNodes}, }; - -use crate::peer_handler::DumpError; +use tracing::error; /// Computes the node_id from a public key (aka computes the Keccak256 hash of the given public key) pub fn node_id(public_key: &H512) -> H256 { @@ -56,6 +56,14 @@ pub fn get_account_state_snapshots_dir(datadir: &Path) -> PathBuf { datadir.join("account_state_snapshots") } +pub fn get_rocksdb_temp_accounts_dir(datadir: &Path) -> PathBuf { + datadir.join("temp_acc_dir") +} + +pub fn get_rocksdb_temp_storage_dir(datadir: &Path) -> PathBuf { + datadir.join("temp_storage_dir") +} + pub fn get_account_state_snapshot_file(directory: &Path, chunk_index: u64) -> PathBuf { directory.join(format!("account_state_chunk.rlp.{chunk_index}")) } @@ -64,6 +72,66 @@ pub fn get_account_storages_snapshot_file(directory: &Path, chunk_index: u64) -> directory.join(format!("account_storages_chunk.rlp.{chunk_index}")) } +#[cfg(feature = "rocksdb")] +pub fn dump_accounts_to_rocks_db( + path: &Path, + mut contents: Vec<(H256, AccountState)>, +) -> Result<(), rocksdb::Error> { + // This can happen sometimes during download, and the sst ingestion method + // fails with empty chunk files + if contents.is_empty() { + return Ok(()); + } + contents.sort_by_key(|(k, _)| *k); + contents.dedup_by_key(|(k, _)| { + let mut buf = [0u8; 32]; + buf[..32].copy_from_slice(&k.0); + buf + }); + let mut buffer: Vec = Vec::new(); + 
let writer_options = rocksdb::Options::default(); + let mut writer = rocksdb::SstFileWriter::create(&writer_options); + writer.open(std::path::Path::new(&path))?; + for (key, acccount) in contents { + buffer.clear(); + acccount.encode(&mut buffer); + writer.put(key.0.as_ref(), buffer.as_slice())?; + } + writer.finish() +} + +#[cfg(feature = "rocksdb")] +pub fn dump_storages_to_rocks_db( + path: &Path, + mut contents: Vec<(H256, H256, U256)>, +) -> Result<(), rocksdb::Error> { + // This can happen sometimes during download, and the sst ingestion method + // fails with empty chunk files + if contents.is_empty() { + return Ok(()); + } + contents.sort(); + contents.dedup_by_key(|(k0, k1, _)| { + let mut buffer = [0_u8; 64]; + buffer[0..32].copy_from_slice(&k0.0); + buffer[32..64].copy_from_slice(&k1.0); + buffer + }); + let writer_options = rocksdb::Options::default(); + let mut writer = rocksdb::SstFileWriter::create(&writer_options); + let mut buffer_key = [0_u8; 64]; + let mut buffer_storage: Vec = Vec::new(); + writer.open(std::path::Path::new(&path))?; + for (account, slot_hash, slot_value) in contents { + buffer_key[0..32].copy_from_slice(&account.0); + buffer_key[32..64].copy_from_slice(&slot_hash.0); + buffer_storage.clear(); + slot_value.encode(&mut buffer_storage); + writer.put(buffer_key.as_ref(), buffer_storage.as_slice())?; + } + writer.finish() +} + pub fn get_code_hashes_snapshots_dir(datadir: &Path) -> PathBuf { datadir.join("bytecode_hashes_snapshots") } @@ -82,6 +150,73 @@ pub fn dump_to_file(path: &Path, contents: Vec) -> Result<(), DumpError> { }) } +pub fn dump_accounts_to_file( + path: &Path, + accounts: Vec<(H256, AccountState)>, +) -> Result<(), DumpError> { + #[cfg(feature = "rocksdb")] + return dump_accounts_to_rocks_db(path, accounts) + .inspect_err(|err| error!("Rocksdb writing stt error {err:?}")) + .map_err(|_| DumpError { + path: path.to_path_buf(), + contents: Vec::new(), + error: std::io::ErrorKind::Other, + }); + #[cfg(not(feature = 
"rocksdb"))] + dump_to_file(path, accounts.encode_to_vec()) +} + +/// Struct representing the storage slots of certain accounts that share the same storage root +pub struct AccountsWithStorage { + /// Accounts with the same storage root + pub accounts: Vec, + /// All slots in the trie from the accounts + pub storages: Vec<(H256, U256)>, +} + +pub fn dump_storages_to_file( + path: &Path, + storages: Vec, +) -> Result<(), DumpError> { + #[cfg(feature = "rocksdb")] + return dump_storages_to_rocks_db( + path, + storages + .into_iter() + .flat_map(|accounts_with_slots| { + accounts_with_slots + .accounts + .into_iter() + .map(|hash| { + accounts_with_slots + .storages + .iter() + .map(move |(slot_hash, slot_value)| (hash, *slot_hash, *slot_value)) + .collect::>() + }) + .collect::>() + }) + .flatten() + .collect::>(), + ) + .inspect_err(|err| error!("Rocksdb writing stt error {err:?}")) + .map_err(|_| DumpError { + path: path.to_path_buf(), + contents: Vec::new(), + error: std::io::ErrorKind::Other, + }); + + #[cfg(not(feature = "rocksdb"))] + dump_to_file( + path, + storages + .into_iter() + .map(|accounts_with_slots| (accounts_with_slots.accounts, accounts_with_slots.storages)) + .collect::>() + .encode_to_vec(), + ) +} + /// TODO: make it more generic pub async fn send_message_and_wait_for_response( peer_channel: &mut PeerChannels, diff --git a/crates/storage/trie_db/rocksdb.rs b/crates/storage/trie_db/rocksdb.rs index 90858ac1179..70d1b29a07f 100644 --- a/crates/storage/trie_db/rocksdb.rs +++ b/crates/storage/trie_db/rocksdb.rs @@ -1,5 +1,6 @@ use ethrex_common::H256; -use ethrex_trie::{NodeHash, TrieDB, error::TrieError}; +use ethrex_rlp::encode::RLPEncode; +use ethrex_trie::{Node, NodeHash, TrieDB, error::TrieError}; use rocksdb::{MultiThreaded, OptimisticTransactionDB}; use std::sync::Arc; @@ -40,7 +41,7 @@ impl RocksDBTrieDB { .ok_or_else(|| TrieError::DbError(anyhow::anyhow!("Column family not found"))) } - fn make_key(&self, node_hash: NodeHash) -> Vec { + fn 
make_key(&self, node_hash: &NodeHash) -> Vec { match &self.address_prefix { Some(address) => { // For storage tries, prefix with address @@ -59,7 +60,7 @@ impl RocksDBTrieDB { impl TrieDB for RocksDBTrieDB { fn get(&self, key: NodeHash) -> Result>, TrieError> { let cf = self.cf_handle()?; - let db_key = self.make_key(key); + let db_key = self.make_key(&key); self.db .get_cf(&cf, db_key) @@ -71,7 +72,7 @@ impl TrieDB for RocksDBTrieDB { let mut batch = rocksdb::WriteBatchWithTransaction::default(); for (key, value) in key_values { - let db_key = self.make_key(key); + let db_key = self.make_key(&key); batch.put_cf(&cf, db_key, value); } @@ -79,6 +80,24 @@ impl TrieDB for RocksDBTrieDB { .write(batch) .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB batch write error: {}", e))) } + + fn put_batch_no_alloc(&self, key_values: &[(NodeHash, Node)]) -> Result<(), TrieError> { + let cf = self.cf_handle()?; + let mut batch = rocksdb::WriteBatchWithTransaction::default(); + // 532 is the maximum size of an encoded branch node. + let mut buffer = Vec::with_capacity(532); + + for (hash, node) in key_values { + let db_key = self.make_key(hash); + buffer.clear(); + node.encode(&mut buffer); + batch.put_cf(&cf, db_key, &buffer); + } + + self.db + .write(batch) + .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB batch write error: {}", e))) + } } #[cfg(test)]