Skip to content

Commit 1785420

Browse files
committed
use channels and threads instead of locks
1 parent ddec9ae commit 1785420

File tree

7 files changed

+254
-95
lines changed

7 files changed

+254
-95
lines changed

core/src/block_creation_loop.rs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use {
99
banking_trace::BankingTracer,
1010
replay_stage::{Finalizer, ReplayStage},
1111
},
12-
crossbeam_channel::Receiver,
13-
parking_lot::RwLock as PLRwLock,
12+
crossbeam_channel::{Receiver, Sender},
1413
solana_clock::Slot,
1514
solana_entry::block_component::BlockFooterV1,
1615
solana_gossip::cluster_info::ClusterInfo,
@@ -34,7 +33,9 @@ use {
3433
},
3534
solana_version::version,
3635
solana_votor::{
37-
common::block_timeout, consensus_rewards::ConsensusRewards, event::LeaderWindowInfo,
36+
common::block_timeout,
37+
consensus_rewards::{BuildRewardCertsRequest, BuildRewardCertsResponse},
38+
event::LeaderWindowInfo,
3839
},
3940
solana_votor_messages::rewards_certificate::{NotarRewardCertificate, SkipRewardCertificate},
4041
stats::{BlockCreationLoopMetrics, SlotMetrics},
@@ -82,7 +83,6 @@ pub struct BlockCreationLoopConfig {
8283
pub poh_recorder: Arc<RwLock<PohRecorder>>,
8384
pub leader_schedule_cache: Arc<LeaderScheduleCache>,
8485
pub rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
85-
pub consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
8686

8787
// Notifiers
8888
pub banking_tracer: Arc<BankingTracer>,
@@ -96,6 +96,9 @@ pub struct BlockCreationLoopConfig {
9696
// Channel to receive RecordReceiver from PohService
9797
pub record_receiver_receiver: Receiver<RecordReceiver>,
9898
pub optimistic_parent_receiver: Receiver<LeaderWindowInfo>,
99+
100+
pub build_reward_certs_sender: Sender<BuildRewardCertsRequest>,
101+
pub reward_certs_receiver: Receiver<BuildRewardCertsResponse>,
99102
}
100103

101104
struct LeaderContext {
@@ -113,7 +116,8 @@ struct LeaderContext {
113116
slot_status_notifier: Option<SlotStatusNotifier>,
114117
banking_tracer: Arc<BankingTracer>,
115118
replay_highest_frozen: Arc<ReplayHighestFrozen>,
116-
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
119+
build_reward_certs_sender: Sender<BuildRewardCertsRequest>,
120+
reward_certs_receiver: Receiver<BuildRewardCertsResponse>,
117121

118122
// Metrics
119123
metrics: BlockCreationLoopMetrics,
@@ -165,7 +169,8 @@ fn start_loop(config: BlockCreationLoopConfig) {
165169
replay_highest_frozen,
166170
highest_parent_ready,
167171
optimistic_parent_receiver,
168-
consensus_rewards,
172+
build_reward_certs_sender,
173+
reward_certs_receiver,
169174
} = config;
170175

171176
// Similar to Votor, if this loop dies kill the validator
@@ -203,7 +208,8 @@ fn start_loop(config: BlockCreationLoopConfig) {
203208
replay_highest_frozen,
204209
metrics: BlockCreationLoopMetrics::default(),
205210
slot_metrics: SlotMetrics::default(),
206-
consensus_rewards,
211+
build_reward_certs_sender,
212+
reward_certs_receiver,
207213
};
208214

209215
// Setup poh
@@ -324,7 +330,8 @@ fn produce_window(
324330
if let Err(e) = record_and_complete_block(
325331
ctx.poh_recorder.as_ref(),
326332
&mut ctx.record_receiver,
327-
ctx.consensus_rewards.clone(),
333+
&ctx.build_reward_certs_sender,
334+
&ctx.reward_certs_receiver,
328335
skip_timer,
329336
timeout,
330337
) {
@@ -424,17 +431,16 @@ fn produce_block_footer(
424431
fn record_and_complete_block(
425432
poh_recorder: &RwLock<PohRecorder>,
426433
record_receiver: &mut RecordReceiver,
427-
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
434+
build_reward_certs_sender: &Sender<BuildRewardCertsRequest>,
435+
cert_receiver: &Receiver<BuildRewardCertsResponse>,
428436
block_timer: Instant,
429437
block_timeout: Duration,
430438
) -> Result<(), PohRecorderError> {
431-
// Taking a read lock on consensus_rewards can contend with the write lock in bls_sigverifier.
432-
// We are ready to produce the block footer now, while we gather other bits of data, we can block on the consensus_rewards lock in a separate thread to minimise contention.
433-
let handle = std::thread::spawn(move || {
434-
// XXX: how to look up the slot.
435-
let slot = u64::MAX;
436-
consensus_rewards.read().build_rewards_certs(slot)
437-
});
439+
// XXX: work out how to look up the real slot here; until then, u64::MAX below is only a placeholder.
440+
let slot = u64::MAX;
441+
build_reward_certs_sender
442+
.send(BuildRewardCertsRequest { slot })
443+
.unwrap();
438444
loop {
439445
let remaining_slot_time = block_timeout.saturating_sub(block_timer.elapsed());
440446
if remaining_slot_time.is_zero() {
@@ -485,11 +491,11 @@ fn record_and_complete_block(
485491

486492
// Produce the footer with the current timestamp
487493
let working_bank = w_poh_recorder.working_bank().unwrap();
488-
let (skip_reward_certificate, notar_reward_certificate) = handle.join().unwrap();
494+
let resp = cert_receiver.recv().unwrap();
489495
let footer = produce_block_footer(
490496
working_bank.bank.clone_without_scheduler(),
491-
skip_reward_certificate,
492-
notar_reward_certificate,
497+
resp.skip,
498+
resp.notar,
493499
);
494500

495501
BlockComponentProcessor::update_bank_with_footer(

core/src/bls_sigverify/bls_sigverifier.rs

Lines changed: 23 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use {
99
},
1010
bitvec::prelude::{BitVec, Lsb0},
1111
crossbeam_channel::{Sender, TrySendError},
12-
parking_lot::RwLock as PLRwLock,
1312
rayon::iter::{
1413
IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
1514
},
@@ -26,7 +25,7 @@ use {
2625
solana_streamer::packet::PacketBatch,
2726
solana_votor::{
2827
consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender},
29-
consensus_rewards::ConsensusRewards,
28+
consensus_rewards::{AddVoteEntry, AddVoteMessage},
3029
},
3130
solana_votor_messages::{
3231
consensus_message::{Certificate, CertificateType, ConsensusMessage, VoteMessage},
@@ -88,6 +87,7 @@ enum CertVerifyError {
8887

8988
pub struct BLSSigVerifier {
9089
verified_votes_sender: VerifiedVoteSender,
90+
reward_votes_sender: Sender<AddVoteMessage>,
9191
message_sender: Sender<ConsensusMessage>,
9292
sharable_banks: SharableBanks,
9393
stats: BLSSigVerifierStats,
@@ -96,7 +96,6 @@ pub struct BLSSigVerifier {
9696
consensus_metrics_sender: ConsensusMetricsEventSender,
9797
last_checked_root_slot: Slot,
9898
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
99-
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
10099
}
101100

102101
impl BLSSigVerifier {
@@ -180,14 +179,8 @@ impl BLSSigVerifier {
180179

181180
// The consensus pool does not need votes for slots at or below the root slot; however, the rewards container may still need them.
182181
if vote_message.vote.slot() <= root_bank.slot() {
183-
// the only module that takes a write lock on consensus_rewards is this one and it does not take the write lock while it is verifying votes so this should never block.
184-
if self
185-
.consensus_rewards
186-
.read()
187-
.wants_vote(root_bank.slot(), &vote_message)
188-
{
189-
// XXX: actually verify and send the votes. The verification and sending should happen off the critical path.
190-
}
182+
// XXX: actually verify and send the votes. The verification and sending should happen off the critical path.
183+
191184
self.stats.received_old.fetch_add(1, Ordering::Relaxed);
192185
packet.meta_mut().set_discard(true);
193186
continue;
@@ -249,19 +242,20 @@ impl BLSSigVerifier {
249242
warn!("could not send consensus metrics, receive side of channel is closed");
250243
}
251244

252-
{
253-
// This should be the only place that is taking a write lock on consensus_rewards.
254-
// This lock should not contend with any other operations in this module.
255-
// It can contend with the read lock in the block creation loop though as such we should hold it for as little time as possible.
256-
let mut guard = self.consensus_rewards.write();
257-
let root_slot = root_bank.slot();
258-
for v in rewards_votes {
259-
if let Some(rank_map) = get_key_to_rank_map(&root_bank, v.vote.slot()) {
260-
let max_validators = rank_map.len();
261-
guard.add_vote(root_slot, max_validators, v);
262-
}
263-
}
264-
}
245+
let votes = rewards_votes
246+
.into_iter()
247+
.filter_map(|v| {
248+
get_key_to_rank_map(&root_bank, v.vote.slot()).map(|rank_map| AddVoteEntry {
249+
max_validators: rank_map.len(),
250+
vote: v,
251+
})
252+
})
253+
.collect();
254+
let msg = AddVoteMessage {
255+
root_slot: root_bank.slot(),
256+
votes,
257+
};
258+
self.reward_votes_sender.send(msg).unwrap();
265259

266260
self.stats.maybe_report_stats();
267261

@@ -273,22 +267,22 @@ impl BLSSigVerifier {
273267
pub fn new(
274268
sharable_banks: SharableBanks,
275269
verified_votes_sender: VerifiedVoteSender,
270+
reward_votes_sender: Sender<AddVoteMessage>,
276271
message_sender: Sender<ConsensusMessage>,
277272
consensus_metrics_sender: ConsensusMetricsEventSender,
278273
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
279-
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
280274
) -> Self {
281275
Self {
282276
sharable_banks,
283277
verified_votes_sender,
278+
reward_votes_sender,
284279
message_sender,
285280
stats: BLSSigVerifierStats::new(),
286281
verified_certs: RwLock::new(HashSet::new()),
287282
vote_payload_cache: RwLock::new(HashMap::new()),
288283
consensus_metrics_sender,
289284
last_checked_root_slot: 0,
290285
alpenglow_last_voted,
291-
consensus_rewards,
292286
}
293287
}
294288

@@ -299,20 +293,8 @@ impl BLSSigVerifier {
299293
votes_to_verify: Vec<VoteToVerify>,
300294
) -> Result<Vec<VoteMessage>, BLSSigVerifyServiceError<ConsensusMessage>> {
301295
let verified_votes = self.verify_votes(votes_to_verify);
302-
303-
let rewards_votes = {
304-
// the only module that takes a write lock on consensus_rewards is this one and it does not take the write lock while it is verifying votes so this should never block.
305-
let guard = self.consensus_rewards.read();
306-
let root_slot = self.sharable_banks.root().slot();
307-
verified_votes
308-
.iter()
309-
.filter_map(|vote| {
310-
guard
311-
.wants_vote(root_slot, &vote.vote_message)
312-
.then_some(vote.vote_message)
313-
})
314-
.collect()
315-
};
296+
// XXX: call consensus_rewards::wants_vote() to do some additional filtering here.
297+
let reward_votes = verified_votes.iter().map(|v| v.vote_message).collect();
316298

317299
self.stats
318300
.total_valid_packets
@@ -366,7 +348,7 @@ impl BLSSigVerifier {
366348
}
367349
}
368350

369-
Ok(rewards_votes)
351+
Ok(reward_votes)
370352
}
371353

372354
fn verify_votes(&self, votes_to_verify: Vec<VoteToVerify>) -> Vec<VoteToVerify> {

core/src/replay_stage.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ use {
8686
solana_vote::vote_transaction::VoteTransaction,
8787
solana_votor::{
8888
consensus_metrics::{ConsensusMetricsEventReceiver, ConsensusMetricsEventSender},
89+
consensus_rewards::{AddVoteMessage, BuildRewardCertsRequest, BuildRewardCertsResponse},
8990
event::{
9091
CompletedBlock, LeaderWindowInfo, VotorEvent, VotorEventReceiver, VotorEventSender,
9192
},
@@ -299,6 +300,9 @@ pub struct ReplayStageConfig {
299300
pub consensus_metrics_sender: ConsensusMetricsEventSender,
300301
pub consensus_metrics_receiver: ConsensusMetricsEventReceiver,
301302
pub migration_status: Arc<MigrationStatus>,
303+
pub reward_certs_sender: Sender<BuildRewardCertsResponse>,
304+
pub reward_votes_receiver: Receiver<AddVoteMessage>,
305+
pub build_reward_certs_receiver: Receiver<BuildRewardCertsRequest>,
302306
}
303307

304308
pub struct ReplaySenders {
@@ -620,6 +624,9 @@ impl ReplayStage {
620624
consensus_metrics_sender,
621625
consensus_metrics_receiver,
622626
migration_status,
627+
reward_certs_sender,
628+
reward_votes_receiver,
629+
build_reward_certs_receiver,
623630
} = config;
624631

625632
let ReplaySenders {
@@ -694,6 +701,9 @@ impl ReplayStage {
694701
consensus_metrics_sender,
695702
consensus_metrics_receiver,
696703
migration_status: migration_status.clone(),
704+
reward_certs_sender,
705+
reward_votes_receiver,
706+
build_reward_certs_receiver,
697707
};
698708
let votor = Votor::new(votor_config);
699709

core/src/tvu.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use {
2727
},
2828
bytes::Bytes,
2929
crossbeam_channel::{bounded, unbounded, Receiver, Sender},
30-
parking_lot::RwLock as PLRwLock,
3130
solana_client::connection_cache::ConnectionCache,
3231
solana_clock::Slot,
3332
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
@@ -62,7 +61,7 @@ use {
6261
},
6362
solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender},
6463
solana_votor::{
65-
consensus_rewards::ConsensusRewards,
64+
consensus_rewards::{BuildRewardCertsRequest, BuildRewardCertsResponse},
6665
event::{LeaderWindowInfo, VotorEventReceiver, VotorEventSender},
6766
vote_history::VoteHistory,
6867
vote_history_storage::VoteHistoryStorage,
@@ -218,7 +217,8 @@ impl Tvu {
218217
key_notifiers: Arc<RwLock<KeyUpdaters>>,
219218
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
220219
migration_status: Arc<MigrationStatus>,
221-
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
220+
reward_certs_sender: Sender<BuildRewardCertsResponse>,
221+
build_reward_certs_receiver: Receiver<BuildRewardCertsRequest>,
222222
) -> Result<Self, String> {
223223
let (consensus_message_sender, consensus_message_receiver) =
224224
bounded(MAX_ALPENGLOW_PACKET_NUM);
@@ -271,15 +271,16 @@ impl Tvu {
271271
alpenglow_quic_server_config,
272272
)
273273
.unwrap();
274+
let (reward_votes_sender, reward_votes_receiver) = unbounded();
274275
let alpenglow_sigverify_service = {
275276
let sharable_banks = bank_forks.read().unwrap().sharable_banks();
276277
let verifier = BLSSigVerifier::new(
277278
sharable_banks,
278279
verified_vote_sender.clone(),
280+
reward_votes_sender,
279281
consensus_message_sender.clone(),
280282
consensus_metrics_sender.clone(),
281283
alpenglow_last_voted.clone(),
282-
consensus_rewards,
283284
);
284285
BLSSigverifyService::new(bls_packet_receiver, verifier)
285286
};
@@ -453,6 +454,9 @@ impl Tvu {
453454
consensus_metrics_sender: consensus_metrics_sender.clone(),
454455
consensus_metrics_receiver,
455456
migration_status,
457+
reward_certs_sender,
458+
reward_votes_receiver,
459+
build_reward_certs_receiver,
456460
};
457461

458462
let voting_service = VotingService::new(

0 commit comments

Comments
 (0)