Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions beacon_node/eth1/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use crate::Config;
use crate::{
block_cache::{BlockCache, Eth1Block},
deposit_cache::{DepositCache, SszDepositCache},
service::EndpointsCache,
};
use parking_lot::RwLock;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::sync::Arc;
use types::ChainSpec;

#[derive(Default)]
Expand All @@ -28,6 +30,7 @@ impl DepositUpdater {
pub struct Inner {
pub block_cache: RwLock<BlockCache>,
pub deposit_cache: RwLock<DepositUpdater>,
pub endpoints_cache: RwLock<Option<Arc<EndpointsCache>>>,
pub config: RwLock<Config>,
pub remote_head_block: RwLock<Option<Eth1Block>>,
pub spec: ChainSpec,
Expand Down Expand Up @@ -87,6 +90,7 @@ impl SszEth1Cache {
cache: self.deposit_cache.to_deposit_cache()?,
last_processed_block: self.last_processed_block,
}),
endpoints_cache: RwLock::new(None),
// Set the remote head_block to zero when creating a new instance. We only care about
// present and future eth1 nodes.
remote_head_block: RwLock::new(None),
Expand Down
91 changes: 70 additions & 21 deletions beacon_node/eth1/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,27 @@ pub enum EndpointError {

type EndpointState = Result<(), EndpointError>;

type EndpointWithState = (SensitiveUrl, TRwLock<Option<EndpointState>>);
pub struct EndpointWithState {
endpoint: SensitiveUrl,
state: TRwLock<Option<EndpointState>>,
}

impl EndpointWithState {
pub fn new(endpoint: SensitiveUrl) -> Self {
Self {
endpoint,
state: TRwLock::new(None),
}
}
}

/// Clears the cached usability state of `endpoint` so the next check
/// is performed afresh.
async fn reset_endpoint_state(endpoint: &EndpointWithState) {
    let mut state = endpoint.state.write().await;
    *state = None;
}

/// Returns a copy of the endpoint's cached usability state, or `None`
/// if the endpoint has not been checked yet.
async fn get_state(endpoint: &EndpointWithState) -> Option<EndpointState> {
    let state = endpoint.state.read().await;
    *state
}

/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is
/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint
Expand All @@ -70,19 +90,19 @@ impl EndpointsCache {
/// Checks the usability of an endpoint. Results get cached and therefore only the first call
/// for each endpoint does the real check.
async fn state(&self, endpoint: &EndpointWithState) -> EndpointState {
if let Some(result) = *endpoint.1.read().await {
if let Some(result) = *endpoint.state.read().await {
return result;
}
let mut value = endpoint.1.write().await;
let mut value = endpoint.state.write().await;
if let Some(result) = *value {
return result;
}
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_REQUESTS,
&[&endpoint.0.to_string()],
&[&endpoint.endpoint.to_string()],
);
let state = endpoint_state(
&endpoint.0,
&endpoint.endpoint,
&self.config_network_id,
&self.config_chain_id,
&self.log,
Expand All @@ -92,7 +112,7 @@ impl EndpointsCache {
if state.is_err() {
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_ERRORS,
&[&endpoint.0.to_string()],
&[&endpoint.endpoint.to_string()],
);
}
state
Expand All @@ -111,20 +131,23 @@ impl EndpointsCache {
.first_success(|endpoint| async move {
match self.state(endpoint).await {
Ok(()) => {
let endpoint_str = &endpoint.0.to_string();
let endpoint_str = &endpoint.endpoint.to_string();
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_REQUESTS,
&[endpoint_str],
);
match func(&endpoint.0).await {
match func(&endpoint.endpoint).await {
Ok(t) => Ok(t),
Err(t) => {
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_ERRORS,
&[endpoint_str],
);
if let SingleEndpointError::EndpointError(e) = &t {
*endpoint.1.write().await = Some(Err(*e));
*endpoint.state.write().await = Some(Err(*e));
} else {
// A non-`EndpointError` error occurred, so reset the state.
reset_endpoint_state(endpoint).await;
}
Err(t)
}
Expand All @@ -135,6 +158,16 @@ impl EndpointsCache {
})
.await
}

/// Clears the cached result of every endpoint whose last usability
/// check failed, so those endpoints are re-checked on next use.
pub async fn reset_errorred_endpoints(&self) {
    for endpoint in &self.fallback.servers {
        if matches!(get_state(endpoint).await, Some(Err(_))) {
            reset_endpoint_state(endpoint).await;
        }
    }
}
}

/// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and
Expand Down Expand Up @@ -402,9 +435,9 @@ impl Default for Config {
follow_distance: 128,
node_far_behind_seconds: 128 * 14,
block_cache_truncation: Some(4_096),
auto_update_interval_millis: 7_000,
auto_update_interval_millis: 60_000,
blocks_per_log_query: 1_000,
max_log_requests_per_update: Some(100),
max_log_requests_per_update: Some(5_000),
max_blocks_per_update: Some(8_192),
purge_cache: false,
}
Expand Down Expand Up @@ -432,6 +465,7 @@ impl Service {
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
endpoints_cache: RwLock::new(None),
remote_head_block: RwLock::new(None),
config: RwLock::new(config),
spec,
Expand Down Expand Up @@ -602,20 +636,31 @@ impl Service {
self.inner.config.write().lowest_cached_block_number = block_number;
}

pub fn init_endpoints(&self) -> EndpointsCache {
/// Builds a new `EndpointsCache` with empty states.
pub fn init_endpoints(&self) -> Arc<EndpointsCache> {
let endpoints = self.config().endpoints.clone();
let config_network_id = self.config().network_id.clone();
let config_chain_id = self.config().chain_id.clone();
EndpointsCache {
fallback: Fallback::new(
endpoints
.into_iter()
.map(|s| (s, TRwLock::new(None)))
.collect(),
),
let new_cache = Arc::new(EndpointsCache {
fallback: Fallback::new(endpoints.into_iter().map(EndpointWithState::new).collect()),
config_network_id,
config_chain_id,
log: self.log.clone(),
});

let mut endpoints_cache = self.inner.endpoints_cache.write();
*endpoints_cache = Some(new_cache.clone());
new_cache
}

/// Returns the cached `EndpointsCache` if one exists, otherwise builds
/// (and caches) a new one via `init_endpoints`.
pub fn get_endpoints(&self) -> Arc<EndpointsCache> {
    // Clone out of the read guard; the guard is dropped at the end of
    // this statement, so `init_endpoints` may safely take the write lock.
    let cached = self.inner.endpoints_cache.read().clone();
    match cached {
        Some(cache) => cache,
        None => self.init_endpoints(),
    }
}

Expand All @@ -630,7 +675,11 @@ impl Service {
pub async fn update(
&self,
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
let endpoints = self.init_endpoints();
let endpoints = self.get_endpoints();

// Reset the state of any endpoints which have errored so their state can be redetermined.
endpoints.reset_errorred_endpoints().await;

let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds;

let process_single_err = |e: &FallbackError<SingleEndpointError>| {
Expand All @@ -653,7 +702,7 @@ impl Service {
}
}
}
endpoints.fallback.map_format_error(|s| &s.0, &e)
endpoints.fallback.map_format_error(|s| &s.endpoint, &e)
};

let process_err = |e: Error| match &e {
Expand Down
3 changes: 2 additions & 1 deletion common/fallback/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use itertools::{join, zip};
use std::fmt::{Debug, Display};
use std::future::Future;

/// An ordered list of servers; callers try each in turn until one
/// succeeds (see `first_success`).
#[derive(Clone)]
pub struct Fallback<T> {
    // Public so callers (e.g. `EndpointsCache::reset_errorred_endpoints`)
    // can iterate over the servers directly.
    pub servers: Vec<T>,
}

#[derive(Debug, PartialEq)]
Expand Down