From 33c0a5c3b1ea0ae60940330a59d4f355a2dd78bf Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 29 Oct 2021 15:43:06 +0200 Subject: [PATCH 1/8] collator-protocol/validator: do not wait 1s to poll requested collations --- node/network/collator-protocol/src/validator_side/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index d9e1de062552..2eaf65fc9398 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -1191,6 +1191,7 @@ where ); dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await; } + default => {}, // if no future is ready, poll requested collations } let mut retained_requested = HashSet::new(); From 6381ed07451387a95df067e5252dc233320c210c Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 29 Oct 2021 15:55:58 +0200 Subject: [PATCH 2/8] collator-protocol/validator: do not request collation for the next group --- .../src/validator_side/mod.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 2eaf65fc9398..5758a40e8194 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -467,6 +467,10 @@ impl ActiveParas { fn is_current_or_next(&self, id: ParaId) -> bool { self.current_assignments.contains_key(&id) || self.next_assignments.contains_key(&id) } + + fn is_current(&self, id: &ParaId) -> bool { + self.current_assignments.contains_key(id) + } } #[derive(Debug, Clone, Hash, Eq, PartialEq)] @@ -886,6 +890,20 @@ async fn process_incoming_peer_message( Some(p) => p, }; + if let PeerState::Collating(ref collating_state) = peer_data.state { + let para_id = collating_state.para_id; + if 
!state.active_paras.is_current(&para_id) { + tracing::debug!( + target: LOG_TARGET, + peer_id = ?origin, + %para_id, + ?relay_parent, + "Received advertise collation, but we are assigned to the next group", + ); + return + } + } + match peer_data.insert_advertisement(relay_parent, &state.view) { Ok((id, para_id)) => { tracing::debug!( From 01ff32c8b4bd063c59dc0bf0652ed50977935b78 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 29 Oct 2021 17:18:44 +0200 Subject: [PATCH 3/8] put everything into select --- .../src/validator_side/mod.rs | 87 +++++++++++-------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 5758a40e8194..206516626211 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -1190,8 +1190,8 @@ where &mut state, ).await; } - Ok(FromOverseer::Signal(Conclude)) => break, - _ => {}, + Ok(FromOverseer::Signal(Conclude)) | Err(_) => break, + Ok(FromOverseer::Signal(_)) => continue, } } _ = next_inactivity_stream.next() => { @@ -1209,29 +1209,49 @@ where ); dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await; } - default => {}, // if no future is ready, poll requested collations - } - - let mut retained_requested = HashSet::new(); - for (pending_collation, per_req) in state.requested_collations.iter_mut() { - // Despite the await, this won't block on the response itself. 
- let finished = poll_collation_response( - &mut ctx, + reputation_changes = advance_collations( + &mut state.requested_collations, &state.metrics, &state.span_per_relay_parent, - pending_collation, - per_req, - ) - .await; - if !finished { - retained_requested.insert(pending_collation.clone()); - } + ).fuse() => { + for (peer_id, rep) in reputation_changes { + modify_reputation(&mut ctx, peer_id, rep).await; + } + }, } - state.requested_collations.retain(|k, _| retained_requested.contains(k)); } + Ok(()) } +async fn advance_collations( + requested_collations: &mut HashMap, + metrics: &Metrics, + span_per_relay_parent: &HashMap, +) -> Vec<(PeerId, Rep)> { + let mut retained_requested = HashSet::new(); + let mut reputation_changes = Vec::new(); + for (pending_collation, per_req) in requested_collations.iter_mut() { + // Despite the await, this won't block on the response itself. + let (finished, maybe_rep) = poll_collation_response( + metrics, + span_per_relay_parent, + pending_collation, + per_req, + ) + .await; + + if !finished { + retained_requested.insert(pending_collation.clone()); + } + if let Some(rep) = maybe_rep { + reputation_changes.push((pending_collation.peer_id.clone(), rep)); + } + } + requested_collations.retain(|k, _| retained_requested.contains(k)); + reputation_changes +} + /// Dequeue another collation and fetch. async fn dequeue_next_collation_and_fetch( ctx: &mut (impl SubsystemContext @@ -1338,24 +1358,19 @@ async fn disconnect_inactive_peers( /// Ready responses are handled, by logging and decreasing peer's reputation on error and by /// forwarding proper responses to the requester. /// -/// Returns: `true` if `from_collator` future was ready. -async fn poll_collation_response( - ctx: &mut Context, +/// Returns: `true` if `from_collator` future was ready and maybe a reputation change. 
+async fn poll_collation_response( metrics: &Metrics, spans: &HashMap, pending_collation: &PendingCollation, per_req: &mut PerRequest, -) -> bool -where - Context: overseer::SubsystemContext, - Context: SubsystemContext, -{ +) -> (bool, Option) { if never!(per_req.from_collator.is_terminated()) { tracing::error!( target: LOG_TARGET, "We remove pending responses once received, this should not happen." ); - return true + return (true, None) } if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) { @@ -1367,7 +1382,7 @@ where let mut metrics_result = Err(()); let mut success = "false"; - match response { + let reputation_change = match response { Err(RequestError::InvalidResponse(err)) => { tracing::warn!( target: LOG_TARGET, @@ -1377,8 +1392,7 @@ where err = ?err, "Collator provided response that could not be decoded" ); - modify_reputation(ctx, pending_collation.peer_id.clone(), COST_CORRUPTED_MESSAGE) - .await; + Some(COST_CORRUPTED_MESSAGE) }, Err(RequestError::NetworkError(err)) => { tracing::debug!( @@ -1393,7 +1407,7 @@ where // sensible. In theory this could be exploited, by DoSing this node, // which would result in reduced reputation for proper nodes, but the // same can happen for penalties on timeouts, which we also have. - modify_reputation(ctx, pending_collation.peer_id.clone(), COST_NETWORK_ERROR).await; + Some(COST_NETWORK_ERROR) }, Err(RequestError::Canceled(_)) => { tracing::debug!( @@ -1407,8 +1421,7 @@ where // sensible. In theory this could be exploited, by DoSing this node, // which would result in reduced reputation for proper nodes, but the // same can happen for penalties on timeouts, which we also have. - modify_reputation(ctx, pending_collation.peer_id.clone(), COST_REQUEST_TIMED_OUT) - .await; + Some(COST_REQUEST_TIMED_OUT) }, Ok(CollationFetchingResponse::Collation(receipt, _)) if receipt.descriptor().para_id != pending_collation.para_id => @@ -1421,7 +1434,7 @@ where "Got wrong para ID for requested collation." 
); - modify_reputation(ctx, pending_collation.peer_id.clone(), COST_WRONG_PARA).await; + Some(COST_WRONG_PARA) } Ok(CollationFetchingResponse::Collation(receipt, pov)) => { tracing::debug!( @@ -1449,12 +1462,14 @@ where metrics_result = Ok(()); success = "true"; } + + None }, }; metrics.on_request(metrics_result); per_req.span.as_mut().map(|s| s.add_string_tag("success", success)); - true + (true, reputation_change) } else { - false + (false, None) } } From aeb6a92ef30fa730c687f6a6ee9d1271a070bc8d Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 29 Oct 2021 17:24:47 +0200 Subject: [PATCH 4/8] fmt --- .../collator-protocol/src/validator_side/mod.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 206516626211..4584f1fcf23a 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -1233,13 +1233,9 @@ async fn advance_collations( let mut reputation_changes = Vec::new(); for (pending_collation, per_req) in requested_collations.iter_mut() { // Despite the await, this won't block on the response itself. 
- let (finished, maybe_rep) = poll_collation_response( - metrics, - span_per_relay_parent, - pending_collation, - per_req, - ) - .await; + let (finished, maybe_rep) = + poll_collation_response(metrics, span_per_relay_parent, pending_collation, per_req) + .await; if !finished { retained_requested.insert(pending_collation.clone()); From 012c9224a4cc1d4f7dc32466114bcbee576ea717 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 29 Oct 2021 18:58:39 +0200 Subject: [PATCH 5/8] more hacks yay --- .../src/validator_side/mod.rs | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 4584f1fcf23a..9eeffc70316e 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -19,7 +19,7 @@ use futures::{ channel::oneshot, future::{BoxFuture, Fuse, FusedFuture}, select, - stream::FuturesUnordered, + stream::{FusedStream, FuturesUnordered}, FutureExt, StreamExt, }; use futures_timer::Delay; @@ -94,6 +94,10 @@ const ACTIVITY_POLL: Duration = Duration::from_secs(1); #[cfg(test)] const ACTIVITY_POLL: Duration = Duration::from_millis(10); +// How often to poll collation responses. +// This is a hack that should be removed in a refactoring. +const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5); + #[derive(Clone, Default)] pub struct Metrics(Option); @@ -1154,6 +1158,13 @@ async fn wait_until_next_check(last_poll: Instant) -> Instant { Instant::now() } +fn infinite_stream(every: Duration) -> impl FusedStream { + futures::stream::unfold(Instant::now() + every, |next_check| async move { + Some(((), wait_until_next_check(next_check).await)) + }) + .fuse() +} + /// The main run loop. 
pub(crate) async fn run( mut ctx: Context, @@ -1165,18 +1176,14 @@ where Context: overseer::SubsystemContext, Context: SubsystemContext, { - use OverseerSignal::*; - let mut state = State { metrics, ..Default::default() }; - let next_inactivity_stream = - futures::stream::unfold(Instant::now() + ACTIVITY_POLL, |next_check| async move { - Some(((), wait_until_next_check(next_check).await)) - }) - .fuse(); - + let next_inactivity_stream = infinite_stream(ACTIVITY_POLL); futures::pin_mut!(next_inactivity_stream); + let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL); + futures::pin_mut!(check_collations_stream); + loop { select! { res = ctx.recv().fuse() => { @@ -1190,7 +1197,7 @@ where &mut state, ).await; } - Ok(FromOverseer::Signal(Conclude)) | Err(_) => break, + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) | Err(_) => break, Ok(FromOverseer::Signal(_)) => continue, } } @@ -1209,11 +1216,13 @@ where ); dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await; } - reputation_changes = advance_collations( - &mut state.requested_collations, - &state.metrics, - &state.span_per_relay_parent, - ).fuse() => { + _ = check_collations_stream.next() => { + let reputation_changes = poll_requests( + &mut state.requested_collations, + &state.metrics, + &state.span_per_relay_parent, + ).await; + for (peer_id, rep) in reputation_changes { modify_reputation(&mut ctx, peer_id, rep).await; } @@ -1224,7 +1233,7 @@ where Ok(()) } -async fn advance_collations( +async fn poll_requests( requested_collations: &mut HashMap, metrics: &Metrics, span_per_relay_parent: &HashMap, @@ -1351,7 +1360,7 @@ async fn disconnect_inactive_peers( /// Poll collation response, return immediately if there is none. /// -/// Ready responses are handled, by logging and decreasing peer's reputation on error and by +/// Ready responses are handled, by logging and by /// forwarding proper responses to the requester. 
/// /// Returns: `true` if `from_collator` future was ready and maybe a reputation change. From cbba13a49d165654c23dd8a60f6882873d66cffa Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 29 Oct 2021 19:21:43 +0200 Subject: [PATCH 6/8] a test --- .../src/validator_side/tests.rs | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/node/network/collator-protocol/src/validator_side/tests.rs b/node/network/collator-protocol/src/validator_side/tests.rs index a4bc8cac682e..7c110d67feed 100644 --- a/node/network/collator-protocol/src/validator_side/tests.rs +++ b/node/network/collator-protocol/src/validator_side/tests.rs @@ -674,6 +674,46 @@ fn fetch_collations_works() { }); } +#[test] +fn dont_fetch_collation_if_assigned_to_next_group() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( + our_view![test_state.relay_parent], + )), + ) + .await; + + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + + let peer_b = PeerId::random(); + + connect_and_declare_collator( + &mut virtual_overseer, + peer_b.clone(), + test_state.collators[0].clone(), + test_state.chain_ids[1].clone(), // next, not current para_id + ) + .await; + + advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await; + + assert!( + overseer_recv_with_timeout(&mut &mut virtual_overseer, Duration::from_millis(30)) + .await + .is_none(), + "There should be no PoV fetching request.", + ); + + virtual_overseer + }) +} + // Ensure that we fetch a second collation, after the first checked collation was found to be invalid. 
#[test] fn fetch_next_collation_on_invalid_collation() { From 8bc9321a831173436dc4e1d9c8152e32c7517ee5 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Sat, 30 Oct 2021 17:16:24 +0200 Subject: [PATCH 7/8] review nits --- .../src/validator_side/mod.rs | 44 +++++++++++++------ 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 9eeffc70316e..b53c099b630f 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -96,6 +96,7 @@ const ACTIVITY_POLL: Duration = Duration::from_millis(10); // How often to poll collation responses. // This is a hack that should be removed in a refactoring. +// See https://github.com/paritytech/polkadot/issues/4182 const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5); #[derive(Clone, Default)] @@ -1242,14 +1243,14 @@ async fn poll_requests( let mut reputation_changes = Vec::new(); for (pending_collation, per_req) in requested_collations.iter_mut() { // Despite the await, this won't block on the response itself. - let (finished, maybe_rep) = + let result = poll_collation_response(metrics, span_per_relay_parent, pending_collation, per_req) .await; - if !finished { + if !result.is_ready() { retained_requested.insert(pending_collation.clone()); } - if let Some(rep) = maybe_rep { + if let CollationFetchResult::Error(rep) = result { reputation_changes.push((pending_collation.peer_id.clone(), rep)); } } @@ -1358,6 +1359,22 @@ async fn disconnect_inactive_peers( } } +enum CollationFetchResult { + /// The collation is still being fetched. + Pending, + /// The collation was fetched successfully. + Success, + /// An error occurred when fetching a collation or it was invalid. + /// A reputation change should be applied to the peer. 
+ Error(Rep), +} + +impl CollationFetchResult { + fn is_ready(&self) -> bool { + !matches!(self, Self::Pending) + } +} + /// Poll collation response, return immediately if there is none. /// /// Ready responses are handled, by logging and by @@ -1369,13 +1386,13 @@ async fn poll_collation_response( spans: &HashMap, pending_collation: &PendingCollation, per_req: &mut PerRequest, -) -> (bool, Option) { +) -> CollationFetchResult { if never!(per_req.from_collator.is_terminated()) { tracing::error!( target: LOG_TARGET, "We remove pending responses once received, this should not happen." ); - return (true, None) + return CollationFetchResult::Success } if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) { @@ -1387,7 +1404,7 @@ async fn poll_collation_response( let mut metrics_result = Err(()); let mut success = "false"; - let reputation_change = match response { + let result = match response { Err(RequestError::InvalidResponse(err)) => { tracing::warn!( target: LOG_TARGET, @@ -1397,7 +1414,7 @@ async fn poll_collation_response( err = ?err, "Collator provided response that could not be decoded" ); - Some(COST_CORRUPTED_MESSAGE) + CollationFetchResult::Error(COST_CORRUPTED_MESSAGE) }, Err(RequestError::NetworkError(err)) => { tracing::debug!( @@ -1412,7 +1429,7 @@ async fn poll_collation_response( // sensible. In theory this could be exploited, by DoSing this node, // which would result in reduced reputation for proper nodes, but the // same can happen for penalties on timeouts, which we also have. - Some(COST_NETWORK_ERROR) + CollationFetchResult::Error(COST_NETWORK_ERROR) }, Err(RequestError::Canceled(_)) => { tracing::debug!( @@ -1426,7 +1443,7 @@ async fn poll_collation_response( // sensible. In theory this could be exploited, by DoSing this node, // which would result in reduced reputation for proper nodes, but the // same can happen for penalties on timeouts, which we also have. 
- Some(COST_REQUEST_TIMED_OUT) + CollationFetchResult::Error(COST_REQUEST_TIMED_OUT) }, Ok(CollationFetchingResponse::Collation(receipt, _)) if receipt.descriptor().para_id != pending_collation.para_id => @@ -1439,7 +1456,7 @@ async fn poll_collation_response( "Got wrong para ID for requested collation." ); - Some(COST_WRONG_PARA) + CollationFetchResult::Error(COST_WRONG_PARA) } Ok(CollationFetchingResponse::Collation(receipt, pov)) => { tracing::debug!( @@ -1468,13 +1485,14 @@ async fn poll_collation_response( success = "true"; } - None + CollationFetchResult::Success }, }; metrics.on_request(metrics_result); per_req.span.as_mut().map(|s| s.add_string_tag("success", success)); - (true, reputation_change) + + result } else { - (false, None) + CollationFetchResult::Pending } } From 5aea24f14661fd516c207ea5fb2e81882231c92e Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Sat, 30 Oct 2021 19:09:23 +0200 Subject: [PATCH 8/8] remove outdated comment --- node/network/collator-protocol/src/validator_side/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index b53c099b630f..0b050d3c5e19 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -1379,8 +1379,6 @@ impl CollationFetchResult { /// /// Ready responses are handled, by logging and by /// forwarding proper responses to the requester. -/// -/// Returns: `true` if `from_collator` future was ready and maybe a reputation change. async fn poll_collation_response( metrics: &Metrics, spans: &HashMap,