diff --git a/crates/fuel-core/src/service/sub_services/algorithm_updater.rs b/crates/fuel-core/src/service/sub_services/algorithm_updater.rs index 002ecebe039..096178ee34e 100644 --- a/crates/fuel-core/src/service/sub_services/algorithm_updater.rs +++ b/crates/fuel-core/src/service/sub_services/algorithm_updater.rs @@ -15,11 +15,7 @@ use crate::{ use fuel_core_gas_price_service::{ fuel_gas_price_updater::{ - da_source_adapter::{ - dummy_costs::DummyDaBlockCosts, - DaBlockCostsProvider, - DaBlockCostsSharedState, - }, + da_source_adapter::dummy_costs::DummyDaBlockCosts, fuel_core_storage_adapter::{ storage::GasPriceMetadata, FuelL2BlockSource, @@ -39,7 +35,6 @@ use fuel_core_gas_price_service::{ use fuel_core_services::{ stream::BoxStream, RunnableService, - Service, StateWatcher, }; use fuel_core_storage::{ @@ -60,11 +55,7 @@ use fuel_core_types::{ services::block_importer::SharedImportResult, }; -type Updater = FuelGasPriceUpdater< - FuelL2BlockSource, - MetadataStorageAdapter, - DaBlockCostsSharedState, ->; +type Updater = FuelGasPriceUpdater; pub struct InitializeTask { pub config: Config, @@ -74,13 +65,17 @@ pub struct InitializeTask { pub on_chain_db: Database>, pub block_stream: BoxStream, pub shared_algo: SharedGasPriceAlgo, - pub da_block_costs_provider: DaBlockCostsProvider, } type MetadataStorageAdapter = StructuredStorage>>; -type Task = GasPriceService; +type Task = GasPriceService< + Algorithm, + Updater, + FuelL2BlockSource, + DummyDaBlockCosts, +>; impl InitializeTask { pub fn new( @@ -98,12 +93,6 @@ impl InitializeTask { let default_metadata = get_default_metadata(&config, latest_block_height); let algo = get_best_algo(&gas_price_db, default_metadata)?; let shared_algo = SharedGasPriceAlgo::new_with_algorithm(algo); - // there's no use of this source yet, so we can safely return an error - let da_block_costs_source = - DummyDaBlockCosts::new(Err(anyhow::anyhow!("Not used"))); - let da_block_costs_provider = - DaBlockCostsProvider::new(da_block_costs_source, None); - let task = Self { config, genesis_block_height, @@ -112,7 +101,6 @@ impl InitializeTask { on_chain_db, block_stream, shared_algo, - da_block_costs_provider, }; Ok(task) } @@ -170,19 +158,26 @@ impl RunnableService for InitializeTask { let updater = get_synced_gas_price_updater( self.config, self.genesis_block_height, - self.settings, + self.settings.clone(), self.gas_price_db, self.on_chain_db, - self.block_stream, - self.da_block_costs_provider.shared_state, )?; - self.da_block_costs_provider - .service - .start_and_await() - .await?; - let inner_service = - GasPriceService::new(starting_height, updater, self.shared_algo).await; + let l2_block_source = FuelL2BlockSource::new( + self.genesis_block_height, + self.settings, + self.block_stream, + ); + let da_block_cost_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("unused"))); + + let inner_service = GasPriceService::new( + starting_height, + updater, + self.shared_algo, + l2_block_source, + da_block_cost_source, + ) + .await; Ok(inner_service) } } @@ -193,8 +188,6 @@ pub fn get_synced_gas_price_updater( settings: ConsensusParametersProvider, mut gas_price_db: Database>, on_chain_db: Database>, - block_stream: BoxStream, - da_block_costs: DaBlockCostsSharedState, ) -> anyhow::Result { let mut first_run = false; let latest_block_height: u32 = on_chain_db @@ -222,16 +215,9 @@ pub fn get_synced_gas_price_updater( } let mut metadata_storage = StructuredStorage::new(gas_price_db); - let l2_block_source = - FuelL2BlockSource::new(genesis_block_height, settings.clone(), block_stream); if BlockHeight::from(latest_block_height) == genesis_block_height || first_run { - let updater = FuelGasPriceUpdater::new( - default_metadata.into(), - l2_block_source, - metadata_storage, - da_block_costs, - ); + let updater = FuelGasPriceUpdater::new(default_metadata.into(), metadata_storage); Ok(updater) } else { if latest_block_height > metadata_height { @@ -246,9 +232,7 @@ pub fn get_synced_gas_price_updater( FuelGasPriceUpdater::init( latest_block_height.into(), - l2_block_source, metadata_storage, - da_block_costs, config.min_gas_price, config.gas_price_change_percent, config.gas_price_threshold_percent, diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater.rs index bef9795266b..d1c5381a4d0 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater.rs @@ -24,12 +24,9 @@ pub mod fuel_core_storage_adapter; pub mod da_source_adapter; -pub struct FuelGasPriceUpdater { +pub struct FuelGasPriceUpdater { inner: AlgorithmUpdater, - l2_block_source: L2, metadata_storage: Metadata, - #[allow(dead_code)] - da_block_costs: DaBlockCosts, } #[derive(Debug, Clone, PartialEq)] @@ -54,29 +51,23 @@ impl AlgorithmUpdater { } } -impl FuelGasPriceUpdater { - pub fn new( - inner: AlgorithmUpdater, - l2_block_source: L2, - metadata_storage: Metadata, - da_block_costs: DaBlockCosts, - ) -> Self { +impl FuelGasPriceUpdater { + pub fn new(inner: AlgorithmUpdater, metadata_storage: Metadata) -> Self { Self { inner, - l2_block_source, metadata_storage, - da_block_costs, } } } #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("Failed to find L2 block at height {block_height:?}: {source_error:?}")] - CouldNotFetchL2Block { - block_height: BlockHeight, - source_error: anyhow::Error, - }, + #[error("Failed to find L2 block: {source_error:?}")] + CouldNotFetchL2Block { source_error: anyhow::Error }, + #[error("Failed to compute: {source_error:?}")] + MathError { source_error: anyhow::Error }, + #[error("No Mint transaction in block")] + NoMintTx, #[error("Failed to find DA records: {0:?}")] CouldNotFetchDARecord(anyhow::Error), #[error("Failed to retrieve updater metadata: {source_error:?}")] @@ -111,7 +102,7 @@ pub enum BlockInfo { } #[async_trait::async_trait] pub trait L2BlockSource: Send + Sync { - async fn get_l2_block(&mut self, height: BlockHeight) -> Result; + async fn get_l2_block(&mut self) -> Result; } #[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] @@ -121,10 +112,6 @@ pub struct DaBlockCosts { pub blob_cost_wei: u128, } -pub trait GetDaBlockCosts: Send + Sync { - fn get(&self) -> Result>; -} - #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] pub enum UpdaterMetadata { V0(V0Metadata), @@ -206,16 +193,13 @@ pub trait MetadataStorage: Send + Sync { fn set_metadata(&mut self, metadata: UpdaterMetadata) -> Result<()>; } -impl FuelGasPriceUpdater +impl FuelGasPriceUpdater where Metadata: MetadataStorage, - DaBlockCosts: GetDaBlockCosts, { pub fn init( target_block_height: BlockHeight, - l2_block_source: L2, metadata_storage: Metadata, - da_block_costs: DaBlockCosts, min_exec_gas_price: u64, exec_gas_price_change_percent: u64, l2_block_fullness_threshold_percent: u64, @@ -240,9 +224,7 @@ where }; let updater = Self { inner, - l2_block_source, metadata_storage, - da_block_costs, }; Ok(updater) } @@ -255,17 +237,18 @@ where .ok_or_else(|| anyhow!("Block gas capacity must be non-zero")) } - async fn set_metadata(&mut self) -> anyhow::Result<()> { + fn set_metadata(&mut self) -> anyhow::Result<()> { self.metadata_storage .set_metadata(self.inner.clone().into()) .map_err(|err| anyhow!(err)) } - async fn handle_normal_block( + fn handle_normal_block( &mut self, height: u32, gas_used: u64, block_gas_capacity: u64, + _da_block_costs: Option, ) -> anyhow::Result<()> { let capacity = self.validate_block_gas_capacity(block_gas_capacity)?; @@ -280,25 +263,30 @@ where } } - self.set_metadata().await?; + self.set_metadata()?; Ok(()) } - async fn apply_block_info_to_gas_algorithm( + fn apply_block_info_to_gas_algorithm( &mut self, l2_block: BlockInfo, + da_block_costs: Option, ) -> anyhow::Result<()> { match l2_block { BlockInfo::GenesisBlock => { - self.set_metadata().await?; + self.set_metadata()?; } BlockInfo::Block { height, gas_used, block_gas_capacity, } => { - self.handle_normal_block(height, gas_used, block_gas_capacity) - .await?; + self.handle_normal_block( + height, + gas_used, + block_gas_capacity, + da_block_costs, + )?; } } Ok(()) @@ -306,12 +294,9 @@ where } #[async_trait::async_trait] -impl UpdateAlgorithm - for FuelGasPriceUpdater +impl UpdateAlgorithm for FuelGasPriceUpdater where - L2: L2BlockSource, Metadata: MetadataStorage + Send + Sync, - DaBlockCosts: GetDaBlockCosts, { type Algorithm = Algorithm; @@ -319,15 +304,12 @@ where self.inner.algorithm() } - async fn next(&mut self) -> anyhow::Result { - let l2_block_res = self - .l2_block_source - .get_l2_block(self.inner.l2_block_height()) - .await; - tracing::info!("Received L2 block result: {:?}", l2_block_res); - let l2_block = l2_block_res?; - - self.apply_block_info_to_gas_algorithm(l2_block).await?; + fn next( + &mut self, + l2_block: BlockInfo, + da_block_costs: Option, + ) -> anyhow::Result { + self.apply_block_info_to_gas_algorithm(l2_block, da_block_costs)?; Ok(self.inner.algorithm()) } diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter.rs index 8323077c07b..0a2cf27bd5c 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter.rs @@ -1,80 +1,17 @@ -use crate::fuel_gas_price_updater::{ - da_source_adapter::service::{ - new_service, - DaBlockCostsService, - DaBlockCostsSource, - }, - DaBlockCosts, - GetDaBlockCosts, - Result as GasPriceUpdaterResult, -}; -use fuel_core_services::ServiceRunner; -use std::{ - sync::Arc, - time::Duration, -}; -use tokio::sync::{ - mpsc, - Mutex, -}; - pub mod block_committer_costs; pub mod dummy_costs; pub mod service; -pub const POLLING_INTERVAL_MS: u64 = 10_000; - -#[derive(Clone)] -pub struct DaBlockCostsSharedState { - inner: Arc>>, -} - -impl DaBlockCostsSharedState { - fn new(receiver: mpsc::Receiver) -> Self { - Self { - inner: Arc::new(Mutex::new(receiver)), - } - } -} - -pub struct DaBlockCostsProvider { - pub service: ServiceRunner>, - pub shared_state: DaBlockCostsSharedState, -} - -const CHANNEL_BUFFER_SIZE: usize = 10; - -impl DaBlockCostsProvider -where - T: DaBlockCostsSource, -{ - pub fn new(source: T, polling_interval: Option) -> Self { - let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE); - let service = new_service(source, sender, polling_interval); - Self { - shared_state: DaBlockCostsSharedState::new(receiver), - service, - } - } -} - -impl GetDaBlockCosts for DaBlockCostsSharedState { - fn get(&self) -> GasPriceUpdaterResult> { - if let Ok(mut guard) = self.inner.try_lock() { - if let Ok(da_block_costs) = guard.try_recv() { - return Ok(Some(da_block_costs)); - } - } - Ok(None) - } -} - #[allow(non_snake_case)] #[cfg(test)] mod tests { - use super::*; - use crate::fuel_gas_price_updater::da_source_adapter::dummy_costs::DummyDaBlockCosts; - use fuel_core_services::Service; + use crate::fuel_gas_price_updater::{ + da_source_adapter::{ + dummy_costs::DummyDaBlockCosts, + service::new_provider, + }, + DaBlockCosts, + }; use std::time::Duration; use tokio::time::sleep; @@ -87,21 +24,17 @@ mod tests { blob_cost_wei: 2, }; let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone())); - let provider = DaBlockCostsProvider::new( - da_block_costs_source, - Some(Duration::from_millis(1)), - ); - let shared_state = provider.shared_state.clone(); + let mut provider = + new_provider(da_block_costs_source, Some(Duration::from_millis(1))); // when - provider.service.start_and_await().await.unwrap(); + provider.start_and_await().await.unwrap(); sleep(Duration::from_millis(10)).await; - provider.service.stop_and_await().await.unwrap(); + provider.stop_and_await().await.unwrap(); // then - let da_block_costs_opt = shared_state.get().unwrap(); - assert!(da_block_costs_opt.is_some()); - assert_eq!(da_block_costs_opt.unwrap(), expected_da_cost); + let da_block_costs = provider.recv().await.unwrap(); + assert_eq!(da_block_costs, expected_da_cost); } #[tokio::test] @@ -110,45 +43,38 @@ mod tests { let expected_da_cost = DaBlockCosts { l2_block_range: 0..10, blob_size_bytes: 1024 * 128, - blob_cost_wei: 1, + blob_cost_wei: 2, }; let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone())); - let provider = DaBlockCostsProvider::new( - da_block_costs_source, - Some(Duration::from_millis(1)), - ); - let shared_state = provider.shared_state.clone(); + let mut provider = + new_provider(da_block_costs_source, Some(Duration::from_millis(1))); // when - provider.service.start_and_await().await.unwrap(); + provider.start_and_await().await.unwrap(); sleep(Duration::from_millis(10)).await; - provider.service.stop_and_await().await.unwrap(); - let da_block_costs_opt = shared_state.get().unwrap(); - assert!(da_block_costs_opt.is_some()); - assert_eq!(da_block_costs_opt.unwrap(), expected_da_cost); + provider.stop_and_await().await.unwrap(); + let da_block_costs = provider.recv().await.unwrap(); + assert_eq!(da_block_costs, expected_da_cost); // then - let da_block_costs_opt = shared_state.get().unwrap(); - assert!(da_block_costs_opt.is_none()); + let da_block_costs_opt = provider.try_recv(); + assert!(da_block_costs_opt.is_err()); } #[tokio::test] async fn run__when_da_block_cost_source_errors_shared_value_is_not_updated() { // given let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!"))); - let provider = DaBlockCostsProvider::new( - da_block_costs_source, - Some(Duration::from_millis(1)), - ); - let shared_state = provider.shared_state.clone(); + let mut provider = + new_provider(da_block_costs_source, Some(Duration::from_millis(1))); // when - provider.service.start_and_await().await.unwrap(); + provider.start_and_await().await.unwrap(); sleep(Duration::from_millis(10)).await; - provider.service.stop_and_await().await.unwrap(); + provider.stop_and_await().await.unwrap(); // then - let da_block_costs_opt = shared_state.get().unwrap(); - assert!(da_block_costs_opt.is_none()); + let da_block_costs_opt = provider.try_recv(); + assert!(da_block_costs_opt.is_err()); } } diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter/service.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter/service.rs index 23fc9866120..a738dd53689 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter/service.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter/service.rs @@ -1,10 +1,8 @@ -use crate::fuel_gas_price_updater::{ - da_source_adapter::POLLING_INTERVAL_MS, - DaBlockCosts, -}; +use crate::fuel_gas_price_updater::DaBlockCosts; use fuel_core_services::{ RunnableService, RunnableTask, + Service, ServiceRunner, StateWatcher, }; @@ -13,7 +11,7 @@ use std::{ time::Duration, }; use tokio::{ - sync::mpsc::Sender, + sync::broadcast::Sender, time::{ interval, Interval, @@ -21,16 +19,53 @@ use tokio::{ }; pub use anyhow::Result; +use tokio::sync::broadcast::Receiver; + +pub struct DaBlockCostsProvider { + handle: ServiceRunner>, + subscription: Receiver, +} + +impl DaBlockCostsProvider +where + Source: DaBlockCostsSource, +{ + pub async fn start_and_await(&self) -> Result<()> { + self.handle.start_and_await().await?; + Ok(()) + } + + pub async fn stop_and_await(&self) -> Result<()> { + self.handle.stop_and_await().await?; + Ok(()) + } + + pub async fn recv(&mut self) -> Result { + self.subscription.recv().await.map_err(Into::into) + } + + pub fn try_recv(&mut self) -> Result { + self.subscription.try_recv().map_err(Into::into) + } +} + +#[derive(Clone)] +struct DaBlockCostsServiceSharedState { + sender: Sender, +} + +const POLLING_INTERVAL_MS: u64 = 10_000; +const DA_BLOCK_COSTS_BUFFER_SIZE: usize = 10; /// This struct houses the shared_state, polling interval /// and a source, which does the actual fetching of the data -pub struct DaBlockCostsService +struct DaBlockCostsService where Source: DaBlockCostsSource, { poll_interval: Interval, + shared: DaBlockCostsServiceSharedState, source: Source, - sender: Sender, cache: HashSet, } @@ -38,14 +73,11 @@ impl DaBlockCostsService where Source: DaBlockCostsSource, { - pub fn new( - source: Source, - sender: Sender, - poll_interval: Option, - ) -> Self { + fn new(source: Source, poll_interval: Option) -> Self { + let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_BUFFER_SIZE); #[allow(clippy::arithmetic_side_effects)] Self { - sender, + shared: DaBlockCostsServiceSharedState { sender }, poll_interval: interval( poll_interval.unwrap_or(Duration::from_millis(POLLING_INTERVAL_MS)), ), @@ -69,13 +101,15 @@ where { const NAME: &'static str = "DaBlockCostsService"; - type SharedData = (); + type SharedData = DaBlockCostsServiceSharedState; type Task = Self; type TaskParams = (); - fn shared_data(&self) -> Self::SharedData {} + fn shared_data(&self) -> Self::SharedData { + self.shared.clone() + } async fn into_task( mut self, @@ -106,7 +140,7 @@ where let da_block_costs = self.source.request_da_block_cost().await?; if !self.cache.contains(&da_block_costs) { self.cache.insert(da_block_costs.clone()); - self.sender.send(da_block_costs).await?; + self.shared.sender.send(da_block_costs)?; } continue_running = true; } @@ -121,10 +155,14 @@ where } } -pub fn new_service( +pub fn new_provider( da_source: S, - sender: Sender, poll_interval: Option, -) -> ServiceRunner> { - ServiceRunner::new(DaBlockCostsService::new(da_source, sender, poll_interval)) +) -> DaBlockCostsProvider { + let handle = ServiceRunner::new(DaBlockCostsService::new(da_source, poll_interval)); + let subscription = handle.shared.sender.subscribe(); + DaBlockCostsProvider { + handle, + subscription, + } } diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater/fuel_core_storage_adapter.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater/fuel_core_storage_adapter.rs index 21ac241be9e..961402868f7 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater/fuel_core_storage_adapter.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater/fuel_core_storage_adapter.rs @@ -124,9 +124,7 @@ fn get_block_info( block_gas_limit: u64, ) -> GasPriceResult { let (fee, gas_price) = mint_values(block)?; - let height = *block.header().height(); - let used_gas = - block_used_gas(height, fee, gas_price, gas_price_factor, block_gas_limit)?; + let used_gas = block_used_gas(fee, gas_price, gas_price_factor, block_gas_limit)?; let info = BlockInfo::Block { height: (*block.header().height()).into(), gas_used: used_gas, @@ -140,14 +138,10 @@ fn mint_values(block: &Block) -> GasPriceResult<(u64, u64)> { .transactions() .last() .and_then(|tx| tx.as_mint()) - .ok_or(GasPriceError::CouldNotFetchL2Block { - block_height: *block.header().height(), - source_error: anyhow!("Block has no mint transaction"), - })?; + .ok_or(GasPriceError::NoMintTx)?; Ok((*mint.mint_amount(), *mint.gas_price())) } fn block_used_gas( - block_height: BlockHeight, fee: u64, gas_price: u64, gas_price_factor: u64, @@ -155,8 +149,7 @@ fn block_used_gas( ) -> GasPriceResult { let scaled_fee = fee.checked_mul(gas_price_factor) - .ok_or(GasPriceError::CouldNotFetchL2Block { - block_height, + .ok_or(GasPriceError::MathError { source_error: anyhow!( "Failed to scale fee by gas price factor, overflow" ), @@ -172,14 +165,13 @@ impl L2BlockSource for FuelL2BlockSource where Settings: GasPriceSettingsProvider + Send + Sync, { - async fn get_l2_block(&mut self, height: BlockHeight) -> GasPriceResult { + async fn get_l2_block(&mut self) -> GasPriceResult { let block = &self .committed_block_stream .next() .await .ok_or({ GasPriceError::CouldNotFetchL2Block { - block_height: height, source_error: anyhow!("No committed block found"), } })? @@ -188,7 +180,6 @@ where match block.header().height().cmp(&self.genesis_block_height) { std::cmp::Ordering::Less => Err(GasPriceError::CouldNotFetchL2Block { - block_height: height, source_error: anyhow!("Block precedes expected genesis block height"), }), std::cmp::Ordering::Equal => Ok(BlockInfo::GenesisBlock), diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater/fuel_core_storage_adapter/l2_source_tests.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater/fuel_core_storage_adapter/l2_source_tests.rs index 06bc49b342e..dcc787833dd 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater/fuel_core_storage_adapter/l2_source_tests.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater/fuel_core_storage_adapter/l2_source_tests.rs @@ -102,7 +102,6 @@ async fn get_l2_block__gets_expected_value() { let params = params(); let height = 1u32.into(); let (block, _mint) = build_block(¶ms.chain_id(), height); - let block_height = 1u32.into(); let genesis_block_height = 0u32.into(); let gas_price_factor = 100; let block_gas_limit = 1000; @@ -116,7 +115,7 @@ async fn get_l2_block__gets_expected_value() { let mut source = l2_source(genesis_block_height, settings, block_stream); // when - let actual = source.get_l2_block(block_height).await.unwrap(); + let actual = source.get_l2_block().await.unwrap(); // then assert_eq!(expected, actual); @@ -139,7 +138,7 @@ async fn get_l2_block__waits_for_block() { let mut source = l2_source(genesis_block_height, settings, block_stream); // when - let mut fut_l2_block = source.get_l2_block(block_height); + let mut fut_l2_block = source.get_l2_block(); for _ in 0..10 { fut_l2_block = match maybe_done(fut_l2_block) { MaybeDone::Future(fut) => { @@ -194,7 +193,7 @@ async fn get_l2_block__calculates_gas_used_correctly() { let mut source = l2_source(genesis_block_height, settings, block_stream); // when - let result = source.get_l2_block(block_height).await.unwrap(); + let result = source.get_l2_block().await.unwrap(); // then let BlockInfo::Block { @@ -235,7 +234,7 @@ async fn get_l2_block__calculates_block_gas_capacity_correctly() { let mut source = l2_source(genesis_block_height, settings, block_stream); // when - let result = source.get_l2_block(block_height).await.unwrap(); + let result = source.get_l2_block().await.unwrap(); // then let BlockInfo::Block { @@ -269,7 +268,7 @@ async fn get_l2_block__if_block_precedes_genesis_block_throw_an_error() { let mut source = l2_source(genesis_block_height, settings, block_stream); // when - let error = source.get_l2_block(block_height).await.unwrap_err(); + let error = source.get_l2_block().await.unwrap_err(); // then assert!(matches!(error, GasPriceError::CouldNotFetchL2Block { .. })); diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater/tests.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater/tests.rs index 6a4a669e059..65859af137f 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater/tests.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater/tests.rs @@ -2,28 +2,6 @@ use super::*; use std::sync::Arc; -use tokio::sync::mpsc::Receiver; - -struct FakeL2BlockSource { - l2_block: Receiver, -} - -#[async_trait::async_trait] -impl L2BlockSource for FakeL2BlockSource { - async fn get_l2_block(&mut self, _height: BlockHeight) -> Result { - let block = self.l2_block.recv().await.unwrap(); - Ok(block) - } -} - -struct PendingL2BlockSource; - -#[async_trait::async_trait] -impl L2BlockSource for PendingL2BlockSource { - async fn get_l2_block(&mut self, _height: BlockHeight) -> Result { - futures::future::pending().await - } -} struct FakeMetadata { inner: Arc>>, @@ -75,30 +53,6 @@ fn different_arb_metadata() -> UpdaterMetadata { }) } -#[derive(Default, Clone)] -struct FakeDaSource { - called: Arc>, -} - -impl FakeDaSource { - fn new() -> Self { - Self { - called: Arc::new(std::sync::Mutex::new(false)), - } - } - - fn was_called(&self) -> bool { - *self.called.lock().unwrap() - } -} - -impl GetDaBlockCosts for FakeDaSource { - fn get(&self) -> Result> { - *self.called.lock().unwrap() = true; - Ok(Some(DaBlockCosts::default())) - } -} - #[tokio::test] async fn next__fetches_l2_block() { // given @@ -107,36 +61,17 @@ async fn next__fetches_l2_block() { gas_used: 60, block_gas_capacity: 100, }; - let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); - let l2_block_source = FakeL2BlockSource { - l2_block: l2_block_receiver, - }; let metadata_storage = FakeMetadata::empty(); let starting_metadata = arb_metadata(); - let fake_da_source = FakeDaSource::new(); - let mut updater = FuelGasPriceUpdater::new( - starting_metadata.into(), - l2_block_source, - metadata_storage, - fake_da_source.clone(), - ); + let mut updater = + FuelGasPriceUpdater::new(starting_metadata.into(), metadata_storage); let start = updater.start(0.into()); // when - let next = tokio::spawn(async move { updater.next().await }); - - l2_block_sender.send(l2_block).await.unwrap(); - let new = next.await.unwrap().unwrap(); - + let new = updater.next(l2_block, None).unwrap(); // then assert_ne!(start, new); - match start { - Algorithm::V0(_) => {} - Algorithm::V1(_) => { - assert!(fake_da_source.was_called()); - } - } } #[tokio::test] @@ -147,29 +82,18 @@ async fn next__new_l2_block_saves_old_metadata() { gas_used: 60, block_gas_capacity: 100, }; - let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); - let l2_block_source = FakeL2BlockSource { - l2_block: l2_block_receiver, - }; let metadata_inner = Arc::new(std::sync::Mutex::new(None)); let metadata_storage = FakeMetadata { inner: metadata_inner.clone(), }; let starting_metadata = arb_metadata(); - let mut updater = FuelGasPriceUpdater::new( - starting_metadata.into(), - l2_block_source, - metadata_storage, - FakeDaSource::default(), - ); + let mut updater = + FuelGasPriceUpdater::new(starting_metadata.into(), metadata_storage); let start = updater.start(0.into()); // when - let next = tokio::spawn(async move { updater.next().await }); - - l2_block_sender.send(l2_block).await.unwrap(); - let new = next.await.unwrap().unwrap(); + let new = updater.next(l2_block, None).unwrap(); // then assert_ne!(start, new); @@ -183,7 +107,6 @@ async fn init__if_exists_already_reload_old_values_with_overrides() { let metadata_storage = FakeMetadata { inner: metadata_inner, }; - let l2_block_source = PendingL2BlockSource; let new_min_exec_gas_price = 99; let new_exec_gas_price_change_percent = 88; let new_l2_block_fullness_threshold_percent = 77; @@ -192,9 +115,7 @@ async fn init__if_exists_already_reload_old_values_with_overrides() { let height = original.l2_block_height(); let updater = FuelGasPriceUpdater::init( height, - l2_block_source, metadata_storage, - FakeDaSource::default(), new_min_exec_gas_price, new_exec_gas_price_change_percent, new_l2_block_fullness_threshold_percent, @@ -218,20 +139,11 @@ async fn init__if_exists_already_reload_old_values_with_overrides() { async fn init__if_it_does_not_exist_fail() { // given let metadata_storage = FakeMetadata::empty(); - let l2_block_source = PendingL2BlockSource; // when let metadata = different_arb_metadata(); let height = u32::from(metadata.l2_block_height()) + 1; - let res = FuelGasPriceUpdater::init( - height.into(), - l2_block_source, - metadata_storage, - FakeDaSource::default(), - 0, - 0, - 0, - ); + let res = FuelGasPriceUpdater::init(height.into(), metadata_storage, 0, 0, 0); // then assert!(matches!(res, Err(Error::CouldNotInitUpdater(_)))); diff --git a/crates/services/gas_price_service/src/lib.rs b/crates/services/gas_price_service/src/lib.rs index 10caf66ec54..19abc73d84d 100644 --- a/crates/services/gas_price_service/src/lib.rs +++ b/crates/services/gas_price_service/src/lib.rs @@ -3,6 +3,16 @@ #![deny(unused_crate_dependencies)] #![deny(warnings)] +use crate::fuel_gas_price_updater::{ + da_source_adapter::service::{ + new_provider, + DaBlockCostsProvider, + DaBlockCostsSource, + }, + BlockInfo, + DaBlockCosts, + L2BlockSource, +}; use async_trait::async_trait; use fuel_core_services::{ RunnableService, @@ -12,7 +22,6 @@ use fuel_core_services::{ use fuel_core_types::fuel_types::BlockHeight; use futures::FutureExt; use std::sync::Arc; - use tokio::sync::RwLock; pub mod static_updater; @@ -20,28 +29,42 @@ pub mod static_updater; pub mod fuel_gas_price_updater; /// The service that updates the gas price algorithm. -pub struct GasPriceService { +pub struct GasPriceService { /// The algorithm that can be used in the next block next_block_algorithm: SharedGasPriceAlgo, /// The code that is run to update your specific algorithm update_algorithm: U, + /// The L2 block source + l2_block_source: L2, + /// The DA block costs provider + da_block_costs_provider: DaBlockCostsProvider, + /// The cached value of da block costs + da_block_costs: Option, } -impl GasPriceService +impl GasPriceService where U: UpdateAlgorithm, A: Send + Sync, + DA: DaBlockCostsSource, { pub async fn new( starting_block_height: BlockHeight, update_algorithm: U, mut shared_algo: SharedGasPriceAlgo, + l2_block_source: L2, + da_block_costs_source: DA, ) -> Self { let algorithm = update_algorithm.start(starting_block_height); shared_algo.update(algorithm).await; + + let da_block_costs_provider = new_provider(da_block_costs_source, None); Self { next_block_algorithm: shared_algo, update_algorithm, + l2_block_source, + da_block_costs: None, + da_block_costs_provider, } } @@ -60,7 +83,11 @@ pub trait UpdateAlgorithm { fn start(&self, for_block: BlockHeight) -> Self::Algorithm; /// Wait for the next algorithm to be available - async fn next(&mut self) -> anyhow::Result; + fn next( + &mut self, + l2_block: BlockInfo, + da_block_costs: Option, + ) -> anyhow::Result; } pub trait GasPriceAlgorithm { @@ -68,14 +95,22 @@ pub trait GasPriceAlgorithm { fn worst_case_gas_price(&self, block_height: BlockHeight) -> u64; } -impl GasPriceService +impl GasPriceService where U: UpdateAlgorithm, A: Send + Sync, + DA: DaBlockCostsSource, { async fn update(&mut self, new_algorithm: A) { self.next_block_algorithm.update(new_algorithm).await; } + + async fn update_algorithm(&mut self, l2_block: BlockInfo) -> anyhow::Result<()> { + let da_block_costs = self.da_block_costs.take(); + let new_algo = self.update_algorithm.next(l2_block, da_block_costs)?; + self.update(new_algo).await; + Ok(()) + } } #[derive(Debug, Default)] @@ -115,10 +150,12 @@ where } #[async_trait] -impl RunnableService for GasPriceService +impl RunnableService for GasPriceService where U: UpdateAlgorithm + Send + Sync, A: Send + Sync, + L2: L2BlockSource, + DA: DaBlockCostsSource, { const NAME: &'static str = "GasPriceUpdater"; type SharedData = SharedGasPriceAlgo; @@ -134,15 +171,18 @@ where _state_watcher: &StateWatcher, _params: Self::TaskParams, ) -> anyhow::Result { + self.da_block_costs_provider.start_and_await().await?; Ok(self) } } #[async_trait] -impl RunnableTask for GasPriceService +impl RunnableTask for GasPriceService where U: UpdateAlgorithm + Send + Sync, A: Send + Sync, + L2: L2BlockSource, + DA: DaBlockCostsSource, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; @@ -152,10 +192,18 @@ where tracing::debug!("Stopping gas price service"); should_continue = false; } - new_algo = self.update_algorithm.next() => { - let new_algo = new_algo?; + da_block_costs = self.da_block_costs_provider.recv() => { + if da_block_costs.is_err() { + tracing::error!("Failed to get da block costs: {:?}", da_block_costs.err()); + } else { + self.da_block_costs = Some(da_block_costs?); + } + should_continue = true; + } + l2_block = self.l2_block_source.get_l2_block() => { + let l2_block = l2_block?; tracing::debug!("Updating gas price algorithm"); - self.update(new_algo).await; + self.update_algorithm(l2_block).await?; should_continue = true; } } @@ -163,10 +211,11 @@ where } async fn shutdown(mut self) -> anyhow::Result<()> { - while let Some(new_algo) = self.update_algorithm.next().now_or_never() { - let new_algo = new_algo?; - tracing::debug!("Updating gas price algorithm"); - self.update(new_algo).await; + self.da_block_costs_provider.stop_and_await().await?; + + let l2_block = self.l2_block_source.get_l2_block().now_or_never(); + if let Some(Ok(l2_block)) = l2_block { + self.update_algorithm(l2_block).await?; } Ok(()) } @@ -177,6 +226,13 @@ where #[cfg(test)] mod tests { use crate::{ + fuel_gas_price_updater, + fuel_gas_price_updater::{ + da_source_adapter::dummy_costs::DummyDaBlockCosts, + BlockInfo, + DaBlockCosts, + L2BlockSource, + }, GasPriceAlgorithm, GasPriceService, SharedGasPriceAlgo, @@ -187,6 +243,7 @@ mod tests { ServiceRunner, }; use fuel_core_types::fuel_types::BlockHeight; + use std::time::Duration; use tokio::sync::mpsc; #[derive(Clone, Debug)] @@ -217,11 +274,31 @@ mod tests { self.start.clone() } - async fn next(&mut self) -> anyhow::Result { - let price = self.price_source.recv().await.unwrap(); + fn next( + &mut self, + _l2_block: BlockInfo, + _da_block_costs: Option, + ) -> anyhow::Result { + let price = self.price_source.try_recv()?; Ok(TestAlgorithm { price }) } } + + struct FakeL2BlockSource; + + #[async_trait::async_trait] + impl L2BlockSource for FakeL2BlockSource { + async fn get_l2_block(&mut self) -> fuel_gas_price_updater::Result { + // simulate fetch + tokio::time::sleep(Duration::from_millis(10)).await; + Ok(BlockInfo::Block { + height: 0, + gas_used: 0, + block_gas_capacity: 0, + }) + } + } + #[tokio::test] async fn run__updates_gas_price() { // given @@ -235,14 +312,24 @@ mod tests { price_source: price_receiver, }; let shared_algo = SharedGasPriceAlgo::new_with_algorithm(start_algo); - let service = GasPriceService::new(0.into(), updater, shared_algo).await; + let l2_block_source = FakeL2BlockSource; + let da_block_source = + DummyDaBlockCosts::new(Err(anyhow::anyhow!("not implemented"))); + let service = GasPriceService::new( + 0.into(), + updater, + shared_algo, + l2_block_source, + da_block_source, + ) + .await; let read_algo = service.next_block_algorithm(); let service = ServiceRunner::new(service); service.start_and_await().await.unwrap(); // when price_sender.send(expected_price).await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_millis(10)).await; // then let actual_price = read_algo.next_gas_price().await; diff --git a/crates/services/gas_price_service/src/static_updater.rs b/crates/services/gas_price_service/src/static_updater.rs index c2f45a3d60b..a3709886177 100644 --- a/crates/services/gas_price_service/src/static_updater.rs +++ b/crates/services/gas_price_service/src/static_updater.rs @@ -1,4 +1,8 @@ use crate::{ + fuel_gas_price_updater::{ + BlockInfo, + DaBlockCosts, + }, GasPriceAlgorithm, UpdateAlgorithm, }; @@ -47,7 +51,11 @@ impl UpdateAlgorithm for StaticAlgorithmUpdater { StaticAlgorithm::new(self.static_price) } - async fn next(&mut self) -> anyhow::Result { - futures::future::pending().await + fn next( + &mut self, + _l2_block: BlockInfo, + _da_block_costs: Option, + ) -> anyhow::Result { + Ok(StaticAlgorithm::new(self.static_price)) } }