Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions votor/src/certificate_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ pub struct CertificatePool {
highest_notarized_fallback: Option<(Slot, Hash)>,
/// Highest slot that has a Finalized variant certificate, for use in notifying RPC
highest_finalized_slot: Option<Slot>,
/// Highest slot that has Finalize+Notarize or FinalizeFast, for use in standstill
highest_finalized_with_notarize: Option<Slot>,
// Cached epoch_schedule
epoch_schedule: EpochSchedule,
// Cached epoch_stakes_map
Expand Down Expand Up @@ -128,6 +130,7 @@ impl CertificatePool {
completed_certificates: BTreeMap::new(),
highest_notarized_fallback: None,
highest_finalized_slot: None,
highest_finalized_with_notarize: None,
epoch_schedule: EpochSchedule::default(),
epoch_stakes_map: Arc::new(HashMap::new()),
root: bank.slot(),
Expand Down Expand Up @@ -328,11 +331,23 @@ impl CertificatePool {
events.push(VotorEvent::BlockNotarized((slot, block_id)));
if self.is_finalized(slot) {
events.push(VotorEvent::Finalized((slot, block_id)));
if self
.highest_finalized_with_notarize
.map_or(true, |s| s < slot)
{
self.highest_finalized_with_notarize = Some(slot);
}
}
}
CertificateId::Finalize(slot) => {
if let Some(block) = self.get_notarized_block(slot) {
events.push(VotorEvent::Finalized(block));
if self
.highest_finalized_with_notarize
.map_or(true, |s| s < slot)
{
self.highest_finalized_with_notarize = Some(slot);
}
}
if self.highest_finalized_slot.map_or(true, |s| s < slot) {
self.highest_finalized_slot = Some(slot);
Expand All @@ -343,6 +358,12 @@ impl CertificatePool {
if self.highest_finalized_slot.map_or(true, |s| s < slot) {
self.highest_finalized_slot = Some(slot);
}
if self
.highest_finalized_with_notarize
.map_or(true, |s| s < slot)
{
self.highest_finalized_with_notarize = Some(slot);
}
}
}
}
Expand Down Expand Up @@ -757,6 +778,19 @@ impl CertificatePool {
pub fn maybe_report(&mut self) {
self.stats.maybe_report();
}

pub fn get_certs_for_standstill(&self) -> Vec<Arc<CertificateMessage>> {
self.completed_certificates
.iter()
.filter_map(|(_, cert)| {
if Some(cert.certificate.slot) >= self.highest_finalized_with_notarize {
Some(cert.clone())
} else {
None
}
})
.collect()
}
}

pub fn load_from_blockstore(
Expand Down Expand Up @@ -1758,4 +1792,161 @@ mod tests {
.add_message(&Pubkey::new_unique(), &cert, &mut vec![])
.is_err());
}

#[test]
fn test_get_certs_for_standstill() {
let (_, mut pool) = create_keypairs_and_pool();

// Should return empty vector if no certificates
assert!(pool.get_certs_for_standstill().is_empty());

// Add notar-fallback cert on 3 and finalize cert on 4
let cert_3 = CertificateMessage {
certificate: Certificate {
slot: 3,
certificate_type: CertificateType::NotarizeFallback,
block_id: Some(Hash::new_unique()),
replayed_bank_hash: Some(Hash::new_unique()),
},
signature: BLSSignature::default(),
bitmap: BitVec::new(),
};
assert!(pool
.add_message(
&Pubkey::new_unique(),
&BLSMessage::Certificate(cert_3.clone()),
&mut vec![]
)
.is_ok());
let cert_4 = CertificateMessage {
certificate: Certificate {
slot: 4,
certificate_type: CertificateType::Finalize,
block_id: Some(Hash::new_unique()),
replayed_bank_hash: Some(Hash::new_unique()),
},
signature: BLSSignature::default(),
bitmap: BitVec::new(),
};
assert!(pool
.add_message(
&Pubkey::new_unique(),
&BLSMessage::Certificate(cert_4.clone()),
&mut vec![]
)
.is_ok());
// Should return both certificates
let certs = pool.get_certs_for_standstill();
assert_eq!(certs.len(), 2);
assert!(certs.iter().any(|cert| cert.certificate.slot == 3
&& cert.certificate.certificate_type == CertificateType::NotarizeFallback));
assert!(certs.iter().any(|cert| cert.certificate.slot == 4
&& cert.certificate.certificate_type == CertificateType::Finalize));

// Add FinalizeFast cert on 5
let cert_5 = CertificateMessage {
certificate: Certificate {
slot: 5,
certificate_type: CertificateType::FinalizeFast,
block_id: Some(Hash::new_unique()),
replayed_bank_hash: Some(Hash::new_unique()),
},
signature: BLSSignature::default(),
bitmap: BitVec::new(),
};
assert!(pool
.add_message(
&Pubkey::new_unique(),
&BLSMessage::Certificate(cert_5.clone()),
&mut vec![]
)
.is_ok());
// Should return only cert on 5
let certs = pool.get_certs_for_standstill();
assert_eq!(certs.len(), 1);
assert!(
certs[0].certificate.slot == 5
&& certs[0].certificate.certificate_type == CertificateType::FinalizeFast
);

// Now add Notarize cert on 6
let cert_6 = CertificateMessage {
certificate: Certificate {
slot: 6,
certificate_type: CertificateType::Notarize,
block_id: Some(Hash::new_unique()),
replayed_bank_hash: Some(Hash::new_unique()),
},
signature: BLSSignature::default(),
bitmap: BitVec::new(),
};
assert!(pool
.add_message(
&Pubkey::new_unique(),
&BLSMessage::Certificate(cert_6.clone()),
&mut vec![]
)
.is_ok());
// Should return certs on 5 and 6
let certs = pool.get_certs_for_standstill();
assert_eq!(certs.len(), 2);
assert!(certs.iter().any(|cert| cert.certificate.slot == 5
&& cert.certificate.certificate_type == CertificateType::FinalizeFast));
assert!(certs.iter().any(|cert| cert.certificate.slot == 6
&& cert.certificate.certificate_type == CertificateType::Notarize));

// Add another Finalize cert on 6
let cert_6_finalize = CertificateMessage {
certificate: Certificate {
slot: 6,
certificate_type: CertificateType::Finalize,
block_id: Some(Hash::new_unique()),
replayed_bank_hash: Some(Hash::new_unique()),
},
signature: BLSSignature::default(),
bitmap: BitVec::new(),
};
assert!(pool
.add_message(
&Pubkey::new_unique(),
&BLSMessage::Certificate(cert_6_finalize.clone()),
&mut vec![]
)
.is_ok());
// Should only return both certs on 6
let certs = pool.get_certs_for_standstill();
assert_eq!(certs.len(), 2);
assert!(certs.iter().any(|cert| cert.certificate.slot == 6
&& cert.certificate.certificate_type == CertificateType::Finalize));
assert!(certs.iter().any(|cert| cert.certificate.slot == 6
&& cert.certificate.certificate_type == CertificateType::Notarize));

// Add another skip on 7
let cert_7 = CertificateMessage {
certificate: Certificate {
slot: 7,
certificate_type: CertificateType::Skip,
block_id: Some(Hash::new_unique()),
replayed_bank_hash: Some(Hash::new_unique()),
},
signature: BLSSignature::default(),
bitmap: BitVec::new(),
};
assert!(pool
.add_message(
&Pubkey::new_unique(),
&BLSMessage::Certificate(cert_7.clone()),
&mut vec![]
)
.is_ok());
// Should return certs on 6 and 7
let certs = pool.get_certs_for_standstill();
assert_eq!(certs.len(), 3);
assert!(certs.iter().any(|cert| cert.certificate.slot == 6
&& cert.certificate.certificate_type == CertificateType::Finalize));
assert!(certs.iter().any(|cert| cert.certificate.slot == 6
&& cert.certificate.certificate_type == CertificateType::Notarize));
assert!(certs.iter().any(|cert| cert.certificate.slot == 7
&& cert.certificate.certificate_type == CertificateType::Skip));
}
}
26 changes: 24 additions & 2 deletions votor/src/certificate_pool_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,15 @@ impl CertificatePoolService {
}
}
// Send new certificates to peers
for (i, certificate) in new_certificates_to_send.iter().enumerate() {
Self::send_certificates(bls_sender, new_certificates_to_send, stats)
}

fn send_certificates(
bls_sender: &Sender<BLSOp>,
certificates_to_send: Vec<Arc<CertificateMessage>>,
stats: &mut CertificatePoolServiceStats,
) -> Result<(), AddVoteError> {
for (i, certificate) in certificates_to_send.iter().enumerate() {
// The buffer should normally be large enough, so we don't handle
// certificate re-send here.
match bls_sender.try_send(BLSOp::PushCertificate {
Expand All @@ -122,7 +130,7 @@ impl CertificatePoolService {
return Err(AddVoteError::VotingServiceSenderDisconnected);
}
Err(TrySendError::Full(_)) => {
let dropped = new_certificates_to_send.len().saturating_sub(i) as u16;
let dropped = certificates_to_send.len().saturating_sub(i) as u16;
stats.certificates_dropped = stats.certificates_dropped.saturating_add(dropped);
return Err(AddVoteError::VotingServiceQueueFull);
}
Expand Down Expand Up @@ -229,6 +237,20 @@ impl CertificatePoolService {
events.push(VotorEvent::Standstill(highest_finalized_slot));
stats.standstill = true;
standstill_timer = Instant::now();
if Err(AddVoteError::VotingServiceSenderDisconnected)
== Self::send_certificates(
&ctx.bls_sender,
cert_pool.get_certs_for_standstill(),
&mut stats,
)
{
info!(
"{}: Voting service sender disconnected. Exiting.",
ctx.my_pubkey
);
ctx.exit.store(true, Ordering::Relaxed);
return Ok(());
}
}

if events
Expand Down
4 changes: 2 additions & 2 deletions votor/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,10 @@ impl EventHandler {
)?;
}

// We have not observed a finalization certificate in a while, refresh our votes and certs
// We have not observed a finalization certificate in a while, refresh our votes
VotorEvent::Standstill(highest_finalized_slot) => {
info!("{my_pubkey}: Standstill {highest_finalized_slot}");
// TODO: once we have certificate broadcast, we should also refresh certs
// certs refresh happens in CertificatePoolService
Self::refresh_votes(my_pubkey, highest_finalized_slot, vctx, &mut votes);
}

Expand Down