Skip to content

Commit a4bf9a1

Browse files
timorltimorl
andauthored
Move data network into its own module (#809)
Co-authored-by: timorl <[email protected]>
1 parent 56a75f6 commit a4bf9a1

File tree

16 files changed

+119
-101
lines changed

16 files changed

+119
-101
lines changed

finality-aleph/src/abft/current.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
},
1313
crypto::Signature,
1414
data_io::{AlephData, OrderedDataInterpreter},
15-
network::DataNetwork,
15+
network::data::Network,
1616
oneshot,
1717
party::{
1818
backup::ABFTBackup,
@@ -27,7 +27,7 @@ pub const VERSION: u32 = 1;
2727
pub fn run_member<
2828
B: Block,
2929
C: HeaderBackend<B> + Send + 'static,
30-
ADN: DataNetwork<CurrentNetworkData<B>> + 'static,
30+
ADN: Network<CurrentNetworkData<B>> + 'static,
3131
>(
3232
subtask_common: SubtaskCommon,
3333
multikeychain: Keychain,

finality-aleph/src/abft/legacy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
NetworkWrapper, SpawnHandleT,
1212
},
1313
data_io::{AlephData, OrderedDataInterpreter},
14-
network::DataNetwork,
14+
network::data::Network,
1515
oneshot,
1616
party::{
1717
backup::ABFTBackup,
@@ -26,7 +26,7 @@ pub const VERSION: u32 = 0;
2626
pub fn run_member<
2727
B: Block,
2828
C: HeaderBackend<B> + Send + 'static,
29-
ADN: DataNetwork<LegacyNetworkData<B>> + 'static,
29+
ADN: Network<LegacyNetworkData<B>> + 'static,
3030
>(
3131
subtask_common: SubtaskCommon,
3232
multikeychain: Keychain,

finality-aleph/src/abft/network.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
abft::SignatureSet,
88
crypto::Signature,
99
data_io::{AlephData, AlephNetworkMessage},
10-
network::{Data, DataNetwork},
10+
network::{data::Network, Data},
1111
Hasher, Recipient,
1212
};
1313

@@ -34,12 +34,12 @@ impl<B: Block> AlephNetworkMessage<B>
3434
}
3535

3636
/// A wrapper needed only because of type system theoretical constraints. Sadness.
37-
pub struct NetworkWrapper<D: Data, DN: DataNetwork<D>> {
37+
pub struct NetworkWrapper<D: Data, DN: Network<D>> {
3838
inner: DN,
3939
_phantom: PhantomData<D>,
4040
}
4141

42-
impl<D: Data, DN: DataNetwork<D>> From<DN> for NetworkWrapper<D, DN> {
42+
impl<D: Data, DN: Network<D>> From<DN> for NetworkWrapper<D, DN> {
4343
fn from(inner: DN) -> Self {
4444
NetworkWrapper {
4545
inner,
@@ -48,7 +48,7 @@ impl<D: Data, DN: DataNetwork<D>> From<DN> for NetworkWrapper<D, DN> {
4848
}
4949
}
5050

51-
impl<D: Data, DN: DataNetwork<D>> NetworkWrapper<D, DN> {
51+
impl<D: Data, DN: Network<D>> NetworkWrapper<D, DN> {
5252
fn send<R>(&self, data: D, recipient: R)
5353
where
5454
R: Into<Recipient>,
@@ -64,7 +64,7 @@ impl<D: Data, DN: DataNetwork<D>> NetworkWrapper<D, DN> {
6464
}
6565

6666
#[async_trait::async_trait]
67-
impl<D: Data, DN: DataNetwork<D>> current_aleph_bft::Network<D> for NetworkWrapper<D, DN> {
67+
impl<D: Data, DN: Network<D>> current_aleph_bft::Network<D> for NetworkWrapper<D, DN> {
6868
fn send(&self, data: D, recipient: current_aleph_bft::Recipient) {
6969
NetworkWrapper::send(self, data, recipient)
7070
}
@@ -75,7 +75,7 @@ impl<D: Data, DN: DataNetwork<D>> current_aleph_bft::Network<D> for NetworkWrapp
7575
}
7676

7777
#[async_trait::async_trait]
78-
impl<D: Data, DN: DataNetwork<D>> legacy_aleph_bft::Network<D> for NetworkWrapper<D, DN> {
78+
impl<D: Data, DN: Network<D>> legacy_aleph_bft::Network<D> for NetworkWrapper<D, DN> {
7979
fn send(&self, data: D, recipient: legacy_aleph_bft::Recipient) {
8080
NetworkWrapper::send(self, data, recipient)
8181
}

finality-aleph/src/aggregation/mod.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use crate::{
1111
crypto::Signature,
1212
metrics::Checkpoint,
1313
mpsc,
14-
network::{Data, DataNetwork, SendError},
14+
network::{
15+
data::{Network, SendError},
16+
Data,
17+
},
1518
Keychain, Metrics,
1619
};
1720

@@ -50,8 +53,8 @@ pub type CurrentAggregator<'a, B, N> = current_aleph_aggregator::IO<
5053
enum EitherAggregator<'a, B, CN, LN>
5154
where
5255
B: Block,
53-
LN: DataNetwork<LegacyRmcNetworkData<B>>,
54-
CN: DataNetwork<CurrentRmcNetworkData<B>>,
56+
LN: Network<LegacyRmcNetworkData<B>>,
57+
CN: Network<CurrentRmcNetworkData<B>>,
5558
<B as Block>::Hash: AsRef<[u8]>,
5659
{
5760
Current(CurrentAggregator<'a, B, CN>),
@@ -63,8 +66,8 @@ where
6366
pub struct Aggregator<'a, B, CN, LN>
6467
where
6568
B: Block,
66-
LN: DataNetwork<LegacyRmcNetworkData<B>>,
67-
CN: DataNetwork<CurrentRmcNetworkData<B>>,
69+
LN: Network<LegacyRmcNetworkData<B>>,
70+
CN: Network<CurrentRmcNetworkData<B>>,
6871
<B as Block>::Hash: AsRef<[u8]>,
6972
{
7073
agg: EitherAggregator<'a, B, CN, LN>,
@@ -73,8 +76,8 @@ where
7376
impl<'a, B, CN, LN> Aggregator<'a, B, CN, LN>
7477
where
7578
B: Block,
76-
LN: DataNetwork<LegacyRmcNetworkData<B>>,
77-
CN: DataNetwork<CurrentRmcNetworkData<B>>,
79+
LN: Network<LegacyRmcNetworkData<B>>,
80+
CN: Network<CurrentRmcNetworkData<B>>,
7881
<B as Block>::Hash: AsRef<[u8]>,
7982
{
8083
pub fn new_legacy(
@@ -163,9 +166,9 @@ where
163166
}
164167
}
165168

166-
pub struct NetworkWrapper<D: Data, N: DataNetwork<D>>(N, PhantomData<D>);
169+
pub struct NetworkWrapper<D: Data, N: Network<D>>(N, PhantomData<D>);
167170

168-
impl<D: Data, N: DataNetwork<D>> NetworkWrapper<D, N> {
171+
impl<D: Data, N: Network<D>> NetworkWrapper<D, N> {
169172
pub fn new(network: N) -> Self {
170173
Self(network, PhantomData)
171174
}
@@ -186,7 +189,7 @@ impl<H: Debug + Hash + Eq + Debug + Copy> current_aleph_aggregator::Metrics<H> f
186189
#[async_trait::async_trait]
187190
impl<T, D> legacy_aleph_aggregator::ProtocolSink<D> for NetworkWrapper<D, T>
188191
where
189-
T: DataNetwork<D>,
192+
T: Network<D>,
190193
D: Data,
191194
{
192195
async fn next(&mut self) -> Option<D> {
@@ -207,7 +210,7 @@ where
207210
#[async_trait::async_trait]
208211
impl<T, D> current_aleph_aggregator::ProtocolSink<D> for NetworkWrapper<D, T>
209212
where
210-
T: DataNetwork<D>,
213+
T: Network<D>,
211214
D: Data,
212215
{
213216
async fn next(&mut self) -> Option<D> {

finality-aleph/src/data_io/data_store.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ use crate::{
2626
status_provider::get_proposal_status,
2727
AlephNetworkMessage,
2828
},
29-
network::{ComponentNetwork, DataNetwork, ReceiverComponent, RequestBlocks, SimpleNetwork},
29+
network::{
30+
data::{
31+
component::{Network as ComponentNetwork, Receiver, SimpleNetwork},
32+
Network as DataNetwork,
33+
},
34+
RequestBlocks,
35+
},
3036
BlockHashNum, SessionBoundaries,
3137
};
3238

@@ -174,7 +180,7 @@ where
174180
RB: RequestBlocks<B> + 'static,
175181
Message:
176182
AlephNetworkMessage<B> + std::fmt::Debug + Send + Sync + Clone + codec::Codec + 'static,
177-
R: ReceiverComponent<Message>,
183+
R: Receiver<Message>,
178184
{
179185
next_free_id: MessageId,
180186
pending_proposals: HashMap<AlephProposal<B>, PendingProposalInfo<B>>,
@@ -201,7 +207,7 @@ where
201207
RB: RequestBlocks<B> + 'static,
202208
Message:
203209
AlephNetworkMessage<B> + std::fmt::Debug + Send + Sync + Clone + codec::Codec + 'static,
204-
R: ReceiverComponent<Message>,
210+
R: Receiver<Message>,
205211
{
206212
/// Returns a struct to be run and a network that outputs messages filtered as appropriate
207213
pub fn new<N: ComponentNetwork<Message, R = R>>(

finality-aleph/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use tokio::time::Duration;
2222
use crate::{
2323
abft::{CurrentNetworkData, LegacyNetworkData},
2424
aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData},
25-
network::{protocol_name, Split},
25+
network::{data::split::Split, protocol_name},
2626
session::{
2727
first_block_of_session, last_block_of_session, session_id_from_block_num,
2828
SessionBoundaries, SessionId,

finality-aleph/src/network/component.rs renamed to finality-aleph/src/network/data/component.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use futures::{channel::mpsc, StreamExt};
44
use log::warn;
55

66
use crate::{
7-
network::{Data, DataNetwork, SendError},
7+
network::{
8+
data::{Network as DataNetwork, SendError},
9+
Data,
10+
},
811
Recipient,
912
};
1013

@@ -185,8 +188,11 @@ mod tests {
185188
use super::{DataNetwork, NetworkMap, Receiver, Sender};
186189
use crate::{
187190
network::{
188-
component::{Network, ReceiverMap, SenderMap},
189-
Data, SendError,
191+
data::{
192+
component::{Network, ReceiverMap, SenderMap},
193+
SendError,
194+
},
195+
Data,
190196
},
191197
Recipient,
192198
};
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
//! Abstraction over an abstract network sending data to a set of nodes.
2+
use crate::{abft::Recipient, network::Data};
3+
4+
pub mod component;
5+
pub mod split;
6+
7+
/// Returned when something went wrong when sending data using a Network.
8+
#[derive(Debug)]
9+
pub enum SendError {
10+
SendFailed,
11+
}
12+
13+
/// A generic interface for sending and receiving data.
14+
#[async_trait::async_trait]
15+
pub trait Network<D: Data>: Send + Sync {
16+
fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError>;
17+
async fn next(&mut self) -> Option<D>;
18+
}
Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ use tokio::sync::Mutex;
99

1010
use crate::{
1111
network::{
12-
ComponentNetwork, ComponentNetworkExt, Data, ReceiverComponent, SendError, SenderComponent,
13-
SimpleNetwork,
12+
data::{
13+
component::{Network, NetworkExt, Receiver, Sender, SimpleNetwork},
14+
SendError,
15+
},
16+
Data,
1417
},
1518
Recipient, Version, Versioned,
1619
};
@@ -65,7 +68,7 @@ impl<A: Data, B: Data> Convert for ToRightSplitConvert<A, B> {
6568
struct SplitSender<
6669
LeftData: Data,
6770
RightData: Data,
68-
S: SenderComponent<Split<LeftData, RightData>>,
71+
S: Sender<Split<LeftData, RightData>>,
6972
Conv: Convert,
7073
> {
7174
sender: S,
@@ -75,9 +78,9 @@ struct SplitSender<
7578
impl<
7679
LeftData: Data,
7780
RightData: Data,
78-
S: SenderComponent<Split<LeftData, RightData>>,
81+
S: Sender<Split<LeftData, RightData>>,
7982
Conv: Convert<To = Split<LeftData, RightData>> + Clone + Send + Sync,
80-
> SenderComponent<Conv::From> for SplitSender<LeftData, RightData, S, Conv>
83+
> Sender<Conv::From> for SplitSender<LeftData, RightData, S, Conv>
8184
where
8285
<Conv as Convert>::From: Data,
8386
<Conv as Convert>::To: Data,
@@ -96,7 +99,7 @@ type RightSender<LeftData, RightData, S> =
9699
struct SplitReceiver<
97100
LeftData: Data,
98101
RightData: Data,
99-
R: ReceiverComponent<Split<LeftData, RightData>>,
102+
R: Receiver<Split<LeftData, RightData>>,
100103
TranslatedData: Data,
101104
> {
102105
receiver: Arc<Mutex<R>>,
@@ -110,9 +113,9 @@ struct SplitReceiver<
110113
impl<
111114
LeftData: Data,
112115
RightData: Data,
113-
R: ReceiverComponent<Split<LeftData, RightData>>,
116+
R: Receiver<Split<LeftData, RightData>>,
114117
TranslatedData: Data,
115-
> ReceiverComponent<TranslatedData> for SplitReceiver<LeftData, RightData, R, TranslatedData>
118+
> Receiver<TranslatedData> for SplitReceiver<LeftData, RightData, R, TranslatedData>
116119
{
117120
async fn next(&mut self) -> Option<TranslatedData> {
118121
loop {
@@ -137,7 +140,7 @@ type RightReceiver<LeftData, RightData, R> = SplitReceiver<LeftData, RightData,
137140
async fn forward_or_wait<
138141
LeftData: Data,
139142
RightData: Data,
140-
R: ReceiverComponent<Split<LeftData, RightData>>,
143+
R: Receiver<Split<LeftData, RightData>>,
141144
>(
142145
receiver: &Arc<Mutex<R>>,
143146
left_sender: &mpsc::UnboundedSender<LeftData>,
@@ -169,7 +172,7 @@ async fn forward_or_wait<
169172
}
170173
}
171174

172-
fn split_sender<LeftData: Data, RightData: Data, S: SenderComponent<Split<LeftData, RightData>>>(
175+
fn split_sender<LeftData: Data, RightData: Data, S: Sender<Split<LeftData, RightData>>>(
173176
sender: S,
174177
) -> (
175178
LeftSender<LeftData, RightData, S>,
@@ -187,11 +190,7 @@ fn split_sender<LeftData: Data, RightData: Data, S: SenderComponent<Split<LeftDa
187190
)
188191
}
189192

190-
fn split_receiver<
191-
LeftData: Data,
192-
RightData: Data,
193-
R: ReceiverComponent<Split<LeftData, RightData>>,
194-
>(
193+
fn split_receiver<LeftData: Data, RightData: Data, R: Receiver<Split<LeftData, RightData>>>(
195194
receiver: R,
196195
left_name: &'static str,
197196
right_name: &'static str,
@@ -229,14 +228,11 @@ fn split_receiver<
229228
///
230229
/// The main example for now is creating an `aleph_bft::Network` and a separate one for accumulating
231230
/// signatures for justifications.
232-
pub fn split<LeftData: Data, RightData: Data, CN: ComponentNetwork<Split<LeftData, RightData>>>(
231+
pub fn split<LeftData: Data, RightData: Data, CN: Network<Split<LeftData, RightData>>>(
233232
network: CN,
234233
left_name: &'static str,
235234
right_name: &'static str,
236-
) -> (
237-
impl ComponentNetworkExt<LeftData>,
238-
impl ComponentNetworkExt<RightData>,
239-
) {
235+
) -> (impl NetworkExt<LeftData>, impl NetworkExt<RightData>) {
240236
let (sender, receiver) = network.into();
241237
let (left_sender, right_sender) = split_sender(sender);
242238
let (left_receiver, right_receiver) = split_receiver(receiver, left_name, right_name);

0 commit comments

Comments
 (0)