From 6a44f875f072fc749df6ec767ec816627aa483c7 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Tue, 17 Jun 2025 19:17:36 +0300 Subject: [PATCH] extend overseer to send priority messages (#8834) Extend overseer to send priority messages, the new functionality is used for sending messages on the grandpa call path when we call dispute-coordinator and approval-voting in finality_target_with_longest_chain to make sure we don't block unnecessarily. Depends on: https://github.com/paritytech/orchestra/pull/87. --------- Signed-off-by: Alexandru Gheorghe Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Javier Viola Co-authored-by: Javier Viola <363911+pepoviola@users.noreply.github.com> (cherry picked from commit 6b5a1284e83ed52dfc61f7deb920af41ae1efd31) --- Cargo.lock | 8 ++-- .../core/approval-voting-parallel/src/lib.rs | 2 +- polkadot/node/overseer/src/lib.rs | 28 ++++++++++-- .../node/service/src/relay_chain_selection.rs | 43 +++++++++++++++---- polkadot/node/service/src/tests.rs | 21 +++++++++ prdoc/pr_8834.prdoc | 15 +++++++ 6 files changed, 100 insertions(+), 17 deletions(-) create mode 100644 prdoc/pr_8834.prdoc diff --git a/Cargo.lock b/Cargo.lock index 588012408d76f..fa02b2c28ece0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11137,9 +11137,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestra" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41f6bbacc8c189a3f2e45e0fd0436e5d97f194db888e721bdbc3973e7dbed4c2" +checksum = "19051f0b0512402f5d52d6776999f55996f01887396278aeeccbbdfbc83eef2d" dependencies = [ "async-trait", "dyn-clonable", @@ -11154,9 +11154,9 @@ dependencies = [ [[package]] name = "orchestra-proc-macro" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b1d40dd8f367db3c65bec8d3dd47d4a604ee8874480738f93191bddab4e0e0" +checksum = "43dfaf083aef571385fccfdc3a2f8ede8d0a1863160455d4f2b014d8f7d04a3f" dependencies = [ "expander", "indexmap 2.9.0", diff --git a/polkadot/node/core/approval-voting-parallel/src/lib.rs b/polkadot/node/core/approval-voting-parallel/src/lib.rs index fed4b6789647a..bbef41f8afccc 100644 --- a/polkadot/node/core/approval-voting-parallel/src/lib.rs +++ b/polkadot/node/core/approval-voting-parallel/src/lib.rs @@ -347,7 +347,7 @@ async fn run_main_loop( // The message the approval voting subsystem would've handled. ApprovalVotingParallelMessage::ApprovedAncestor(_, _,_) | ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(_, _) => { - to_approval_voting_worker.send_message( + to_approval_voting_worker.send_message_with_priority::( msg.try_into().expect( "Message is one of ApprovedAncestor, GetApprovalSignaturesForCandidate and that can be safely converted to ApprovalVotingMessage; qed" diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 92989e2b520e9..28a201f102c18 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -190,9 +190,20 @@ impl Handle { self.send_and_log_error(Event::BlockImported(block)).await } - /// Send some message to one of the `Subsystem`s. + /// Send some message with normal priority to one of the `Subsystem`s. pub async fn send_msg(&mut self, msg: impl Into, origin: &'static str) { - self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await + self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await + } + + /// Send some message with the specified priority to one of the `Subsystem`s. + pub async fn send_msg_with_priority( + &mut self, + msg: impl Into, + origin: &'static str, + priority: PriorityLevel, + ) { + self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority }) + .await } /// Send a message not providing an origin. @@ -296,6 +307,8 @@ pub enum Event { msg: AllMessages, /// The originating subsystem name. origin: &'static str, + /// The priority of the message. + priority: PriorityLevel, }, /// A request from the outer world. ExternalRequest(ExternalRequest), @@ -764,8 +777,15 @@ where select! { msg = self.events_rx.select_next_some() => { match msg { - Event::MsgToSubsystem { msg, origin } => { - self.route_message(msg.into(), origin).await?; + Event::MsgToSubsystem { msg, origin, priority } => { + match priority { + PriorityLevel::Normal => { + self.route_message(msg.into(), origin).await?; + }, + PriorityLevel::High => { + self.route_message_with_priority::(msg.into(), origin).await?; + }, + } self.metrics.on_message_relayed(); } Event::Stop => { diff --git a/polkadot/node/service/src/relay_chain_selection.rs b/polkadot/node/service/src/relay_chain_selection.rs index e48874f01ca6f..f016b8ba8b05b 100644 --- a/polkadot/node/service/src/relay_chain_selection.rs +++ b/polkadot/node/service/src/relay_chain_selection.rs @@ -43,7 +43,7 @@ use polkadot_node_subsystem::messages::{ ChainSelectionMessage, DisputeCoordinatorMessage, HighestApprovedAncestorBlock, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; -use polkadot_overseer::{AllMessages, Handle}; +use polkadot_overseer::{AllMessages, Handle, PriorityLevel}; use polkadot_primitives::{Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader}; use sp_consensus::{Error as ConsensusError, SelectChain}; use std::sync::Arc; @@ -238,7 +238,7 @@ pub struct SelectRelayChainInner { impl SelectRelayChainInner where B: HeaderProviderProvider, - OH: OverseerHandleT, + OH: OverseerHandleT + OverseerHandleWithPriorityT, { /// Create a new [`SelectRelayChainInner`] wrapping the given chain backend /// and a handle to the overseer. @@ -286,7 +286,7 @@ where impl Clone for SelectRelayChainInner where B: HeaderProviderProvider + Send + Sync, - OH: OverseerHandleT, + OH: OverseerHandleT + OverseerHandleWithPriorityT, { fn clone(&self) -> Self { SelectRelayChainInner { @@ -325,6 +325,17 @@ pub trait OverseerHandleT: Clone + Send + Sync { async fn send_msg>(&mut self, msg: M, origin: &'static str); } +/// Trait for the overseer handle that allows sending messages with the specified priority level. +#[async_trait::async_trait] +pub trait OverseerHandleWithPriorityT: Clone + Send + Sync { + async fn send_msg_with_priority>( + &mut self, + msg: M, + origin: &'static str, + priority: PriorityLevel, + ); +} + #[async_trait::async_trait] impl OverseerHandleT for Handle { async fn send_msg>(&mut self, msg: M, origin: &'static str) { @@ -332,10 +343,22 @@ impl OverseerHandleT for Handle { } } +#[async_trait::async_trait] +impl OverseerHandleWithPriorityT for Handle { + async fn send_msg_with_priority>( + &mut self, + msg: M, + origin: &'static str, + priority: PriorityLevel, + ) { + Handle::send_msg_with_priority(self, msg, origin, priority).await + } +} + impl SelectRelayChainInner where B: HeaderProviderProvider, - OH: OverseerHandleT + 'static, + OH: OverseerHandleT + OverseerHandleWithPriorityT + 'static, { /// Get all leaves of the chain, i.e. block hashes that are suitable to /// build upon and have no suitable children. @@ -472,9 +495,10 @@ where .await; } else { overseer - .send_msg( + .send_msg_with_priority( ApprovalVotingMessage::ApprovedAncestor(subchain_head, target_number, tx), std::any::type_name::(), + PriorityLevel::High, ) .await; } @@ -503,16 +527,18 @@ where let lag_update_task = async move { if approval_voting_parallel_enabled { overseer_handle - .send_msg( + .send_msg_with_priority( ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag), std::any::type_name::(), + PriorityLevel::High, ) .await; } else { overseer_handle - .send_msg( + .send_msg_with_priority( ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), std::any::type_name::(), + PriorityLevel::High, ) .await; } @@ -542,13 +568,14 @@ where // 3. Constrain according to disputes: let (tx, rx) = oneshot::channel(); overseer - .send_msg( + .send_msg_with_priority( DisputeCoordinatorMessage::DetermineUndisputedChain { base: (target_number, target_hash), block_descriptions: subchain_block_descriptions, tx, }, std::any::type_name::(), + PriorityLevel::High, ) .await; diff --git a/polkadot/node/service/src/tests.rs b/polkadot/node/service/src/tests.rs index 78bbfcd5444f0..1b8095436c6ec 100644 --- a/polkadot/node/service/src/tests.rs +++ b/polkadot/node/service/src/tests.rs @@ -20,6 +20,7 @@ use futures::channel::oneshot::Receiver; use polkadot_node_primitives::approval::v2::VrfSignature; use polkadot_node_subsystem::messages::{AllMessages, BlockDescription}; use polkadot_node_subsystem_util::TimeoutExt; +use polkadot_overseer::{HighPriority, PriorityLevel}; use polkadot_test_client::Sr25519Keyring; use sp_consensus_babe::{ digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest}, @@ -55,6 +56,26 @@ impl OverseerHandleT for TestSubsystemSender { } } +#[async_trait::async_trait] +impl OverseerHandleWithPriorityT for TestSubsystemSender { + async fn send_msg_with_priority>( + &mut self, + msg: M, + _origin: &'static str, + priority: PriorityLevel, + ) { + match priority { + PriorityLevel::High => { + TestSubsystemSender::send_message_with_priority::(self, msg.into()) + .await; + }, + PriorityLevel::Normal => { + TestSubsystemSender::send_message(self, msg.into()).await; + }, + } + } +} + struct TestHarness { virtual_overseer: VirtualOverseer, case_vars: CaseVars, diff --git a/prdoc/pr_8834.prdoc b/prdoc/pr_8834.prdoc new file mode 100644 index 0000000000000..414d8e89280bb --- /dev/null +++ b/prdoc/pr_8834.prdoc @@ -0,0 +1,15 @@ +title: extend overseer to send priority messages +doc: +- audience: Node Dev + description: |- + Extend overseer to send priority messages, the new functionality is used for sending messages + on the grandpa call path when we call dispute-coordinator and approval-voting in + finality_target_with_longest_chain to make sure we don't block unnecessarily. + +crates: +- name: polkadot-node-core-approval-voting-parallel + bump: patch +- name: polkadot-overseer + bump: patch +- name: polkadot-service + bump: patch