Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ where
let genesis_time = head_snapshot.beacon_state.genesis_time();
let head_for_snapshot_cache = head_snapshot.clone();
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let shuffling_cache_size = self.chain_config.shuffling_cache_size;

let beacon_chain = BeaconChain {
spec: self.spec,
Expand Down Expand Up @@ -818,7 +819,7 @@ where
DEFAULT_SNAPSHOT_CACHE_SIZE,
head_for_snapshot_cache,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(shuffling_cache_size)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub struct ChainConfig {
pub prepare_payload_lookahead: Duration,
/// Use EL-free optimistic sync for the finalized part of the chain.
pub optimistic_finalized_sync: bool,
/// The size of the shuffling cache,
pub shuffling_cache_size: usize,
/// Whether to send payload attributes every slot, regardless of connected proposers.
///
/// This is useful for block builders and testing.
Expand Down Expand Up @@ -97,6 +99,7 @@ impl Default for ChainConfig {
prepare_payload_lookahead: Duration::from_secs(4),
// This value isn't actually read except in tests.
optimistic_finalized_sync: true,
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
always_prepare_payload: false,
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod persisted_fork_choice;
mod pre_finalization_cache;
pub mod proposer_prep_service;
pub mod schema_change;
mod shuffling_cache;
pub mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
pub mod sync_committee_rewards;
Expand Down
16 changes: 8 additions & 8 deletions beacon_node/beacon_chain/src/shuffling_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256
/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash +
/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this
/// ignores a few extra bytes in the caches that should be insignificant compared to the indices).
const CACHE_SIZE: usize = 16;
pub const DEFAULT_CACHE_SIZE: usize = 16;

/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this
/// limits the number of concurrent states that can be loaded into memory for the committee cache.
Expand Down Expand Up @@ -54,9 +54,9 @@ pub struct ShufflingCache {
}

impl ShufflingCache {
pub fn new() -> Self {
pub fn new(cache_size: usize) -> Self {
Self {
cache: LruCache::new(CACHE_SIZE),
cache: LruCache::new(cache_size),
}
}

Expand Down Expand Up @@ -172,7 +172,7 @@ impl ToArcCommitteeCache for Arc<CommitteeCache> {

impl Default for ShufflingCache {
fn default() -> Self {
Self::new()
Self::new(DEFAULT_CACHE_SIZE)
}
}

Expand Down Expand Up @@ -249,7 +249,7 @@ mod test {
fn resolved_promise() {
let (committee_a, _) = committee_caches();
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::new(16);

// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
Expand All @@ -276,7 +276,7 @@ mod test {
#[test]
fn unresolved_promise() {
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::new(16);

// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
Expand All @@ -301,7 +301,7 @@ mod test {
fn two_promises() {
let (committee_a, committee_b) = committee_caches();
let (id_a, id_b) = (shuffling_id(1), shuffling_id(2));
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::new(16);

// Create promise A.
let sender_a = cache.create_promise(id_a.clone()).unwrap();
Expand Down Expand Up @@ -355,7 +355,7 @@ mod test {

#[test]
fn too_many_promises() {
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::new(16);

for i in 0..MAX_CONCURRENT_PROMISES {
cache.create_promise(shuffling_id(i as u64)).unwrap();
Expand Down
131 changes: 102 additions & 29 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ use system_health::observe_system_health_bn;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload,
CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
Expand Down Expand Up @@ -783,39 +783,112 @@ pub fn serve<T: BeaconChainTypes>(
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);

let committee_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state
.committee_cache_is_initialized(relative_epoch) =>
{
state.committee_cache(relative_epoch).map(Cow::Borrowed)
}
_ => CommitteeCache::initialized(state, epoch, &chain.spec)
.map(Cow::Owned),
}
.map_err(|e| match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root() as u64;
let first_subsequent_restore_point_slot = ((epoch
.start_slot(T::EthSpec::slots_per_epoch())
/ max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(format!(
// Attempt to obtain the committee_cache from the beacon chain
let decision_slot = (epoch.saturating_sub(2u64))
.end_slot(T::EthSpec::slots_per_epoch());
// Find the decision block and skip to another method on any kind
// of failure
let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{
Some(AttestationShufflingId {
shuffling_epoch: epoch,
shuffling_decision_block,
})
} else {
None
};

// Attempt to read from the chain cache if there exists a
// shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) =
shuffling_id.as_ref()
{
chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
} else {
None
};

let committee_cache = match maybe_cached_shuffling {
Some(shuffling) => shuffling,
None => {
let possibly_built_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state.committee_cache_is_initialized(
relative_epoch,
) =>
{
state
.committee_cache(relative_epoch)
.map(Cow::Borrowed)
}
_ => CommitteeCache::initialized(
state,
epoch,
&chain.spec,
)
.map(Cow::Owned),
}
.map_err(
|e| match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root()
as u64;
let first_subsequent_restore_point_slot =
((epoch.start_slot(
T::EthSpec::slots_per_epoch(),
) / max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(
format!(
"epoch out of bounds, try state at slot {}",
first_subsequent_restore_point_slot,
))
} else {
warp_utils::reject::custom_bad_request(
),
)
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, too far in future".into(),
)
}
}
_ => warp_utils::reject::beacon_chain_error(
e.into(),
),
},
)?;

let owned_cache =
Arc::new(possibly_built_cache.into_owned());

// Attempt to write to the beacon cache (only if the cache
// size is not the default value
if chain.config.shuffling_cache_size
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
{
if let Some(shuffling_id) = shuffling_id {
if let Some(mut cache_write) =
chain.shuffling_cache.try_write_for(
std::time::Duration::from_secs(1),
)
{
cache_write.insert_committee_cache(
shuffling_id,
&owned_cache,
);
}
}
}
_ => warp_utils::reject::beacon_chain_error(e.into()),
})?;
owned_cache
}
};

// Use either the supplied slot or all slots in the epoch.
let slots =
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
address of this server (e.g., http://localhost:5054).")
.takes_value(true),
)
.arg(
Arg::with_name("shuffling-cache-size")
.long("shuffling-cache-size")
.help("Some HTTP API requests can be optimised by caching the shufflings at each epoch. \
This flag allows the user to set the shuffling cache size in epochs. \
Shufflings are dependent on validator count and setting this value to a large number can consume a large amount of memory.")
.takes_value(true)
)

/*
* Monitoring metrics
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ pub fn get_config<E: EthSpec>(
client_config.http_api.allow_sync_stalled = true;
}

if let Some(cache_size) = cli_args.value_of("shuffling-cache-size") {
client_config.chain.shuffling_cache_size = cache_size
.parse::<usize>()
.map_err(|_| "cache size is not a valid u64")?;
}

/*
* Prometheus metrics HTTP server
*/
Expand Down