Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions beacon_node/eth1/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::Config;
use crate::{
block_cache::{BlockCache, Eth1Block},
deposit_cache::{DepositCache, SszDepositCache},
service::EndpointsCache,
};
use parking_lot::RwLock;
use ssz::{Decode, Encode};
Expand All @@ -28,6 +29,7 @@ impl DepositUpdater {
pub struct Inner {
pub block_cache: RwLock<BlockCache>,
pub deposit_cache: RwLock<DepositUpdater>,
pub endpoints_cache: RwLock<Option<EndpointsCache>>,
pub config: RwLock<Config>,
pub remote_head_block: RwLock<Option<Eth1Block>>,
pub spec: ChainSpec,
Expand Down Expand Up @@ -87,6 +89,7 @@ impl SszEth1Cache {
cache: self.deposit_cache.to_deposit_cache()?,
last_processed_block: self.last_processed_block,
}),
endpoints_cache: RwLock::new(None),
// Reset the remote head block to `None` when creating a new instance. We only care about
// present and future eth1 nodes.
remote_head_block: RwLock::new(None),
Expand Down
89 changes: 70 additions & 19 deletions beacon_node/eth1/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,25 @@ pub enum EndpointError {

type EndpointState = Result<(), EndpointError>;

type EndpointWithState = (SensitiveUrl, TRwLock<Option<EndpointState>>);
/// An endpoint URL paired with its cached usability state.
///
/// `state` starts out as `None` (see `new`) and holds `Some(result)` once a result has been
/// stored for the endpoint. It sits behind an `Arc`, so clones of this struct share one state.
#[derive(Clone)]
pub struct EndpointWithState {
Comment thread
paulhauner marked this conversation as resolved.
endpoint: SensitiveUrl,
state: Arc<TRwLock<Option<EndpointState>>>,
}

impl EndpointWithState {
    /// Wraps `endpoint` together with a fresh state of `None`, i.e. no result stored yet.
    pub fn new(endpoint: SensitiveUrl) -> Self {
        let state = Arc::new(TRwLock::new(None));
        Self { endpoint, state }
    }
}

/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is
/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint
/// is not usable.
#[derive(Clone)]
pub struct EndpointsCache {
pub fallback: Fallback<EndpointWithState>,
pub config_network_id: Eth1Id,
Expand All @@ -70,19 +84,19 @@ impl EndpointsCache {
/// Checks the usability of an endpoint. Results get cached and therefore only the first call
/// for each endpoint does the real check.
async fn state(&self, endpoint: &EndpointWithState) -> EndpointState {
if let Some(result) = *endpoint.1.read().await {
if let Some(result) = *endpoint.state.read().await {
return result;
}
let mut value = endpoint.1.write().await;
let mut value = endpoint.state.write().await;
if let Some(result) = *value {
return result;
}
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_REQUESTS,
&[&endpoint.0.to_string()],
&[&endpoint.endpoint.to_string()],
);
let state = endpoint_state(
&endpoint.0,
&endpoint.endpoint,
&self.config_network_id,
&self.config_chain_id,
&self.log,
Expand All @@ -92,7 +106,7 @@ impl EndpointsCache {
if state.is_err() {
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_ERRORS,
&[&endpoint.0.to_string()],
&[&endpoint.endpoint.to_string()],
);
}
state
Expand All @@ -111,20 +125,23 @@ impl EndpointsCache {
.first_success(|endpoint| async move {
match self.state(endpoint).await {
Ok(()) => {
let endpoint_str = &endpoint.0.to_string();
let endpoint_str = &endpoint.endpoint.to_string();
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_REQUESTS,
&[endpoint_str],
);
match func(&endpoint.0).await {
match func(&endpoint.endpoint).await {
Ok(t) => Ok(t),
Err(t) => {
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_ERRORS,
&[endpoint_str],
);
if let SingleEndpointError::EndpointError(e) = &t {
*endpoint.1.write().await = Some(Err(*e));
*endpoint.state.write().await = Some(Err(*e));
} else {
// A non-`EndpointError` error occurred, so reset the state.
self.reset_endpoint_state(endpoint).await;
}
Err(t)
}
Expand All @@ -135,6 +152,24 @@ impl EndpointsCache {
})
.await
}

/// Clears the cached state of `endpoint` by storing `None` under its write lock.
///
/// With the state back at `None`, the early returns in `state` no longer apply, so the next
/// call to `state` for this endpoint runs the usability check again.
async fn reset_endpoint_state(&self, endpoint: &EndpointWithState) {
Comment thread
macladson marked this conversation as resolved.
Outdated
*endpoint.state.write().await = None;
}

/// Returns a copy of the currently cached state of `endpoint`, read under its read lock.
///
/// `None` means no result is cached. This only reads the cache; it never runs a check itself.
async fn get_state(&self, endpoint: &EndpointWithState) -> Option<EndpointState> {
Comment thread
macladson marked this conversation as resolved.
Outdated
*endpoint.state.read().await
}

/// Clears the cached state of every endpoint whose cached state is currently an error.
///
/// Endpoints with no cached state, or with a cached `Ok`, are left untouched.
pub async fn reset_errorred_endpoints(&self) {
    for server in self.fallback.servers.iter() {
        // Only a cached `Err` is reset; `None` and `Some(Ok(()))` stay as they are.
        if let Some(Err(_)) = self.get_state(server).await {
            self.reset_endpoint_state(server).await;
        }
    }
}
}

/// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and
Expand Down Expand Up @@ -402,7 +437,7 @@ impl Default for Config {
follow_distance: 128,
node_far_behind_seconds: 128 * 14,
block_cache_truncation: Some(4_096),
auto_update_interval_millis: 7_000,
auto_update_interval_millis: 60_000,
blocks_per_log_query: 1_000,
max_log_requests_per_update: Some(100),
max_blocks_per_update: Some(8_192),
Expand Down Expand Up @@ -432,6 +467,7 @@ impl Service {
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
endpoints_cache: RwLock::new(None),
remote_head_block: RwLock::new(None),
config: RwLock::new(config),
spec,
Expand Down Expand Up @@ -602,20 +638,31 @@ impl Service {
self.inner.config.write().lowest_cached_block_number = block_number;
}

/// Builds a new `EndpointsCache` with empty states.
pub fn init_endpoints(&self) -> EndpointsCache {
let endpoints = self.config().endpoints.clone();
let config_network_id = self.config().network_id.clone();
let config_chain_id = self.config().chain_id.clone();
EndpointsCache {
fallback: Fallback::new(
endpoints
.into_iter()
.map(|s| (s, TRwLock::new(None)))
.collect(),
),
let new_cache = EndpointsCache {
fallback: Fallback::new(endpoints.into_iter().map(EndpointWithState::new).collect()),
config_network_id,
config_chain_id,
log: self.log.clone(),
};

let mut endpoints_cache = self.inner.endpoints_cache.write();
*endpoints_cache = Some(new_cache.clone());
new_cache
}

/// Returns a clone of the cached `EndpointsCache` if one exists, otherwise builds a new one via
/// `init_endpoints`.
pub fn get_endpoints(&self) -> EndpointsCache {
let endpoints_cache = self.inner.endpoints_cache.read();
if let Some(cache) = endpoints_cache.clone() {
cache
} else {
// Release the read guard first: `init_endpoints` takes the write lock on this same field.
drop(endpoints_cache);
Comment thread
macladson marked this conversation as resolved.
self.init_endpoints()
}
}

Expand All @@ -630,7 +677,11 @@ impl Service {
pub async fn update(
&self,
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
let endpoints = self.init_endpoints();
let endpoints = self.get_endpoints();

// Reset the state of any endpoints which have errored so their state can be redetermined.
endpoints.reset_errorred_endpoints().await;

let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds;

let process_single_err = |e: &FallbackError<SingleEndpointError>| {
Expand All @@ -653,7 +704,7 @@ impl Service {
}
}
}
endpoints.fallback.map_format_error(|s| &s.0, &e)
endpoints.fallback.map_format_error(|s| &s.endpoint, &e)
};

let process_err = |e: Error| match &e {
Expand Down
3 changes: 2 additions & 1 deletion common/fallback/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use itertools::{join, zip};
use std::fmt::{Debug, Display};
use std::future::Future;

#[derive(Clone)]
pub struct Fallback<T> {
servers: Vec<T>,
pub servers: Vec<T>,
}

#[derive(Debug, PartialEq)]
Expand Down