Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 13 additions & 51 deletions core/src/sigverifier/bls_sigverifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,18 @@ use {
alpenglow_vote::bls_message::BLSMessage,
crossbeam_channel::{Sender, TrySendError},
solana_pubkey::Pubkey,
solana_runtime::{bank_forks::BankForks, epoch_stakes::EpochStakes},
solana_sdk::{
clock::{Epoch, Slot},
epoch_schedule::EpochSchedule,
},
solana_runtime::epoch_stakes_service::EpochStakesService,
solana_sdk::clock::Slot,
solana_streamer::packet::PacketBatch,
stats::{BLSSigVerifierStats, StatsUpdater},
std::{
collections::HashMap,
sync::{Arc, RwLock},
time::{Duration, Instant},
},
std::{collections::HashMap, sync::Arc},
};

const EPOCH_STAKES_QUERY_INTERVAL: Duration = Duration::from_secs(60);

pub struct BLSSigVerifier {
bank_forks: Arc<RwLock<BankForks>>,
verified_votes_sender: VerifiedVoteSender,
message_sender: Sender<BLSMessage>,
epoch_stakes_service: Arc<EpochStakesService>,
stats: BLSSigVerifierStats,
root_epoch: Epoch,
epoch_schedule: EpochSchedule,
epoch_stakes_map: Arc<HashMap<Epoch, EpochStakes>>,
epoch_stakes_queried: Instant,
}

impl SigVerifier for BLSSigVerifier {
Expand Down Expand Up @@ -72,10 +59,9 @@ impl SigVerifier for BLSSigVerifier {
certificate_message.certificate.slot
}
};
let epoch = self.epoch_schedule.get_epoch(slot);
let rank_to_pubkey_map = if let Some(epoch_stakes) = self.epoch_stakes_map.get(&epoch) {
epoch_stakes.bls_pubkey_to_rank_map()
} else {

let Some(rank_to_pubkey_map) = self.epoch_stakes_service.get_key_to_rank_map(slot)
else {
stats_updater.received_no_epoch_stakes += 1;
continue;
};
Expand Down Expand Up @@ -110,46 +96,21 @@ impl SigVerifier for BLSSigVerifier {
self.send_verified_votes(verified_votes);
self.stats.update(stats_updater);
self.stats.maybe_report_stats();
self.update_epoch_stakes_map();
Ok(())
}
}

impl BLSSigVerifier {
pub fn new(
bank_forks: Arc<RwLock<BankForks>>,
epoch_stakes_service: Arc<EpochStakesService>,
verified_votes_sender: VerifiedVoteSender,
message_sender: Sender<BLSMessage>,
) -> Self {
let mut verifier = Self {
bank_forks,
Self {
epoch_stakes_service,
verified_votes_sender,
message_sender,
stats: BLSSigVerifierStats::new(),
epoch_schedule: EpochSchedule::default(),
epoch_stakes_map: Arc::new(HashMap::new()),
root_epoch: Epoch::default(),
epoch_stakes_queried: Instant::now() - EPOCH_STAKES_QUERY_INTERVAL,
};
verifier.update_epoch_stakes_map();
verifier
}

// TODO(wen): We should maybe create a epoch stakes service so all these objects
// only needing epoch stakes don't need to worry about bank_forks and banks.
fn update_epoch_stakes_map(&mut self) {
if self.epoch_stakes_queried.elapsed() < EPOCH_STAKES_QUERY_INTERVAL {
return;
}
self.epoch_stakes_queried = Instant::now();
let root_bank = self.bank_forks.read().unwrap().root_bank();
if self.epoch_stakes_map.is_empty() {
self.epoch_schedule = root_bank.epoch_schedule().clone();
}
let epoch = root_bank.epoch();
if self.epoch_stakes_map.is_empty() || epoch > self.root_epoch {
self.epoch_stakes_map = Arc::new(root_bank.epoch_stakes_map().clone());
self.root_epoch = epoch;
}
}

Expand Down Expand Up @@ -194,7 +155,7 @@ mod tests {
},
solana_sdk::{hash::Hash, signer::Signer},
stats::STATS_INTERVAL_DURATION,
std::time::Duration,
std::time::{Duration, Instant},
};

fn create_keypairs_and_bls_sig_verifier(
Expand All @@ -215,9 +176,10 @@ mod tests {
);
let bank0 = Bank::new_for_tests(&genesis.genesis_config);
let bank_forks = BankForks::new_rw_arc(bank0);
let epoch_stakes_service = Arc::new(EpochStakesService::new(bank_forks));
(
validator_keypairs,
BLSSigVerifier::new(bank_forks, verified_vote_sender, message_sender),
BLSSigVerifier::new(epoch_stakes_service, verified_vote_sender, message_sender),
)
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use {
},
solana_runtime::{
bank_forks::BankForks,
epoch_stakes_service::EpochStakesService,
prioritization_fee_cache::PrioritizationFeeCache,
root_bank_cache::RootBankCache,
vote_sender_types::{
Expand Down Expand Up @@ -263,8 +264,9 @@ impl Tpu {
};

let alpenglow_sigverify_stage = {
let epoch_stakes_service = Arc::new(EpochStakesService::new(bank_forks.clone()));
let verifier = BLSSigVerifier::new(
bank_forks.clone(),
epoch_stakes_service,
verified_vote_sender.clone(),
bls_verified_message_sender,
);
Expand Down
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ num-derive = { workspace = true }
num-traits = { workspace = true }
num_cpus = { workspace = true }
num_enum = { workspace = true }
parking_lot = { workspace = true }
percentage = { workspace = true }
qualifier_attr = { workspace = true }
rand = { workspace = true }
Expand Down
80 changes: 80 additions & 0 deletions runtime/src/epoch_stakes_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use {
crate::{
bank_forks::BankForks,
epoch_stakes::{BLSPubkeyToRankMap, EpochStakes},
},
parking_lot::RwLock as PlRwLock,
solana_sdk::{
clock::{Epoch, Slot},
epoch_schedule::EpochSchedule,
},
std::{
collections::HashMap,
sync::{Arc, RwLock as StdRwLock},
thread,
time::Duration,
},
};

const EPOCH_STAKES_QUERY_INTERVAL: Duration = Duration::from_secs(60);

struct State {
stakes: HashMap<Epoch, EpochStakes>,
epoch_schedule: EpochSchedule,
}

/// Updates the epoch stakes state from the bank forks.
///
/// If new state was found, returns the corresponding root epoch and the state.
fn update_state(bank_forks: &Arc<StdRwLock<BankForks>>, epoch: Epoch) -> Option<(Epoch, State)> {
let root_bank = bank_forks.read().unwrap().root_bank();
let root_epoch = root_bank.epoch();
(root_epoch > epoch).then(|| {
(
root_epoch,
State {
stakes: root_bank.epoch_stakes_map().clone(),
epoch_schedule: root_bank.epoch_schedule().clone(),
},
)
})
}

/// A service that regularly updates the epoch stakes state from the bank forks
/// and exposes various methods to access the state.
pub struct EpochStakesService {
state: Arc<PlRwLock<State>>,
}

impl EpochStakesService {
pub fn new(bank_forks: Arc<StdRwLock<BankForks>>) -> Self {
let mut last_update_epoch = Epoch::default();
let state = Arc::new(PlRwLock::new(State {
stakes: HashMap::new(),
epoch_schedule: EpochSchedule::default(),
}));

{
let state = state.clone();
thread::spawn(move || loop {
thread::sleep(EPOCH_STAKES_QUERY_INTERVAL);
if let Some((new_update_epoch, update)) =
update_state(&bank_forks, last_update_epoch)
{
last_update_epoch = new_update_epoch;
*state.write() = update;
}
});
}
Self { state }
}

pub fn get_key_to_rank_map(&self, slot: Slot) -> Option<Arc<BLSPubkeyToRankMap>> {
let guard = self.state.read();
let epoch = guard.epoch_schedule.get_epoch(slot);
guard
.stakes
.get(&epoch)
.map(|stake| Arc::clone(stake.bls_pubkey_to_rank_map()))
}
}
1 change: 1 addition & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod bank_hash_cache;
pub mod bank_utils;
pub mod commitment;
pub mod epoch_stakes;
pub mod epoch_stakes_service;
pub mod genesis_utils;
pub mod inflation_rewards;
pub mod installed_scheduler_pool;
Expand Down