Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions crates/networking/p2p/discv4/peer_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,19 @@ impl PeerTable {
}
}

/// Get a contact using node_id
pub async fn get_contact(&mut self, node_id: H256) -> Result<Option<Contact>, PeerTableError> {
match self
.handle
.call(CallMessage::GetContact { node_id })
.await?
{
OutMessage::Contact(contact) => Ok(Some(*contact)),
OutMessage::NotFound => Ok(None),
_ => unreachable!(),
}
}

/// Get all contacts available to revalidate
pub async fn get_contacts_to_revalidate(
&mut self,
Expand Down Expand Up @@ -921,6 +934,7 @@ enum CallMessage {
GetContactToInitiate,
GetContactForLookup,
GetContactForEnrLookup,
GetContact { node_id: H256 },
GetContactsToRevalidate(Duration),
GetBestPeer { capabilities: Vec<Capability> },
GetScore { node_id: H256 },
Expand Down Expand Up @@ -1010,6 +1024,13 @@ impl GenServer for PeerTableServer {
.map(Box::new)
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
),
CallMessage::GetContact { node_id } => CallResponse::Reply(
self.contacts
.get(&node_id)
.cloned()
.map(Box::new)
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
),
CallMessage::GetContactsToRevalidate(revalidation_interval) => CallResponse::Reply(
Self::OutMsg::Contacts(self.get_contacts_to_revalidate(revalidation_interval)),
),
Expand Down
137 changes: 84 additions & 53 deletions crates/networking/p2p/discv4/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl DiscoveryServer {
sender_public_key,
);

let _ = self.handle_ping(hash, node).await.inspect_err(|e| {
let _ = self.handle_ping(ping_message, hash, sender_public_key, node).await.inspect_err(|e| {
error!(sent = "Ping", to = %format!("{sender_public_key:#x}"), err = ?e, "Error handling message");
});
}
Expand Down Expand Up @@ -249,9 +249,12 @@ impl DiscoveryServer {

async fn lookup(&mut self) -> Result<(), DiscoveryServerError> {
if let Some(contact) = self.peer_table.get_contact_for_lookup().await? {
if self.udp_socket.send_to(&self.find_node_message, &contact.node.udp_addr()).await.inspect_err(
|e| error!(sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message"),
).is_err() {
if let Err(e) = self
.udp_socket
.send_to(&self.find_node_message, &contact.node.udp_addr())
.await
{
error!(sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message");
self.peer_table
.set_disposable(&contact.node.node_id())
.await?;
Expand Down Expand Up @@ -281,52 +284,12 @@ impl DiscoveryServer {

async fn enr_lookup(&mut self) -> Result<(), DiscoveryServerError> {
if let Some(contact) = self.peer_table.get_contact_for_enr_lookup().await? {
let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
let enr_request = Message::ENRRequest(ENRRequestMessage { expiration });

let mut buf = Vec::new();
enr_request.encode_with_header(&mut buf, &self.signer);
let enr_request_hash: [u8; 32] = buf[..32]
.try_into()
.expect("first 32 bytes are the message hash");

if self.udp_socket.send_to(&buf, contact.node.udp_addr())
.await
.inspect_err( |e| error!(sending = "ENRRequest", addr = ?&contact.node.udp_addr(), to = %format!("{:#x}", contact.node.public_key), err=?e, "Error sending message"),)
.is_err()
{
self.peer_table
.set_disposable(&contact.node.node_id())
.await?;
METRICS.record_new_discarded_node().await;
}

self.peer_table
.record_enr_request_sent(&contact.node.node_id(), H256::from(enr_request_hash))
.await?;
self.send_enr_request(&contact.node).await?;
}
Ok(())
}

async fn send_ping(&mut self, node: &Node) -> Result<(), DiscoveryServerError> {
match self.send_ping_internal(node).await {
Ok(ping_hash) => {
METRICS.record_ping_sent().await;
self.peer_table
.record_ping_sent(&node.node_id(), ping_hash)
.await?;
}
Err(err) => {
error!(sent = "Ping", to = %format!("{:#x}", node.public_key), err = ?err, "Error sending message");
self.peer_table.set_disposable(&node.node_id()).await?;
METRICS.record_new_discarded_node().await;
}
}
Ok(())
}

async fn send_ping_internal(&self, node: &Node) -> Result<H256, DiscoveryServerError> {
let mut buf = Vec::new();
// TODO: Parametrize this expiration.
let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
let from = Endpoint {
Expand All @@ -341,14 +304,13 @@ impl DiscoveryServer {
};
let enr_seq = self.local_node_record.seq;
let ping = Message::Ping(PingMessage::new(from, to, expiration).with_enr_seq(enr_seq));
ping.encode_with_header(&mut buf, &self.signer);
let ping_hash: [u8; 32] = buf[..32]
.try_into()
.expect("first 32 bytes are the message hash");
// We do not use self.send() here, as we already encoded the message to calculate hash.
self.udp_socket.send_to(&buf, node.udp_addr()).await?;
let ping_hash = self.send_else_dispose(ping, node).await?;
trace!(sent = "Ping", to = %format!("{:#x}", node.public_key));
Ok(H256::from(ping_hash))
METRICS.record_ping_sent().await;
self.peer_table
.record_ping_sent(&node.node_id(), ping_hash)
.await?;
Ok(())
}

async fn send_pong(&self, ping_hash: H256, node: &Node) -> Result<(), DiscoveryServerError> {
Expand Down Expand Up @@ -389,6 +351,18 @@ impl DiscoveryServer {
Ok(())
}

async fn send_enr_request(&mut self, node: &Node) -> Result<(), DiscoveryServerError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's factorize common code (message encoding, hash extraction, udp_socket.send, maybe set_disposable) with send_ping/send_ping_internal to an utility function to minimize code repetiton.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added send_else_dispose to remove code duplication.

let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
let enr_request = Message::ENRRequest(ENRRequestMessage { expiration });

let enr_request_hash = self.send_else_dispose(enr_request, node).await?;

self.peer_table
.record_enr_request_sent(&node.node_id(), enr_request_hash)
.await?;
Ok(())
}

async fn send_enr_response(
&self,
request_hash: H256,
Expand All @@ -403,11 +377,34 @@ impl DiscoveryServer {
Ok(())
}

async fn handle_ping(&mut self, hash: H256, node: Node) -> Result<(), DiscoveryServerError> {
async fn handle_ping(
&mut self,
ping_message: PingMessage,
hash: H256,
sender_public_key: H512,
node: Node,
) -> Result<(), DiscoveryServerError> {
self.send_pong(hash, &node).await?;

if self.peer_table.insert_if_new(&node).await.unwrap_or(false) {
self.send_ping(&node).await?;
} else {
// If the contact has stale ENR then request the updated one.
let node_id = node_id(&sender_public_key);
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The node_id is being computed redundantly here. The node parameter already has the same node_id since it was created from sender_public_key (see line 159-164). You can use node.node_id() directly instead of recomputing node_id(&sender_public_key) to avoid the extra keccak hash computation.

Suggested change
let node_id = node_id(&sender_public_key);
let node_id = node.node_id();

Copilot uses AI. Check for mistakes.
let stored_enr_seq = self
.peer_table
.get_contact(node_id)
.await?
.and_then(|c| c.record)
.map(|r| r.seq);

let received_enr_seq = ping_message.enr_seq;

if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&node).await?;
}
Comment on lines +403 to +407
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&node).await?;
}
if received_enr_seq.is_some_and(|r| stored_enr_seq.is_some_and(|s| r > s)) {
self.send_enr_request(&node).await?;
}

Not sure which one I prefer 🤔

Copy link
Contributor

@Oppen Oppen Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe transpose:

Suggested change
if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&node).await?;
}
if [received_enr_seq, stored_enr_seq].transpose().is_some_and(|[received, stored]| received > stored) {
self.send_enr_request(&node).await?;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or:

Suggested change
if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&node).await?;
}
if let Some([received, stored]) = [received_enr_seq, stored_enr_seq].transpose() && received > stored {
self.send_enr_request(&node).await?;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transpose is an unstable feature.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😭

Comment on lines +391 to +407
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new ENR staleness detection logic in handle_ping lacks test coverage. Consider adding tests to verify that ENR requests are sent when a ping is received with a higher sequence number than the stored ENR.

Copilot uses AI. Check for mistakes.
}
Ok(())
}
Expand All @@ -417,9 +414,25 @@ impl DiscoveryServer {
message: PongMessage,
node_id: H256,
) -> Result<(), DiscoveryServerError> {
let Some(contact) = self.peer_table.get_contact(node_id).await? else {
return Ok(());
};

// If the contact doesn't exist then there is nothing to record.
// So we do it after making sure that the contact exists.
self.peer_table
.record_pong_received(&node_id, message.ping_hash)
.await?;

// If the contact has stale ENR then request the updated one.
let stored_enr_seq = contact.record.map(|r| r.seq);
let received_enr_seq = message.enr_seq;
if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq)
&& received > stored
{
self.send_enr_request(&contact.node).await?;
}
Comment on lines +427 to +434
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new ENR staleness detection logic in handle_pong lacks test coverage. Consider adding tests to verify that ENR requests are sent when a pong is received with a higher sequence number than the stored ENR.

Copilot uses AI. Check for mistakes.

Ok(())
}

Expand Down Expand Up @@ -634,6 +647,24 @@ impl DiscoveryServer {
|e| error!(sending = ?message, addr = ?addr, err=?e, "Error sending message"),
)?)
}

async fn send_else_dispose(
&mut self,
message: Message,
node: &Node,
) -> Result<H256, DiscoveryServerError> {
let mut buf = BytesMut::new();
message.encode_with_header(&mut buf, &self.signer);
let message_hash: [u8; 32] = buf[..32]
.try_into()
.expect("first 32 bytes are the message hash");
if let Err(e) = self.udp_socket.send_to(&buf, node.udp_addr()).await {
error!(sending = ?message, addr = ?node.udp_addr(), to = ?node.node_id(), err=?e, "Error sending message");
self.peer_table.set_disposable(&node.node_id()).await?;
METRICS.record_new_discarded_node().await;
}
Ok(H256::from(message_hash))
}
Comment on lines +651 to +667
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new send_else_dispose helper method lacks test coverage. Consider adding tests to verify its behavior when sending succeeds and when it fails, particularly ensuring that the node is marked as disposable and metrics are recorded on failure.

Copilot uses AI. Check for mistakes.
}

impl GenServer for DiscoveryServer {
Expand Down