Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 30 additions & 19 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,14 @@ impl<Client> HeadSupportsParachains for Arc<Client> where
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct Handle(pub OverseerHandle);
pub struct Handle(pub Option<OverseerHandle>);

impl Handle {
/// Create a new disconnected [`Handle`].
pub fn new_disconnected() -> Self {
Self(None)
}

/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self, block: BlockInfo) {
self.send_and_log_error(Event::BlockImported(block)).await
Expand Down Expand Up @@ -207,19 +212,18 @@ impl Handle {

/// Most basic operation, to stop a server.
async fn send_and_log_error(&mut self, event: Event) {
if self.0.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
if let Some(handle) = self.0.as_mut() {
if handle.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
} else {
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
}
}

/// Whether the overseer handler is connected to an overseer.
pub fn is_connected(&self) -> bool {
true
}

/// Whether the handler is disconnected.
pub fn is_disconnected(&self) -> bool {
false
self.0.is_none()
}

/// Using this handler, connect another handler to the same
Expand Down Expand Up @@ -338,7 +342,6 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
network=NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
)]
pub struct Overseer<SupportsParachains> {

#[subsystem(no_dispatch, CandidateValidationMessage)]
candidate_validation: CandidateValidation,

Expand Down Expand Up @@ -390,16 +393,16 @@ pub struct Overseer<SupportsParachains> {
#[subsystem(no_dispatch, GossipSupportMessage)]
gossip_support: GossipSupport,

#[subsystem(no_dispatch, wip, DisputeCoordinatorMessage)]
dipute_coordinator: DisputeCoordinator,
#[subsystem(no_dispatch, DisputeCoordinatorMessage)]
dispute_coordinator: DisputeCoordinator,

#[subsystem(no_dispatch, wip, DisputeParticipationMessage)]
#[subsystem(no_dispatch, DisputeParticipationMessage)]
dispute_participation: DisputeParticipation,

#[subsystem(no_dispatch, wip, DisputeDistributionMessage)]
dipute_distribution: DisputeDistribution,
#[subsystem(no_dispatch, DisputeDistributionMessage)]
dispute_distribution: DisputeDistribution,
Copy link
Copy Markdown
Contributor

@drahnr drahnr Jul 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rphmeier : Removing the wip hooks up all the channels as expected. I'll write some proper documentation once #3454 is complete.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woah, I like this "wip" thing.


#[subsystem(no_dispatch, wip, ChainSelectionMessage)]
#[subsystem(no_dispatch, ChainSelectionMessage)]
chain_selection: ChainSelection,

/// External listeners waiting for a hash to be in the active-leave set.
Expand Down Expand Up @@ -549,9 +552,9 @@ where
/// # });
/// # }
/// ```
pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>(
pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>(
leaves: impl IntoIterator<Item = BlockInfo>,
all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>,
all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>,
prometheus_registry: Option<&prometheus::Registry>,
supports_parachains: SupportsParachains,
s: S,
Expand All @@ -574,6 +577,10 @@ where
ApD: Subsystem<OverseerSubsystemContext<ApprovalDistributionMessage>, SubsystemError> + Send,
ApV: Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>, SubsystemError> + Send,
GS: Subsystem<OverseerSubsystemContext<GossipSupportMessage>, SubsystemError> + Send,
DC: Subsystem<OverseerSubsystemContext<DisputeCoordinatorMessage>, SubsystemError> + Send,
DP: Subsystem<OverseerSubsystemContext<DisputeParticipationMessage>, SubsystemError> + Send,
DD: Subsystem<OverseerSubsystemContext<DisputeDistributionMessage>, SubsystemError> + Send,
CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send,
S: SpawnNamed,
{
let metrics: Metrics = <Metrics as MetricsTrait>::register(prometheus_registry)?;
Expand All @@ -596,6 +603,10 @@ where
.approval_distribution(all_subsystems.approval_distribution)
.approval_voting(all_subsystems.approval_voting)
.gossip_support(all_subsystems.gossip_support)
.dispute_coordinator(all_subsystems.dispute_coordinator)
.dispute_participation(all_subsystems.dispute_participation)
.dispute_distribution(all_subsystems.dispute_distribution)
.chain_selection(all_subsystems.chain_selection)
.leaves(Vec::from_iter(
leaves.into_iter().map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
))
Expand Down Expand Up @@ -647,7 +658,7 @@ where
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
}

Ok((overseer, Handle(handler)))
Ok((overseer, Handle(Some(handler))))
}

/// Stop the overseer.
Expand Down
40 changes: 36 additions & 4 deletions node/overseer/src/subsystems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where
pub struct AllSubsystems<
CV = (), CB = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (),
RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (),
GS = (),
GS = (), DC = (), DP = (), DD = (), CS = (),
> {
/// A candidate validation subsystem.
pub candidate_validation: CV,
Expand Down Expand Up @@ -113,10 +113,18 @@ pub struct AllSubsystems<
pub approval_voting: ApV,
/// A Connection Request Issuer subsystem.
pub gossip_support: GS,
/// A Dispute Coordinator subsystem.
pub dispute_coordinator: DC,
/// A Dispute Participation subsystem.
pub dispute_participation: DP,
/// A Dispute Distribution subsystem.
pub dispute_distribution: DD,
/// A Chain Selection subsystem.
pub chain_selection: CS,
Copy link
Copy Markdown
Contributor

@rphmeier rphmeier Jul 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this automatically generate communication handling for the subsystems?

Copy link
Copy Markdown
Contributor

@drahnr drahnr Jul 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a barely used abstraction for Overseer::new which will be removed soon™ ( tracked as chore in #3427 ), removing the wip keyword from the #[subsystem(..)] annotation does it.

}

impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>
AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>
{
/// Create a new instance of [`AllSubsystems`].
///
Expand Down Expand Up @@ -148,6 +156,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
> {
AllSubsystems {
candidate_validation: DummySubsystem,
Expand All @@ -167,11 +179,15 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
approval_distribution: DummySubsystem,
approval_voting: DummySubsystem,
gossip_support: DummySubsystem,
dispute_coordinator: DummySubsystem,
dispute_participation: DummySubsystem,
dispute_distribution: DummySubsystem,
chain_selection: DummySubsystem,
}
}

/// Reference every individual subsystem.
pub fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> {
pub fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS, &'_ DC, &'_ DP, &'_ DD, &'_ CS> {
AllSubsystems {
candidate_validation: &self.candidate_validation,
candidate_backing: &self.candidate_backing,
Expand All @@ -190,6 +206,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
approval_distribution: &self.approval_distribution,
approval_voting: &self.approval_voting,
gossip_support: &self.gossip_support,
dispute_coordinator: &self.dispute_coordinator,
dispute_participation: &self.dispute_participation,
dispute_distribution: &self.dispute_distribution,
chain_selection: &self.chain_selection,
}
}

Expand All @@ -213,6 +233,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
<Mapper as MapSubsystem<ApD>>::Output,
<Mapper as MapSubsystem<ApV>>::Output,
<Mapper as MapSubsystem<GS>>::Output,
<Mapper as MapSubsystem<DC>>::Output,
<Mapper as MapSubsystem<DP>>::Output,
<Mapper as MapSubsystem<DD>>::Output,
<Mapper as MapSubsystem<CS>>::Output,
>
where
Mapper: MapSubsystem<CV>,
Expand All @@ -232,6 +256,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
Mapper: MapSubsystem<ApD>,
Mapper: MapSubsystem<ApV>,
Mapper: MapSubsystem<GS>,
Mapper: MapSubsystem<DC>,
Mapper: MapSubsystem<DP>,
Mapper: MapSubsystem<DD>,
Mapper: MapSubsystem<CS>,
{
AllSubsystems {
candidate_validation: <Mapper as MapSubsystem<CV>>::map_subsystem(&mapper, self.candidate_validation),
Expand All @@ -251,6 +279,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
approval_distribution: <Mapper as MapSubsystem<ApD>>::map_subsystem(&mapper, self.approval_distribution),
approval_voting: <Mapper as MapSubsystem<ApV>>::map_subsystem(&mapper, self.approval_voting),
gossip_support: <Mapper as MapSubsystem<GS>>::map_subsystem(&mapper, self.gossip_support),
dispute_coordinator: <Mapper as MapSubsystem<DC>>::map_subsystem(&mapper, self.dispute_coordinator),
dispute_participation: <Mapper as MapSubsystem<DP>>::map_subsystem(&mapper, self.dispute_participation),
dispute_distribution: <Mapper as MapSubsystem<DD>>::map_subsystem(&mapper, self.dispute_distribution),
chain_selection: <Mapper as MapSubsystem<CS>>::map_subsystem(&mapper, self.chain_selection),
}
}
}
52 changes: 51 additions & 1 deletion node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::atomic;
use std::collections::HashMap;
use std::task::{Poll};
use futures::{executor, pin_mut, select, FutureExt, pending, poll, stream};
use futures::channel::mpsc;

use polkadot_primitives::v1::{CollatorPair, CandidateHash};
use polkadot_node_primitives::{CollationResult, CollationGenerationConfig, PoV, BlockData};
Expand Down Expand Up @@ -809,10 +810,35 @@ fn test_approval_voting_msg() -> ApprovalVotingMessage {
ApprovalVotingMessage::ApprovedAncestor(Default::default(), 0, sender)
}

fn test_dispute_coordinator_msg() -> DisputeCoordinatorMessage {
let (sender, _) = oneshot::channel();
DisputeCoordinatorMessage::RecentDisputes(sender)
}

fn test_dispute_participation_msg() -> DisputeParticipationMessage {
let (sender, _) = oneshot::channel();
DisputeParticipationMessage::Participate {
candidate_hash: Default::default(),
candidate_receipt: Default::default(),
session: 0,
n_validators: 0,
report_availability: sender,
}
}

fn test_dispute_distribution_msg() -> DisputeDistributionMessage {
let (_, receiver) = mpsc::channel(1);
DisputeDistributionMessage::DisputeSendingReceiver(receiver)
}

fn test_chain_selection_msg() -> ChainSelectionMessage {
ChainSelectionMessage::Approved(Default::default())
}

// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
#[test]
fn overseer_all_subsystems_receive_signals_and_messages() {
const NUM_SUBSYSTEMS: usize = 17;
const NUM_SUBSYSTEMS: usize = 21;
// -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution
const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 3;

Expand Down Expand Up @@ -846,6 +872,10 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
approval_distribution: subsystem.clone(),
approval_voting: subsystem.clone(),
gossip_support: subsystem.clone(),
dispute_coordinator: subsystem.clone(),
dispute_participation: subsystem.clone(),
dispute_distribution: subsystem.clone(),
chain_selection: subsystem.clone(),
};
let (overseer, mut handler) = Overseer::new(
vec![],
Expand Down Expand Up @@ -883,6 +913,10 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
handler.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await;
handler.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
handler.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;
handler.send_msg_anon(AllMessages::DisputeCoordinator(test_dispute_coordinator_msg())).await;
handler.send_msg_anon(AllMessages::DisputeParticipation(test_dispute_participation_msg())).await;
handler.send_msg_anon(AllMessages::DisputeDistribution(test_dispute_distribution_msg())).await;
handler.send_msg_anon(AllMessages::ChainSelection(test_chain_selection_msg())).await;

// Wait until all subsystems have received. Otherwise the messages might race against
// the conclude signal.
Expand Down Expand Up @@ -933,6 +967,10 @@ fn context_holds_onto_message_until_enough_signals_received() {
let (approval_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
let (approval_voting_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
let (gossip_support_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
let (dispute_coordinator_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
let (dispute_participation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
let (dispute_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
let (chain_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);

let (candidate_validation_unbounded_tx, _) = metered::unbounded();
let (candidate_backing_unbounded_tx, _) = metered::unbounded();
Expand All @@ -951,6 +989,10 @@ fn context_holds_onto_message_until_enough_signals_received() {
let (approval_distribution_unbounded_tx, _) = metered::unbounded();
let (approval_voting_unbounded_tx, _) = metered::unbounded();
let (gossip_support_unbounded_tx, _) = metered::unbounded();
let (dispute_coordinator_unbounded_tx, _) = metered::unbounded();
let (dispute_participation_unbounded_tx, _) = metered::unbounded();
let (dispute_distribution_unbounded_tx, _) = metered::unbounded();
let (chain_selection_unbounded_tx, _) = metered::unbounded();

let channels_out = ChannelsOut {
candidate_validation: candidate_validation_bounded_tx.clone(),
Expand All @@ -970,6 +1012,10 @@ fn context_holds_onto_message_until_enough_signals_received() {
approval_distribution: approval_distribution_bounded_tx.clone(),
approval_voting: approval_voting_bounded_tx.clone(),
gossip_support: gossip_support_bounded_tx.clone(),
dispute_coordinator: dispute_coordinator_bounded_tx.clone(),
dispute_participation: dispute_participation_bounded_tx.clone(),
dispute_distribution: dispute_distribution_bounded_tx.clone(),
chain_selection: chain_selection_bounded_tx.clone(),

candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(),
candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(),
Expand All @@ -988,6 +1034,10 @@ fn context_holds_onto_message_until_enough_signals_received() {
approval_distribution_unbounded: approval_distribution_unbounded_tx.clone(),
approval_voting_unbounded: approval_voting_unbounded_tx.clone(),
gossip_support_unbounded: gossip_support_unbounded_tx.clone(),
dispute_coordinator_unbounded: dispute_coordinator_unbounded_tx.clone(),
dispute_participation_unbounded: dispute_participation_unbounded_tx.clone(),
dispute_distribution_unbounded: dispute_distribution_unbounded_tx.clone(),
chain_selection_unbounded: chain_selection_unbounded_tx.clone(),
};

let (mut signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY);
Expand Down
6 changes: 6 additions & 0 deletions node/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ polkadot-node-core-backing = { path = "../core/backing", optional = true }
polkadot-node-core-bitfield-signing = { path = "../core/bitfield-signing", optional = true }
polkadot-node-core-candidate-validation = { path = "../core/candidate-validation", optional = true }
polkadot-node-core-chain-api = { path = "../core/chain-api", optional = true }
polkadot-node-core-chain-selection = { path = "../core/chain-selection", optional = true }
polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator", optional = true }
polkadot-node-core-dispute-participation = { path = "../core/dispute-participation", optional = true }
polkadot-node-core-provisioner = { path = "../core/provisioner", optional = true }
polkadot-node-core-runtime-api = { path = "../core/runtime-api", optional = true }
polkadot-statement-distribution = { path = "../network/statement-distribution", optional = true }
Expand Down Expand Up @@ -133,6 +136,9 @@ full-node = [
"polkadot-node-core-bitfield-signing",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-chain-api",
"polkadot-node-core-chain-selection",
"polkadot-node-core-dispute-coordinator",
"polkadot-node-core-dispute-participation",
"polkadot-node-core-provisioner",
"polkadot-node-core-runtime-api",
"polkadot-statement-distribution",
Expand Down
Loading