Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl Kademlia {
message: BytesMut,
substream: Substream,
) -> crate::Result<()> {
tracing::trace!(target: LOG_TARGET, ?peer, ?query_id, "handle message from peer");
tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "handle message from peer");

match KademliaMessage::from_bytes(message).ok_or(Error::InvalidData)? {
ref message @ KademliaMessage::FindNode {
Expand All @@ -399,6 +399,7 @@ impl Kademlia {
target: LOG_TARGET,
?peer,
?target,
query = ?query_id,
"handle `FIND_NODE` response",
);

Expand Down Expand Up @@ -447,7 +448,7 @@ impl Kademlia {
tracing::trace!(
target: LOG_TARGET,
?peer,
?query_id,
query = ?query_id,
?peers,
?record,
"handle `GET_VALUE` response",
Expand Down Expand Up @@ -531,7 +532,7 @@ impl Kademlia {
tracing::trace!(
target: LOG_TARGET,
?peer,
?query_id,
query = ?query_id,
?address,
"report failure for pending query",
);
Expand Down Expand Up @@ -748,11 +749,11 @@ impl Kademlia {

match result {
QueryResult::SendSuccess { substream } => {
tracing::trace!(target: LOG_TARGET, ?peer, ?query_id, "message sent to peer");
tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer");
let _ = substream.close().await;
}
QueryResult::ReadSuccess { substream, message } => {
tracing::trace!(target: LOG_TARGET, ?peer, ?query_id, "message read from peer");
tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer");

if let Err(error) = self.on_message_received(peer, query_id, message, substream).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message");
Expand All @@ -762,7 +763,7 @@ impl Kademlia {
tracing::debug!(
target: LOG_TARGET,
?peer,
?query_id,
query = ?query_id,
?result,
"failed to read message from substream",
);
Expand All @@ -774,7 +775,7 @@ impl Kademlia {
command = self.cmd_rx.recv() => {
match command {
Some(KademliaCommand::FindNode { peer, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?peer, ?query_id, "starting `FIND_NODE` query");
tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query");

self.engine.start_find_node(
query_id,
Expand All @@ -783,7 +784,7 @@ impl Kademlia {
);
}
Some(KademliaCommand::PutRecord { mut record, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT");
tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT");

// For `PUT_VALUE` requests originating locally we are always the publisher.
record.publisher = Some(self.local_key.clone().into_preimage());
Expand All @@ -802,7 +803,7 @@ impl Kademlia {
);
}
Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => {
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers");
tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers");

// Make sure TTL is set.
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
Expand Down
11 changes: 7 additions & 4 deletions src/protocol/libp2p/kademlia/query/find_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
/// Register response failure for `peer`.
pub fn register_response_failure(&mut self, peer: PeerId) {
let Some(peer) = self.pending.remove(&peer) else {
tracing::debug!(target: LOG_TARGET, ?peer, "pending peer doesn't exist");
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist");
return;
};

Expand All @@ -113,9 +113,10 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {

/// Register `FIND_NODE` response from `peer`.
pub fn register_response(&mut self, peer: PeerId, peers: Vec<KademliaPeer>) {
tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer");

let Some(peer) = self.pending.remove(&peer) else {
tracing::warn!(target: LOG_TARGET, ?peer, "received response from peer but didn't expect it");
debug_assert!(false);
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it");
return;
};

Expand Down Expand Up @@ -185,12 +186,14 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer");

let (_, candidate) = self.candidates.pop_first()?;
let peer = candidate.peer;

tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate");
self.pending.insert(candidate.peer, candidate.clone());

Some(QueryAction::SendMessage {
query: self.config.query,
peer: candidate.peer,
peer,
message: self.kad_message.clone(),
})
}
Expand Down
9 changes: 5 additions & 4 deletions src/protocol/libp2p/kademlia/query/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl GetRecordContext {
/// Register response failure for `peer`.
pub fn register_response_failure(&mut self, peer: PeerId) {
let Some(peer) = self.pending.remove(&peer) else {
tracing::trace!(target: LOG_TARGET, ?peer, "pending peer doesn't exist");
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist");
return;
};

Expand All @@ -149,8 +149,10 @@ impl GetRecordContext {
record: Option<Record>,
peers: Vec<KademliaPeer>,
) {
tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer");

let Some(peer) = self.pending.remove(&peer) else {
tracing::trace!(target: LOG_TARGET, ?peer, "received response from peer but didn't expect it");
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it");
return;
};

Expand Down Expand Up @@ -207,10 +209,9 @@ impl GetRecordContext {
tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer");

let (_, candidate) = self.candidates.pop_first()?;

let peer = candidate.peer;

tracing::trace!(target: LOG_TARGET, ?peer, "current candidate");
tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate");
self.pending.insert(candidate.peer, candidate);

Some(QueryAction::SendMessage {
Expand Down