Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ num-rational = { version = "0.4.1" }
num-traits = { version = "0.2.17", default-features = false }
num_cpus = { version = "1.13.1" }
once_cell = { version = "1.19.0" }
orchestra = { version = "0.3.5", default-features = false }
orchestra = { version = "0.4.0", default-features = false }
pallet-alliance = { path = "substrate/frame/alliance", default-features = false }
pallet-asset-conversion = { path = "substrate/frame/asset-conversion", default-features = false }
pallet-asset-conversion-ops = { path = "substrate/frame/asset-conversion/ops", default-features = false }
Expand Down
13 changes: 12 additions & 1 deletion polkadot/node/malus/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ where
<OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
{
async fn send_message(&mut self, msg: OutgoingMessage) {
self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
}

async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
Expand All @@ -103,7 +107,14 @@ where
}
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
fn try_send_message(
&mut self,
msg: OutgoingMessage,
) -> Result<(), polkadot_node_subsystem_util::metered::TrySendError<OutgoingMessage>> {
self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
}

fn try_send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl TestState {
// Test will fail if this does not happen until timeout.
let mut remaining_stores = self.valid_chunks.len();

let TestSubsystemContextHandle { tx, mut rx } = harness.virtual_overseer;
let TestSubsystemContextHandle { tx, mut rx, .. } = harness.virtual_overseer;

// Spawning necessary as incoming queue can only hold a single item, we don't want to dead
// lock ;-)
Expand Down
32 changes: 26 additions & 6 deletions polkadot/node/network/bridge/src/rx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1135,13 +1135,33 @@ async fn dispatch_validation_events_to_all<I>(
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
macro_rules! send_message {
($event:expr, $message:ident) => {
if let Ok(event) = $event.focus() {
let has_high_priority = matches!(
event,
// NetworkBridgeEvent::OurViewChange(..) must also be here,
// but it is sent via an unbounded channel.
// See https://github.com/paritytech/polkadot-sdk/issues/824
NetworkBridgeEvent::PeerConnected(..) |
NetworkBridgeEvent::PeerDisconnected(..) |
NetworkBridgeEvent::PeerViewChange(..)
);
let message = $message::from(event);
if has_high_priority {
sender.send_message_with_priority::<overseer::HighPriority>(message).await;
} else {
sender.send_message(message).await;
}
}
};
}

for event in events {
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
send_message!(event, StatementDistributionMessage);
send_message!(event, BitfieldDistributionMessage);
send_message!(event, ApprovalDistributionMessage);
send_message!(event, GossipSupportMessage);
}
}

Expand Down
17 changes: 17 additions & 0 deletions polkadot/node/network/bridge/src/rx/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,8 @@ fn peer_view_updates_sent_via_overseer() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

network_handle
Expand All @@ -895,6 +897,7 @@ fn peer_view_updates_sent_via_overseer() {
&mut virtual_overseer,
)
.await;
assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);
virtual_overseer
});
}
Expand Down Expand Up @@ -930,6 +933,8 @@ fn peer_messages_sent_via_overseer() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

let approval_distribution_message =
Expand Down Expand Up @@ -970,6 +975,7 @@ fn peer_messages_sent_via_overseer() {
&mut virtual_overseer,
)
.await;
assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);
virtual_overseer
});
}
Expand Down Expand Up @@ -1008,6 +1014,8 @@ fn peer_disconnect_from_just_one_peerset() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

{
Expand Down Expand Up @@ -1036,6 +1044,7 @@ fn peer_disconnect_from_just_one_peerset() {
&mut virtual_overseer,
)
.await;
assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);

// to show that we're still connected on the collation protocol, send a view update.

Expand Down Expand Up @@ -1094,6 +1103,8 @@ fn relays_collation_protocol_messages() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

{
Expand Down Expand Up @@ -1201,6 +1212,8 @@ fn different_views_on_different_peer_sets() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

{
Expand Down Expand Up @@ -1247,6 +1260,8 @@ fn different_views_on_different_peer_sets() {
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);

assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerViewChange(peer, view_b.clone()),
&mut virtual_overseer,
Expand Down Expand Up @@ -1481,6 +1496,8 @@ fn network_protocol_versioning_subsystem_msg() {
&mut virtual_overseer,
)
.await;

assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
}

let approval_distribution_message =
Expand Down
17 changes: 9 additions & 8 deletions polkadot/node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ pub use polkadot_node_metrics::{

pub use orchestra as gen;
pub use orchestra::{
contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket,
OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
SubsystemSender, TimeoutExt, ToOrchestra, TrySendError,
contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
TrySendError,
};

#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
Expand Down Expand Up @@ -495,7 +496,7 @@ pub struct Overseer<SupportsParachains> {
RuntimeApiMessage,
ProspectiveParachainsMessage,
ChainApiMessage,
])]
], can_receive_priority_messages)]
statement_distribution: StatementDistribution,

#[subsystem(AvailabilityDistributionMessage, sends: [
Expand Down Expand Up @@ -524,7 +525,7 @@ pub struct Overseer<SupportsParachains> {
RuntimeApiMessage,
NetworkBridgeTxMessage,
ProvisionerMessage,
])]
], can_receive_priority_messages)]
bitfield_distribution: BitfieldDistribution,

#[subsystem(ProvisionerMessage, sends: [
Expand Down Expand Up @@ -580,7 +581,7 @@ pub struct Overseer<SupportsParachains> {
#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
NetworkBridgeTxMessage,
ApprovalVotingMessage,
])]
], can_receive_priority_messages)]
approval_distribution: ApprovalDistribution,

#[subsystem(blocking, ApprovalVotingMessage, sends: [
Expand All @@ -599,7 +600,7 @@ pub struct Overseer<SupportsParachains> {
NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626>
RuntimeApiMessage,
ChainSelectionMessage,
])]
], can_receive_priority_messages)]
gossip_support: GossipSupport,

#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
Expand Down
Loading