Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 6a64f11

Browse files
sandreimcoderobe
authored andcommitted
Improve dispute-coordinator message burstiness handling (#5471)
* Increase message channel size to 2048 Signed-off-by: Andrei Sandu <[email protected]> * Use unbounded channel for reading data Signed-off-by: Andrei Sandu <[email protected]>
1 parent d71548f commit 6a64f11

3 files changed

Lines changed: 15 additions & 12 deletions

File tree

  • node

node/core/provisioner/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,8 @@ async fn request_disputes(
641641
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
642642
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
643643
};
644-
sender.send_message(msg.into()).await;
644+
// Bounded by block production - `ProvisionerMessage::RequestInherentData`.
645+
sender.send_unbounded_message(msg.into());
645646

646647
let recent_disputes = match rx.await {
647648
Ok(r) => r,
@@ -659,9 +660,10 @@ async fn request_votes(
659660
disputes_to_query: Vec<(SessionIndex, CandidateHash)>,
660661
) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> {
661662
let (tx, rx) = oneshot::channel();
662-
sender
663-
.send_message(DisputeCoordinatorMessage::QueryCandidateVotes(disputes_to_query, tx).into())
664-
.await;
663+
// Bounded by block production - `ProvisionerMessage::RequestInherentData`.
664+
sender.send_unbounded_message(
665+
DisputeCoordinatorMessage::QueryCandidateVotes(disputes_to_query, tx).into(),
666+
);
665667

666668
match rx.await {
667669
Ok(v) => v,

node/network/dispute-distribution/src/sender/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -340,10 +340,10 @@ async fn get_active_disputes<Context: SubsystemContext>(
340340
ctx: &mut Context,
341341
) -> JfyiErrorResult<Vec<(SessionIndex, CandidateHash)>> {
342342
let (tx, rx) = oneshot::channel();
343-
ctx.send_message(AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes(
344-
tx,
345-
)))
346-
.await;
343+
// Caller scope is in `update_leaves` and this is bounded by fork count.
344+
ctx.send_unbounded_message(AllMessages::DisputeCoordinator(
345+
DisputeCoordinatorMessage::ActiveDisputes(tx),
346+
));
347347
rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
348348
}
349349

@@ -354,10 +354,10 @@ async fn get_candidate_votes<Context: SubsystemContext>(
354354
candidate_hash: CandidateHash,
355355
) -> JfyiErrorResult<Option<CandidateVotes>> {
356356
let (tx, rx) = oneshot::channel();
357-
ctx.send_message(AllMessages::DisputeCoordinator(
357+
// Caller scope is in `update_leaves` and this is bounded by fork count.
358+
ctx.send_unbounded_message(AllMessages::DisputeCoordinator(
358359
DisputeCoordinatorMessage::QueryCandidateVotes(vec![(session_index, candidate_hash)], tx),
359-
))
360-
.await;
360+
));
361361
rx.await
362362
.map(|v| v.get(0).map(|inner| inner.to_owned().2))
363363
.map_err(|_| JfyiError::AskCandidateVotesCanceled)

node/overseer/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,8 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
414414
event=Event,
415415
signal=OverseerSignal,
416416
error=SubsystemError,
417-
network=NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
417+
network=NetworkBridgeEvent<VersionedValidationProtocol>,
418+
message_capacity=2048,
418419
)]
419420
pub struct Overseer<SupportsParachains> {
420421
#[subsystem(no_dispatch, CandidateValidationMessage)]

0 commit comments

Comments
 (0)