diff --git a/core/src/block_creation_loop.rs b/core/src/block_creation_loop.rs index 0bfdf1b1a1..d9b7ff1df5 100644 --- a/core/src/block_creation_loop.rs +++ b/core/src/block_creation_loop.rs @@ -657,15 +657,30 @@ fn create_and_insert_leader_bank(slot: Slot, parent_bank: Arc, ctx: &mut L ctx.poh_recorder.write().unwrap().set_bank(tpu_bank); // If this the very first alpenglow block, include the genesis certificate - if parent_slot == ctx.genesis_cert.slot { + // Note: if the alpenglow genesis is 0, then this is a test cluster with Alpenglow enabled + // by default. No need to put in the genesis marker as the genesis account is already populated + // during cluster creation. + if parent_slot == ctx.genesis_cert.slot && parent_slot != 0 { let genesis_marker = VersionedBlockMarker::Current(BlockMarkerV1::GenesisCertificate( ctx.genesis_cert.clone(), )); - ctx.poh_recorder - .write() - .unwrap() + + let mut poh_recorder = ctx.poh_recorder.write().unwrap(); + // Send the genesis certificate + poh_recorder .send_marker(genesis_marker) .expect("Max tick height cannot have been reached"); + + // Process the genesis certificate + let bank = poh_recorder.bank().expect("Bank cannot have been cleared"); + let processor = bank.block_component_processor.read().unwrap(); + processor + .on_genesis_certificate( + bank.clone(), + ctx.genesis_cert.clone(), + &ctx.bank_forks.read().unwrap().migration_status(), + ) + .expect("Recording genesis certificate should not fail"); } // Wakeup banking stage diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index f904fc4a0a..8b813f79d3 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -922,7 +922,6 @@ impl ReplayStage { let forks_root = bank_forks.read().unwrap().root(); let start_leader_time = if !migration_status.is_alpenglow_enabled() { - debug_assert!(votor_event_receiver.is_empty()); // Process cluster-agreed versions of duplicate slots for which we potentially // have the wrong version. 
Our version was dead or pruned. // Signalled by ancestor_hashes_service. diff --git a/core/src/validator.rs b/core/src/validator.rs index 04a3387782..a56803bdb8 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1593,6 +1593,7 @@ impl Validator { }; (tower, VoteHistory::new(identity_keypair.pubkey(), 0)) }; + migration_status.log_phase(); let last_vote = tower.last_vote(); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 2b459c08b4..f03ff58952 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1579,7 +1579,7 @@ pub fn confirm_slot( .on_marker( bank.clone_without_scheduler(), parent_bank, - &marker, + marker, migration_status, is_final, ) @@ -1856,6 +1856,54 @@ fn process_bank_0( Ok(()) } +/// Clean up a failed slot and restart processing from the given genesis slot +fn cleanup_and_populate_pending_from_alpenglow_genesis( + first_alpenglow_bank: &BankWithScheduler, + genesis_slot: Slot, + bank_forks: &RwLock, + blockstore: &Blockstore, + leader_schedule_cache: &LeaderScheduleCache, + pending_slots: &mut Vec<(SlotMeta, Bank, Hash)>, + opts: &ProcessOptions, + migration_status: &MigrationStatus, +) -> result::Result<(), BlockstoreProcessorError> { + // `first_alpenglow_bank` was processed as a TowerBFT bank. Reset it. + let root_bank = bank_forks.read().unwrap().root_bank(); + root_bank + .remove_unrooted_slots(&[(first_alpenglow_bank.slot(), first_alpenglow_bank.bank_id())]); + root_bank.clear_slot_signatures(first_alpenglow_bank.slot()); + root_bank.prune_program_cache_by_deployment_slot(first_alpenglow_bank.slot()); + blockstore + .remove_dead_slot(first_alpenglow_bank.slot()) + .unwrap(); + + let genesis_slot_meta = blockstore + .meta(genesis_slot) + .map_err(|err| { + warn!("Failed to load meta for slot {genesis_slot}: {err:?}"); + BlockstoreProcessorError::FailedToLoadMeta + })? 
+ .unwrap(); + + warn!( + "{}: load_frozen_forks() restart processing from {genesis_slot} treating further blocks \ + as Alpenglow banks", + migration_status.my_pubkey() + ); + pending_slots.clear(); + process_next_slots( + &bank_forks.read().unwrap().get(genesis_slot).unwrap(), + &genesis_slot_meta, + blockstore, + leader_schedule_cache, + pending_slots, + opts, + migration_status, + )?; + + Ok(()) +} + // Given a bank, add its children to the pending slots queue if those children slots are // complete fn process_next_slots( @@ -1959,8 +2007,8 @@ fn load_frozen_forks( let mut root = bank_forks.read().unwrap().root(); let max_root = std::cmp::max(root, blockstore_max_root); info!( - "load_frozen_forks() latest root from blockstore: {blockstore_max_root}, max_root: \ - {max_root}", + "load_frozen_forks() bank forks root {root}, latest root from blockstore: \ + {blockstore_max_root}, max_root: {max_root}", ); // The total number of slots processed @@ -2043,6 +2091,31 @@ fn load_frozen_forks( if opts.abort_on_invalid_block { Err(error)? } + + // If this block was the first alpenglow block and advanced the migration phase, we can enable alpenglow. + // + // Note: since this code is all startup code we don't have to worry about shutting down `PohService` or any + // in flight activity of `ReplayStage`. This bank must have failed to freeze as it is an Alpenglow block + // being verified as a TowerBFT one. + // + // We are safe to cleanly transition to alpenglow here + if migration_status.is_ready_to_enable() { + let genesis_slot = migration_status.enable_alpenglow_during_startup(); + + // We need to clear pending_slots as it might contain Alpenglow blocks initialized as TowerBFT banks. 
+ // Clear and populate pending slots from alpenglow genesis + cleanup_and_populate_pending_from_alpenglow_genesis( + &bank, + genesis_slot, + bank_forks, + blockstore, + leader_schedule_cache, + &mut pending_slots, + opts, + &migration_status, + )?; + } + continue; } txs += progress.num_txs; @@ -2105,7 +2178,11 @@ fn load_frozen_forks( } else { None } - }; + }.filter(|new_root_bank| { + // In the case that we've restarted while the migrationary period is going on but before alpenglow + // is enabled, don't root blocks past the migration slot + migration_status.should_root_during_startup(new_root_bank.slot()) + }); m.stop(); voting_us += m.as_us(); @@ -2129,6 +2206,19 @@ fn load_frozen_forks( all_banks.retain(|_, bank| bank.ancestors.contains_key(&root)); m.stop(); root_retain_us += m.as_us(); + + // If this root bank activated the feature flag, update migration status + if migration_status.is_pre_feature_activation() { + if let Some(slot) = bank_forks + .read() + .unwrap() + .root_bank() + .feature_set + .activated_slot(&agave_feature_set::alpenglow::id()) + { + migration_status.record_feature_activation(slot); + } + } } slots_processed += 1; diff --git a/local-cluster/src/integration_tests.rs b/local-cluster/src/integration_tests.rs index bdf99074a1..9cf146fb4c 100644 --- a/local-cluster/src/integration_tests.rs +++ b/local-cluster/src/integration_tests.rs @@ -71,7 +71,7 @@ pub const AG_DEBUG_LOG_FILTER: &str = solana_core::block_creation_loop=trace,solana_votor=trace,solana_votor::voting_service=info,\ solana_votor::vote_history_storage=info,solana_core::validator=info,\ solana_votor::consensus_metrics=info,solana_core::consensus=info,\ - solana_ledger::blockstore_processor=info"; + solana_ledger::blockstore_processor=info,solana_ledger::blockstore=info"; pub const DEFAULT_NODE_STAKE: u64 = 10 * LAMPORTS_PER_SOL; pub fn last_vote_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<(Slot, Hash)> { diff --git a/local-cluster/tests/local_cluster.rs 
b/local-cluster/tests/local_cluster.rs index 00086c4132..6eb635147c 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -40,7 +40,7 @@ use { solana_ledger::{ ancestor_iterator::AncestorIterator, bank_forks_utils, - blockstore::{entries_to_test_shreds, Blockstore}, + blockstore::{entries_to_test_shreds, Blockstore, PurgeType}, blockstore_processor::ProcessOptions, leader_schedule::{FixedSchedule, IdentityKeyedLeaderSchedule}, leader_schedule_utils::first_of_consecutive_leader_slots, @@ -100,9 +100,9 @@ use { solana_votor::voting_service::{AlpenglowPortOverride, VotingServiceOverride}, solana_votor_messages::{ consensus_message::{ - CertificateType, ConsensusMessage, VoteMessage, BLS_KEYPAIR_DERIVE_SEED, + Certificate, CertificateType, ConsensusMessage, VoteMessage, BLS_KEYPAIR_DERIVE_SEED, }, - migration::MIGRATION_SLOT_OFFSET, + migration::{GENESIS_CERTIFICATE_ACCOUNT, MIGRATION_SLOT_OFFSET}, vote::Vote, }, std::{ @@ -6223,9 +6223,16 @@ fn test_alpenglow_imbalanced_stakes_catchup() { ); } -fn test_alpenglow_migration(num_nodes: usize) { +fn test_alpenglow_migration( + num_nodes: usize, + test_name: &str, + leader_schedule: &[usize], +) -> ( + LocalCluster, + Vec>, + /* migration slot */ Slot, +) { solana_logger::setup_with_default(AG_DEBUG_LOG_FILTER); - let test_name = &format!("test_alpenglow_migration_{num_nodes}"); let vote_listener_socket = solana_net_utils::bind_to_localhost().unwrap(); let vote_listener_addr = vote_listener_socket.try_clone().unwrap(); @@ -6238,9 +6245,11 @@ fn test_alpenglow_migration(num_nodes: usize) { // we can't afford any forking due to rolling start validator_config.wait_for_supermajority = Some(0); - let validator_keys = (0..num_nodes) - .map(|i| (Arc::new(keypair_from_seed(&[i as u8; 32]).unwrap()), true)) - .collect::>(); + let (leader_schedule, keys) = create_custom_leader_schedule_with_random_keys(leader_schedule); + + validator_config.fixed_leader_schedule = Some(FixedSchedule { + 
leader_schedule: Arc::new(leader_schedule), + }); let node_stakes = vec![DEFAULT_NODE_STAKE; num_nodes]; // We want the epochs to be as short as possible to reduce test time without being flaky. @@ -6249,7 +6258,12 @@ fn test_alpenglow_migration(num_nodes: usize) { assert!(slots_per_epoch > MIGRATION_SLOT_OFFSET); let mut cluster_config = ClusterConfig { validator_configs: make_identical_validator_configs(&validator_config, num_nodes), - validator_keys: Some(validator_keys), + validator_keys: Some( + keys.iter() + .cloned() + .zip(iter::repeat_with(|| true)) + .collect(), + ), node_stakes: node_stakes.clone(), slots_per_epoch, stakers_slot_offset: slots_per_epoch, @@ -6341,24 +6355,179 @@ fn test_alpenglow_migration(num_nodes: usize) { // Additionally ensure that roots are being made cluster.check_for_new_roots(8, test_name, SocketAddrSpace::Unspecified); + (cluster, keys, migration_slot) } #[test] #[serial] fn test_alpenglow_migration_1() { - test_alpenglow_migration(1) + test_alpenglow_migration(1, "test_alpenglow_migration_1", &[4]); } #[test] #[serial] fn test_alpenglow_migration_2() { - test_alpenglow_migration(2) + test_alpenglow_migration(2, "test_alpenglow_migration_2", &[4, 4]); } #[test] #[serial] fn test_alpenglow_migration_4() { - test_alpenglow_migration(4) + test_alpenglow_migration(4, "test_alpenglow_migration_4", &[4, 4, 4, 4]); +} + +#[test] +#[serial] +fn test_alpenglow_restart_post_migration() { + let test_name = "test_alpenglow_restart_post_migration"; + + // Start a 2 node cluster and have it go through the migration + let (mut cluster, _, _) = test_alpenglow_migration(2, test_name, &[4, 4]); + + // Now restart one of the nodes. 
This causes the cluster to temporarily halt + let node_pubkey = cluster.get_node_pubkeys()[0]; + cluster.exit_restart_node( + &node_pubkey, + safe_clone_config(&cluster.validators.get(&node_pubkey).unwrap().config), + SocketAddrSpace::Unspecified, + ); + + // The restarted node will startup from genesis (0) so this test verifies the following: + // - When processing the feature flag activation during startup increment `PreFeatureActivation` -> `Migration` + // - When processing the first alpenglow block during startup increment `Migration` -> `ReadyToEnable` + // - If we reach `ReadyToEnable` during startup, enable alpenglow + // - Ensure that during startup we set ticks correctly + cluster.check_for_new_roots(8, test_name, SocketAddrSpace::Unspecified); +} + +#[test] +#[serial] +fn test_alpenglow_missed_migration_entirely() { + let test_name = "test_alpenglow_missed_migration_entirely"; + + // Start a 3 node cluster and have it go through the migration and root some slots + // Critical that the third node is not in the leader schedule, as since + // we clear blockstore later, we could end up producing duplicate blocks + let (mut cluster, validator_keys, migration_slot) = + test_alpenglow_migration(3, test_name, &[4, 4, 0]); + + // Now kill the third node + let node_pubkey = validator_keys[2].pubkey(); + let exit_info = cluster.exit_node(&node_pubkey); + let start_slot = migration_slot - 10; + + // Clear blockstore to simulate the node partitioning before the migration period + info!("Clearing blockstore after slot {start_slot}"); + { + let blockstore = Blockstore::open(&exit_info.info.ledger_path).unwrap(); + let end_slot = blockstore.highest_slot().unwrap().unwrap(); + blockstore.purge_from_next_slots(start_slot, end_slot); + blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact); + } + + // Restart the node. + // We have simulated the following situation: + // - Nodes 1 and 2 were enough to perform the migration.
They finalized blocks after the migration + // and are no longer broadcasting the GenesisCertificate. + // - Node 3 was completely partitioned off since 10 slots before the migration. + // - Node 3 now rejoins the network + // - It is able to use TowerBFT eager repair to fetch blocks, eventually it will see the + // first Alpenglow block, attempt to process it as a TowerBFT block and switch to Alpenglow. + info!("Restarting node to a pre migration state"); + cluster.restart_node(&node_pubkey, exit_info, SocketAddrSpace::Unspecified); + cluster.check_for_new_roots(8, test_name, SocketAddrSpace::Unspecified); +} + +#[test] +// This test requires alpenglow repair +#[ignore] +#[serial] +fn test_alpenglow_missed_migration_completion() { + let test_name = "test_alpenglow_missed_migration_completion"; + + // Start a 3 node cluster and have it go through the migration and root some slots + // Critical that the second node is not in the leader schedule, as since + // we clear blockstore later, we could end up producing duplicate blocks + let (mut cluster, validator_keys, _migration_slot) = + test_alpenglow_migration(3, test_name, &[4, 0, 4]); + + // Figure out the genesis slot + info!("Determining the genesis slot"); + let client = RpcClient::new_socket_with_commitment( + cluster.entry_point_info.rpc().unwrap(), + CommitmentConfig::processed(), + ); + let genesis_slot = loop { + let Ok(account) = client.get_account(&GENESIS_CERTIFICATE_ACCOUNT) else { + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + }; + break account + .deserialize_data::() + .expect("Genesis cert must be populated") + .cert_type + .slot(); + }; + info!("Genesis slot {genesis_slot}"); + + // Now kill the second node + let node_pubkey = validator_keys[1].pubkey(); + let exit_info = cluster.exit_node(&node_pubkey); + { + // Clear all blocks past genesis + let start_slot = genesis_slot + 1; + let blockstore = Blockstore::open(&exit_info.info.ledger_path).unwrap(); + let end_slot = 
blockstore.highest_slot().unwrap().unwrap(); + info!("Clearing blockstore from slot {start_slot}"); + blockstore.purge_from_next_slots(start_slot, end_slot); + blockstore.purge_slots(start_slot, end_slot, PurgeType::CompactionFilter); + + // Now to simulate the node observing the migration insert 5 TowerBFT blocks + // past the alpenglow genesis + for slot in start_slot..start_slot + 5 { + let entries = create_ticks(64, 0, cluster.genesis_config.hash()); + let last_hash = entries.last().unwrap().hash; + let version = solana_shred_version::version_from_hash(&last_hash); + // It doesn't really matter if these shreds fully check out. + // The key is that they occupy space in the blockstore column requiring + // Alpenglow repair to get the alpenglow version of these blocks + let shreds = Shredder::new(slot, slot - 1, 0, version) + .unwrap() + .entries_to_merkle_shreds_for_tests( + &Keypair::new(), + &entries, + true, // is_full_slot + None, // chained_merkle_root + 0, // next_shred_index, + 0, // next_code_index + &ReedSolomonCache::default(), + &mut ProcessShredsStats::default(), + ) + .0; + blockstore + .insert_shreds(shreds, None, /* is_trusted */ true) + .unwrap(); + } + } + + // Restart the node - we have now simulated the following case: + // - All nodes entered the migration + // - Node 1 and 3 observed the migration success, cleared TowerBFT blocks past genesis + // and are now chugging along in Alpenglow + // - The second node observed 5 TowerBFT blocks after the migration, but partitioned off + // before it could view the Genesis Certificate. It has the Genesis block + // + // BlockID repair is necessary to progress: + // - Alpenglow must repair to get the Alpenglow blocks instead of the TowerBFT ones leftover from the + // migrationary period. + // - This allows Node 2 to see the first Alpenglow block and enter alpenglow + // + // Upon observing a finalization certificate from the other nodes, Node 2 will eventually + // be able to trigger block id repair. 
The tricky part is figuring out that we need to replay + // these alpenglow blocks from the secondary column. If this is done, then the GenesisCertificate + // Marker will be observed and we will enable alpenglow. + cluster.restart_node(&node_pubkey, exit_info, SocketAddrSpace::Unspecified); + cluster.check_for_new_roots(8, test_name, SocketAddrSpace::Unspecified); } fn broadcast_vote( diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 94ee992517..f6b23f0252 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -2826,9 +2826,6 @@ impl Bank { } /// If this is an alpenglow block, return the genesis certificate. - /// - /// Note: this should only be called on a frozen bank, otherwise results - /// might be inaccurate for the first alpenglow bank. pub fn get_alpenglow_genesis_certificate(&self) -> Option { self.get_account(&GENESIS_CERTIFICATE_ACCOUNT).map(|acct| { acct.deserialize_data() @@ -2836,6 +2833,15 @@ impl Bank { }) } + /// For use in the first Alpenglow block, set the genesis certificate. 
+ pub fn set_alpenglow_genesis_certificate(&self, cert: &Certificate) { + let cert_size = bincode::serialized_size(cert).unwrap(); + let lamports = Rent::default().minimum_balance(cert_size as usize); + let cert_acct = AccountSharedData::new_data(lamports, cert, &system_program::ID).unwrap(); + + self.store_account_and_update_capitalization(&GENESIS_CERTIFICATE_ACCOUNT, &cert_acct); + } + pub fn confirmed_last_blockhash(&self) -> Hash { const NUM_BLOCKHASH_CONFIRMATIONS: usize = 3; diff --git a/runtime/src/block_component_processor.rs b/runtime/src/block_component_processor.rs index 805cd99e85..61ee15b6fe 100644 --- a/runtime/src/block_component_processor.rs +++ b/runtime/src/block_component_processor.rs @@ -1,17 +1,26 @@ use { crate::bank::Bank, + log::*, solana_clock::{Slot, DEFAULT_MS_PER_SLOT}, solana_entry::block_component::{ - BlockFooterV1, BlockMarkerV1, VersionedBlockFooter, VersionedBlockHeader, - VersionedBlockMarker, + BlockFooterV1, BlockMarkerV1, GenesisCertificate, VersionedBlockFooter, + VersionedBlockHeader, VersionedBlockMarker, }, - solana_votor_messages::migration::MigrationStatus, + solana_votor_messages::{consensus_message::Certificate, migration::MigrationStatus}, std::sync::Arc, thiserror::Error, }; #[derive(Debug, Error, PartialEq, Eq)] pub enum BlockComponentProcessorError { + #[error("BlockComponent detected pre-migration")] + BlockComponentPreMigration, + #[error("GenesisCertificate marker detected when GenesisCertificate is already populated")] + GenesisCertificateAlreadyPopulated, + #[error("GenesisCertificate marker detected when the cluster has Alpenglow enabled at slot 0")] + GenesisCertificateInAlpenglowCluster, + #[error("GenesisCertificate marker detected on a block which is not a child of genesis")] + GenesisCertificateOnNonChild, #[error("Missing block footer")] MissingBlockFooter, #[error("Missing block header")] @@ -20,8 +29,6 @@ pub enum BlockComponentProcessorError { MultipleBlockFooters, #[error("Multiple block headers 
detected")] MultipleBlockHeaders, - #[error("BlockComponent detected pre-migration")] - BlockComponentPreMigration, #[error("Nanosecond clock out of bounds")] NanosecondClockOutOfBounds, } @@ -71,36 +78,101 @@ impl BlockComponentProcessor { &mut self, bank: Arc, parent_bank: Arc, - marker: &VersionedBlockMarker, + marker: VersionedBlockMarker, migration_status: &MigrationStatus, is_final: bool, ) -> Result<(), BlockComponentProcessorError> { - // Pre-migration: blocks with block components should be marked as dead - if !migration_status.is_alpenglow_enabled() { - return Err(BlockComponentProcessorError::BlockComponentPreMigration); - } - - // Here onwards, Alpenglow is enabled + let slot = bank.slot(); let marker = match marker { VersionedBlockMarker::V1(marker) | VersionedBlockMarker::Current(marker) => marker, }; - match marker { - BlockMarkerV1::BlockFooter(footer) => self.on_footer(bank, parent_bank, footer), - BlockMarkerV1::BlockHeader(header) => self.on_header(header), + match (marker, migration_status.should_allow_block_markers(slot)) { + // The first alpenglow block can be ingested before migration is complete if our node is having issues + // observing the migration. Thus we allow the header and genesis certificate during the migration period + // in order to increment the migration phase. 
+ (BlockMarkerV1::BlockHeader(header), allowed) + if allowed || migration_status.is_in_migration() => + { + self.on_header(&header) + } + (BlockMarkerV1::GenesisCertificate(genesis_cert), allowed) + if allowed || migration_status.is_in_migration() => + { + self.on_genesis_certificate(bank, genesis_cert, migration_status) + } + + // The remaining block components should only be present if the migration is complete + (BlockMarkerV1::BlockFooter(footer), true) => { + self.on_footer(bank, parent_bank, &footer) + } // We process UpdateParent messages on shred ingest, so no callback needed here - BlockMarkerV1::UpdateParent(_) => Ok(()), - // TODO(ashwin): update genesis certificate account / ticks - BlockMarkerV1::GenesisCertificate(_) => Ok(()), + (BlockMarkerV1::UpdateParent(_), true) => Ok(()), + (_, _) => Err(BlockComponentProcessorError::BlockComponentPreMigration), }?; - if is_final { + if is_final && migration_status.should_allow_block_markers(slot) { self.on_final() } else { Ok(()) } } + pub fn on_genesis_certificate( + &self, + bank: Arc, + genesis_cert: GenesisCertificate, + migration_status: &MigrationStatus, + ) -> Result<(), BlockComponentProcessorError> { + // Genesis Certificate is only allowed for direct child of genesis + if bank.parent_slot() == 0 { + return Err(BlockComponentProcessorError::GenesisCertificateInAlpenglowCluster); + } + + let parent_block_id = bank + .parent_block_id() + .expect("Block id is populated for all slots > 0"); + if (bank.parent_slot(), parent_block_id) != (genesis_cert.slot, genesis_cert.block_id) { + return Err(BlockComponentProcessorError::GenesisCertificateOnNonChild); + } + + if bank.get_alpenglow_genesis_certificate().is_some() { + return Err(BlockComponentProcessorError::GenesisCertificateAlreadyPopulated); + } + + let genesis_cert = Certificate::from(genesis_cert); + // TODO(ashwin): verify genesis cert using bls sigverify and bank + + bank.set_alpenglow_genesis_certificate(&genesis_cert); + + if 
migration_status.is_alpenglow_enabled() { + // We participated in the migration, nothing to do + return Ok(()); + } + + // We missed the migration however we ingested the first alpenglow block. + // This is either a result of startup replay, or in some weird cases steady state replay after a network partition. + // Either way we ingest the genesis block details moving us to `ReadyToEnable`. + // Since this is a direct child of genesis, and we are replaying, we know we have frozen the genesis block. + // Then `load_frozen_forks` or `replay_stage` will take care of the rest. + warn!( + "{}: Alpenglow genesis marker processed during replay of {}. Transitioning Alpenglow \ + to ReadyToEnable", + migration_status.my_pubkey(), + bank.slot() + ); + migration_status.set_genesis_block( + genesis_cert + .cert_type + .to_block() + .expect("Genesis cert must correspond to a block"), + ); + migration_status.set_genesis_certificate(Arc::new(genesis_cert)); + assert!(migration_status.is_ready_to_enable()); + + Ok(()) + } + fn on_footer( &mut self, bank: Arc, @@ -357,7 +429,7 @@ mod tests { let bank = create_child_bank(&parent, 1); processor - .on_marker(bank, parent, &marker, &migration_status, false) + .on_marker(bank, parent, marker, &migration_status, false) .unwrap(); assert!(processor.has_header); } @@ -387,7 +459,7 @@ mod tests { )); processor - .on_marker(bank.clone(), parent, &marker, &migration_status, false) + .on_marker(bank.clone(), parent, marker, &migration_status, false) .unwrap(); assert!(processor.has_footer); @@ -450,7 +522,7 @@ mod tests { }), )); - let result = processor.on_marker(bank, parent, &marker, &migration_status, false); + let result = processor.on_marker(bank, parent, marker, &migration_status, false); assert_eq!( result, Err(BlockComponentProcessorError::BlockComponentPreMigration) @@ -489,7 +561,7 @@ mod tests { .on_marker( bank.clone(), parent.clone(), - &header_marker, + header_marker, &migration_status, false, ) @@ -515,7 +587,7 @@ mod tests { 
.on_marker( bank.clone(), parent, - &footer_marker, + footer_marker, &migration_status, false, ) @@ -574,13 +646,8 @@ mod tests { )); // Should succeed - footer is processed and slot_full validation passes - let result = processor.on_marker( - bank.clone(), - parent, - &footer_marker, - &migration_status, - true, - ); + let result = + processor.on_marker(bank.clone(), parent, footer_marker, &migration_status, true); assert!(result.is_ok()); assert!(processor.has_footer); diff --git a/votor-messages/src/migration.rs b/votor-messages/src/migration.rs index 80ec81c6d8..23043bb6a6 100644 --- a/votor-messages/src/migration.rs +++ b/votor-messages/src/migration.rs @@ -187,6 +187,20 @@ impl MigrationPhase { } } + /// Should we root this slot when loading frozen slots during startup? + /// Similar to `should_report_commitment_or_root`, but we also continue to root post migration. + /// This is only relevant if we restart during the migration period before it completes, we don't + /// want to root any slots >= migration_slot + fn should_root_during_startup(&self, slot: Slot) -> bool { + match self { + MigrationPhase::PreFeatureActivation => true, + MigrationPhase::Migration { migration_slot, .. } => slot < *migration_slot, + MigrationPhase::ReadyToEnable { .. } + | MigrationPhase::AlpenglowEnabled { .. } + | MigrationPhase::FullAlpenglowEpoch { .. } => true, + } + } + /// Should we publish epoch slots for this slot? /// We publish epoch slots for all slots until we enable alpenglow. /// Once alpenglow is enabled in the mixed migration epoch we should still be publishing for TowerBFT slots @@ -228,6 +242,12 @@ impl MigrationPhase { self.should_send_votor_event(slot) } + /// Should this block be allowed to have block markers?
+ fn should_allow_block_markers(&self, slot: Slot) -> bool { + // Same as votor events, TowerBFT blocks should not have markers + self.should_send_votor_event(slot) + } + /// Check if we are in the full alpenglow epoch fn is_full_alpenglow_epoch(&self) -> bool { matches!(self, MigrationPhase::FullAlpenglowEpoch { .. }) @@ -350,7 +370,7 @@ impl MigrationStatus { } }; - warn!("Initializing alpenglow migration {phase:?}"); + warn!("Pre startup initializing alpenglow migration from root bank: {phase:?}"); Self::new(phase) } @@ -359,18 +379,28 @@ impl MigrationStatus { *self.my_pubkey.write().unwrap() = my_pubkey; } - fn my_pubkey(&self) -> Pubkey { + /// For use in logging, the current pubkey + pub fn my_pubkey(&self) -> Pubkey { *self.my_pubkey.read().unwrap() } + /// Print a log about the current phase of migration + pub fn log_phase(&self) { + let my_pubkey = self.my_pubkey(); + let phase = self.phase.read().unwrap(); + warn!("{my_pubkey}: Alpenglow migration phase {phase:?}"); + } + dispatch!(pub fn is_alpenglow_enabled(&self) -> bool); dispatch!(pub fn qualifies_for_genesis_discovery(&self, slot: Slot) -> bool); dispatch!(pub fn should_bank_be_vote_only(&self, bank_slot: Slot) -> bool); dispatch!(pub fn should_report_commitment_or_root(&self, slot: Slot) -> bool); + dispatch!(pub fn should_root_during_startup(&self, slot: Slot) -> bool); dispatch!(pub fn should_publish_epoch_slots(&self, slot: Slot) -> bool); dispatch!(pub fn should_send_votor_event(&self, slot: Slot) -> bool); dispatch!(pub fn should_respond_to_ancestor_hashes_requests(&self, slot: Slot) -> bool); dispatch!(pub fn should_have_alpenglow_ticks(&self, slot: Slot) -> bool); + dispatch!(pub fn should_allow_block_markers(&self, slot: Slot) -> bool); dispatch!(pub fn is_full_alpenglow_epoch(&self) -> bool); dispatch!(pub fn is_pre_feature_activation(&self) -> bool); dispatch!(pub fn is_ready_to_enable(&self) -> bool); @@ -575,6 +605,34 @@ impl MigrationStatus { condvar.notify_all(); } + /// Enables 
alpenglow in the startup pathway. This is pre `PohService` so we can do this from a single thread. + /// Returns the genesis slot + /// + /// Transition the phase from `ReadyToEnable` to `AlpenglowEnabled` + pub fn enable_alpenglow_during_startup(&self) -> Slot { + warn!("{}: Enabling alpenglow during startup", self.my_pubkey()); + let MigrationPhase::ReadyToEnable { genesis_cert } = self.phase.read().unwrap().clone() + else { + unreachable!( + "{}: Programmer error, Attempting to enable alpenglow during startup without \ + being ReadyToEnable", + self.my_pubkey() + ); + }; + + let genesis_slot = genesis_cert.cert_type.slot(); + self.shutdown_poh.store(true, Ordering::Release); + *self.phase.write().unwrap() = MigrationPhase::AlpenglowEnabled { genesis_cert }; + let (is_alpenglow_enabled, _condvar) = &self.migration_wait; + *is_alpenglow_enabled.lock().unwrap() = true; + // No need to condvar as we're in startup and no one is waiting for us. + warn!( + "{}: Alpenglow enabled during startup! Genesis slot {genesis_slot}", + self.my_pubkey() + ); + genesis_slot + } + /// Alpenglow has rooted a block in a new epoch. This indicates the migration epoch has completed. /// /// Transitions from `AlpenglowEnabled` to `FullAlpenglowEpoch`