Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 4d29da2

Browse files
authored
Make NetworkService callable for ChainSync (#12542)
Introduce a middleware called `NetworkServiceProvider` which the `ChainSync` can use to communicate with `NetworkService`. `ChainSync` is given a `NetworkServiceHandle` which it uses to call `NetworkServiceProvider` which then dispatches the calls to `NetworkService` on behalf of `ChainSync`. This change will allow `ChainSync` to disconnect and report peers and in the future it'll be possible to send requests and notifications through the `NetworkServiceProvider`. `NetworkServiceProvider` is needed only until the `ChainSync` object has been removed from `Protocol`. After that, a normal `NetworkService` handle can be passed onto `ChainSync` and these changes can be deprecated. Co-authored-by: parity-processbot <>
1 parent c7a86c2 commit 4d29da2

File tree

9 files changed

+443
-13
lines changed

9 files changed

+443
-13
lines changed

client/network/src/service/tests/chain_sync.rs

Lines changed: 179 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,34 @@
1616
// You should have received a copy of the GNU General Public License
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

19-
use crate::service::tests::TestNetworkBuilder;
19+
use crate::{
20+
config,
21+
service::tests::{TestNetworkBuilder, BLOCK_ANNOUNCE_PROTO_NAME},
22+
};
2023

2124
use futures::prelude::*;
2225
use libp2p::PeerId;
2326
use sc_block_builder::BlockBuilderProvider;
2427
use sc_client_api::HeaderBackend;
2528
use sc_consensus::JustificationSyncLink;
2629
use sc_network_common::{
30+
config::{MultiaddrWithPeerId, SetConfig},
31+
protocol::event::Event,
2732
service::NetworkSyncForkRequest,
2833
sync::{SyncState, SyncStatus},
2934
};
30-
use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface};
35+
use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface, ChainSync};
3136
use sp_core::H256;
3237
use sp_runtime::{
3338
generic::BlockId,
3439
traits::{Block as BlockT, Header as _},
3540
};
36-
use std::{iter, sync::Arc, task::Poll};
41+
use std::{
42+
iter,
43+
sync::{Arc, RwLock},
44+
task::Poll,
45+
time::Duration,
46+
};
3747
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _};
3848

3949
fn set_default_expecations_no_peers(
@@ -224,3 +234,169 @@ async fn on_block_finalized() {
224234
})
225235
.await;
226236
}
237+
238+
// report from mock import queue that importing a justification was not successful
239+
// and verify that connection to the peer is closed
240+
#[async_std::test]
241+
async fn invalid_justification_imported() {
242+
struct DummyImportQueue(
243+
Arc<
244+
RwLock<
245+
Option<(
246+
PeerId,
247+
substrate_test_runtime_client::runtime::Hash,
248+
sp_runtime::traits::NumberFor<substrate_test_runtime_client::runtime::Block>,
249+
)>,
250+
>,
251+
>,
252+
);
253+
254+
impl sc_consensus::ImportQueue<substrate_test_runtime_client::runtime::Block> for DummyImportQueue {
255+
fn import_blocks(
256+
&mut self,
257+
_origin: sp_consensus::BlockOrigin,
258+
_blocks: Vec<
259+
sc_consensus::IncomingBlock<substrate_test_runtime_client::runtime::Block>,
260+
>,
261+
) {
262+
}
263+
264+
fn import_justifications(
265+
&mut self,
266+
_who: sc_consensus::import_queue::RuntimeOrigin,
267+
_hash: substrate_test_runtime_client::runtime::Hash,
268+
_number: sp_runtime::traits::NumberFor<substrate_test_runtime_client::runtime::Block>,
269+
_justifications: sp_runtime::Justifications,
270+
) {
271+
}
272+
273+
fn poll_actions(
274+
&mut self,
275+
_cx: &mut futures::task::Context,
276+
link: &mut dyn sc_consensus::Link<substrate_test_runtime_client::runtime::Block>,
277+
) {
278+
if let Some((peer, hash, number)) = *self.0.read().unwrap() {
279+
link.justification_imported(peer, &hash, number, false);
280+
}
281+
}
282+
}
283+
284+
let justification_info = Arc::new(RwLock::new(None));
285+
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
286+
287+
let (service1, mut event_stream1) = TestNetworkBuilder::new()
288+
.with_import_queue(Box::new(DummyImportQueue(justification_info.clone())))
289+
.with_listen_addresses(vec![listen_addr.clone()])
290+
.build()
291+
.start_network();
292+
293+
let (service2, mut event_stream2) = TestNetworkBuilder::new()
294+
.with_set_config(SetConfig {
295+
reserved_nodes: vec![MultiaddrWithPeerId {
296+
multiaddr: listen_addr,
297+
peer_id: service1.local_peer_id,
298+
}],
299+
..Default::default()
300+
})
301+
.build()
302+
.start_network();
303+
304+
async fn wait_for_events(stream: &mut (impl Stream<Item = Event> + std::marker::Unpin)) {
305+
let mut notif_received = false;
306+
let mut sync_received = false;
307+
while !notif_received || !sync_received {
308+
match stream.next().await.unwrap() {
309+
Event::NotificationStreamOpened { .. } => notif_received = true,
310+
Event::SyncConnected { .. } => sync_received = true,
311+
_ => {},
312+
};
313+
}
314+
}
315+
316+
wait_for_events(&mut event_stream1).await;
317+
wait_for_events(&mut event_stream2).await;
318+
319+
{
320+
let mut info = justification_info.write().unwrap();
321+
*info = Some((service2.local_peer_id, H256::random(), 1337u64));
322+
}
323+
324+
let wait_disconnection = async {
325+
while !std::matches!(event_stream1.next().await, Some(Event::SyncDisconnected { .. })) {}
326+
};
327+
328+
if async_std::future::timeout(Duration::from_secs(5), wait_disconnection)
329+
.await
330+
.is_err()
331+
{
332+
panic!("did not receive disconnection event in time");
333+
}
334+
}
335+
336+
#[async_std::test]
337+
async fn disconnect_peer_using_chain_sync_handle() {
338+
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
339+
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
340+
341+
let (chain_sync_network_provider, chain_sync_network_handle) =
342+
sc_network_sync::service::network::NetworkServiceProvider::new();
343+
let handle_clone = chain_sync_network_handle.clone();
344+
345+
let (chain_sync, chain_sync_service) = ChainSync::new(
346+
sc_network_common::sync::SyncMode::Full,
347+
client.clone(),
348+
Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator),
349+
1u32,
350+
None,
351+
chain_sync_network_handle.clone(),
352+
)
353+
.unwrap();
354+
355+
let (node1, mut event_stream1) = TestNetworkBuilder::new()
356+
.with_listen_addresses(vec![listen_addr.clone()])
357+
.with_chain_sync((Box::new(chain_sync), chain_sync_service))
358+
.with_chain_sync_network((chain_sync_network_provider, chain_sync_network_handle))
359+
.with_client(client.clone())
360+
.build()
361+
.start_network();
362+
363+
let (node2, mut event_stream2) = TestNetworkBuilder::new()
364+
.with_set_config(SetConfig {
365+
reserved_nodes: vec![MultiaddrWithPeerId {
366+
multiaddr: listen_addr,
367+
peer_id: node1.local_peer_id,
368+
}],
369+
..Default::default()
370+
})
371+
.with_client(client.clone())
372+
.build()
373+
.start_network();
374+
375+
async fn wait_for_events(stream: &mut (impl Stream<Item = Event> + std::marker::Unpin)) {
376+
let mut notif_received = false;
377+
let mut sync_received = false;
378+
while !notif_received || !sync_received {
379+
match stream.next().await.unwrap() {
380+
Event::NotificationStreamOpened { .. } => notif_received = true,
381+
Event::SyncConnected { .. } => sync_received = true,
382+
_ => {},
383+
};
384+
}
385+
}
386+
387+
wait_for_events(&mut event_stream1).await;
388+
wait_for_events(&mut event_stream2).await;
389+
390+
handle_clone.disconnect_peer(node2.local_peer_id, BLOCK_ANNOUNCE_PROTO_NAME.into());
391+
392+
let wait_disconnection = async {
393+
while !std::matches!(event_stream1.next().await, Some(Event::SyncDisconnected { .. })) {}
394+
};
395+
396+
if async_std::future::timeout(Duration::from_secs(5), wait_disconnection)
397+
.await
398+
.is_err()
399+
{
400+
panic!("did not receive disconnection event in time");
401+
}
402+
}

client/network/src/service/tests/mod.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use sc_network_common::{
3333
};
3434
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
3535
use sc_network_sync::{
36-
block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler,
36+
block_request_handler::BlockRequestHandler,
37+
service::network::{NetworkServiceHandle, NetworkServiceProvider},
38+
state_request_handler::StateRequestHandler,
3739
ChainSync,
3840
};
3941
use sp_runtime::traits::{Block as BlockT, Header as _, Zero};
@@ -93,6 +95,7 @@ struct TestNetworkBuilder {
9395
listen_addresses: Vec<Multiaddr>,
9496
set_config: Option<SetConfig>,
9597
chain_sync: Option<(Box<dyn ChainSyncT<TestBlock>>, Box<dyn ChainSyncInterface<TestBlock>>)>,
98+
chain_sync_network: Option<(NetworkServiceProvider, NetworkServiceHandle)>,
9699
config: Option<config::NetworkConfiguration>,
97100
}
98101

@@ -104,6 +107,7 @@ impl TestNetworkBuilder {
104107
listen_addresses: Vec::new(),
105108
set_config: None,
106109
chain_sync: None,
110+
chain_sync_network: None,
107111
config: None,
108112
}
109113
}
@@ -136,6 +140,19 @@ impl TestNetworkBuilder {
136140
self
137141
}
138142

143+
pub fn with_chain_sync_network(
144+
mut self,
145+
chain_sync_network: (NetworkServiceProvider, NetworkServiceHandle),
146+
) -> Self {
147+
self.chain_sync_network = Some(chain_sync_network);
148+
self
149+
}
150+
151+
pub fn with_import_queue(mut self, import_queue: Box<dyn ImportQueue<TestBlock>>) -> Self {
152+
self.import_queue = Some(import_queue);
153+
self
154+
}
155+
139156
pub fn build(mut self) -> TestNetwork {
140157
let client = self.client.as_mut().map_or(
141158
Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0),
@@ -199,6 +216,9 @@ impl TestNetworkBuilder {
199216
None,
200217
)));
201218

219+
let (chain_sync_network_provider, chain_sync_network_handle) =
220+
self.chain_sync_network.unwrap_or(NetworkServiceProvider::new());
221+
202222
let (chain_sync, chain_sync_service) = self.chain_sync.unwrap_or({
203223
let (chain_sync, chain_sync_service) = ChainSync::new(
204224
match network_config.sync_mode {
@@ -214,6 +234,7 @@ impl TestNetworkBuilder {
214234
Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator),
215235
network_config.max_parallel_downloads,
216236
None,
237+
chain_sync_network_handle,
217238
)
218239
.unwrap();
219240

@@ -292,6 +313,11 @@ impl TestNetworkBuilder {
292313
})
293314
.unwrap();
294315

316+
let service = worker.service().clone();
317+
async_std::task::spawn(async move {
318+
let _ = chain_sync_network_provider.run(service).await;
319+
});
320+
295321
TestNetwork::new(worker)
296322
}
297323
}

0 commit comments

Comments
 (0)