11use std:: net:: SocketAddr ;
2+ use std:: ops:: ControlFlow ;
23
4+ use namada:: types:: control_flow:: time;
35use tokio:: sync:: mpsc:: UnboundedReceiver ;
46
57use crate :: facade:: tendermint_rpc:: { Client , HttpClient } ;
@@ -26,6 +28,41 @@ impl Broadcaster {
2628 /// Loop forever, braodcasting messages that have been received
2729 /// by the receiver
2830 async fn run_loop ( & mut self ) {
31+ let result = time:: Sleep {
32+ strategy : time:: ExponentialBackoff {
33+ base : 2 ,
34+ as_duration : time:: Duration :: from_secs,
35+ } ,
36+ }
37+ . run ( || async {
38+ let status_result = time:: Sleep {
39+ strategy : time:: Constant ( time:: Duration :: from_secs ( 1 ) ) ,
40+ }
41+ . timeout (
42+ time:: Instant :: now ( ) + time:: Duration :: from_secs ( 30 ) ,
43+ || async {
44+ match self . client . status ( ) . await {
45+ Ok ( status) => ControlFlow :: Break ( status) ,
46+ Err ( _) => ControlFlow :: Continue ( ( ) ) ,
47+ }
48+ } ,
49+ )
50+ . await ;
51+ let status = match status_result {
52+ Ok ( status) => status,
53+ Err ( _) => return ControlFlow :: Break ( Err ( ( ) ) ) ,
54+ } ;
55+ if status. sync_info . catching_up {
56+ ControlFlow :: Continue ( ( ) )
57+ } else {
58+ ControlFlow :: Break ( Ok ( ( ) ) )
59+ }
60+ } )
61+ . await ;
62+ if let Err ( ( ) ) = result {
63+ tracing:: error!( "Broadcaster failed to connect to CometBFT node" ) ;
64+ return ;
65+ }
2966 loop {
3067 if let Some ( msg) = self . receiver . recv ( ) . await {
3168 let _ = self . client . broadcast_tx_sync ( msg. into ( ) ) . await ;
0 commit comments