From ca1551283842a0f7e2bd8ce34219b49253dddf99 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 22 Aug 2023 16:47:38 +1000 Subject: [PATCH 01/14] Add `lighthouse db prune_states` --- beacon_node/store/src/hot_cold_store.rs | 84 +++++++++++++++++++++++++ database_manager/src/lib.rs | 43 ++++++++++++- 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 47839ed3e98..c0496b55705 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1691,6 +1691,90 @@ impl, Cold: ItemStore> HotColdDB ); Ok(()) } + + /// Delete ALL states from the freezer database and update the anchor accordingly. + pub fn prune_historic_states( + &self, + genesis_state_root: Hash256, + genesis_state: &BeaconState, + ignore_errors: bool, + ) -> Result<(), Error> { + // Update the anchor to use the dummy state upper limit and disable historic state storage. + let old_anchor = self.get_anchor_info(); + let new_anchor = if let Some(old_anchor) = old_anchor.clone() { + AnchorInfo { + state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, + ..old_anchor.clone() + } + } else { + AnchorInfo { + anchor_slot: Slot::new(0), + oldest_block_slot: Slot::new(0), + oldest_block_parent: Hash256::zero(), + state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, + state_lower_limit: Slot::new(0), + } + }; + + // Commit the anchor change immediately: if the cold database ops fail they can always be + // retried, and we can't do them atomically with this change anyway. + self.compare_and_set_anchor_info_with_write(old_anchor, Some(new_anchor))?; + + // Stage freezer data for deletion. Do not bother loading and deserializing values as this + // wastes time and is less schema-agnostic. My hope is that this method will be useful for + // migrating to the tree-states schema (delete everything in the freezer then start afresh). 
+ let mut cold_ops = vec![]; + + let columns = [ + DBColumn::BeaconState, + DBColumn::BeaconStateSummary, + DBColumn::BeaconRestorePoint, + DBColumn::BeaconStateRoots, + DBColumn::BeaconHistoricalRoots, + DBColumn::BeaconRandaoMixes, + DBColumn::BeaconHistoricalSummaries, + ]; + + for column in columns { + for res in self.cold_db.iter_column_keys(column) { + match res { + Ok(key) => cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col( + column.as_str(), + key.as_bytes(), + ))), + Err(e) if ignore_errors => { + warn!( + self.log, + "Ignoring error while reading key"; + "column" => ?column, + "err" => ?e, + ); + } + Err(e) => { + return Err(e); + } + } + } + } + + // XXX: We need to commit the mass deletion here *before* re-storing the genesis state, as + // the current schema performs reads as part of `store_cold_state`. This can be deleted + // once the target schema is tree-states. + info!( + self.log, + "Deleting historic states"; + "num_kv" => cold_ops.len(), + ); + self.cold_db.do_atomically(std::mem::take(&mut cold_ops))?; + + // Store the genesis state using the *current* schema, which may be different from the + // schema of the genesis state we just deleted. + // FIXME(sproul): check split > 0 or this is invalid + self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?; + self.cold_db.do_atomically(cold_ops)?; + + Ok(()) + } } /// Advance the split point of the store, moving new finalized states to the freezer. 
diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index ce0b094b772..65c2c02d9f8 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -15,7 +15,7 @@ use store::{ DBColumn, HotColdDB, KeyValueStore, LevelDB, }; use strum::{EnumString, EnumVariantNames, VariantNames}; -use types::EthSpec; +use types::{BeaconState, EthSpec}; pub const CMD: &str = "database_manager"; @@ -75,6 +75,12 @@ pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { .about("Prune finalized execution payloads") } +pub fn prune_states_app<'a, 'b>() -> App<'a, 'b> { + App::new("prune_states") + .setting(clap::AppSettings::ColoredHelp) + .about("Prune all beacon states from the freezer database") +} + pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new(CMD) .visible_aliases(&["db"]) @@ -102,6 +108,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .subcommand(version_cli_app()) .subcommand(inspect_cli_app()) .subcommand(prune_payloads_app()) + .subcommand(prune_states_app()) } fn parse_client_config( @@ -334,6 +341,31 @@ pub fn prune_payloads( db.try_prune_execution_payloads(force) } +pub fn prune_states( + client_config: ClientConfig, + mut genesis_state: BeaconState, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log, + )?; + + let genesis_state_root = genesis_state.update_tree_hash_cache()?; + let ignore_errors = false; + + db.prune_historic_states(genesis_state_root, &genesis_state, ignore_errors) +} + /// Run the database manager, returning an error string if the operation did not succeed. 
pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result<(), String> { let client_config = parse_client_config(cli_args, &env)?; @@ -356,6 +388,15 @@ pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result ("prune_payloads", Some(_)) => { prune_payloads(client_config, &context, log).map_err(format_err) } + ("prune_states", Some(_)) => { + let network_config = context + .eth2_network_config + .clone() + .ok_or("Missing network config")?; + let genesis_state = network_config.beacon_state()?; + + prune_states(client_config, genesis_state, &context, log).map_err(format_err) + } _ => Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()), } } From 432f166419f15291121e4c06315efea1ffb81b83 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 22 Aug 2023 16:57:11 +1000 Subject: [PATCH 02/14] Add FIXME --- beacon_node/store/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index ee01fa1ae15..1554ee2e0f2 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -43,6 +43,7 @@ use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; pub use types::*; +// FIXME(sproul): abstract over non-32-byte keys pub type ColumnIter<'a> = Box), Error>> + 'a>; pub type ColumnKeyIter<'a> = Box> + 'a>; From 4686b77e446c8d3ab076bd2962237f479a51599c Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 24 Aug 2023 17:06:59 +1000 Subject: [PATCH 03/14] Backport some abstractions from tree-states --- Cargo.lock | 1 + .../src/otb_verification_service.rs | 11 ++- beacon_node/store/src/errors.rs | 1 + beacon_node/store/src/hot_cold_store.rs | 4 +- beacon_node/store/src/leveldb_store.rs | 55 ++++++------ beacon_node/store/src/lib.rs | 63 ++++++++++++-- beacon_node/store/src/memory_store.rs | 86 ++++++++----------- database_manager/Cargo.toml | 1 + database_manager/src/lib.rs | 12 +-- 9 files changed, 140 insertions(+), 94 deletions(-) diff --git 
a/Cargo.lock b/Cargo.lock index 9a8cf48336c..20d81c786b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1457,6 +1457,7 @@ dependencies = [ "clap", "clap_utils", "environment", + "hex", "logging", "slog", "sloggers", diff --git a/beacon_node/beacon_chain/src/otb_verification_service.rs b/beacon_node/beacon_chain/src/otb_verification_service.rs index 805b61dd9c0..b934c553e6c 100644 --- a/beacon_node/beacon_chain/src/otb_verification_service.rs +++ b/beacon_node/beacon_chain/src/otb_verification_service.rs @@ -119,10 +119,13 @@ pub fn start_otb_verification_service( pub fn load_optimistic_transition_blocks( chain: &BeaconChain, ) -> Result, StoreError> { - process_results(chain.store.hot_db.iter_column(OTBColumn), |iter| { - iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes)) - .collect() - })? + process_results( + chain.store.hot_db.iter_column::(OTBColumn), + |iter| { + iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes)) + .collect() + }, + )? } #[derive(Debug)] diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index fcc40706b30..e1632820b0c 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -45,6 +45,7 @@ pub enum Error { SlotClockUnavailableForMigration, UnableToDowngrade, InconsistentFork(InconsistentFork), + InvalidKey, } pub trait HandleUnavailable { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c0496b55705..8ed59655762 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1736,11 +1736,11 @@ impl, Cold: ItemStore> HotColdDB ]; for column in columns { - for res in self.cold_db.iter_column_keys(column) { + for res in self.cold_db.iter_column_keys::>(column) { match res { Ok(key) => cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col( column.as_str(), - key.as_bytes(), + &key, ))), Err(e) if ignore_errors => { warn!( diff --git 
a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 86bd4ffaccd..6dfb3397a02 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,7 +1,6 @@ use super::*; use crate::hot_cold_store::HotColdDBError; use crate::metrics; -use db_key::Key; use leveldb::compaction::Compaction; use leveldb::database::batch::{Batch, Writebatch}; use leveldb::database::kv::KV; @@ -168,18 +167,16 @@ impl KeyValueStore for LevelDB { }; for (start_key, end_key) in vec![ - endpoints(DBColumn::BeaconStateTemporary), endpoints(DBColumn::BeaconState), + endpoints(DBColumn::BeaconStateSummary), ] { self.db.compact(&start_key, &end_key); } Ok(()) } - /// Iterate through all keys and values in a particular column. - fn iter_column(&self, column: DBColumn) -> ColumnIter { - let start_key = - BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter { + let start_key = BytesKey::from_vec(get_key_for_col(column.into(), from)); let iter = self.db.iter(self.read_options()); iter.seek(&start_key); @@ -187,19 +184,18 @@ impl KeyValueStore for LevelDB { Box::new( iter.take_while(move |(key, _)| key.matches_column(column)) .map(move |(bytes_key, value)| { - let key = - bytes_key - .remove_column(column) - .ok_or(HotColdDBError::IterationError { - unexpected_key: bytes_key, - })?; - Ok((key, value)) + let key = bytes_key.remove_column_variable(column).ok_or_else(|| { + HotColdDBError::IterationError { + unexpected_key: bytes_key.clone(), + } + })?; + Ok((K::from_bytes(key)?, value)) }), ) } /// Iterate through all keys and values in a particular column. 
- fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { let start_key = BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); @@ -209,13 +205,12 @@ impl KeyValueStore for LevelDB { Box::new( iter.take_while(move |key| key.matches_column(column)) .map(move |bytes_key| { - let key = - bytes_key - .remove_column(column) - .ok_or(HotColdDBError::IterationError { - unexpected_key: bytes_key, - })?; - Ok(key) + let key = bytes_key.remove_column_variable(column).ok_or_else(|| { + HotColdDBError::IterationError { + unexpected_key: bytes_key.clone(), + } + })?; + K::from_bytes(key) }), ) } @@ -224,12 +219,12 @@ impl KeyValueStore for LevelDB { impl ItemStore for LevelDB {} /// Used for keying leveldb. -#[derive(Debug, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct BytesKey { key: Vec, } -impl Key for BytesKey { +impl db_key::Key for BytesKey { fn from_u8(key: &[u8]) -> Self { Self { key: key.to_vec() } } @@ -245,12 +240,20 @@ impl BytesKey { self.key.starts_with(column.as_bytes()) } - /// Remove the column from a key, returning its `Hash256` portion. + /// Remove the column from a 32 byte key, yielding the `Hash256` key. pub fn remove_column(&self, column: DBColumn) -> Option { + let key = self.remove_column_variable(column)?; + (column.key_size() == 32).then(|| Hash256::from_slice(key)) + } + + /// Remove the column from a key. + /// + /// Will return `None` if the value doesn't match the column or has the wrong length. 
+ pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> { if self.matches_column(column) { let subkey = &self.key[column.as_bytes().len()..]; - if subkey.len() == 32 { - return Some(Hash256::from_slice(subkey)); + if subkey.len() == column.key_size() { + return Some(subkey); } } None diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 1554ee2e0f2..e2e4a9d5ab3 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -43,9 +43,8 @@ use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; pub use types::*; -// FIXME(sproul): abstract over non-32-byte keys -pub type ColumnIter<'a> = Box), Error>> + 'a>; -pub type ColumnKeyIter<'a> = Box> + 'a>; +pub type ColumnIter<'a, K> = Box), Error>> + 'a>; +pub type ColumnKeyIter<'a, K> = Box> + 'a>; pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Retrieve some bytes in `column` with `key`. @@ -81,15 +80,34 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { fn compact(&self) -> Result<(), Error>; /// Iterate through all keys and values in a particular column. - fn iter_column(&self, _column: DBColumn) -> ColumnIter { - // Default impl for non LevelDB databases - Box::new(std::iter::empty()) + fn iter_column(&self, column: DBColumn) -> ColumnIter { + self.iter_column_from(column, &vec![0; column.key_size()]) } + /// Iterate through all keys and values in a column from a given starting point. + fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter; + /// Iterate through all keys in a particular column. 
- fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter { - // Default impl for non LevelDB databases - Box::new(std::iter::empty()) + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter; +} + +pub trait Key: Sized + 'static { + fn from_bytes(key: &[u8]) -> Result; +} + +impl Key for Hash256 { + fn from_bytes(key: &[u8]) -> Result { + if key.len() == 32 { + Ok(Hash256::from_slice(key)) + } else { + Err(Error::InvalidKey) + } + } +} + +impl Key for Vec { + fn from_bytes(key: &[u8]) -> Result { + Ok(key.to_vec()) } } @@ -231,6 +249,33 @@ impl DBColumn { pub fn as_bytes(self) -> &'static [u8] { self.as_str().as_bytes() } + + /// Most database keys are 32 bytes, but some freezer DB keys are 8 bytes. + /// + /// This function returns the number of bytes used by keys in a given column. + pub fn key_size(self) -> usize { + match self { + Self::BeaconMeta + | Self::BeaconBlock + | Self::BeaconState + | Self::BeaconStateSummary + | Self::BeaconStateTemporary + | Self::ExecPayload + | Self::BeaconChain + | Self::OpPool + | Self::Eth1Cache + | Self::ForkChoice + | Self::PubkeyCache + | Self::BeaconRestorePoint + | Self::DhtEnrs + | Self::OptimisticTransitionBlock => 32, + Self::BeaconBlockRoots + | Self::BeaconStateRoots + | Self::BeaconHistoricalRoots + | Self::BeaconHistoricalSummaries + | Self::BeaconRandaoMixes => 8, + } + } } /// An item that may stored in a `Store` by serializing and deserializing from bytes. 
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 1473f59a4e9..c2e494dce6a 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,17 +1,17 @@ -use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; -use crate::{ColumnIter, DBColumn}; +use crate::{ + get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error, + ItemStore, Key, KeyValueStore, KeyValueStoreOp, +}; use parking_lot::{Mutex, MutexGuard, RwLock}; -use std::collections::{HashMap, HashSet}; +use std::collections::BTreeMap; use std::marker::PhantomData; use types::*; -type DBHashMap = HashMap, Vec>; -type DBKeyMap = HashMap, HashSet>>; +type DBMap = BTreeMap>; -/// A thread-safe `HashMap` wrapper. +/// A thread-safe `BTreeMap` wrapper. pub struct MemoryStore { - db: RwLock, - col_keys: RwLock, + db: RwLock, transaction_mutex: Mutex<()>, _phantom: PhantomData, } @@ -20,36 +20,24 @@ impl MemoryStore { /// Create a new, empty database. pub fn open() -> Self { Self { - db: RwLock::new(HashMap::new()), - col_keys: RwLock::new(HashMap::new()), + db: RwLock::new(BTreeMap::new()), transaction_mutex: Mutex::new(()), _phantom: PhantomData, } } - - fn get_key_for_col(col: &str, key: &[u8]) -> Vec { - let mut col = col.as_bytes().to_vec(); - col.append(&mut key.to_vec()); - col - } } impl KeyValueStore for MemoryStore { /// Get the value of some key from the database. Returns `None` if the key does not exist. fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { - let column_key = Self::get_key_for_col(col, key); + let column_key = BytesKey::from_vec(get_key_for_col(col, key)); Ok(self.db.read().get(&column_key).cloned()) } /// Puts a key in the database. 
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { - let column_key = Self::get_key_for_col(col, key); + let column_key = BytesKey::from_vec(get_key_for_col(col, key)); self.db.write().insert(column_key, val.to_vec()); - self.col_keys - .write() - .entry(col.as_bytes().to_vec()) - .or_insert_with(HashSet::new) - .insert(key.to_vec()); Ok(()) } @@ -64,18 +52,14 @@ impl KeyValueStore for MemoryStore { /// Return true if some key exists in some column. fn key_exists(&self, col: &str, key: &[u8]) -> Result { - let column_key = Self::get_key_for_col(col, key); + let column_key = BytesKey::from_vec(get_key_for_col(col, key)); Ok(self.db.read().contains_key(&column_key)) } /// Delete some key from the database. fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { - let column_key = Self::get_key_for_col(col, key); + let column_key = BytesKey::from_vec(get_key_for_col(col, key)); self.db.write().remove(&column_key); - self.col_keys - .write() - .get_mut(&col.as_bytes().to_vec()) - .map(|set| set.remove(key)); Ok(()) } @@ -83,35 +67,41 @@ impl KeyValueStore for MemoryStore { for op in batch { match op { KeyValueStoreOp::PutKeyValue(key, value) => { - self.db.write().insert(key, value); + self.db.write().insert(BytesKey::from_vec(key), value); } - KeyValueStoreOp::DeleteKey(hash) => { - self.db.write().remove(&hash); + KeyValueStoreOp::DeleteKey(key) => { + self.db.write().remove(&BytesKey::from_vec(key)); } } } Ok(()) } - // pub type ColumnIter<'a> = Box), Error>> + 'a>; - fn iter_column(&self, column: DBColumn) -> ColumnIter { + fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter { + // We use this awkward pattern because we can't lock the `self.db` field *and* maintain a + // reference to the lock guard across calls to `.next()`. This would be require a + // struct with a field (the iterator) which references another field (the lock guard). 
+ let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), from)); let col = column.as_str(); - if let Some(keys) = self - .col_keys + let keys = self + .db .read() - .get(col.as_bytes()) - .map(|set| set.iter().cloned().collect::>()) - { - Box::new(keys.into_iter().filter_map(move |key| { - let hash = Hash256::from_slice(&key); - self.get_bytes(col, &key) - .transpose() - .map(|res| res.map(|bytes| (hash, bytes))) - })) - } else { - Box::new(std::iter::empty()) - } + .range(start_key..) + .take_while(|(k, _)| k.remove_column_variable(column).is_some()) + .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec())) + .collect::>(); + Box::new(keys.into_iter().filter_map(move |key| { + self.get_bytes(col, &key).transpose().map(|res| { + let k = K::from_bytes(&key)?; + let v = res?; + Ok((k, v)) + }) + })) + } + + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k))) } fn begin_rw_transaction(&self) -> MutexGuard<()> { diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml index f715528138a..55a6bb9d3dc 100644 --- a/database_manager/Cargo.toml +++ b/database_manager/Cargo.toml @@ -16,3 +16,4 @@ tempfile = "3.1.0" types = { path = "../consensus/types" } slog = "2.5.2" strum = { version = "0.24.0", features = ["derive"] } +hex = "0.4.2" diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 65c2c02d9f8..44ba4875b6f 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -222,20 +222,22 @@ pub fn inspect_db( .map_err(|e| format!("Unable to create import directory: {:?}", e))?; } - for res in db.hot_db.iter_column(inspect_config.column) { + for res in db.hot_db.iter_column::>(inspect_config.column) { let (key, value) = res.map_err(|e| format!("{:?}", e))?; match inspect_config.target { InspectTarget::ValueSizes => { - println!("{:?}: {} bytes", key, value.len()); - total += value.len(); + println!("{}: {} 
bytes", hex::encode(&key), value.len()); } InspectTarget::ValueTotal => { total += value.len(); } InspectTarget::Values => { - let file_path = - base_path.join(format!("{}_{}.ssz", inspect_config.column.as_str(), key)); + let file_path = base_path.join(format!( + "{}_{}.ssz", + inspect_config.column.as_str(), + hex::encode(&key) + )); let write_result = fs::OpenOptions::new() .create(true) From 1567af9c23afba24ce8624e7ddc5924e0dec31bf Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 25 Aug 2023 12:26:22 +1000 Subject: [PATCH 04/14] Make it work, port more changes from tree-states --- beacon_node/store/src/hot_cold_store.rs | 49 ++++--- beacon_node/store/src/leveldb_store.rs | 2 +- database_manager/src/lib.rs | 164 +++++++++++++++++++++--- 3 files changed, 168 insertions(+), 47 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 8ed59655762..2859c2f0fa4 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1692,12 +1692,16 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Delete ALL states from the freezer database and update the anchor accordingly. + /// Delete *all* states from the freezer database and update the anchor accordingly. + /// + /// WARNING: this method deletes the genesis state and replaces it with the provided + /// `genesis_state`. This is to support its use in schema migrations where the storage scheme of + /// the genesis state may be modified. It is the responsibility of the caller to ensure that the + /// genesis state is correct, else a corrupt database will be created. pub fn prune_historic_states( &self, genesis_state_root: Hash256, genesis_state: &BeaconState, - ignore_errors: bool, ) -> Result<(), Error> { // Update the anchor to use the dummy state upper limit and disable historic state storage. 
let old_anchor = self.get_anchor_info(); @@ -1737,29 +1741,18 @@ impl, Cold: ItemStore> HotColdDB for column in columns { for res in self.cold_db.iter_column_keys::>(column) { - match res { - Ok(key) => cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col( - column.as_str(), - &key, - ))), - Err(e) if ignore_errors => { - warn!( - self.log, - "Ignoring error while reading key"; - "column" => ?column, - "err" => ?e, - ); - } - Err(e) => { - return Err(e); - } - } + let key = res?; + cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col( + column.as_str(), + &key, + ))); } } // XXX: We need to commit the mass deletion here *before* re-storing the genesis state, as // the current schema performs reads as part of `store_cold_state`. This can be deleted - // once the target schema is tree-states. + // once the target schema is tree-states. If the process is killed before the genesis state + // is written this can be fixed by re-running. info!( self.log, "Deleting historic states"; @@ -1767,11 +1760,17 @@ impl, Cold: ItemStore> HotColdDB ); self.cold_db.do_atomically(std::mem::take(&mut cold_ops))?; - // Store the genesis state using the *current* schema, which may be different from the - // schema of the genesis state we just deleted. - // FIXME(sproul): check split > 0 or this is invalid - self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?; - self.cold_db.do_atomically(cold_ops)?; + // If we just deleted the the genesis state, re-store it using the *current* schema, which + // may be different from the schema of the genesis state we just deleted. 
+ if self.get_split_slot() > 0 { + info!( + self.log, + "Re-storing genesis state"; + "state_root" => ?genesis_state_root, + ); + self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?; + self.cold_db.do_atomically(cold_ops)?; + } Ok(()) } diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 6dfb3397a02..0b8a00645ac 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -197,7 +197,7 @@ impl KeyValueStore for LevelDB { /// Iterate through all keys and values in a particular column. fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { let start_key = - BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()])); let iter = self.db.keys_iter(self.read_options()); iter.seek(&start_key); diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 44ba4875b6f..008cea8eefb 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -5,7 +5,7 @@ use beacon_chain::{ use beacon_node::{get_data_dir, get_slots_per_restore_point, ClientConfig}; use clap::{App, Arg, ArgMatches}; use environment::{Environment, RuntimeContext}; -use slog::{info, Logger}; +use slog::{info, warn, Logger}; use std::fs; use std::io::Write; use std::path::PathBuf; @@ -15,7 +15,7 @@ use store::{ DBColumn, HotColdDB, KeyValueStore, LevelDB, }; use strum::{EnumString, EnumVariantNames, VariantNames}; -use types::{BeaconState, EthSpec}; +use types::{BeaconState, EthSpec, Slot}; pub const CMD: &str = "database_manager"; @@ -60,6 +60,24 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("sizes") .possible_values(InspectTarget::VARIANTS), ) + .arg( + Arg::with_name("skip") + .long("skip") + .value_name("N") + .help("Skip over the first N keys"), + ) + .arg( + Arg::with_name("limit") + .long("limit") + .value_name("N") + .help("Output at 
most N keys"), + ) + .arg( + Arg::with_name("freezer") + .long("freezer") + .help("Inspect the freezer DB rather than the hot DB") + .takes_value(false), + ) .arg( Arg::with_name("output-dir") .long("output-dir") @@ -70,13 +88,24 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { } pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { - App::new("prune_payloads") + App::new("prune-payloads") + .alias("prune_payloads") .setting(clap::AppSettings::ColoredHelp) .about("Prune finalized execution payloads") } pub fn prune_states_app<'a, 'b>() -> App<'a, 'b> { - App::new("prune_states") + App::new("prune-states") + .alias("prune_states") + .arg( + Arg::with_name("confirm") + .long("confirm") + .help( + "Commit to pruning states irreversably. Without this flag the command will \ + just check that the database is capable of being pruned.", + ) + .takes_value(false), + ) .setting(clap::AppSettings::ColoredHelp) .about("Prune all beacon states from the freezer database") } @@ -165,7 +194,7 @@ pub fn display_db_version( Ok(()) } -#[derive(Debug, EnumString, EnumVariantNames)] +#[derive(Debug, PartialEq, Eq, EnumString, EnumVariantNames)] pub enum InspectTarget { #[strum(serialize = "sizes")] ValueSizes, @@ -173,11 +202,16 @@ pub enum InspectTarget { ValueTotal, #[strum(serialize = "values")] Values, + #[strum(serialize = "gaps")] + Gaps, } pub struct InspectConfig { column: DBColumn, target: InspectTarget, + skip: Option, + limit: Option, + freezer: bool, /// Configures where the inspect output should be stored. 
output_dir: PathBuf, } @@ -185,11 +219,18 @@ pub struct InspectConfig { fn parse_inspect_config(cli_args: &ArgMatches) -> Result { let column = clap_utils::parse_required(cli_args, "column")?; let target = clap_utils::parse_required(cli_args, "output")?; + let skip = clap_utils::parse_optional(cli_args, "skip")?; + let limit = clap_utils::parse_optional(cli_args, "limit")?; + let freezer = cli_args.is_present("freezer"); + let output_dir: PathBuf = clap_utils::parse_optional(cli_args, "output-dir")?.unwrap_or_else(PathBuf::new); Ok(InspectConfig { column, target, + skip, + limit, + freezer, output_dir, }) } @@ -215,6 +256,20 @@ pub fn inspect_db( .map_err(|e| format!("{:?}", e))?; let mut total = 0; + let mut num_keys = 0; + + let sub_db = if inspect_config.freezer { + &db.cold_db + } else { + &db.hot_db + }; + + let skip = inspect_config.skip.unwrap_or(0); + let limit = inspect_config.limit.unwrap_or(usize::MAX); + + let mut prev_key = 0; + let mut found_gaps = false; + let base_path = &inspect_config.output_dir; if let InspectTarget::Values = inspect_config.target { @@ -222,16 +277,35 @@ pub fn inspect_db( .map_err(|e| format!("Unable to create import directory: {:?}", e))?; } - for res in db.hot_db.iter_column::>(inspect_config.column) { + for res in sub_db + .iter_column::>(inspect_config.column) + .skip(skip) + .take(limit) + { let (key, value) = res.map_err(|e| format!("{:?}", e))?; match inspect_config.target { InspectTarget::ValueSizes => { println!("{}: {} bytes", hex::encode(&key), value.len()); } - InspectTarget::ValueTotal => { - total += value.len(); + InspectTarget::Gaps => { + // Convert last 8 bytes of key to u64. + let numeric_key = u64::from_be_bytes( + key[key.len() - 8..] 
+ .try_into() + .expect("key is at least 8 bytes"), + ); + + if numeric_key > prev_key + 1 { + println!( + "gap between keys {} and {} (offset: {})", + prev_key, numeric_key, num_keys, + ); + found_gaps = true; + } + prev_key = numeric_key; } + InspectTarget::ValueTotal => (), InspectTarget::Values => { let file_path = base_path.join(format!( "{}_{}.ssz", @@ -257,14 +331,17 @@ pub fn inspect_db( total += value.len(); } } + total += value.len(); + num_keys += 1; } - match inspect_config.target { - InspectTarget::ValueSizes | InspectTarget::ValueTotal | InspectTarget::Values => { - println!("Total: {} bytes", total); - } + if inspect_config.target == InspectTarget::Gaps && !found_gaps { + println!("No gaps found!"); } + println!("Num keys: {}", num_keys); + println!("Total: {} bytes", total); + Ok(()) } @@ -343,12 +420,22 @@ pub fn prune_payloads( db.try_prune_execution_payloads(force) } +pub struct PruneStatesConfig { + confirm: bool, +} + +fn parse_prune_states_config(cli_args: &ArgMatches) -> Result { + let confirm = cli_args.is_present("confirm"); + Ok(PruneStatesConfig { confirm }) +} + pub fn prune_states( client_config: ClientConfig, + prune_config: PruneStatesConfig, mut genesis_state: BeaconState, runtime_context: &RuntimeContext, log: Logger, -) -> Result<(), Error> { +) -> Result<(), String> { let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); @@ -359,13 +446,47 @@ pub fn prune_states( |_, _, _| Ok(()), client_config.store, spec.clone(), - log, - )?; + log.clone(), + ) + .map_err(|e| format!("Unable to open database: {e:?}"))?; + + // Load the genesis state from the database to ensure we're deleting states for the + // correct network, and that we don't end up storing the wrong genesis state. + let genesis_from_db = db + .load_cold_state_by_slot(Slot::new(0)) + .map_err(|e| format!("Error reading genesis state: {e:?}"))? 
+ .ok_or("Error: genesis state missing from database. Check schema version.")?; + + if genesis_from_db.genesis_validators_root() != genesis_state.genesis_validators_root() { + return Err(format!( + "Error: Wrong network. Genesis state in DB does not match {} genesis.", + spec.config_name.as_deref().unwrap_or("") + )); + } - let genesis_state_root = genesis_state.update_tree_hash_cache()?; - let ignore_errors = false; + // Check that the user has confirmed they want to proceed. + if !prune_config.confirm { + warn!( + log, + "Pruning states is irreversible"; + ); + warn!( + log, + "Re-run this command with --confirm to commit to state deletion" + ); + info!(log, "Nothing has been pruned on this run"); + return Err("Error: confirmation flag required".into()); + } - db.prune_historic_states(genesis_state_root, &genesis_state, ignore_errors) + // Delete all historic state data and *re-store* the genesis state. + let genesis_state_root = genesis_state + .update_tree_hash_cache() + .map_err(|e| format!("Error computing genesis state root: {e:?}"))?; + db.prune_historic_states(genesis_state_root, &genesis_state) + .map_err(|e| format!("Failed to prune due to error: {e:?}"))?; + + info!(log, "Historic states pruned successfully"); + Ok(()) } /// Run the database manager, returning an error string if the operation did not succeed. 
@@ -387,17 +508,18 @@ pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result let inspect_config = parse_inspect_config(cli_args)?; inspect_db(inspect_config, client_config, &context, log) } - ("prune_payloads", Some(_)) => { + ("prune-payloads", Some(_)) => { prune_payloads(client_config, &context, log).map_err(format_err) } - ("prune_states", Some(_)) => { + ("prune-states", Some(cli_args)) => { let network_config = context .eth2_network_config .clone() .ok_or("Missing network config")?; let genesis_state = network_config.beacon_state()?; + let prune_config = parse_prune_states_config(cli_args)?; - prune_states(client_config, genesis_state, &context, log).map_err(format_err) + prune_states(client_config, prune_config, genesis_state, &context, log) } _ => Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()), } From c7664be93ea09eb3a4e0c67d3c8dc2b849867974 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 12 Oct 2023 16:32:28 +1100 Subject: [PATCH 05/14] Fix clippy errors and fix genesis download in `database_manager`. 
--- beacon_node/store/src/memory_store.rs | 5 ----- consensus/fork_choice/tests/tests.rs | 16 +++++++--------- database_manager/src/lib.rs | 16 +++++++++++++++- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index f3ecb02b947..c2e494dce6a 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -38,11 +38,6 @@ impl KeyValueStore for MemoryStore { fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { let column_key = BytesKey::from_vec(get_key_for_col(col, key)); self.db.write().insert(column_key, val.to_vec()); - self.col_keys - .write() - .entry(col.as_bytes().to_vec()) - .or_default() - .insert(key.to_vec()); Ok(()) } diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index b50413713de..47c3190c7b7 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -1,4 +1,4 @@ -// #![cfg(not(debug_assertions))] +#![cfg(not(debug_assertions))] use std::fmt; use std::sync::Mutex; @@ -93,8 +93,7 @@ impl ForkChoiceTest { T: Fn(&BeaconForkChoiceStore, MemoryStore>) -> U, { func( - &self - .harness + self.harness .chain .canonical_head .fork_choice_read_lock() @@ -386,8 +385,7 @@ impl ForkChoiceTest { &self.harness.chain.spec, self.harness.logger(), ) - .err() - .expect("on_block did not return an error"); + .expect_err("on_block did not return an error"); comparison_func(err); self } @@ -841,7 +839,7 @@ async fn valid_attestation() { .apply_attestation_to_chain( MutationDelay::NoDelay, |_, _| {}, - |result| assert_eq!(result.unwrap(), ()), + |result| assert!(result.is_ok()), ) .await; } @@ -1074,7 +1072,7 @@ async fn invalid_attestation_delayed_slot() { .apply_attestation_to_chain( MutationDelay::NoDelay, |_, _| {}, - |result| assert_eq!(result.unwrap(), ()), + |result| assert!(result.is_ok()), ) .await .inspect_queued_attestations(|queue| 
assert_eq!(queue.len(), 1)) @@ -1183,7 +1181,7 @@ async fn weak_subjectivity_check_fails_early_epoch() { let mut checkpoint = setup_harness.harness.finalized_checkpoint(); - checkpoint.epoch = checkpoint.epoch - 1; + checkpoint.epoch -= 1; let chain_config = ChainConfig { weak_subjectivity_checkpoint: Some(checkpoint), @@ -1210,7 +1208,7 @@ async fn weak_subjectivity_check_fails_late_epoch() { let mut checkpoint = setup_harness.harness.finalized_checkpoint(); - checkpoint.epoch = checkpoint.epoch + 1; + checkpoint.epoch += 1; let chain_config = ChainConfig { weak_subjectivity_checkpoint: Some(checkpoint), diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 008cea8eefb..1b9cd4d73fc 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -512,11 +512,25 @@ pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result prune_payloads(client_config, &context, log).map_err(format_err) } ("prune-states", Some(cli_args)) => { + let executor = env.core_context().executor; let network_config = context .eth2_network_config .clone() .ok_or("Missing network config")?; - let genesis_state = network_config.beacon_state()?; + + let genesis_state = executor + .block_on_dangerous( + network_config.genesis_state::( + client_config.genesis_state_url.as_deref(), + client_config.genesis_state_url_timeout, + &log, + ), + "get_genesis_state", + ) + .ok_or("Shutting down")? + .map_err(|e| format!("Error getting genesis state: {e}"))? + .ok_or("Genesis state missing")?; + let prune_config = parse_prune_states_config(cli_args)?; prune_states(client_config, prune_config, genesis_state, &context, log) From 5d651d08eac55c5ade438bcf733f6bf304c32bd0 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 24 Oct 2023 16:21:26 +1100 Subject: [PATCH 06/14] Add tests for historical state pruning. 
--- beacon_node/beacon_chain/tests/store_tests.rs | 73 ++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ab54af42c78..5e3f70c5eb1 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -23,7 +23,7 @@ use std::collections::HashSet; use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; -use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}; +use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN}; use store::{ iter::{BlockRootsIterator, StateRootsIterator}, HotColdDB, LevelDB, StoreConfig, @@ -2796,6 +2796,77 @@ async fn schema_downgrade_to_min_version() { .expect_err("should not downgrade below minimum version"); } +#[tokio::test] +async fn prune_historic_states() { + let num_blocks_produced = E::slots_per_epoch() * 5; + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let genesis_state_root = harness.chain.genesis_state_root; + let genesis_state = harness + .chain + .get_state(&genesis_state_root, None) + .unwrap() + .unwrap(); + + harness + .extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Check historical state is present. + let state_roots_iter = harness + .chain + .forwards_iter_state_roots(Slot::new(0)) + .unwrap(); + for (state_root, slot) in state_roots_iter + .take(E::slots_per_epoch() as usize) + .map(Result::unwrap) + { + assert!(store.get_state(&state_root, Some(slot)).unwrap().is_some()); + } + + store + .prune_historic_states(genesis_state_root, &genesis_state) + .unwrap(); + + // Check that anchor info is updated. 
+ let anchor_info = store.get_anchor_info().unwrap(); + assert_eq!(anchor_info.state_lower_limit, 0); + assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN); + + // Historical states should be pruned. + let state_roots_iter = harness + .chain + .forwards_iter_state_roots(Slot::new(1)) + .unwrap(); + for (state_root, slot) in state_roots_iter + .take(E::slots_per_epoch() as usize) + .map(Result::unwrap) + { + assert!(store.get_state(&state_root, Some(slot)).unwrap().is_none()); + } + + // Ensure that genesis state is still accessible + let genesis_state_root = harness.chain.genesis_state_root; + assert!(store + .get_state(&genesis_state_root, Some(Slot::new(0))) + .unwrap() + .is_some()); + + // Run for another two epochs. + let additional_blocks_produced = 2 * E::slots_per_epoch(); + harness + .extend_slots(additional_blocks_produced as usize) + .await; + + check_finalization(&harness, num_blocks_produced + additional_blocks_produced); + check_split_slot(&harness, store); +} + /// Checks that two chains are the same, for the purpose of these tests. /// /// Several fields that are hard/impossible to check are ignored (e.g., the store). From 514e40b67cad9ea048f6e1a780015d15a5eb91b6 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 27 Oct 2023 16:24:07 +1100 Subject: [PATCH 07/14] Add docs for prune-states command. --- book/src/database-migrations.md | 35 +++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/book/src/database-migrations.md b/book/src/database-migrations.md index 9b60ca2e186..ed7f7d72f52 100644 --- a/book/src/database-migrations.md +++ b/book/src/database-migrations.md @@ -158,3 +158,38 @@ lighthouse db version --network mainnet ``` [run-correctly]: #how-to-run-lighthouse-db-correctly + +## How to prune historic states + +Pruning historic states helps in managing the disk space used by the Lighthouse beacon node by removing old beacon +states from the freezer database. 
This can be especially useful when the database has accumulated a significant amount
+of historic data. This command is intended for nodes synced before 4.4.1, as newly synced nodes no longer store
+historic states by default.
+
+Here are the steps to prune historic states:
+
+1. Before running the prune command, make sure that the Lighthouse beacon node is not running. If you are using systemd, you might stop the Lighthouse beacon node with a command like:
+
+   ```bash
+   sudo systemctl stop lighthousebeacon
+   ```
+
+2. Use the `prune-states` command to prune the historic states. You can do a test run without the `--confirm` flag to check that the database can be pruned:
+
+   ```bash
+   sudo -u "$LH_USER" lighthouse db prune-states --datadir "$LH_DATADIR" --network "$NET"
+   ```
+
+3. If you are ready to prune the states irreversibly, add the `--confirm` flag to commit the changes:
+
+   ```bash
+   sudo -u "$LH_USER" lighthouse db prune-states --confirm --datadir "$LH_DATADIR" --network "$NET"
+   ```
+
+   The `--confirm` flag ensures that you are aware the action is irreversible, and historic states will be permanently removed.
+
+4. After successfully pruning the historic states, you can restart the Lighthouse beacon node:
+
+   ```bash
+   sudo systemctl start lighthousebeacon
+   ```
From 4ceb91fab43962e4bbdbf2b67ded238423242b4f Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Fri, 27 Oct 2023 16:52:28 +1100
Subject: [PATCH 08/14] Fix compilation errors.
--- beacon_node/store/src/lib.rs | 2 ++ database_manager/src/lib.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index ac47f8c8479..d88edeb9a46 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -288,6 +288,8 @@ impl DBColumn { | Self::PubkeyCache | Self::BeaconRestorePoint | Self::DhtEnrs + | Self::BeaconBlob + | Self::OverflowLRUCache | Self::OptimisticTransitionBlock => 32, Self::BeaconBlockRoots | Self::BeaconStateRoots diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index a0836244a5e..5df5844df21 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -507,10 +507,12 @@ pub fn prune_states( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + blobs_path, |_, _, _| Ok(()), client_config.store, spec.clone(), From 7f2de7d8276e6b391f14935d5f9145870b297a8f Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 30 Oct 2023 14:44:36 +1100 Subject: [PATCH 09/14] Heal freezer block roots before pruning --- beacon_node/store/src/hot_cold_store.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 0d45149c62d..c08b0e233f7 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2219,6 +2219,8 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + /// This function fills in missing block roots between last restore point slot and split + /// slot, if any. 
pub fn heal_freezer_block_roots(&self) -> Result<(), Error> {
         let split = self.get_split_info();
         let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point
@@ -2259,6 +2261,9 @@ impl, Cold: ItemStore> HotColdDB
         genesis_state_root: Hash256,
         genesis_state: &BeaconState,
     ) -> Result<(), Error> {
+        // Make sure there are no missing block roots before pruning
+        self.heal_freezer_block_roots()?;
+
         // Update the anchor to use the dummy state upper limit and disable historic state storage.
         let old_anchor = self.get_anchor_info();
         let new_anchor = if let Some(old_anchor) = old_anchor.clone() {
From dbf49f3cc3fd41773fa59f0c846942d42eec6c35 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Mon, 30 Oct 2023 16:34:13 +1100
Subject: [PATCH 10/14] Update `OverflowLRUCache` key size to 33.

---
 beacon_node/store/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 3d9163c3655..eacd28d2db6 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -275,7 +275,7 @@ impl DBColumn {
     /// This function returns the number of bytes used by keys in a given column.
     pub fn key_size(self) -> usize {
         match self {
-            Self::OverflowLRUCache => 40,
+            Self::OverflowLRUCache => 33, // See `OverflowKey` encode impl.
             Self::BeaconMeta
             | Self::BeaconBlock
             | Self::BeaconState
From 1bdbc551db761f67112bc3b075611cd3fc640a3e Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Wed, 1 Nov 2023 16:12:14 +1100
Subject: [PATCH 11/14] Rename `prune_blobs` app to `prune-blobs` for consistency.
--- database_manager/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 6557206bc47..8d10cb5f8fd 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -95,7 +95,8 @@ pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { } pub fn prune_blobs_app<'a, 'b>() -> App<'a, 'b> { - App::new("prune_blobs") + App::new("prune-blobs") + .alias("prune-blobs") .setting(clap::AppSettings::ColoredHelp) .about("Prune blobs older than data availability boundary") } @@ -579,7 +580,7 @@ pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result ("prune-payloads", Some(_)) => { prune_payloads(client_config, &context, log).map_err(format_err) } - ("prune_blobs", Some(_)) => prune_blobs(client_config, &context, log).map_err(format_err), + ("prune-blobs", Some(_)) => prune_blobs(client_config, &context, log).map_err(format_err), ("prune-states", Some(cli_args)) => { let executor = env.core_context().executor; let network_config = context From e07a70d0ded61e2b315cef943369d9b5826cfd35 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 1 Nov 2023 18:36:53 +1100 Subject: [PATCH 12/14] Update `state_lower_limit` to 0 when pruning states --- beacon_node/store/src/hot_cold_store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 4e33f713e46..9579c978927 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2276,6 +2276,7 @@ impl, Cold: ItemStore> HotColdDB let new_anchor = if let Some(old_anchor) = old_anchor.clone() { AnchorInfo { state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, + state_lower_limit: Slot::new(0), ..old_anchor.clone() } } else { From 5b7ab70017e02d14750816f2ff53f39e3fcd2e29 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 1 Nov 2023 23:48:40 +1100 Subject: [PATCH 13/14] Add additional logs when running state pruning 
in dry mode. --- database_manager/src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 8d10cb5f8fd..afb429f50ce 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -9,6 +9,7 @@ use slog::{info, warn, Logger}; use std::fs; use std::io::Write; use std::path::PathBuf; +use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN; use store::{ errors::Error, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, @@ -535,6 +536,15 @@ pub fn prune_states( // Check that the user has confirmed they want to proceed. if !prune_config.confirm { + match db.get_anchor_info() { + Some(anchor_info) if anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => { + info!(log, "States have already been pruned"); + return Ok(()); + } + _ => { + info!(log, "Ready to prune states"); + } + } warn!( log, "Pruning states is irreversible"; From c13ec7c8e0300fc76658262d6b551b8be8b3fef0 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 3 Nov 2023 11:11:07 +1100 Subject: [PATCH 14/14] Add prune_blobs alias --- database_manager/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index afb429f50ce..93654b8dd50 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -97,7 +97,7 @@ pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { pub fn prune_blobs_app<'a, 'b>() -> App<'a, 'b> { App::new("prune-blobs") - .alias("prune-blobs") + .alias("prune_blobs") .setting(clap::AppSettings::ColoredHelp) .about("Prune blobs older than data availability boundary") }