149 changes: 102 additions & 47 deletions node/network/collator-protocol/src/validator_side/mod.rs
@@ -19,7 +19,7 @@ use futures::{
     channel::oneshot,
     future::{BoxFuture, Fuse, FusedFuture},
     select,
-    stream::FuturesUnordered,
+    stream::{FusedStream, FuturesUnordered},
     FutureExt, StreamExt,
 };
 use futures_timer::Delay;
@@ -94,6 +94,11 @@ const ACTIVITY_POLL: Duration = Duration::from_secs(1);
 #[cfg(test)]
 const ACTIVITY_POLL: Duration = Duration::from_millis(10);
 
+// How often to poll collation responses.
+// This is a hack that should be removed in a refactoring.
+// See https://github.com/paritytech/polkadot/issues/4182
+const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5);
+
 #[derive(Clone, Default)]
 pub struct Metrics(Option<MetricsInner>);
 
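The new `CHECK_COLLATIONS_POLL` constant drives a fixed 5 ms wake-up that re-polls every pending collation response (see `poll_requests` below), so the subsystem wakes whether or not anything is ready — which is why the comment calls it a hack. A minimal sketch of the event-driven shape the linked issue points toward; `Response` and `drive_responses` are illustrative stand-ins, not part of this PR:

use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};

// Illustrative stand-in for a decoded collation-fetch response.
struct Response;

// Sketch only: keep the in-flight response futures in one FuturesUnordered and
// await completions directly, instead of waking every 5 ms to re-poll each one.
async fn drive_responses(pending: Vec<BoxFuture<'static, Response>>) {
    let mut in_flight: FuturesUnordered<BoxFuture<'static, Response>> =
        pending.into_iter().collect();
    while let Some(_response) = in_flight.next().await {
        // Handle the completed response here: update metrics, adjust the
        // peer's reputation, forward the collation to the requester.
    }
}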
@@ -467,6 +472,10 @@ impl ActiveParas {
     fn is_current_or_next(&self, id: ParaId) -> bool {
         self.current_assignments.contains_key(&id) || self.next_assignments.contains_key(&id)
     }
+
+    fn is_current(&self, id: &ParaId) -> bool {
+        self.current_assignments.contains_key(id)
+    }
 }
 
 #[derive(Debug, Clone, Hash, Eq, PartialEq)]
@@ -886,6 +895,20 @@ async fn process_incoming_peer_message<Context>(
                 Some(p) => p,
             };
 
+            if let PeerState::Collating(ref collating_state) = peer_data.state {
+                let para_id = collating_state.para_id;
+                if !state.active_paras.is_current(&para_id) {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        peer_id = ?origin,
+                        %para_id,
+                        ?relay_parent,
+                        "Received advertise collation, but we are assigned to the next group",
+                    );
+                    return
+                }
+            }
+
             match peer_data.insert_advertisement(relay_parent, &state.view) {
                 Ok((id, para_id)) => {
                     tracing::debug!(
@@ -1136,6 +1159,13 @@ async fn wait_until_next_check(last_poll: Instant) -> Instant {
     Instant::now()
 }
 
+fn infinite_stream(every: Duration) -> impl FusedStream<Item = ()> {
+    futures::stream::unfold(Instant::now() + every, |next_check| async move {
+        Some(((), wait_until_next_check(next_check).await))
+    })
+    .fuse()
+}
+
 /// The main run loop.
 pub(crate) async fn run<Context>(
     mut ctx: Context,
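`infinite_stream` turns the pre-existing `wait_until_next_check` helper (elided above, apart from its trailing `Instant::now()`) into a fused stream of `()` ticks, so the inactivity check and the new collation poll can share one mechanism. A self-contained approximation of the same unfold-plus-fuse pattern with the sleeping inlined; `ticker` is an illustrative name and the deadline bookkeeping is assumed, not copied from the helper:

use std::time::{Duration, Instant};

use futures::{stream::FusedStream, StreamExt};
use futures_timer::Delay;

// Each iteration sleeps until the stored deadline, yields one (), and threads
// the following deadline through as the unfold state. `.fuse()` is what makes
// the stream usable inside `select!`, which requires fused streams and futures.
fn ticker(every: Duration) -> impl FusedStream<Item = ()> {
    futures::stream::unfold(Instant::now() + every, move |deadline| async move {
        let now = Instant::now();
        if deadline > now {
            Delay::new(deadline - now).await;
        }
        Some(((), Instant::now() + every))
    })
    .fuse()
}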
@@ -1147,18 +1177,14 @@ where
     Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
     Context: SubsystemContext<Message = CollatorProtocolMessage>,
 {
-    use OverseerSignal::*;
-
     let mut state = State { metrics, ..Default::default() };
 
-    let next_inactivity_stream =
-        futures::stream::unfold(Instant::now() + ACTIVITY_POLL, |next_check| async move {
-            Some(((), wait_until_next_check(next_check).await))
-        })
-        .fuse();
-
+    let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
     futures::pin_mut!(next_inactivity_stream);
 
+    let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL);
+    futures::pin_mut!(check_collations_stream);
+
     loop {
         select! {
             res = ctx.recv().fuse() => {
@@ -1172,8 +1198,8 @@ where
                             &mut state,
                         ).await;
                     }
-                    Ok(FromOverseer::Signal(Conclude)) => break,
-                    _ => {},
+                    Ok(FromOverseer::Signal(OverseerSignal::Conclude)) | Err(_) => break,
+                    Ok(FromOverseer::Signal(_)) => continue,
                 }
             }
             _ = next_inactivity_stream.next() => {
@@ -1191,26 +1217,45 @@
                 );
                 dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await;
             }
+            _ = check_collations_stream.next() => {
+                let reputation_changes = poll_requests(
+                    &mut state.requested_collations,
+                    &state.metrics,
+                    &state.span_per_relay_parent,
+                ).await;
+
+                for (peer_id, rep) in reputation_changes {
+                    modify_reputation(&mut ctx, peer_id, rep).await;
+                }
+            },
         }
-        let mut retained_requested = HashSet::new();
-        for (pending_collation, per_req) in state.requested_collations.iter_mut() {
-            // Despite the await, this won't block on the response itself.
-            let finished = poll_collation_response(
-                &mut ctx,
-                &state.metrics,
-                &state.span_per_relay_parent,
-                pending_collation,
-                per_req,
-            )
-            .await;
-            if !finished {
-                retained_requested.insert(pending_collation.clone());
-            }
-        }
-        state.requested_collations.retain(|k, _| retained_requested.contains(k));
     }
 
     Ok(())
 }
 
+async fn poll_requests(
+    requested_collations: &mut HashMap<PendingCollation, PerRequest>,
+    metrics: &Metrics,
+    span_per_relay_parent: &HashMap<Hash, PerLeafSpan>,
+) -> Vec<(PeerId, Rep)> {
+    let mut retained_requested = HashSet::new();
+    let mut reputation_changes = Vec::new();
+    for (pending_collation, per_req) in requested_collations.iter_mut() {
+        // Despite the await, this won't block on the response itself.
+        let result =
+            poll_collation_response(metrics, span_per_relay_parent, pending_collation, per_req)
+                .await;
+
+        if !result.is_ready() {
+            retained_requested.insert(pending_collation.clone());
+        }
+        if let CollationFetchResult::Error(rep) = result {
+            reputation_changes.push((pending_collation.peer_id.clone(), rep));
+        }
+    }
+    requested_collations.retain(|k, _| retained_requested.contains(k));
+    reputation_changes
+}
+
 /// Dequeue another collation and fetch.
@@ -1314,29 +1359,38 @@ async fn disconnect_inactive_peers<Context>(
     }
 }
 
+enum CollationFetchResult {
+    /// The collation is still being fetched.
+    Pending,
+    /// The collation was fetched successfully.
+    Success,
+    /// An error occurred when fetching a collation or it was invalid.
+    /// A reputation change should be applied to the peer.
+    Error(Rep),
+}
+
+impl CollationFetchResult {
+    fn is_ready(&self) -> bool {
+        !matches!(self, Self::Pending)
+    }
+}
+
 /// Poll collation response, return immediately if there is none.
 ///
-/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
+/// Ready responses are handled, by logging and by
 /// forwarding proper responses to the requester.
-///
-/// Returns: `true` if `from_collator` future was ready.
-async fn poll_collation_response<Context>(
-    ctx: &mut Context,
+async fn poll_collation_response(
     metrics: &Metrics,
     spans: &HashMap<Hash, PerLeafSpan>,
     pending_collation: &PendingCollation,
     per_req: &mut PerRequest,
-) -> bool
-where
-    Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
-    Context: SubsystemContext,
-{
+) -> CollationFetchResult {
     if never!(per_req.from_collator.is_terminated()) {
         tracing::error!(
             target: LOG_TARGET,
             "We remove pending responses once received, this should not happen."
         );
-        return true
+        return CollationFetchResult::Success
     }
 
     if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) {
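Returning `CollationFetchResult` instead of `bool` is what lets `poll_collation_response` shed its `Context` parameter: rather than calling `modify_reputation` itself, it hands any reputation change back as data, and the caller (`poll_requests`, then the `select!` arm in `run`) applies it. A quick illustration of the `is_ready` contract — only `Pending` keeps a request alive in the `requested_collations` map:

// e.g. in a unit test; COST_WRONG_PARA is one of the module's Rep constants.
assert!(!CollationFetchResult::Pending.is_ready());
assert!(CollationFetchResult::Success.is_ready());
assert!(CollationFetchResult::Error(COST_WRONG_PARA).is_ready());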
@@ -1348,7 +1402,7 @@
         let mut metrics_result = Err(());
         let mut success = "false";
 
-        match response {
+        let result = match response {
             Err(RequestError::InvalidResponse(err)) => {
                 tracing::warn!(
                     target: LOG_TARGET,
@@ -1358,8 +1412,7 @@
                     err = ?err,
                     "Collator provided response that could not be decoded"
                 );
-                modify_reputation(ctx, pending_collation.peer_id.clone(), COST_CORRUPTED_MESSAGE)
-                    .await;
+                CollationFetchResult::Error(COST_CORRUPTED_MESSAGE)
             },
             Err(RequestError::NetworkError(err)) => {
                 tracing::debug!(
@@ -1374,7 +1427,7 @@
                 // sensible. In theory this could be exploited, by DoSing this node,
                 // which would result in reduced reputation for proper nodes, but the
                 // same can happen for penalties on timeouts, which we also have.
-                modify_reputation(ctx, pending_collation.peer_id.clone(), COST_NETWORK_ERROR).await;
+                CollationFetchResult::Error(COST_NETWORK_ERROR)
             },
             Err(RequestError::Canceled(_)) => {
                 tracing::debug!(
@@ -1388,8 +1441,7 @@
                 // sensible. In theory this could be exploited, by DoSing this node,
                 // which would result in reduced reputation for proper nodes, but the
                 // same can happen for penalties on timeouts, which we also have.
-                modify_reputation(ctx, pending_collation.peer_id.clone(), COST_REQUEST_TIMED_OUT)
-                    .await;
+                CollationFetchResult::Error(COST_REQUEST_TIMED_OUT)
             },
             Ok(CollationFetchingResponse::Collation(receipt, _))
                 if receipt.descriptor().para_id != pending_collation.para_id =>
@@ -1402,7 +1454,7 @@
                     "Got wrong para ID for requested collation."
                 );
 
-                modify_reputation(ctx, pending_collation.peer_id.clone(), COST_WRONG_PARA).await;
+                CollationFetchResult::Error(COST_WRONG_PARA)
             }
             Ok(CollationFetchingResponse::Collation(receipt, pov)) => {
                 tracing::debug!(
Expand Down Expand Up @@ -1430,12 +1482,15 @@ where
metrics_result = Ok(());
success = "true";
}

CollationFetchResult::Success
},
};
metrics.on_request(metrics_result);
per_req.span.as_mut().map(|s| s.add_string_tag("success", success));
true

result
} else {
false
CollationFetchResult::Pending
}
}
40 changes: 40 additions & 0 deletions node/network/collator-protocol/src/validator_side/tests.rs
@@ -674,6 +674,46 @@ fn fetch_collations_works() {
     });
 }
 
+#[test]
+fn dont_fetch_collation_if_assigned_to_next_group() {
+    let test_state = TestState::default();
+
+    test_harness(|test_harness| async move {
+        let TestHarness { mut virtual_overseer } = test_harness;
+
+        overseer_send(
+            &mut virtual_overseer,
+            CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
+                our_view![test_state.relay_parent],
+            )),
+        )
+        .await;
+
+        respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;
+
+        let peer_b = PeerId::random();
+
+        connect_and_declare_collator(
+            &mut virtual_overseer,
+            peer_b.clone(),
+            test_state.collators[0].clone(),
+            test_state.chain_ids[1].clone(), // next, not current para_id
+        )
+        .await;
+
+        advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await;
+
+        assert!(
+            overseer_recv_with_timeout(&mut &mut virtual_overseer, Duration::from_millis(30))
+                .await
+                .is_none(),
+            "There should be no PoV fetching request.",
+        );
+
+        virtual_overseer
+    })
+}
+
 // Ensure that we fetch a second collation, after the first checked collation was found to be invalid.
 #[test]
 fn fetch_next_collation_on_invalid_collation() {