11//! Service in charge of ingesting new messages into the certificate pool
22//! and notifying votor of new events that occur
33
4+ mod stats;
5+
46use {
57 crate :: {
68 certificate_pool:: {
2325 root_bank_cache:: RootBankCache , vote_sender_types:: BLSVerifiedMessageReceiver ,
2426 } ,
2527 solana_sdk:: clock:: Slot ,
28+ stats:: CertificatePoolServiceStats ,
2629 std:: {
2730 sync:: {
2831 atomic:: { AtomicBool , Ordering } ,
@@ -85,32 +88,39 @@ impl CertificatePoolService {
8588 current_root : & mut Slot ,
8689 highest_finalized_slot : & mut Slot ,
8790 standstill_timer : & mut Instant ,
91+ stats : & mut CertificatePoolServiceStats ,
8892 ) -> Result < ( ) , AddVoteError > {
8993 // If we have a new finalized slot, update the root and send new certificates
9094 if let Some ( new_finalized_slot) = new_finalized_slot {
9195 // Reset standstill timer
9296 debug_assert ! ( new_finalized_slot > * highest_finalized_slot) ;
9397 * highest_finalized_slot = new_finalized_slot;
9498 * standstill_timer = Instant :: now ( ) ;
99+ CertificatePoolServiceStats :: incr_u16 ( & mut stats. new_finalized_slot ) ;
95100 // Set root
96101 let root_bank = root_bank_cache. root_bank ( ) ;
97102 if root_bank. slot ( ) > * current_root {
103+ CertificatePoolServiceStats :: incr_u16 ( & mut stats. new_root ) ;
98104 * current_root = root_bank. slot ( ) ;
99105 cert_pool. handle_new_root ( root_bank) ;
100106 }
101107 }
102108 // Send new certificates to peers
103- for certificate in new_certificates_to_send {
109+ for ( i , certificate) in new_certificates_to_send. iter ( ) . enumerate ( ) {
104110 // The buffer should normally be large enough, so we don't handle
105111 // certificate re-send here.
106112 match bls_sender. try_send ( BLSOp :: PushCertificate {
107113 certificate : certificate. clone ( ) ,
108114 } ) {
109- Ok ( _) => ( ) ,
115+ Ok ( _) => {
116+ CertificatePoolServiceStats :: incr_u16 ( & mut stats. certificates_sent ) ;
117+ }
110118 Err ( TrySendError :: Disconnected ( _) ) => {
111119 return Err ( AddVoteError :: VotingServiceSenderDisconnected ) ;
112120 }
113121 Err ( TrySendError :: Full ( _) ) => {
122+ let dropped = new_certificates_to_send. len ( ) . saturating_sub ( i) as u16 ;
123+ stats. certificates_dropped = stats. certificates_dropped . saturating_add ( dropped) ;
114124 return Err ( AddVoteError :: VotingServiceQueueFull ) ;
115125 }
116126 }
@@ -126,7 +136,16 @@ impl CertificatePoolService {
126136 current_root : & mut Slot ,
127137 highest_finalized_slot : & mut Slot ,
128138 standstill_timer : & mut Instant ,
139+ stats : & mut CertificatePoolServiceStats ,
129140 ) -> Result < ( ) , AddVoteError > {
141+ match message {
142+ BLSMessage :: Certificate ( _) => {
143+ CertificatePoolServiceStats :: incr_u32 ( & mut stats. received_certificates ) ;
144+ }
145+ BLSMessage :: Vote ( _) => {
146+ CertificatePoolServiceStats :: incr_u32 ( & mut stats. received_votes ) ;
147+ }
148+ }
130149 match voting_utils:: add_message_and_maybe_update_commitment (
131150 & ctx. my_pubkey ,
132151 & ctx. my_vote_pubkey ,
@@ -145,6 +164,7 @@ impl CertificatePoolService {
145164 current_root,
146165 highest_finalized_slot,
147166 standstill_timer,
167+ stats,
148168 ) ?;
149169 }
150170 Err ( e) => {
@@ -153,6 +173,7 @@ impl CertificatePoolService {
153173 } else {
154174 // This is a non critical error, a duplicate vote for example
155175 trace ! ( "{}: unable to push vote into pool {}" , & ctx. my_pubkey, e) ;
176+ CertificatePoolServiceStats :: incr_u32 ( & mut stats. add_message_failed ) ;
156177 }
157178 }
158179 } ;
@@ -175,6 +196,7 @@ impl CertificatePoolService {
175196 Votor :: wait_for_migration_or_exit ( & ctx. exit , & ctx. start ) ;
176197 info ! ( "{}: Certificate pool loop starting" , & ctx. my_pubkey) ;
177198 let mut current_root = ctx. root_bank_cache . root_bank ( ) . slot ( ) ;
199+ let mut stats = CertificatePoolServiceStats :: new ( ) ;
178200
179201 // Standstill tracking
180202 let mut standstill_timer = Instant :: now ( ) ;
@@ -201,10 +223,12 @@ impl CertificatePoolService {
201223 & cert_pool,
202224 & mut ctx,
203225 & mut events,
226+ & mut stats,
204227 ) ;
205228
206229 if standstill_timer. elapsed ( ) > STANDSTILL_TIMEOUT {
207230 events. push ( VotorEvent :: Standstill ( highest_finalized_slot) ) ;
231+ stats. standstill = true ;
208232 standstill_timer = Instant :: now ( ) ;
209233 }
210234
@@ -241,6 +265,7 @@ impl CertificatePoolService {
241265 & mut current_root,
242266 & mut highest_finalized_slot,
243267 & mut standstill_timer,
268+ & mut stats,
244269 ) {
245270 info ! (
246271 "{}: Unable to process BLS message: {e}. Exiting." ,
@@ -250,6 +275,7 @@ impl CertificatePoolService {
250275 return Ok ( ( ) ) ;
251276 }
252277 }
278+ stats. maybe_report ( ) ;
253279 }
254280 Ok ( ( ) )
255281 }
@@ -259,6 +285,7 @@ impl CertificatePoolService {
259285 cert_pool : & CertificatePool ,
260286 ctx : & mut CertificatePoolContext ,
261287 events : & mut Vec < VotorEvent > ,
288+ stats : & mut CertificatePoolServiceStats ,
262289 ) {
263290 let Some ( new_highest_parent_ready) = events
264291 . iter ( )
@@ -304,6 +331,7 @@ impl CertificatePoolService {
304331 skipping production of {start_slot}-{end_slot}",
305332 ctx. my_pubkey,
306333 ) ;
334+ CertificatePoolServiceStats :: incr_u16 ( & mut stats. parent_ready_missed_window ) ;
307335 }
308336 BlockProductionParent :: ParentNotReady => {
309337 // This can't happen, place holder depending on how we hook up optimistic
@@ -320,7 +348,8 @@ impl CertificatePoolService {
320348 parent_block,
321349 // TODO: we can just remove this
322350 skip_timer : Instant :: now ( ) ,
323- } ) )
351+ } ) ) ;
352+ CertificatePoolServiceStats :: incr_u16 ( & mut stats. parent_ready_produce_window ) ;
324353 }
325354 }
326355 }
0 commit comments