Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub fn upgrade_to_v18<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
db.heal_freezer_block_roots()?;
info!(log, "Healed freezer block roots");

// No-op, even if Deneb has already occurred. The database is probably borked in this case, but
// *maybe* the fork recovery will revert the minority fork and succeed.
if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {
Expand Down
179 changes: 178 additions & 1 deletion beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use std::sync::Arc;
use std::time::Duration;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::{
chunked_vector::{chunk_key, Field},
get_key_for_col,
iter::{BlockRootsIterator, StateRootsIterator},
HotColdDB, LevelDB, StoreConfig,
DBColumn, HotColdDB, KeyValueStore, KeyValueStoreOp, LevelDB, StoreConfig,
};
use tempfile::{tempdir, TempDir};
use tokio::time::sleep;
Expand Down Expand Up @@ -104,6 +106,181 @@ fn get_harness_generic(
harness
}

/// Tests that `store.heal_freezer_block_roots` inserts block roots between last restore point
/// slot and the split slot.
#[tokio::test]
async fn heal_freezer_block_roots() {
// chunk_size is hard-coded to 128
let num_blocks_produced = E::slots_per_epoch() * 20;
let db_path = tempdir().unwrap();
let store = get_store_generic(
&db_path,
StoreConfig {
slots_per_restore_point: 2 * E::slots_per_epoch(),
..Default::default()
},
test_spec::<E>(),
);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);

harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;

let split_slot = store.get_split_slot();
assert_eq!(split_slot, 18 * E::slots_per_epoch());

// Do a heal before deleting to make sure that it doesn't break.
let last_restore_point_slot = Slot::new(16 * E::slots_per_epoch());
store.heal_freezer_block_roots().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);

// Delete block roots between `last_restore_point_slot` and `split_slot`.
let chunk_index = <store::chunked_vector::BlockRoots as Field<E>>::chunk_index(
last_restore_point_slot.as_usize(),
);
let key_chunk = get_key_for_col(DBColumn::BeaconBlockRoots.as_str(), &chunk_key(chunk_index));
store
.cold_db
.do_atomically(vec![KeyValueStoreOp::DeleteKey(key_chunk)])
.unwrap();

let block_root_err = store
.forwards_block_roots_iterator_until(
last_restore_point_slot,
last_restore_point_slot + 1,
|| unreachable!(),
&harness.chain.spec,
)
.unwrap()
.next()
.unwrap()
.unwrap_err();

assert!(matches!(block_root_err, store::Error::NoContinuationData));

// Re-insert block roots
store.heal_freezer_block_roots().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);

// Run for another two epochs to check that the invariant is maintained.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness
.extend_slots(additional_blocks_produced as usize)
.await;

check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
check_split_slot(&harness, store);
check_chain_dump(
&harness,
num_blocks_produced + additional_blocks_produced + 1,
);
check_iterators(&harness);
}

/// Tests that `store.heal_freezer_block_roots` inserts block roots between last restore point
/// slot and the split slot.
#[tokio::test]
async fn heal_freezer_block_roots_with_skip_slots() {
// chunk_size is hard-coded to 128
let num_blocks_produced = E::slots_per_epoch() * 20;
let db_path = tempdir().unwrap();
let store = get_store_generic(
&db_path,
StoreConfig {
slots_per_restore_point: 2 * E::slots_per_epoch(),
..Default::default()
},
test_spec::<E>(),
);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);

let current_state = harness.get_current_state();
let state_root = harness.get_current_state().tree_hash_root();
let all_validators = &harness.get_all_validators();
harness
.add_attested_blocks_at_slots(
current_state,
state_root,
&(1..=num_blocks_produced)
.filter(|i| i % 12 != 0)
.map(Slot::new)
.collect::<Vec<_>>(),
all_validators,
)
.await;

// split slot should be 18 here
let split_slot = store.get_split_slot();
assert_eq!(split_slot, 18 * E::slots_per_epoch());

let last_restore_point_slot = Slot::new(16 * E::slots_per_epoch());
let chunk_index = <store::chunked_vector::BlockRoots as Field<E>>::chunk_index(
last_restore_point_slot.as_usize(),
);
let key_chunk = get_key_for_col(DBColumn::BeaconBlockRoots.as_str(), &chunk_key(chunk_index));
store
.cold_db
.do_atomically(vec![KeyValueStoreOp::DeleteKey(key_chunk)])
.unwrap();

let block_root_err = store
.forwards_block_roots_iterator_until(
last_restore_point_slot,
last_restore_point_slot + 1,
|| unreachable!(),
&harness.chain.spec,
)
.unwrap()
.next()
.unwrap()
.unwrap_err();

assert!(matches!(block_root_err, store::Error::NoContinuationData));

// heal function
store.heal_freezer_block_roots().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);

// Run for another two epochs to check that the invariant is maintained.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness
.extend_slots(additional_blocks_produced as usize)
.await;

check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
check_split_slot(&harness, store);
check_iterators(&harness);
}

fn check_freezer_block_roots(
harness: &TestHarness,
last_restore_point_slot: Slot,
split_slot: Slot,
) {
for slot in (last_restore_point_slot.as_u64()..split_slot.as_u64()).map(Slot::new) {
let (block_root, result_slot) = harness
.chain
.store
.forwards_block_roots_iterator_until(slot, slot, || unreachable!(), &harness.chain.spec)
.unwrap()
.next()
.unwrap()
.unwrap();
assert_eq!(slot, result_slot);
let expected_block_root = harness
.chain
.block_root_at_slot(slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
assert_eq!(expected_block_root, block_root);
}
}

#[tokio::test]
async fn full_participation_no_skips() {
let num_blocks_produced = E::slots_per_epoch() * 5;
Expand Down
29 changes: 29 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,35 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

Ok(())
}

pub fn heal_freezer_block_roots(&self) -> Result<(), Error> {
let split = self.get_split_info();
let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point
* self.config.slots_per_restore_point;

// Load split state (which has access to block roots).
let (_, split_state) = self
.get_advanced_hot_state(split.block_root, split.slot, split.state_root)?
.ok_or(HotColdDBError::MissingSplitState(
split.state_root,
split.slot,
))?;

let mut batch = vec![];
let mut chunk_writer = ChunkWriter::<BlockRoots, _, _>::new(
&self.cold_db,
last_restore_point_slot.as_usize(),
)?;

for slot in (last_restore_point_slot.as_u64()..split.slot.as_u64()).map(Slot::new) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that this is the correct upper-bound, because the invariant is exclusive of the split slot. We see that here:

// Chunk writer for the linear block roots in the freezer DB.
// Start at the new upper limit because we iterate backwards.
let new_frozen_block_root_upper_limit = finalized_state.slot().as_usize().saturating_sub(1);
let mut block_root_writer =
ChunkWriter::<BlockRoots, _, _>::new(&store.cold_db, new_frozen_block_root_upper_limit)?;

This also matches the invariant description on the PR.

I've opened #4878 to track the broader DB invariant project.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for checking!
Great team work on the tests 😄

let block_root = *split_state.get_block_root(slot)?;
chunk_writer.set(slot.as_usize(), block_root, &mut batch)?;
}
chunk_writer.write(&mut batch)?;
self.cold_db.do_atomically(batch)?;

Ok(())
}
}

/// Advance the split point of the store, moving new finalized states to the freezer.
Expand Down