Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 17 additions & 52 deletions core/src/sigverifier/bls_sigverifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,18 @@ use {
alpenglow_vote::bls_message::BLSMessage,
crossbeam_channel::{Sender, TrySendError},
solana_pubkey::Pubkey,
solana_runtime::{bank_forks::BankForks, epoch_stakes::EpochStakes},
solana_sdk::{
clock::{Epoch, Slot},
epoch_schedule::EpochSchedule,
},
solana_runtime::epoch_stakes_service::EpochStakesService,
solana_sdk::clock::Slot,
solana_streamer::packet::PacketBatch,
stats::{BLSSigVerifierStats, StatsUpdater},
std::{
collections::HashMap,
sync::{Arc, RwLock},
time::{Duration, Instant},
},
std::{collections::HashMap, sync::Arc},
};

const EPOCH_STAKES_QUERY_INTERVAL: Duration = Duration::from_secs(60);

pub struct BLSSigVerifier {
bank_forks: Arc<RwLock<BankForks>>,
verified_votes_sender: VerifiedVoteSender,
message_sender: Sender<BLSMessage>,
epoch_stakes_service: Arc<EpochStakesService>,
stats: BLSSigVerifierStats,
root_epoch: Epoch,
epoch_schedule: EpochSchedule,
epoch_stakes_map: Arc<HashMap<Epoch, EpochStakes>>,
epoch_stakes_queried: Instant,
}

impl SigVerifier for BLSSigVerifier {
Expand Down Expand Up @@ -72,10 +59,9 @@ impl SigVerifier for BLSSigVerifier {
certificate_message.certificate.slot
}
};
let epoch = self.epoch_schedule.get_epoch(slot);
let rank_to_pubkey_map = if let Some(epoch_stakes) = self.epoch_stakes_map.get(&epoch) {
epoch_stakes.bls_pubkey_to_rank_map()
} else {

let Some(rank_to_pubkey_map) = self.epoch_stakes_service.get_key_to_rank_map(slot)
else {
stats_updater.received_no_epoch_stakes += 1;
continue;
};
Expand Down Expand Up @@ -110,46 +96,21 @@ impl SigVerifier for BLSSigVerifier {
self.send_verified_votes(verified_votes);
self.stats.update(stats_updater);
self.stats.maybe_report_stats();
self.update_epoch_stakes_map();
Ok(())
}
}

impl BLSSigVerifier {
pub fn new(
bank_forks: Arc<RwLock<BankForks>>,
epoch_stakes_service: Arc<EpochStakesService>,
verified_votes_sender: VerifiedVoteSender,
message_sender: Sender<BLSMessage>,
) -> Self {
let mut verifier = Self {
bank_forks,
Self {
epoch_stakes_service,
verified_votes_sender,
message_sender,
stats: BLSSigVerifierStats::new(),
epoch_schedule: EpochSchedule::default(),
epoch_stakes_map: Arc::new(HashMap::new()),
root_epoch: Epoch::default(),
epoch_stakes_queried: Instant::now() - EPOCH_STAKES_QUERY_INTERVAL,
};
verifier.update_epoch_stakes_map();
verifier
}

// TODO(wen): We should maybe create a epoch stakes service so all these objects
// only needing epoch stakes don't need to worry about bank_forks and banks.
fn update_epoch_stakes_map(&mut self) {
if self.epoch_stakes_queried.elapsed() < EPOCH_STAKES_QUERY_INTERVAL {
return;
}
self.epoch_stakes_queried = Instant::now();
let root_bank = self.bank_forks.read().unwrap().root_bank();
if self.epoch_stakes_map.is_empty() {
self.epoch_schedule = root_bank.epoch_schedule().clone();
}
let epoch = root_bank.epoch();
if self.epoch_stakes_map.is_empty() || epoch > self.root_epoch {
self.epoch_stakes_map = Arc::new(root_bank.epoch_stakes_map().clone());
self.root_epoch = epoch;
}
}

Expand Down Expand Up @@ -181,7 +142,7 @@ mod tests {
vote::Vote,
},
bitvec::prelude::*,
crossbeam_channel::Receiver,
crossbeam_channel::{unbounded, Receiver},
solana_bls_signatures::Signature,
solana_perf::packet::Packet,
solana_runtime::{
Expand All @@ -191,10 +152,11 @@ mod tests {
create_genesis_config_with_alpenglow_vote_accounts_no_program,
ValidatorVoteKeypairs,
},
root_bank_cache::RootBankCache,
},
solana_sdk::{hash::Hash, signer::Signer},
stats::STATS_INTERVAL_DURATION,
std::time::Duration,
std::time::{Duration, Instant},
};

fn create_keypairs_and_bls_sig_verifier(
Expand All @@ -215,9 +177,12 @@ mod tests {
);
let bank0 = Bank::new_for_tests(&genesis.genesis_config);
let bank_forks = BankForks::new_rw_arc(bank0);
let root_bank_cache = RootBankCache::new(bank_forks);
let (_tx, rx) = unbounded();
let epoch_stakes_service = Arc::new(EpochStakesService::new(rx, root_bank_cache));
(
validator_keypairs,
BLSSigVerifier::new(bank_forks, verified_vote_sender, message_sender),
BLSSigVerifier::new(epoch_stakes_service, verified_vote_sender, message_sender),
)
}

Expand Down
10 changes: 9 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use {
},
solana_runtime::{
bank_forks::BankForks,
epoch_stakes_service::EpochStakesService,
prioritization_fee_cache::PrioritizationFeeCache,
root_bank_cache::RootBankCache,
vote_sender_types::{
Expand Down Expand Up @@ -263,8 +264,15 @@ impl Tpu {
};

let alpenglow_sigverify_stage = {
let root_bank_cache = RootBankCache::new(bank_forks.clone());
let (tx, rx) = unbounded();
bank_forks
.write()
.unwrap()
.register_new_epoch_subscriber(tx);
let epoch_stakes_service = Arc::new(EpochStakesService::new(rx, root_bank_cache));
let verifier = BLSSigVerifier::new(
bank_forks.clone(),
epoch_stakes_service,
verified_vote_sender.clone(),
bls_verified_message_sender,
);
Expand Down
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ num-derive = { workspace = true }
num-traits = { workspace = true }
num_cpus = { workspace = true }
num_enum = { workspace = true }
parking_lot = { workspace = true }
percentage = { workspace = true }
qualifier_attr = { workspace = true }
rand = { workspace = true }
Expand Down
24 changes: 23 additions & 1 deletion runtime/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {
},
snapshot_config::SnapshotConfig,
},
crossbeam_channel::SendError,
crossbeam_channel::{SendError, Sender},
log::*,
solana_measure::measure::Measure,
solana_program_runtime::loaded_programs::{BlockRelation, ForkGraph},
Expand Down Expand Up @@ -86,6 +86,8 @@ pub struct BankForks {
scheduler_pool: Option<InstalledSchedulerPoolArc>,

dumped_slot_subscribers: Vec<DumpedSlotSubscription>,
/// Tracks subscribers interested in hearing about new epochs.
new_epoch_subscribers: Vec<Sender<()>>,
}

impl Index<u64> for BankForks {
Expand Down Expand Up @@ -137,6 +139,7 @@ impl BankForks {
highest_slot_at_startup: 0,
scheduler_pool: None,
dumped_slot_subscribers: vec![],
new_epoch_subscribers: vec![],
}));

root_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
Expand Down Expand Up @@ -329,6 +332,24 @@ impl BankForks {
self.dumped_slot_subscribers.push(notifier);
}

/// Register a new subscriber interested in hearing about new epochs.
///
/// Each registered sender receives a `()` notification when the root bank
/// crosses an epoch boundary; senders whose receiving end has been dropped
/// are pruned during notification.
pub fn register_new_epoch_subscriber(&mut self, tx: Sender<()>) {
    self.new_epoch_subscribers.push(tx);
}

/// Call to notify subscribers of new epochs.
fn notify_new_epoch_subscribers(&mut self) {
let mut channels_to_drop = vec![];
for (ind, tx) in self.new_epoch_subscribers.iter().enumerate() {
if let Err(SendError(())) = tx.send(()) {
channels_to_drop.push(ind);
}
}
for ind in channels_to_drop {
self.new_epoch_subscribers.remove(ind);
}
}

/// Clears associated banks from BankForks and notifies subscribers that a dump has occured.
pub fn dump_slots<'a, I>(&mut self, slots: I) -> (Vec<(Slot, BankId)>, Vec<BankWithScheduler>)
where
Expand Down Expand Up @@ -444,6 +465,7 @@ impl BankForks {
.unwrap()
.node_id_to_vote_accounts()
);
self.notify_new_epoch_subscribers();
}
let root_tx_count = root_bank
.parents()
Expand Down
72 changes: 72 additions & 0 deletions runtime/src/epoch_stakes_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use {
crate::{
epoch_stakes::{BLSPubkeyToRankMap, EpochStakes},
root_bank_cache::RootBankCache,
},
crossbeam_channel::Receiver,
parking_lot::RwLock as PlRwLock,
solana_sdk::{
clock::{Epoch, Slot},
epoch_schedule::EpochSchedule,
},
std::{collections::HashMap, sync::Arc, thread},
};

/// Snapshot of epoch-stakes data taken from the root bank.
struct State {
    // Epoch stakes keyed by epoch, cloned from the root bank's stakes map.
    stakes: HashMap<Epoch, EpochStakes>,
    // Epoch schedule used to map a slot to the epoch that contains it.
    epoch_schedule: EpochSchedule,
}

/// Updates the epoch stakes state from the bank forks.
///
/// If new state was found, returns the corresponding root epoch and the state.
fn update_state(root_bank_cache: &mut RootBankCache, epoch: Epoch) -> Option<(Epoch, State)> {
    let root_bank = root_bank_cache.root_bank();
    let root_epoch = root_bank.epoch();
    // Only rebuild the snapshot when the root has advanced past the epoch we
    // last captured; otherwise there is nothing new to report.
    if root_epoch <= epoch {
        return None;
    }
    let snapshot = State {
        stakes: root_bank.epoch_stakes_map().clone(),
        epoch_schedule: root_bank.epoch_schedule().clone(),
    };
    Some((root_epoch, snapshot))
}

/// A service that regularly updates the epoch stakes state from the bank forks
/// and exposes various methods to access the state.
pub struct EpochStakesService {
    // Shared with the background update thread spawned in `new`.
    state: Arc<PlRwLock<State>>,
}

impl EpochStakesService {
    /// Creates the service and spawns a background thread that refreshes the
    /// cached epoch stakes whenever a new-epoch notification arrives on
    /// `new_epoch_receiver`.
    pub fn new(new_epoch_receiver: Receiver<()>, mut root_bank_cache: RootBankCache) -> Self {
        // Eagerly seed the state from the current root bank. Starting from an
        // empty map (as the previous version did) made `get_key_to_rank_map`
        // return `None` for every slot until the root crossed the next epoch
        // boundary, mirroring none of the eager initialization the replaced
        // `update_epoch_stakes_map` performed in the verifier's constructor.
        let root_bank = root_bank_cache.root_bank();
        let mut prev_epoch = root_bank.epoch();
        let state = Arc::new(PlRwLock::new(State {
            stakes: root_bank.epoch_stakes_map().clone(),
            epoch_schedule: root_bank.epoch_schedule().clone(),
        }));

        {
            let state = state.clone();
            thread::spawn(move || {
                // Exit cleanly when every sender is dropped instead of
                // panicking on a disconnected channel (`recv().unwrap()`
                // would abort this thread with a panic on shutdown).
                while new_epoch_receiver.recv().is_ok() {
                    if let Some((new_epoch, update)) =
                        update_state(&mut root_bank_cache, prev_epoch)
                    {
                        prev_epoch = new_epoch;
                        *state.write() = update;
                    }
                }
            });
        }
        Self { state }
    }

    /// Returns the BLS pubkey-to-rank map for the epoch containing `slot`,
    /// or `None` if stakes for that epoch are not known.
    pub fn get_key_to_rank_map(&self, slot: Slot) -> Option<Arc<BLSPubkeyToRankMap>> {
        let guard = self.state.read();
        let epoch = guard.epoch_schedule.get_epoch(slot);
        guard
            .stakes
            .get(&epoch)
            .map(|stake| Arc::clone(stake.bls_pubkey_to_rank_map()))
    }
}
1 change: 1 addition & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod bank_hash_cache;
pub mod bank_utils;
pub mod commitment;
pub mod epoch_stakes;
pub mod epoch_stakes_service;
pub mod genesis_utils;
pub mod inflation_rewards;
pub mod installed_scheduler_pool;
Expand Down