@@ -311,57 +311,98 @@ async fn run<Context>(
311311struct PrepareValidationState {
312312 session_index : Option < SessionIndex > ,
313313 is_next_session_authority : bool ,
314- // PVF host won't prepare the same code hash twice, so here we just avoid extra communication
315- already_prepared_code_hashes : HashSet < ValidationCodeHash > ,
316314 // How many PVFs per block we take to prepare themselves for the next session validation
317315 per_block_limit : usize ,
316+ executor_params : Option < ExecutorParams > ,
317+ waiting : HashSet < ValidationCodeHash > ,
318+ pending : HashSet < ValidationCodeHash > ,
319+ processed : HashSet < ValidationCodeHash > ,
318320}
319321
320322impl Default for PrepareValidationState {
321323 fn default ( ) -> Self {
322324 Self {
323325 session_index : None ,
324326 is_next_session_authority : false ,
325- already_prepared_code_hashes : HashSet :: new ( ) ,
326327 per_block_limit : 1 ,
328+ executor_params : None ,
329+ waiting : HashSet :: new ( ) ,
330+ pending : HashSet :: new ( ) ,
331+ processed : HashSet :: new ( ) ,
327332 }
328333 }
329334}
330335
331336async fn maybe_prepare_validation < Sender > (
332337 sender : & mut Sender ,
333338 keystore : KeystorePtr ,
334- validation_backend : impl ValidationBackend ,
339+ mut validation_backend : impl ValidationBackend ,
335340 update : ActiveLeavesUpdate ,
336341 state : & mut PrepareValidationState ,
337342) where
338343 Sender : SubsystemSender < RuntimeApiMessage > ,
339344{
340345 let Some ( leaf) = update. activated else { return } ;
341346 let new_session_index = new_session_index ( sender, state. session_index , leaf. hash ) . await ;
342- if new_session_index. is_some ( ) {
343- state. session_index = new_session_index;
344- state. already_prepared_code_hashes . clear ( ) ;
345- state. is_next_session_authority = check_next_session_authority (
346- sender,
347- keystore,
348- leaf. hash ,
349- state. session_index . expect ( "qed: just checked above" ) ,
350- )
351- . await ;
347+ if let Some ( new_session_index) = new_session_index {
348+ state. session_index = Some ( new_session_index) ;
349+ state. executor_params = None ;
350+ state. waiting . clear ( ) ;
351+ state. pending . clear ( ) ;
352+ state. processed . clear ( ) ;
353+ state. is_next_session_authority =
354+ check_next_session_authority ( sender, keystore, leaf. hash , new_session_index) . await ;
355+ }
356+
357+ if state. is_next_session_authority && state. executor_params . is_none ( ) {
358+ if let Ok ( executor_params) = util:: executor_params_at_relay_parent ( leaf. hash , sender) . await
359+ {
360+ state. executor_params = Some ( executor_params) ;
361+ } else {
362+ gum:: warn!(
363+ target: LOG_TARGET ,
364+ relay_parent = ?leaf. hash,
365+ "cannot fetch executor params for the session" ,
366+ ) ;
367+ } ;
352368 }
353369
354370 // On every active leaf check candidates and prepare PVFs our node doesn't have yet.
355371 if state. is_next_session_authority {
356- let code_hashes = prepare_pvfs_for_backed_candidates (
372+ let Some ( ref executor_params) = state. executor_params else { return } ;
373+ let waiting_code_hashes =
374+ collect_waiting_validation_code_hashes ( sender, leaf. hash , & state) . await ;
375+ state. waiting . extend ( waiting_code_hashes) ;
376+
377+ if state. pending . is_empty ( ) {
378+ let waiting = state. waiting . iter ( ) . cloned ( ) . collect ( ) ;
379+ let Ok ( pending) = validation_backend. ensure_pvf ( waiting, executor_params. clone ( ) ) . await
380+ else {
381+ gum:: warn!(
382+ target: LOG_TARGET ,
383+ relay_parent = ?leaf. hash,
384+ "cannot ensure prepared PVF" ,
385+ ) ;
386+ return
387+ } ;
388+ state. pending = HashSet :: from_iter ( pending) ;
389+ state. waiting . clear ( ) ;
390+ }
391+
392+ let code_hashes_to_process =
393+ state. pending . iter ( ) . cloned ( ) . take ( state. per_block_limit ) . collect :: < Vec < _ > > ( ) ;
394+ let processed_code_hashes = prepare_pvfs_for_backed_candidates (
357395 sender,
358396 validation_backend,
359397 leaf. hash ,
360- & state . already_prepared_code_hashes ,
361- state. per_block_limit ,
398+ code_hashes_to_process ,
399+ & state,
362400 )
363401 . await ;
364- state. already_prepared_code_hashes . extend ( code_hashes. unwrap_or_default ( ) ) ;
402+ for processed in processed_code_hashes {
403+ let _ = state. pending . remove ( & processed) ;
404+ let _ = state. processed . insert ( processed) ;
405+ }
365406 }
366407}
367408
@@ -441,14 +482,11 @@ where
441482 is_past_present_or_future_authority && !is_present_authority
442483}
443484
444- // Sends PVF with unknown code hashes to the validation host returning the list of code hashes sent.
445- async fn prepare_pvfs_for_backed_candidates < Sender > (
485+ async fn collect_waiting_validation_code_hashes < Sender > (
446486 sender : & mut Sender ,
447- mut validation_backend : impl ValidationBackend ,
448487 relay_parent : Hash ,
449- already_prepared : & HashSet < ValidationCodeHash > ,
450- per_block_limit : usize ,
451- ) -> Option < Vec < ValidationCodeHash > >
488+ state : & PrepareValidationState ,
489+ ) -> Vec < ValidationCodeHash >
452490where
453491 Sender : SubsystemSender < RuntimeApiMessage > ,
454492{
@@ -458,34 +496,43 @@ where
458496 ?relay_parent,
459497 "cannot fetch candidate events from runtime API" ,
460498 ) ;
461- return None
499+ return vec ! [ ] ;
462500 } ;
463501 let code_hashes = events
464502 . into_iter ( )
465503 . filter_map ( |e| match e {
466504 CandidateEvent :: CandidateBacked ( receipt, ..) => {
467505 let h = receipt. descriptor . validation_code_hash ;
468- if already_prepared. contains ( & h) {
506+ if state. waiting . contains ( & h) ||
507+ state. pending . contains ( & h) ||
508+ state. processed . contains ( & h)
509+ {
469510 None
470511 } else {
471512 Some ( h)
472513 }
473514 } ,
474515 _ => None ,
475516 } )
476- . take ( per_block_limit)
477517 . collect :: < Vec < _ > > ( ) ;
478518
479- let Ok ( executor_params) = util:: executor_params_at_relay_parent ( relay_parent, sender) . await
480- else {
481- gum:: warn!(
482- target: LOG_TARGET ,
483- ?relay_parent,
484- "cannot fetch executor params for the session" ,
485- ) ;
486- return None
487- } ;
488- let timeout = pvf_prep_timeout ( & executor_params, PvfPrepKind :: Prepare ) ;
519+ code_hashes
520+ }
521+
522+ // Sends PVF with unknown code hashes to the validation host returning the list of code hashes sent.
523+ async fn prepare_pvfs_for_backed_candidates < Sender > (
524+ sender : & mut Sender ,
525+ mut validation_backend : impl ValidationBackend ,
526+ relay_parent : Hash ,
527+ code_hashes : Vec < ValidationCodeHash > ,
528+ state : & PrepareValidationState ,
529+ ) -> Vec < ValidationCodeHash >
530+ where
531+ Sender : SubsystemSender < RuntimeApiMessage > ,
532+ {
533+ let Some ( ref executor_params) = state. executor_params else { return vec ! [ ] } ;
534+
535+ let timeout = pvf_prep_timeout ( executor_params, PvfPrepKind :: Prepare ) ;
489536
490537 let mut active_pvfs = vec ! [ ] ;
491538 let mut processed_code_hashes = vec ! [ ] ;
@@ -525,7 +572,7 @@ where
525572 }
526573
527574 if active_pvfs. is_empty ( ) {
528- return None
575+ return vec ! [ ]
529576 }
530577
531578 if let Err ( err) = validation_backend. heads_up ( active_pvfs) . await {
@@ -535,7 +582,7 @@ where
535582 ?err,
536583 "cannot prepare PVF for the next session" ,
537584 ) ;
538- return None
585+ return vec ! [ ]
539586 } ;
540587
541588 gum:: debug!(
@@ -545,7 +592,7 @@ where
545592 "Prepared PVF for the next session" ,
546593 ) ;
547594
548- Some ( processed_code_hashes)
595+ processed_code_hashes
549596}
550597
551598struct RuntimeRequestFailed ;
@@ -1144,6 +1191,12 @@ trait ValidationBackend {
11441191 async fn precheck_pvf ( & mut self , pvf : PvfPrepData ) -> Result < ( ) , PrepareError > ;
11451192
11461193 async fn heads_up ( & mut self , active_pvfs : Vec < PvfPrepData > ) -> Result < ( ) , String > ;
1194+
1195+ async fn ensure_pvf (
1196+ & mut self ,
1197+ code_hashes : Vec < ValidationCodeHash > ,
1198+ executor_params : ExecutorParams ,
1199+ ) -> Result < Vec < ValidationCodeHash > , String > ;
11471200}
11481201
11491202#[ async_trait]
@@ -1190,6 +1243,18 @@ impl ValidationBackend for ValidationHost {
11901243 async fn heads_up ( & mut self , active_pvfs : Vec < PvfPrepData > ) -> Result < ( ) , String > {
11911244 self . heads_up ( active_pvfs) . await
11921245 }
1246+
1247+ async fn ensure_pvf (
1248+ & mut self ,
1249+ code_hashes : Vec < ValidationCodeHash > ,
1250+ executor_params : ExecutorParams ,
1251+ ) -> Result < Vec < ValidationCodeHash > , String > {
1252+ let ( tx, rx) = oneshot:: channel ( ) ;
1253+ self . ensure_pvf ( code_hashes, executor_params, tx) . await ?;
1254+ let result = rx. await . map_err ( |err| err. to_string ( ) ) ?;
1255+
1256+ result
1257+ }
11931258}
11941259
11951260/// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks
0 commit comments