diff --git a/Cargo.lock b/Cargo.lock index 1b745119c9..335add1f26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9602,6 +9602,7 @@ dependencies = [ "num-traits", "num_cpus", "num_enum", + "parking_lot 0.12.3", "percentage", "qualifier_attr", "rand 0.7.3", diff --git a/core/src/sigverifier/bls_sigverifier.rs b/core/src/sigverifier/bls_sigverifier.rs index c0b104f5bb..c23778f3a4 100644 --- a/core/src/sigverifier/bls_sigverifier.rs +++ b/core/src/sigverifier/bls_sigverifier.rs @@ -11,31 +11,18 @@ use { alpenglow_vote::bls_message::BLSMessage, crossbeam_channel::{Sender, TrySendError}, solana_pubkey::Pubkey, - solana_runtime::{bank_forks::BankForks, epoch_stakes::EpochStakes}, - solana_sdk::{ - clock::{Epoch, Slot}, - epoch_schedule::EpochSchedule, - }, + solana_runtime::epoch_stakes_service::EpochStakesService, + solana_sdk::clock::Slot, solana_streamer::packet::PacketBatch, stats::{BLSSigVerifierStats, StatsUpdater}, - std::{ - collections::HashMap, - sync::{Arc, RwLock}, - time::{Duration, Instant}, - }, + std::{collections::HashMap, sync::Arc}, }; -const EPOCH_STAKES_QUERY_INTERVAL: Duration = Duration::from_secs(60); - pub struct BLSSigVerifier { - bank_forks: Arc>, verified_votes_sender: VerifiedVoteSender, message_sender: Sender, + epoch_stakes_service: Arc, stats: BLSSigVerifierStats, - root_epoch: Epoch, - epoch_schedule: EpochSchedule, - epoch_stakes_map: Arc>, - epoch_stakes_queried: Instant, } impl SigVerifier for BLSSigVerifier { @@ -72,10 +59,9 @@ impl SigVerifier for BLSSigVerifier { certificate_message.certificate.slot } }; - let epoch = self.epoch_schedule.get_epoch(slot); - let rank_to_pubkey_map = if let Some(epoch_stakes) = self.epoch_stakes_map.get(&epoch) { - epoch_stakes.bls_pubkey_to_rank_map() - } else { + + let Some(rank_to_pubkey_map) = self.epoch_stakes_service.get_key_to_rank_map(slot) + else { stats_updater.received_no_epoch_stakes += 1; continue; }; @@ -110,46 +96,21 @@ impl SigVerifier for BLSSigVerifier { self.send_verified_votes(verified_votes); self.stats.update(stats_updater); self.stats.maybe_report_stats(); - self.update_epoch_stakes_map(); Ok(()) } } impl BLSSigVerifier { pub fn new( - bank_forks: Arc>, + epoch_stakes_service: Arc, verified_votes_sender: VerifiedVoteSender, message_sender: Sender, ) -> Self { - let mut verifier = Self { - bank_forks, + Self { + epoch_stakes_service, verified_votes_sender, message_sender, stats: BLSSigVerifierStats::new(), - epoch_schedule: EpochSchedule::default(), - epoch_stakes_map: Arc::new(HashMap::new()), - root_epoch: Epoch::default(), - epoch_stakes_queried: Instant::now() - EPOCH_STAKES_QUERY_INTERVAL, - }; - verifier.update_epoch_stakes_map(); - verifier - } - - // TODO(wen): We should maybe create a epoch stakes service so all these objects - // only needing epoch stakes don't need to worry about bank_forks and banks. - fn update_epoch_stakes_map(&mut self) { - if self.epoch_stakes_queried.elapsed() < EPOCH_STAKES_QUERY_INTERVAL { - return; - } - self.epoch_stakes_queried = Instant::now(); - let root_bank = self.bank_forks.read().unwrap().root_bank(); - if self.epoch_stakes_map.is_empty() { - self.epoch_schedule = root_bank.epoch_schedule().clone(); - } - let epoch = root_bank.epoch(); - if self.epoch_stakes_map.is_empty() || epoch > self.root_epoch { - self.epoch_stakes_map = Arc::new(root_bank.epoch_stakes_map().clone()); - self.root_epoch = epoch; } } @@ -181,12 +142,11 @@ mod tests { vote::Vote, }, bitvec::prelude::*, - crossbeam_channel::Receiver, + crossbeam_channel::{unbounded, Receiver}, solana_bls_signatures::Signature, solana_perf::packet::Packet, solana_runtime::{ bank::Bank, - bank_forks::BankForks, genesis_utils::{ create_genesis_config_with_alpenglow_vote_accounts_no_program, ValidatorVoteKeypairs, @@ -194,7 +154,7 @@ mod tests { }, solana_sdk::{hash::Hash, signer::Signer}, stats::STATS_INTERVAL_DURATION, - std::time::Duration, + std::time::{Duration, Instant}, }; fn create_keypairs_and_bls_sig_verifier( @@ -213,11 +173,13 @@ mod tests { &validator_keypairs, stakes_vec, ); - let bank0 = Bank::new_for_tests(&genesis.genesis_config); - let bank_forks = BankForks::new_rw_arc(bank0); + let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config)); + let epoch = bank.epoch(); + let (_tx, rx) = unbounded(); + let epoch_stakes_service = Arc::new(EpochStakesService::new(bank, epoch, rx)); ( validator_keypairs, - BLSSigVerifier::new(bank_forks, verified_vote_sender, message_sender), + BLSSigVerifier::new(epoch_stakes_service, verified_vote_sender, message_sender), ) } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 37dbdb8cff..2abf0ccdb6 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -42,6 +42,7 @@ use { }, solana_runtime::{ bank_forks::BankForks, + epoch_stakes_service::EpochStakesService, prioritization_fee_cache::PrioritizationFeeCache, root_bank_cache::RootBankCache, vote_sender_types::{ @@ -263,8 +264,13 @@ impl Tpu { }; let alpenglow_sigverify_stage = { + let (tx, rx) = unbounded(); + bank_forks.write().unwrap().register_new_bank_subscriber(tx); + let bank = bank_forks.read().unwrap().root_bank(); + let epoch = bank.epoch(); + let epoch_stakes_service = Arc::new(EpochStakesService::new(bank, epoch, rx)); let verifier = BLSSigVerifier::new( - bank_forks.clone(), + epoch_stakes_service, verified_vote_sender.clone(), bls_verified_message_sender, ); diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index c8f4da6c6f..6303c6584b 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -38,6 +38,7 @@ num-derive = { workspace = true } num-traits = { workspace = true } num_cpus = { workspace = true } num_enum = { workspace = true } +parking_lot = { workspace = true } percentage = { workspace = true } qualifier_attr = { workspace = true } rand = { workspace = true } diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index dc33951de6..16fbe65fbf 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -10,7 +10,7 @@ use { }, snapshot_config::SnapshotConfig, }, - crossbeam_channel::SendError, + crossbeam_channel::{SendError, Sender}, log::*, solana_measure::measure::Measure, solana_program_runtime::loaded_programs::{BlockRelation, ForkGraph}, @@ -86,6 +86,8 @@ pub struct BankForks { scheduler_pool: Option, dumped_slot_subscribers: Vec, + /// Tracks subscribers interested in hearing about new `Bank`s. + new_bank_subscribers: Vec>>, } impl Index for BankForks { @@ -137,6 +139,7 @@ impl BankForks { highest_slot_at_startup: 0, scheduler_pool: None, dumped_slot_subscribers: vec![], + new_bank_subscribers: vec![], })); root_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks)); @@ -329,6 +332,24 @@ impl BankForks { self.dumped_slot_subscribers.push(notifier); } + /// Register a new subscriber interested in hearing about new `Bank`s. + pub fn register_new_bank_subscriber(&mut self, tx: Sender>) { + self.new_bank_subscribers.push(tx); + } + + /// Call to notify subscribers of new `Bank`s. + fn notify_new_bank_subscribers(&mut self, root_bank: &Arc) { + let mut channels_to_drop = vec![]; + for (ind, tx) in self.new_bank_subscribers.iter().enumerate() { + if let Err(SendError(_)) = tx.send(root_bank.clone()) { + channels_to_drop.push(ind); + } + } + for ind in channels_to_drop { + self.new_bank_subscribers.remove(ind); + } + } + /// Clears associated banks from BankForks and notifies subscribers that a dump has occured. pub fn dump_slots<'a, I>(&mut self, slots: I) -> (Vec<(Slot, BankId)>, Vec) where @@ -444,6 +465,7 @@ impl BankForks { .unwrap() .node_id_to_vote_accounts() ); + self.notify_new_bank_subscribers(root_bank); } let root_tx_count = root_bank .parents() diff --git a/runtime/src/epoch_stakes_service.rs b/runtime/src/epoch_stakes_service.rs new file mode 100644 index 0000000000..94d630479a --- /dev/null +++ b/runtime/src/epoch_stakes_service.rs @@ -0,0 +1,68 @@ +use { + crate::{ + bank::Bank, + epoch_stakes::{BLSPubkeyToRankMap, EpochStakes}, + }, + crossbeam_channel::Receiver, + log::warn, + parking_lot::RwLock as PlRwLock, + solana_sdk::{ + clock::{Epoch, Slot}, + epoch_schedule::EpochSchedule, + }, + std::{collections::HashMap, sync::Arc, thread}, +}; + +struct State { + stakes: HashMap, + epoch_schedule: EpochSchedule, +} + +impl State { + fn new(bank: Arc) -> Self { + Self { + stakes: bank.epoch_stakes_map().clone(), + epoch_schedule: bank.epoch_schedule().clone(), + } + } +} + +/// A service that regularly updates the epoch stakes state from `Bank`s +/// and exposes various methods to access the state. +pub struct EpochStakesService { + state: Arc>, +} + +impl EpochStakesService { + pub fn new(bank: Arc, epoch: Epoch, new_bank_receiver: Receiver>) -> Self { + let mut prev_epoch = epoch; + let state = Arc::new(PlRwLock::new(State::new(bank))); + { + let state = state.clone(); + thread::spawn(move || loop { + let bank = match new_bank_receiver.recv() { + Ok(b) => b, + Err(e) => { + warn!("recv() returned {e:?}. Exiting."); + break; + } + }; + let new_epoch = bank.epoch(); + if new_epoch > prev_epoch { + prev_epoch = new_epoch; + *state.write() = State::new(bank) + } + }); + } + Self { state } + } + + pub fn get_key_to_rank_map(&self, slot: Slot) -> Option> { + let guard = self.state.read(); + let epoch = guard.epoch_schedule.get_epoch(slot); + guard + .stakes + .get(&epoch) + .map(|stake| Arc::clone(stake.bls_pubkey_to_rank_map())) + } +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index b5cd9ddf5d..51e36bc32b 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -13,6 +13,7 @@ pub mod bank_hash_cache; pub mod bank_utils; pub mod commitment; pub mod epoch_stakes; +pub mod epoch_stakes_service; pub mod genesis_utils; pub mod inflation_rewards; pub mod installed_scheduler_pool;