From 8f293c055e4c6dceb2b3a87398f7ac480843e805 Mon Sep 17 00:00:00 2001 From: Kevin Lu Date: Tue, 18 May 2021 16:35:42 +0200 Subject: [PATCH 1/5] implemented min outbound peers req --- .../eth2_libp2p/src/peer_manager/mod.rs | 35 +++++++++++++++---- .../eth2_libp2p/src/peer_manager/peer_info.rs | 5 +++ .../eth2_libp2p/src/peer_manager/peerdb.rs | 27 ++++++++++++++ beacon_node/eth2_libp2p/src/types/globals.rs | 5 +++ 4 files changed, 65 insertions(+), 7 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index eeef06d0791..a05e3ee35e3 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -60,6 +60,9 @@ const PEER_EXCESS_FACTOR: f32 = 0.1; /// them in lighthouse. const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; +/// A fraction of `PeerManager::target_peers` that need to be outbound-only connections. +const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.1; + /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. @@ -923,7 +926,11 @@ impl PeerManager { /// NOTE: Discovery will only add a new query if one isn't already queued. fn heartbeat(&mut self) { let peer_count = self.network_globals.connected_or_dialing_peers(); - if peer_count < self.target_peers { + let mut outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let min_outbound_only_target = + (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize; + + if peer_count < self.target_peers || outbound_only_peer_count < min_outbound_only_target { // If we need more peers, queue a discovery lookup. if self.discovery.started { debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers); @@ -938,21 +945,35 @@ impl PeerManager { let mut disconnecting_peers = Vec::new(); let connected_peer_count = self.network_globals.connected_peers(); + //refresh outbound-only peer count + outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); if connected_peer_count > self.target_peers { //remove excess peers with the worst scores, but keep subnet peers - for (peer_id, _) in self + //must also ensure that the outbound-only peer count does not go below the minimum threshold + let mut peers_removed = 0; + let mut n_outbound_removed = 0; + for (peer_id, info) in self .network_globals .peers .read() .worst_connected_peers() .iter() .filter(|(_, info)| !info.has_future_duty()) - .take(connected_peer_count - self.target_peers) - //we only need to disconnect peers with healthy scores, since the others got already - //disconnected in update_peer_scores - .filter(|(_, info)| info.score_state() == ScoreState::Healthy) { - disconnecting_peers.push(**peer_id); + if peers_removed == connected_peer_count - self.target_peers { + break; + } + if info.is_outbound_only() { + if n_outbound_removed < outbound_only_peer_count - min_outbound_only_target { + n_outbound_removed += 1; + } else { + continue; + } + } + peers_removed += 1; + if info.score_state() == ScoreState::Healthy { + disconnecting_peers.push(**peer_id); + } } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 49546776a8f..9dd976792c5 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -182,6 +182,11 @@ impl PeerInfo { matches!(self.connection_status, Disconnected { .. }) } + /// Checks if the peer is outbound-only + pub fn is_outbound_only(&self) -> bool { + matches!(self.connection_status, Connected {n_in, n_out} if n_in == 0 && n_out > 0) + } + /// Returns the number of connections with this peer. pub fn connections(&self) -> (u8, u8) { match self.connection_status { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 6803f49ed11..1bba19c23a6 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -221,6 +221,14 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + ///Connected outbound-only peers + pub fn connected_outbound_only_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_outbound_only()) + .map(|(peer_id, _)| peer_id) + } + /// Gives the `peer_id` of all known connected and synced peers. pub fn synced_peers(&self) -> impl Iterator { self.peers @@ -677,6 +685,25 @@ mod tests { assert_eq!(peer_info.unwrap().connections(), (n_in, n_out)); } + #[test] + fn test_outbound_only_peers_counted_correctly() { + let mut pdb = get_db(); + let p0 = PeerId::random(); + let p1 = PeerId::random(); + let p2 = PeerId::random(); + //create peer with no connections + let _p3 = PeerId::random(); + + pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&p2, "/ip4/0.0.0.0".parse().unwrap(), None); + + // we should only have one outbound-only peer (p2). + // peers that are inbound-only, have both types of connections, or no connections should not be counted + assert_eq!(pdb.connected_outbound_only_peers().count(), 1); + } + #[test] fn test_disconnected_are_bounded() { let mut pdb = get_db(); diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index 67abcf77242..69f9b0872b3 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -84,6 +84,11 @@ impl NetworkGlobals { self.peers.read().connected_peer_ids().count() } + /// Returns the number of libp2p connected peers with outbound-only connections + pub fn connected_outbound_only_peers(&self) -> usize { + self.peers.read().connected_outbound_only_peers().count() + } + /// Returns the number of libp2p peers that are either connected or being dialed. pub fn connected_or_dialing_peers(&self) -> usize { self.peers.read().connected_or_dialing_peers().count() From 852c26aed9a0d195961e19ae253a1f5cb1f94b24 Mon Sep 17 00:00:00 2001 From: Kevin Lu Date: Wed, 19 May 2021 16:13:26 +0200 Subject: [PATCH 2/5] added unit test for heartbeat --- .../eth2_libp2p/src/peer_manager/mod.rs | 117 +++++++++++++++++- 1 file changed, 114 insertions(+), 3 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index a05e3ee35e3..da597f9c00c 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -926,7 +926,7 @@ impl PeerManager { /// NOTE: Discovery will only add a new query if one isn't already queued. fn heartbeat(&mut self) { let peer_count = self.network_globals.connected_or_dialing_peers(); - let mut outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); let min_outbound_only_target = (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize; @@ -945,8 +945,6 @@ impl PeerManager { let mut disconnecting_peers = Vec::new(); let connected_peer_count = self.network_globals.connected_peers(); - //refresh outbound-only peer count - outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); if connected_peer_count > self.target_peers { //remove excess peers with the worst scores, but keep subnet peers //must also ensure that the outbound-only peer count does not go below the minimum threshold @@ -1074,3 +1072,116 @@ enum ConnectingType { multiaddr: Multiaddr, }, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::discovery::enr::build_enr; + use crate::discovery::enr_ext::CombinedKeyExt; + use crate::rpc::methods::MetaData; + use crate::Enr; + use discv5::enr::CombinedKey; + use slog::{o, Drain}; + use std::net::UdpSocket; + use types::{EnrForkId, MinimalEthSpec}; + + type E = MinimalEthSpec; + + pub fn unused_port() -> u16 { + let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket"); + let local_addr = socket.local_addr().expect("should read udp socket"); + local_addr.port() + } + + pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if enabled { + slog::Logger::root(drain.filter_level(level).fuse(), o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), o!()) + } + } + + async fn build_peer_manager() -> PeerManager { + let keypair = libp2p::identity::Keypair::generate_secp256k1(); + let config = NetworkConfig { + discovery_port: unused_port(), + target_peers: 5, + ..Default::default() + }; + let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); + let enr: Enr = build_enr::(&enr_key, &config, EnrForkId::default()).unwrap(); + let log = build_log(slog::Level::Debug, false); + let globals = NetworkGlobals::new( + enr, + 9000, + 9000, + MetaData { + seq_number: 0, + attnets: Default::default(), + }, + vec![], + &log, + ); + PeerManager::new(&keypair, &config, Arc::new(globals), &log) + .await + .unwrap() + } + + #[tokio::test] + async fn test_peer_manager_disconnects_peers_when_target_reached() { + let mut peer_manager = build_peer_manager().await; + + //create 6 peers to connect too + //1 will be outbound-only, and have the lowest score. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let peer3 = PeerId::random(); + let peer4 = PeerId::random(); + let outbound_only_peer = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer3, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer3, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer4, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer4, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap()); + + //set the outbound-only peer to have the lowest score + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&outbound_only_peer) + .unwrap() + .add_to_score(-1.0); + + //check initial connected peers + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 6); + + peer_manager.heartbeat(); + + //check that we disconnected from one peer + //and that we did not disconnect the outbound peer even though it was the worst peer + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); + assert!(peer_manager + .network_globals + .peers + .read() + .is_connected(&outbound_only_peer)); + + peer_manager.heartbeat(); + + //check that if we are at target number of peers, we do not disconnect any + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); + } +} From 011a1469383e41d44a8e0b43300ea9d047e7479f Mon Sep 17 00:00:00 2001 From: Kevin Lu Date: Fri, 21 May 2021 13:01:50 +0200 Subject: [PATCH 3/5] changed logic for removing outbound peers conditional, added more tests --- .../eth2_libp2p/src/peer_manager/mod.rs | 243 +++++++++++++++--- .../eth2_libp2p/src/peer_manager/peer_info.rs | 5 + .../eth2_libp2p/src/peer_manager/score.rs | 9 + 3 files changed, 228 insertions(+), 29 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index da597f9c00c..2020bc8bd22 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -846,7 +846,6 @@ impl PeerManager { /// NOTE: This is experimental and will likely be adjusted fn update_peer_scores(&mut self) { /* Check how long have peers been in this state and update their reputations if needed */ - let mut to_ban_peers = Vec::new(); let mut to_unban_peers = Vec::new(); @@ -962,13 +961,15 @@ impl PeerManager { break; } if info.is_outbound_only() { - if n_outbound_removed < outbound_only_peer_count - min_outbound_only_target { + if min_outbound_only_target < outbound_only_peer_count - n_outbound_removed { n_outbound_removed += 1; } else { continue; } } peers_removed += 1; + //only add healthy peers to disconnect, + //update_peer_scores would have already disconnected unhealthy peers if info.score_state() == ScoreState::Healthy { disconnecting_peers.push(**peer_id); } @@ -1105,11 +1106,11 @@ mod tests { } } - async fn build_peer_manager() -> PeerManager { + async fn build_peer_manager(target: usize) -> PeerManager { let keypair = libp2p::identity::Keypair::generate_secp256k1(); let config = NetworkConfig { discovery_port: unused_port(), - target_peers: 5, + target_peers: target, ..Default::default() }; let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); @@ -1132,56 +1133,240 @@ mod tests { } #[tokio::test] - async fn test_peer_manager_disconnects_peers_when_target_reached() { - let mut peer_manager = build_peer_manager().await; + async fn test_peer_manager_disconnects_correctly_during_heartbeat() { + let mut peer_manager = build_peer_manager(3).await; - //create 6 peers to connect too - //1 will be outbound-only, and have the lowest score. + //create 5 peers to connect too + //2 will be outbound-only, and have the lowest score. let peer0 = PeerId::random(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); - let peer3 = PeerId::random(); - let peer4 = PeerId::random(); - let outbound_only_peer = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + let outbound_only_peer2 = PeerId::random(); peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); - peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); - peer_manager.connect_outgoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); - peer_manager.connect_outgoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); - peer_manager.connect_ingoing(&peer3, "/ip4/0.0.0.0".parse().unwrap()); - peer_manager.connect_outgoing(&peer3, "/ip4/0.0.0.0".parse().unwrap()); - peer_manager.connect_ingoing(&peer4, "/ip4/0.0.0.0".parse().unwrap()); - peer_manager.connect_outgoing(&peer4, "/ip4/0.0.0.0".parse().unwrap()); - peer_manager.connect_outgoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap()); - - //set the outbound-only peer to have the lowest score + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer2, "/ip4/0.0.0.0".parse().unwrap()); + + //set the outbound-only peers to have the lowest score peer_manager .network_globals .peers .write() - .peer_info_mut(&outbound_only_peer) + .peer_info_mut(&outbound_only_peer1) .unwrap() .add_to_score(-1.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&outbound_only_peer2) + .unwrap() + .add_to_score(-2.0); + //check initial connected peers - assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 6); + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); peer_manager.heartbeat(); - //check that we disconnected from one peer - //and that we did not disconnect the outbound peer even though it was the worst peer - assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); + //check that we disconnected from two peers + //check that one outbound-only peer was removed because it had the worst score + //and that we did not disconnect the other outbound peer due to the minimum outbound quota + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); assert!(peer_manager .network_globals .peers .read() - .is_connected(&outbound_only_peer)); + .is_connected(&outbound_only_peer1)); + assert!(!peer_manager + .network_globals + .peers + .read() + .is_connected(&outbound_only_peer2)); peer_manager.heartbeat(); - //check that if we are at target number of peers, we do not disconnect any - assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); + //check that if we are at target number of peers who are all healthy, we do not disconnect any + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + } + + #[tokio::test] + async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat() { + let mut peer_manager = build_peer_manager(20).await; + + //connect to 20 ingoing-only peers + for _i in 0..19 { + let peer = PeerId::random(); + peer_manager.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap()); + } + + //connect an outbound-only peer + //give it the lowest score so that it is evaluated first in the disconnect list iterator + let outbound_only_peer = PeerId::random(); + peer_manager.connect_ingoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer)) + .unwrap() + .add_to_score(-1.0); + //after heartbeat, we will have removed one peer. + //having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection + peer_manager.heartbeat(); + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + 20 + ); + } + + #[tokio::test] + async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() { + let mut peer_manager = build_peer_manager(3).await; + + //create 3 peers to connect too + //one pair will be unhealthy inbound only and outbound only peers + let peer0 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + + //connect to two peers that are on the threshold of being disconnected + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + //update the gossipsub scores to induce connection downgrade + //during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20 + //this will then trigger a disconnection. + //if we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected. + //then handle_state_transitions will not change the connection status to disconnected because the score state has not changed. + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + + peer_manager.heartbeat(); + + //checks that update_peer_score peer disconnection and the disconnecting_peers logic + //work together to remove the correct number of peers + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1); + } + + #[tokio::test] + async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() { + let mut peer_manager = build_peer_manager(3).await; + + //create 4 peers to connect too + //one pair will be unhealthy inbound only and outbound only peers + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + + //connect to two peers that are on the threshold of being disconnected + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager.heartbeat(); + //tests that when we are over the target peer limit, after disconnecting two unhealthy peers, + //the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target) + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2); + } + + #[tokio::test] + async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy() { + let mut peer_manager = build_peer_manager(3).await; + + //create 5 peers to connect too + //one will be unhealthy inbound only and outbound only peers + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + //have one peer be on the verge of disconnection + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + + peer_manager.heartbeat(); + //tests that when we are over the target peer limit, even though one peer was already disconnected for being unhealthy, + //the loop to check for disconnecting peers will still remove the correct amount of peers + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 9dd976792c5..43570c5aeee 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -311,6 +311,11 @@ impl PeerInfo { self.score.test_add(score) } } + + #[cfg(test)] + pub fn set_gossipsub_score(&mut self, score: f64) { + self.score.set_gossipsub_score(score); + } } #[derive(Clone, Debug, Serialize)] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 38fecad3af9..090636e55ee 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -216,6 +216,13 @@ impl RealScore { self.set_lighthouse_score(0f64); } + #[cfg(test)] + //set the gossipsub_score to a specific f64 + //used in testing to induce score status changes during a heartbeat + pub fn set_gossipsub_score(&mut self, score: f64) { + self.gossipsub_score = score; + } + /// Applies time-based logic such as decay rates to the score. /// This function should be called periodically. pub fn update(&mut self) { @@ -291,6 +298,8 @@ apply!(update_gossipsub_score, new_score: f64, ignore: bool); apply!(test_add, score: f64); #[cfg(test)] apply!(test_reset); +#[cfg(test)] +apply!(set_gossipsub_score, score: f64); impl Score { pub fn score(&self) -> f64 { From 6fd9651f3469aa8de025e50e9341c875b8e7cdb7 Mon Sep 17 00:00:00 2001 From: Kevin Lu Date: Sun, 23 May 2021 09:47:17 +0200 Subject: [PATCH 4/5] reformatted commenting --- .../eth2_libp2p/src/peer_manager/mod.rs | 73 +++++++++---------- .../eth2_libp2p/src/peer_manager/peerdb.rs | 8 +- .../eth2_libp2p/src/peer_manager/score.rs | 4 +- beacon_node/eth2_libp2p/src/types/globals.rs | 2 +- 4 files changed, 43 insertions(+), 44 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 2020bc8bd22..d84e10a6e9b 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -945,8 +945,8 @@ impl PeerManager { let connected_peer_count = self.network_globals.connected_peers(); if connected_peer_count > self.target_peers { - //remove excess peers with the worst scores, but keep subnet peers - //must also ensure that the outbound-only peer count does not go below the minimum threshold + // Remove excess peers with the worst scores, but keep subnet peers. + // Must also ensure that the outbound-only peer count does not go below the minimum threshold. let mut peers_removed = 0; let mut n_outbound_removed = 0; for (peer_id, info) in self @@ -968,8 +968,8 @@ impl PeerManager { } } peers_removed += 1; - //only add healthy peers to disconnect, - //update_peer_scores would have already disconnected unhealthy peers + // Only add healthy peers to disconnect, + // update_peer_scores would have already disconnected unhealthy peers. if info.score_state() == ScoreState::Healthy { disconnecting_peers.push(**peer_id); } @@ -1136,8 +1136,8 @@ mod tests { async fn test_peer_manager_disconnects_correctly_during_heartbeat() { let mut peer_manager = build_peer_manager(3).await; - //create 5 peers to connect too - //2 will be outbound-only, and have the lowest score. + // Create 5 peers to connect to. + // 2 will be outbound-only, and have the lowest score. let peer0 = PeerId::random(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); @@ -1150,7 +1150,7 @@ mod tests { peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_outgoing(&outbound_only_peer2, "/ip4/0.0.0.0".parse().unwrap()); - //set the outbound-only peers to have the lowest score + // Set the outbound-only peers to have the lowest score. peer_manager .network_globals .peers @@ -1167,14 +1167,14 @@ mod tests { .unwrap() .add_to_score(-2.0); - //check initial connected peers + // Check initial connected peers. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); peer_manager.heartbeat(); - //check that we disconnected from two peers - //check that one outbound-only peer was removed because it had the worst score - //and that we did not disconnect the other outbound peer due to the minimum outbound quota + // Check that we disconnected from two peers. + // Check that one outbound-only peer was removed because it had the worst score + // and that we did not disconnect the other outbound peer due to the minimum outbound quota. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); assert!(peer_manager .network_globals @@ -1189,7 +1189,7 @@ mod tests { peer_manager.heartbeat(); - //check that if we are at target number of peers who are all healthy, we do not disconnect any + // Check that if we are at target number of peers who are all healthy, we do not disconnect any. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); } @@ -1197,14 +1197,14 @@ mod tests { async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat() { let mut peer_manager = build_peer_manager(20).await; - //connect to 20 ingoing-only peers + // Connect to 20 ingoing-only peers. for _i in 0..19 { let peer = PeerId::random(); peer_manager.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap()); } - //connect an outbound-only peer - //give it the lowest score so that it is evaluated first in the disconnect list iterator + // Connect an outbound-only peer. + // Give it the lowest score so that it is evaluated first in the disconnect list iterator. let outbound_only_peer = PeerId::random(); peer_manager.connect_ingoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap()); peer_manager @@ -1214,8 +1214,8 @@ mod tests { .peer_info_mut(&(outbound_only_peer)) .unwrap() .add_to_score(-1.0); - //after heartbeat, we will have removed one peer. - //having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection + // After heartbeat, we will have removed one peer. + // Having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection. peer_manager.heartbeat(); assert_eq!( peer_manager.network_globals.connected_or_dialing_peers(), @@ -1227,8 +1227,8 @@ mod tests { async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() { let mut peer_manager = build_peer_manager(3).await; - //create 3 peers to connect too - //one pair will be unhealthy inbound only and outbound only peers + // Create 3 peers to connect to. + // One pair will be unhealthy inbound only and outbound only peers. let peer0 = PeerId::random(); let inbound_only_peer1 = PeerId::random(); let outbound_only_peer1 = PeerId::random(); @@ -1236,7 +1236,7 @@ mod tests { peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); - //connect to two peers that are on the threshold of being disconnected + // Connect to two peers that are on the threshold of being disconnected. peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); peer_manager @@ -1253,11 +1253,10 @@ mod tests { .peer_info_mut(&(outbound_only_peer1)) .unwrap() .add_to_score(-19.9); - //update the gossipsub scores to induce connection downgrade - //during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20 - //this will then trigger a disconnection. - //if we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected. - //then handle_state_transitions will not change the connection status to disconnected because the score state has not changed. + // Update the gossipsub scores to induce connection downgrade + // during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection. + // If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected, + // then handle_state_transitions will not change the connection status to disconnected because the score state has not changed. peer_manager .network_globals .peers @@ -1275,8 +1274,8 @@ mod tests { peer_manager.heartbeat(); - //checks that update_peer_score peer disconnection and the disconnecting_peers logic - //work together to remove the correct number of peers + // Checks that update_peer_score peer disconnection and the disconnecting_peers logic + // work together to remove the correct number of peers. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1); } @@ -1284,8 +1283,8 @@ mod tests { async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() { let mut peer_manager = build_peer_manager(3).await; - //create 4 peers to connect too - //one pair will be unhealthy inbound only and outbound only peers + // Create 4 peers to connect to. + // One pair will be unhealthy inbound only and outbound only peers. let peer0 = PeerId::random(); let peer1 = PeerId::random(); let inbound_only_peer1 = PeerId::random(); @@ -1294,7 +1293,7 @@ mod tests { peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); - //connect to two peers that are on the threshold of being disconnected + // Connect to two peers that are on the threshold of being disconnected. peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); peer_manager @@ -1326,8 +1325,8 @@ mod tests { .unwrap() .set_gossipsub_score(-85.0); peer_manager.heartbeat(); - //tests that when we are over the target peer limit, after disconnecting two unhealthy peers, - //the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target) + // Tests that when we are over the target peer limit, after disconnecting two unhealthy peers, + // the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target). assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2); } @@ -1335,8 +1334,8 @@ mod tests { async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy() { let mut peer_manager = build_peer_manager(3).await; - //create 5 peers to connect too - //one will be unhealthy inbound only and outbound only peers + // Create 5 peers to connect to. + // One will be unhealthy inbound only and outbound only peers. let peer0 = PeerId::random(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); @@ -1347,7 +1346,7 @@ mod tests { peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); - //have one peer be on the verge of disconnection + // Have one peer be on the verge of disconnection. peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); peer_manager .network_globals @@ -1365,8 +1364,8 @@ mod tests { .set_gossipsub_score(-85.0); peer_manager.heartbeat(); - //tests that when we are over the target peer limit, even though one peer was already disconnected for being unhealthy, - //the loop to check for disconnecting peers will still remove the correct amount of peers + // Tests that when we are over the target peer limit, even though one peer was already disconnected for being unhealthy, + // the loop to check for disconnecting peers will still remove the correct amount of peers. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 1bba19c23a6..d4a8cecf13a 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -221,7 +221,7 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } - ///Connected outbound-only peers + /// Connected outbound-only peers pub fn connected_outbound_only_peers(&self) -> impl Iterator { self.peers .iter() @@ -691,7 +691,7 @@ mod tests { let p0 = PeerId::random(); let p1 = PeerId::random(); let p2 = PeerId::random(); - //create peer with no connections + // Create peer with no connections. let _p3 = PeerId::random(); pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap(), None); @@ -699,8 +699,8 @@ mod tests { pdb.connect_outgoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_outgoing(&p2, "/ip4/0.0.0.0".parse().unwrap(), None); - // we should only have one outbound-only peer (p2). - // peers that are inbound-only, have both types of connections, or no connections should not be counted + // We should only have one outbound-only peer (p2). + // Peers that are inbound-only, have both types of connections, or no connections should not be counted. assert_eq!(pdb.connected_outbound_only_peers().count(), 1); } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 090636e55ee..d2d12b8a19a 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -217,8 +217,8 @@ impl RealScore { } #[cfg(test)] - //set the gossipsub_score to a specific f64 - //used in testing to induce score status changes during a heartbeat + // Set the gossipsub_score to a specific f64. + // Used in testing to induce score status changes during a heartbeat. pub fn set_gossipsub_score(&mut self, score: f64) { self.gossipsub_score = score; } diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index 69f9b0872b3..4055e53b205 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -84,7 +84,7 @@ impl NetworkGlobals { self.peers.read().connected_peer_ids().count() } - /// Returns the number of libp2p connected peers with outbound-only connections + /// Returns the number of libp2p connected peers with outbound-only connections. pub fn connected_outbound_only_peers(&self) -> usize { self.peers.read().connected_outbound_only_peers().count() } From 9e67970381a47291f37340bcf00c60e70cb949b8 Mon Sep 17 00:00:00 2001 From: Kevin Lu Date: Fri, 28 May 2021 00:26:49 +0200 Subject: [PATCH 5/5] removed healthy score state check in heartbeat disconnecting functionality --- .../eth2_libp2p/src/peer_manager/mod.rs | 24 +++++++------------ .../eth2_libp2p/src/peer_manager/score.rs | 2 +- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index d84e10a6e9b..a111826028e 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -920,12 +920,12 @@ impl PeerManager { /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. /// /// It will request discovery queries if the peer count has not reached the desired number of - /// peers. + /// overall peers, as well as the desired number of outbound-only peers. /// /// NOTE: Discovery will only add a new query if one isn't already queued. fn heartbeat(&mut self) { let peer_count = self.network_globals.connected_or_dialing_peers(); - let outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let mut outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); let min_outbound_only_target = (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize; @@ -947,7 +947,7 @@ impl PeerManager { if connected_peer_count > self.target_peers { // Remove excess peers with the worst scores, but keep subnet peers. // Must also ensure that the outbound-only peer count does not go below the minimum threshold. - let mut peers_removed = 0; + outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); let mut n_outbound_removed = 0; for (peer_id, info) in self .network_globals @@ -957,7 +957,7 @@ impl PeerManager { .iter() .filter(|(_, info)| !info.has_future_duty()) { - if peers_removed == connected_peer_count - self.target_peers { + if disconnecting_peers.len() == connected_peer_count - self.target_peers { break; } if info.is_outbound_only() { @@ -967,12 +967,7 @@ impl PeerManager { continue; } } - peers_removed += 1; - // Only add healthy peers to disconnect, - // update_peer_scores would have already disconnected unhealthy peers. - if info.score_state() == ScoreState::Healthy { - disconnecting_peers.push(**peer_id); - } + disconnecting_peers.push(**peer_id); } } @@ -1189,7 +1184,7 @@ mod tests { peer_manager.heartbeat(); - // Check that if we are at target number of peers who are all healthy, we do not disconnect any. + // Check that if we are at target number of peers, we do not disconnect any. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); } @@ -1228,7 +1223,6 @@ mod tests { let mut peer_manager = build_peer_manager(3).await; // Create 3 peers to connect to. - // One pair will be unhealthy inbound only and outbound only peers. let peer0 = PeerId::random(); let inbound_only_peer1 = PeerId::random(); let outbound_only_peer1 = PeerId::random(); @@ -1274,8 +1268,6 @@ mod tests { peer_manager.heartbeat(); - // Checks that update_peer_score peer disconnection and the disconnecting_peers logic - // work together to remove the correct number of peers. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1); } @@ -1364,8 +1356,8 @@ mod tests { .set_gossipsub_score(-85.0); peer_manager.heartbeat(); - // Tests that when we are over the target peer limit, even though one peer was already disconnected for being unhealthy, - // the loop to check for disconnecting peers will still remove the correct amount of peers. + // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer, + // the number of connected peers updates and we will not remove too many peers. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index d2d12b8a19a..02479bef067 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -216,9 +216,9 @@ impl RealScore { self.set_lighthouse_score(0f64); } - #[cfg(test)] // Set the gossipsub_score to a specific f64. // Used in testing to induce score status changes during a heartbeat. + #[cfg(test)] pub fn set_gossipsub_score(&mut self, score: f64) { self.gossipsub_score = score; }