@@ -29,12 +29,22 @@ use std::sync::Arc;
2929use sc_client_api:: backend:: { Backend , StorageProvider } ;
3030use sp_api:: { ApiExt , ProvideRuntimeApi } ;
3131use sp_blockchain:: { Backend as _, HeaderBackend } ;
32+ use sp_consensus:: SyncOracle ;
3233use sp_runtime:: traits:: { Block as BlockT , Header as HeaderT , Zero } ;
3334// Frontier
3435use fc_storage:: OverrideHandle ;
3536use fp_consensus:: { FindLogError , Hashes , Log , PostLog , PreLog } ;
3637use fp_rpc:: EthereumRuntimeRPCApi ;
3738
39+ pub type EthereumBlockNotificationSinks < T > =
40+ parking_lot:: Mutex < Vec < sc_utils:: mpsc:: TracingUnboundedSender < T > > > ;
41+
42+ #[ derive( Copy , Clone , Debug , Eq , PartialEq ) ]
43+ pub struct EthereumBlockNotification < Block : BlockT > {
44+ pub is_new_best : bool ,
45+ pub hash : Block :: Hash ,
46+ }
47+
3848pub fn sync_block < Block : BlockT , C , BE > (
3949 client : & C ,
4050 overrides : Arc < OverrideHandle < Block > > ,
@@ -160,6 +170,10 @@ pub fn sync_one_block<Block: BlockT, C, BE>(
160170 frontier_backend : & fc_db:: Backend < Block > ,
161171 sync_from : <Block :: Header as HeaderT >:: Number ,
162172 strategy : SyncStrategy ,
173+ sync_oracle : Arc < dyn SyncOracle + Send + Sync + ' static > ,
174+ pubsub_notification_sinks : Arc <
175+ EthereumBlockNotificationSinks < EthereumBlockNotification < Block > > ,
176+ > ,
163177) -> Result < bool , String >
164178where
165179 C : ProvideRuntimeApi < Block > ,
@@ -208,7 +222,6 @@ where
208222 frontier_backend
209223 . meta ( )
210224 . write_current_syncing_tips ( current_syncing_tips) ?;
211- Ok ( true )
212225 } else {
213226 if SyncStrategy :: Parachain == strategy
214227 && operating_header. number ( ) > & client. info ( ) . best_number
@@ -221,8 +234,22 @@ where
221234 frontier_backend
222235 . meta ( )
223236 . write_current_syncing_tips ( current_syncing_tips) ?;
224- Ok ( true )
225237 }
238+ // Notify on import and remove closed channels.
239+ // Only notify when the node is node in major syncing.
240+ let sinks = & mut pubsub_notification_sinks. lock ( ) ;
241+ sinks. retain ( |sink| {
242+ if !sync_oracle. is_major_syncing ( ) {
243+ let hash = operating_header. hash ( ) ;
244+ let is_new_best = client. info ( ) . best_hash == hash;
245+ sink. unbounded_send ( EthereumBlockNotification { is_new_best, hash } )
246+ . is_ok ( )
247+ } else {
248+ // Remove from the pool if in major syncing.
249+ false
250+ }
251+ } ) ;
252+ Ok ( true )
226253}
227254
228255pub fn sync_blocks < Block : BlockT , C , BE > (
@@ -233,6 +260,10 @@ pub fn sync_blocks<Block: BlockT, C, BE>(
233260 limit : usize ,
234261 sync_from : <Block :: Header as HeaderT >:: Number ,
235262 strategy : SyncStrategy ,
263+ sync_oracle : Arc < dyn SyncOracle + Send + Sync + ' static > ,
264+ pubsub_notification_sinks : Arc <
265+ EthereumBlockNotificationSinks < EthereumBlockNotification < Block > > ,
266+ > ,
236267) -> Result < bool , String >
237268where
238269 C : ProvideRuntimeApi < Block > ,
@@ -251,6 +282,8 @@ where
251282 frontier_backend,
252283 sync_from,
253284 strategy,
285+ sync_oracle. clone ( ) ,
286+ pubsub_notification_sinks. clone ( ) ,
254287 ) ?;
255288 }
256289
0 commit comments