From 08479869367dce85af8d0c546e18257e31494f66 Mon Sep 17 00:00:00 2001 From: Mac L Date: Mon, 31 May 2021 04:18:18 +0000 Subject: [PATCH 1/5] Reduce outbound requests to eth1 endpoints (#2340) ## Issue Addressed #2282 ## Proposed Changes Reduce the outbound requests made to eth1 endpoints by caching the results from `eth_chainId` and `net_version`. Further reduce the overall request count by increasing `auto_update_interval_millis` from `7_000` (7 seconds) to `60_000` (1 minute). This will result in a reduction from ~2000 requests per hour to 360 requests per hour (during normal operation). A reduction of 82%. ## Additional Info If an endpoint fails, its state is dropped from the cache and the `eth_chainId` and `net_version` calls will be made for that endpoint again during the regular update cycle (once per minute) until it is back online. Co-authored-by: Paul Hauner --- beacon_node/eth1/src/inner.rs | 4 ++ beacon_node/eth1/src/service.rs | 91 +++++++++++++++++++++++++-------- common/fallback/src/lib.rs | 3 +- 3 files changed, 76 insertions(+), 22 deletions(-) diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index f6c5e85efb7..2dc39a1de92 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -2,10 +2,12 @@ use crate::Config; use crate::{ block_cache::{BlockCache, Eth1Block}, deposit_cache::{DepositCache, SszDepositCache}, + service::EndpointsCache, }; use parking_lot::RwLock; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; +use std::sync::Arc; use types::ChainSpec; #[derive(Default)] @@ -28,6 +30,7 @@ impl DepositUpdater { pub struct Inner { pub block_cache: RwLock, pub deposit_cache: RwLock, + pub endpoints_cache: RwLock>>, pub config: RwLock, pub remote_head_block: RwLock>, pub spec: ChainSpec, @@ -87,6 +90,7 @@ impl SszEth1Cache { cache: self.deposit_cache.to_deposit_cache()?, last_processed_block: self.last_processed_block, }), + endpoints_cache: RwLock::new(None), // Set the remote head_block zero when creating a new instance. We only care about // present and future eth1 nodes. remote_head_block: RwLock::new(None), diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 8a28881b120..ba5060989ea 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -54,7 +54,27 @@ pub enum EndpointError { type EndpointState = Result<(), EndpointError>; -type EndpointWithState = (SensitiveUrl, TRwLock>); +pub struct EndpointWithState { + endpoint: SensitiveUrl, + state: TRwLock>, +} + +impl EndpointWithState { + pub fn new(endpoint: SensitiveUrl) -> Self { + Self { + endpoint, + state: TRwLock::new(None), + } + } +} + +async fn reset_endpoint_state(endpoint: &EndpointWithState) { + *endpoint.state.write().await = None; +} + +async fn get_state(endpoint: &EndpointWithState) -> Option { + *endpoint.state.read().await +} /// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is /// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint @@ -70,19 +90,19 @@ impl EndpointsCache { /// Checks the usability of an endpoint. Results get cached and therefore only the first call /// for each endpoint does the real check. async fn state(&self, endpoint: &EndpointWithState) -> EndpointState { - if let Some(result) = *endpoint.1.read().await { + if let Some(result) = *endpoint.state.read().await { return result; } - let mut value = endpoint.1.write().await; + let mut value = endpoint.state.write().await; if let Some(result) = *value { return result; } crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_REQUESTS, - &[&endpoint.0.to_string()], + &[&endpoint.endpoint.to_string()], ); let state = endpoint_state( - &endpoint.0, + &endpoint.endpoint, &self.config_network_id, &self.config_chain_id, &self.log, @@ -92,7 +112,7 @@ impl EndpointsCache { if state.is_err() { crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_ERRORS, - &[&endpoint.0.to_string()], + &[&endpoint.endpoint.to_string()], ); crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0); } else { @@ -114,12 +134,12 @@ impl EndpointsCache { .first_success(|endpoint| async move { match self.state(endpoint).await { Ok(()) => { - let endpoint_str = &endpoint.0.to_string(); + let endpoint_str = &endpoint.endpoint.to_string(); crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_REQUESTS, &[endpoint_str], ); - match func(&endpoint.0).await { + match func(&endpoint.endpoint).await { Ok(t) => Ok(t), Err(t) => { crate::metrics::inc_counter_vec( @@ -127,7 +147,10 @@ impl EndpointsCache { &[endpoint_str], ); if let SingleEndpointError::EndpointError(e) = &t { - *endpoint.1.write().await = Some(Err(*e)); + *endpoint.state.write().await = Some(Err(*e)); + } else { + // A non-`EndpointError` error occurred, so reset the state. + reset_endpoint_state(endpoint).await; } Err(t) } @@ -138,6 +161,16 @@ impl EndpointsCache { }) .await } + + pub async fn reset_errorred_endpoints(&self) { + for endpoint in &self.fallback.servers { + if let Some(state) = get_state(endpoint).await { + if state.is_err() { + reset_endpoint_state(endpoint).await; + } + } + } + } } /// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and @@ -405,9 +438,9 @@ impl Default for Config { follow_distance: 128, node_far_behind_seconds: 128 * 14, block_cache_truncation: Some(4_096), - auto_update_interval_millis: 7_000, + auto_update_interval_millis: 60_000, blocks_per_log_query: 1_000, - max_log_requests_per_update: Some(100), + max_log_requests_per_update: Some(5_000), max_blocks_per_update: Some(8_192), purge_cache: false, } @@ -435,6 +468,7 @@ impl Service { deposit_cache: RwLock::new(DepositUpdater::new( config.deposit_contract_deploy_block, )), + endpoints_cache: RwLock::new(None), remote_head_block: RwLock::new(None), config: RwLock::new(config), spec, @@ -605,20 +639,31 @@ impl Service { self.inner.config.write().lowest_cached_block_number = block_number; } - pub fn init_endpoints(&self) -> EndpointsCache { + /// Builds a new `EndpointsCache` with empty states. + pub fn init_endpoints(&self) -> Arc { let endpoints = self.config().endpoints.clone(); let config_network_id = self.config().network_id.clone(); let config_chain_id = self.config().chain_id.clone(); - EndpointsCache { - fallback: Fallback::new( - endpoints - .into_iter() - .map(|s| (s, TRwLock::new(None))) - .collect(), - ), + let new_cache = Arc::new(EndpointsCache { + fallback: Fallback::new(endpoints.into_iter().map(EndpointWithState::new).collect()), config_network_id, config_chain_id, log: self.log.clone(), + }); + + let mut endpoints_cache = self.inner.endpoints_cache.write(); + *endpoints_cache = Some(new_cache.clone()); + new_cache + } + + /// Returns the cached `EndpointsCache` if it exists or builds a new one. + pub fn get_endpoints(&self) -> Arc { + let endpoints_cache = self.inner.endpoints_cache.read(); + if let Some(cache) = endpoints_cache.clone() { + cache + } else { + drop(endpoints_cache); + self.init_endpoints() } } @@ -633,7 +678,11 @@ impl Service { pub async fn update( &self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { - let endpoints = self.init_endpoints(); + let endpoints = self.get_endpoints(); + + // Reset the state of any endpoints which have errored so their state can be redetermined. + endpoints.reset_errorred_endpoints().await; + let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds; let process_single_err = |e: &FallbackError| { @@ -656,7 +705,7 @@ impl Service { } } } - endpoints.fallback.map_format_error(|s| &s.0, &e) + endpoints.fallback.map_format_error(|s| &s.endpoint, &e) }; let process_err = |e: Error| match &e { diff --git a/common/fallback/src/lib.rs b/common/fallback/src/lib.rs index 1e3cb9cf044..94b177de35c 100644 --- a/common/fallback/src/lib.rs +++ b/common/fallback/src/lib.rs @@ -2,8 +2,9 @@ use itertools::{join, zip}; use std::fmt::{Debug, Display}; use std::future::Future; +#[derive(Clone)] pub struct Fallback { - servers: Vec, + pub servers: Vec, } #[derive(Debug, PartialEq)] From 320a683e723aa5fb295932889af778a3a6562b73 Mon Sep 17 00:00:00 2001 From: Kevin Lu Date: Mon, 31 May 2021 04:18:19 +0000 Subject: [PATCH 2/5] Minimum Outbound-Only Peers Requirement (#2356) ## Issue Addressed #2325 ## Proposed Changes This pull request changes the behavior of the Peer Manager by including a minimum outbound-only peers requirement. The peer manager will continue querying for peers if this outbound-only target number hasn't been met. Additionally, when peers are being removed, an outbound-only peer will not be disconnected if doing so brings us below the minimum. ## Additional Info Unit test for heartbeat function tests that disconnection behavior is correct. Continual querying for peers if outbound-only hasn't been met is not directly tested, but indirectly through unit testing of the helper function that counts the number of outbound-only peers. EDIT: Am concerned about the behavior of ```update_peer_scores```. If we have connected to a peer with a score below the disconnection threshold (-20), then its connection status will remain connected, while its score state will change to disconnected. ```rust let previous_state = info.score_state(); // Update scores info.score_update(); Self::handle_score_transitions( previous_state, peer_id, info, &mut to_ban_peers, &mut to_unban_peers, &mut self.events, &self.log, ); ``` ```previous_state``` will be set to Disconnected, and then because ```handle_score_transitions``` only changes connection status for a peer if the state changed, the peer remains connected. Then in the heartbeat code, because we only disconnect healthy peers if we have too many peers, these peers don't get disconnected. I'm not sure realistically how often this scenario would occur, but it might be better to adjust the logic to account for scenarios where the score state implies a connection status different from the current connection status. Co-authored-by: Kevin Lu --- .../eth2_libp2p/src/peer_manager/mod.rs | 326 +++++++++++++++++- .../eth2_libp2p/src/peer_manager/peer_info.rs | 10 + .../eth2_libp2p/src/peer_manager/peerdb.rs | 27 ++ .../eth2_libp2p/src/peer_manager/score.rs | 9 + beacon_node/eth2_libp2p/src/types/globals.rs | 5 + 5 files changed, 368 insertions(+), 9 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 659c8d9a392..d2763669e71 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -60,6 +60,9 @@ const PEER_EXCESS_FACTOR: f32 = 0.1; /// them in lighthouse. const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; +/// A fraction of `PeerManager::target_peers` that need to be outbound-only connections. +const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.1; + /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. @@ -835,7 +838,6 @@ impl PeerManager { /// NOTE: This is experimental and will likely be adjusted fn update_peer_scores(&mut self) { /* Check how long have peers been in this state and update their reputations if needed */ - let mut to_ban_peers = Vec::new(); let mut to_unban_peers = Vec::new(); @@ -910,12 +912,16 @@ impl PeerManager { /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. /// /// It will request discovery queries if the peer count has not reached the desired number of - /// peers. + /// overall peers, as well as the desired number of outbound-only peers. /// /// NOTE: Discovery will only add a new query if one isn't already queued. fn heartbeat(&mut self) { let peer_count = self.network_globals.connected_or_dialing_peers(); - if peer_count < self.target_peers { + let mut outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let min_outbound_only_target = + (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize; + + if peer_count < self.target_peers || outbound_only_peer_count < min_outbound_only_target { // If we need more peers, queue a discovery lookup. if self.discovery.started { debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers); @@ -931,19 +937,28 @@ impl PeerManager { let connected_peer_count = self.network_globals.connected_peers(); if connected_peer_count > self.target_peers { - //remove excess peers with the worst scores, but keep subnet peers - for (peer_id, _) in self + // Remove excess peers with the worst scores, but keep subnet peers. + // Must also ensure that the outbound-only peer count does not go below the minimum threshold. + outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let mut n_outbound_removed = 0; + for (peer_id, info) in self .network_globals .peers .read() .worst_connected_peers() .iter() .filter(|(_, info)| !info.has_future_duty()) - .take(connected_peer_count - self.target_peers) - //we only need to disconnect peers with healthy scores, since the others got already - //disconnected in update_peer_scores - .filter(|(_, info)| info.score_state() == ScoreState::Healthy) { + if disconnecting_peers.len() == connected_peer_count - self.target_peers { + break; + } + if info.is_outbound_only() { + if min_outbound_only_target < outbound_only_peer_count - n_outbound_removed { + n_outbound_removed += 1; + } else { + continue; + } + } disconnecting_peers.push(**peer_id); } } @@ -1045,3 +1060,296 @@ enum ConnectingType { multiaddr: Multiaddr, }, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::discovery::enr::build_enr; + use crate::discovery::enr_ext::CombinedKeyExt; + use crate::rpc::methods::MetaData; + use crate::Enr; + use discv5::enr::CombinedKey; + use slog::{o, Drain}; + use std::net::UdpSocket; + use types::{EnrForkId, MinimalEthSpec}; + + type E = MinimalEthSpec; + + pub fn unused_port() -> u16 { + let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket"); + let local_addr = socket.local_addr().expect("should read udp socket"); + local_addr.port() + } + + pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if enabled { + slog::Logger::root(drain.filter_level(level).fuse(), o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), o!()) + } + } + + async fn build_peer_manager(target: usize) -> PeerManager { + let keypair = libp2p::identity::Keypair::generate_secp256k1(); + let config = NetworkConfig { + discovery_port: unused_port(), + target_peers: target, + ..Default::default() + }; + let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); + let enr: Enr = build_enr::(&enr_key, &config, EnrForkId::default()).unwrap(); + let log = build_log(slog::Level::Debug, false); + let globals = NetworkGlobals::new( + enr, + 9000, + 9000, + MetaData { + seq_number: 0, + attnets: Default::default(), + }, + vec![], + &log, + ); + PeerManager::new(&keypair, &config, Arc::new(globals), &log) + .await + .unwrap() + } + + #[tokio::test] + async fn test_peer_manager_disconnects_correctly_during_heartbeat() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 5 peers to connect to. + // 2 will be outbound-only, and have the lowest score. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + let outbound_only_peer2 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer2, "/ip4/0.0.0.0".parse().unwrap()); + + // Set the outbound-only peers to have the lowest score. + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&outbound_only_peer1) + .unwrap() + .add_to_score(-1.0); + + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&outbound_only_peer2) + .unwrap() + .add_to_score(-2.0); + + // Check initial connected peers. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); + + peer_manager.heartbeat(); + + // Check that we disconnected from two peers. + // Check that one outbound-only peer was removed because it had the worst score + // and that we did not disconnect the other outbound peer due to the minimum outbound quota. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + assert!(peer_manager + .network_globals + .peers + .read() + .is_connected(&outbound_only_peer1)); + assert!(!peer_manager + .network_globals + .peers + .read() + .is_connected(&outbound_only_peer2)); + + peer_manager.heartbeat(); + + // Check that if we are at target number of peers, we do not disconnect any. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + } + + #[tokio::test] + async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat() { + let mut peer_manager = build_peer_manager(20).await; + + // Connect to 20 ingoing-only peers. + for _i in 0..19 { + let peer = PeerId::random(); + peer_manager.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap()); + } + + // Connect an outbound-only peer. + // Give it the lowest score so that it is evaluated first in the disconnect list iterator. + let outbound_only_peer = PeerId::random(); + peer_manager.connect_ingoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer)) + .unwrap() + .add_to_score(-1.0); + // After heartbeat, we will have removed one peer. + // Having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection. + peer_manager.heartbeat(); + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + 20 + ); + } + + #[tokio::test] + async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 3 peers to connect to. + let peer0 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + + // Connect to two peers that are on the threshold of being disconnected. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + // Update the gossipsub scores to induce connection downgrade + // during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection. + // If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected, + // then handle_state_transitions will not change the connection status to disconnected because the score state has not changed. + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + + peer_manager.heartbeat(); + + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1); + } + + #[tokio::test] + async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 4 peers to connect to. + // One pair will be unhealthy inbound only and outbound only peers. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + + // Connect to two peers that are on the threshold of being disconnected. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager.heartbeat(); + // Tests that when we are over the target peer limit, after disconnecting two unhealthy peers, + // the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target). + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2); + } + + #[tokio::test] + async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 5 peers to connect to. + // One will be unhealthy inbound only and outbound only peers. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + // Have one peer be on the verge of disconnection. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + + peer_manager.heartbeat(); + // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer, + // the number of connected peers updates and we will not remove too many peers. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + } +} diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 49546776a8f..43570c5aeee 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -182,6 +182,11 @@ impl PeerInfo { matches!(self.connection_status, Disconnected { .. }) } + /// Checks if the peer is outbound-only + pub fn is_outbound_only(&self) -> bool { + matches!(self.connection_status, Connected {n_in, n_out} if n_in == 0 && n_out > 0) + } + /// Returns the number of connections with this peer. pub fn connections(&self) -> (u8, u8) { match self.connection_status { @@ -306,6 +311,11 @@ impl PeerInfo { self.score.test_add(score) } } + + #[cfg(test)] + pub fn set_gossipsub_score(&mut self, score: f64) { + self.score.set_gossipsub_score(score); + } } #[derive(Clone, Debug, Serialize)] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index f3785e92540..e96ca3a81d0 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -232,6 +232,14 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Connected outbound-only peers + pub fn connected_outbound_only_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_outbound_only()) + .map(|(peer_id, _)| peer_id) + } + /// Gives the `peer_id` of all known connected and synced peers. pub fn synced_peers(&self) -> impl Iterator { self.peers @@ -688,6 +696,25 @@ mod tests { assert_eq!(peer_info.unwrap().connections(), (n_in, n_out)); } + #[test] + fn test_outbound_only_peers_counted_correctly() { + let mut pdb = get_db(); + let p0 = PeerId::random(); + let p1 = PeerId::random(); + let p2 = PeerId::random(); + // Create peer with no connections. + let _p3 = PeerId::random(); + + pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&p2, "/ip4/0.0.0.0".parse().unwrap(), None); + + // We should only have one outbound-only peer (p2). + // Peers that are inbound-only, have both types of connections, or no connections should not be counted. + assert_eq!(pdb.connected_outbound_only_peers().count(), 1); + } + #[test] fn test_disconnected_are_bounded() { let mut pdb = get_db(); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 38fecad3af9..02479bef067 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -216,6 +216,13 @@ impl RealScore { self.set_lighthouse_score(0f64); } + // Set the gossipsub_score to a specific f64. + // Used in testing to induce score status changes during a heartbeat. + #[cfg(test)] + pub fn set_gossipsub_score(&mut self, score: f64) { + self.gossipsub_score = score; + } + /// Applies time-based logic such as decay rates to the score. /// This function should be called periodically. pub fn update(&mut self) { @@ -291,6 +298,8 @@ apply!(update_gossipsub_score, new_score: f64, ignore: bool); apply!(test_add, score: f64); #[cfg(test)] apply!(test_reset); +#[cfg(test)] +apply!(set_gossipsub_score, score: f64); impl Score { pub fn score(&self) -> f64 { diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index 67abcf77242..4055e53b205 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -84,6 +84,11 @@ impl NetworkGlobals { self.peers.read().connected_peer_ids().count() } + /// Returns the number of libp2p connected peers with outbound-only connections. + pub fn connected_outbound_only_peers(&self) -> usize { + self.peers.read().connected_outbound_only_peers().count() + } + /// Returns the number of libp2p peers that are either connected or being dialed. pub fn connected_or_dialing_peers(&self) -> usize { self.peers.read().connected_or_dialing_peers().count() From 4c7bb4984cde8118f0cfdda6e50eee1d335ccf2a Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 31 May 2021 04:18:20 +0000 Subject: [PATCH 3/5] Use the forwards iterator more often (#2376) ## Issue Addressed NA ## Primary Change When investigating memory usage, I noticed that retrieving a block from an early slot (e.g., slot 900) would cause a sharp increase in the memory footprint (from 400mb to 800mb+) which seemed to be ever-lasting. After some investigation, I found that the reverse iteration from the head back to that slot was the likely culprit. To counter this, I've switched the `BeaconChain::block_root_at_slot` to use the forwards iterator, instead of the reverse one. I also noticed that the networking stack is using `BeaconChain::root_at_slot` to check if a peer is relevant (`check_peer_relevance`). Perhaps the steep, seemingly-random-but-consistent increases in memory usage are caused by the use of this function. Using the forwards iterator with the HTTP API alleviated the sharp increases in memory usage. It also made the response much faster (before it felt like to took 1-2s, now it feels instant). ## Additional Changes In the process I also noticed that we have two functions for getting block roots: - `BeaconChain::block_root_at_slot`: returns `None` for a skip slot. - `BeaconChain::root_at_slot`: returns the previous root for a skip slot. I unified these two functions into `block_root_at_slot` and added the `WhenSlotSkipped` enum. Now, the caller must be explicit about the skip-slot behaviour when requesting a root. Additionally, I replaced `vec![]` with `Vec::with_capacity` in `store::chunked_vector::range_query`. I stumbled across this whilst debugging and made this modification to see what effect it would have (not much). It seems like a decent change to keep around, but I'm not concerned either way. Also, `BeaconChain::get_ancestor_block_root` is unused, so I got rid of it :wastebasket:. ## Additional Info I haven't also done the same for state roots here. Whilst it's possible and a good idea, it's more work since the fwds iterators are presently block-roots-specific. Whilst there's a few places a reverse iteration of state roots could be triggered (e.g., attestation production, HTTP API), they're no where near as common as the `check_peer_relevance` call. As such, I think we should get this PR merged first, then come back for the state root iters. I made an issue here https://github.com/sigp/lighthouse/issues/2377. --- beacon_node/beacon_chain/src/beacon_chain.rs | 182 ++++++++++++++---- beacon_node/beacon_chain/src/errors.rs | 4 + beacon_node/beacon_chain/src/lib.rs | 2 +- .../tests/attestation_production.rs | 4 +- .../tests/attestation_verification.rs | 4 +- beacon_node/beacon_chain/tests/tests.rs | 145 ++++++++++++++ beacon_node/http_api/src/block_id.rs | 4 +- beacon_node/http_api/src/lib.rs | 3 +- beacon_node/http_api/tests/tests.rs | 34 +++- .../beacon_processor/worker/rpc_methods.rs | 4 +- beacon_node/store/src/chunked_vector.rs | 10 +- consensus/fork_choice/tests/tests.rs | 8 +- 12 files changed, 339 insertions(+), 65 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7fa306b0933..fb726c16189 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -36,6 +36,7 @@ use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead}; use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use itertools::process_results; +use itertools::Itertools; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; use slasher::Slasher; @@ -85,6 +86,18 @@ pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero(); pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero(); pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero(); +/// Defines the behaviour when a block/block-root for a skipped slot is requested. +pub enum WhenSlotSkipped { + /// If the slot is a skip slot, return `None`. + /// + /// This is how the HTTP API behaves. + None, + /// If the slot it a skip slot, return the previous non-skipped block. + /// + /// This is generally how the specification behaves. + Prev, +} + /// The result of a chain segment processing. pub enum ChainSegmentResult { /// Processing this chain segment finished successfully. @@ -442,18 +455,6 @@ impl BeaconChain { .map(|result| result.map_err(|e| e.into()))) } - /// Traverse backwards from `block_root` to find the root of the ancestor block at `slot`. - pub fn get_ancestor_block_root( - &self, - block_root: Hash256, - slot: Slot, - ) -> Result, Error> { - process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| { - iter.find(|(_, ancestor_slot)| *ancestor_slot == slot) - .map(|(ancestor_block_root, _)| ancestor_block_root) - }) - } - /// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to /// the earliest reachable ancestor (may or may not be genesis). /// @@ -489,17 +490,17 @@ impl BeaconChain { /// Returns the block at the given slot, if any. Only returns blocks in the canonical chain. /// + /// Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot. + /// /// ## Errors /// /// May return a database error. pub fn block_at_slot( &self, - slot: Slot, + request_slot: Slot, + skips: WhenSlotSkipped, ) -> Result>, Error> { - let root = process_results(self.rev_iter_block_roots()?, |mut iter| { - iter.find(|(_, this_slot)| *this_slot == slot) - .map(|(root, _)| root) - })?; + let root = self.block_root_at_slot(request_slot, skips)?; if let Some(block_root) = root { Ok(self.store.get_item(&block_root)?) @@ -521,21 +522,132 @@ impl BeaconChain { } /// Returns the block root at the given slot, if any. Only returns roots in the canonical chain. - /// Returns `Ok(None)` if the given `Slot` was skipped. + /// + /// ## Notes + /// + /// - Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot. + /// - Returns `Ok(None)` for any slot higher than the current wall-clock slot. + pub fn block_root_at_slot( + &self, + request_slot: Slot, + skips: WhenSlotSkipped, + ) -> Result, Error> { + match skips { + WhenSlotSkipped::None => self.block_root_at_slot_skips_none(request_slot), + WhenSlotSkipped::Prev => self.block_root_at_slot_skips_prev(request_slot), + } + } + + /// Returns the block root at the given slot, if any. Only returns roots in the canonical chain. + /// + /// ## Notes + /// + /// - Returns `Ok(None)` if the given `Slot` was skipped. + /// - Returns `Ok(None)` for any slot higher than the current wall-clock slot. /// /// ## Errors /// /// May return a database error. - pub fn block_root_at_slot(&self, slot: Slot) -> Result, Error> { - process_results(self.rev_iter_block_roots()?, |mut iter| { - let root_opt = iter - .find(|(_, this_slot)| *this_slot == slot) - .map(|(root, _)| root); - if let (Some(root), Some((prev_root, _))) = (root_opt, iter.next()) { - return (prev_root != root).then(|| root); + fn block_root_at_slot_skips_none(&self, request_slot: Slot) -> Result, Error> { + if request_slot > self.slot()? { + return Ok(None); + } else if request_slot == self.spec.genesis_slot { + return Ok(Some(self.genesis_block_root)); + } + + let prev_slot = request_slot.saturating_sub(1_u64); + + // Try an optimized path of reading the root directly from the head state. + let fast_lookup: Option> = self.with_head(|head| { + let state = &head.beacon_state; + + // Try find the root for the `request_slot`. + let request_root_opt = match state.slot.cmp(&request_slot) { + // It's always a skip slot if the head is less than the request slot, return early. + Ordering::Less => return Ok(Some(None)), + // The request slot is the head slot. + Ordering::Equal => Some(head.beacon_block_root), + // Try find the request slot in the state. + Ordering::Greater => state.get_block_root(request_slot).ok().copied(), + }; + + if let Some(request_root) = request_root_opt { + if let Ok(prev_root) = state.get_block_root(prev_slot) { + return Ok(Some((*prev_root != request_root).then(|| request_root))); + } } - root_opt - }) + + // Fast lookup is not possible. + Ok::<_, Error>(None) + })?; + if let Some(root_opt) = fast_lookup { + return Ok(root_opt); + } + + if let Some(((prev_root, _), (curr_root, curr_slot))) = + process_results(self.forwards_iter_block_roots(prev_slot)?, |iter| { + iter.tuple_windows().next() + })? + { + // Sanity check. + if curr_slot != request_slot { + return Err(Error::InconsistentForwardsIter { + request_slot, + slot: curr_slot, + }); + } + Ok((curr_root != prev_root).then(|| curr_root)) + } else { + Ok(None) + } + } + + /// Returns the block root at the given slot, if any. Only returns roots in the canonical chain. + /// + /// ## Notes + /// + /// - Returns the root at the previous non-skipped slot if the given `Slot` was skipped. + /// - Returns `Ok(None)` for any slot higher than the current wall-clock slot. + /// + /// ## Errors + /// + /// May return a database error. + fn block_root_at_slot_skips_prev(&self, request_slot: Slot) -> Result, Error> { + if request_slot > self.slot()? { + return Ok(None); + } else if request_slot == self.spec.genesis_slot { + return Ok(Some(self.genesis_block_root)); + } + + // Try an optimized path of reading the root directly from the head state. + let fast_lookup: Option = self.with_head(|head| { + if head.beacon_block.slot() <= request_slot { + // Return the head root if all slots between the request and the head are skipped. + Ok(Some(head.beacon_block_root)) + } else if let Ok(root) = head.beacon_state.get_block_root(request_slot) { + // Return the root if it's easily accessible from the head state. + Ok(Some(*root)) + } else { + // Fast lookup is not possible. + Ok::<_, Error>(None) + } + })?; + if let Some(root) = fast_lookup { + return Ok(Some(root)); + } + + process_results(self.forwards_iter_block_roots(request_slot)?, |mut iter| { + if let Some((root, slot)) = iter.next() { + if slot == request_slot { + Ok(Some(root)) + } else { + // Sanity check. + Err(Error::InconsistentForwardsIter { request_slot, slot }) + } + } else { + Ok(None) + } + })? } /// Returns the block at the given root, if any. @@ -825,16 +937,6 @@ impl BeaconChain { Ok(map) } - /// Returns the block canonical root of the current canonical chain at a given slot. - /// - /// Returns `None` if the given slot doesn't exist in the chain. - pub fn root_at_slot(&self, target_slot: Slot) -> Result, Error> { - process_results(self.rev_iter_block_roots()?, |mut iter| { - iter.find(|(_, slot)| *slot == target_slot) - .map(|(root, _)| root) - }) - } - /// Returns the block canonical root of the current canonical chain at a given slot, starting from the given state. /// /// Returns `None` if the given slot doesn't exist in the chain. @@ -2324,10 +2426,10 @@ impl BeaconChain { if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_head_subscribers() { if let Ok(Some(current_duty_dependent_root)) = - self.root_at_slot(target_epoch_start_slot - 1) + self.block_root_at_slot(target_epoch_start_slot - 1, WhenSlotSkipped::Prev) { - if let Ok(Some(previous_duty_dependent_root)) = - self.root_at_slot(prev_target_epoch_start_slot - 1) + if let Ok(Some(previous_duty_dependent_root)) = self + .block_root_at_slot(prev_target_epoch_start_slot - 1, WhenSlotSkipped::Prev) { event_handler.register(EventKind::Head(SseHead { slot: head_slot, diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9c47904dda1..dabe96c8afd 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -113,6 +113,10 @@ pub enum BeaconChainError { state_epoch: Epoch, shuffling_epoch: Epoch, }, + InconsistentForwardsIter { + request_slot: Slot, + slot: Slot, + }, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 27f999d305f..eb15a699a92 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -31,7 +31,7 @@ mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - ForkChoiceError, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + ForkChoiceError, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; pub use self::chain_config::ChainConfig; diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 4e4e062d404..e00a04395dc 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -5,7 +5,7 @@ extern crate lazy_static; use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}, - StateSkipConfig, + StateSkipConfig, WhenSlotSkipped, }; use store::config::StoreConfig; use tree_hash::TreeHash; @@ -60,7 +60,7 @@ fn produces_attestations() { }; let block = chain - .block_at_slot(block_slot) + .block_at_slot(block_slot, WhenSlotSkipped::Prev) .expect("should get block") .expect("block should not be skipped"); let block_root = block.message.tree_hash_root(); diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 0b7e7ef8d1f..9ee351faa6e 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -6,7 +6,7 @@ extern crate lazy_static; use beacon_chain::{ attestation_verification::Error as AttnError, test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, - BeaconChain, BeaconChainTypes, + BeaconChain, BeaconChainTypes, WhenSlotSkipped, }; use int_to_bytes::int_to_bytes32; use state_processing::{ @@ -912,7 +912,7 @@ fn attestation_that_skips_epochs() { let earlier_slot = (current_epoch - 2).start_slot(MainnetEthSpec::slots_per_epoch()); let earlier_block = harness .chain - .block_at_slot(earlier_slot) + .block_at_slot(earlier_slot, WhenSlotSkipped::Prev) .expect("should not error getting block at slot") .expect("should find block at slot"); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 616f2b544ff..daa306659de 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -9,6 +9,7 @@ use beacon_chain::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, OP_POOL_DB_KEY, }, + WhenSlotSkipped, }; use operation_pool::PersistedOperationPool; use state_processing::{ @@ -609,3 +610,147 @@ fn produces_and_processes_with_genesis_skip_slots() { run_skip_slot_test(i) } } + +#[test] +fn block_roots_skip_slot_behaviour() { + let harness = get_harness(VALIDATOR_COUNT); + + // Test should be longer than the block roots to ensure a DB lookup is triggered. + let chain_length = harness.chain.head().unwrap().beacon_state.block_roots.len() as u64 * 3; + + let skipped_slots = [1, 6, 7, 10, chain_length]; + + // Build a chain with some skip slots. + for i in 1..=chain_length { + if i > 1 { + harness.advance_slot(); + } + + let slot = harness.chain.slot().unwrap().as_u64(); + + if !skipped_slots.contains(&slot) { + harness.extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + } + } + + let mut prev_unskipped_root = None; + + for target_slot in 0..=chain_length { + if skipped_slots.contains(&target_slot) { + /* + * A skip slot + */ + assert!( + harness + .chain + .block_root_at_slot(target_slot.into(), WhenSlotSkipped::None) + .unwrap() + .is_none(), + "WhenSlotSkipped::None should return None on a skip slot" + ); + + let skipped_root = harness + .chain + .block_root_at_slot(target_slot.into(), WhenSlotSkipped::Prev) + .unwrap() + .expect("WhenSlotSkipped::Prev should always return Some"); + + assert_eq!( + skipped_root, + prev_unskipped_root.expect("test is badly formed"), + "WhenSlotSkipped::Prev should accurately return the prior skipped block" + ); + + let expected_block = harness.chain.get_block(&skipped_root).unwrap().unwrap(); + + assert_eq!( + harness + .chain + .block_at_slot(target_slot.into(), WhenSlotSkipped::Prev) + .unwrap() + .unwrap(), + expected_block, + ); + + assert!( + harness + .chain + .block_at_slot(target_slot.into(), WhenSlotSkipped::None) + .unwrap() + .is_none(), + "WhenSlotSkipped::None should return None on a skip slot" + ); + } else { + /* + * Not a skip slot + */ + let skips_none = harness + .chain + .block_root_at_slot(target_slot.into(), WhenSlotSkipped::None) + .unwrap() + .expect("WhenSlotSkipped::None should return Some for non-skipped block"); + let skips_prev = harness + .chain + .block_root_at_slot(target_slot.into(), WhenSlotSkipped::Prev) + .unwrap() + .expect("WhenSlotSkipped::Prev should always return Some"); + assert_eq!( + skips_none, skips_prev, + "WhenSlotSkipped::None and WhenSlotSkipped::Prev should be equal on non-skipped slot" + ); + + let expected_block = harness.chain.get_block(&skips_prev).unwrap().unwrap(); + + assert_eq!( + harness + .chain + .block_at_slot(target_slot.into(), WhenSlotSkipped::Prev) + .unwrap() + .unwrap(), + expected_block + ); + + assert_eq!( + harness + .chain + .block_at_slot(target_slot.into(), WhenSlotSkipped::None) + .unwrap() + .unwrap(), + expected_block + ); + + prev_unskipped_root = Some(skips_prev); + } + } + + /* + * A future, non-existent slot. + */ + + let future_slot = harness.chain.slot().unwrap() + 1; + assert_eq!( + harness.chain.head().unwrap().beacon_block.slot(), + future_slot - 2, + "test precondition" + ); + assert!( + harness + .chain + .block_root_at_slot(future_slot, WhenSlotSkipped::None) + .unwrap() + .is_none(), + "WhenSlotSkipped::None should return None on a future slot" + ); + assert!( + harness + .chain + .block_root_at_slot(future_slot, WhenSlotSkipped::Prev) + .unwrap() + .is_none(), + "WhenSlotSkipped::Prev should return None on a future slot" + ); +} diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 793a10a5e8c..c21701f3a37 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -1,4 +1,4 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::{BeaconChain, BeaconChainTypes, WhenSlotSkipped}; use eth2::types::BlockId as CoreBlockId; use std::str::FromStr; use types::{Hash256, SignedBeaconBlock, Slot}; @@ -37,7 +37,7 @@ impl BlockId { .map(|head| head.current_justified_checkpoint.root) .map_err(warp_utils::reject::beacon_chain_error), CoreBlockId::Slot(slot) => chain - .block_root_at_slot(*slot) + .block_root_at_slot(*slot, WhenSlotSkipped::None) .map_err(warp_utils::reject::beacon_chain_error) .and_then(|root_opt| { root_opt.ok_or_else(|| { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 325b2898865..30b8c8d90bd 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -17,6 +17,7 @@ use beacon_chain::{ observed_operations::ObservationOutcome, validator_monitor::{get_block_delay_ms, timestamp_now}, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, + WhenSlotSkipped, }; use block_id::BlockId; use eth2::types::{self as api_types, ValidatorId}; @@ -751,7 +752,7 @@ pub fn serve( let block = BlockId::from_root(root).block(&chain)?; let canonical = chain - .block_root_at_slot(block.slot()) + .block_root_at_slot(block.slot(), WhenSlotSkipped::None) .map_err(warp_utils::reject::beacon_chain_error)? .map_or(false, |canonical| root == canonical); diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index fd284a7c1ce..4fb7dc39a83 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -2,7 +2,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, - BeaconChain, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; use environment::null_logger; use eth2::Error; @@ -791,7 +791,10 @@ impl ApiTester { .current_justified_checkpoint .root, ), - BlockId::Slot(slot) => self.chain.block_root_at_slot(slot).unwrap(), + BlockId::Slot(slot) => self + .chain + .block_root_at_slot(slot, WhenSlotSkipped::None) + .unwrap(), BlockId::Root(root) => Some(root), } } @@ -812,14 +815,21 @@ impl ApiTester { .unwrap() .map(|res| res.data); - let root = self.chain.block_root_at_slot(slot).unwrap(); + let root = self + .chain + .block_root_at_slot(slot, WhenSlotSkipped::None) + .unwrap(); if root.is_none() && result.is_none() { continue; } let root = root.unwrap(); - let block = self.chain.block_at_slot(slot).unwrap().unwrap(); + let block = self + .chain + .block_at_slot(slot, WhenSlotSkipped::Prev) + .unwrap() + .unwrap(); let header = BlockHeaderData { root, canonical: true, @@ -900,7 +910,7 @@ impl ApiTester { let block_root = block_root_opt.unwrap(); let canonical = self .chain - .block_root_at_slot(block.slot()) + .block_root_at_slot(block.slot(), WhenSlotSkipped::None) .unwrap() .map_or(false, |canonical| block_root == canonical); @@ -1532,7 +1542,10 @@ impl ApiTester { let dependent_root = self .chain - .root_at_slot((epoch - 1).start_slot(E::slots_per_epoch()) - 1) + .block_root_at_slot( + (epoch - 1).start_slot(E::slots_per_epoch()) - 1, + WhenSlotSkipped::Prev, + ) .unwrap() .unwrap_or(self.chain.head_beacon_block_root().unwrap()); @@ -1604,7 +1617,10 @@ impl ApiTester { let dependent_root = self .chain - .root_at_slot(epoch.start_slot(E::slots_per_epoch()) - 1) + .block_root_at_slot( + epoch.start_slot(E::slots_per_epoch()) - 1, + WhenSlotSkipped::Prev, + ) .unwrap() .unwrap_or(self.chain.head_beacon_block_root().unwrap()); @@ -2186,7 +2202,7 @@ impl ApiTester { current_duty_dependent_root, previous_duty_dependent_root: self .chain - .root_at_slot(current_slot - E::slots_per_epoch()) + .block_root_at_slot(current_slot - E::slots_per_epoch(), WhenSlotSkipped::Prev) .unwrap() .unwrap(), epoch_transition: true, @@ -2195,7 +2211,7 @@ impl ApiTester { let expected_finalized = EventKind::FinalizedCheckpoint(SseFinalizedCheckpoint { block: self .chain - .root_at_slot(next_slot - finalization_distance) + .block_root_at_slot(next_slot - finalization_distance, WhenSlotSkipped::Prev) .unwrap() .unwrap(), state: self diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index d25590f7c74..e1e51ef2c6b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -2,7 +2,7 @@ use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; -use beacon_chain::{BeaconChainError, BeaconChainTypes}; +use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; use eth2_libp2p::rpc::StatusMessage; use eth2_libp2p::rpc::*; use eth2_libp2p::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; @@ -72,7 +72,7 @@ impl Worker { && local.finalized_root != Hash256::zero() && self .chain - .root_at_slot(start_slot(remote.finalized_epoch)) + .block_root_at_slot(start_slot(remote.finalized_epoch), WhenSlotSkipped::Prev) .map(|root_opt| root_opt != Some(remote.finalized_root))? { // The remote's finalized epoch is less than or equal to ours, but the block root is diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs index 7dda6278e57..9a26304a0b5 100644 --- a/beacon_node/store/src/chunked_vector.rs +++ b/beacon_node/store/src/chunked_vector.rs @@ -431,9 +431,15 @@ fn range_query, E: EthSpec, T: Decode + Encode>( start_index: usize, end_index: usize, ) -> Result>, Error> { - let mut result = vec![]; + let range = start_index..=end_index; + let len = range + .end() + // Add one to account for inclusive range. + .saturating_add(1) + .saturating_sub(*range.start()); + let mut result = Vec::with_capacity(len); - for chunk_index in start_index..=end_index { + for chunk_index in range { let key = &chunk_key(chunk_index as u64)[..]; let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { chunk_index })?; result.push(chunk); diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 5c0db4ebc92..ec0b1277432 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -3,7 +3,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, BeaconChainError, BeaconForkChoiceStore, ChainConfig, ForkChoiceError, - StateSkipConfig, + StateSkipConfig, WhenSlotSkipped, }; use fork_choice::{ ForkChoiceStore, InvalidAttestation, InvalidBlock, QueuedAttestation, @@ -872,7 +872,7 @@ fn invalid_attestation_future_block() { MutationDelay::Blocks(1), |attestation, chain| { attestation.data.beacon_block_root = chain - .block_at_slot(chain.slot().unwrap()) + .block_at_slot(chain.slot().unwrap(), WhenSlotSkipped::Prev) .unwrap() .unwrap() .canonical_root(); @@ -901,7 +901,7 @@ fn invalid_attestation_inconsistent_ffg_vote() { MutationDelay::NoDelay, |attestation, chain| { attestation.data.target.root = chain - .block_at_slot(Slot::new(1)) + .block_at_slot(Slot::new(1), WhenSlotSkipped::Prev) .unwrap() .unwrap() .canonical_root(); @@ -909,7 +909,7 @@ fn invalid_attestation_inconsistent_ffg_vote() { *attestation_opt.lock().unwrap() = Some(attestation.data.target.root); *local_opt.lock().unwrap() = Some( chain - .block_at_slot(Slot::new(0)) + .block_at_slot(Slot::new(0), WhenSlotSkipped::Prev) .unwrap() .unwrap() .canonical_root(), From ce027b6693750784a66cd004d962a3b80ee4d703 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 31 May 2021 14:21:17 +1000 Subject: [PATCH 4/5] Bump versions --- beacon_node/Cargo.toml | 2 +- boot_node/Cargo.toml | 2 +- common/lighthouse_version/src/lib.rs | 2 +- lcli/Cargo.toml | 2 +- lighthouse/Cargo.toml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 4cfede51001..f170ee86f51 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "1.3.0" +version = "1.4.0" authors = ["Paul Hauner ", "Age Manning "] edition = "2018" diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index 5fa8fbe5a8f..06dec1c0dee 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -16,7 +16,7 @@ pub const VERSION: &str = git_version!( // NOTE: using --match instead of --exclude for compatibility with old Git "--match=thiswillnevermatchlol" ], - prefix = "Lighthouse/v1.3.0-", + prefix = "Lighthouse/v1.4.0-", fallback = "unknown" ); diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index bc9c69c8b7d..56d8b615610 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lcli" description = "Lighthouse CLI (modeled after zcli)" -version = "1.3.0" +version = "1.4.0" authors = ["Paul Hauner "] edition = "2018" diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 60009400aa3..7746d58956c 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "1.3.0" +version = "1.4.0" authors = ["Sigma Prime "] edition = "2018" autotests = false From 703c2ffdb4b3079354da630969ec354b2b79e4db Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 31 May 2021 14:21:31 +1000 Subject: [PATCH 5/5] Freshen Cargo.lock --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de30f767e51..3d7f9395383 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -579,7 +579,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "1.3.0" +version = "1.4.0" dependencies = [ "beacon_chain", "clap", @@ -785,7 +785,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "1.3.0" +version = "1.4.0" dependencies = [ "beacon_node", "clap", @@ -3315,7 +3315,7 @@ dependencies = [ [[package]] name = "lcli" -version = "1.3.0" +version = "1.4.0" dependencies = [ "account_utils", "bls", @@ -3692,7 +3692,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "1.3.0" +version = "1.4.0" dependencies = [ "account_manager", "account_utils",