From a1c0c994ec2771f0240f9bda7d5b09a8911945b2 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Tue, 24 Jun 2025 10:28:49 +0300 Subject: [PATCH] make sure dispute_coordinator/approval-voting parallel can receive priority messages (#8948) https://github.com/paritytech/polkadot-sdk/pull/8834 changed relay_chain_selection to send priority messages, but did not configure the subsystems to indicate that they can receive priority messages via the `can_receive_priority_messages` flag. If `can_receive_priority_messages` is not specified, orchestra falls back to the normal queue when sending a priority message, so the messages were not processed ahead of the others in the queue. Fix this configuration mistake and add a test to make sure priority messages are consumed ahead of normal ones by the subsystems. --------- Signed-off-by: Alexandru Gheorghe Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com> (cherry picked from commit 5f3507ec02185e05c96f055d808d8d17d8b969e6) --- polkadot/node/overseer/src/lib.rs | 4 +- polkadot/node/overseer/src/tests.rs | 278 +++++++++++++++++++++++++++- polkadot/node/service/src/tests.rs | 2 + prdoc/pr_8948.prdoc | 16 ++ 4 files changed, 296 insertions(+), 4 deletions(-) create mode 100644 prdoc/pr_8948.prdoc diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 28a201f102c18..b7cc98d47d713 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -622,7 +622,7 @@ pub struct Overseer { ApprovalVotingMessage, ApprovalDistributionMessage, ApprovalVotingParallelMessage, - ])] + ], can_receive_priority_messages)] approval_voting_parallel: ApprovalVotingParallel, #[subsystem(GossipSupportMessage, sends: [ NetworkBridgeTxMessage, @@ -643,7 +643,7 @@ pub struct Overseer { AvailabilityRecoveryMessage, ChainSelectionMessage, ApprovalVotingParallelMessage, - ])] + ], 
can_receive_priority_messages)] dispute_coordinator: DisputeCoordinator, #[subsystem(DisputeDistributionMessage, sends: [ diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs index 0b9b783ef9b15..5a9c3b3ec1161 100644 --- a/polkadot/node/overseer/src/tests.rs +++ b/polkadot/node/overseer/src/tests.rs @@ -18,7 +18,9 @@ use async_trait::async_trait; use futures::{executor, pending, pin_mut, poll, select, stream, FutureExt}; use std::{collections::HashMap, sync::atomic, task::Poll}; -use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange}; +use polkadot_node_network_protocol::{ + peer_set::ValidationVersion, ObservedRole, PeerId, UnifiedReputationChange, +}; use polkadot_node_primitives::{ BlockData, CollationGenerationConfig, CollationResult, DisputeMessage, InvalidDisputeVote, PoV, UncheckedDisputeMessage, ValidDisputeVote, @@ -853,10 +855,14 @@ fn test_network_bridge_event() -> NetworkBridgeEvent { NetworkBridgeEvent::PeerDisconnected(PeerId::random()) } -fn test_statement_distribution_msg() -> StatementDistributionMessage { +fn test_statement_distribution_with_priority_msg() -> StatementDistributionMessage { StatementDistributionMessage::NetworkBridgeUpdate(test_network_bridge_event()) } +fn test_statement_distribution_msg() -> StatementDistributionMessage { + StatementDistributionMessage::Backed(Default::default()) +} + fn test_availability_recovery_msg() -> AvailabilityRecoveryMessage { let (sender, _) = oneshot::channel(); AvailabilityRecoveryMessage::RecoverAvailableData( @@ -872,6 +878,15 @@ fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage { BitfieldDistributionMessage::NetworkBridgeUpdate(test_network_bridge_event()) } +fn test_bitfield_distribution_with_priority_msg() -> BitfieldDistributionMessage { + BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + PeerId::random(), + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )) +} + fn 
test_provisioner_msg() -> ProvisionerMessage { let (sender, _) = oneshot::channel(); ProvisionerMessage::RequestInherentData(Default::default(), sender) @@ -912,11 +927,25 @@ fn test_approval_voting_msg() -> ApprovalVotingMessage { ApprovalVotingMessage::ApprovedAncestor(Default::default(), 0, sender) } +fn test_approval_voting_parallel_with_priority_msg() -> ApprovalVotingParallelMessage { + let (sender, _) = oneshot::channel(); + ApprovalVotingParallelMessage::ApprovedAncestor(Default::default(), 0, sender) +} + fn test_dispute_coordinator_msg() -> DisputeCoordinatorMessage { let (sender, _) = oneshot::channel(); DisputeCoordinatorMessage::RecentDisputes(sender) } +fn test_dispute_coordinator_msg_with_priority() -> DisputeCoordinatorMessage { + let (sender, _) = oneshot::channel(); + DisputeCoordinatorMessage::DetermineUndisputedChain { + base: Default::default(), + block_descriptions: Default::default(), + tx: sender, + } +} + fn test_dispute_distribution_msg() -> DisputeDistributionMessage { let dummy_dispute_message = UncheckedDisputeMessage { candidate_receipt: dummy_candidate_receipt_v2(dummy_hash()), @@ -1238,3 +1267,248 @@ fn context_holds_onto_message_until_enough_signals_received() { futures::executor::block_on(test_fut); } + +// A subsystem that simulates a slow subsystem, processing messages at a rate of one per second. +// We will use this to test the prioritization of messages in the subsystems generated by orchestra. +#[derive(Clone)] +struct SlowSubsystem { + num_normal_msgs_received: Arc, + num_prio_msgs_received: Arc, +} + +impl SlowSubsystem { + fn new( + msgs_received: Arc, + prio_msgs_received: Arc, + ) -> Self { + Self { num_normal_msgs_received: msgs_received, num_prio_msgs_received: prio_msgs_received } + } +} + +// Trait to determine if a message is a priority message or not, it is used by the SlowSubsystem +// to determine if it should count the message as a priority message or not. 
+trait IsPrioMessage { + // Tells if the message is a priority message. + fn is_prio(&self) -> bool { + // By default, messages are not priority messages. + false + } +} + +// Implement the IsPrioMessage trait for all message types. +impl IsPrioMessage for CandidateValidationMessage {} +impl IsPrioMessage for CandidateBackingMessage {} +impl IsPrioMessage for ChainApiMessage {} +impl IsPrioMessage for CollationGenerationMessage {} +impl IsPrioMessage for CollatorProtocolMessage {} +impl IsPrioMessage for StatementDistributionMessage { + fn is_prio(&self) -> bool { + matches!(self, StatementDistributionMessage::NetworkBridgeUpdate(_)) + } +} +impl IsPrioMessage for ApprovalDistributionMessage {} +impl IsPrioMessage for ApprovalVotingMessage {} +impl IsPrioMessage for ApprovalVotingParallelMessage { + fn is_prio(&self) -> bool { + matches!(self, ApprovalVotingParallelMessage::ApprovedAncestor(_, _, _)) + } +} +impl IsPrioMessage for AvailabilityDistributionMessage {} +impl IsPrioMessage for AvailabilityRecoveryMessage {} +impl IsPrioMessage for AvailabilityStoreMessage {} +impl IsPrioMessage for BitfieldDistributionMessage { + fn is_prio(&self) -> bool { + matches!( + self, + BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + _, + _, + _, + _ + ),) + ) + } +} +impl IsPrioMessage for ChainSelectionMessage {} +impl IsPrioMessage for DisputeCoordinatorMessage { + fn is_prio(&self) -> bool { + matches!(self, DisputeCoordinatorMessage::DetermineUndisputedChain { .. 
}) + } +} +impl IsPrioMessage for DisputeDistributionMessage {} +impl IsPrioMessage for GossipSupportMessage {} +impl IsPrioMessage for NetworkBridgeRxMessage {} +impl IsPrioMessage for NetworkBridgeTxMessage {} +impl IsPrioMessage for ProspectiveParachainsMessage {} +impl IsPrioMessage for ProvisionerMessage {} +impl IsPrioMessage for RuntimeApiMessage {} +impl IsPrioMessage for BitfieldSigningMessage {} +impl IsPrioMessage for PvfCheckerMessage {} + +impl Subsystem for SlowSubsystem +where + C: overseer::SubsystemContext, + M: Send + IsPrioMessage, +{ + fn start(self, mut ctx: C) -> SpawnedSubsystem { + SpawnedSubsystem { + name: "counter-subsystem", + future: Box::pin(async move { + loop { + // Simulate a slow processing subsystem to give time for both priority and + // normal messages to accumulate. + Delay::new(Duration::from_secs(1)).await; + match ctx.try_recv().await { + Ok(Some(FromOrchestra::Signal(OverseerSignal::Conclude))) => break, + Ok(Some(FromOrchestra::Signal(_))) => continue, + Ok(Some(FromOrchestra::Communication { msg })) => { + if msg.is_prio() { + self.num_prio_msgs_received.fetch_add(1, atomic::Ordering::SeqCst); + } else { + self.num_normal_msgs_received + .fetch_add(1, atomic::Ordering::SeqCst); + } + continue + }, + Err(_) => (), + _ => (), + } + pending!(); + } + + Ok(()) + }), + } + } +} + +#[test] +fn overseer_all_subsystems_can_receive_their_priority_messages() { + const NUM_NORMAL_MESSAGES: usize = 10; + const NUM_PRIORITY_MESSAGES: usize = 4; + overseer_check_subsystem_can_receive_their_priority_messages( + (0..NUM_NORMAL_MESSAGES) + .map(|_| AllMessages::DisputeCoordinator(test_dispute_coordinator_msg())) + .collect(), + (0..NUM_PRIORITY_MESSAGES) + .map(|_| AllMessages::DisputeCoordinator(test_dispute_coordinator_msg_with_priority())) + .collect(), + ); + + overseer_check_subsystem_can_receive_their_priority_messages( + (0..NUM_NORMAL_MESSAGES) + .map(|_| 
AllMessages::ApprovalVotingParallel(test_approval_distribution_msg().into())) + .collect(), + (0..NUM_PRIORITY_MESSAGES) + .map(|_| { + AllMessages::ApprovalVotingParallel( + test_approval_voting_parallel_with_priority_msg(), + ) + }) + .collect(), + ); + + overseer_check_subsystem_can_receive_their_priority_messages( + (0..NUM_NORMAL_MESSAGES) + .map(|_| AllMessages::StatementDistribution(test_statement_distribution_msg())) + .collect(), + (0..NUM_PRIORITY_MESSAGES) + .map(|_| { + AllMessages::StatementDistribution(test_statement_distribution_with_priority_msg()) + }) + .collect(), + ); + + overseer_check_subsystem_can_receive_their_priority_messages( + (0..NUM_NORMAL_MESSAGES) + .map(|_| AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())) + .collect(), + (0..NUM_PRIORITY_MESSAGES) + .map(|_| { + AllMessages::BitfieldDistribution(test_bitfield_distribution_with_priority_msg()) + }) + .collect(), + ); +} + +// Test that when a subsystem processes messages slowly, the priority messages are processed before +// the normal messages. This is important to ensure that the subsystem can handle priority messages. 
+fn overseer_check_subsystem_can_receive_their_priority_messages( + normal_msgs: Vec, + prio_msgs: Vec, +) { + let num_normal_messages = normal_msgs.len(); + let num_prio_messages: usize = prio_msgs.len(); + let spawner = sp_core::testing::TaskExecutor::new(); + executor::block_on(async move { + let msgs_received = Arc::new(atomic::AtomicUsize::new(0)); + let prio_msgs_received = Arc::new(atomic::AtomicUsize::new(0)); + + let subsystem = SlowSubsystem::new(msgs_received.clone(), prio_msgs_received.clone()); + + let (overseer, handle) = + one_for_all_overseer_builder(spawner, MockSupportsParachains, subsystem, None) + .unwrap() + .build() + .unwrap(); + + let mut handle = Handle::new(handle); + let overseer_fut = overseer.run_inner().fuse(); + + pin_mut!(overseer_fut); + + // send a signal to each subsystem + let unpin_handle = dummy_unpin_handle(dummy_hash()); + handle + .block_imported(BlockInfo { + hash: Default::default(), + parent_hash: Default::default(), + number: Default::default(), + unpin_handle: unpin_handle.clone(), + }) + .await; + + // Send normal messages first, they are processed 1 per second by the SlowSubsystem, so they + // should accumulate in the queue. + for msg in normal_msgs { + handle.send_msg_anon(msg).await; + } + + // Send priority messages. 
+ for msg in prio_msgs { + handle.send_msg_with_priority(msg, "test", PriorityLevel::High).await; + } + + loop { + match (&mut overseer_fut).timeout(Duration::from_millis(100)).await { + None => { + let normal_msgs: usize = msgs_received.load(atomic::Ordering::SeqCst); + let prio_msgs: usize = prio_msgs_received.load(atomic::Ordering::SeqCst); + + assert!( + prio_msgs == num_prio_messages || normal_msgs < num_normal_messages, + "we should not receive all normal messages before the prio message" + ); + + assert!( + normal_msgs <= num_normal_messages && prio_msgs <= num_prio_messages, + "too many messages received" + ); + + if normal_msgs < num_normal_messages || prio_msgs < num_prio_messages { + Delay::new(Duration::from_millis(100)).await; + } else { + break; + } + }, + Some(_) => panic!("exited too early"), + } + } + + // send a stop signal to each subsystem + handle.stop().await; + + let res = overseer_fut.await; + assert!(res.is_ok()); + }); +} diff --git a/polkadot/node/service/src/tests.rs b/polkadot/node/service/src/tests.rs index 1b8095436c6ec..cca4516a6d531 100644 --- a/polkadot/node/service/src/tests.rs +++ b/polkadot/node/service/src/tests.rs @@ -406,6 +406,8 @@ async fn test_skeleton( ) => { tx.send(undisputed_chain.unwrap_or((target_block_number, target_block_hash))).unwrap(); }); + // Check that ApprovedAncestor and DetermineUndisputedChain are sent with high priority. 
+ assert_eq!(virtual_overseer.message_counter.with_high_priority(), 2); } /// Straight forward test case, where the test is not diff --git a/prdoc/pr_8948.prdoc b/prdoc/pr_8948.prdoc new file mode 100644 index 0000000000000..423167b64a1f3 --- /dev/null +++ b/prdoc/pr_8948.prdoc @@ -0,0 +1,16 @@ +title: make sure dispute_coordinator/approval-voting parallel can receive priority messages +doc: +- audience: Node Dev + description: |- + https://github.com/paritytech/polkadot-sdk/pull/8834 changed relay_chain_selection to send priority messages, but did not configure + the subsystems to indicate that they can receive priority messages via the `can_receive_priority_messages` flag. + + If `can_receive_priority_messages` is not specified, orchestra falls back to the normal queue when sending a priority message, + so the messages were not processed ahead of the others in the queue. + + Fix this configuration mistake and add a test to make sure priority messages are consumed ahead of normal ones by the subsystems. +crates: +- name: polkadot-overseer + bump: patch +- name: polkadot-service + bump: patch