Skip to content
9 changes: 8 additions & 1 deletion crates/networking/p2p/discv4/peer_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,14 +517,21 @@ impl PeerTableServer {
score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT
}

// Returns if the peer has room for more connections given the current score
// and amount of inflight requests
fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool {
let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64;
(*requests as f64) < MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio
}

fn get_best_peer(&self, capabilities: &[Capability]) -> Option<(H256, PeerConnection)> {
self.peers
.iter()
// We filter only to those peers which are useful to us
.filter_map(|(id, peer_data)| {
// Skip the peer if it has too many ongoing requests or if it doesn't match
// the capabilities
if peer_data.requests > MAX_CONCURRENT_REQUESTS_PER_PEER
if !self.can_try_more_requests(&peer_data.score, &peer_data.requests)
|| !capabilities
.iter()
.any(|cap| peer_data.supported_capabilities.contains(cap))
Expand Down
2 changes: 1 addition & 1 deletion crates/networking/p2p/discv4/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const INITIAL_LOOKUP_INTERVAL: Duration = Duration::from_secs(5);
const LOOKUP_INTERVAL: Duration = Duration::from_secs(5 * 60); // 5 minutes
const PRUNE_INTERVAL: Duration = Duration::from_secs(5);
/// The target number of RLPx connections to reach.
const TARGET_PEERS: usize = 100;
pub const TARGET_PEERS: usize = 100;
/// The target number of contacts to maintain in peer_table.
const TARGET_CONTACTS: usize = 100_000;

Expand Down
14 changes: 10 additions & 4 deletions crates/networking/p2p/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,18 @@ impl Metrics {
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::NoMatchingCapabilities() => {
PeerConnectionError::NoMatchingCapabilities => {
failures_grouped_by_reason
.entry("NoMatchingCapabilities".to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::TooManyPeers => {
failures_grouped_by_reason
.entry("TooManyPeers".to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::Disconnected => {
failures_grouped_by_reason
.entry("Disconnected".to_owned())
Expand All @@ -371,19 +377,19 @@ impl Metrics {
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::InvalidPeerId() => {
PeerConnectionError::InvalidPeerId => {
failures_grouped_by_reason
.entry("InvalidPeerId".to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::InvalidRecoveryId() => {
PeerConnectionError::InvalidRecoveryId => {
failures_grouped_by_reason
.entry("InvalidRecoveryId".to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
PeerConnectionError::InvalidMessageLength() => {
PeerConnectionError::InvalidMessageLength => {
failures_grouped_by_reason
.entry("InvalidMessageLength".to_owned())
.and_modify(|e| *e += 1)
Expand Down
2 changes: 1 addition & 1 deletion crates/networking/p2p/rlpx/connection/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Decoder for RLPxCodec {
// Check that the size is not too large to avoid a denial of
// service attack where the server runs out of memory.
if padded_size > MAX_MESSAGE_SIZE {
return Err(PeerConnectionError::InvalidMessageLength());
return Err(PeerConnectionError::InvalidMessageLength);
}

let total_message_size = (32 + padded_size + 16) as usize;
Expand Down
22 changes: 11 additions & 11 deletions crates/networking/p2p/rlpx/connection/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async fn send_auth<S: AsyncWrite + std::marker::Unpin>(
mut stream: S,
) -> Result<LocalState, PeerConnectionError> {
let peer_pk =
compress_pubkey(remote_public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?;
compress_pubkey(remote_public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?;

let local_nonce = H256::random_using(&mut rand::thread_rng());
let local_ephemeral_key = SecretKey::new(&mut rand::thread_rng());
Expand All @@ -183,7 +183,7 @@ async fn send_ack<S: AsyncWrite + std::marker::Unpin>(
mut stream: S,
) -> Result<LocalState, PeerConnectionError> {
let peer_pk =
compress_pubkey(remote_public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?;
compress_pubkey(remote_public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?;

let local_nonce = H256::random_using(&mut rand::thread_rng());
let local_ephemeral_key = SecretKey::new(&mut rand::thread_rng());
Expand All @@ -205,10 +205,10 @@ async fn receive_auth<S: AsyncRead + std::marker::Unpin>(
let msg_bytes = receive_handshake_msg(stream).await?;
let size_data = &msg_bytes
.get(..2)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let msg = &msg_bytes
.get(2..)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let (auth, remote_ephemeral_key) = decode_auth_message(signer, msg, size_data)?;

Ok(RemoteState {
Expand All @@ -227,10 +227,10 @@ async fn receive_ack<S: AsyncRead + std::marker::Unpin>(
let msg_bytes = receive_handshake_msg(stream).await?;
let size_data = &msg_bytes
.get(..2)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let msg = &msg_bytes
.get(2..)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let ack = decode_ack_message(signer, msg, size_data)?;
let remote_ephemeral_key = ack
.get_ephemeral_pubkey()
Expand All @@ -254,7 +254,7 @@ async fn receive_handshake_msg<S: AsyncRead + std::marker::Unpin>(
let ack_data = [buf[0], buf[1]];
let msg_size = u16::from_be_bytes(ack_data) as usize;
if msg_size > P2P_MAX_MESSAGE_SIZE {
return Err(PeerConnectionError::InvalidMessageLength());
return Err(PeerConnectionError::InvalidMessageLength);
}
buf.resize(msg_size + 2, 0);

Expand Down Expand Up @@ -315,7 +315,7 @@ fn decode_auth_message(

// Derive a shared secret from the static keys.
let peer_pk =
compress_pubkey(auth.public_key).ok_or_else(PeerConnectionError::InvalidPeerId)?;
compress_pubkey(auth.public_key).ok_or_else(|| PeerConnectionError::InvalidPeerId)?;
let static_shared_secret = ecdh_xchng(static_key, &peer_pk).map_err(|error| {
PeerConnectionError::CryptographyError(format!(
"Invalid generated static shared secret: {error}"
Expand Down Expand Up @@ -367,13 +367,13 @@ fn decrypt_message(
// public-key (65) || iv (16) || ciphertext || mac (32)
let (pk, rest) = msg
.split_at_checked(65)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let (iv, rest) = rest
.split_at_checked(16)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;
let (c, d) = rest
.split_at_checked(rest.len() - 32)
.ok_or_else(PeerConnectionError::InvalidMessageLength)?;
.ok_or_else(|| PeerConnectionError::InvalidMessageLength)?;

// Derive the message shared secret.
let shared_secret = ecdh_xchng(static_key, &PublicKey::from_slice(pk)?).map_err(|error| {
Expand Down
19 changes: 12 additions & 7 deletions crates/networking/p2p/rlpx/connection/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
discv4::peer_table::PeerTable,
discv4::{peer_table::PeerTable, server::TARGET_PEERS},
metrics::METRICS,
network::P2PContext,
rlpx::{
Expand Down Expand Up @@ -264,7 +264,7 @@ impl GenServer for PeerConnectionServer {
initialize_connection(handle, &mut established_state, stream, eth_version).await
{
match &reason {
PeerConnectionError::NoMatchingCapabilities()
PeerConnectionError::NoMatchingCapabilities
| PeerConnectionError::HandshakeError(_) => {
established_state
.peer_table
Expand Down Expand Up @@ -390,11 +390,11 @@ impl GenServer for PeerConnectionServer {
| PeerConnectionError::DisconnectReceived(_)
| PeerConnectionError::DisconnectSent(_)
| PeerConnectionError::HandshakeError(_)
| PeerConnectionError::NoMatchingCapabilities()
| PeerConnectionError::InvalidPeerId()
| PeerConnectionError::InvalidMessageLength()
| PeerConnectionError::NoMatchingCapabilities
| PeerConnectionError::InvalidPeerId
| PeerConnectionError::InvalidMessageLength
| PeerConnectionError::StateError(_)
| PeerConnectionError::InvalidRecoveryId() => {
| PeerConnectionError::InvalidRecoveryId => {
log_peer_debug(&established_state.node, &e.to_string());
return CastResponse::Stop;
}
Expand Down Expand Up @@ -475,6 +475,10 @@ async fn initialize_connection<S>(
where
S: Unpin + Send + Stream<Item = Result<Message, PeerConnectionError>> + 'static,
{
if state.peer_table.peer_count().await? > TARGET_PEERS {
log_peer_warn(&state.node, "Reached target peer connections, discarding.");
return Err(PeerConnectionError::TooManyPeers);
}
exchange_hello_messages(state, &mut stream).await?;

// Update eth capability version to the negotiated version for further message decoding
Expand Down Expand Up @@ -713,6 +717,7 @@ fn match_disconnect_reason(error: &PeerConnectionError) -> Option<DisconnectReas
PeerConnectionError::DisconnectSent(reason) => Some(*reason),
PeerConnectionError::DisconnectReceived(reason) => Some(*reason),
PeerConnectionError::RLPDecodeError(_) => Some(DisconnectReason::NetworkError),
PeerConnectionError::TooManyPeers => Some(DisconnectReason::TooManyPeers),
// TODO build a proper matching between error types and disconnection reasons
_ => None,
}
Expand Down Expand Up @@ -787,7 +792,7 @@ where
state.capabilities = hello_message.capabilities;

if negotiated_eth_version == 0 {
return Err(PeerConnectionError::NoMatchingCapabilities());
return Err(PeerConnectionError::NoMatchingCapabilities);
}
debug!("Negotatied eth version: eth/{}", negotiated_eth_version);
state.negotiated_eth_capability = Some(Capability::eth(negotiated_eth_version));
Expand Down
10 changes: 6 additions & 4 deletions crates/networking/p2p/rlpx/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ pub enum PeerConnectionError {
#[error("Invalid connection state: {0}")]
StateError(String),
#[error("No matching capabilities")]
NoMatchingCapabilities(),
NoMatchingCapabilities,
#[error("Too many peers")]
TooManyPeers,
#[error("Peer disconnected")]
Disconnected,
#[error("Disconnect requested: {0}")]
Expand All @@ -34,11 +36,11 @@ pub enum PeerConnectionError {
#[error("Not Found: {0}")]
NotFound(String),
#[error("Invalid peer id")]
InvalidPeerId(),
InvalidPeerId,
#[error("Invalid recovery id")]
InvalidRecoveryId(),
InvalidRecoveryId,
#[error("Invalid message length")]
InvalidMessageLength(),
InvalidMessageLength,
#[error("Request id not present: {0}")]
ExpectedRequestId(String),
#[error("Cannot handle message: {0}")]
Expand Down
25 changes: 15 additions & 10 deletions crates/networking/p2p/rlpx/initiator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{
discv4::peer_table::PeerTableError, metrics::METRICS, network::P2PContext,
discv4::{peer_table::PeerTableError, server::TARGET_PEERS},
metrics::METRICS,
network::P2PContext,
rlpx::connection::server::PeerConnection,
};
use spawned_concurrency::{
Expand Down Expand Up @@ -54,15 +56,18 @@ impl RLPxInitiator {
async fn look_for_peers(&mut self) -> Result<(), RLPxInitiatorError> {
info!("Looking for peers");

let contacts = self
.context
.table
.get_contacts_to_initiate(self.new_connections_per_lookup)
.await?;

for contact in contacts {
PeerConnection::spawn_as_initiator(self.context.clone(), &contact.node).await;
METRICS.record_new_rlpx_conn_attempt().await;
if self.context.table.peer_count().await? < TARGET_PEERS {
let contacts = self
.context
.table
.get_contacts_to_initiate(self.new_connections_per_lookup)
.await?;
for contact in contacts {
PeerConnection::spawn_as_initiator(self.context.clone(), &contact.node).await;
METRICS.record_new_rlpx_conn_attempt().await;
}
} else {
info!("Target peer connections reached, no need to initiate new connections.");
}
Ok(())
}
Expand Down
Loading