diff --git a/Cargo.lock b/Cargo.lock index c122c982af190..25ddad2e82f8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8320,9 +8320,15 @@ dependencies = [ [[package]] name = "litep2p" +<<<<<<< HEAD version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f46c51c205264b834ceed95c8b195026e700494bc3991aaba3b4ea9e20626d9" +======= +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ca6ee50a125dc4fc4e9a3ae3640010796d1d07bc517a0ac715fdf0b24a0b6ac" +>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099)) dependencies = [ "async-trait", "bs58 0.4.0", diff --git a/Cargo.toml b/Cargo.toml index 93cbcd2d338d1..012ea3b6c2022 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -830,7 +830,11 @@ linked-hash-map = { version = "0.5.4" } linked_hash_set = { version = "0.1.4" } linregress = { version = "0.5.1" } lite-json = { version = "0.2.0", default-features = false } +<<<<<<< HEAD litep2p = { version = "0.6.2" } +======= +litep2p = { version = "0.9.0", features = ["websocket"] } +>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099)) log = { version = "0.4.22", default-features = false } macro_magic = { version = "0.5.1" } maplit = { version = "1.0.2" } diff --git a/prdoc/pr_7099.prdoc b/prdoc/pr_7099.prdoc new file mode 100644 index 0000000000000..58d809f3c0909 --- /dev/null +++ b/prdoc/pr_7099.prdoc @@ -0,0 +1,16 @@ +title: Provide partial results to speedup GetRecord queries + +doc: + - audience: Node Dev + description: | + This PR provides the partial results of the GetRecord kademlia query. + + This significantly improves the authority discovery records, from ~37 minutes to ~2/3 minutes. + In contrast, libp2p discovers authority records in around ~10 minutes. + + The authority discovery was slow because litep2p provided the records only after the Kademlia query was completed. A normal Kademlia query completes in around 40 seconds to a few minutes. + In this PR, partial records are provided as soon as they are discovered from the network. + +crates: + - name: sc-network + bump: patch diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 62d5f0fb6f06a..6691ba5dc3d3d 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -33,9 +33,15 @@ use litep2p::{ libp2p::{ identify::{Config as IdentifyConfig, IdentifyEvent}, kademlia::{ +<<<<<<< HEAD Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum, Record, RecordKey, RecordsType, +======= + Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider, + IncomingRecordValidationMode, KademliaEvent, KademliaHandle, PeerRecord, QueryId, + Quorum, Record, RecordKey, +>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099)) }, ping::{Config as PingConfig, PingEvent}, }, @@ -129,13 +135,19 @@ pub enum DiscoveryEvent { address: Multiaddr, }, - /// Record was found from the DHT. + /// `GetRecord` query succeeded. GetRecordSuccess { /// Query ID. query_id: QueryId, + }, - /// Records. - records: RecordsType, + /// Record was found from the DHT. + GetRecordPartialResult { + /// Query ID. + query_id: QueryId, + + /// Record. + record: PeerRecord, }, /// Record was successfully stored on the DHT. @@ -525,13 +537,24 @@ impl Stream for Discovery { peers: peers.into_iter().collect(), })) }, - Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => { + Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id })) => { log::trace!( target: LOG_TARGET, - "`GET_RECORD` succeeded for {query_id:?}: {records:?}", + "`GET_RECORD` succeeded for {query_id:?}", ); - return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records })); + return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id })); + }, + Poll::Ready(Some(KademliaEvent::GetRecordPartialResult { query_id, record })) => { + log::trace!( + target: LOG_TARGET, + "`GET_RECORD` intermediary succeeded for {query_id:?}: {record:?}", + ); + + return Poll::Ready(Some(DiscoveryEvent::GetRecordPartialResult { + query_id, + record, + })); }, Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) => return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })), diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 521f1a5fe0f79..98385947a40e7 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -58,7 +58,7 @@ use litep2p::{ protocol::{ libp2p::{ bitswap::Config as BitswapConfig, - kademlia::{QueryId, Record, RecordsType}, + kademlia::{QueryId, Record}, }, request_response::ConfigBuilder as RequestResponseConfigBuilder, }, @@ -813,6 +813,7 @@ impl NetworkBackend for Litep2pNetworkBac self.peerstore_handle.add_known_peer(peer.into()); } } +<<<<<<< HEAD Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => { match self.pending_get_values.remove(&query_id) { None => log::warn!( @@ -820,11 +821,48 @@ impl NetworkBackend for Litep2pNetworkBac "`GET_VALUE` succeeded for a non-existent query", ), Some((key, started)) => { +======= + Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => { + if !self.pending_queries.contains_key(&query_id) { + log::error!( + target: LOG_TARGET, + "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}" + ); + + continue + } + + let peer_id: sc_network_types::PeerId = record.peer.into(); + let record = PeerRecord { + record: P2PRecord { + key: record.record.key.to_vec().into(), + value: record.record.value, + publisher: record.record.publisher.map(|peer_id| { + let peer_id: sc_network_types::PeerId = peer_id.into(); + peer_id.into() + }), + expires: record.record.expires, + }, + peer: Some(peer_id.into()), + }; + + self.event_streams.send( + Event::Dht( + DhtEvent::ValueFound( + record.into() + ) + ) + ); + } + Some(DiscoveryEvent::GetRecordSuccess { query_id }) => { + match self.pending_queries.remove(&query_id) { + Some(KadQuery::GetValue(key, started)) => { +>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099)) log::trace!( target: LOG_TARGET, - "`GET_VALUE` for {:?} ({query_id:?}) succeeded", - key, + "`GET_VALUE` for {key:?} ({query_id:?}) succeeded", ); +<<<<<<< HEAD for record in litep2p_to_libp2p_peer_record(records) { self.event_streams.send( Event::Dht( @@ -834,6 +872,8 @@ impl NetworkBackend for Litep2pNetworkBac ) ); } +======= +>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099)) if let Some(ref metrics) = self.metrics { metrics @@ -1028,42 +1068,3 @@ impl NetworkBackend for Litep2pNetworkBac } } } - -// Glue code to convert from a litep2p records type to a libp2p2 PeerRecord. -fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec { - match records { - litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => { - vec![PeerRecord { - record: P2PRecord { - key: record.key.to_vec().into(), - value: record.value, - publisher: record.publisher.map(|peer_id| { - let peer_id: sc_network_types::PeerId = peer_id.into(); - peer_id.into() - }), - expires: record.expires, - }, - peer: None, - }] - }, - litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records - .into_iter() - .map(|record| { - let peer_id: sc_network_types::PeerId = record.peer.into(); - - PeerRecord { - record: P2PRecord { - key: record.record.key.to_vec().into(), - value: record.record.value, - publisher: record.record.publisher.map(|peer_id| { - let peer_id: sc_network_types::PeerId = peer_id.into(); - peer_id.into() - }), - expires: record.record.expires, - }, - peer: Some(peer_id.into()), - } - }) - .collect::>(), - } -}