diff --git a/crates/services/gas_price_service/src/v1.rs b/crates/services/gas_price_service/src/v1.rs index 534a31b6167..c0f23f5100a 100644 --- a/crates/services/gas_price_service/src/v1.rs +++ b/crates/services/gas_price_service/src/v1.rs @@ -1,2 +1,2 @@ pub mod algorithm; -pub mod da_source_adapter; +pub mod da_source_service; diff --git a/crates/services/gas_price_service/src/v1/da_source_adapter.rs b/crates/services/gas_price_service/src/v1/da_source_adapter.rs deleted file mode 100644 index a1e26611920..00000000000 --- a/crates/services/gas_price_service/src/v1/da_source_adapter.rs +++ /dev/null @@ -1,163 +0,0 @@ -use crate::{ - common::utils::Result as GasPriceResult, - v1::da_source_adapter::service::{ - new_service, - DaBlockCostsService, - DaBlockCostsSource, - }, -}; -use fuel_core_services::ServiceRunner; -use std::{ - sync::Arc, - time::Duration, -}; -use tokio::sync::{ - mpsc, - Mutex, -}; - -pub mod block_committer_costs; -pub mod dummy_costs; -pub mod service; - -pub const POLLING_INTERVAL_MS: u64 = 10_000; - -#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] -pub struct DaBlockCosts { - pub l2_block_range: core::ops::Range, - pub blob_size_bytes: u32, - pub blob_cost_wei: u128, -} - -pub trait GetDaBlockCosts: Send + Sync { - fn get(&self) -> GasPriceResult>; -} - -#[derive(Clone)] -pub struct DaBlockCostsSharedState { - inner: Arc>>, -} - -impl DaBlockCostsSharedState { - fn new(receiver: mpsc::Receiver) -> Self { - Self { - inner: Arc::new(Mutex::new(receiver)), - } - } -} - -pub struct DaBlockCostsProvider { - pub service: ServiceRunner>, - pub shared_state: DaBlockCostsSharedState, -} - -const CHANNEL_BUFFER_SIZE: usize = 10; - -impl DaBlockCostsProvider -where - T: DaBlockCostsSource, -{ - pub fn new(source: T, polling_interval: Option) -> Self { - let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE); - let service = new_service(source, sender, polling_interval); - Self { - shared_state: DaBlockCostsSharedState::new(receiver), - service, - } - } -} - -impl GetDaBlockCosts for DaBlockCostsSharedState { - fn get(&self) -> GasPriceResult> { - if let Ok(mut guard) = self.inner.try_lock() { - if let Ok(da_block_costs) = guard.try_recv() { - return Ok(Some(da_block_costs)); - } - } - Ok(None) - } -} - -#[allow(non_snake_case)] -#[cfg(test)] -mod tests { - use super::*; - use crate::v1::da_source_adapter::dummy_costs::DummyDaBlockCosts; - use fuel_core_services::Service; - use std::time::Duration; - use tokio::time::sleep; - - #[tokio::test] - async fn run__when_da_block_cost_source_gives_value_shared_value_is_updated() { - // given - let expected_da_cost = DaBlockCosts { - l2_block_range: 0..10, - blob_size_bytes: 1024 * 128, - blob_cost_wei: 2, - }; - let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone())); - let provider = DaBlockCostsProvider::new( - da_block_costs_source, - Some(Duration::from_millis(1)), - ); - let shared_state = provider.shared_state.clone(); - - // when - provider.service.start_and_await().await.unwrap(); - sleep(Duration::from_millis(10)).await; - provider.service.stop_and_await().await.unwrap(); - - // then - let da_block_costs_opt = shared_state.get().unwrap(); - assert!(da_block_costs_opt.is_some()); - assert_eq!(da_block_costs_opt.unwrap(), expected_da_cost); - } - - #[tokio::test] - async fn run__when_da_block_cost_source_gives_value_shared_value_is_marked_stale() { - // given - let expected_da_cost = DaBlockCosts { - l2_block_range: 0..10, - blob_size_bytes: 1024 * 128, - blob_cost_wei: 1, - }; - let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone())); - let provider = DaBlockCostsProvider::new( - da_block_costs_source, - Some(Duration::from_millis(1)), - ); - let shared_state = provider.shared_state.clone(); - - // when - provider.service.start_and_await().await.unwrap(); - sleep(Duration::from_millis(10)).await; - provider.service.stop_and_await().await.unwrap(); - let da_block_costs_opt = shared_state.get().unwrap(); - assert!(da_block_costs_opt.is_some()); - assert_eq!(da_block_costs_opt.unwrap(), expected_da_cost); - - // then - let da_block_costs_opt = shared_state.get().unwrap(); - assert!(da_block_costs_opt.is_none()); - } - - #[tokio::test] - async fn run__when_da_block_cost_source_errors_shared_value_is_not_updated() { - // given - let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!"))); - let provider = DaBlockCostsProvider::new( - da_block_costs_source, - Some(Duration::from_millis(1)), - ); - let shared_state = provider.shared_state.clone(); - - // when - provider.service.start_and_await().await.unwrap(); - sleep(Duration::from_millis(10)).await; - provider.service.stop_and_await().await.unwrap(); - - // then - let da_block_costs_opt = shared_state.get().unwrap(); - assert!(da_block_costs_opt.is_none()); - } -} diff --git a/crates/services/gas_price_service/src/v1/da_source_service.rs b/crates/services/gas_price_service/src/v1/da_source_service.rs new file mode 100644 index 00000000000..091069a5d44 --- /dev/null +++ b/crates/services/gas_price_service/src/v1/da_source_service.rs @@ -0,0 +1,98 @@ +use crate::v1::da_source_service::service::DaBlockCostsSource; +use std::time::Duration; + +pub mod block_committer_costs; +pub mod dummy_costs; +pub mod service; + +#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] +pub struct DaBlockCosts { + pub l2_block_range: core::ops::Range, + pub blob_size_bytes: u32, + pub blob_cost_wei: u128, +} + +#[allow(non_snake_case)] +#[cfg(test)] +mod tests { + use super::*; + use crate::v1::da_source_service::{ + dummy_costs::DummyDaBlockCosts, + service::new_service, + }; + use fuel_core_services::Service; + use std::time::Duration; + use tokio::time::sleep; + + #[tokio::test] + async fn run__when_da_block_cost_source_gives_value_shared_state_is_updated() { + // given + let expected_da_cost = DaBlockCosts { + l2_block_range: 0..10, + blob_size_bytes: 1024 * 128, + blob_cost_wei: 2, + }; + let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone())); + let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); + let mut shared_state = &mut service.shared.subscribe(); + + // when + service.start_and_await().await.unwrap(); + sleep(Duration::from_millis(10)).await; + service.stop_and_await().await.unwrap(); + + // then + let da_block_costs = shared_state.try_recv().unwrap(); + assert_eq!(da_block_costs, expected_da_cost); + } + + #[tokio::test] + async fn run__when_da_block_cost_source_gives_value_shared_state_is_marked_stale() { + // given + let expected_da_cost = DaBlockCosts { + l2_block_range: 0..10, + blob_size_bytes: 1024 * 128, + blob_cost_wei: 1, + }; + let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone())); + let service = new_service(da_block_costs_source, Some(Duration::from_millis(8))); + let mut shared_state = &mut service.shared.subscribe(); + + // when + service.start_and_await().await.unwrap(); + sleep(Duration::from_millis(10)).await; + service.stop_and_await().await.unwrap(); + + let actual = shared_state.try_recv().unwrap(); + assert_eq!(actual, expected_da_cost); + + // then + let da_block_costs_res = shared_state.try_recv(); + assert!(da_block_costs_res.is_err()); + assert!(matches!( + da_block_costs_res.err().unwrap(), + tokio::sync::broadcast::error::TryRecvError::Empty + )); + } + + #[tokio::test] + async fn run__when_da_block_cost_source_errors_shared_state_is_not_updated() { + // given + let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!"))); + let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); + let mut shared_state = &mut service.shared.subscribe(); + + // when + service.start_and_await().await.unwrap(); + sleep(Duration::from_millis(10)).await; + service.stop_and_await().await.unwrap(); + + // then + let da_block_costs_res = shared_state.try_recv(); + assert!(da_block_costs_res.is_err()); + assert!(matches!( + da_block_costs_res.err().unwrap(), + tokio::sync::broadcast::error::TryRecvError::Empty + )); + } +} diff --git a/crates/services/gas_price_service/src/v1/da_source_adapter/block_committer_costs.rs b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs similarity index 99% rename from crates/services/gas_price_service/src/v1/da_source_adapter/block_committer_costs.rs rename to crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs index cc9179e540f..7016fe0da08 100644 --- a/crates/services/gas_price_service/src/v1/da_source_adapter/block_committer_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs @@ -1,6 +1,6 @@ #![allow(clippy::arithmetic_side_effects)] -use crate::v1::da_source_adapter::{ +use crate::v1::da_source_service::{ service::{ DaBlockCostsSource, Result as DaBlockCostsResult, diff --git a/crates/services/gas_price_service/src/v1/da_source_adapter/dummy_costs.rs b/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs similarity index 94% rename from crates/services/gas_price_service/src/v1/da_source_adapter/dummy_costs.rs rename to crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs index d962db83f56..c5551cf1c0b 100644 --- a/crates/services/gas_price_service/src/v1/da_source_adapter/dummy_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs @@ -1,4 +1,4 @@ -use crate::v1::da_source_adapter::{ +use crate::v1::da_source_service::{ service::{ DaBlockCostsSource, Result as DaBlockCostsResult, diff --git a/crates/services/gas_price_service/src/v1/da_source_adapter/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs similarity index 64% rename from crates/services/gas_price_service/src/v1/da_source_adapter/service.rs rename to crates/services/gas_price_service/src/v1/da_source_service/service.rs index c87defa52f5..1f9cadf02dd 100644 --- a/crates/services/gas_price_service/src/v1/da_source_adapter/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -4,53 +4,57 @@ use fuel_core_services::{ ServiceRunner, StateWatcher, }; -use std::{ - collections::HashSet, - time::Duration, -}; +use std::time::Duration; use tokio::{ - sync::mpsc::Sender, + sync::broadcast::Sender, time::{ interval, Interval, }, }; -use crate::v1::da_source_adapter::{ - DaBlockCosts, - POLLING_INTERVAL_MS, -}; +use crate::v1::da_source_service::DaBlockCosts; pub use anyhow::Result; +#[derive(Clone)] +pub struct SharedState(Sender); + +impl SharedState { + fn new(sender: Sender) -> Self { + Self(sender) + } + pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver { + self.0.subscribe() + } +} + /// This struct houses the shared_state, polling interval /// and a source, which does the actual fetching of the data -pub struct DaBlockCostsService +pub struct DaSourceService where Source: DaBlockCostsSource, { poll_interval: Interval, source: Source, - sender: Sender, - cache: HashSet, + shared_state: SharedState, } -impl DaBlockCostsService +const DA_BLOCK_COSTS_CHANNEL_SIZE: usize = 10; +const POLLING_INTERVAL_MS: u64 = 10_000; + +impl DaSourceService where Source: DaBlockCostsSource, { - pub fn new( - source: Source, - sender: Sender, - poll_interval: Option, - ) -> Self { + pub fn new(source: Source, poll_interval: Option) -> Self { + let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE); #[allow(clippy::arithmetic_side_effects)] Self { - sender, + shared_state: SharedState::new(sender), poll_interval: interval( poll_interval.unwrap_or(Duration::from_millis(POLLING_INTERVAL_MS)), ), source, - cache: Default::default(), } } } @@ -63,19 +67,21 @@ pub trait DaBlockCostsSource: Send + Sync { } #[async_trait::async_trait] -impl RunnableService for DaBlockCostsService +impl RunnableService for DaSourceService where Source: DaBlockCostsSource, { - const NAME: &'static str = "DaBlockCostsService"; + const NAME: &'static str = "DaSourceService"; - type SharedData = (); + type SharedData = SharedState; type Task = Self; type TaskParams = (); - fn shared_data(&self) -> Self::SharedData {} + fn shared_data(&self) -> Self::SharedData { + self.shared_state.clone() + } async fn into_task( mut self, @@ -88,7 +94,7 @@ where } #[async_trait::async_trait] -impl RunnableTask for DaBlockCostsService +impl RunnableTask for DaSourceService where Source: DaBlockCostsSource, { @@ -104,10 +110,7 @@ where } _ = self.poll_interval.tick() => { let da_block_costs = self.source.request_da_block_cost().await?; - if !self.cache.contains(&da_block_costs) { - self.cache.insert(da_block_costs.clone()); - self.sender.send(da_block_costs).await?; - } + self.shared_state.0.send(da_block_costs)?; continue_running = true; } } @@ -123,8 +126,7 @@ where pub fn new_service( da_source: S, - sender: Sender, poll_interval: Option, -) -> ServiceRunner> { - ServiceRunner::new(DaBlockCostsService::new(da_source, sender, poll_interval)) +) -> ServiceRunner> { + ServiceRunner::new(DaSourceService::new(da_source, poll_interval)) }