Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions crates/networking/p2p/discv4/peer_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,19 @@ impl PeerTable {
}
}

/// Get a contact using node_id
pub async fn get_contact(&mut self, node_id: H256) -> Result<Option<Contact>, PeerTableError> {
match self
.handle
.call(CallMessage::GetContact { node_id })
.await?
{
OutMessage::Contact(contact) => Ok(Some(*contact)),
OutMessage::NotFound => Ok(None),
_ => unreachable!(),
}
}

/// Get all contacts available to revalidate
pub async fn get_contacts_to_revalidate(
&mut self,
Expand Down Expand Up @@ -921,6 +934,7 @@ enum CallMessage {
GetContactToInitiate,
GetContactForLookup,
GetContactForEnrLookup,
GetContact { node_id: H256 },
GetContactsToRevalidate(Duration),
GetBestPeer { capabilities: Vec<Capability> },
GetScore { node_id: H256 },
Expand Down Expand Up @@ -1010,6 +1024,13 @@ impl GenServer for PeerTableServer {
.map(Box::new)
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
),
CallMessage::GetContact { node_id } => CallResponse::Reply(
self.contacts
.get(&node_id)
.cloned()
.map(Box::new)
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
),
CallMessage::GetContactsToRevalidate(revalidation_interval) => CallResponse::Reply(
Self::OutMsg::Contacts(self.get_contacts_to_revalidate(revalidation_interval)),
),
Expand Down
94 changes: 69 additions & 25 deletions crates/networking/p2p/discv4/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl DiscoveryServer {
sender_public_key,
);

let _ = self.handle_ping(hash, node).await.inspect_err(|e| {
let _ = self.handle_ping(ping_message, hash, sender_public_key, node).await.inspect_err(|e| {
error!(sent = "Ping", to = %format!("{sender_public_key:#x}"), err = ?e, "Error handling message");
});
}
Expand Down Expand Up @@ -280,29 +280,7 @@ impl DiscoveryServer {

async fn enr_lookup(&mut self) -> Result<(), DiscoveryServerError> {
if let Some(contact) = self.peer_table.get_contact_for_enr_lookup().await? {
let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
let enr_request = Message::ENRRequest(ENRRequestMessage { expiration });

let mut buf = Vec::new();
enr_request.encode_with_header(&mut buf, &self.signer);
let enr_request_hash: [u8; 32] = buf[..32]
.try_into()
.expect("first 32 bytes are the message hash");

if self.udp_socket.send_to(&buf, contact.node.udp_addr())
.await
.inspect_err( |e| error!(sending = "ENRRequest", addr = ?&contact.node.udp_addr(), to = %format!("{:#x}", contact.node.public_key), err=?e, "Error sending message"),)
.is_err()
{
self.peer_table
.set_disposable(&contact.node.node_id())
.await?;
METRICS.record_new_discarded_node().await;
}

self.peer_table
.record_enr_request_sent(&contact.node.node_id(), H256::from(enr_request_hash))
.await?;
self.send_enr_request(&contact.node).await?;
}
Ok(())
}
Expand Down Expand Up @@ -388,6 +366,33 @@ impl DiscoveryServer {
Ok(())
}

async fn send_enr_request(&mut self, node: &Node) -> Result<(), DiscoveryServerError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's factorize common code (message encoding, hash extraction, udp_socket.send, maybe set_disposable) with send_ping/send_ping_internal to an utility function to minimize code repetiton.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added send_else_dispose to remove code duplication.

let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
let enr_request = Message::ENRRequest(ENRRequestMessage { expiration });

let mut buf = Vec::new();
enr_request.encode_with_header(&mut buf, &self.signer);
let enr_request_hash: [u8; 32] = buf[..32]
.try_into()
.expect("first 32 bytes are the message hash");

if self.udp_socket.send_to(&buf, node.udp_addr())
.await
.inspect_err( |e| error!(sending = "ENRRequest", addr = ?node.udp_addr(), to = %format!("{:#x}", node.public_key), err=?e, "Error sending message"),)
.is_err()
{
self.peer_table
.set_disposable(&node.node_id())
.await?;
METRICS.record_new_discarded_node().await;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either put this inside the inspect_err or convert everything to an if let Err(e)...
Having two ways to check the same thing in the same place makes the code more confusing than it needs to be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to if let Err(e).. for better error handling.


self.peer_table
.record_enr_request_sent(&node.node_id(), H256::from(enr_request_hash))
.await?;
Ok(())
}

async fn send_enr_response(
&self,
request_hash: H256,
Expand All @@ -402,11 +407,34 @@ impl DiscoveryServer {
Ok(())
}

async fn handle_ping(&mut self, hash: H256, node: Node) -> Result<(), DiscoveryServerError> {
async fn handle_ping(
&mut self,
ping_message: PingMessage,
hash: H256,
sender_public_key: H512,
node: Node,
) -> Result<(), DiscoveryServerError> {
self.send_pong(hash, &node).await?;

if self.peer_table.insert_if_new(&node).await.unwrap_or(false) {
self.send_ping(&node).await?;
} else {
// If the contact has stale ENR then request the updated one.
let node_id = node_id(&sender_public_key);
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The node_id is being computed redundantly here. The node parameter already has the same node_id since it was created from sender_public_key (see line 159-164). You can use node.node_id() directly instead of recomputing node_id(&sender_public_key) to avoid the extra keccak hash computation.

Suggested change
let node_id = node_id(&sender_public_key);
let node_id = node.node_id();

Copilot uses AI. Check for mistakes.
let stored_enr_seq = self
.peer_table
.get_contact(node_id)
.await?
.and_then(|c| c.record)
.map(|r| r.seq);

let received_enr_seq = ping_message.enr_seq;

if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&node).await?;
}
Comment on lines +403 to +407
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&node).await?;
}
if received_enr_seq.is_some_and(|r| stored_enr_seq.is_some_and(|s| r > s)) {
self.send_enr_request(&node).await?;
}

Not sure which one I prefer 🤔

Copy link
Contributor

@Oppen Oppen Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe transpose:

Suggested change
if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&node).await?;
}
if [received_enr_seq, stored_enr_seq].transpose().is_some_and(|[received, stored]| received > stored) {
self.send_enr_request(&node).await?;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or:

Suggested change
if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&node).await?;
}
if let Some([received, stored]) = [received_enr_seq, stored_enr_seq].transpose() && received > stored {
self.send_enr_request(&node).await?;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transpose is an unstable feature.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😭

Comment on lines +391 to +407
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new ENR staleness detection logic in handle_ping lacks test coverage. Consider adding tests to verify that ENR requests are sent when a ping is received with a higher sequence number than the stored ENR.

Copilot uses AI. Check for mistakes.
}
Ok(())
}
Expand All @@ -416,9 +444,25 @@ impl DiscoveryServer {
message: PongMessage,
node_id: H256,
) -> Result<(), DiscoveryServerError> {
let Some(contact) = self.peer_table.get_contact(node_id).await? else {
return Ok(());
};

// If the contact doesn't exist then there is nothing to record.
// So we do it after making sure that the contact exists.
self.peer_table
.record_pong_received(&node_id, message.ping_hash)
.await?;

// If the contact has stale ENR then request the updated one.
let stored_enr_seq = contact.record.map(|r| r.seq);
let received_enr_seq = message.enr_seq;
if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&contact.node).await?;
}
Comment on lines +427 to +434
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new ENR staleness detection logic in handle_pong lacks test coverage. Consider adding tests to verify that ENR requests are sent when a pong is received with a higher sequence number than the stored ENR.

Copilot uses AI. Check for mistakes.

Ok(())
}

Expand Down
Loading