Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ where
let genesis_time = head_snapshot.beacon_state.genesis_time();
let head_for_snapshot_cache = head_snapshot.clone();
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let shuffling_cache_size = self.chain_config.shuffling_cache_size;

let beacon_chain = BeaconChain {
spec: self.spec,
Expand Down Expand Up @@ -818,7 +819,7 @@ where
DEFAULT_SNAPSHOT_CACHE_SIZE,
head_for_snapshot_cache,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(shuffling_cache_size)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub struct ChainConfig {
pub prepare_payload_lookahead: Duration,
/// Use EL-free optimistic sync for the finalized part of the chain.
pub optimistic_finalized_sync: bool,
/// The size of the shuffling cache,
pub shuffling_cache_size: usize,
/// Whether to send payload attributes every slot, regardless of connected proposers.
///
/// This is useful for block builders and testing.
Expand Down Expand Up @@ -97,6 +99,7 @@ impl Default for ChainConfig {
prepare_payload_lookahead: Duration::from_secs(4),
// This value isn't actually read except in tests.
optimistic_finalized_sync: true,
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
always_prepare_payload: false,
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ mod persisted_fork_choice;
mod pre_finalization_cache;
pub mod proposer_prep_service;
pub mod schema_change;
mod shuffling_cache;
pub mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
pub mod sync_committee_rewards;
Expand Down
16 changes: 8 additions & 8 deletions beacon_node/beacon_chain/src/shuffling_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256
/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash +
/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this
/// ignores a few extra bytes in the caches that should be insignificant compared to the indices).
const CACHE_SIZE: usize = 16;
pub const DEFAULT_CACHE_SIZE: usize = 16;

/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this
/// limits the number of concurrent states that can be loaded into memory for the committee cache.
Expand Down Expand Up @@ -54,9 +54,9 @@ pub struct ShufflingCache {
}

impl ShufflingCache {
pub fn new() -> Self {
pub fn new(cache_size: usize) -> Self {
Self {
cache: LruCache::new(CACHE_SIZE),
cache: LruCache::new(cache_size),
}
}

Expand Down Expand Up @@ -172,7 +172,7 @@ impl ToArcCommitteeCache for Arc<CommitteeCache> {

impl Default for ShufflingCache {
fn default() -> Self {
Self::new()
Self::new(DEFAULT_CACHE_SIZE)
}
}

Expand Down Expand Up @@ -249,7 +249,7 @@ mod test {
fn resolved_promise() {
let (committee_a, _) = committee_caches();
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();

// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
Expand All @@ -276,7 +276,7 @@ mod test {
#[test]
fn unresolved_promise() {
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();

// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
Expand All @@ -301,7 +301,7 @@ mod test {
fn two_promises() {
let (committee_a, committee_b) = committee_caches();
let (id_a, id_b) = (shuffling_id(1), shuffling_id(2));
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();

// Create promise A.
let sender_a = cache.create_promise(id_a.clone()).unwrap();
Expand Down Expand Up @@ -355,7 +355,7 @@ mod test {

#[test]
fn too_many_promises() {
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();

for i in 0..MAX_CONCURRENT_PROMISES {
cache.create_promise(shuffling_id(i as u64)).unwrap();
Expand Down
137 changes: 105 additions & 32 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ use system_health::observe_system_health_bn;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload,
CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
Expand Down Expand Up @@ -784,39 +784,112 @@ pub fn serve<T: BeaconChainTypes>(
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);

let committee_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state
.committee_cache_is_initialized(relative_epoch) =>
{
state.committee_cache(relative_epoch).map(Cow::Borrowed)
}
_ => CommitteeCache::initialized(state, epoch, &chain.spec)
// Attempt to obtain the committee_cache from the beacon chain
let decision_slot = (epoch.saturating_sub(2u64))
.end_slot(T::EthSpec::slots_per_epoch());
// Find the decision block and skip to another method on any kind
// of failure
let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{
Some(AttestationShufflingId {
shuffling_epoch: epoch,
shuffling_decision_block,
})
} else {
None
};

// Attempt to read from the chain cache if there exists a
// shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) =
shuffling_id.as_ref()
{
chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
} else {
None
};

let committee_cache = if let Some(ref shuffling) =
maybe_cached_shuffling
{
Cow::Borrowed(&**shuffling)
} else {
let possibly_built_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state.committee_cache_is_initialized(
relative_epoch,
) =>
{
state
.committee_cache(relative_epoch)
.map(Cow::Borrowed)
}
_ => CommitteeCache::initialized(
state,
epoch,
&chain.spec,
)
.map(Cow::Owned),
}
.map_err(|e| match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root() as u64;
let first_subsequent_restore_point_slot = ((epoch
.start_slot(T::EthSpec::slots_per_epoch())
/ max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(format!(
"epoch out of bounds, try state at slot {}",
first_subsequent_restore_point_slot,
))
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, too far in future".into(),
)
}
.map_err(|e| {
match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root()
as u64;
let first_subsequent_restore_point_slot =
((epoch.start_slot(
T::EthSpec::slots_per_epoch(),
) / max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(
format!(
"epoch out of bounds, \
try state at slot {}",
first_subsequent_restore_point_slot,
),
)
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, \
too far in future"
.into(),
)
}
}
_ => {
warp_utils::reject::beacon_chain_error(e.into())
}
}
})?;

// Attempt to write to the beacon cache (only if the cache
// size is not the default value).
if chain.config.shuffling_cache_size
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
{
if let Some(shuffling_id) = shuffling_id {
if let Some(mut cache_write) = chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
{
cache_write.insert_committee_cache(
shuffling_id,
&*possibly_built_cache,
);
}
}
_ => warp_utils::reject::beacon_chain_error(e.into()),
})?;
}
possibly_built_cache
};

// Use either the supplied slot or all slots in the epoch.
let slots =
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
address of this server (e.g., http://localhost:5054).")
.takes_value(true),
)
.arg(
Arg::with_name("shuffling-cache-size")
.long("shuffling-cache-size")
.help("Some HTTP API requests can be optimised by caching the shufflings at each epoch. \
This flag allows the user to set the shuffling cache size in epochs. \
Shufflings are dependent on validator count and setting this value to a large number can consume a large amount of memory.")
.takes_value(true)
)

/*
* Monitoring metrics
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ pub fn get_config<E: EthSpec>(
client_config.http_api.allow_sync_stalled = true;
}

if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
client_config.chain.shuffling_cache_size = cache_size;
}

/*
* Prometheus metrics HTTP server
*/
Expand Down
20 changes: 20 additions & 0 deletions lighthouse/tests/beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,26 @@ fn disable_lock_timeouts_flag() {
.with_config(|config| assert!(!config.chain.enable_lock_timeouts));
}

#[test]
fn shuffling_cache_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config.chain.shuffling_cache_size,
beacon_node::beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
)
});
}

#[test]
fn shuffling_cache_set() {
CommandLineTest::new()
.flag("shuffling-cache-size", Some("500"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.chain.shuffling_cache_size, 500));
}

#[test]
fn fork_choice_before_proposal_timeout_default() {
CommandLineTest::new()
Expand Down