From 8a6334ba4c6000c7aaa9a058e3a96de5590f7b00 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Fri, 25 Jul 2025 15:22:30 -0700 Subject: [PATCH] Refresh certs during standstill. --- votor/src/certificate_pool.rs | 191 ++++++++++++++++++++++++++ votor/src/certificate_pool_service.rs | 26 +++- votor/src/event_handler.rs | 4 +- 3 files changed, 217 insertions(+), 4 deletions(-) diff --git a/votor/src/certificate_pool.rs b/votor/src/certificate_pool.rs index 018a4cfc73..b55d5db2d9 100644 --- a/votor/src/certificate_pool.rs +++ b/votor/src/certificate_pool.rs @@ -98,6 +98,8 @@ pub struct CertificatePool { highest_notarized_fallback: Option<(Slot, Hash)>, /// Highest slot that has a Finalized variant certificate, for use in notifying RPC highest_finalized_slot: Option, + /// Highest slot that has Finalize+Notarize or FinalizeFast, for use in standstill + highest_finalized_with_notarize: Option, // Cached epoch_schedule epoch_schedule: EpochSchedule, // Cached epoch_stakes_map @@ -128,6 +130,7 @@ impl CertificatePool { completed_certificates: BTreeMap::new(), highest_notarized_fallback: None, highest_finalized_slot: None, + highest_finalized_with_notarize: None, epoch_schedule: EpochSchedule::default(), epoch_stakes_map: Arc::new(HashMap::new()), root: bank.slot(), @@ -328,11 +331,23 @@ impl CertificatePool { events.push(VotorEvent::BlockNotarized((slot, block_id))); if self.is_finalized(slot) { events.push(VotorEvent::Finalized((slot, block_id))); + if self + .highest_finalized_with_notarize + .map_or(true, |s| s < slot) + { + self.highest_finalized_with_notarize = Some(slot); + } } } CertificateId::Finalize(slot) => { if let Some(block) = self.get_notarized_block(slot) { events.push(VotorEvent::Finalized(block)); + if self + .highest_finalized_with_notarize + .map_or(true, |s| s < slot) + { + self.highest_finalized_with_notarize = Some(slot); + } } if self.highest_finalized_slot.map_or(true, |s| s < slot) { self.highest_finalized_slot = Some(slot); @@ -343,6 +358,12 @@ impl CertificatePool { if self.highest_finalized_slot.map_or(true, |s| s < slot) { self.highest_finalized_slot = Some(slot); } + if self + .highest_finalized_with_notarize + .map_or(true, |s| s < slot) + { + self.highest_finalized_with_notarize = Some(slot); + } } } } @@ -757,6 +778,19 @@ impl CertificatePool { pub fn maybe_report(&mut self) { self.stats.maybe_report(); } + + pub fn get_certs_for_standstill(&self) -> Vec> { + self.completed_certificates + .iter() + .filter_map(|(_, cert)| { + if Some(cert.certificate.slot) >= self.highest_finalized_with_notarize { + Some(cert.clone()) + } else { + None + } + }) + .collect() + } } pub fn load_from_blockstore( @@ -1758,4 +1792,161 @@ mod tests { .add_message(&Pubkey::new_unique(), &cert, &mut vec![]) .is_err()); } + + #[test] + fn test_get_certs_for_standstill() { + let (_, mut pool) = create_keypairs_and_pool(); + + // Should return empty vector if no certificates + assert!(pool.get_certs_for_standstill().is_empty()); + + // Add notar-fallback cert on 3 and finalize cert on 4 + let cert_3 = CertificateMessage { + certificate: Certificate { + slot: 3, + certificate_type: CertificateType::NotarizeFallback, + block_id: Some(Hash::new_unique()), + replayed_bank_hash: Some(Hash::new_unique()), + }, + signature: BLSSignature::default(), + bitmap: BitVec::new(), + }; + assert!(pool + .add_message( + &Pubkey::new_unique(), + &BLSMessage::Certificate(cert_3.clone()), + &mut vec![] + ) + .is_ok()); + let cert_4 = CertificateMessage { + certificate: Certificate { + slot: 4, + certificate_type: CertificateType::Finalize, + block_id: Some(Hash::new_unique()), + replayed_bank_hash: Some(Hash::new_unique()), + }, + signature: BLSSignature::default(), + bitmap: BitVec::new(), + }; + assert!(pool + .add_message( + &Pubkey::new_unique(), + &BLSMessage::Certificate(cert_4.clone()), + &mut vec![] + ) + .is_ok()); + // Should return both certificates + let certs = pool.get_certs_for_standstill(); + assert_eq!(certs.len(), 2); + assert!(certs.iter().any(|cert| cert.certificate.slot == 3 + && cert.certificate.certificate_type == CertificateType::NotarizeFallback)); + assert!(certs.iter().any(|cert| cert.certificate.slot == 4 + && cert.certificate.certificate_type == CertificateType::Finalize)); + + // Add FinalizeFast cert on 5 + let cert_5 = CertificateMessage { + certificate: Certificate { + slot: 5, + certificate_type: CertificateType::FinalizeFast, + block_id: Some(Hash::new_unique()), + replayed_bank_hash: Some(Hash::new_unique()), + }, + signature: BLSSignature::default(), + bitmap: BitVec::new(), + }; + assert!(pool + .add_message( + &Pubkey::new_unique(), + &BLSMessage::Certificate(cert_5.clone()), + &mut vec![] + ) + .is_ok()); + // Should return only cert on 5 + let certs = pool.get_certs_for_standstill(); + assert_eq!(certs.len(), 1); + assert!( + certs[0].certificate.slot == 5 + && certs[0].certificate.certificate_type == CertificateType::FinalizeFast + ); + + // Now add Notarize cert on 6 + let cert_6 = CertificateMessage { + certificate: Certificate { + slot: 6, + certificate_type: CertificateType::Notarize, + block_id: Some(Hash::new_unique()), + replayed_bank_hash: Some(Hash::new_unique()), + }, + signature: BLSSignature::default(), + bitmap: BitVec::new(), + }; + assert!(pool + .add_message( + &Pubkey::new_unique(), + &BLSMessage::Certificate(cert_6.clone()), + &mut vec![] + ) + .is_ok()); + // Should return certs on 5 and 6 + let certs = pool.get_certs_for_standstill(); + assert_eq!(certs.len(), 2); + assert!(certs.iter().any(|cert| cert.certificate.slot == 5 + && cert.certificate.certificate_type == CertificateType::FinalizeFast)); + assert!(certs.iter().any(|cert| cert.certificate.slot == 6 + && cert.certificate.certificate_type == CertificateType::Notarize)); + + // Add another Finalize cert on 6 + let cert_6_finalize = CertificateMessage { + certificate: Certificate { + slot: 6, + certificate_type: CertificateType::Finalize, + block_id: Some(Hash::new_unique()), + replayed_bank_hash: Some(Hash::new_unique()), + }, + signature: BLSSignature::default(), + bitmap: BitVec::new(), + }; + assert!(pool + .add_message( + &Pubkey::new_unique(), + &BLSMessage::Certificate(cert_6_finalize.clone()), + &mut vec![] + ) + .is_ok()); + // Should only return both certs on 6 + let certs = pool.get_certs_for_standstill(); + assert_eq!(certs.len(), 2); + assert!(certs.iter().any(|cert| cert.certificate.slot == 6 + && cert.certificate.certificate_type == CertificateType::Finalize)); + assert!(certs.iter().any(|cert| cert.certificate.slot == 6 + && cert.certificate.certificate_type == CertificateType::Notarize)); + + // Add another skip on 7 + let cert_7 = CertificateMessage { + certificate: Certificate { + slot: 7, + certificate_type: CertificateType::Skip, + block_id: Some(Hash::new_unique()), + replayed_bank_hash: Some(Hash::new_unique()), + }, + signature: BLSSignature::default(), + bitmap: BitVec::new(), + }; + assert!(pool + .add_message( + &Pubkey::new_unique(), + &BLSMessage::Certificate(cert_7.clone()), + &mut vec![] + ) + .is_ok()); + // Should return certs on 6 and 7 + let certs = pool.get_certs_for_standstill(); + assert_eq!(certs.len(), 3); + assert!(certs.iter().any(|cert| cert.certificate.slot == 6 + && cert.certificate.certificate_type == CertificateType::Finalize)); + assert!(certs.iter().any(|cert| cert.certificate.slot == 6 + && cert.certificate.certificate_type == CertificateType::Notarize)); + assert!(certs.iter().any(|cert| cert.certificate.slot == 7 + && cert.certificate.certificate_type == CertificateType::Skip)); + } } diff --git a/votor/src/certificate_pool_service.rs b/votor/src/certificate_pool_service.rs index fc717d9e5c..4b611053ee 100644 --- a/votor/src/certificate_pool_service.rs +++ b/votor/src/certificate_pool_service.rs @@ -109,7 +109,15 @@ impl CertificatePoolService { } } // Send new certificates to peers - for (i, certificate) in new_certificates_to_send.iter().enumerate() { + Self::send_certificates(bls_sender, new_certificates_to_send, stats) + } + + fn send_certificates( + bls_sender: &Sender, + certificates_to_send: Vec>, + stats: &mut CertificatePoolServiceStats, + ) -> Result<(), AddVoteError> { + for (i, certificate) in certificates_to_send.iter().enumerate() { // The buffer should normally be large enough, so we don't handle // certificate re-send here. match bls_sender.try_send(BLSOp::PushCertificate { @@ -122,7 +130,7 @@ impl CertificatePoolService { return Err(AddVoteError::VotingServiceSenderDisconnected); } Err(TrySendError::Full(_)) => { - let dropped = new_certificates_to_send.len().saturating_sub(i) as u16; + let dropped = certificates_to_send.len().saturating_sub(i) as u16; stats.certificates_dropped = stats.certificates_dropped.saturating_add(dropped); return Err(AddVoteError::VotingServiceQueueFull); } @@ -229,6 +237,20 @@ impl CertificatePoolService { events.push(VotorEvent::Standstill(highest_finalized_slot)); stats.standstill = true; standstill_timer = Instant::now(); + if Err(AddVoteError::VotingServiceSenderDisconnected) + == Self::send_certificates( + &ctx.bls_sender, + cert_pool.get_certs_for_standstill(), + &mut stats, + ) + { + info!( + "{}: Voting service sender disconnected. Exiting.", + ctx.my_pubkey + ); + ctx.exit.store(true, Ordering::Relaxed); + return Ok(()); + } } if events diff --git a/votor/src/event_handler.rs b/votor/src/event_handler.rs index 7d74af3d5d..d4b3662841 100644 --- a/votor/src/event_handler.rs +++ b/votor/src/event_handler.rs @@ -291,10 +291,10 @@ impl EventHandler { )?; } - // We have not observed a finalization certificate in a while, refresh our votes and certs + // We have not observed a finalization certificate in a while, refresh our votes VotorEvent::Standstill(highest_finalized_slot) => { info!("{my_pubkey}: Standstill {highest_finalized_slot}"); - // TODO: once we have certificate broadcast, we should also refresh certs + // certs refresh happens in CertificatePoolService Self::refresh_votes(my_pubkey, highest_finalized_slot, vctx, &mut votes); }