-
Notifications
You must be signed in to change notification settings - Fork 33
WIP: generating rewards certs and adding them to the block footer #582
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
f10419e
d6ae39e
0e05f62
bbe27dd
ddec9ae
1785420
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ use { | |
| banking_trace::BankingTracer, | ||
| replay_stage::{Finalizer, ReplayStage}, | ||
| }, | ||
| parking_lot::RwLock as PLRwLock, | ||
| solana_clock::Slot, | ||
| solana_entry::block_component::{ | ||
| BlockFooterV1, BlockMarkerV1, VersionedBlockFooter, VersionedBlockMarker, | ||
|
|
@@ -31,8 +32,14 @@ use { | |
| bank_forks::BankForks, | ||
| }, | ||
| solana_version::version, | ||
| solana_votor::{common::block_timeout, event::LeaderWindowInfo, votor::LeaderWindowNotifier}, | ||
| solana_votor_messages::migration::MigrationStatus, | ||
| solana_votor::{ | ||
| common::block_timeout, consensus_rewards::ConsensusRewards, event::LeaderWindowInfo, | ||
| votor::LeaderWindowNotifier, | ||
| }, | ||
| solana_votor_messages::{ | ||
| migration::MigrationStatus, | ||
| rewards_certificate::{NotarRewardCertificate, SkipRewardCertificate}, | ||
| }, | ||
| stats::{BlockCreationLoopMetrics, SlotMetrics}, | ||
| std::{ | ||
| sync::{ | ||
|
|
@@ -79,6 +86,7 @@ pub struct BlockCreationLoopConfig { | |
| pub poh_recorder: Arc<RwLock<PohRecorder>>, | ||
| pub leader_schedule_cache: Arc<LeaderScheduleCache>, | ||
| pub rpc_subscriptions: Option<Arc<RpcSubscriptions>>, | ||
| pub consensus_rewards: Arc<PLRwLock<ConsensusRewards>>, | ||
|
|
||
| // Notifiers | ||
| pub banking_tracer: Arc<BankingTracer>, | ||
|
|
@@ -104,6 +112,7 @@ struct LeaderContext { | |
| slot_status_notifier: Option<SlotStatusNotifier>, | ||
| banking_tracer: Arc<BankingTracer>, | ||
| replay_highest_frozen: Arc<ReplayHighestFrozen>, | ||
| consensus_rewards: Arc<PLRwLock<ConsensusRewards>>, | ||
|
|
||
| // Metrics | ||
| metrics: BlockCreationLoopMetrics, | ||
|
|
@@ -134,9 +143,15 @@ enum StartLeaderError { | |
| ), | ||
| } | ||
|
|
||
| fn produce_block_footer(block_producer_time_nanos: u64) -> VersionedBlockMarker { | ||
| fn produce_block_footer( | ||
| block_producer_time_nanos: u64, | ||
| skip_reward_certificate: Option<SkipRewardCertificate>, | ||
| notar_reward_certificate: Option<NotarRewardCertificate>, | ||
| ) -> VersionedBlockMarker { | ||
| let footer = BlockFooterV1 { | ||
| block_producer_time_nanos, | ||
| skip_reward_certificate, | ||
| notar_reward_certificate, | ||
| block_user_agent: format!("agave/{}", version!()).into_bytes(), | ||
| }; | ||
|
|
||
|
|
@@ -166,6 +181,7 @@ fn start_loop(config: BlockCreationLoopConfig) { | |
| leader_window_notifier, | ||
| replay_highest_frozen, | ||
| migration_status, | ||
| consensus_rewards, | ||
| } = config; | ||
|
|
||
| // Similar to Votor, if this loop dies kill the validator | ||
|
|
@@ -189,6 +205,7 @@ fn start_loop(config: BlockCreationLoopConfig) { | |
| replay_highest_frozen, | ||
| metrics: BlockCreationLoopMetrics::default(), | ||
| slot_metrics: SlotMetrics::default(), | ||
| consensus_rewards, | ||
| }; | ||
|
|
||
| info!("{my_pubkey}: Block creation loop initialized"); | ||
|
|
@@ -301,6 +318,7 @@ fn produce_window( | |
| if let Err(e) = record_and_complete_block( | ||
| ctx.poh_recorder.as_ref(), | ||
| &mut ctx.record_receiver, | ||
| ctx.consensus_rewards.clone(), | ||
| skip_timer, | ||
| timeout, | ||
| ) { | ||
|
|
@@ -342,9 +360,17 @@ fn produce_window( | |
| fn record_and_complete_block( | ||
| poh_recorder: &RwLock<PohRecorder>, | ||
| record_receiver: &mut RecordReceiver, | ||
| consensus_rewards: Arc<PLRwLock<ConsensusRewards>>, | ||
| block_timer: Instant, | ||
| block_timeout: Duration, | ||
| ) -> Result<(), PohRecorderError> { | ||
| // Taking a read lock on consensus_rewards can contend with the write lock in bls_sigverifier. | ||
|
||
| // We are ready to produce the block footer now, while we gather other bits of data, we can block on the consensus_rewards lock in a separate thread to minimise contention. | ||
| let handle = std::thread::spawn(move || { | ||
| // XXX: how to look up the slot. | ||
| let slot = u64::MAX; | ||
| consensus_rewards.read().build_rewards_certs(slot) | ||
| }); | ||
|
||
| loop { | ||
| let remaining_slot_time = block_timeout.saturating_sub(block_timer.elapsed()); | ||
| if remaining_slot_time.is_zero() { | ||
|
|
@@ -378,7 +404,8 @@ fn record_and_complete_block( | |
| // Construct and send the block footer | ||
| let mut w_poh_recorder = poh_recorder.write().unwrap(); | ||
| let block_producer_time_nanos = w_poh_recorder.working_bank_block_producer_time_nanos(); | ||
| let footer = produce_block_footer(block_producer_time_nanos); | ||
| let (skip, notar) = handle.join().unwrap(); | ||
| let footer = produce_block_footer(block_producer_time_nanos, skip, notar); | ||
| w_poh_recorder.send_marker(footer)?; | ||
|
|
||
| // Alpentick and clear bank | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ use { | |
| }, | ||
| bitvec::prelude::{BitVec, Lsb0}, | ||
| crossbeam_channel::{Sender, TrySendError}, | ||
| parking_lot::RwLock as PLRwLock, | ||
| rayon::iter::{ | ||
| IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator, | ||
| }, | ||
|
|
@@ -23,7 +24,10 @@ use { | |
| solana_runtime::{bank::Bank, bank_forks::SharableBanks, epoch_stakes::BLSPubkeyToRankMap}, | ||
| solana_signer_store::{decode, DecodeError}, | ||
| solana_streamer::packet::PacketBatch, | ||
| solana_votor::consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender}, | ||
| solana_votor::{ | ||
| consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender}, | ||
| consensus_rewards::ConsensusRewards, | ||
| }, | ||
| solana_votor_messages::{ | ||
| consensus_message::{Certificate, CertificateType, ConsensusMessage, VoteMessage}, | ||
| vote::Vote, | ||
|
|
@@ -92,6 +96,7 @@ pub struct BLSSigVerifier { | |
| consensus_metrics_sender: ConsensusMetricsEventSender, | ||
| last_checked_root_slot: Slot, | ||
| alpenglow_last_voted: Arc<AlpenglowLastVoted>, | ||
| consensus_rewards: Arc<PLRwLock<ConsensusRewards>>, | ||
| } | ||
|
|
||
| impl BLSSigVerifier { | ||
|
|
@@ -172,8 +177,17 @@ impl BLSSigVerifier { | |
| id: *solana_pubkey, | ||
| vote: vote_message.vote, | ||
| }); | ||
| // Only need votes newer than root slot | ||
|
|
||
| // consensus pool does not need votes for slots other than root slot however the rewards container may still need them. | ||
| if vote_message.vote.slot() <= root_bank.slot() { | ||
| // the only module that takes a write lock on consensus_rewards is this one and it does not take the write lock while it is verifying votes so this should never block. | ||
|
||
| if self | ||
| .consensus_rewards | ||
| .read() | ||
| .wants_vote(root_bank.slot(), &vote_message) | ||
| { | ||
| // XXX: actually verify and send the votes. The verification and sending should happen off the critical path. | ||
| } | ||
| self.stats.received_old.fetch_add(1, Ordering::Relaxed); | ||
| packet.meta_mut().set_discard(true); | ||
| continue; | ||
|
|
@@ -219,8 +233,8 @@ impl BLSSigVerifier { | |
| || self.verify_and_send_certificates(certs_to_verify, &root_bank), | ||
| ); | ||
|
|
||
| votes_result?; | ||
| certs_result?; | ||
| let rewards_votes = votes_result?; | ||
| let () = certs_result?; | ||
|
|
||
| // Send to RPC service for last voted tracking | ||
| self.alpenglow_last_voted | ||
|
|
@@ -235,6 +249,17 @@ impl BLSSigVerifier { | |
| warn!("could not send consensus metrics, receive side of channel is closed"); | ||
| } | ||
|
|
||
| { | ||
| // This should be the only place that is taking a write lock on consensus_rewards. | ||
| // This lock should not contend with any other operations in this module. | ||
| // It can contend with the read lock in the block creation loop though as such we should hold it for as little time as possible. | ||
| let mut guard = self.consensus_rewards.write(); | ||
| let root_slot = root_bank.slot(); | ||
| for v in rewards_votes { | ||
| guard.add_vote_message(root_slot, v); | ||
| } | ||
| } | ||
|
|
||
| self.stats.maybe_report_stats(); | ||
|
|
||
| Ok(()) | ||
|
|
@@ -248,6 +273,7 @@ impl BLSSigVerifier { | |
| message_sender: Sender<ConsensusMessage>, | ||
| consensus_metrics_sender: ConsensusMetricsEventSender, | ||
| alpenglow_last_voted: Arc<AlpenglowLastVoted>, | ||
| consensus_rewards: Arc<PLRwLock<ConsensusRewards>>, | ||
| ) -> Self { | ||
| Self { | ||
| sharable_banks, | ||
|
|
@@ -259,14 +285,32 @@ impl BLSSigVerifier { | |
| consensus_metrics_sender, | ||
| last_checked_root_slot: 0, | ||
| alpenglow_last_voted, | ||
| consensus_rewards, | ||
| } | ||
| } | ||
|
|
||
| /// Verifies votes and sends verified votes to the consensus pool. | ||
| /// Also returns a copy of the verified votes that the rewards container is interested is so that the caller can send them to it. | ||
| fn verify_and_send_votes( | ||
| &self, | ||
| votes_to_verify: Vec<VoteToVerify>, | ||
| ) -> Result<(), BLSSigVerifyServiceError<ConsensusMessage>> { | ||
| ) -> Result<Vec<VoteMessage>, BLSSigVerifyServiceError<ConsensusMessage>> { | ||
| let verified_votes = self.verify_votes(votes_to_verify); | ||
|
|
||
| let rewards_votes = { | ||
| // the only module that takes a write lock on consensus_rewards is this one and it does not take the write lock while it is verifying votes so this should never block. | ||
| let guard = self.consensus_rewards.read(); | ||
| let root_slot = self.sharable_banks.root().slot(); | ||
| verified_votes | ||
| .iter() | ||
| .filter_map(|vote| { | ||
| guard | ||
| .wants_vote(root_slot, &vote.vote_message) | ||
| .then_some(vote.vote_message.clone()) | ||
| }) | ||
| .collect() | ||
| }; | ||
|
|
||
| self.stats | ||
| .total_valid_packets | ||
| .fetch_add(verified_votes.len() as u64, Ordering::Relaxed); | ||
|
|
@@ -285,7 +329,7 @@ impl BLSSigVerifier { | |
| } | ||
| } | ||
|
|
||
| // Send the BLS vote messaage to certificate pool | ||
| // Send the votes to the consensus pool | ||
| match self | ||
| .message_sender | ||
| .try_send(ConsensusMessage::Vote(vote.vote_message)) | ||
|
|
@@ -319,7 +363,7 @@ impl BLSSigVerifier { | |
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| Ok(rewards_votes) | ||
| } | ||
|
|
||
| fn verify_votes(&self, votes_to_verify: Vec<VoteToVerify>) -> Vec<VoteToVerify> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this over
std::sync::RwLock?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of reasons: