Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 125 additions & 4 deletions crates/networking/p2p/discv4/peer_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ pub struct Contact {
/// None if no ping was sent yet or it was already acknowledged.
pub ping_hash: Option<H256>,

/// The hash of the last unacknowledged ENRRequest sent to this contact, or
/// None if no request was sent yet or it was already acknowledged.
pub enr_request_hash: Option<H256>,

pub n_find_node_sent: u64,
/// ENR associated with this contact, if it was provided by the peer.
pub record: Option<NodeRecord>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should set this to None if we receive a PING with a different seq-num. I opened issue #5488 for that

// This contact failed to respond our Ping.
pub disposable: bool,
// Set to true after we send a successful ENRResponse to it.
Expand All @@ -67,6 +73,25 @@ impl Contact {
self.validation_timestamp = Some(Instant::now());
self.ping_hash = Some(ping_hash);
}

pub fn record_enr_request_sent(&mut self, request_hash: H256) {
self.enr_request_hash = Some(request_hash);
}

// If hash does not match, ignore. Otherwise, reset enr_request_hash
pub fn record_enr_response_received(&mut self, request_hash: H256, record: NodeRecord) {
if self
.enr_request_hash
.take_if(|h| *h == request_hash)
.is_some()
{
self.record = Some(record);
}
}

pub fn has_pending_enr_request(&self) -> bool {
self.enr_request_hash.is_some()
}
}

impl From<Node> for Contact {
Expand All @@ -75,7 +100,9 @@ impl From<Node> for Contact {
node,
validation_timestamp: None,
ping_hash: None,
enr_request_hash: None,
n_find_node_sent: 0,
record: None,
disposable: false,
knows_us: true,
unwanted: false,
Expand Down Expand Up @@ -248,6 +275,38 @@ impl PeerTable {
Ok(())
}

/// Record request sent, store the request hash for later check
pub async fn record_enr_request_sent(
&mut self,
node_id: &H256,
request_hash: H256,
) -> Result<(), PeerTableError> {
self.handle
.cast(CastMessage::RecordEnrRequestSent {
node_id: *node_id,
request_hash,
})
.await?;
Ok(())
}

/// Record a response received. Check previously saved hash and reset it if it matches
pub async fn record_enr_response_received(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function is not used (and so, neither the RecordEnrResponseReceived message) because when we receive a ENRResponse we should record it directly (set_node_record), right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've instead removed set_node_record and record_enr_response_received would set it directly after matching the hash.

&mut self,
node_id: &H256,
request_hash: H256,
record: NodeRecord,
) -> Result<(), PeerTableError> {
self.handle
.cast(CastMessage::RecordEnrResponseReceived {
node_id: *node_id,
request_hash,
record,
})
.await?;
Ok(())
}

/// Set peer as disposable
pub async fn set_disposable(&mut self, node_id: &H256) -> Result<(), PeerTableError> {
self.handle
Expand Down Expand Up @@ -322,7 +381,7 @@ impl PeerTable {
/// Provide a contact to initiate a connection
pub async fn get_contact_to_initiate(&mut self) -> Result<Option<Contact>, PeerTableError> {
match self.handle.call(CallMessage::GetContactToInitiate).await? {
OutMessage::Contact(contact) => Ok(Some(contact)),
OutMessage::Contact(contact) => Ok(Some(*contact)),
OutMessage::NotFound => Ok(None),
_ => unreachable!(),
}
Expand All @@ -331,7 +390,20 @@ impl PeerTable {
/// Provide a contact to perform Discovery lookup
pub async fn get_contact_for_lookup(&mut self) -> Result<Option<Contact>, PeerTableError> {
match self.handle.call(CallMessage::GetContactForLookup).await? {
OutMessage::Contact(contact) => Ok(Some(contact)),
OutMessage::Contact(contact) => Ok(Some(*contact)),
OutMessage::NotFound => Ok(None),
_ => unreachable!(),
}
}

/// Provide a contact to perform ENR lookup
pub async fn get_contact_for_enr_lookup(&mut self) -> Result<Option<Contact>, PeerTableError> {
match self
.handle
.call(CallMessage::GetContactForEnrLookup)
.await?
{
OutMessage::Contact(contact) => Ok(Some(*contact)),
OutMessage::NotFound => Ok(None),
_ => unreachable!(),
}
Expand Down Expand Up @@ -597,6 +669,21 @@ impl PeerTableServer {
.cloned()
}

fn get_contact_for_enr_lookup(&mut self) -> Option<Contact> {
self.contacts
.values()
.filter(|c| {
c.was_validated()
&& !c.has_pending_enr_request()
&& c.record.is_none()
&& !c.disposable
})
.collect::<Vec<_>>()
.choose(&mut rand::rngs::OsRng)
.cloned()
.cloned()
}

fn get_contacts_to_revalidate(&self, revalidation_interval: Duration) -> Vec<Contact> {
self.contacts
.values()
Expand All @@ -620,7 +707,7 @@ impl PeerTableServer {
if sender_ip != contact.node.ip {
return OutMessage::IpMismatch;
}
OutMessage::Contact(contact.clone())
OutMessage::Contact(Box::new(contact.clone()))
}

fn get_closest_nodes(&self, node_id: H256) -> Vec<Node> {
Expand Down Expand Up @@ -777,6 +864,15 @@ enum CastMessage {
node_id: H256,
ping_hash: H256,
},
RecordEnrRequestSent {
node_id: H256,
request_hash: H256,
},
RecordEnrResponseReceived {
node_id: H256,
request_hash: H256,
record: NodeRecord,
},
SetDisposable {
node_id: H256,
},
Expand All @@ -798,6 +894,7 @@ enum CallMessage {
TargetPeersReached,
GetContactToInitiate,
GetContactForLookup,
GetContactForEnrLookup,
GetContactsToRevalidate(Duration),
GetBestPeer { capabilities: Vec<Capability> },
GetScore { node_id: H256 },
Expand Down Expand Up @@ -826,7 +923,7 @@ pub enum OutMessage {
TargetReached(bool),
IsNew(bool),
Nodes(Vec<Node>),
Contact(Contact),
Contact(Box<Contact>),
InvalidContact,
UnknownContact,
IpMismatch,
Expand Down Expand Up @@ -874,10 +971,17 @@ impl GenServer for PeerTableServer {
)),
CallMessage::GetContactToInitiate => CallResponse::Reply(
self.get_contact_to_initiate()
.map(Box::new)
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
),
CallMessage::GetContactForLookup => CallResponse::Reply(
self.get_contact_for_lookup()
.map(Box::new)
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
),
CallMessage::GetContactForEnrLookup => CallResponse::Reply(
self.get_contact_for_enr_lookup()
.map(Box::new)
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
),
CallMessage::GetContactsToRevalidate(revalidation_interval) => CallResponse::Reply(
Expand Down Expand Up @@ -1027,6 +1131,23 @@ impl GenServer for PeerTableServer {
}
});
}
CastMessage::RecordEnrRequestSent {
node_id,
request_hash,
} => {
self.contacts
.entry(node_id)
.and_modify(|contact| contact.record_enr_request_sent(request_hash));
}
CastMessage::RecordEnrResponseReceived {
node_id,
request_hash,
record,
} => {
self.contacts.entry(node_id).and_modify(|contact| {
contact.record_enr_response_received(request_hash, record);
});
}
CastMessage::SetDisposable { node_id } => {
self.contacts
.entry(node_id)
Expand Down
77 changes: 74 additions & 3 deletions crates/networking/p2p/discv4/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::{
discv4::{
codec::Discv4Codec,
messages::{
ENRResponseMessage, FindNodeMessage, Message, NeighborsMessage, Packet,
PacketDecodeErr, PingMessage, PongMessage,
ENRRequestMessage, ENRResponseMessage, FindNodeMessage, Message, NeighborsMessage,
Packet, PacketDecodeErr, PingMessage, PongMessage,
},
peer_table::{Contact, OutMessage as PeerTableOutMessage, PeerTable, PeerTableError},
},
Expand Down Expand Up @@ -65,6 +65,7 @@ pub enum InMessage {
Message(Box<Discv4Message>),
Revalidate,
Lookup,
EnrLookup,
Prune,
ChangeFindNodeMessage,
Shutdown,
Expand Down Expand Up @@ -203,6 +204,8 @@ impl DiscoveryServer {
- Take the `eth` part of the record. If it's None, this peer is garbage; if it's set
*/
trace!(received = "ENRResponse", msg = ?enrresponse_message, from = %format!("{sender_public_key:#x}"));
self.handle_enr_response(sender_public_key, from, enrresponse_message)
.await?;
}
}
Ok(())
Expand Down Expand Up @@ -263,6 +266,35 @@ impl DiscoveryServer {
}
}

async fn enr_lookup(&mut self) -> Result<(), DiscoveryServerError> {
if let Some(contact) = self.peer_table.get_contact_for_enr_lookup().await? {
let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
let enr_request = Message::ENRRequest(ENRRequestMessage { expiration });

let mut buf = Vec::new();
enr_request.encode_with_header(&mut buf, &self.signer);
let enr_request_hash: [u8; 32] = buf[..32]
.try_into()
.expect("first 32 bytes are the message hash");

if self.udp_socket.send_to(&buf, contact.node.udp_addr())
Copy link
Contributor

@ElFantasma ElFantasma Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it seems we have two kind of messages that save the hash. We may want to refactor it to have a single send_and_get_hash function (or some better naming 😬) to implement this logic only once.
It can be done on a different PR, though

.await
.inspect_err( |e| error!(sending = "ENRRequest", addr = ?&contact.node.udp_addr(), to = %format!("{:#x}", contact.node.public_key), err=?e, "Error sending message"),)
.is_err()
{
self.peer_table
.set_disposable(&contact.node.node_id())
.await?;
METRICS.record_new_discarded_node().await;
}

self.peer_table
.record_enr_request_sent(&contact.node.node_id(), H256::from(enr_request_hash))
.await?;
}
Ok(())
}

async fn send_ping(&mut self, node: &Node) -> Result<(), DiscoveryServerError> {
match self.send_ping_internal(node).await {
Ok(ping_hash) => {
Expand Down Expand Up @@ -436,6 +468,34 @@ impl DiscoveryServer {
Ok(())
}

async fn handle_enr_response(
&mut self,
sender_public_key: H512,
from: SocketAddr,
enr_response_message: ENRResponseMessage,
) -> Result<(), DiscoveryServerError> {
let node_id = node_id(&sender_public_key);

let contact = self
.validate_contact(sender_public_key, node_id, from, "ENRResponse")
.await?;

if !contact.has_pending_enr_request() {
debug!(received = "ENRResponse", from = %format!("{sender_public_key:#x}"), "unsolicited message received, skipping");
return Err(DiscoveryServerError::InvalidContact);
}

self.peer_table
.record_enr_response_received(
&node_id,
enr_response_message.request_hash,
enr_response_message.node_record,
)
.await?;

Ok(())
}

async fn validate_contact(
&mut self,
sender_public_key: H512,
Expand Down Expand Up @@ -464,7 +524,7 @@ impl DiscoveryServer {
debug!(received = message_type, to = %format!("{sender_public_key:#x}"), "IP address mismatch, skipping");
Err(DiscoveryServerError::InvalidContact)
}
PeerTableOutMessage::Contact(contact) => Ok(contact),
PeerTableOutMessage::Contact(contact) => Ok(*contact),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -521,6 +581,7 @@ impl GenServer for DiscoveryServer {
InMessage::ChangeFindNodeMessage,
);
let _ = handle.clone().cast(InMessage::Lookup).await;
let _ = handle.clone().cast(InMessage::EnrLookup).await;
send_message_on(handle.clone(), tokio::signal::ctrl_c(), InMessage::Shutdown);

Ok(Success(self))
Expand Down Expand Up @@ -555,6 +616,16 @@ impl GenServer for DiscoveryServer {
let interval = self.get_lookup_interval().await;
send_after(interval, handle.clone(), Self::CastMsg::Lookup);
}
Self::CastMsg::EnrLookup => {
trace!(received = "EnrLookup");
let _ = self
.enr_lookup()
.await
.inspect_err(|e| error!(err=?e, "Error performing Discovery lookup"));

let interval = self.get_lookup_interval().await;
send_after(interval, handle.clone(), Self::CastMsg::EnrLookup);
}
Self::CastMsg::Prune => {
trace!(received = "Prune");
let _ = self
Expand Down