@@ -5370,6 +5370,252 @@ fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() {
53705370 } ) ;
53715371}
53725372
5373+ // Test that if the subsystem missed the triggering of some tranches because it was not running
5374+ // it launches the missed assignements on restart.
5375+ #[ test]
5376+ fn subsystem_launches_missed_assignments_on_restart ( ) {
5377+ let test_tranche = 20 ;
5378+ let assignment_criteria = Box :: new ( MockAssignmentCriteria (
5379+ move || {
5380+ let mut assignments = HashMap :: new ( ) ;
5381+ let _ = assignments. insert (
5382+ CoreIndex ( 0 ) ,
5383+ approval_db:: v2:: OurAssignment {
5384+ cert : garbage_assignment_cert_v2 ( AssignmentCertKindV2 :: RelayVRFDelay {
5385+ core_index : CoreIndex ( 0 ) ,
5386+ } ) ,
5387+ tranche : test_tranche,
5388+ validator_index : ValidatorIndex ( 0 ) ,
5389+ triggered : false ,
5390+ }
5391+ . into ( ) ,
5392+ ) ;
5393+
5394+ assignments
5395+ } ,
5396+ |_| Ok ( 0 ) ,
5397+ ) ) ;
5398+ let config = HarnessConfigBuilder :: default ( ) . assignment_criteria ( assignment_criteria) . build ( ) ;
5399+ let store = config. backend ( ) ;
5400+ let store_clone = config. backend ( ) ;
5401+
5402+ test_harness ( config, |test_harness| async move {
5403+ let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
5404+
5405+ assert_matches ! (
5406+ overseer_recv( & mut virtual_overseer) . await ,
5407+ AllMessages :: ChainApi ( ChainApiMessage :: FinalizedBlockNumber ( rx) ) => {
5408+ rx. send( Ok ( 0 ) ) . unwrap( ) ;
5409+ }
5410+ ) ;
5411+
5412+ let block_hash = Hash :: repeat_byte ( 0x01 ) ;
5413+ let fork_block_hash = Hash :: repeat_byte ( 0x02 ) ;
5414+ let candidate_commitments = CandidateCommitments :: default ( ) ;
5415+ let mut candidate_receipt = dummy_candidate_receipt_v2 ( block_hash) ;
5416+ candidate_receipt. commitments_hash = candidate_commitments. hash ( ) ;
5417+ let candidate_hash = candidate_receipt. hash ( ) ;
5418+ let slot = Slot :: from ( 1 ) ;
5419+ let ( chain_builder, _session_info) = build_chain_with_two_blocks_with_one_candidate_each (
5420+ block_hash,
5421+ fork_block_hash,
5422+ slot,
5423+ sync_oracle_handle,
5424+ candidate_receipt,
5425+ )
5426+ . await ;
5427+ chain_builder. build ( & mut virtual_overseer) . await ;
5428+
5429+ assert ! ( !clock. inner. lock( ) . current_wakeup_is( 1 ) ) ;
5430+ clock. inner . lock ( ) . wakeup_all ( 1 ) ;
5431+
5432+ assert ! ( clock. inner. lock( ) . current_wakeup_is( slot_to_tick( slot) + test_tranche as u64 ) ) ;
5433+ clock. inner . lock ( ) . wakeup_all ( slot_to_tick ( slot) ) ;
5434+
5435+ futures_timer:: Delay :: new ( Duration :: from_millis ( 200 ) ) . await ;
5436+
5437+ clock. inner . lock ( ) . wakeup_all ( slot_to_tick ( slot + 2 ) ) ;
5438+
5439+ assert_eq ! ( clock. inner. lock( ) . wakeups. len( ) , 0 ) ;
5440+
5441+ futures_timer:: Delay :: new ( Duration :: from_millis ( 200 ) ) . await ;
5442+
5443+ let candidate_entry = store. load_candidate_entry ( & candidate_hash) . unwrap ( ) . unwrap ( ) ;
5444+ let our_assignment =
5445+ candidate_entry. approval_entry ( & block_hash) . unwrap ( ) . our_assignment ( ) . unwrap ( ) ;
5446+ assert ! ( !our_assignment. triggered( ) ) ;
5447+
5448+ // Assignment is not triggered because its tranches has not been reached.
5449+ virtual_overseer
5450+ } ) ;
5451+
5452+ // Restart a new approval voting subsystem with the same database and major syncing true until
5453+ // the last leaf.
5454+ let config = HarnessConfigBuilder :: default ( ) . backend ( store_clone) . major_syncing ( true ) . build ( ) ;
5455+
5456+ test_harness ( config, |test_harness| async move {
5457+ let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
5458+ let slot = Slot :: from ( 1 ) ;
5459+ // 1. Set the clock to the to a tick past the tranche where the assignment should be
5460+ // triggered.
5461+ clock. inner . lock ( ) . set_tick ( slot_to_tick ( slot) + 2 * test_tranche as u64 ) ;
5462+ assert_matches ! (
5463+ overseer_recv( & mut virtual_overseer) . await ,
5464+ AllMessages :: ChainApi ( ChainApiMessage :: FinalizedBlockNumber ( rx) ) => {
5465+ rx. send( Ok ( 0 ) ) . unwrap( ) ;
5466+ }
5467+ ) ;
5468+
5469+ let block_hash = Hash :: repeat_byte ( 0x01 ) ;
5470+ let fork_block_hash = Hash :: repeat_byte ( 0x02 ) ;
5471+ let candidate_commitments = CandidateCommitments :: default ( ) ;
5472+ let mut candidate_receipt = dummy_candidate_receipt_v2 ( block_hash) ;
5473+ candidate_receipt. commitments_hash = candidate_commitments. hash ( ) ;
5474+ let ( chain_builder, session_info) = build_chain_with_two_blocks_with_one_candidate_each (
5475+ block_hash,
5476+ fork_block_hash,
5477+ slot,
5478+ sync_oracle_handle,
5479+ candidate_receipt,
5480+ )
5481+ . await ;
5482+
5483+ chain_builder. build ( & mut virtual_overseer) . await ;
5484+
5485+ futures_timer:: Delay :: new ( Duration :: from_millis ( 2000 ) ) . await ;
5486+
5487+ // On major syncing ending Approval voting should send all the necessary messages for a
5488+ // candidate to be approved.
5489+ assert_matches ! (
5490+ overseer_recv( & mut virtual_overseer) . await ,
5491+ AllMessages :: ApprovalDistribution ( ApprovalDistributionMessage :: NewBlocks (
5492+ _,
5493+ ) ) => {
5494+ }
5495+ ) ;
5496+
5497+ clock
5498+ . inner
5499+ . lock ( )
5500+ . wakeup_all ( slot_to_tick ( slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY - 1 ) ;
5501+
5502+ // Subsystem should not send any messages because the assignment is not triggered yet.
5503+ assert ! ( overseer_recv( & mut virtual_overseer) . timeout( TIMEOUT / 2 ) . await . is_none( ) ) ;
5504+
5505+ // Set the clock to the tick where the assignment should be triggered.
5506+ clock
5507+ . inner
5508+ . lock ( )
5509+ . wakeup_all ( slot_to_tick ( slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY ) ;
5510+
5511+ assert_matches ! (
5512+ overseer_recv( & mut virtual_overseer) . await ,
5513+ AllMessages :: RuntimeApi (
5514+ RuntimeApiMessage :: Request (
5515+ _,
5516+ RuntimeApiRequest :: SessionInfo ( _, si_tx) ,
5517+ )
5518+ ) => {
5519+ si_tx. send( Ok ( Some ( session_info. clone( ) ) ) ) . unwrap( ) ;
5520+ }
5521+ ) ;
5522+
5523+ assert_matches ! (
5524+ overseer_recv( & mut virtual_overseer) . await ,
5525+ AllMessages :: RuntimeApi (
5526+ RuntimeApiMessage :: Request (
5527+ _,
5528+ RuntimeApiRequest :: SessionExecutorParams ( _, si_tx) ,
5529+ )
5530+ ) => {
5531+ // Make sure all SessionExecutorParams calls are not made for the leaf (but for its relay parent)
5532+ si_tx. send( Ok ( Some ( ExecutorParams :: default ( ) ) ) ) . unwrap( ) ;
5533+ }
5534+ ) ;
5535+
5536+ assert_matches ! (
5537+ overseer_recv( & mut virtual_overseer) . await ,
5538+ AllMessages :: RuntimeApi (
5539+ RuntimeApiMessage :: Request ( _, RuntimeApiRequest :: NodeFeatures ( _, si_tx) , )
5540+ ) => {
5541+ si_tx. send( Ok ( NodeFeatures :: EMPTY ) ) . unwrap( ) ;
5542+ }
5543+ ) ;
5544+
5545+ assert_matches ! (
5546+ overseer_recv( & mut virtual_overseer) . await ,
5547+ AllMessages :: ApprovalDistribution ( ApprovalDistributionMessage :: DistributeAssignment (
5548+ _,
5549+ _,
5550+ ) ) => {
5551+ }
5552+ ) ;
5553+
5554+ // Guarantees the approval work has been relaunched.
5555+ recover_available_data ( & mut virtual_overseer) . await ;
5556+ fetch_validation_code ( & mut virtual_overseer) . await ;
5557+
5558+ assert_matches ! (
5559+ overseer_recv( & mut virtual_overseer) . await ,
5560+ AllMessages :: CandidateValidation ( CandidateValidationMessage :: ValidateFromExhaustive {
5561+ exec_kind,
5562+ response_sender,
5563+ ..
5564+ } ) if exec_kind == PvfExecKind :: Approval => {
5565+ response_sender. send( Ok ( ValidationResult :: Valid ( Default :: default ( ) , Default :: default ( ) ) ) )
5566+ . unwrap( ) ;
5567+ }
5568+ ) ;
5569+
5570+ assert_matches ! (
5571+ overseer_recv( & mut virtual_overseer) . await ,
5572+ AllMessages :: RuntimeApi ( RuntimeApiMessage :: Request ( _, RuntimeApiRequest :: ApprovalVotingParams ( _, sender) ) ) => {
5573+ let _ = sender. send( Ok ( ApprovalVotingParams {
5574+ max_approval_coalesce_count: 1 ,
5575+ } ) ) ;
5576+ }
5577+ ) ;
5578+
5579+ assert_matches ! (
5580+ overseer_recv( & mut virtual_overseer) . await ,
5581+ AllMessages :: ApprovalDistribution ( ApprovalDistributionMessage :: DistributeApproval ( _) )
5582+ ) ;
5583+
5584+ clock
5585+ . inner
5586+ . lock ( )
5587+ . wakeup_all ( slot_to_tick ( slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY ) ;
5588+
5589+ assert_matches ! (
5590+ overseer_recv( & mut virtual_overseer) . await ,
5591+ AllMessages :: ApprovalDistribution ( ApprovalDistributionMessage :: DistributeAssignment (
5592+ _,
5593+ _,
5594+ ) ) => {
5595+ }
5596+ ) ;
5597+
5598+ assert_matches ! (
5599+ overseer_recv( & mut virtual_overseer) . await ,
5600+ AllMessages :: RuntimeApi ( RuntimeApiMessage :: Request ( _, RuntimeApiRequest :: ApprovalVotingParams ( _, sender) ) ) => {
5601+ let _ = sender. send( Ok ( ApprovalVotingParams {
5602+ max_approval_coalesce_count: 1 ,
5603+ } ) ) ;
5604+ }
5605+ ) ;
5606+
5607+ assert_matches ! (
5608+ overseer_recv( & mut virtual_overseer) . await ,
5609+ AllMessages :: ApprovalDistribution ( ApprovalDistributionMessage :: DistributeApproval ( _) )
5610+ ) ;
5611+
5612+ // Assert that there are no more messages being sent by the subsystem
5613+ assert ! ( overseer_recv( & mut virtual_overseer) . timeout( TIMEOUT / 2 ) . await . is_none( ) ) ;
5614+
5615+ virtual_overseer
5616+ } ) ;
5617+ }
5618+
53735619// Test we correctly update the timer when we mark the beginning of gathering assignments.
53745620#[ test]
53755621fn test_gathering_assignments_statements ( ) {
0 commit comments