diff --git a/crates/networking/p2p/discv4/peer_table.rs b/crates/networking/p2p/discv4/peer_table.rs index 30c26c6f81..ca7939a7f0 100644 --- a/crates/networking/p2p/discv4/peer_table.rs +++ b/crates/networking/p2p/discv4/peer_table.rs @@ -427,6 +427,19 @@ impl PeerTable { } } + /// Get a contact using node_id + pub async fn get_contact(&mut self, node_id: H256) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetContact { node_id }) + .await? + { + OutMessage::Contact(contact) => Ok(Some(*contact)), + OutMessage::NotFound => Ok(None), + _ => unreachable!(), + } + } + /// Get all contacts available to revalidate pub async fn get_contacts_to_revalidate( &mut self, @@ -921,6 +934,7 @@ enum CallMessage { GetContactToInitiate, GetContactForLookup, GetContactForEnrLookup, + GetContact { node_id: H256 }, GetContactsToRevalidate(Duration), GetBestPeer { capabilities: Vec }, GetScore { node_id: H256 }, @@ -1010,6 +1024,13 @@ impl GenServer for PeerTableServer { .map(Box::new) .map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact), ), + CallMessage::GetContact { node_id } => CallResponse::Reply( + self.contacts + .get(&node_id) + .cloned() + .map(Box::new) + .map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact), + ), CallMessage::GetContactsToRevalidate(revalidation_interval) => CallResponse::Reply( Self::OutMsg::Contacts(self.get_contacts_to_revalidate(revalidation_interval)), ), diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 5581fa3364..95e803cb58 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -163,7 +163,7 @@ impl DiscoveryServer { sender_public_key, ); - let _ = self.handle_ping(hash, node).await.inspect_err(|e| { + let _ = self.handle_ping(ping_message, hash, sender_public_key, node).await.inspect_err(|e| { error!(sent = "Ping", to = %format!("{sender_public_key:#x}"), err = ?e, "Error handling message"); }); } @@ -249,9 +249,12 @@ impl DiscoveryServer { async fn lookup(&mut self) -> Result<(), DiscoveryServerError> { if let Some(contact) = self.peer_table.get_contact_for_lookup().await? { - if self.udp_socket.send_to(&self.find_node_message, &contact.node.udp_addr()).await.inspect_err( - |e| error!(sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message"), - ).is_err() { + if let Err(e) = self + .udp_socket + .send_to(&self.find_node_message, &contact.node.udp_addr()) + .await + { + error!(sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message"); self.peer_table .set_disposable(&contact.node.node_id()) .await?; @@ -281,52 +284,12 @@ impl DiscoveryServer { async fn enr_lookup(&mut self) -> Result<(), DiscoveryServerError> { if let Some(contact) = self.peer_table.get_contact_for_enr_lookup().await? { - let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS); - let enr_request = Message::ENRRequest(ENRRequestMessage { expiration }); - - let mut buf = Vec::new(); - enr_request.encode_with_header(&mut buf, &self.signer); - let enr_request_hash: [u8; 32] = buf[..32] - .try_into() - .expect("first 32 bytes are the message hash"); - - if self.udp_socket.send_to(&buf, contact.node.udp_addr()) - .await - .inspect_err( |e| error!(sending = "ENRRequest", addr = ?&contact.node.udp_addr(), to = %format!("{:#x}", contact.node.public_key), err=?e, "Error sending message"),) - .is_err() - { - self.peer_table - .set_disposable(&contact.node.node_id()) - .await?; - METRICS.record_new_discarded_node().await; - } - - self.peer_table - .record_enr_request_sent(&contact.node.node_id(), H256::from(enr_request_hash)) - .await?; + self.send_enr_request(&contact.node).await?; } Ok(()) } async fn send_ping(&mut self, node: &Node) -> Result<(), DiscoveryServerError> { - match self.send_ping_internal(node).await { - Ok(ping_hash) => { - METRICS.record_ping_sent().await; - self.peer_table - .record_ping_sent(&node.node_id(), ping_hash) - .await?; - } - Err(err) => { - error!(sent = "Ping", to = %format!("{:#x}", node.public_key), err = ?err, "Error sending message"); - self.peer_table.set_disposable(&node.node_id()).await?; - METRICS.record_new_discarded_node().await; - } - } - Ok(()) - } - - async fn send_ping_internal(&self, node: &Node) -> Result { - let mut buf = Vec::new(); // TODO: Parametrize this expiration. let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS); let from = Endpoint { @@ -341,14 +304,13 @@ impl DiscoveryServer { }; let enr_seq = self.local_node_record.seq; let ping = Message::Ping(PingMessage::new(from, to, expiration).with_enr_seq(enr_seq)); - ping.encode_with_header(&mut buf, &self.signer); - let ping_hash: [u8; 32] = buf[..32] - .try_into() - .expect("first 32 bytes are the message hash"); - // We do not use self.send() here, as we already encoded the message to calculate hash. - self.udp_socket.send_to(&buf, node.udp_addr()).await?; + let ping_hash = self.send_else_dispose(ping, node).await?; trace!(sent = "Ping", to = %format!("{:#x}", node.public_key)); - Ok(H256::from(ping_hash)) + METRICS.record_ping_sent().await; + self.peer_table + .record_ping_sent(&node.node_id(), ping_hash) + .await?; + Ok(()) } async fn send_pong(&self, ping_hash: H256, node: &Node) -> Result<(), DiscoveryServerError> { @@ -389,6 +351,18 @@ impl DiscoveryServer { Ok(()) } + async fn send_enr_request(&mut self, node: &Node) -> Result<(), DiscoveryServerError> { + let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS); + let enr_request = Message::ENRRequest(ENRRequestMessage { expiration }); + + let enr_request_hash = self.send_else_dispose(enr_request, node).await?; + + self.peer_table + .record_enr_request_sent(&node.node_id(), enr_request_hash) + .await?; + Ok(()) + } + async fn send_enr_response( &self, request_hash: H256, @@ -403,11 +377,34 @@ impl DiscoveryServer { Ok(()) } - async fn handle_ping(&mut self, hash: H256, node: Node) -> Result<(), DiscoveryServerError> { + async fn handle_ping( + &mut self, + ping_message: PingMessage, + hash: H256, + sender_public_key: H512, + node: Node, + ) -> Result<(), DiscoveryServerError> { self.send_pong(hash, &node).await?; if self.peer_table.insert_if_new(&node).await.unwrap_or(false) { self.send_ping(&node).await?; + } else { + // If the contact has stale ENR then request the updated one. + let node_id = node_id(&sender_public_key); + let stored_enr_seq = self + .peer_table + .get_contact(node_id) + .await? + .and_then(|c| c.record) + .map(|r| r.seq); + + let received_enr_seq = ping_message.enr_seq; + + if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq) + && received > stored + { + self.send_enr_request(&node).await?; + } } Ok(()) } @@ -417,9 +414,25 @@ impl DiscoveryServer { message: PongMessage, node_id: H256, ) -> Result<(), DiscoveryServerError> { + let Some(contact) = self.peer_table.get_contact(node_id).await? else { + return Ok(()); + }; + + // If the contact doesn't exist then there is nothing to record. + // So we do it after making sure that the contact exists. self.peer_table .record_pong_received(&node_id, message.ping_hash) .await?; + + // If the contact has stale ENR then request the updated one. + let stored_enr_seq = contact.record.map(|r| r.seq); + let received_enr_seq = message.enr_seq; + if let (Some(received), Some(stored)) = (received_enr_seq, stored_enr_seq) + && received > stored + { + self.send_enr_request(&contact.node).await?; + } + Ok(()) } @@ -634,6 +647,24 @@ impl DiscoveryServer { |e| error!(sending = ?message, addr = ?addr, err=?e, "Error sending message"), )?) } + + async fn send_else_dispose( + &mut self, + message: Message, + node: &Node, + ) -> Result { + let mut buf = BytesMut::new(); + message.encode_with_header(&mut buf, &self.signer); + let message_hash: [u8; 32] = buf[..32] + .try_into() + .expect("first 32 bytes are the message hash"); + if let Err(e) = self.udp_socket.send_to(&buf, node.udp_addr()).await { + error!(sending = ?message, addr = ?node.udp_addr(), to = ?node.node_id(), err=?e, "Error sending message"); + self.peer_table.set_disposable(&node.node_id()).await?; + METRICS.record_new_discarded_node().await; + } + Ok(H256::from(message_hash)) + } } impl GenServer for DiscoveryServer {