Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ async fn construct_and_distribute_receipt(
} = collation;

let persisted_validation_data_hash = validation_data.hash();
let parent_head_data = validation_data.parent_head.clone();
let parent_head_data_hash = validation_data.parent_head.hash();

// Apply compression to the block data.
Expand Down Expand Up @@ -551,12 +552,13 @@ async fn construct_and_distribute_receipt(
metrics.on_collation_generated();

sender
.send_message(CollatorProtocolMessage::DistributeCollation(
ccr,
.send_message(CollatorProtocolMessage::DistributeCollation {
candidate_receipt: ccr,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
))
})
.await;
}

Expand Down
33 changes: 17 additions & 16 deletions polkadot/node/collation-generation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,11 @@ fn sends_distribute_collation_message() {

assert_eq!(to_collator_protocol.len(), 1);
match AllMessages::from(to_collator_protocol.pop().unwrap()) {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..,
)) => {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
..
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
// signature generation is non-deterministic, so we can't just assert that the
// expected descriptor is correct. What we can do is validate that the produced
// descriptor has a valid signature, then just copy in the generated signature
Expand Down Expand Up @@ -529,11 +529,11 @@ fn fallback_when_no_validation_code_hash_api() {

assert_eq!(to_collator_protocol.len(), 1);
match &to_collator_protocol[0] {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..,
)) => {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
..
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
assert_eq!(expect_validation_code_hash, descriptor.validation_code_hash);
},
_ => panic!("received wrong message type"),
Expand Down Expand Up @@ -619,15 +619,16 @@ fn submit_collation_leads_to_distribution() {

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
ccr,
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
parent_head_data_hash,
..
)) => {
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
assert_eq!(parent_head_data_hash, parent_head.hash());
assert_eq!(ccr.descriptor().persisted_validation_data_hash, expected_pvd.hash());
assert_eq!(ccr.descriptor().para_head, dummy_head_data().hash());
assert_eq!(ccr.descriptor().validation_code_hash, validation_code_hash);
assert_eq!(descriptor.persisted_validation_data_hash, expected_pvd.hash());
assert_eq!(descriptor.para_head, dummy_head_data().hash());
assert_eq!(descriptor.validation_code_hash, validation_code_hash);
}
);

Expand Down
5 changes: 3 additions & 2 deletions polkadot/node/core/prospective-parachains/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,9 @@ fn answer_prospective_validation_data_request(
Some(s) => s,
};

let mut head_data =
storage.head_data_by_hash(&request.parent_head_data_hash).map(|x| x.clone());
let mut head_data = request
.maybe_parent_head_data
.or_else(|| storage.head_data_by_hash(&request.parent_head_data_hash).map(|x| x.clone()));
let mut relay_parent_info = None;
let mut max_pov_size = None;

Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/prospective-parachains/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ async fn get_pvd(
para_id,
candidate_relay_parent,
parent_head_data_hash: parent_head_data.hash(),
maybe_parent_head_data: None,
};
let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use polkadot_node_network_protocol::{
PeerId,
};
use polkadot_node_primitives::PoV;
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, Id as ParaId};
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, HeadData, Id as ParaId};

/// The status of a collation as seen from the collator.
pub enum CollationStatus {
Expand Down Expand Up @@ -63,6 +63,8 @@ pub struct Collation {
pub parent_head_data_hash: Hash,
/// Proof to verify the state transition of the parachain.
pub pov: PoV,
/// Parent head-data needed for elastic scaling.
pub parent_head_data: HeadData,
/// Collation status.
pub status: CollationStatus,
}
Expand Down
65 changes: 47 additions & 18 deletions polkadot/node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState,
GroupIndex, Hash, Id as ParaId, SessionIndex,
GroupIndex, Hash, HeadData, Id as ParaId, SessionIndex,
};

use super::LOG_TARGET;
Expand Down Expand Up @@ -347,6 +347,7 @@ async fn distribute_collation<Context>(
receipt: CandidateReceipt,
parent_head_data_hash: Hash,
pov: PoV,
parent_head_data: HeadData,
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
) -> Result<()> {
let candidate_relay_parent = receipt.descriptor.relay_parent;
Expand Down Expand Up @@ -465,7 +466,13 @@ async fn distribute_collation<Context>(

per_relay_parent.collations.insert(
candidate_hash,
Collation { receipt, parent_head_data_hash, pov, status: CollationStatus::Created },
Collation {
receipt,
parent_head_data_hash,
pov,
parent_head_data,
status: CollationStatus::Created,
},
);

// If prospective parachains are disabled, a leaf should be known to peer.
Expand Down Expand Up @@ -763,20 +770,26 @@ async fn process_msg<Context>(
CollateOn(id) => {
state.collating_on = Some(id);
},
DistributeCollation(receipt, parent_head_data_hash, pov, result_sender) => {
DistributeCollation {
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
} => {
let _span1 = state
.span_per_relay_parent
.get(&receipt.descriptor.relay_parent)
.get(&candidate_receipt.descriptor.relay_parent)
.map(|s| s.child("distributing-collation"));
let _span2 = jaeger::Span::new(&pov, "distributing-collation");

match state.collating_on {
Some(id) if receipt.descriptor.para_id != id => {
Some(id) if candidate_receipt.descriptor.para_id != id => {
// If the ParaId of a collation requested to be distributed does not match
// the one we expect, we ignore the message.
gum::warn!(
target: LOG_TARGET,
para_id = %receipt.descriptor.para_id,
para_id = %candidate_receipt.descriptor.para_id,
collating_on = %id,
"DistributeCollation for unexpected para_id",
);
Expand All @@ -788,17 +801,18 @@ async fn process_msg<Context>(
runtime,
state,
id,
receipt,
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
)
.await?;
},
None => {
gum::warn!(
target: LOG_TARGET,
para_id = %receipt.descriptor.para_id,
para_id = %candidate_receipt.descriptor.para_id,
"DistributeCollation message while not collating on any",
);
},
Expand Down Expand Up @@ -835,20 +849,30 @@ async fn send_collation(
request: VersionedCollationRequest,
receipt: CandidateReceipt,
pov: PoV,
_parent_head_data: HeadData,
) {
let (tx, rx) = oneshot::channel();

let relay_parent = request.relay_parent();
let peer_id = request.peer_id();
let candidate_hash = receipt.hash();

// The response payload is the same for both versions of protocol
// The response payload is the same for v1 and v2 versions of protocol
// and doesn't have v2 alias for simplicity.
let response = OutgoingResponse {
result: Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov)),
reputation_changes: Vec::new(),
sent_feedback: Some(tx),
};
// For now, we don't send parent head data to the collation requester.
let result =
// if assigned_multiple_cores {
// Ok(request_v1::CollationFetchingResponse::CollationWithParentHeadData {
// receipt,
// pov,
// parent_head_data,
// })
// } else {
Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov))
// }
;
let response =
OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };

if let Err(_) = request.send_outgoing_response(response) {
gum::warn!(target: LOG_TARGET, "Sending collation response failed");
Expand Down Expand Up @@ -1027,9 +1051,13 @@ async fn handle_incoming_request<Context>(
return Ok(())
},
};
let (receipt, pov) = if let Some(collation) = collation {
let (receipt, pov, parent_head_data) = if let Some(collation) = collation {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
(
collation.receipt.clone(),
collation.pov.clone(),
collation.parent_head_data.clone(),
)
} else {
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1068,7 +1096,7 @@ async fn handle_incoming_request<Context>(
waiting.collation_fetch_active = true;
// Obtain a timer for sending collation
let _ = state.metrics.time_collation_distribution("send");
send_collation(state, req, receipt, pov).await;
send_collation(state, req, receipt, pov, parent_head_data).await;
}
},
Some(our_para_id) => {
Expand Down Expand Up @@ -1453,8 +1481,9 @@ async fn run_inner<Context>(
if let Some(collation) = next_collation {
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();
let parent_head_data = collation.parent_head_data.clone();

send_collation(&mut state, next, receipt, pov).await;
send_collation(&mut state, next, receipt, pov, parent_head_data).await;
}
},
(candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,13 @@ async fn distribute_collation_with_receipt(
) -> DistributeCollation {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(
candidate.clone(),
CollatorProtocolMessage::DistributeCollation {
candidate_receipt: candidate.clone(),
parent_head_data_hash,
pov.clone(),
None,
),
pov: pov.clone(),
parent_head_data: HeadData(vec![1, 2, 3]),
result_sender: None,
},
)
.await;

Expand Down Expand Up @@ -627,6 +628,18 @@ async fn send_peer_view_change(
.await;
}

fn decode_collation_response(bytes: &[u8]) -> (CandidateReceipt, PoV) {
let response: request_v1::CollationFetchingResponse =
request_v1::CollationFetchingResponse::decode(&mut &bytes[..])
.expect("Decoding should work");
match response {
request_v1::CollationFetchingResponse::Collation(receipt, pov) => (receipt, pov),
request_v1::CollationFetchingResponse::CollationWithParentHeadData {
receipt, pov, ..
} => (receipt, pov),
}
}

#[test]
fn advertise_and_send_collation() {
let mut test_state = TestState::default();
Expand Down Expand Up @@ -736,12 +749,10 @@ fn advertise_and_send_collation() {
assert_matches!(
rx.await,
Ok(full_response) => {
let request_v1::CollationFetchingResponse::Collation(receipt, pov): request_v1::CollationFetchingResponse
= request_v1::CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
let (receipt, pov) = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
}
Expand Down Expand Up @@ -1338,12 +1349,10 @@ where
let feedback_tx = assert_matches!(
rx.await,
Ok(full_response) => {
let request_v1::CollationFetchingResponse::Collation(receipt, pov): request_v1::CollationFetchingResponse
= request_v1::CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
let (receipt, pov) = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);

Expand Down Expand Up @@ -1375,12 +1384,10 @@ where
assert_matches!(
rx.await,
Ok(full_response) => {
let request_v1::CollationFetchingResponse::Collation(receipt, pov): request_v1::CollationFetchingResponse
= request_v1::CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
let (receipt, pov) = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);

Expand Down Expand Up @@ -1469,11 +1476,10 @@ fn connect_to_buffered_groups() {
assert_matches!(
rx.await,
Ok(full_response) => {
let request_v1::CollationFetchingResponse::Collation(..) =
request_v1::CollationFetchingResponse::decode(
&mut full_response.result.expect("We should have a proper answer").as_ref(),
)
.expect("Decoding should work");
let _ = decode_collation_response(
full_response.result
.expect("We should have a proper answer").as_ref()
);
}
);

Expand Down
Loading