From 92efa5571f914324053d5776002917011c71f4c8 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Mon, 27 Mar 2023 19:34:12 +0300 Subject: [PATCH 1/5] Attempt to relieve pressure on `mpsc_network_worker` `SyncingEngine` interacting with `NetworkWorker` can put a lot of strain on the channel if the number of inbound connections is high. This is because `SyncingEngine` is notified of each inbound substream which it then can either accept or reject and this causes a lot of message exchange on the already busy channel. Use a direct channel pair between `Protocol` and `SyncingEngine` to exchange notification events. It is a temporary change to alleviate the problems caused by syncing being an independent protocol and the fix will be removed once `NotificationService` is implemented. --- Cargo.lock | 1 + client/network/src/config.rs | 12 +- client/network/src/event.rs | 43 +++++- client/network/src/lib.rs | 6 +- client/network/src/protocol.rs | 114 ++++++++++++---- client/network/src/service.rs | 4 +- client/network/sync/src/engine.rs | 204 ++++++++++++++--------------- client/network/test/Cargo.toml | 1 + client/network/test/src/lib.rs | 6 +- client/network/test/src/service.rs | 6 +- client/service/src/builder.rs | 7 +- 11 files changed, 261 insertions(+), 143 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33bca4005483c..32627ef0f52c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9134,6 +9134,7 @@ dependencies = [ "sc-network-light", "sc-network-sync", "sc-service", + "sc-utils", "sp-blockchain", "sp-consensus", "sp-consensus-babe", diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 925a7795d290f..bb3395775e264 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -22,6 +22,7 @@ //! See the documentation of [`Params`]. pub use crate::{ + protocol::NotificationsSink, request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, @@ -29,9 +30,15 @@ pub use crate::{ }; use codec::Encode; +use futures::channel::oneshot; use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId}; use prometheus_endpoint::Registry; -pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT}; +pub use sc_network_common::{ + role::{Role, Roles}, + sync::warp::WarpSyncProvider, + ExHashT, +}; +use sc_utils::mpsc::TracingUnboundedSender; use zeroize::Zeroize; use std::{ @@ -714,6 +721,9 @@ pub struct Params { /// Block announce protocol configuration pub block_announce_config: NonDefaultSetConfig, + /// TX channel for direct communication with `SyncingEngine` and `Protocol`. + pub tx: TracingUnboundedSender, + /// Request response protocol configurations pub request_response_protocol_configs: Vec, } diff --git a/client/network/src/event.rs b/client/network/src/event.rs index 3ecd8f9311429..abf2fa6a2c631 100644 --- a/client/network/src/event.rs +++ b/client/network/src/event.rs @@ -19,9 +19,10 @@ //! Network event types. These are are not the part of the protocol, but rather //! events that happen on the network like DHT get/put results received. -use crate::types::ProtocolName; +use crate::{types::ProtocolName, NotificationsSink}; use bytes::Bytes; +use futures::channel::oneshot; use libp2p::{core::PeerId, kad::record::Key}; use sc_network_common::role::ObservedRole; @@ -90,3 +91,43 @@ pub enum Event { messages: Vec<(ProtocolName, Bytes)>, }, } + +// TODO: move to sc-network +pub enum SyncEvent { + /// Opened a substream with the given node with the given notifications protocol. + /// + /// The protocol is always one of the notification protocols that have been registered. + NotificationStreamOpened { + /// Node we opened the substream with. + remote: PeerId, + /// Received handshake. + received_handshake: Vec, + /// Notification sink. + sink: NotificationsSink, + /// Channel for reporting accept/reject of the substream. + tx: oneshot::Sender, + }, + + /// Closed a substream with the given node. Always matches a corresponding previous + /// `NotificationStreamOpened` message. + NotificationStreamClosed { + /// Node we closed the substream with. + remote: PeerId, + }, + + /// Notification sink was replaced. + NotificationSinkReplaced { + /// Node we closed the substream with. + remote: PeerId, + /// Notification sink. + sink: NotificationsSink, + }, + + /// Received one or more messages from the given node using the given protocol. + NotificationsReceived { + /// Node we received the message from. + remote: PeerId, + /// Concerned protocol and associated message. + messages: Vec, + }, +} diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index c290f4b94db53..5374ac13435be 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -259,7 +259,7 @@ pub mod request_responses; pub mod types; pub mod utils; -pub use event::{DhtEvent, Event}; +pub use event::{DhtEvent, Event, SyncEvent}; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig}; @@ -278,8 +278,8 @@ pub use service::{ NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT, NotificationSenderError, NotificationSenderReady, }, - DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure, - PublicKey, + DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink, + OutboundFailure, PublicKey, }; pub use types::ProtocolName; diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 06ca02c0ca8d5..330eb4f1aa605 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -24,6 +24,7 @@ use crate::{ use bytes::Bytes; use codec::{DecodeAll, Encode}; +use futures::{channel::oneshot, stream::FuturesUnordered, FutureExt, StreamExt}; use libp2p::{ core::connection::ConnectionId, swarm::{ @@ -35,11 +36,14 @@ use libp2p::{ use log::{debug, error, warn}; use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake}; +use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::Block as BlockT; use std::{ collections::{HashMap, HashSet, VecDeque}, + future::Future, iter, + pin::Pin, task::Poll, }; @@ -68,6 +72,9 @@ mod rep { pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); } +type PendingSyncSubstreamValidation = + Pin> + Send>>; + // Lock must always be taken in order declared here. pub struct Protocol { /// Pending list of messages to return from `poll` as a priority. @@ -87,6 +94,8 @@ pub struct Protocol { bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>, /// Connected peers. peers: HashMap, + sync_substream_validations: FuturesUnordered, + tx: TracingUnboundedSender, _marker: std::marker::PhantomData, } @@ -96,6 +105,7 @@ impl Protocol { roles: Roles, network_config: &config::NetworkConfiguration, block_announces_protocol: config::NonDefaultSetConfig, + tx: TracingUnboundedSender, ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let mut known_addresses = Vec::new(); @@ -179,6 +189,8 @@ impl Protocol { .collect(), bad_handshake_substreams: Default::default(), peers: HashMap::new(), + sync_substream_validations: FuturesUnordered::new(), + tx, // TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol` _marker: Default::default(), }; @@ -418,6 +430,23 @@ impl NetworkBehaviour for Protocol { return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }), }; + while let Poll::Ready(Some(validation_result)) = + self.sync_substream_validations.poll_next_unpin(cx) + { + match validation_result { + Ok((peer, roles)) => { + self.peers.insert(peer, roles); + }, + Err(peer) => { + log::debug!( + target: "sub-libp2p", + "`SyncingEngine` rejected stream" + ); + self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC); + }, + } + } + let outcome = match event { NotificationsOut::CustomProtocolOpen { peer_id, @@ -434,22 +463,29 @@ impl NetworkBehaviour for Protocol { match as DecodeAll>::decode_all(&mut &received_handshake[..]) { Ok(GenericMessage::Status(handshake)) => { let roles = handshake.roles; - let handshake = BlockAnnouncesHandshake:: { - roles: handshake.roles, - best_number: handshake.best_number, - best_hash: handshake.best_hash, - genesis_hash: handshake.genesis_hash, - }; - self.peers.insert(peer_id, roles); - CustomMessageOutcome::NotificationStreamOpened { - remote: peer_id, - protocol: self.notification_protocols[usize::from(set_id)].clone(), - negotiated_fallback, - received_handshake: handshake.encode(), - roles, - notifications_sink, - } + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send( + crate::SyncEvent::NotificationStreamOpened { + remote: peer_id, + received_handshake, + sink: notifications_sink, + tx, + }, + ); + self.sync_substream_validations.push(Box::pin(async move { + match rx.await { + Ok(accepted) => + if accepted { + Ok((peer_id, roles)) + } else { + Err(peer_id) + }, + Err(_) => Err(peer_id), + } + })); + + CustomMessageOutcome::None }, Ok(msg) => { debug!( @@ -469,15 +505,27 @@ impl NetworkBehaviour for Protocol { let roles = handshake.roles; self.peers.insert(peer_id, roles); - CustomMessageOutcome::NotificationStreamOpened { - remote: peer_id, - protocol: self.notification_protocols[usize::from(set_id)] - .clone(), - negotiated_fallback, - received_handshake, - roles, - notifications_sink, - } + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send( + crate::SyncEvent::NotificationStreamOpened { + remote: peer_id, + received_handshake, + sink: notifications_sink, + tx, + }, + ); + self.sync_substream_validations.push(Box::pin(async move { + match rx.await { + Ok(accepted) => + if accepted { + Ok((peer_id, roles)) + } else { + Err(peer_id) + }, + Err(_) => Err(peer_id), + } + })); + CustomMessageOutcome::None }, Err(err2) => { log::debug!( @@ -535,6 +583,12 @@ impl NetworkBehaviour for Protocol { NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } => if self.bad_handshake_substreams.contains(&(peer_id, set_id)) { CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced { + remote: peer_id, + sink: notifications_sink, + }); + CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamReplaced { remote: peer_id, @@ -548,6 +602,12 @@ impl NetworkBehaviour for Protocol { // handshake. The outer layers have never received an opening event about this // substream, and consequently shouldn't receive a closing event either. CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed { + remote: peer_id, + }); + self.peers.remove(&peer_id); + CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, @@ -558,6 +618,12 @@ impl NetworkBehaviour for Protocol { NotificationsOut::Notification { peer_id, set_id, message } => { if self.bad_handshake_substreams.contains(&(peer_id, set_id)) { CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived { + remote: peer_id, + messages: vec![message.freeze()], + }); + CustomMessageOutcome::None } else { let protocol_name = self.notification_protocols[usize::from(set_id)].clone(); CustomMessageOutcome::NotificationsReceived { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 6dc00b36ceb53..05fc1c3f8859d 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -36,7 +36,7 @@ use crate::{ network_state::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, - protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready}, + protocol::{self, NotifsHandlerError, Protocol, Ready}, request_responses::{IfDisconnected, RequestFailure}, service::{ signature::{Signature, SigningError}, @@ -92,6 +92,7 @@ use std::{ pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; +pub use protocol::NotificationsSink; mod metrics; mod out_events; @@ -230,6 +231,7 @@ where From::from(¶ms.role), ¶ms.network_config, params.block_announce_config, + params.tx, )?; // List of multiaddresses that we know in the network. diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index b4c1a2ed05bb0..b6d6f12cfca16 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -25,7 +25,7 @@ use crate::{ }; use codec::{Decode, DecodeAll, Encode}; -use futures::{FutureExt, Stream, StreamExt}; +use futures::{channel::oneshot, FutureExt, Stream, StreamExt}; use futures_timer::Delay; use libp2p::PeerId; use lru::LruCache; @@ -41,7 +41,7 @@ use sc_network::{ }, event::Event, utils::LruHashSet, - ProtocolName, + NotificationsSink, ProtocolName, }; use sc_network_common::{ role::Roles, @@ -162,6 +162,8 @@ pub struct Peer { pub info: ExtendedPeerInfo, /// Holds a set of blocks known to this peer. pub known_blocks: LruHashSet, + /// Notification sink. + sink: NotificationsSink, } pub struct SyncingEngine { @@ -184,6 +186,9 @@ pub struct SyncingEngine { /// Channel for receiving service commands service_rx: TracingUnboundedReceiver>, + /// Channel for receiving inbound connections from `Protocol`. + rx: sc_utils::mpsc::TracingUnboundedReceiver, + /// Assigned roles. roles: Roles, @@ -254,6 +259,7 @@ where block_request_protocol_name: ProtocolName, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, + rx: sc_utils::mpsc::TracingUnboundedReceiver, ) -> Result<(Self, SyncingService, NonDefaultSetConfig), ClientError> { let mode = match network_config.sync_mode { SyncOperationMode::Full => SyncMode::Full, @@ -347,6 +353,7 @@ where num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), service_rx, + rx, genesis_hash, important_peers, default_peers_set_no_slot_connected_peers: HashSet::new(), @@ -554,11 +561,7 @@ where data: Some(data.clone()), }; - self.network_service.write_notification( - *who, - self.block_announce_protocol_name.clone(), - message.encode(), - ); + peer.sink.send_sync_notification(message.encode()); } } } @@ -575,17 +578,13 @@ where ) } - pub async fn run(mut self, mut stream: Pin + Send>>) { + pub async fn run(mut self) { loop { - futures::future::poll_fn(|cx| self.poll(cx, &mut stream)).await; + futures::future::poll_fn(|cx| self.poll(cx)).await; } } - pub fn poll( - &mut self, - cx: &mut std::task::Context, - event_stream: &mut Pin + Send>>, - ) -> Poll<()> { + pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { self.num_connected.store(self.peers.len(), Ordering::Relaxed); self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); @@ -595,82 +594,8 @@ where self.tick_timeout.reset(TICK_TIMEOUT); } - while let Poll::Ready(Some(event)) = event_stream.poll_next_unpin(cx) { - match event { - Event::NotificationStreamOpened { - remote, protocol, received_handshake, .. - } => { - if protocol != self.block_announce_protocol_name { - continue - } - - match as DecodeAll>::decode_all( - &mut &received_handshake[..], - ) { - Ok(handshake) => { - if self.on_sync_peer_connected(remote, handshake).is_err() { - log::debug!( - target: "sync", - "Failed to register peer {remote:?}: {received_handshake:?}", - ); - } - }, - Err(err) => { - log::debug!( - target: "sync", - "Couldn't decode handshake sent by {}: {:?}: {}", - remote, - received_handshake, - err, - ); - self.network_service.report_peer(remote, rep::BAD_MESSAGE); - }, - } - }, - Event::NotificationStreamClosed { remote, protocol } => { - if protocol != self.block_announce_protocol_name { - continue - } - - if self.on_sync_peer_disconnected(remote).is_err() { - log::trace!( - target: "sync", - "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", - remote - ); - } - }, - Event::NotificationsReceived { remote, messages } => { - for (protocol, message) in messages { - if protocol != self.block_announce_protocol_name { - continue - } - - if self.peers.contains_key(&remote) { - if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { - self.push_block_announce_validation(remote, announce); - - // Make sure that the newly added block announce validation future - // was polled once to be registered in the task. - if let Poll::Ready(res) = - self.chain_sync.poll_block_announce_validation(cx) - { - self.process_block_announce_validation_result(res) - } - } else { - log::warn!(target: "sub-libp2p", "Failed to decode block announce"); - } - } else { - log::trace!( - target: "sync", - "Received sync for peer earlier refused by sync layer: {}", - remote - ); - } - } - }, - _ => {}, - } + while let Poll::Ready(result) = self.chain_sync.poll(cx) { + self.process_block_announce_validation_result(result); } while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { @@ -746,8 +671,81 @@ where } } - while let Poll::Ready(result) = self.chain_sync.poll(cx) { - self.process_block_announce_validation_result(result); + while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) { + match event { + sc_network::SyncEvent::NotificationStreamOpened { + remote, + received_handshake, + sink, + tx, + } => { + match as DecodeAll>::decode_all( + &mut &received_handshake[..], + ) { + Ok(decoded_handshake) => { + match self.on_sync_peer_connected(remote, decoded_handshake, sink) { + Ok(()) => { + let _ = tx.send(true); + }, + Err(()) => { + log::debug!( + target: "sync", + "Failed to register peer {remote:?}: {received_handshake:?}", + ); + let _ = tx.send(false); + }, + } + }, + Err(_) => { + let _ = tx.send(false); + log::debug!( + target: "sync", + "failed to decode handshake but it was decoded correctly by `Protocol`", + ); + debug_assert!(false); + }, + } + }, + sc_network::SyncEvent::NotificationStreamClosed { remote } => { + if self.on_sync_peer_disconnected(remote).is_err() { + log::trace!( + target: "sync", + "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", + remote + ); + } + }, + sc_network::SyncEvent::NotificationsReceived { remote, messages } => { + for message in messages { + if self.peers.contains_key(&remote) { + if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { + self.push_block_announce_validation(remote, announce); + + // Make sure that the newly added block announce validation future + // was polled once to be registered in the task. + if let Poll::Ready(res) = + self.chain_sync.poll_block_announce_validation(cx) + { + self.process_block_announce_validation_result(res) + } + } else { + log::warn!(target: "sub-libp2p", "Failed to decode block announce"); + } + } else { + log::trace!( + target: "sync", + "Received sync for peer earlier refused by sync layer: {}", + remote + ); + } + } + }, + sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => { + if let Some(peer) = self.peers.get_mut(&remote) { + peer.sink = sink; + } + }, + } } Poll::Pending @@ -757,13 +755,13 @@ where /// /// Returns a result if the handshake of this peer was indeed accepted. pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { - if self.important_peers.contains(&peer) { - log::warn!(target: "sync", "Reserved peer {} disconnected", peer); - } else { - log::debug!(target: "sync", "{} disconnected", peer); - } - if self.peers.remove(&peer).is_some() { + if self.important_peers.contains(&peer) { + log::warn!(target: "sync", "Reserved peer {} disconnected", peer); + } else { + log::debug!(target: "sync", "{} disconnected", peer); + } + self.chain_sync.peer_disconnected(&peer); self.default_peers_set_no_slot_connected_peers.remove(&peer); self.event_streams @@ -783,6 +781,7 @@ where &mut self, who: PeerId, status: BlockAnnouncesHandshake, + sink: NotificationsSink, ) -> Result<(), ()> { log::trace!(target: "sync", "New peer {} {:?}", who, status); @@ -794,8 +793,6 @@ where if status.genesis_hash != self.genesis_hash { self.network_service.report_peer(who, rep::GENESIS_MISMATCH); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); if self.important_peers.contains(&who) { log::error!( @@ -834,8 +831,6 @@ where this_peer_reserved_slot { log::debug!(target: "sync", "Too many full nodes, rejecting {}", who); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); return Err(()) } @@ -844,8 +839,6 @@ where { // Make sure that not all slots are occupied by light clients. log::debug!(target: "sync", "Too many light nodes, rejecting {}", who); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); return Err(()) } @@ -858,14 +851,13 @@ where known_blocks: LruHashSet::new( NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"), ), + sink, }; let req = if peer.info.roles.is_full() { match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) { Ok(req) => req, Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(id, repu); return Err(()) }, diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 8368fa278712a..9763feed5ea54 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -26,6 +26,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-network = { version = "0.10.0-dev", path = "../" } sc-network-common = { version = "0.10.0-dev", path = "../common" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } sc-network-light = { version = "0.10.0-dev", path = "../light" } sc-network-sync = { version = "0.10.0-dev", path = "../sync" } sc-service = { version = "0.10.0-dev", default-features = false, features = ["test-helpers"], path = "../../service" } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 75b8287b08dcf..8db1287f30bed 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -896,6 +896,7 @@ where let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, sync_service, block_announce_config) = sc_network_sync::engine::SyncingEngine::new( Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), @@ -911,6 +912,7 @@ where block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), Some(warp_protocol_config.name.clone()), + rx, ) .unwrap(); let sync_service_import_queue = Box::new(sync_service.clone()); @@ -927,6 +929,7 @@ where fork_id, metrics_registry: None, block_announce_config, + tx, request_response_protocol_configs: [ block_request_protocol_config, state_request_protocol_config, @@ -948,9 +951,8 @@ where import_queue.run(sync_service_import_queue).await; }); - let service = network.service().clone(); tokio::spawn(async move { - engine.run(service.event_stream("syncing")).await; + engine.run().await; }); self.mut_peers(move |peers| { diff --git a/client/network/test/src/service.rs b/client/network/test/src/service.rs index b1de2a91ebcc9..e66245a76bbe2 100644 --- a/client/network/test/src/service.rs +++ b/client/network/test/src/service.rs @@ -176,6 +176,7 @@ impl TestNetworkBuilder { let (chain_sync_network_provider, chain_sync_network_handle) = self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config::Role::Full), @@ -191,6 +192,7 @@ impl TestNetworkBuilder { block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), None, + rx, ) .unwrap(); let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone())); @@ -214,6 +216,7 @@ impl TestNetworkBuilder { light_client_request_protocol_config, ] .to_vec(), + tx, }) .unwrap(); @@ -231,8 +234,7 @@ impl TestNetworkBuilder { tokio::time::sleep(std::time::Duration::from_millis(250)).await; } }); - let stream = worker.service().event_stream("syncing"); - tokio::spawn(engine.run(stream)); + tokio::spawn(engine.run()); TestNetwork::new(worker) } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index f0f9a7ed0d4d7..deb9f785092fc 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -825,6 +825,7 @@ where protocol_config }; + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (engine, sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config.role), @@ -840,6 +841,7 @@ where block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), warp_sync_protocol_config.as_ref().map(|config| config.name.clone()), + rx, )?; let sync_service_import_queue = sync_service.clone(); let sync_service = Arc::new(sync_service); @@ -864,6 +866,7 @@ where fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, + tx, request_response_protocol_configs: request_response_protocol_configs .into_iter() .chain([ @@ -909,9 +912,7 @@ where chain_sync_network_provider.run(network.clone()), ); spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(sync_service_import_queue))); - - let event_stream = network.event_stream("syncing"); - spawn_handle.spawn("syncing", None, engine.run(event_stream)); + spawn_handle.spawn_blocking("syncing", None, engine.run()); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); spawn_handle.spawn( From a3180f3948d0c72743fbb9d14812a43c6b20035c Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 29 Mar 2023 14:00:43 +0300 Subject: [PATCH 2/5] Apply review comments --- client/network/src/config.rs | 6 ++-- client/network/src/event.rs | 10 +++--- client/network/src/protocol.rs | 16 +++++++--- client/network/src/service.rs | 2 +- client/network/sync/src/engine.rs | 52 ++++++++++--------------------- client/network/test/src/lib.rs | 4 +-- client/service/src/builder.rs | 6 ++-- 7 files changed, 41 insertions(+), 55 deletions(-) diff --git a/client/network/src/config.rs b/client/network/src/config.rs index bb3395775e264..39df2259c888d 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -30,7 +30,6 @@ pub use crate::{ }; use codec::Encode; -use futures::channel::oneshot; use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId}; use prometheus_endpoint::Registry; pub use sc_network_common::{ @@ -39,6 +38,7 @@ pub use sc_network_common::{ ExHashT, }; use sc_utils::mpsc::TracingUnboundedSender; +use sp_runtime::traits::Block as BlockT; use zeroize::Zeroize; use std::{ @@ -695,7 +695,7 @@ impl NetworkConfiguration { } /// Network initialization parameters. -pub struct Params { +pub struct Params { /// Assigned role for our node (full, light, ...). pub role: Role, @@ -722,7 +722,7 @@ pub struct Params { pub block_announce_config: NonDefaultSetConfig, /// TX channel for direct communication with `SyncingEngine` and `Protocol`. - pub tx: TracingUnboundedSender, + pub tx: TracingUnboundedSender>, /// Request response protocol configurations pub request_response_protocol_configs: Vec, diff --git a/client/network/src/event.rs b/client/network/src/event.rs index abf2fa6a2c631..975fde0e40a28 100644 --- a/client/network/src/event.rs +++ b/client/network/src/event.rs @@ -25,7 +25,8 @@ use bytes::Bytes; use futures::channel::oneshot; use libp2p::{core::PeerId, kad::record::Key}; -use sc_network_common::role::ObservedRole; +use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake}; +use sp_runtime::traits::Block as BlockT; /// Events generated by DHT as a response to get_value and put_value requests. #[derive(Debug, Clone)] @@ -92,8 +93,9 @@ pub enum Event { }, } -// TODO: move to sc-network -pub enum SyncEvent { +/// Event sent to `SyncingEngine` +// TODO: remove once `NotificationService` is implemented. +pub enum SyncEvent { /// Opened a substream with the given node with the given notifications protocol. /// /// The protocol is always one of the notification protocols that have been registered. @@ -101,7 +103,7 @@ pub enum SyncEvent { /// Node we opened the substream with. remote: PeerId, /// Received handshake. - received_handshake: Vec, + received_handshake: BlockAnnouncesHandshake, /// Notification sink. sink: NotificationsSink, /// Channel for reporting accept/reject of the substream. diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 330eb4f1aa605..a7e6f36ef6215 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -24,7 +24,7 @@ use crate::{ use bytes::Bytes; use codec::{DecodeAll, Encode}; -use futures::{channel::oneshot, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; use libp2p::{ core::connection::ConnectionId, swarm::{ @@ -95,7 +95,7 @@ pub struct Protocol { /// Connected peers. peers: HashMap, sync_substream_validations: FuturesUnordered, - tx: TracingUnboundedSender, + tx: TracingUnboundedSender>, _marker: std::marker::PhantomData, } @@ -105,7 +105,7 @@ impl Protocol { roles: Roles, network_config: &config::NetworkConfiguration, block_announces_protocol: config::NonDefaultSetConfig, - tx: TracingUnboundedSender, + tx: TracingUnboundedSender>, ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let mut known_addresses = Vec::new(); @@ -463,12 +463,18 @@ impl NetworkBehaviour for Protocol { match as DecodeAll>::decode_all(&mut &received_handshake[..]) { Ok(GenericMessage::Status(handshake)) => { let roles = handshake.roles; + let handshake = BlockAnnouncesHandshake:: { + roles: handshake.roles, + best_number: handshake.best_number, + best_hash: handshake.best_hash, + genesis_hash: handshake.genesis_hash, + }; let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send( crate::SyncEvent::NotificationStreamOpened { remote: peer_id, - received_handshake, + received_handshake: handshake, sink: notifications_sink, tx, }, @@ -509,7 +515,7 @@ impl NetworkBehaviour for Protocol { let _ = self.tx.unbounded_send( crate::SyncEvent::NotificationStreamOpened { remote: peer_id, - received_handshake, + received_handshake: handshake, sink: notifications_sink, tx, }, diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 05fc1c3f8859d..368f359425f07 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -149,7 +149,7 @@ where /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. pub fn new + 'static>( - mut params: Params, + mut params: Params, ) -> Result { // Private and public keys configuration. let local_identity = params.network_config.node_key.clone().into_keypair()?; diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index b6d6f12cfca16..a35f3f1331d8e 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -24,8 +24,8 @@ use crate::{ ChainSync, ClientError, SyncingService, }; -use codec::{Decode, DecodeAll, Encode}; -use futures::{channel::oneshot, FutureExt, Stream, StreamExt}; +use codec::{Decode, Encode}; +use futures::{FutureExt, StreamExt}; use futures_timer::Delay; use libp2p::PeerId; use lru::LruCache; @@ -39,7 +39,6 @@ use sc_network::{ config::{ NetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode, }, - event::Event, utils::LruHashSet, NotificationsSink, ProtocolName, }; @@ -63,7 +62,6 @@ use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; use std::{ collections::{HashMap, HashSet}, num::NonZeroUsize, - pin::Pin, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, @@ -79,8 +77,6 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead mod rep { use sc_peerset::ReputationChange as Rep; - /// We received a message that failed to decode. - pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); /// Peer has different genesis. pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch"); /// Peer send us a block announcement that failed at validation. @@ -187,7 +183,7 @@ pub struct SyncingEngine { service_rx: TracingUnboundedReceiver>, /// Channel for receiving inbound connections from `Protocol`. - rx: sc_utils::mpsc::TracingUnboundedReceiver, + rx: sc_utils::mpsc::TracingUnboundedReceiver>, /// Assigned roles. roles: Roles, @@ -259,7 +255,7 @@ where block_request_protocol_name: ProtocolName, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, - rx: sc_utils::mpsc::TracingUnboundedReceiver, + rx: sc_utils::mpsc::TracingUnboundedReceiver>, ) -> Result<(Self, SyncingService, NonDefaultSetConfig), ClientError> { let mode = match network_config.sync_mode { SyncOperationMode::Full => SyncMode::Full, @@ -678,33 +674,17 @@ where received_handshake, sink, tx, - } => { - match as DecodeAll>::decode_all( - &mut &received_handshake[..], - ) { - Ok(decoded_handshake) => { - match self.on_sync_peer_connected(remote, decoded_handshake, sink) { - Ok(()) => { - let _ = tx.send(true); - }, - Err(()) => { - log::debug!( - target: "sync", - "Failed to register peer {remote:?}: {received_handshake:?}", - ); - let _ = tx.send(false); - }, - } - }, - Err(_) => { - let _ = tx.send(false); - log::debug!( - target: "sync", - "failed to decode handshake but it was decoded correctly by `Protocol`", - ); - debug_assert!(false); - }, - } + } => match self.on_sync_peer_connected(remote, &received_handshake, sink) { + Ok(()) => { + let _ = tx.send(true); + }, + Err(()) => { + log::debug!( + target: "sync", + "Failed to register peer {remote:?}: {received_handshake:?}", + ); + let _ = tx.send(false); + }, }, sc_network::SyncEvent::NotificationStreamClosed { remote } => { if self.on_sync_peer_disconnected(remote).is_err() { @@ -780,7 +760,7 @@ where pub fn on_sync_peer_connected( &mut self, who: PeerId, - status: BlockAnnouncesHandshake, + status: &BlockAnnouncesHandshake, sink: NotificationsSink, ) -> Result<(), ()> { log::trace!(target: "sync", "New peer {} {:?}", who, status); diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 8db1287f30bed..b72f6625ee600 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -55,8 +55,8 @@ use sc_network::{ }, request_responses::ProtocolConfig as RequestResponseConfig, types::ProtocolName, - Multiaddr, NetworkBlock, NetworkEventStream, NetworkService, NetworkStateInfo, - NetworkSyncForkRequest, NetworkWorker, + Multiaddr, NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest, + NetworkWorker, }; use sc_network_common::{ role::Roles, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index deb9f785092fc..aab5b34d1d1bb 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -38,9 +38,7 @@ use sc_client_db::{Backend, DatabaseSettings}; use sc_consensus::import_queue::ImportQueue; use sc_executor::RuntimeVersionOf; use sc_keystore::LocalKeystore; -use sc_network::{ - config::SyncMode, NetworkEventStream, NetworkService, NetworkStateInfo, NetworkStatusProvider, -}; +use sc_network::{config::SyncMode, NetworkService, NetworkStateInfo, NetworkStatusProvider}; use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{role::Roles, sync::warp::WarpSyncParams}; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; @@ -906,7 +904,7 @@ where )?; spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); - spawn_handle.spawn( + spawn_handle.spawn_blocking( "chain-sync-network-service-provider", Some("networking"), chain_sync_network_provider.run(network.clone()), From a1deb406cd5944ad65a1fb48257383a735c301d2 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 29 Mar 2023 14:52:24 +0300 Subject: [PATCH 3/5] fixes --- client/network/src/config.rs | 1 - client/network/src/service.rs | 2 +- client/network/test/src/lib.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/client/network/src/config.rs b/client/network/src/config.rs index ad2e7e4b04617..781ae9c786694 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -38,7 +38,6 @@ pub use sc_network_common::{ ExHashT, }; use sc_utils::mpsc::TracingUnboundedSender; -use sp_runtime::traits::Block as BlockT; use zeroize::Zeroize; use sp_runtime::traits::Block as BlockT; diff --git a/client/network/src/service.rs b/client/network/src/service.rs index e0c03960fe5e5..9708b24d29b52 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -147,7 +147,7 @@ where /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new(mut params: Params) -> Result { + pub fn new(mut params: Params) -> Result { // Private and public keys configuration. let local_identity = params.network_config.node_key.clone().into_keypair()?; let local_public = local_identity.public(); diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index ac4bf80a261f9..f85d6ed63c247 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -920,7 +920,7 @@ where let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); - let network = NetworkWorker::new::(sc_network::config::Params { + let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: Box::new(|f| { tokio::spawn(f); From 1b565ab89fd83072f4ab5c9ff7729dc215960b25 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 29 Mar 2023 15:57:39 +0300 Subject: [PATCH 4/5] trigger ci From 3724745e9df912525fc39dc3bc9a3c0c6dac8ce0 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 29 Mar 2023 18:08:53 +0300 Subject: [PATCH 5/5] Fix tests Verify that both peers have a connection now that the validation goes through `SyncingEngine`. Depending on how the tasks are scheduled, one of them might not have the peer registered in `SyncingEngine` at which point the test won't make any progress because block announcement received from an unknown peer is discarded. Move polling of `ChainSync` at the end of the function so that if a block announcement causes a block request to be sent, that can be sent in the same call to `SyncingEngine::poll()`. --- client/network/sync/src/engine.rs | 11 +++++++---- client/network/test/src/sync.rs | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index a35f3f1331d8e..e3d45a980a0b4 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -590,10 +590,6 @@ where self.tick_timeout.reset(TICK_TIMEOUT); } - while let Poll::Ready(result) = self.chain_sync.poll(cx) { - self.process_block_announce_validation_result(result); - } - while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { match event { ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { @@ -728,6 +724,13 @@ where } } + // poll `ChainSync` last because of a block announcement was received through the + // event stream between `SyncingEngine` and `Protocol` and the validation finished + // right after it as queued, the resulting block request (if any) can be sent right away. + while let Poll::Ready(result) = self.chain_sync.poll(cx) { + self.process_block_announce_validation_result(result); + } + Poll::Pending } diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index d87b03fb3a78c..af46d15a2bacd 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -414,7 +414,7 @@ async fn can_sync_small_non_best_forks() { // poll until the two nodes connect, otherwise announcing the block will not work futures::future::poll_fn::<(), _>(|cx| { net.poll(cx); - if net.peer(0).num_peers() == 0 { + if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 { Poll::Pending } else { Poll::Ready(())