Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,11 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
<<<<<<< HEAD
litep2p = { version = "0.6.2" }
=======
litep2p = { version = "0.9.0", features = ["websocket"] }
>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099))
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_7099.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Provide partial results to speedup GetRecord queries

doc:
- audience: Node Dev
description: |
This PR provides the partial results of the GetRecord kademlia query.

This significantly improves the authority discovery records, from ~37 minutes to ~2/3 minutes.
In contrast, libp2p discovers authority records in around ~10 minutes.

The authority discovery was slow because litep2p provided the records only after the Kademlia query was completed. A normal Kademlia query completes in around 40 seconds to a few minutes.
In this PR, partial records are provided as soon as they are discovered from the network.

crates:
- name: sc-network
bump: patch
35 changes: 29 additions & 6 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@ use litep2p::{
libp2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
<<<<<<< HEAD
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
=======
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, PeerRecord, QueryId,
Quorum, Record, RecordKey,
>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099))
},
ping::{Config as PingConfig, PingEvent},
},
Expand Down Expand Up @@ -129,13 +135,19 @@ pub enum DiscoveryEvent {
address: Multiaddr,
},

/// Record was found from the DHT.
/// `GetRecord` query succeeded.
GetRecordSuccess {
/// Query ID.
query_id: QueryId,
},

/// Records.
records: RecordsType,
/// Record was found from the DHT.
GetRecordPartialResult {
/// Query ID.
query_id: QueryId,

/// Record.
record: PeerRecord,
},

/// Record was successfully stored on the DHT.
Expand Down Expand Up @@ -525,13 +537,24 @@ impl Stream for Discovery {
peers: peers.into_iter().collect(),
}))
},
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => {
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` succeeded for {query_id:?}: {records:?}",
"`GET_RECORD` succeeded for {query_id:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id }));
},
Poll::Ready(Some(KademliaEvent::GetRecordPartialResult { query_id, record })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` intermediary succeeded for {query_id:?}: {record:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordPartialResult {
query_id,
record,
}));
},
Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Expand Down
85 changes: 43 additions & 42 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use litep2p::{
protocol::{
libp2p::{
bitswap::Config as BitswapConfig,
kademlia::{QueryId, Record, RecordsType},
kademlia::{QueryId, Record},
},
request_response::ConfigBuilder as RequestResponseConfigBuilder,
},
Expand Down Expand Up @@ -813,18 +813,56 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.peerstore_handle.add_known_peer(peer.into());
}
}
<<<<<<< HEAD
Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => {
match self.pending_get_values.remove(&query_id) {
None => log::warn!(
target: LOG_TARGET,
"`GET_VALUE` succeeded for a non-existent query",
),
Some((key, started)) => {
=======
Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
if !self.pending_queries.contains_key(&query_id) {
log::error!(
target: LOG_TARGET,
"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
);

continue
}

let peer_id: sc_network_types::PeerId = record.peer.into();
let record = PeerRecord {
record: P2PRecord {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.record.expires,
},
peer: Some(peer_id.into()),
};

self.event_streams.send(
Event::Dht(
DhtEvent::ValueFound(
record.into()
)
)
);
}
Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
match self.pending_queries.remove(&query_id) {
Some(KadQuery::GetValue(key, started)) => {
>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099))
log::trace!(
target: LOG_TARGET,
"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
key,
"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
);
<<<<<<< HEAD
for record in litep2p_to_libp2p_peer_record(records) {
self.event_streams.send(
Event::Dht(
Expand All @@ -834,6 +872,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
)
);
}
=======
>>>>>>> 77c78e15 (litep2p: Provide partial results to speedup GetRecord queries (#7099))

if let Some(ref metrics) = self.metrics {
metrics
Expand Down Expand Up @@ -1028,42 +1068,3 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
}
}
}

// Glue code to convert from a litep2p records type to a libp2p2 PeerRecord.
fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec<PeerRecord> {
match records {
litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
vec![PeerRecord {
record: P2PRecord {
key: record.key.to_vec().into(),
value: record.value,
publisher: record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.expires,
},
peer: None,
}]
},
litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records
.into_iter()
.map(|record| {
let peer_id: sc_network_types::PeerId = record.peer.into();

PeerRecord {
record: P2PRecord {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.record.expires,
},
peer: Some(peer_id.into()),
}
})
.collect::<Vec<_>>(),
}
}