Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 17 additions & 55 deletions core/src/sigverifier/bls_sigverifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,18 @@ use {
alpenglow_vote::bls_message::BLSMessage,
crossbeam_channel::{Sender, TrySendError},
solana_pubkey::Pubkey,
solana_runtime::{bank_forks::BankForks, epoch_stakes::EpochStakes},
solana_sdk::{
clock::{Epoch, Slot},
epoch_schedule::EpochSchedule,
},
solana_runtime::epoch_stakes_service::EpochStakesService,
solana_sdk::clock::Slot,
solana_streamer::packet::PacketBatch,
stats::{BLSSigVerifierStats, StatsUpdater},
std::{
collections::HashMap,
sync::{Arc, RwLock},
time::{Duration, Instant},
},
std::{collections::HashMap, sync::Arc},
};

const EPOCH_STAKES_QUERY_INTERVAL: Duration = Duration::from_secs(60);

pub struct BLSSigVerifier {
bank_forks: Arc<RwLock<BankForks>>,
verified_votes_sender: VerifiedVoteSender,
message_sender: Sender<BLSMessage>,
epoch_stakes_service: Arc<EpochStakesService>,
stats: BLSSigVerifierStats,
root_epoch: Epoch,
epoch_schedule: EpochSchedule,
epoch_stakes_map: Arc<HashMap<Epoch, EpochStakes>>,
epoch_stakes_queried: Instant,
}

impl SigVerifier for BLSSigVerifier {
Expand Down Expand Up @@ -72,10 +59,9 @@ impl SigVerifier for BLSSigVerifier {
certificate_message.certificate.slot
}
};
let epoch = self.epoch_schedule.get_epoch(slot);
let rank_to_pubkey_map = if let Some(epoch_stakes) = self.epoch_stakes_map.get(&epoch) {
epoch_stakes.bls_pubkey_to_rank_map()
} else {

let Some(rank_to_pubkey_map) = self.epoch_stakes_service.get_key_to_rank_map(slot)
else {
stats_updater.received_no_epoch_stakes += 1;
continue;
};
Expand Down Expand Up @@ -110,46 +96,21 @@ impl SigVerifier for BLSSigVerifier {
self.send_verified_votes(verified_votes);
self.stats.update(stats_updater);
self.stats.maybe_report_stats();
self.update_epoch_stakes_map();
Ok(())
}
}

impl BLSSigVerifier {
pub fn new(
bank_forks: Arc<RwLock<BankForks>>,
epoch_stakes_service: Arc<EpochStakesService>,
verified_votes_sender: VerifiedVoteSender,
message_sender: Sender<BLSMessage>,
) -> Self {
let mut verifier = Self {
bank_forks,
Self {
epoch_stakes_service,
verified_votes_sender,
message_sender,
stats: BLSSigVerifierStats::new(),
epoch_schedule: EpochSchedule::default(),
epoch_stakes_map: Arc::new(HashMap::new()),
root_epoch: Epoch::default(),
epoch_stakes_queried: Instant::now() - EPOCH_STAKES_QUERY_INTERVAL,
};
verifier.update_epoch_stakes_map();
verifier
}

// TODO(wen): We should maybe create a epoch stakes service so all these objects
// only needing epoch stakes don't need to worry about bank_forks and banks.
fn update_epoch_stakes_map(&mut self) {
if self.epoch_stakes_queried.elapsed() < EPOCH_STAKES_QUERY_INTERVAL {
return;
}
self.epoch_stakes_queried = Instant::now();
let root_bank = self.bank_forks.read().unwrap().root_bank();
if self.epoch_stakes_map.is_empty() {
self.epoch_schedule = root_bank.epoch_schedule().clone();
}
let epoch = root_bank.epoch();
if self.epoch_stakes_map.is_empty() || epoch > self.root_epoch {
self.epoch_stakes_map = Arc::new(root_bank.epoch_stakes_map().clone());
self.root_epoch = epoch;
}
}

Expand Down Expand Up @@ -181,20 +142,19 @@ mod tests {
vote::Vote,
},
bitvec::prelude::*,
crossbeam_channel::Receiver,
crossbeam_channel::{unbounded, Receiver},
solana_bls_signatures::Signature,
solana_perf::packet::Packet,
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{
create_genesis_config_with_alpenglow_vote_accounts_no_program,
ValidatorVoteKeypairs,
},
},
solana_sdk::{hash::Hash, signer::Signer},
stats::STATS_INTERVAL_DURATION,
std::time::Duration,
std::time::{Duration, Instant},
};

fn create_keypairs_and_bls_sig_verifier(
Expand All @@ -213,11 +173,13 @@ mod tests {
&validator_keypairs,
stakes_vec,
);
let bank0 = Bank::new_for_tests(&genesis.genesis_config);
let bank_forks = BankForks::new_rw_arc(bank0);
let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
let epoch = bank.epoch();
let (_tx, rx) = unbounded();
let epoch_stakes_service = Arc::new(EpochStakesService::new(bank, epoch, rx));
(
validator_keypairs,
BLSSigVerifier::new(bank_forks, verified_vote_sender, message_sender),
BLSSigVerifier::new(epoch_stakes_service, verified_vote_sender, message_sender),
)
}

Expand Down
8 changes: 7 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use {
},
solana_runtime::{
bank_forks::BankForks,
epoch_stakes_service::EpochStakesService,
prioritization_fee_cache::PrioritizationFeeCache,
root_bank_cache::RootBankCache,
vote_sender_types::{
Expand Down Expand Up @@ -263,8 +264,13 @@ impl Tpu {
};

let alpenglow_sigverify_stage = {
let (tx, rx) = unbounded();
bank_forks.write().unwrap().register_new_bank_subscriber(tx);
let bank = bank_forks.read().unwrap().root_bank();
let epoch = bank.epoch();
let epoch_stakes_service = Arc::new(EpochStakesService::new(bank, epoch, rx));
let verifier = BLSSigVerifier::new(
bank_forks.clone(),
epoch_stakes_service,
verified_vote_sender.clone(),
bls_verified_message_sender,
);
Expand Down
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ num-derive = { workspace = true }
num-traits = { workspace = true }
num_cpus = { workspace = true }
num_enum = { workspace = true }
parking_lot = { workspace = true }
percentage = { workspace = true }
qualifier_attr = { workspace = true }
rand = { workspace = true }
Expand Down
24 changes: 23 additions & 1 deletion runtime/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {
},
snapshot_config::SnapshotConfig,
},
crossbeam_channel::SendError,
crossbeam_channel::{SendError, Sender},
log::*,
solana_measure::measure::Measure,
solana_program_runtime::loaded_programs::{BlockRelation, ForkGraph},
Expand Down Expand Up @@ -86,6 +86,8 @@ pub struct BankForks {
scheduler_pool: Option<InstalledSchedulerPoolArc>,

dumped_slot_subscribers: Vec<DumpedSlotSubscription>,
/// Tracks subscribers interested in hearing about new `Bank`s.
new_bank_subscribers: Vec<Sender<Arc<Bank>>>,
}

impl Index<u64> for BankForks {
Expand Down Expand Up @@ -137,6 +139,7 @@ impl BankForks {
highest_slot_at_startup: 0,
scheduler_pool: None,
dumped_slot_subscribers: vec![],
new_bank_subscribers: vec![],
}));

root_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
Expand Down Expand Up @@ -329,6 +332,24 @@ impl BankForks {
self.dumped_slot_subscribers.push(notifier);
}

/// Register a new subscriber interested in hearing about new `Bank`s.
pub fn register_new_bank_subscriber(&mut self, tx: Sender<Arc<Bank>>) {
self.new_bank_subscribers.push(tx);
}

/// Call to notify subscribers of new `Bank`s.
fn notify_new_bank_subscribers(&mut self, root_bank: &Arc<Bank>) {
let mut channels_to_drop = vec![];
for (ind, tx) in self.new_bank_subscribers.iter().enumerate() {
if let Err(SendError(_)) = tx.send(root_bank.clone()) {
channels_to_drop.push(ind);
}
}
for ind in channels_to_drop {
self.new_bank_subscribers.remove(ind);
}
}

/// Clears associated banks from BankForks and notifies subscribers that a dump has occured.
pub fn dump_slots<'a, I>(&mut self, slots: I) -> (Vec<(Slot, BankId)>, Vec<BankWithScheduler>)
where
Expand Down Expand Up @@ -444,6 +465,7 @@ impl BankForks {
.unwrap()
.node_id_to_vote_accounts()
);
self.notify_new_bank_subscribers(root_bank);
}
let root_tx_count = root_bank
.parents()
Expand Down
68 changes: 68 additions & 0 deletions runtime/src/epoch_stakes_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use {
crate::{
bank::Bank,
epoch_stakes::{BLSPubkeyToRankMap, EpochStakes},
},
crossbeam_channel::Receiver,
log::warn,
parking_lot::RwLock as PlRwLock,
solana_sdk::{
clock::{Epoch, Slot},
epoch_schedule::EpochSchedule,
},
std::{collections::HashMap, sync::Arc, thread},
};

struct State {
stakes: HashMap<Epoch, EpochStakes>,
epoch_schedule: EpochSchedule,
}

impl State {
fn new(bank: Arc<Bank>) -> Self {
Self {
stakes: bank.epoch_stakes_map().clone(),
epoch_schedule: bank.epoch_schedule().clone(),
}
}
}

/// A service that regularly updates the epoch stakes state from `Bank`s
/// and exposes various methods to access the state.
pub struct EpochStakesService {
state: Arc<PlRwLock<State>>,
}

impl EpochStakesService {
pub fn new(bank: Arc<Bank>, epoch: Epoch, new_bank_receiver: Receiver<Arc<Bank>>) -> Self {
let mut prev_epoch = epoch;
let state = Arc::new(PlRwLock::new(State::new(bank)));
{
let state = state.clone();
thread::spawn(move || loop {
let bank = match new_bank_receiver.recv() {
Ok(b) => b,
Err(e) => {
warn!("recv() returned {e:?}. Exiting.");
break;
}
};
let new_epoch = bank.epoch();
if new_epoch > prev_epoch {
prev_epoch = new_epoch;
*state.write() = State::new(bank)
}
});
}
Self { state }
}

pub fn get_key_to_rank_map(&self, slot: Slot) -> Option<Arc<BLSPubkeyToRankMap>> {
let guard = self.state.read();
let epoch = guard.epoch_schedule.get_epoch(slot);
guard
.stakes
.get(&epoch)
.map(|stake| Arc::clone(stake.bls_pubkey_to_rank_map()))
}
}
1 change: 1 addition & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod bank_hash_cache;
pub mod bank_utils;
pub mod commitment;
pub mod epoch_stakes;
pub mod epoch_stakes_service;
pub mod genesis_utils;
pub mod inflation_rewards;
pub mod installed_scheduler_pool;
Expand Down