Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 117 additions & 17 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// MIN_SYNC_COMMITTEE_PEERS
/// number should be set low as an absolute lower bound to maintain peers on the sync
/// committees.
/// - Do not prune trusted peers. NOTE: This means if a user has more trusted peers than the
/// excess peer limit, all of the following logic is subverted as we will not prune any peers.
/// Also, the more trusted peers a user has, the less room Lighthouse has to efficiently manage
/// its peers across the subnets.
///
/// Prune peers in the following order:
/// 1. Remove worst scoring peers
Expand Down Expand Up @@ -961,7 +965,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.read()
.worst_connected_peers()
.iter()
.filter(|(_, info)| !info.has_future_duty() && $filter(*info))
.filter(|(_, info)| {
!info.has_future_duty() && !info.is_trusted() && $filter(*info)
})
{
if peers_to_prune.len()
>= connected_peer_count.saturating_sub(self.target_peers)
Expand Down Expand Up @@ -1011,8 +1017,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
> = HashMap::new();

for (peer_id, info) in self.network_globals.peers.read().connected_peers() {
// Ignore peers we are already pruning
if peers_to_prune.contains(peer_id) {
// Ignore peers we trust or that we are already pruning
if info.is_trusted() || peers_to_prune.contains(peer_id) {
continue;
}

Expand Down Expand Up @@ -1309,25 +1315,48 @@ mod tests {
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new_test_globals(&log);
let globals = NetworkGlobals::new_test_globals(vec![], &log);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}

async fn build_peer_manager_with_trusted_peers(
trusted_peers: Vec<PeerId>,
target_peer_count: usize,
) -> PeerManager<E> {
let config = config::Config {
target_peer_count,
discovery_enabled: false,
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new_test_globals(trusted_peers, &log);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}

#[tokio::test]
async fn test_peer_manager_disconnects_correctly_during_heartbeat() {
let mut peer_manager = build_peer_manager(3).await;

// Create 5 peers to connect to.
// Create 6 peers to connect to with a target of 3.
// 2 will be outbound-only, and have the lowest score.
// 1 will be a trusted peer.
// The other 3 will be ingoing peers.

// We expect this test to disconnect from 3 peers. 1 from the outbound peer (the other must
// remain due to the outbound peer limit) and 2 from the ingoing peers (the trusted peer
// should remain connected).
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let outbound_only_peer1 = PeerId::random();
let outbound_only_peer2 = PeerId::random();
let trusted_peer = PeerId::random();

let mut peer_manager =
build_peer_manager_with_trusted_peers(vec![trusted_peer.clone()], 3).await;

peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_outgoing(
&outbound_only_peer1,
"/ip4/0.0.0.0".parse().unwrap(),
Expand Down Expand Up @@ -1357,7 +1386,7 @@ mod tests {
.add_to_score(-2.0);

// Check initial connected peers.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5);
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 6);

peer_manager.heartbeat();

Expand All @@ -1376,8 +1405,22 @@ mod tests {
.read()
.is_connected(&outbound_only_peer2));

// The trusted peer remains connected
assert!(peer_manager
.network_globals
.peers
.read()
.is_connected(&trusted_peer));

peer_manager.heartbeat();

// The trusted peer remains connected, even after subsequent heartbeats.
assert!(peer_manager
.network_globals
.peers
.read()
.is_connected(&trusted_peer));

// Check that if we are at target number of peers, we do not disconnect any.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
}
Expand Down Expand Up @@ -2122,7 +2165,7 @@ mod tests {
#[cfg(test)]
mod property_based_tests {
use crate::peer_manager::config::DEFAULT_TARGET_PEERS;
use crate::peer_manager::tests::build_peer_manager;
use crate::peer_manager::tests::build_peer_manager_with_trusted_peers;
use crate::rpc::MetaData;
use libp2p::PeerId;
use quickcheck::{Arbitrary, Gen, TestResult};
Expand All @@ -2133,10 +2176,12 @@ mod tests {

#[derive(Clone, Debug)]
struct PeerCondition {
peer_id: PeerId,
outgoing: bool,
attestation_net_bitfield: Vec<bool>,
sync_committee_net_bitfield: Vec<bool>,
score: f64,
trusted: bool,
gossipsub_score: f64,
}

Expand All @@ -2161,10 +2206,12 @@ mod tests {
};

PeerCondition {
peer_id: PeerId::random(),
outgoing: bool::arbitrary(g),
attestation_net_bitfield,
sync_committee_net_bitfield,
score: f64::arbitrary(g),
trusted: bool::arbitrary(g),
gossipsub_score: f64::arbitrary(g),
}
}
Expand All @@ -2176,26 +2223,42 @@ mod tests {
if peer_conditions.len() < target_peer_count {
return TestResult::discard();
}
let trusted_peers: Vec<_> = peer_conditions
.iter()
.filter_map(|p| {
if p.trusted {
Some(p.peer_id.clone())
} else {
None
}
})
.collect();
// If we have a high percentage of trusted peers, it is very difficult to reason about
// the expected results of the pruning.
if trusted_peers.len() > peer_conditions.len() / 3 as usize {
return TestResult::discard();
}
let rt = Runtime::new().unwrap();

rt.block_on(async move {
let mut peer_manager = build_peer_manager(target_peer_count).await;
// Collect all the trusted peers
let mut peer_manager =
build_peer_manager_with_trusted_peers(trusted_peers, target_peer_count).await;

// Create peers based on the randomly generated conditions.
for condition in &peer_conditions {
let peer = PeerId::random();
let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();

if condition.outgoing {
peer_manager.inject_connect_outgoing(
&peer,
&condition.peer_id,
"/ip4/0.0.0.0".parse().unwrap(),
None,
);
} else {
peer_manager.inject_connect_ingoing(
&peer,
&condition.peer_id,
"/ip4/0.0.0.0".parse().unwrap(),
None,
);
Expand All @@ -2216,22 +2279,59 @@ mod tests {
};

let mut peer_db = peer_manager.network_globals.peers.write();
let peer_info = peer_db.peer_info_mut(&peer).unwrap();
let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap();
peer_info.set_meta_data(MetaData::V2(metadata));
peer_info.set_gossipsub_score(condition.gossipsub_score);
peer_info.add_to_score(condition.score);

for subnet in peer_info.long_lived_subnets() {
peer_db.add_subscription(&peer, subnet);
peer_db.add_subscription(&condition.peer_id, subnet);
}
}

// Perform the heartbeat.
peer_manager.heartbeat();

TestResult::from_bool(
// The minimum number of connected peers cannot be less than the target peer count
// or submitted peers.

let expected_peer_count = target_peer_count.min(peer_conditions.len());
// Trust peers could make this larger however.
let no_of_trusted_peers =
peer_conditions.iter().fold(
0,
|acc, condition| {
if condition.trusted {
acc + 1
} else {
acc
}
},
);
let expected_peer_count = expected_peer_count.max(no_of_trusted_peers);

let target_peer_condition =
peer_manager.network_globals.connected_or_dialing_peers()
== target_peer_count.min(peer_conditions.len()),
== expected_peer_count;

// It could be that we reach our target outbound limit and are unable to prune any
// extra, which violates the target_peer_condition.
let outbound_peers = peer_manager.network_globals.connected_outbound_only_peers();
let hit_outbound_limit = outbound_peers == peer_manager.target_outbound_peers();

// No trusted peers should be disconnected
let trusted_peer_disconnected =
peer_conditions.iter().fold(false, |acc, condition| {
acc && (condition.trusted
&& !peer_manager
.network_globals
.peers
.read()
.is_connected(&condition.peer_id))
});

TestResult::from_bool(
(target_peer_condition || hit_outbound_limit) && !trusted_peer_disconnected,
)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
if let Some(to_drop) = self
.peers
.iter()
.filter(|(_, info)| info.is_disconnected())
.filter(|(_, info)| info.is_disconnected() && !info.is_trusted())
.filter_map(|(id, info)| match info.connection_status() {
PeerConnectionStatus::Disconnected { since } => Some((id, since)),
_ => None,
Expand Down
7 changes: 5 additions & 2 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(log: &slog::Logger) -> NetworkGlobals<TSpec> {
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
log: &slog::Logger,
) -> NetworkGlobals<TSpec> {
use crate::CombinedKeyExt;
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let enr_key: discv5::enr::CombinedKey =
Expand All @@ -143,7 +146,7 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
attnets: Default::default(),
syncnets: Default::default(),
}),
vec![],
trusted_peers,
log,
)
}
Expand Down