Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit e4081cf

Browse files
committed
attempt to move things into one spot
1 parent 0318d6e commit e4081cf

File tree

15 files changed

+576
-101
lines changed

15 files changed

+576
-101
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/overseer/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@ authors = ["Parity Technologies <[email protected]>"]
55
edition = "2018"
66

77
[dependencies]
8+
async-trait = "0.1.42"
89
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
910
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
1011
futures = "0.3.15"
12+
futures-timer = "3.0.2"
1113
polkadot-node-network-protocol = { path = "../network/protocol" }
14+
polkadot-node-primitives = { path = "../primitives" }
1215
polkadot-node-subsystem-types = { path = "../subsystem-types" }
1316
polkadot-node-metrics = { path = "../metrics" }
1417
polkadot-primitives = { path = "../../primitives" }

node/overseer/examples/minimal-example.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ use polkadot_overseer::{
3636
Overseer,
3737
OverseerSignal,
3838
SubsystemError,
39-
FromOverseer,
40-
SpawnedSubsystem,
39+
gen::{
40+
FromOverseer,
41+
SpawnedSubsystem,
42+
},
4143
};
4244
use polkadot_node_subsystem_types::messages::{
4345
CandidateValidationMessage, CandidateBackingMessage,
@@ -56,7 +58,7 @@ struct Subsystem1;
5658
impl Subsystem1 {
5759
async fn run<Ctx>(mut ctx: Ctx) -> ()
5860
where
59-
Ctx: overseer::SubsystemContext<Message=CandidateBackingMessage>,
61+
Ctx: overseer::SubsystemContext<Message=CandidateBackingMessage,AllMessages=AllMessages,Signal=OverseerSignal>,
6062
{
6163
'louy: loop {
6264
match ctx.try_recv().await {
@@ -83,14 +85,14 @@ impl Subsystem1 {
8385
}.into(),
8486
tx,
8587
);
86-
ctx.send_message(msg).await;
88+
ctx.send_message(<Ctx as overseer::SubsystemContext>::AllMessages::from(msg)).await;
8789
}
8890
()
8991
}
9092
}
9193

9294

93-
impl<Context> overseer::Subsystem<Context> for Subsystem1
95+
impl<Context> overseer::Subsystem<Context,SubsystemError> for Subsystem1
9496
where
9597
Context: overseer::SubsystemContext<Message=CandidateBackingMessage,AllMessages=AllMessages,Signal=OverseerSignal>,
9698
{
@@ -142,11 +144,11 @@ impl Subsystem2 {
142144
}
143145
}
144146

145-
impl<Context> overseer::Subsystem<Context> for Subsystem2
147+
impl<Context> overseer::Subsystem<Context,SubsystemError> for Subsystem2
146148
where
147149
Context: overseer::SubsystemContext<Message=CandidateValidationMessage,AllMessages=AllMessages,Signal=OverseerSignal>,
148150
{
149-
fn start(self, ctx: Context) -> SpawnedSubsystem {
151+
fn start(self, ctx: Context) -> SpawnedSubsystem<SubsystemError> {
150152
let future = Box::pin(async move {
151153
Self::run(ctx).await;
152154
Ok(())

node/overseer/overseer-gen/examples/dummy.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
11
//! A dummy to be used with cargo expand
22
3-
pub use polkadot_overseer_gen::prelude::*;
3+
use polkadot_overseer_gen::*;
44
use polkadot_node_network_protocol::WrongVariant;
55

66

77
/// Concrete subsystem implementation for `MsgStrukt` msg type.
88
#[derive(Default)]
99
pub struct AwesomeSubSys;
1010

11-
impl Subsystem<XxxSubsystemContext<MsgStrukt>> for AwesomeSubSys {
12-
fn start(self, _ctx: XxxSubsystemContext<MsgStrukt>) -> SpawnedSubsystem {
11+
impl ::polkadot_overseer_gen::Subsystem<XxxSubsystemContext<MsgStrukt>, Yikes> for AwesomeSubSys {
12+
fn start(self, _ctx: XxxSubsystemContext<MsgStrukt>) -> SpawnedSubsystem < Yikes > {
1313
unimplemented!("starting yay!")
1414
}
1515
}
1616

1717
#[derive(Default)]
1818
pub struct GoblinTower;
1919

20-
impl Subsystem<XxxSubsystemContext<Plinko>> for GoblinTower {
21-
fn start(self, _ctx: XxxSubsystemContext<Plinko>) -> SpawnedSubsystem {
20+
impl ::polkadot_overseer_gen::Subsystem<XxxSubsystemContext<Plinko>, Yikes> for GoblinTower {
21+
fn start(self, _ctx: XxxSubsystemContext<Plinko>) -> SpawnedSubsystem < Yikes > {
2222
unimplemented!("welcum")
2323
}
2424
}

node/overseer/overseer-gen/proc-macro/src/impl_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
318318
// connection to the subsystems
319319
channels_out: ChannelsOut,
320320
ctx: Ctx,
321-
subsystem: SubSys,
321+
s: SubSys,
322322
futures: &mut #support_crate ::FuturesUnordered<BoxFuture<'static, ::std::result::Result<(), #error_ty> >>,
323323
) -> ::std::result::Result<OverseenSubsystem<M>, #error_ty >
324324
where
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
// Copyright 2021 Parity Technologies (UK) Ltd.
2+
// This file is part of Polkadot.
3+
4+
// Polkadot is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
9+
// Polkadot is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU General Public License for more details.
13+
14+
// You should have received a copy of the GNU General Public License
15+
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
16+
17+
use quote::quote;
18+
use syn::Ident;
19+
20+
use super::*;
21+
22+
/// Implement a builder pattern for the `Overseer`-type,
23+
/// which acts as the gateway to constructing the overseer.
24+
pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
25+
let overseer_name = info.overseer_name.clone();
26+
let subsystem_sender_name = Ident::new(&(overseer_name.to_string() + "SubsystemSender"), overseer_name.span());
27+
let subsystem_ctx_name = Ident::new(&(overseer_name.to_string() + "SubsystemContext"), overseer_name.span());
28+
let consumes = &info.consumes();
29+
let signal = &info.extern_signal_ty;
30+
let wrapper_message = &info.message_wrapper;
31+
let error_ty = &info.extern_error_ty;
32+
let support_crate = info.support_crate_name();
33+
34+
let ts = quote! {
35+
/// Connector to send messages towards all subsystems,
36+
/// while tracking the which signals where already received.
37+
#[derive(Debug, Clone)]
38+
pub struct #subsystem_sender_name {
39+
/// Collection of channels to all subsystems.
40+
channels: ChannelsOut,
41+
/// Systemwide tick for which signals were received by all subsystems.
42+
signals_received: SignalsReceived,
43+
}
44+
45+
/// impl for wrapping message type...
46+
#[#support_crate ::async_trait]
47+
impl SubsystemSender< #wrapper_message > for #subsystem_sender_name {
48+
async fn send_message(&mut self, msg: #wrapper_message) {
49+
self.channels.send_and_log_error(self.signals_received.load(), msg).await;
50+
}
51+
52+
async fn send_messages<T>(&mut self, msgs: T)
53+
where
54+
T: IntoIterator<Item = #wrapper_message> + Send,
55+
T::IntoIter: Send,
56+
{
57+
// This can definitely be optimized if necessary.
58+
for msg in msgs {
59+
self.send_message(msg).await;
60+
}
61+
}
62+
63+
fn send_unbounded_message(&mut self, msg: #wrapper_message) {
64+
self.channels.send_unbounded_and_log_error(self.signals_received.load(), msg);
65+
}
66+
}
67+
68+
// ... but also implement for all individual messages to avoid
69+
// the necessity for manual wrapping, and do the conversion
70+
// based on the generated `From::from` impl for the individual variants.
71+
#(
72+
#[#support_crate ::async_trait]
73+
impl SubsystemSender< #consumes > for #subsystem_sender_name {
74+
async fn send_message(&mut self, msg: #consumes) {
75+
self.channels.send_and_log_error(self.signals_received.load(), #wrapper_message ::from ( msg )).await;
76+
}
77+
78+
async fn send_messages<T>(&mut self, msgs: T)
79+
where
80+
T: IntoIterator<Item = #consumes> + Send,
81+
T::IntoIter: Send,
82+
{
83+
// This can definitely be optimized if necessary.
84+
for msg in msgs {
85+
self.send_message(msg).await;
86+
}
87+
}
88+
89+
fn send_unbounded_message(&mut self, msg: #consumes) {
90+
self.channels.send_unbounded_and_log_error(self.signals_received.load(), #wrapper_message ::from ( msg ));
91+
}
92+
}
93+
)*
94+
95+
/// A context type that is given to the [`Subsystem`] upon spawning.
96+
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
97+
/// or to spawn it's [`SubsystemJob`]s.
98+
///
99+
/// [`Overseer`]: struct.Overseer.html
100+
/// [`Subsystem`]: trait.Subsystem.html
101+
/// [`SubsystemJob`]: trait.SubsystemJob.html
102+
#[derive(Debug)]
103+
#[allow(missing_docs)]
104+
pub struct #subsystem_ctx_name<M>{
105+
signals: #support_crate ::metered::MeteredReceiver< #signal >,
106+
messages: SubsystemIncomingMessages<M>,
107+
to_subsystems: #subsystem_sender_name,
108+
to_overseer: #support_crate ::metered::UnboundedMeteredSender<
109+
#support_crate ::ToOverseer
110+
>,
111+
signals_received: SignalsReceived,
112+
pending_incoming: Option<(usize, M)>,
113+
}
114+
115+
impl<M> #subsystem_ctx_name<M> {
116+
/// Create a new context.
117+
fn new(
118+
signals: #support_crate ::metered::MeteredReceiver< #signal >,
119+
messages: SubsystemIncomingMessages<M>,
120+
to_subsystems: ChannelsOut,
121+
to_overseer: #support_crate ::metered::UnboundedMeteredSender<#support_crate ::ToOverseer>,
122+
) -> Self {
123+
let signals_received = SignalsReceived::default();
124+
#subsystem_ctx_name {
125+
signals,
126+
messages,
127+
to_subsystems: #subsystem_sender_name {
128+
channels: to_subsystems,
129+
signals_received: signals_received.clone(),
130+
},
131+
to_overseer,
132+
signals_received,
133+
pending_incoming: None,
134+
}
135+
}
136+
}
137+
138+
#[#support_crate ::async_trait]
139+
impl<M: std::fmt::Debug + Send + 'static> #support_crate ::SubsystemContext for #subsystem_ctx_name<M>
140+
where
141+
#subsystem_sender_name: #support_crate ::SubsystemSender< #wrapper_message >,
142+
#wrapper_message: From<M>,
143+
{
144+
type Message = M;
145+
type Signal = #signal;
146+
type Sender = #subsystem_sender_name;
147+
type AllMessages = #wrapper_message;
148+
type Error = #error_ty;
149+
150+
async fn try_recv(&mut self) -> ::std::result::Result<Option<#support_crate:: FromOverseer<M, #signal>>, ()> {
151+
match #support_crate ::poll!(self.recv()) {
152+
#support_crate ::Poll::Ready(msg) => Ok(Some(msg.map_err(|_| ())?)),
153+
#support_crate ::Poll::Pending => Ok(None),
154+
}
155+
}
156+
157+
async fn recv(&mut self) -> ::std::result::Result<#support_crate:: FromOverseer<M, #signal>, #error_ty> {
158+
loop {
159+
// If we have a message pending an overseer signal, we only poll for signals
160+
// in the meantime.
161+
if let Some((needs_signals_received, msg)) = self.pending_incoming.take() {
162+
if needs_signals_received <= self.signals_received.load() {
163+
return Ok(#support_crate ::FromOverseer::Communication { msg });
164+
} else {
165+
self.pending_incoming = Some((needs_signals_received, msg));
166+
167+
// wait for next signal.
168+
let signal = self.signals.next().await
169+
.ok_or(#support_crate ::OverseerError::Context(
170+
"Signal channel is terminated and empty."
171+
.to_owned()
172+
))?;
173+
174+
self.signals_received.inc();
175+
return Ok(#support_crate ::FromOverseer::Signal(signal))
176+
}
177+
}
178+
179+
let mut await_message = self.messages.next().fuse();
180+
let mut await_signal = self.signals.next().fuse();
181+
let signals_received = self.signals_received.load();
182+
let pending_incoming = &mut self.pending_incoming;
183+
184+
// Otherwise, wait for the next signal or incoming message.
185+
let from_overseer = #support_crate ::futures::select_biased! {
186+
signal = await_signal => {
187+
let signal = signal
188+
.ok_or(#support_crate ::OverseerError::Context(
189+
"Signal channel is terminated and empty."
190+
.to_owned()
191+
))?;
192+
193+
#support_crate ::FromOverseer::Signal(signal)
194+
}
195+
msg = await_message => {
196+
let packet = msg
197+
.ok_or(#support_crate ::OverseerError::Context(
198+
"Message channel is terminated and empty."
199+
.to_owned()
200+
))?;
201+
202+
if packet.signals_received > signals_received {
203+
// wait until we've received enough signals to return this message.
204+
*pending_incoming = Some((packet.signals_received, packet.message));
205+
continue;
206+
} else {
207+
// we know enough to return this message.
208+
#support_crate ::FromOverseer::Communication { msg: packet.message}
209+
}
210+
}
211+
};
212+
213+
if let #support_crate ::FromOverseer::Signal(_) = from_overseer {
214+
self.signals_received.inc();
215+
}
216+
217+
return Ok(from_overseer);
218+
}
219+
}
220+
221+
fn sender(&mut self) -> &mut Self::Sender {
222+
&mut self.to_subsystems
223+
}
224+
225+
fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
226+
-> ::std::result::Result<(), #error_ty>
227+
{
228+
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnJob {
229+
name,
230+
s,
231+
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
232+
Ok(())
233+
}
234+
235+
fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
236+
-> ::std::result::Result<(), #error_ty>
237+
{
238+
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnBlockingJob {
239+
name,
240+
s,
241+
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
242+
Ok(())
243+
}
244+
}
245+
};
246+
247+
ts
248+
}

node/overseer/overseer-gen/proc-macro/src/impl_subsystem_ctx.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ pub(crate) fn impl_trait(info: &OverseerInfo) -> proc_macro2::TokenStream {
215215
}
216216
}
217217

218-
fn sender(&mut self) -> &mut Self::Sender {
218+
fn sender(&mut self) -> &mut <Self as >::Sender {
219219
&mut self.to_subsystems
220220
}
221221

0 commit comments

Comments
 (0)