diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index a0ec04860a583e..c8f534448579c3 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -47,7 +47,7 @@ use { rent_collector::RentCollector, runtime_config::RuntimeConfig, stake_account::StakeAccount, - stake_utils, + stake_history::StakeHistory as CowStakeHistory, stake_weighted_timestamp::{ calculate_stake_weighted_timestamp, MaxAllowableDrift, MAX_ALLOWABLE_DRIFT_PERCENTAGE_FAST, MAX_ALLOWABLE_DRIFT_PERCENTAGE_SLOW_V2, @@ -112,7 +112,6 @@ use { solana_lattice_hash::lt_hash::LtHash, solana_measure::{measure::Measure, measure_time, measure_us}, solana_message::{inner_instruction::InnerInstructions, AccountKeys, SanitizedMessage}, - solana_native_token::LAMPORTS_PER_SOL, solana_packet::PACKET_DATA_SIZE, solana_precompile_error::PrecompileError, solana_program_runtime::{ @@ -167,7 +166,7 @@ use { transaction_accounts::KeyedAccountSharedData, TransactionReturnData, }, solana_transaction_error::{TransactionError, TransactionResult as Result}, - solana_vote::vote_account::{VoteAccount, VoteAccountsHashMap}, + solana_vote::vote_account::{VoteAccount, VoteAccounts, VoteAccountsHashMap}, std::{ collections::{HashMap, HashSet}, fmt, @@ -1050,6 +1049,14 @@ impl AtomicBankHashStats { } } +struct NewEpochBundle { + stake_history: CowStakeHistory, + vote_accounts: VoteAccounts, + rewards_calculation: Arc, + calculate_activated_stake_time_us: u64, + update_rewards_with_thread_pool_time_us: u64, +} + impl Bank { fn default_with_accounts(accounts: Accounts) -> Self { let mut bank = Self { @@ -1591,6 +1598,47 @@ impl Bank { .new_warmup_cooldown_rate_epoch(&self.epoch_schedule) } + /// Returns updated stake history and vote accounts that includes new + /// activated stake from the last epoch. + fn compute_new_epoch_caches_and_rewards( + &self, + thread_pool: &ThreadPool, + parent_epoch: Epoch, + reward_calc_tracer: Option, + rewards_metrics: &mut RewardsMetrics, + ) -> NewEpochBundle { + // Add new entry to stakes.stake_history, set appropriate epoch and + // update vote accounts with warmed up stakes before saving a + // snapshot of stakes in epoch stakes + let stakes = self.stakes_cache.stakes(); + let stake_delegations = stakes.stake_delegations_vec(); + let ((stake_history, vote_accounts), calculate_activated_stake_time_us) = + measure_us!(stakes.calculate_activated_stake( + self.epoch(), + thread_pool, + self.new_warmup_cooldown_rate_epoch(), + &stake_delegations + )); + // Apply stake rewards and commission using new snapshots. + let (rewards_calculation, update_rewards_with_thread_pool_time_us) = measure_us!(self + .calculate_rewards( + &stake_history, + stake_delegations, + &vote_accounts, + parent_epoch, + reward_calc_tracer, + thread_pool, + rewards_metrics, + )); + NewEpochBundle { + stake_history, + vote_accounts, + rewards_calculation, + calculate_activated_stake_time_us, + update_rewards_with_thread_pool_time_us, + } + } + /// process for the start of a new epoch fn process_new_epoch( &mut self, @@ -1610,31 +1658,37 @@ impl Bank { thread_pool.install(|| { self.compute_and_apply_new_feature_activations() }) ); - // Add new entry to stakes.stake_history, set appropriate epoch and - // update vote accounts with warmed up stakes before saving a - // snapshot of stakes in epoch stakes - let (_, activate_epoch_time_us) = measure_us!(self.stakes_cache.activate_epoch( - epoch, + let mut rewards_metrics = RewardsMetrics::default(); + let NewEpochBundle { + stake_history, + vote_accounts, + rewards_calculation, + calculate_activated_stake_time_us, + update_rewards_with_thread_pool_time_us, + } = self.compute_new_epoch_caches_and_rewards( &thread_pool, - self.new_warmup_cooldown_rate_epoch() - )); + parent_epoch, + reward_calc_tracer, + &mut rewards_metrics, + ); + + self.stakes_cache + .activate_epoch(epoch, stake_history, vote_accounts); // Save a snapshot of stakes for use in consensus and stake weighted networking let leader_schedule_epoch = self.epoch_schedule.get_leader_schedule_epoch(slot); let (_, update_epoch_stakes_time_us) = measure_us!(self.update_epoch_stakes(leader_schedule_epoch)); - let mut rewards_metrics = RewardsMetrics::default(); - // After saving a snapshot of stakes, apply stake rewards and commission - let (_, update_rewards_with_thread_pool_time_us) = measure_us!(self - .begin_partitioned_rewards( - reward_calc_tracer, - &thread_pool, - parent_epoch, - parent_slot, - parent_height, - &mut rewards_metrics, - )); + // Distribute rewards commission to vote accounts and cache stake rewards + // for partitioned distribution in the upcoming slots. + self.begin_partitioned_rewards( + parent_epoch, + parent_slot, + parent_height, + &rewards_calculation, + &rewards_metrics, + ); report_new_epoch_metrics( epoch, @@ -1643,7 +1697,7 @@ impl Bank { NewEpochTimings { thread_pool_time_us, apply_feature_activations_time_us, - activate_epoch_time_us, + calculate_activated_stake_time_us, update_epoch_stakes_time_us, update_rewards_with_thread_pool_time_us, }, @@ -2301,41 +2355,6 @@ impl Bank { } } - fn filter_stake_delegations<'a>( - &self, - stakes: &'a Stakes>, - ) -> Vec<(&'a Pubkey, &'a StakeAccount)> { - if self - .feature_set - .is_active(&feature_set::stake_minimum_delegation_for_rewards::id()) - { - let num_stake_delegations = stakes.stake_delegations().len(); - let min_stake_delegation = stake_utils::get_minimum_delegation( - self.feature_set - .is_active(&agave_feature_set::stake_raise_minimum_delegation_to_1_sol::id()), - ) - .max(LAMPORTS_PER_SOL); - - let (stake_delegations, filter_time_us) = measure_us!(stakes - .stake_delegations() - .iter() - .filter(|(_stake_pubkey, cached_stake_account)| { - cached_stake_account.delegation().stake >= min_stake_delegation - }) - .collect::>()); - - datapoint_info!( - "stake_account_filter_time", - ("filter_time_us", filter_time_us, i64), - ("num_stake_delegations_before", num_stake_delegations, i64), - ("num_stake_delegations_after", stake_delegations.len(), i64) - ); - stake_delegations - } else { - stakes.stake_delegations().iter().collect() - } - } - /// Convert computed VoteRewards to VoteRewardsAccounts for storing. /// /// This function processes vote rewards and consolidates them into a single diff --git a/runtime/src/bank/metrics.rs b/runtime/src/bank/metrics.rs index e6e76639f5db8d..9609a641a28177 100644 --- a/runtime/src/bank/metrics.rs +++ b/runtime/src/bank/metrics.rs @@ -11,7 +11,7 @@ use { pub(crate) struct NewEpochTimings { pub(crate) thread_pool_time_us: u64, pub(crate) apply_feature_activations_time_us: u64, - pub(crate) activate_epoch_time_us: u64, + pub(crate) calculate_activated_stake_time_us: u64, pub(crate) update_epoch_stakes_time_us: u64, pub(crate) update_rewards_with_thread_pool_time_us: u64, } @@ -63,7 +63,11 @@ pub(crate) fn report_new_epoch_metrics( timings.apply_feature_activations_time_us, i64 ), - ("activate_epoch_us", timings.activate_epoch_time_us, i64), + ( + "calculate_activated_stake_us", + timings.calculate_activated_stake_time_us, + i64 + ), ( "update_epoch_stakes_us", timings.update_epoch_stakes_time_us, diff --git a/runtime/src/bank/partitioned_epoch_rewards/calculation.rs b/runtime/src/bank/partitioned_epoch_rewards/calculation.rs index 240b21f5e77099..fbfc54a6ded998 100644 --- a/runtime/src/bank/partitioned_epoch_rewards/calculation.rs +++ b/runtime/src/bank/partitioned_epoch_rewards/calculation.rs @@ -1,10 +1,9 @@ use { super::{ - epoch_rewards_hasher::hash_rewards_into_partitions, Bank, - CalculateRewardsAndDistributeVoteRewardsResult, CalculateValidatorRewardsResult, - EpochRewardCalculateParamInfo, PartitionedRewardsCalculation, PartitionedStakeReward, - PartitionedStakeRewards, StakeRewardCalculation, VoteRewardsAccounts, - VoteRewardsAccountsStorable, REWARD_CALCULATION_NUM_BLOCKS, + epoch_rewards_hasher::hash_rewards_into_partitions, Bank, CalculateValidatorRewardsResult, + EpochRewardCalculateParamInfo, FilteredStakeDelegations, PartitionedRewardsCalculation, + PartitionedStakeReward, PartitionedStakeRewards, StakeRewardCalculation, + VoteRewardsAccounts, VoteRewardsAccountsStorable, REWARD_CALCULATION_NUM_BLOCKS, }, crate::{ bank::{ @@ -16,15 +15,18 @@ use { redeem_rewards, }, stake_account::StakeAccount, + stake_utils, stakes::Stakes, }, + agave_feature_set as feature_set, log::{debug, info}, rayon::{ - iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}, + iter::{IndexedParallelIterator, ParallelIterator}, ThreadPool, }, solana_clock::{Epoch, Slot}, solana_measure::{measure::Measure, measure_us}, + solana_native_token::LAMPORTS_PER_SOL, solana_pubkey::Pubkey, solana_stake_interface::{stake_history::StakeHistory, state::Delegation}, solana_sysvar::epoch_rewards::EpochRewards, @@ -96,30 +98,31 @@ impl RewardsAccumulator { impl Bank { /// Begin the process of calculating and distributing rewards. /// This process can take multiple slots. + #[allow(clippy::too_many_arguments)] pub(in crate::bank) fn begin_partitioned_rewards( &mut self, - reward_calc_tracer: Option, - thread_pool: &ThreadPool, parent_epoch: Epoch, parent_slot: Slot, parent_block_height: u64, - rewards_metrics: &mut RewardsMetrics, + rewards_calculation: &PartitionedRewardsCalculation, + rewards_metrics: &RewardsMetrics, ) { - let CalculateRewardsAndDistributeVoteRewardsResult { - distributed_rewards, - point_value, - stake_rewards, - } = self.calculate_rewards_and_distribute_vote_rewards( - parent_epoch, - reward_calc_tracer, - thread_pool, - rewards_metrics, - ); + self.distribute_vote_rewards(parent_epoch, rewards_calculation, rewards_metrics); let slot = self.slot(); let distribution_starting_block_height = self.block_height() + REWARD_CALCULATION_NUM_BLOCKS; + let PartitionedRewardsCalculation { + vote_account_rewards, + stake_rewards, + point_value, + .. + } = rewards_calculation; + + let distributed_rewards = vote_account_rewards.total_vote_rewards_lamports; + let stake_rewards = Arc::clone(&stake_rewards.stake_rewards); + let num_partitions = self.get_reward_distribution_num_blocks(&stake_rewards); self.set_epoch_reward_status_calculation(distribution_starting_block_height, stake_rewards); @@ -142,13 +145,16 @@ impl Bank { } // Calculate rewards from previous epoch and distribute vote rewards - fn calculate_rewards_and_distribute_vote_rewards( + pub(in crate::bank) fn calculate_rewards( &self, + stake_history: &StakeHistory, + stake_delegations: Vec<(&Pubkey, &StakeAccount)>, + cached_vote_accounts: &VoteAccounts, prev_epoch: Epoch, reward_calc_tracer: Option, thread_pool: &ThreadPool, metrics: &mut RewardsMetrics, - ) -> CalculateRewardsAndDistributeVoteRewardsResult { + ) -> Arc { // We hold the lock here for the epoch rewards calculation cache to prevent // rewards computation across multiple forks simultaneously. This aligns with // how banks are currently created- all banks are created sequentially. @@ -173,7 +179,11 @@ impl Bank { let rewards_calculation = epoch_rewards_calculation_cache .entry(self.parent_hash) .or_insert_with(|| { + let stake_delegations = self.filter_stake_delegations(stake_delegations); Arc::new(self.calculate_rewards_for_partitioning( + stake_history, + &stake_delegations, + cached_vote_accounts, prev_epoch, reward_calc_tracer, thread_pool, @@ -183,6 +193,15 @@ impl Bank { .clone(); drop(epoch_rewards_calculation_cache); + rewards_calculation + } + + pub(in crate::bank) fn distribute_vote_rewards( + &mut self, + prev_epoch: Epoch, + rewards_calculation: &PartitionedRewardsCalculation, + rewards_metrics: &RewardsMetrics, + ) { let PartitionedRewardsCalculation { vote_account_rewards, stake_rewards, @@ -191,15 +210,16 @@ impl Bank { prev_epoch_duration_in_years, capitalization, point_value, - } = rewards_calculation.as_ref(); + .. + } = rewards_calculation; let total_vote_rewards = vote_account_rewards.total_vote_rewards_lamports; - self.store_vote_accounts_partitioned(vote_account_rewards, metrics); + self.store_vote_accounts_partitioned(vote_account_rewards, rewards_metrics); self.update_vote_rewards(vote_account_rewards); let StakeRewardCalculation { - stake_rewards, total_stake_rewards_lamports, + .. } = stake_rewards; // verify that we didn't pay any more than we expected to @@ -244,12 +264,6 @@ impl Bank { ("num_stake_accounts", num_stake_accounts, i64), ("num_vote_accounts", num_vote_accounts, i64), ); - - CalculateRewardsAndDistributeVoteRewardsResult { - distributed_rewards: total_vote_rewards, - point_value: point_value.clone(), - stake_rewards: Arc::clone(stake_rewards), - } } fn store_vote_accounts_partitioned( @@ -271,8 +285,11 @@ impl Bank { } /// Calculate rewards from previous epoch to prepare for partitioned distribution. - pub(super) fn calculate_rewards_for_partitioning( + pub(super) fn calculate_rewards_for_partitioning<'a>( &self, + stake_history: &StakeHistory, + stake_delegations: &'a FilteredStakeDelegations<'a>, + cached_vote_accounts: &VoteAccounts, prev_epoch: Epoch, reward_calc_tracer: Option, thread_pool: &ThreadPool, @@ -292,6 +309,9 @@ impl Bank { point_value, } = self .calculate_validator_rewards( + stake_history, + stake_delegations, + cached_vote_accounts, prev_epoch, validator_rewards, reward_calc_tracer, @@ -317,19 +337,21 @@ impl Bank { } /// Calculate epoch reward and return vote and stake rewards. - fn calculate_validator_rewards( + fn calculate_validator_rewards<'a>( &self, + stake_history: &StakeHistory, + stake_delegations: &'a FilteredStakeDelegations<'a>, + cached_vote_accounts: &VoteAccounts, rewarded_epoch: Epoch, rewards: u64, reward_calc_tracer: Option, thread_pool: &ThreadPool, metrics: &mut RewardsMetrics, ) -> Option { - let stakes = self.stakes_cache.stakes(); - let reward_calculate_param = self.get_epoch_reward_calculate_param_info(&stakes); - self.calculate_reward_points_partitioned( - &reward_calculate_param, + stake_history, + stake_delegations, + cached_vote_accounts, rewards, thread_pool, metrics, @@ -337,7 +359,9 @@ impl Bank { .map(|point_value| { let (vote_rewards_accounts, stake_reward_calculation) = self .calculate_stake_vote_rewards( - &reward_calculate_param, + stake_history, + stake_delegations, + cached_vote_accounts, rewarded_epoch, point_value.clone(), thread_pool, @@ -352,14 +376,39 @@ impl Bank { }) } - /// calculate and return some reward calc info to avoid recalculation across functions - fn get_epoch_reward_calculate_param_info<'a>( + pub(in crate::bank) fn filter_stake_delegations<'a>( + &self, + stake_delegations: Vec<(&'a Pubkey, &'a StakeAccount)>, + ) -> FilteredStakeDelegations<'a> { + let min_stake_delegation = if self + .feature_set + .is_active(&feature_set::stake_minimum_delegation_for_rewards::id()) + { + let min_stake_delegation = stake_utils::get_minimum_delegation( + self.feature_set + .is_active(&agave_feature_set::stake_raise_minimum_delegation_to_1_sol::id()), + ) + .max(LAMPORTS_PER_SOL); + Some(min_stake_delegation) + } else { + None + }; + FilteredStakeDelegations { + stake_delegations, + min_stake_delegation, + } + } + + /// Retrieves stake history and delegations for stake reward recalculation + /// after snapshot restore. + fn get_epoch_params_for_recalculation<'a>( &'a self, stakes: &'a Stakes>, ) -> EpochRewardCalculateParamInfo<'a> { // Use `stakes` for stake-related info let stake_history = stakes.history().clone(); - let stake_delegations = self.filter_stake_delegations(stakes); + let stake_delegations = stakes.stake_delegations_vec(); + let stake_delegations = self.filter_stake_delegations(stake_delegations); // Use `EpochStakes` for vote accounts let leader_schedule_epoch = self.epoch_schedule().get_leader_schedule_epoch(self.slot()); @@ -445,21 +494,17 @@ impl Bank { /// Calculates epoch rewards for stake/vote accounts /// Returns vote rewards, stake rewards, and the sum of all stake rewards in lamports - fn calculate_stake_vote_rewards( + fn calculate_stake_vote_rewards<'a>( &self, - reward_calculate_params: &EpochRewardCalculateParamInfo, + stake_history: &StakeHistory, + stake_delegations: &'a FilteredStakeDelegations<'a>, + cached_vote_accounts: &VoteAccounts, rewarded_epoch: Epoch, point_value: PointValue, thread_pool: &ThreadPool, reward_calc_tracer: Option, metrics: &mut RewardsMetrics, ) -> (VoteRewardsAccounts, StakeRewardCalculation) { - let EpochRewardCalculateParamInfo { - stake_history, - stake_delegations, - cached_vote_accounts, - } = reward_calculate_params; - let new_warmup_cooldown_rate_epoch = self.new_warmup_cooldown_rate_epoch(); let mut measure_redeem_rewards = Measure::start("redeem-rewards"); @@ -478,17 +523,20 @@ impl Bank { .par_iter() .zip_eq(stake_rewards.spare_capacity_mut()) .with_min_len(500) - .filter_map(|((stake_pubkey, stake_account), stake_reward_ref)| { - let maybe_reward_record = self.redeem_delegation_rewards( - rewarded_epoch, - stake_pubkey, - stake_account, - &point_value, - stake_history, - cached_vote_accounts, - reward_calc_tracer.as_ref(), - new_warmup_cooldown_rate_epoch, - ); + .filter_map(|(maybe_stake_delegation, stake_reward_ref)| { + let maybe_reward_record = + maybe_stake_delegation.and_then(|(stake_pubkey, stake_account)| { + self.redeem_delegation_rewards( + rewarded_epoch, + stake_pubkey, + stake_account, + &point_value, + stake_history, + cached_vote_accounts, + reward_calc_tracer.as_ref(), + new_warmup_cooldown_rate_epoch, + ) + }); let (stake_reward, maybe_reward_record) = match maybe_reward_record { Some(res) => { let DelegationRewards { @@ -551,24 +599,21 @@ impl Bank { /// Calculates epoch reward points from stake/vote accounts. /// Returns reward lamports and points for the epoch or none if points == 0. - fn calculate_reward_points_partitioned( + fn calculate_reward_points_partitioned<'a>( &self, - reward_calculate_params: &EpochRewardCalculateParamInfo, + stake_history: &StakeHistory, + stake_delegations: &'a FilteredStakeDelegations<'a>, + cached_vote_accounts: &VoteAccounts, rewards: u64, thread_pool: &ThreadPool, metrics: &RewardsMetrics, ) -> Option { - let EpochRewardCalculateParamInfo { - stake_history, - stake_delegations, - cached_vote_accounts, - } = reward_calculate_params; - let solana_vote_program: Pubkey = solana_vote_program::id(); let new_warmup_cooldown_rate_epoch = self.new_warmup_cooldown_rate_epoch(); let (points, measure_us) = measure_us!(thread_pool.install(|| { stake_delegations .par_iter() + .filter_map(|stake_delegation| stake_delegation) .map(|(_stake_pubkey, stake_account)| { let vote_pubkey = stake_account.delegation().voter_pubkey; @@ -637,7 +682,11 @@ impl Bank { }; let stakes = self.stakes_cache.stakes(); - let reward_calculate_param = self.get_epoch_reward_calculate_param_info(&stakes); + let EpochRewardCalculateParamInfo { + stake_history, + stake_delegations, + cached_vote_accounts, + } = self.get_epoch_params_for_recalculation(&stakes); // On recalculation, only the `StakeRewardCalculation::stake_rewards` // field is relevant. It is assumed that vote-account rewards have @@ -645,7 +694,9 @@ impl Bank { // `StakeRewardCalculation::total_rewards` only reflects rewards that // have not yet been distributed. let (_, StakeRewardCalculation { stake_rewards, .. }) = self.calculate_stake_vote_rewards( - &reward_calculate_param, + &stake_history, + &stake_delegations, + cached_vote_accounts, rewarded_epoch, point_value, thread_pool, @@ -800,7 +851,16 @@ mod tests { let mut rewards_metrics = RewardsMetrics::default(); let expected_rewards = 100_000_000_000; + let stakes = bank.stakes_cache.stakes(); + let EpochRewardCalculateParamInfo { + stake_history, + stake_delegations, + cached_vote_accounts, + } = bank.get_epoch_params_for_recalculation(&stakes); let calculated_rewards = bank.calculate_validator_rewards( + &stake_history, + &stake_delegations, + cached_vote_accounts, 1, expected_rewards, null_tracer(), @@ -846,10 +906,16 @@ mod tests { let expected_rewards = 100_000_000_000; let stakes: RwLockReadGuard>> = bank.stakes_cache.stakes(); - let reward_calculate_param = bank.get_epoch_reward_calculate_param_info(&stakes); + let EpochRewardCalculateParamInfo { + stake_history, + stake_delegations, + cached_vote_accounts, + } = bank.get_epoch_params_for_recalculation(&stakes); let point_value = bank.calculate_reward_points_partitioned( - &reward_calculate_param, + &stake_history, + &stake_delegations, + cached_vote_accounts, expected_rewards, &thread_pool, &rewards_metrics, @@ -872,10 +938,16 @@ mod tests { let rewards_metrics: RewardsMetrics = RewardsMetrics::default(); let expected_rewards = 100_000_000_000; let stakes: RwLockReadGuard>> = bank.stakes_cache.stakes(); - let reward_calculate_param = bank.get_epoch_reward_calculate_param_info(&stakes); + let EpochRewardCalculateParamInfo { + stake_history, + stake_delegations, + cached_vote_accounts, + } = bank.get_epoch_params_for_recalculation(&stakes); let point_value = bank.calculate_reward_points_partitioned( - &reward_calculate_param, + &stake_history, + &stake_delegations, + cached_vote_accounts, expected_rewards, &thread_pool, &rewards_metrics, @@ -913,15 +985,22 @@ mod tests { let reward_calc_tracer = Some(tracer); let rewarded_epoch = bank.epoch(); let stakes: RwLockReadGuard>> = bank.stakes_cache.stakes(); - let reward_calculate_param = bank.get_epoch_reward_calculate_param_info(&stakes); + let EpochRewardCalculateParamInfo { + stake_history, + stake_delegations, + cached_vote_accounts, + } = bank.get_epoch_params_for_recalculation(&stakes); let (vote_rewards_accounts, stake_reward_calculation) = bank.calculate_stake_vote_rewards( - &reward_calculate_param, + &stake_history, + &stake_delegations, + cached_vote_accounts, rewarded_epoch, point_value, &thread_pool, reward_calc_tracer, &mut rewards_metrics, ); + drop(stakes); let vote_account = bank .load_slow_with_fixed_root(&bank.ancestors, vote_pubkey) @@ -1000,6 +1079,12 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let mut rewards_metrics = RewardsMetrics::default(); + let stakes = bank.stakes_cache.stakes(); + let EpochRewardCalculateParamInfo { + stake_history, + stake_delegations, + cached_vote_accounts, + } = bank.get_epoch_params_for_recalculation(&stakes); let PartitionedRewardsCalculation { stake_rewards: StakeRewardCalculation { @@ -1008,11 +1093,15 @@ mod tests { }, .. } = bank.calculate_rewards_for_partitioning( + &stake_history, + &stake_delegations, + cached_vote_accounts, rewarded_epoch, null_tracer(), &thread_pool, &mut rewards_metrics, ); + drop(stakes); let epoch_rewards_sysvar = bank.get_epoch_rewards_sysvar(); let (recalculated_rewards, recalculated_partition_indices) = @@ -1095,6 +1184,12 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let mut rewards_metrics = RewardsMetrics::default(); + let stakes = bank.stakes_cache.stakes(); + let EpochRewardCalculateParamInfo { + stake_history, + stake_delegations, + cached_vote_accounts, + } = bank.get_epoch_params_for_recalculation(&stakes); let PartitionedRewardsCalculation { stake_rewards: StakeRewardCalculation { @@ -1103,11 +1198,15 @@ mod tests { }, .. } = bank.calculate_rewards_for_partitioning( + &stake_history, + &stake_delegations, + cached_vote_accounts, rewarded_epoch, null_tracer(), &thread_pool, &mut rewards_metrics, ); + drop(stakes); let epoch_rewards_sysvar = bank.get_epoch_rewards_sysvar(); let expected_partition_indices = hash_rewards_into_partitions( @@ -1160,6 +1259,12 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let mut rewards_metrics = RewardsMetrics::default(); + let stakes = bank.stakes_cache.stakes(); + let EpochRewardCalculateParamInfo { + stake_history, + stake_delegations, + cached_vote_accounts, + } = bank.get_epoch_params_for_recalculation(&stakes); let PartitionedRewardsCalculation { stake_rewards: StakeRewardCalculation { @@ -1169,11 +1274,15 @@ mod tests { point_value, .. } = bank.calculate_rewards_for_partitioning( + &stake_history, + &stake_delegations, + cached_vote_accounts, rewarded_epoch, null_tracer(), &thread_pool, &mut rewards_metrics, ); + drop(stakes); bank.recalculate_partitioned_rewards_if_active(|| &thread_pool); let EpochRewardStatus::Active(EpochRewardPhase::Distribution( diff --git a/runtime/src/bank/partitioned_epoch_rewards/distribution.rs b/runtime/src/bank/partitioned_epoch_rewards/distribution.rs index 882ff69de05d4b..10b08e98311c7e 100644 --- a/runtime/src/bank/partitioned_epoch_rewards/distribution.rs +++ b/runtime/src/bank/partitioned_epoch_rewards/distribution.rs @@ -445,7 +445,7 @@ mod tests { 0, 42, num_partitions, - PointValue { + &PointValue { rewards: total_rewards, points: total_points, }, diff --git a/runtime/src/bank/partitioned_epoch_rewards/mod.rs b/runtime/src/bank/partitioned_epoch_rewards/mod.rs index 5632a99603de8f..2c83e8b8727594 100644 --- a/runtime/src/bank/partitioned_epoch_rewards/mod.rs +++ b/runtime/src/bank/partitioned_epoch_rewards/mod.rs @@ -9,6 +9,7 @@ use { inflation_rewards::points::PointValue, stake_account::StakeAccount, stake_history::StakeHistory, }, + rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}, solana_account::{AccountSharedData, ReadableAccount}, solana_accounts_db::{ partitioned_rewards::PartitionedEpochRewardsConfig, @@ -231,10 +232,48 @@ impl Default for CalculateValidatorRewardsResult { } } +pub(super) struct FilteredStakeDelegations<'a> { + stake_delegations: Vec<(&'a Pubkey, &'a StakeAccount)>, + min_stake_delegation: Option, +} + +impl<'a> FilteredStakeDelegations<'a> { + pub(super) fn len(&self) -> usize { + self.stake_delegations.len() + } + + pub(super) fn par_iter( + &'a self, + ) -> impl IndexedParallelIterator)>> + { + self.stake_delegations + .par_iter() + // We yield `None` items instead of filtering them out to + // keep the number of elements predictable. It's better to + // let the callers deal with `None` elements and even store + // them in collections (that are allocated once with the + // size of `FilteredStakeDelegations::len`) rather than + // `collect` yet another time (which would take ~100ms). + .map(|(pubkey, stake_account)| { + match self.min_stake_delegation { + Some(min_stake_delegation) + if stake_account.delegation().stake < min_stake_delegation => + { + None + } + _ => { + // Dereference `&&` to `&`. + Some((*pubkey, *stake_account)) + } + } + }) + } +} + /// hold reward calc info to avoid recalculation across functions pub(super) struct EpochRewardCalculateParamInfo<'a> { pub(super) stake_history: StakeHistory, - pub(super) stake_delegations: Vec<(&'a Pubkey, &'a StakeAccount)>, + pub(super) stake_delegations: FilteredStakeDelegations<'a>, pub(super) cached_vote_accounts: &'a VoteAccounts, } @@ -252,18 +291,6 @@ pub(super) struct PartitionedRewardsCalculation { point_value: PointValue, } -pub(super) struct CalculateRewardsAndDistributeVoteRewardsResult { - /// distributed vote rewards - pub(super) distributed_rewards: u64, - /// total rewards and points calculated for the current epoch, where points - /// equals the sum of (delegated stake * credits observed) for all - /// delegations and rewards are the lamports to split across all stake and - /// vote accounts - pub(super) point_value: PointValue, - /// stake rewards that still need to be distributed - pub(super) stake_rewards: Arc, -} - pub(crate) type StakeRewards = Vec; #[derive(Debug, PartialEq)] diff --git a/runtime/src/bank/partitioned_epoch_rewards/sysvar.rs b/runtime/src/bank/partitioned_epoch_rewards/sysvar.rs index aaddeb1826b421..b1ee3f2c3120a8 100644 --- a/runtime/src/bank/partitioned_epoch_rewards/sysvar.rs +++ b/runtime/src/bank/partitioned_epoch_rewards/sysvar.rs @@ -26,7 +26,7 @@ impl Bank { distributed_rewards: u64, distribution_starting_block_height: u64, num_partitions: u64, - point_value: PointValue, + point_value: &PointValue, ) { assert!(point_value.rewards >= distributed_rewards); @@ -145,7 +145,7 @@ mod tests { sysvar::epoch_rewards::EpochRewards::default() ); - bank.create_epoch_rewards_sysvar(10, 42, num_partitions, point_value.clone()); + bank.create_epoch_rewards_sysvar(10, 42, num_partitions, &point_value); let account = bank.get_account(&sysvar::epoch_rewards::id()).unwrap(); let expected_balance = bank.get_minimum_balance_for_rent_exemption(account.data().len()); // Expected balance is the sysvar rent-exempt balance @@ -159,7 +159,7 @@ mod tests { let bank = Bank::new_from_parent(Arc::new(bank), &Pubkey::default(), parent_slot + 1); // Also note that running `create_epoch_rewards_sysvar()` against a bank // with an existing EpochRewards sysvar clobbers the previous values - bank.create_epoch_rewards_sysvar(10, 42, num_partitions, point_value.clone()); + bank.create_epoch_rewards_sysvar(10, 42, num_partitions, &point_value); let expected_epoch_rewards = sysvar::epoch_rewards::EpochRewards { distribution_starting_block_height: 42, diff --git a/runtime/src/bank/sysvar_cache.rs b/runtime/src/bank/sysvar_cache.rs index 99ef2600ba0374..2f43a94f4c10c9 100644 --- a/runtime/src/bank/sysvar_cache.rs +++ b/runtime/src/bank/sysvar_cache.rs @@ -121,7 +121,7 @@ mod tests { expected_epoch_rewards.distributed_rewards, expected_epoch_rewards.distribution_starting_block_height, num_partitions, - PointValue { + &PointValue { rewards: 100, points: total_points, }, diff --git a/runtime/src/bank/tests.rs b/runtime/src/bank/tests.rs index 73d9cc30eea8d9..77bb99d0847c96 100644 --- a/runtime/src/bank/tests.rs +++ b/runtime/src/bank/tests.rs @@ -584,7 +584,8 @@ impl Bank { reward_calc_tracer: Option, ) -> StakeDelegationsMap { let stakes = self.stakes_cache.stakes(); - let stake_delegations = self.filter_stake_delegations(&stakes); + let stake_delegations = stakes.stake_delegations_vec(); + let stake_delegations = self.filter_stake_delegations(stake_delegations); // Obtain all unique voter pubkeys from stake delegations. fn merge(mut acc: HashSet, other: HashSet) -> HashSet { if acc.len() < other.len() { @@ -596,6 +597,7 @@ impl Bank { let voter_pubkeys = thread_pool.install(|| { stake_delegations .par_iter() + .filter_map(|stake_delegation| stake_delegation) .fold( HashSet::default, |mut voter_pubkeys, (_stake_pubkey, stake_account)| { @@ -663,7 +665,8 @@ impl Bank { }; thread_pool.install(|| { stake_delegations - .into_par_iter() + .par_iter() + .filter_map(|stake_delegation| stake_delegation) .for_each(push_stake_delegation); }); stake_delegations_map diff --git a/runtime/src/stakes.rs b/runtime/src/stakes.rs index b8e37d8847736a..d1a25b157ba032 100644 --- a/runtime/src/stakes.rs +++ b/runtime/src/stakes.rs @@ -145,11 +145,11 @@ impl StakesCache { pub(crate) fn activate_epoch( &self, next_epoch: Epoch, - thread_pool: &ThreadPool, - new_rate_activation_epoch: Option, + stake_history: StakeHistory, + vote_accounts: VoteAccounts, ) { let mut stakes = self.0.write().unwrap(); - stakes.activate_epoch(next_epoch, thread_pool, new_rate_activation_epoch) + stakes.activate_epoch(next_epoch, stake_history, vote_accounts) } } @@ -282,40 +282,55 @@ impl Stakes { &self.stake_history } - fn activate_epoch( - &mut self, + pub(crate) fn calculate_activated_stake( + &self, next_epoch: Epoch, thread_pool: &ThreadPool, new_rate_activation_epoch: Option, - ) { - let stake_delegations: Vec<_> = self.stake_delegations.values().collect(); + stake_delegations: &[(&Pubkey, &StakeAccount)], + ) -> (StakeHistory, VoteAccounts) { // Wrap up the prev epoch by adding new stake history entry for the // prev epoch. let stake_history_entry = thread_pool.install(|| { stake_delegations .par_iter() - .fold(StakeActivationStatus::default, |acc, stake_account| { - let delegation = stake_account.delegation(); - acc + delegation.stake_activating_and_deactivating( - self.epoch, - &self.stake_history, - new_rate_activation_epoch, - ) - }) + .fold( + StakeActivationStatus::default, + |acc, (_stake_pubkey, stake_account)| { + let delegation = stake_account.delegation(); + acc + delegation.stake_activating_and_deactivating( + self.epoch, + &self.stake_history, + new_rate_activation_epoch, + ) + }, + ) .reduce(StakeActivationStatus::default, Add::add) }); - self.stake_history.add(self.epoch, stake_history_entry); - self.epoch = next_epoch; + let mut stake_history = self.stake_history.clone(); + stake_history.add(self.epoch, stake_history_entry); // Refresh the stake distribution of vote accounts for the next epoch, // using new stake history. - self.vote_accounts = refresh_vote_accounts( + let vote_accounts = refresh_vote_accounts( thread_pool, - self.epoch, + next_epoch, &self.vote_accounts, - &stake_delegations, - &self.stake_history, + stake_delegations, + &stake_history, new_rate_activation_epoch, ); + (stake_history, vote_accounts) + } + + pub(crate) fn activate_epoch( + &mut self, + next_epoch: Epoch, + stake_history: StakeHistory, + vote_accounts: VoteAccounts, + ) { + self.epoch = next_epoch; + self.stake_history = stake_history; + self.vote_accounts = vote_accounts; } /// Sum the stakes that point to the given voter_pubkey @@ -403,10 +418,37 @@ impl Stakes { } } + /// Returns a reference to the map of stake delegations. + /// + /// # Performance + /// + /// `[im::HashMap]` is a [hash array mapped trie (HAMT)][hamt], which means + /// that inserts, deletions and lookups are average-case O(1) and + /// worst-case O(log n). However, the performance of iterations is poor due + /// to depth-first traversal and jumps. Currently it's also impossible to + /// iterate over it with [`rayon`]. + /// + /// [hamt]: https://en.wikipedia.org/wiki/Hash_array_mapped_trie pub(crate) fn stake_delegations(&self) -> &ImHashMap { &self.stake_delegations } + /// Collects stake delegations into a vector, which then can be used for + /// parallel iteration with [`rayon`]. + /// + /// # Performance + /// + /// The execution of this method takes ~200ms and it collects elements of + /// the [`im::HashMap`], which is a [hash array mapped trie (HAMT)][hamt], + /// so that operation involves a depth-first traversal with jumps. However, + /// it's still a reasonable tradeoff if the caller iterates over these + /// elements. + /// + /// [hamt]: https://en.wikipedia.org/wiki/Hash_array_mapped_trie + pub(crate) fn stake_delegations_vec(&self) -> Vec<(&Pubkey, &StakeAccount)> { + self.stake_delegations.iter().collect() + } + pub(crate) fn highest_staked_node(&self) -> Option<&Pubkey> { let vote_account = self.vote_accounts.find_max_by_delegated_stake()?; Some(vote_account.node_pubkey()) @@ -477,7 +519,7 @@ fn refresh_vote_accounts( thread_pool: &ThreadPool, epoch: Epoch, vote_accounts: &VoteAccounts, - stake_delegations: &[&StakeAccount], + stake_delegations: &[(&Pubkey, &StakeAccount)], stake_history: &StakeHistory, new_rate_activation_epoch: Option, ) -> VoteAccounts { @@ -494,12 +536,15 @@ fn refresh_vote_accounts( let delegated_stakes = thread_pool.install(|| { stake_delegations .par_iter() - .fold(HashMap::default, |mut delegated_stakes, stake_account| { - let delegation = stake_account.delegation(); - let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default(); - *entry += delegation.stake(epoch, stake_history, new_rate_activation_epoch); - delegated_stakes - }) + .fold( + HashMap::default, + |mut delegated_stakes, (_stake_pubkey, stake_account)| { + let delegation = stake_account.delegation(); + let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default(); + *entry += delegation.stake(epoch, stake_history, new_rate_activation_epoch); + delegated_stakes + }, + ) .reduce(HashMap::default, merge) }); vote_accounts @@ -850,7 +895,13 @@ pub(crate) mod tests { ); } let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); - stakes_cache.activate_epoch(3, &thread_pool, None); + let next_epoch = 3; + let (stake_history, vote_accounts) = { + let stakes = stakes_cache.stakes(); + let stake_delegations = stakes.stake_delegations_vec(); + stakes.calculate_activated_stake(next_epoch, &thread_pool, None, &stake_delegations) + }; + stakes_cache.activate_epoch(next_epoch, stake_history, vote_accounts); { let stakes = stakes_cache.stakes(); let vote_accounts = stakes.vote_accounts();