Skip to content
14 changes: 13 additions & 1 deletion cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use std::{
use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand};
use ethrex_blockchain::{BlockchainOptions, BlockchainType, error::ChainError};
use ethrex_common::types::{Block, Genesis, fee_config::FeeConfig};
use ethrex_p2p::{sync::SyncMode, tx_broadcaster::BROADCAST_INTERVAL_MS, types::Node};
use ethrex_p2p::{
discv4::peer_table::TARGET_PEERS, sync::SyncMode, tx_broadcaster::BROADCAST_INTERVAL_MS,
types::Node,
};
use ethrex_rlp::encode::RLPEncode;
use ethrex_storage::error::StoreError;
use tracing::{Level, info, warn};
Expand Down Expand Up @@ -182,6 +185,14 @@ pub struct Options {
help_heading = "P2P options"
)]
pub tx_broadcasting_time_interval: u64,
#[arg(
long = "target.peers",
default_value_t = TARGET_PEERS,
value_name = "MAX_PEERS",
help = "Max amount of connected peers.",
help_heading = "P2P options"
)]
pub target_peers: usize,
#[arg(
long = "block-producer.extra-data",
default_value = get_minimal_client_version(),
Expand Down Expand Up @@ -256,6 +267,7 @@ impl Default for Options {
force: false,
mempool_max_size: Default::default(),
tx_broadcasting_time_interval: Default::default(),
target_peers: Default::default(),
extra_data: get_minimal_client_version(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ pub async fn init_l1(
&signer,
)));

let peer_handler = PeerHandler::new(PeerTable::spawn());
let peer_handler = PeerHandler::new(PeerTable::spawn(opts.target_peers));

// TODO: Check every module starts properly.
let tracker = TaskTracker::new();
Expand Down
2 changes: 1 addition & 1 deletion cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ pub async fn init_l2(
&signer,
)));

let peer_handler = PeerHandler::new(PeerTable::spawn());
let peer_handler = PeerHandler::new(PeerTable::spawn(opts.node_opts.target_peers));

// TODO: Check every module starts properly.
let tracker = TaskTracker::new();
Expand Down
101 changes: 50 additions & 51 deletions crates/networking/p2p/discv4/peer_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const SCORE_WEIGHT: i64 = 1;
const REQUESTS_WEIGHT: i64 = 1;
/// Max amount of ongoing requests per peer.
const MAX_CONCURRENT_REQUESTS_PER_PEER: i64 = 100;
/// The target number of RLPx connections to reach.
pub const TARGET_PEERS: usize = 100;
/// The target number of contacts to maintain in peer_table.
const TARGET_CONTACTS: usize = 100_000;

#[derive(Debug, Clone)]
pub struct Contact {
Expand Down Expand Up @@ -121,9 +125,9 @@ pub struct PeerTable {
}

impl PeerTable {
pub fn spawn() -> PeerTable {
pub fn spawn(target_peers: usize) -> PeerTable {
PeerTable {
handle: PeerTableServer::default().start(),
handle: PeerTableServer::new(target_peers).start(),
}
}

Expand Down Expand Up @@ -301,19 +305,16 @@ impl PeerTable {
}

/// Check if target number of contacts and connected peers is reached
pub async fn target_reached(
&mut self,
target_contacts: usize,
target_peers: usize,
) -> Result<bool, PeerTableError> {
match self
.handle
.call(CallMessage::TargetReached {
target_contacts,
target_peers,
})
.await?
{
pub async fn target_reached(&mut self) -> Result<bool, PeerTableError> {
match self.handle.call(CallMessage::TargetReached).await? {
OutMessage::TargetReached(result) => Ok(result),
_ => unreachable!(),
}
}

/// Check if target number of connected peers is reached
pub async fn target_peers_reached(&mut self) -> Result<bool, PeerTableError> {
match self.handle.call(CallMessage::TargetPeersReached).await? {
OutMessage::TargetReached(result) => Ok(result),
_ => unreachable!(),
}
Expand Down Expand Up @@ -501,15 +502,25 @@ impl PeerTable {
}
}

#[derive(Debug, Default)]
#[derive(Debug)]
struct PeerTableServer {
contacts: IndexMap<H256, Contact>,
peers: IndexMap<H256, PeerData>,
already_tried_peers: HashSet<H256>,
discarded_contacts: HashSet<H256>,
target_peers: usize,
}

impl PeerTableServer {
pub(crate) fn new(target_peers: usize) -> Self {
Self {
contacts: Default::default(),
peers: Default::default(),
already_tried_peers: Default::default(),
discarded_contacts: Default::default(),
target_peers,
}
}
// Internal functions //

// Weighting function used to select best peer
Expand All @@ -518,14 +529,21 @@ impl PeerTableServer {
score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT
}

// Returns if the peer has room for more connections given the current score
// and amount of inflight requests
fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool {
let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64;
(*requests as f64) < MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio
}

fn get_best_peer(&self, capabilities: &[Capability]) -> Option<(H256, PeerConnection)> {
self.peers
.iter()
// We filter only to those peers which are useful to us
.filter_map(|(id, peer_data)| {
// Skip the peer if it has too many ongoing requests or if it doesn't match
// the capabilities
if peer_data.requests > MAX_CONCURRENT_REQUESTS_PER_PEER
if !self.can_try_more_requests(&peer_data.score, &peer_data.requests)
|| !capabilities
.iter()
.any(|cap| peer_data.supported_capabilities.contains(cap))
Expand Down Expand Up @@ -792,41 +810,22 @@ enum CastMessage {
#[derive(Clone, Debug)]
enum CallMessage {
PeerCount,
PeerCountByCapabilities {
capabilities: Vec<Capability>,
},
TargetReached {
target_contacts: usize,
target_peers: usize,
},
PeerCountByCapabilities { capabilities: Vec<Capability> },
TargetReached,
TargetPeersReached,
GetContactsToInitiate(usize),
GetContactsForLookup,
GetContactsToRevalidate(Duration),
GetBestPeer {
capabilities: Vec<Capability>,
},
GetScore {
node_id: H256,
},
GetBestPeer { capabilities: Vec<Capability> },
GetScore { node_id: H256 },
GetConnectedNodes,
GetPeersWithCapabilities,
GetPeerConnections {
capabilities: Vec<Capability>,
},
InsertIfNew {
node: Node,
},
ValidateContact {
node_id: H256,
sender_ip: IpAddr,
},
GetClosestNodes {
node_id: H256,
},
GetPeerConnections { capabilities: Vec<Capability> },
InsertIfNew { node: Node },
ValidateContact { node_id: H256, sender_ip: IpAddr },
GetClosestNodes { node_id: H256 },
GetPeersData,
GetRandomPeer {
capabilities: Vec<Capability>,
},
GetRandomPeer { capabilities: Vec<Capability> },
}

#[derive(Debug)]
Expand Down Expand Up @@ -875,11 +874,11 @@ impl GenServer for PeerTableServer {
CallMessage::PeerCountByCapabilities { capabilities } => CallResponse::Reply(
OutMessage::PeerCount(self.peer_count_by_capabilities(capabilities)),
),
CallMessage::TargetReached {
target_contacts,
target_peers,
} => CallResponse::Reply(Self::OutMsg::TargetReached(
self.contacts.len() < target_contacts && self.peers.len() < target_peers,
CallMessage::TargetReached => CallResponse::Reply(Self::OutMsg::TargetReached(
self.contacts.len() >= TARGET_CONTACTS && self.peers.len() >= self.target_peers,
)),
CallMessage::TargetPeersReached => CallResponse::Reply(Self::OutMsg::TargetReached(
self.peers.len() >= self.target_peers,
)),
CallMessage::GetContactsToInitiate(amount) => CallResponse::Reply(
Self::OutMsg::Contacts(self.get_contacts_to_initiate(amount)),
Expand Down
11 changes: 1 addition & 10 deletions crates/networking/p2p/discv4/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12
const INITIAL_LOOKUP_INTERVAL: Duration = Duration::from_secs(5);
const LOOKUP_INTERVAL: Duration = Duration::from_secs(5 * 60); // 5 minutes
const PRUNE_INTERVAL: Duration = Duration::from_secs(5);
/// The target number of RLPx connections to reach.
const TARGET_PEERS: usize = 100;
/// The target number of contacts to maintain in peer_table.
const TARGET_CONTACTS: usize = 100_000;

#[derive(Debug, thiserror::Error)]
pub enum DiscoveryServerError {
Expand Down Expand Up @@ -240,12 +236,7 @@ impl DiscoveryServer {
}

async fn get_lookup_interval(&mut self) -> Duration {
if self
.peer_table
.target_reached(TARGET_CONTACTS, TARGET_PEERS)
.await
.unwrap_or(false)
{
if !self.peer_table.target_reached().await.unwrap_or(false) {
INITIAL_LOOKUP_INTERVAL
} else {
trace!("Reached target number of peers or contacts. Using longer lookup interval.");
Expand Down
14 changes: 10 additions & 4 deletions crates/networking/p2p/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,18 @@ impl Metrics {
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::NoMatchingCapabilities() => {
PeerConnectionError::NoMatchingCapabilities => {
failures_grouped_by_reason
.entry("NoMatchingCapabilities".to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::TooManyPeers => {
failures_grouped_by_reason
.entry("TooManyPeers".to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::Disconnected => {
failures_grouped_by_reason
.entry("Disconnected".to_owned())
Expand All @@ -371,19 +377,19 @@ impl Metrics {
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::InvalidPeerId() => {
PeerConnectionError::InvalidPeerId => {
failures_grouped_by_reason
.entry("InvalidPeerId".to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::InvalidRecoveryId() => {
PeerConnectionError::InvalidRecoveryId => {
failures_grouped_by_reason
.entry("InvalidRecoveryId".to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::InvalidMessageLength() => {
PeerConnectionError::InvalidMessageLength => {
failures_grouped_by_reason
.entry("InvalidMessageLength".to_owned())
.and_modify(|e| *e += 1)
Expand Down
5 changes: 2 additions & 3 deletions crates/networking/p2p/peer_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
discv4::peer_table::{PeerData, PeerTable, PeerTableError},
discv4::peer_table::{PeerData, PeerTable, PeerTableError, TARGET_PEERS},
metrics::{CurrentStepValue, METRICS},
rlpx::{
connection::server::PeerConnection,
Expand Down Expand Up @@ -149,8 +149,7 @@ impl PeerHandler {
/// Creates a dummy PeerHandler for tests where interacting with peers is not needed
/// This should only be used in tests as it won't be able to interact with the node's connected peers
pub fn dummy() -> PeerHandler {
let dummy_peer_table = PeerTable::spawn();
PeerHandler::new(dummy_peer_table)
PeerHandler::new(PeerTable::spawn(TARGET_PEERS))
}

async fn make_request(
Expand Down
2 changes: 1 addition & 1 deletion crates/networking/p2p/rlpx/connection/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Decoder for RLPxCodec {
// Check that the size is not too large to avoid a denial of
// service attack where the server runs out of memory.
if padded_size > MAX_MESSAGE_SIZE {
return Err(PeerConnectionError::InvalidMessageLength());
return Err(PeerConnectionError::InvalidMessageLength);
}

let total_message_size = (32 + padded_size + 16) as usize;
Expand Down
22 changes: 11 additions & 11 deletions crates/networking/p2p/rlpx/connection/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async fn send_auth<S: AsyncWrite + std::marker::Unpin>(
mut stream: S,
) -> Result<LocalState, PeerConnectionError> {
let peer_pk =
compress_pubkey(remote_public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?;
compress_pubkey(remote_public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?;

let local_nonce = H256::random_using(&mut rand::thread_rng());
let local_ephemeral_key = SecretKey::new(&mut rand::thread_rng());
Expand All @@ -183,7 +183,7 @@ async fn send_ack<S: AsyncWrite + std::marker::Unpin>(
mut stream: S,
) -> Result<LocalState, PeerConnectionError> {
let peer_pk =
compress_pubkey(remote_public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?;
compress_pubkey(remote_public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?;

let local_nonce = H256::random_using(&mut rand::thread_rng());
let local_ephemeral_key = SecretKey::new(&mut rand::thread_rng());
Expand All @@ -205,10 +205,10 @@ async fn receive_auth<S: AsyncRead + std::marker::Unpin>(
let msg_bytes = receive_handshake_msg(stream).await?;
let size_data = &msg_bytes
.get(..2)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let msg = &msg_bytes
.get(2..)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let (auth, remote_ephemeral_key) = decode_auth_message(signer, msg, size_data)?;

Ok(RemoteState {
Expand All @@ -227,10 +227,10 @@ async fn receive_ack<S: AsyncRead + std::marker::Unpin>(
let msg_bytes = receive_handshake_msg(stream).await?;
let size_data = &msg_bytes
.get(..2)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let msg = &msg_bytes
.get(2..)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let ack = decode_ack_message(signer, msg, size_data)?;
let remote_ephemeral_key = ack
.get_ephemeral_pubkey()
Expand All @@ -254,7 +254,7 @@ async fn receive_handshake_msg<S: AsyncRead + std::marker::Unpin>(
let ack_data = [buf[0], buf[1]];
let msg_size = u16::from_be_bytes(ack_data) as usize;
if msg_size > P2P_MAX_MESSAGE_SIZE {
return Err(PeerConnectionError::InvalidMessageLength());
return Err(PeerConnectionError::InvalidMessageLength);
}
buf.resize(msg_size + 2, 0);

Expand Down Expand Up @@ -315,7 +315,7 @@ fn decode_auth_message(

// Derive a shared secret from the static keys.
let peer_pk =
compress_pubkey(auth.public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?;
compress_pubkey(auth.public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?;
let static_shared_secret = ecdh_xchng(static_key, &peer_pk).map_err(|error| {
PeerConnectionError::CryptographyError(format!(
"Invalid generated static shared secret: {error}"
Expand Down Expand Up @@ -367,13 +367,13 @@ fn decrypt_message(
// public-key (65) || iv (16) || ciphertext || mac (32)
let (pk, rest) = msg
.split_at_checked(65)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let (iv, rest) = rest
.split_at_checked(16)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let (c, d) = rest
.split_at_checked(rest.len() - 32)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;

// Derive the message shared secret.
let shared_secret = ecdh_xchng(static_key, &PublicKey::from_slice(pk)?).map_err(|error| {
Expand Down
Loading
Loading