From 62101ca46f284019a2183165a78c2bf066a25f65 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 14 Oct 2025 16:55:01 -0300 Subject: [PATCH 1/7] Limit the amount of peers to target --- crates/networking/p2p/discv4/peer_table.rs | 9 ++++++- crates/networking/p2p/discv4/server.rs | 2 +- crates/networking/p2p/metrics.rs | 14 +++++++--- crates/networking/p2p/peer_handler.rs | 3 +++ .../networking/p2p/rlpx/connection/codec.rs | 2 +- .../p2p/rlpx/connection/handshake.rs | 22 +++++++-------- .../networking/p2p/rlpx/connection/server.rs | 19 ++++++++----- crates/networking/p2p/rlpx/error.rs | 10 ++++--- crates/networking/p2p/rlpx/initiator.rs | 27 +++++++++++-------- 9 files changed, 68 insertions(+), 40 deletions(-) diff --git a/crates/networking/p2p/discv4/peer_table.rs b/crates/networking/p2p/discv4/peer_table.rs index 51d85eb7bf3..c418d0934e1 100644 --- a/crates/networking/p2p/discv4/peer_table.rs +++ b/crates/networking/p2p/discv4/peer_table.rs @@ -517,6 +517,13 @@ impl PeerTableServer { score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT } + // Returns if the peer has room for more connections given the current score + // and amount of inflight requests + fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool { + let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64; + (*requests as f64) < MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio + } + fn get_best_peer(&self, capabilities: &[Capability]) -> Option<(H256, PeerConnection)> { self.peers .iter() @@ -524,7 +531,7 @@ impl PeerTableServer { .filter_map(|(id, peer_data)| { // Skip the peer if it has too many ongoing requests or if it doesn't match // the capabilities - if peer_data.requests > MAX_CONCURRENT_REQUESTS_PER_PEER + if !self.can_try_more_requests(&peer_data.score, &peer_data.requests) || !capabilities .iter() .any(|cap| peer_data.supported_capabilities.contains(cap)) diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 0a897a57119..b77f75c6848 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -43,7 +43,7 @@ const INITIAL_LOOKUP_INTERVAL: Duration = Duration::from_secs(5); const LOOKUP_INTERVAL: Duration = Duration::from_secs(5 * 60); // 5 minutes const PRUNE_INTERVAL: Duration = Duration::from_secs(5); /// The target number of RLPx connections to reach. -const TARGET_PEERS: usize = 100; +pub const TARGET_PEERS: usize = 100; /// The target number of contacts to maintain in peer_table. const TARGET_CONTACTS: usize = 100_000; diff --git a/crates/networking/p2p/metrics.rs b/crates/networking/p2p/metrics.rs index c17af704654..f625ea02144 100644 --- a/crates/networking/p2p/metrics.rs +++ b/crates/networking/p2p/metrics.rs @@ -341,12 +341,18 @@ impl Metrics { .and_modify(|e| *e += 1) .or_insert(1); } - PeerConnectionError::NoMatchingCapabilities() => { + PeerConnectionError::NoMatchingCapabilities => { failures_grouped_by_reason .entry("NoMatchingCapabilities".to_owned()) .and_modify(|e| *e += 1) .or_insert(1); } + PeerConnectionError::TooManyPeers => { + failures_grouped_by_reason + .entry("TooManyPeers".to_owned()) + .and_modify(|e| *e += 1) + .or_insert(1); + } PeerConnectionError::Disconnected => { failures_grouped_by_reason .entry("Disconnected".to_owned()) @@ -371,19 +377,19 @@ impl Metrics { .and_modify(|e| *e += 1) .or_insert(1); } - PeerConnectionError::InvalidPeerId() => { + PeerConnectionError::InvalidPeerId => { failures_grouped_by_reason .entry("InvalidPeerId".to_owned()) .and_modify(|e| *e += 1) .or_insert(1); } - PeerConnectionError::InvalidRecoveryId() => { + PeerConnectionError::InvalidRecoveryId => { failures_grouped_by_reason .entry("InvalidRecoveryId".to_owned()) .and_modify(|e| *e += 1) .or_insert(1); } - PeerConnectionError::InvalidMessageLength() => { + PeerConnectionError::InvalidMessageLength => { failures_grouped_by_reason .entry("InvalidMessageLength".to_owned()) .and_modify(|e| *e += 1) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 0dda9cb1b48..f8c22422184 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -165,6 +165,9 @@ impl PeerHandler { peer_table.inc_requests(peer_id).await?; let result = connection.outgoing_request(message, timeout).await; peer_table.dec_requests(peer_id).await?; + if let Err(PeerConnectionError::Timeout) = result { + peer_table.record_critical_failure(&peer_id).await?; + }; result } diff --git a/crates/networking/p2p/rlpx/connection/codec.rs b/crates/networking/p2p/rlpx/connection/codec.rs index 2fdcdf5ab87..e17d5b0b38c 100644 --- a/crates/networking/p2p/rlpx/connection/codec.rs +++ b/crates/networking/p2p/rlpx/connection/codec.rs @@ -187,7 +187,7 @@ impl Decoder for RLPxCodec { // Check that the size is not too large to avoid a denial of // service attack where the server runs out of memory. if padded_size > MAX_MESSAGE_SIZE { - return Err(PeerConnectionError::InvalidMessageLength()); + return Err(PeerConnectionError::InvalidMessageLength); } let total_message_size = (32 + padded_size + 16) as usize; diff --git a/crates/networking/p2p/rlpx/connection/handshake.rs b/crates/networking/p2p/rlpx/connection/handshake.rs index 688b1f8dbda..048202f1909 100644 --- a/crates/networking/p2p/rlpx/connection/handshake.rs +++ b/crates/networking/p2p/rlpx/connection/handshake.rs @@ -163,7 +163,7 @@ async fn send_auth( mut stream: S, ) -> Result { let peer_pk = - compress_pubkey(remote_public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?; + compress_pubkey(remote_public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?; let local_nonce = H256::random_using(&mut rand::thread_rng()); let local_ephemeral_key = SecretKey::new(&mut rand::thread_rng()); @@ -183,7 +183,7 @@ async fn send_ack( mut stream: S, ) -> Result { let peer_pk = - compress_pubkey(remote_public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?; + compress_pubkey(remote_public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?; let local_nonce = H256::random_using(&mut rand::thread_rng()); let local_ephemeral_key = SecretKey::new(&mut rand::thread_rng()); @@ -205,10 +205,10 @@ async fn receive_auth( let msg_bytes = receive_handshake_msg(stream).await?; let size_data = &msg_bytes .get(..2) - .ok_or_else(PeerConnectionError::InvalidMessageLength)?; + .ok_or_else(|| PeerConnectionError::InvalidMessageLength)?; let msg = &msg_bytes .get(2..) - .ok_or_else(PeerConnectionError::InvalidMessageLength)?; + .ok_or_else(|| PeerConnectionError::InvalidMessageLength)?; let (auth, remote_ephemeral_key) = decode_auth_message(signer, msg, size_data)?; Ok(RemoteState { @@ -227,10 +227,10 @@ async fn receive_ack( let msg_bytes = receive_handshake_msg(stream).await?; let size_data = &msg_bytes .get(..2) - .ok_or_else(PeerConnectionError::InvalidMessageLength)?; + .ok_or_else(|| PeerConnectionError::InvalidMessageLength)?; let msg = &msg_bytes .get(2..) - .ok_or_else(PeerConnectionError::InvalidMessageLength)?; + .ok_or_else(|| PeerConnectionError::InvalidMessageLength)?; let ack = decode_ack_message(signer, msg, size_data)?; let remote_ephemeral_key = ack .get_ephemeral_pubkey() @@ -254,7 +254,7 @@ async fn receive_handshake_msg( let ack_data = [buf[0], buf[1]]; let msg_size = u16::from_be_bytes(ack_data) as usize; if msg_size > P2P_MAX_MESSAGE_SIZE { - return Err(PeerConnectionError::InvalidMessageLength()); + return Err(PeerConnectionError::InvalidMessageLength); } buf.resize(msg_size + 2, 0); @@ -315,7 +315,7 @@ fn decode_auth_message( // Derive a shared secret from the static keys. let peer_pk = - compress_pubkey(auth.public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?; + compress_pubkey(auth.public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?; let static_shared_secret = ecdh_xchng(static_key, &peer_pk).map_err(|error| { PeerConnectionError::CryptographyError(format!( "Invalid generated static shared secret: {error}" @@ -367,13 +367,13 @@ fn decrypt_message( // public-key (65) || iv (16) || ciphertext || mac (32) let (pk, rest) = msg .split_at_checked(65) - .ok_or_else(PeerConnectionError::InvalidMessageLength)?; + .ok_or_else(|| PeerConnectionError::InvalidMessageLength)?; let (iv, rest) = rest .split_at_checked(16) - .ok_or_else(PeerConnectionError::InvalidMessageLength)?; + .ok_or_else(|| PeerConnectionError::InvalidMessageLength)?; let (c, d) = rest .split_at_checked(rest.len() - 32) - .ok_or_else(PeerConnectionError::InvalidMessageLength)?; + .ok_or_else(|| PeerConnectionError::InvalidMessageLength)?; // Derive the message shared secret. let shared_secret = ecdh_xchng(static_key, &PublicKey::from_slice(pk)?).map_err(|error| { diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index ae2af5bc955..fed7a5a5e98 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -1,5 +1,5 @@ use crate::{ - discv4::peer_table::PeerTable, + discv4::{peer_table::PeerTable, server::TARGET_PEERS}, metrics::METRICS, network::P2PContext, rlpx::{ @@ -264,7 +264,7 @@ impl GenServer for PeerConnectionServer { initialize_connection(handle, &mut established_state, stream, eth_version).await { match &reason { - PeerConnectionError::NoMatchingCapabilities() + PeerConnectionError::NoMatchingCapabilities | PeerConnectionError::HandshakeError(_) => { established_state .peer_table @@ -390,11 +390,11 @@ impl GenServer for PeerConnectionServer { | PeerConnectionError::DisconnectReceived(_) | PeerConnectionError::DisconnectSent(_) | PeerConnectionError::HandshakeError(_) - | PeerConnectionError::NoMatchingCapabilities() - | PeerConnectionError::InvalidPeerId() - | PeerConnectionError::InvalidMessageLength() + | PeerConnectionError::NoMatchingCapabilities + | PeerConnectionError::InvalidPeerId + | PeerConnectionError::InvalidMessageLength | PeerConnectionError::StateError(_) - | PeerConnectionError::InvalidRecoveryId() => { + | PeerConnectionError::InvalidRecoveryId => { log_peer_debug(&established_state.node, &e.to_string()); return CastResponse::Stop; } @@ -475,6 +475,10 @@ async fn initialize_connection( where S: Unpin + Send + Stream> + 'static, { + if state.peer_table.peer_count().await? > TARGET_PEERS { + log_peer_warn(&state.node, "Reached target peer connections, discarding."); + return Err(PeerConnectionError::TooManyPeers); + } exchange_hello_messages(state, &mut stream).await?; // Update eth capability version to the negotiated version for further message decoding @@ -713,6 +717,7 @@ fn match_disconnect_reason(error: &PeerConnectionError) -> Option Some(*reason), PeerConnectionError::DisconnectReceived(reason) => Some(*reason), PeerConnectionError::RLPDecodeError(_) => Some(DisconnectReason::NetworkError), + PeerConnectionError::TooManyPeers => Some(DisconnectReason::TooManyPeers), // TODO build a proper matching between error types and disconnection reasons _ => None, } @@ -787,7 +792,7 @@ where state.capabilities = hello_message.capabilities; if negotiated_eth_version == 0 { - return Err(PeerConnectionError::NoMatchingCapabilities()); + return Err(PeerConnectionError::NoMatchingCapabilities); } debug!("Negotatied eth version: eth/{}", negotiated_eth_version); state.negotiated_eth_capability = Some(Capability::eth(negotiated_eth_version)); diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 71e8e0337ab..e6c659ccfde 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -24,7 +24,9 @@ pub enum PeerConnectionError { #[error("Invalid connection state: {0}")] StateError(String), #[error("No matching capabilities")] - NoMatchingCapabilities(), + NoMatchingCapabilities, + #[error("Too many peers")] + TooManyPeers, #[error("Peer disconnected")] Disconnected, #[error("Disconnect requested: {0}")] @@ -34,11 +36,11 @@ pub enum PeerConnectionError { #[error("Not Found: {0}")] NotFound(String), #[error("Invalid peer id")] - InvalidPeerId(), + InvalidPeerId, #[error("Invalid recovery id")] - InvalidRecoveryId(), + InvalidRecoveryId, #[error("Invalid message length")] - InvalidMessageLength(), + InvalidMessageLength, #[error("Request id not present: {0}")] ExpectedRequestId(String), #[error("Cannot handle message: {0}")] diff --git a/crates/networking/p2p/rlpx/initiator.rs b/crates/networking/p2p/rlpx/initiator.rs index 728718abf64..e861ee18cd7 100644 --- a/crates/networking/p2p/rlpx/initiator.rs +++ b/crates/networking/p2p/rlpx/initiator.rs @@ -1,5 +1,7 @@ use crate::{ - discv4::peer_table::PeerTableError, metrics::METRICS, network::P2PContext, + discv4::{peer_table::PeerTableError, server::TARGET_PEERS}, + metrics::METRICS, + network::P2PContext, rlpx::connection::server::PeerConnection, }; use spawned_concurrency::{ @@ -37,7 +39,7 @@ impl RLPxInitiator { initial_lookup_interval: Duration::from_secs(3), lookup_interval: Duration::from_secs(5 * 60), target_peers: 50, - new_connections_per_lookup: 5000, + new_connections_per_lookup: 500, } } @@ -54,15 +56,18 @@ impl RLPxInitiator { async fn look_for_peers(&mut self) -> Result<(), RLPxInitiatorError> { info!("Looking for peers"); - let contacts = self - .context - .table - .get_contacts_to_initiate(self.new_connections_per_lookup) - .await?; - - for contact in contacts { - PeerConnection::spawn_as_initiator(self.context.clone(), &contact.node).await; - METRICS.record_new_rlpx_conn_attempt().await; + if self.context.table.peer_count().await? < TARGET_PEERS { + let contacts = self + .context + .table + .get_contacts_to_initiate(self.new_connections_per_lookup) + .await?; + for contact in contacts { + PeerConnection::spawn_as_initiator(self.context.clone(), &contact.node).await; + METRICS.record_new_rlpx_conn_attempt().await; + } + } else { + info!("Target peer connections reached, no need to initiate new connections."); } Ok(()) } From eb0c8221fda2cf3487aebcacf566d067a37baf6c Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 14 Oct 2025 16:59:48 -0300 Subject: [PATCH 2/7] Limit the amount of peers to target --- crates/networking/p2p/peer_handler.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index f8c22422184..0dda9cb1b48 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -165,9 +165,6 @@ impl PeerHandler { peer_table.inc_requests(peer_id).await?; let result = connection.outgoing_request(message, timeout).await; peer_table.dec_requests(peer_id).await?; - if let Err(PeerConnectionError::Timeout) = result { - peer_table.record_critical_failure(&peer_id).await?; - }; result } From 69c8015ea56a9e69c8184212628667a23fe3953e Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 14 Oct 2025 17:00:47 -0300 Subject: [PATCH 3/7] Limit the amount of peers to target --- crates/networking/p2p/rlpx/initiator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/initiator.rs b/crates/networking/p2p/rlpx/initiator.rs index e861ee18cd7..07837d00ed4 100644 --- a/crates/networking/p2p/rlpx/initiator.rs +++ b/crates/networking/p2p/rlpx/initiator.rs @@ -39,7 +39,7 @@ impl RLPxInitiator { initial_lookup_interval: Duration::from_secs(3), lookup_interval: Duration::from_secs(5 * 60), target_peers: 50, - new_connections_per_lookup: 500, + new_connections_per_lookup: 5000, } } From 1820e821b9d9a6c18c072a0d1fb32d24d13d08eb Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 14 Oct 2025 18:29:56 -0300 Subject: [PATCH 4/7] Correct limit check --- crates/networking/p2p/rlpx/connection/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 62d7c85eb60..cb1429ef397 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -475,7 +475,7 @@ async fn initialize_connection( where S: Unpin + Send + Stream> + 'static, { - if state.peer_table.peer_count().await? > TARGET_PEERS { + if state.peer_table.peer_count().await? >= TARGET_PEERS { log_peer_warn(&state.node, "Reached target peer connections, discarding."); return Err(PeerConnectionError::TooManyPeers); } From d7f893eb836c73f6fc7bd01a53380fe691c08c55 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 15 Oct 2025 10:03:36 -0300 Subject: [PATCH 5/7] Added target.peers cli argument --- cmd/ethrex/cli.rs | 11 +++- cmd/ethrex/initializers.rs | 2 +- cmd/ethrex/l2/initializers.rs | 2 +- crates/networking/p2p/discv4/peer_table.rs | 59 ++++++++++++------- crates/networking/p2p/discv4/server.rs | 8 +-- crates/networking/p2p/peer_handler.rs | 13 ++-- .../networking/p2p/rlpx/connection/server.rs | 4 +- crates/networking/p2p/rlpx/initiator.rs | 4 +- 8 files changed, 61 insertions(+), 42 deletions(-) diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 18ad18e821f..40ac47854c2 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -8,7 +8,7 @@ use std::{ use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand}; use ethrex_blockchain::{BlockchainOptions, BlockchainType, error::ChainError}; use ethrex_common::types::{Block, Genesis, fee_config::FeeConfig}; -use ethrex_p2p::{sync::SyncMode, tx_broadcaster::BROADCAST_INTERVAL_MS, types::Node}; +use ethrex_p2p::{discv4::peer_table::TARGET_PEERS, sync::SyncMode, tx_broadcaster::BROADCAST_INTERVAL_MS, types::Node}; use ethrex_rlp::encode::RLPEncode; use ethrex_storage::error::StoreError; use tracing::{Level, info, warn}; @@ -182,6 +182,14 @@ pub struct Options { help_heading = "P2P options" )] pub tx_broadcasting_time_interval: u64, + #[arg( + long = "target.peers", + default_value_t = TARGET_PEERS, + value_name = "MAX_PEERS", + help = "Max amount of connected peers.", + help_heading = "P2P options" + )] + pub target_peers: usize, #[arg( long = "block-producer.extra-data", default_value = get_minimal_client_version(), @@ -256,6 +264,7 @@ impl Default for Options { force: false, mempool_max_size: Default::default(), tx_broadcasting_time_interval: Default::default(), + target_peers: Default::default(), extra_data: get_minimal_client_version(), } } diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index ab6dee336d4..6fa3baf8901 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -428,7 +428,7 @@ pub async fn init_l1( &signer, ))); - let peer_handler = PeerHandler::new(PeerTable::spawn()); + let peer_handler = PeerHandler::new(PeerTable::spawn(opts.target_peers)); // TODO: Check every module starts properly. let tracker = TaskTracker::new(); diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index b55dbb359a2..cf76e1b9929 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -179,7 +179,7 @@ pub async fn init_l2( &signer, ))); - let peer_handler = PeerHandler::new(PeerTable::spawn()); + let peer_handler = PeerHandler::new(PeerTable::spawn(opts.node_opts.target_peers)); // TODO: Check every module starts properly. let tracker = TaskTracker::new(); diff --git a/crates/networking/p2p/discv4/peer_table.rs b/crates/networking/p2p/discv4/peer_table.rs index 92c6a9c3d15..1d1b097bdf4 100644 --- a/crates/networking/p2p/discv4/peer_table.rs +++ b/crates/networking/p2p/discv4/peer_table.rs @@ -31,6 +31,10 @@ const SCORE_WEIGHT: i64 = 1; const REQUESTS_WEIGHT: i64 = 1; /// Max amount of ongoing requests per peer. const MAX_CONCURRENT_REQUESTS_PER_PEER: i64 = 100; +/// The target number of RLPx connections to reach. +pub const TARGET_PEERS: usize = 100; +/// The target number of contacts to maintain in peer_table. +const TARGET_CONTACTS: usize = 100_000; #[derive(Debug, Clone)] pub struct Contact { @@ -121,9 +125,9 @@ pub struct PeerTable { } impl PeerTable { - pub fn spawn() -> PeerTable { + pub fn spawn(target_peers: usize) -> PeerTable { PeerTable { - handle: PeerTableServer::default().start(), + handle: PeerTableServer::new(target_peers).start(), } } @@ -301,17 +305,22 @@ impl PeerTable { } /// Check if target number of contacts and connected peers is reached - pub async fn target_reached( - &mut self, - target_contacts: usize, - target_peers: usize, - ) -> Result { + pub async fn target_reached(&mut self) -> Result { match self .handle - .call(CallMessage::TargetReached { - target_contacts, - target_peers, - }) + .call(CallMessage::TargetReached) + .await? + { + OutMessage::TargetReached(result) => Ok(result), + _ => unreachable!(), + } + } + + /// Check if target number of connected peers is reached + pub async fn target_peers_reached(&mut self) -> Result { + match self + .handle + .call(CallMessage::TargetPeersReached) .await? { OutMessage::TargetReached(result) => Ok(result), @@ -501,15 +510,25 @@ impl PeerTable { } } -#[derive(Debug, Default)] +#[derive(Debug)] struct PeerTableServer { contacts: IndexMap, peers: IndexMap, already_tried_peers: HashSet, discarded_contacts: HashSet, + target_peers: usize, } impl PeerTableServer { + pub(crate) fn new(target_peers: usize) -> Self { + Self { + contacts: Default::default(), + peers: Default::default(), + already_tried_peers: Default::default(), + discarded_contacts: Default::default(), + target_peers, + } + } // Internal functions // // Weighting function used to select best peer @@ -802,10 +821,8 @@ enum CallMessage { PeerCountByCapabilities { capabilities: Vec, }, - TargetReached { - target_contacts: usize, - target_peers: usize, - }, + TargetReached, + TargetPeersReached, GetContactsToInitiate(usize), GetContactsForLookup, GetContactsToRevalidate(Duration), @@ -882,11 +899,11 @@ impl GenServer for PeerTableServer { CallMessage::PeerCountByCapabilities { capabilities } => CallResponse::Reply( OutMessage::PeerCount(self.peer_count_by_capabilities(capabilities)), ), - CallMessage::TargetReached { - target_contacts, - target_peers, - } => CallResponse::Reply(Self::OutMsg::TargetReached( - self.contacts.len() < target_contacts && self.peers.len() < target_peers, + CallMessage::TargetReached => CallResponse::Reply(Self::OutMsg::TargetReached( + self.contacts.len() >= TARGET_CONTACTS && self.peers.len() >= self.target_peers, + )), + CallMessage::TargetPeersReached => CallResponse::Reply(Self::OutMsg::TargetReached( + self.peers.len() >= self.target_peers, )), CallMessage::GetContactsToInitiate(amount) => CallResponse::Reply( Self::OutMsg::Contacts(self.get_contacts_to_initiate(amount)), diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index b77f75c6848..51a542f79a9 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -42,10 +42,6 @@ const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12 const INITIAL_LOOKUP_INTERVAL: Duration = Duration::from_secs(5); const LOOKUP_INTERVAL: Duration = Duration::from_secs(5 * 60); // 5 minutes const PRUNE_INTERVAL: Duration = Duration::from_secs(5); -/// The target number of RLPx connections to reach. -pub const TARGET_PEERS: usize = 100; -/// The target number of contacts to maintain in peer_table. -const TARGET_CONTACTS: usize = 100_000; #[derive(Debug, thiserror::Error)] pub enum DiscoveryServerError { @@ -240,9 +236,9 @@ impl DiscoveryServer { } async fn get_lookup_interval(&mut self) -> Duration { - if self + if !self .peer_table - .target_reached(TARGET_CONTACTS, TARGET_PEERS) + .target_reached() .await .unwrap_or(false) { diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 0dda9cb1b48..8693192c0c5 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -1,13 +1,12 @@ use crate::{ - discv4::peer_table::{PeerData, PeerTable, PeerTableError}, + discv4::peer_table::{PeerData, PeerTable, PeerTableError, TARGET_PEERS}, metrics::{CurrentStepValue, METRICS}, rlpx::{ connection::server::PeerConnection, error::PeerConnectionError, eth::{ blocks::{ - BLOCK_HEADER_LIMIT, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, - HashOrNumber, + BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, HashOrNumber, BLOCK_HEADER_LIMIT }, receipts::GetReceipts, }, @@ -19,10 +18,9 @@ use crate::{ }, }, snap::encodable_to_proof, - sync::{AccountStorageRoots, BlockSyncState, block_is_stale, update_pivot}, + sync::{block_is_stale, update_pivot, AccountStorageRoots, BlockSyncState}, utils::{ - AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file, - get_account_state_snapshot_file, get_account_storages_snapshot_file, + dump_accounts_to_file, dump_storages_to_file, get_account_state_snapshot_file, get_account_storages_snapshot_file, AccountsWithStorage }, }; use bytes::Bytes; @@ -149,8 +147,7 @@ impl PeerHandler { /// Creates a dummy PeerHandler for tests where interacting with peers is not needed /// This should only be used in tests as it won't be able to interact with the node's connected peers pub fn dummy() -> PeerHandler { - let dummy_peer_table = PeerTable::spawn(); - PeerHandler::new(dummy_peer_table) + PeerHandler::new(PeerTable::spawn(TARGET_PEERS)) } async fn make_request( diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index cb1429ef397..4ae690b84ac 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -1,5 +1,5 @@ use crate::{ - discv4::{peer_table::PeerTable, server::TARGET_PEERS}, + discv4::peer_table::PeerTable, metrics::METRICS, network::P2PContext, rlpx::{ @@ -475,7 +475,7 @@ async fn initialize_connection( where S: Unpin + Send + Stream> + 'static, { - if state.peer_table.peer_count().await? >= TARGET_PEERS { + if state.peer_table.target_peers_reached().await? { log_peer_warn(&state.node, "Reached target peer connections, discarding."); return Err(PeerConnectionError::TooManyPeers); } diff --git a/crates/networking/p2p/rlpx/initiator.rs b/crates/networking/p2p/rlpx/initiator.rs index 07837d00ed4..a0b79ffbffc 100644 --- a/crates/networking/p2p/rlpx/initiator.rs +++ b/crates/networking/p2p/rlpx/initiator.rs @@ -1,5 +1,5 @@ use crate::{ - discv4::{peer_table::PeerTableError, server::TARGET_PEERS}, + discv4::peer_table::PeerTableError, metrics::METRICS, network::P2PContext, rlpx::connection::server::PeerConnection, @@ -56,7 +56,7 @@ impl RLPxInitiator { async fn look_for_peers(&mut self) -> Result<(), RLPxInitiatorError> { info!("Looking for peers"); - if self.context.table.peer_count().await? < TARGET_PEERS { + if !self.context.table.target_peers_reached().await? { let contacts = self .context .table From 75a0990e72818c25b89c89ee8462f0b0e9583e37 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 15 Oct 2025 10:28:13 -0300 Subject: [PATCH 6/7] cargo fmt --- cmd/ethrex/cli.rs | 5 ++- crates/networking/p2p/discv4/peer_table.rs | 45 +++++----------------- crates/networking/p2p/discv4/server.rs | 7 +--- crates/networking/p2p/peer_handler.rs | 8 ++-- crates/networking/p2p/rlpx/initiator.rs | 4 +- 5 files changed, 21 insertions(+), 48 deletions(-) diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 40ac47854c2..8942564c231 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -8,7 +8,10 @@ use std::{ use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand}; use ethrex_blockchain::{BlockchainOptions, BlockchainType, error::ChainError}; use ethrex_common::types::{Block, Genesis, fee_config::FeeConfig}; -use ethrex_p2p::{discv4::peer_table::TARGET_PEERS, sync::SyncMode, tx_broadcaster::BROADCAST_INTERVAL_MS, types::Node}; +use ethrex_p2p::{ + discv4::peer_table::TARGET_PEERS, sync::SyncMode, tx_broadcaster::BROADCAST_INTERVAL_MS, + types::Node, +}; use ethrex_rlp::encode::RLPEncode; use ethrex_storage::error::StoreError; use tracing::{Level, info, warn}; diff --git a/crates/networking/p2p/discv4/peer_table.rs b/crates/networking/p2p/discv4/peer_table.rs index 1d1b097bdf4..479bfa45f71 100644 --- a/crates/networking/p2p/discv4/peer_table.rs +++ b/crates/networking/p2p/discv4/peer_table.rs @@ -306,11 +306,7 @@ impl PeerTable { /// Check if target number of contacts and connected peers is reached pub async fn target_reached(&mut self) -> Result { - match self - .handle - .call(CallMessage::TargetReached) - .await? - { + match self.handle.call(CallMessage::TargetReached).await? { OutMessage::TargetReached(result) => Ok(result), _ => unreachable!(), } @@ -318,11 +314,7 @@ impl PeerTable { /// Check if target number of connected peers is reached pub async fn target_peers_reached(&mut self) -> Result { - match self - .handle - .call(CallMessage::TargetPeersReached) - .await? - { + match self.handle.call(CallMessage::TargetPeersReached).await? { OutMessage::TargetReached(result) => Ok(result), _ => unreachable!(), } @@ -818,39 +810,22 @@ enum CastMessage { #[derive(Clone, Debug)] enum CallMessage { PeerCount, - PeerCountByCapabilities { - capabilities: Vec, - }, + PeerCountByCapabilities { capabilities: Vec }, TargetReached, TargetPeersReached, GetContactsToInitiate(usize), GetContactsForLookup, GetContactsToRevalidate(Duration), - GetBestPeer { - capabilities: Vec, - }, - GetScore { - node_id: H256, - }, + GetBestPeer { capabilities: Vec }, + GetScore { node_id: H256 }, GetConnectedNodes, GetPeersWithCapabilities, - GetPeerConnections { - capabilities: Vec, - }, - InsertIfNew { - node: Node, - }, - ValidateContact { - node_id: H256, - sender_ip: IpAddr, - }, - GetClosestNodes { - node_id: H256, - }, + GetPeerConnections { capabilities: Vec }, + InsertIfNew { node: Node }, + ValidateContact { node_id: H256, sender_ip: IpAddr }, + GetClosestNodes { node_id: H256 }, GetPeersData, - GetRandomPeer { - capabilities: Vec, - }, + GetRandomPeer { capabilities: Vec }, } #[derive(Debug)] diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 51a542f79a9..37b90943dc9 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -236,12 +236,7 @@ impl DiscoveryServer { } async fn get_lookup_interval(&mut self) -> Duration { - if !self - .peer_table - .target_reached() - .await - .unwrap_or(false) - { + if !self.peer_table.target_reached().await.unwrap_or(false) { INITIAL_LOOKUP_INTERVAL } else { trace!("Reached target number of peers or contacts. Using longer lookup interval."); diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 8693192c0c5..3e1df0a7567 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -6,7 +6,8 @@ use crate::{ error::PeerConnectionError, eth::{ blocks::{ - BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, HashOrNumber, BLOCK_HEADER_LIMIT + BLOCK_HEADER_LIMIT, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, + HashOrNumber, }, receipts::GetReceipts, }, @@ -18,9 +19,10 @@ use crate::{ }, }, snap::encodable_to_proof, - sync::{block_is_stale, update_pivot, AccountStorageRoots, BlockSyncState}, + sync::{AccountStorageRoots, BlockSyncState, block_is_stale, update_pivot}, utils::{ - dump_accounts_to_file, dump_storages_to_file, get_account_state_snapshot_file, get_account_storages_snapshot_file, AccountsWithStorage + AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file, + get_account_state_snapshot_file, get_account_storages_snapshot_file, }, }; use bytes::Bytes; diff --git a/crates/networking/p2p/rlpx/initiator.rs b/crates/networking/p2p/rlpx/initiator.rs index a0b79ffbffc..6febc5f725b 100644 --- a/crates/networking/p2p/rlpx/initiator.rs +++ b/crates/networking/p2p/rlpx/initiator.rs @@ -1,7 +1,5 @@ use crate::{ - discv4::peer_table::PeerTableError, - metrics::METRICS, - network::P2PContext, + discv4::peer_table::PeerTableError, metrics::METRICS, network::P2PContext, rlpx::connection::server::PeerConnection, }; use spawned_concurrency::{ From 2d915aa72ed22949c1ab7ebb00798b5cac25d09b Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 15 Oct 2025 10:53:35 -0300 Subject: [PATCH 7/7] Updated cli doc --- docs/CLI.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/CLI.md b/docs/CLI.md index f3735d60790..2e05cc9c63e 100644 --- a/docs/CLI.md +++ b/docs/CLI.md @@ -84,10 +84,15 @@ P2P options: [default: 30303] - --p2p.tx-broadcasting-interval - Transaction Broadcasting Time Interval (ms) for batching transactions before broadcasting them. + --p2p.tx-broadcasting-interval + Transaction Broadcasting Time Interval (ms) for batching transactions before broadcasting them. - [default: 1000] + [default: 1000] + + --target.peers + Max amount of connected peers. + + [default: 100] RPC options: --http.addr