2121use std:: {
2222 collections:: { HashMap , VecDeque } ,
2323 pin:: Pin ,
24+ time:: Duration ,
2425} ;
2526
2627use futures:: {
@@ -41,7 +42,7 @@ use polkadot_node_network_protocol::{
4142 IfDisconnected ,
4243} ;
4344use polkadot_node_primitives:: { AvailableData , ErasureChunk } ;
44- use polkadot_node_subsystem_util:: request_session_info;
45+ use polkadot_node_subsystem_util:: { request_session_info, TimeoutExt } ;
4546use polkadot_primitives:: v1:: {
4647 AuthorityDiscoveryId , BlakeTwo256 , BlockNumber , CandidateHash , CandidateReceipt , GroupIndex ,
4748 Hash , HashT , SessionIndex , SessionInfo , ValidatorId , ValidatorIndex ,
@@ -68,6 +69,12 @@ const N_PARALLEL: usize = 50;
6869// Size of the LRU cache where we keep recovered data.
6970const LRU_SIZE : usize = 16 ;
7071
72+ const COST_INVALID_REQUEST : Rep = Rep :: CostMajor ( "Peer sent unparsable request" ) ;
73+
74+ /// Max time we want to wait for responses, before calling `launch_parallel_requests` again to fill
75+ /// up slots.
76+ const MAX_CHUNK_WAIT : Duration = Duration :: from_secs ( 1 ) ;
77+
7178/// The Availability Recovery Subsystem.
7279pub struct AvailabilityRecoverySubsystem {
7380 fast_path : bool ,
@@ -279,7 +286,11 @@ impl RequestChunksPhase {
279286
280287 async fn wait_for_chunks ( & mut self , params : & InteractionParams ) {
281288 // Wait for all current requests to conclude or time-out, or until we reach enough chunks.
282- while let Some ( request_result) = self . requesting_chunks . next ( ) . await {
289+ // We will also stop, if there has not been a response for `MAX_CHUNK_WAIT`, so
290+ // `launch_parallel_requests` cann fill up slots again.
291+ while let Some ( request_result) =
292+ self . requesting_chunks . next ( ) . timeout ( MAX_CHUNK_WAIT ) . await . flatten ( )
293+ {
283294 match request_result {
284295 Ok ( Some ( chunk) ) => {
285296 // Check merkle proofs of any received chunks.
0 commit comments