diff --git a/.changes/changed/2925.md b/.changes/changed/2925.md new file mode 100644 index 00000000000..1e898c00e93 --- /dev/null +++ b/.changes/changed/2925.md @@ -0,0 +1 @@ +Make preconfirmation optional on API endpoints. \ No newline at end of file diff --git a/bin/e2e-test-client/src/test_context.rs b/bin/e2e-test-client/src/test_context.rs index 6f1e9e0b23d..4f22387a439 100644 --- a/bin/e2e-test-client/src/test_context.rs +++ b/bin/e2e-test-client/src/test_context.rs @@ -186,6 +186,7 @@ impl Wallet { let tx_id = tx.id(&self.consensus_params.chain_id()); println!("submitting tx... {:?}", tx_id); let status = self.client.submit_and_await_commit(&tx).await?; + println!("Status for {:?} is {:?}", tx_id, status); // we know the transferred coin should be output 0 from above let transferred_utxo = UtxoId::new(tx_id, 0); diff --git a/crates/client/assets/schema.sdl b/crates/client/assets/schema.sdl index 7b0fa5e1d17..089633e3474 100644 --- a/crates/client/assets/schema.sdl +++ b/crates/client/assets/schema.sdl @@ -1348,7 +1348,11 @@ type Subscription { """ The ID of the transaction """ - id: TransactionId! + id: TransactionId!, + """ + If true, accept to receive the preconfirmation status + """ + includePreconfirmation: Boolean ): TransactionStatus! """ Submits transaction to the `TxPool` and await either success or failure. @@ -1357,9 +1361,9 @@ type Subscription { """ Submits the transaction to the `TxPool` and returns a stream of events. Compared to the `submitAndAwait`, the stream also contains - `SubmittedStatus` as an intermediate state. + `SubmittedStatus` and potentially preconfirmation as an intermediate state. """ - submitAndAwaitStatus(tx: HexString!, estimatePredicates: Boolean): TransactionStatus! + submitAndAwaitStatus(tx: HexString!, estimatePredicates: Boolean, includePreconfirmation: Boolean): TransactionStatus! contractStorageSlots(contractId: ContractId!): StorageSlot! contractStorageBalances(contractId: ContractId!): ContractBalance! } @@ -1401,7 +1405,7 @@ type Transaction { outputContract: ContractOutput witnesses: [HexString!] receiptsRoot: Bytes32 - status: TransactionStatus + status(includePreconfirmation: Boolean): TransactionStatus script: HexString scriptData: HexString bytecodeWitnessIndex: U16 diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 4a52ea6ae90..1c3e6757ec7 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -976,7 +976,7 @@ impl FuelClient { &'a self, tx: &'a Transaction, ) -> io::Result> + 'a> { - self.submit_and_await_status_opt(tx, None).await + self.submit_and_await_status_opt(tx, None, None).await } /// Similar to [`Self::submit_and_await_commit_opt`], but includes all intermediate states. @@ -985,13 +985,16 @@ impl FuelClient { &'a self, tx: &'a Transaction, estimate_predicates: Option, + include_preconfirmation: Option, ) -> io::Result> + 'a> { use cynic::SubscriptionBuilder; + use schema::tx::SubmitAndAwaitStatusArg; let tx = tx.clone().to_bytes(); let s = schema::tx::SubmitAndAwaitStatusSubscription::build( - TxWithEstimatedPredicatesArg { + SubmitAndAwaitStatusArg { tx: HexString(Bytes(tx)), estimate_predicates, + include_preconfirmation, }, ); @@ -1243,14 +1246,29 @@ impl FuelClient { #[tracing::instrument(skip(self), level = "debug")] #[cfg(feature = "subscriptions")] - /// Subscribe to the status of a transaction + /// Similar to [`Self::subscribe_transaction_status_opt`], but with default options. pub async fn subscribe_transaction_status<'a>( &'a self, id: &'a TxId, ) -> io::Result> + 'a> { + self.subscribe_transaction_status_opt(id, None).await + } + + #[cfg(feature = "subscriptions")] + /// Subscribe to the status of a transaction + pub async fn subscribe_transaction_status_opt<'a>( + &'a self, + id: &'a TxId, + include_preconfirmation: Option, + ) -> io::Result> + 'a> { use cynic::SubscriptionBuilder; + use schema::tx::StatusChangeSubscriptionArgs; let tx_id: TransactionId = (*id).into(); - let s = schema::tx::StatusChangeSubscription::build(TxIdArgs { id: tx_id }); + let s = + schema::tx::StatusChangeSubscription::build(StatusChangeSubscriptionArgs { + id: tx_id, + include_preconfirmation, + }); tracing::debug!("subscribing"); let stream = self.subscribe(s).await?.map(|tx| { diff --git a/crates/client/src/client/schema/tx.rs b/crates/client/src/client/schema/tx.rs index 5549bf87154..814fddd95eb 100644 --- a/crates/client/src/client/schema/tx.rs +++ b/crates/client/src/client/schema/tx.rs @@ -472,14 +472,21 @@ pub struct TransactionsByOwnerQuery { pub transactions_by_owner: TransactionConnection, } +#[derive(cynic::QueryVariables, Debug)] +pub struct StatusChangeSubscriptionArgs { + pub id: TransactionId, + #[cynic(skip_serializing_if = "Option::is_none")] + pub include_preconfirmation: Option, +} + #[derive(cynic::QueryFragment, Clone, Debug)] #[cynic( schema_path = "./assets/schema.sdl", graphql_type = "Subscription", - variables = "TxIdArgs" + variables = "StatusChangeSubscriptionArgs" )] pub struct StatusChangeSubscription { - #[arguments(id: $id)] + #[arguments(id: $id, includePreconfirmation: $include_preconfirmation)] pub status_change: TransactionStatus, } @@ -497,6 +504,15 @@ pub struct TxWithEstimatedPredicatesArg { pub estimate_predicates: Option, } +#[derive(cynic::QueryVariables)] +pub struct SubmitAndAwaitStatusArg { + pub tx: HexString, + #[cynic(skip_serializing_if = "Option::is_none")] + pub estimate_predicates: Option, + #[cynic(skip_serializing_if = "Option::is_none")] + pub include_preconfirmation: Option, +} + #[derive(cynic::QueryFragment, Clone, Debug)] #[cynic( schema_path = "./assets/schema.sdl", @@ -661,10 +677,10 @@ pub struct SubmitAndAwaitSubscriptionWithTransaction { #[cynic( schema_path = "./assets/schema.sdl", graphql_type = "Subscription", - variables = "TxWithEstimatedPredicatesArg" + variables = "SubmitAndAwaitStatusArg" )] pub struct SubmitAndAwaitStatusSubscription { - #[arguments(tx: $tx, estimatePredicates: $estimate_predicates)] + #[arguments(tx: $tx, estimatePredicates: $estimate_predicates, includePreconfirmation: $include_preconfirmation)] pub submit_and_await_status: TransactionStatus, } diff --git a/crates/fuel-core/src/combined_database.rs b/crates/fuel-core/src/combined_database.rs index dd18bccd14d..7974cb95841 100644 --- a/crates/fuel-core/src/combined_database.rs +++ b/crates/fuel-core/src/combined_database.rs @@ -523,6 +523,14 @@ impl CombinedDatabase { Ok(()) } + + pub fn shutdown(self) { + self.on_chain.shutdown(); + self.off_chain.shutdown(); + self.relayer.shutdown(); + self.gas_price.shutdown(); + self.compression.shutdown(); + } } /// A trait for listening to shutdown signals. diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 2a532edc8c5..93548cd9181 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -181,6 +181,12 @@ where self.iter_all_filtered::(prefix, None, Some(direction)) .map_ok(|(key, value)| TableEntry { key, value }) } + + pub fn shutdown(self) { + let (storage, _) = self.into_inner(); + + storage.data.shutdown() + } } impl GenesisDatabase diff --git a/crates/fuel-core/src/query/subscriptions.rs b/crates/fuel-core/src/query/subscriptions.rs index 4c94c0d243e..c8d69df85c8 100644 --- a/crates/fuel-core/src/query/subscriptions.rs +++ b/crates/fuel-core/src/query/subscriptions.rs @@ -20,6 +20,7 @@ pub(crate) trait TxnStatusChangeState { async fn get_tx_status( &self, id: Bytes32, + include_preconfirmation: bool, ) -> StorageResult>; } @@ -28,6 +29,7 @@ pub(crate) async fn transaction_status_change<'a, State>( state: State, stream: BoxStream<'a, TxStatusMessage>, transaction_id: Bytes32, + include_preconfirmation: bool, ) -> impl Stream> + 'a where State: TxnStatusChangeState + Send + Sync + 'a, @@ -35,7 +37,7 @@ where // Check the database first to see if the transaction already // has a status. let maybe_db_status = state - .get_tx_status(transaction_id) + .get_tx_status(transaction_id, include_preconfirmation) .await .transpose() .map(TxStatusMessage::from); @@ -50,7 +52,7 @@ where .chain(stream) // Keep taking the stream until the oneshot channel is closed. .take_until(closed) - .map(move |status| { + .filter_map(move |status | { if status.is_final() { if let Some(close) = close.take() { let _ = close.send(()); @@ -60,11 +62,15 @@ where match status { TxStatusMessage::Status(status) => { let status = ApiTxStatus::new(transaction_id, status); - Ok(status) + if !include_preconfirmation && status.is_preconfirmation() { + futures::future::ready(None) + } else { + futures::future::ready(Some(Ok(status))) + } }, // Map a failed status to an error for the api. TxStatusMessage::FailedStatus => { - Err(anyhow::anyhow!("Failed to get transaction status")) + futures::future::ready(Some(Err(anyhow::anyhow!("Failed to get transaction status")))) } } }) diff --git a/crates/fuel-core/src/query/subscriptions/test.rs b/crates/fuel-core/src/query/subscriptions/test.rs index eb033d69091..2d7312bab49 100644 --- a/crates/fuel-core/src/query/subscriptions/test.rs +++ b/crates/fuel-core/src/query/subscriptions/test.rs @@ -265,16 +265,16 @@ fn test_tsc_inner( let out = RT.with(|rt| { rt.block_on(async { let mut mock_state = super::MockTxnStatusChangeState::new(); - mock_state - .expect_get_tx_status() - .returning(move |_| match state.clone() { + mock_state.expect_get_tx_status().returning(move |_, _| { + match state.clone() { Ok(Some(t)) => Ok(Some(t)), Ok(None) => Ok(None), Err(_) => Err(StorageError::NotFound("", "")), - }); + } + }); let stream = futures::stream::iter(stream).boxed(); - super::transaction_status_change(mock_state, stream, txn_id(0)) + super::transaction_status_change(mock_state, stream, txn_id(0), true) .await .collect::>() .await diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index 6b49b5c90f1..595dad1d8e5 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -42,6 +42,7 @@ use crate::{ AssembleTx, }, types::{ + get_tx_status, AssembleTransactionResult, TransactionStatus, }, @@ -695,6 +696,8 @@ impl TxStatusSubscription { &self, ctx: &'a Context<'a>, #[graphql(desc = "The ID of the transaction")] id: TransactionId, + #[graphql(desc = "If true, accept to receive the preconfirmation status")] + include_preconfirmation: Option, ) -> anyhow::Result> + 'a> { let tx_status_manager = ctx.data_unchecked::(); @@ -705,11 +708,14 @@ impl TxStatusSubscription { tx_status_manager, query, }; - Ok( - transaction_status_change(status_change_state, rx, id.into()) - .await - .map_err(async_graphql::Error::from), + Ok(transaction_status_change( + status_change_state, + rx, + id.into(), + include_preconfirmation.unwrap_or(false), ) + .await + .map_err(async_graphql::Error::from)) } /// Submits transaction to the `TxPool` and await either success or failure. @@ -724,7 +730,7 @@ impl TxStatusSubscription { > { use tokio_stream::StreamExt; let subscription = - submit_and_await_status(ctx, tx, estimate_predicates.unwrap_or(false)) + submit_and_await_status(ctx, tx, estimate_predicates.unwrap_or(false), false) .await?; Ok(subscription @@ -734,17 +740,24 @@ impl TxStatusSubscription { /// Submits the transaction to the `TxPool` and returns a stream of events. /// Compared to the `submitAndAwait`, the stream also contains - /// `SubmittedStatus` as an intermediate state. + /// `SubmittedStatus` and potentially preconfirmation as an intermediate state. #[graphql(complexity = "query_costs().submit_and_await + child_complexity")] async fn submit_and_await_status<'a>( &self, ctx: &'a Context<'a>, tx: HexString, estimate_predicates: Option, + include_preconfirmation: Option, ) -> async_graphql::Result< impl Stream> + 'a, > { - submit_and_await_status(ctx, tx, estimate_predicates.unwrap_or(false)).await + submit_and_await_status( + ctx, + tx, + estimate_predicates.unwrap_or(false), + include_preconfirmation.unwrap_or(false), + ) + .await } } @@ -752,6 +765,7 @@ async fn submit_and_await_status<'a>( ctx: &'a Context<'a>, tx: HexString, estimate_predicates: bool, + include_preconfirmation: bool, ) -> async_graphql::Result< impl Stream> + 'a, > { @@ -774,13 +788,21 @@ async fn submit_and_await_status<'a>( txpool.insert(tx).await?; Ok(subscription - .map(move |event| match event { - TxStatusMessage::Status(status) => { - let status = TransactionStatus::new(tx_id, status); - Ok(status) - } - TxStatusMessage::FailedStatus => { - Err(anyhow::anyhow!("Failed to get transaction status").into()) + .filter_map(move |status| { + match status { + TxStatusMessage::Status(status) => { + let status = TransactionStatus::new(tx_id, status); + if !include_preconfirmation && status.is_preconfirmation() { + None + } else { + Some(Ok(status)) + } + } + // Map a failed status to an error for the api. + TxStatusMessage::FailedStatus => Some(Err(anyhow::anyhow!( + "Failed to get transaction status" + ) + .into())), } }) .take(3)) @@ -795,14 +817,15 @@ impl<'a> TxnStatusChangeState for StatusChangeState<'a> { async fn get_tx_status( &self, id: Bytes32, + include_preconfirmation: bool, ) -> StorageResult> { - match self.query.tx_status(&id) { - Ok(status) => Ok(Some(status.into())), - Err(StorageError::NotFound(_, _)) => { - Ok(self.tx_status_manager.status(id).await?) - } - Err(err) => Err(err), - } + get_tx_status( + &id, + self.query.as_ref(), + self.tx_status_manager, + include_preconfirmation, + ) + .await } } diff --git a/crates/fuel-core/src/schema/tx/types.rs b/crates/fuel-core/src/schema/tx/types.rs index 198db596208..e96f394a7b3 100644 --- a/crates/fuel-core/src/schema/tx/types.rs +++ b/crates/fuel-core/src/schema/tx/types.rs @@ -449,6 +449,14 @@ impl TransactionStatus { | TransactionStatus::PreconfirmationFailure(_) => false, } } + + pub fn is_preconfirmation(&self) -> bool { + matches!( + self, + TransactionStatus::PreconfirmationSuccess(_) + | TransactionStatus::PreconfirmationFailure(_) + ) + } } pub struct Policies(fuel_tx::policies::Policies); @@ -769,15 +777,22 @@ impl Transaction { async fn status( &self, ctx: &Context<'_>, + include_preconfirmation: Option, ) -> async_graphql::Result> { let id = self.1; let query = ctx.read_view()?; let tx_status_manager = ctx.data_unchecked::(); - get_tx_status(id, query.as_ref(), tx_status_manager) - .await - .map_err(Into::into) + get_tx_status( + &id, + query.as_ref(), + tx_status_manager, + include_preconfirmation.unwrap_or(false), + ) + .await + .map(|status| status.map(|status| TransactionStatus::new(id, status))) + .map_err(Into::into) } async fn script(&self) -> Option { @@ -1112,22 +1127,34 @@ impl StorageReadReplayEvent { #[tracing::instrument(level = "debug", skip(query, tx_status_manager), ret, err)] pub(crate) async fn get_tx_status( - id: fuel_core_types::fuel_types::Bytes32, + id: &fuel_core_types::fuel_types::Bytes32, query: &ReadView, tx_status_manager: &DynTxStatusManager, -) -> Result, StorageError> { + include_preconfirmation: bool, +) -> Result, StorageError> { let api_result = query - .tx_status(&id) + .tx_status(id) .into_api_result::()?; match api_result { - Some(status) => { - let status = TransactionStatus::new(id, status); - Ok(Some(status)) - } + Some(status) => Ok(Some(status)), None => { - let status = tx_status_manager.status(id).await?; + let status = tx_status_manager.status(*id).await?; match status { - Some(status) => Ok(Some(TransactionStatus::new(id, status))), + Some(status) => { + // Filter out preconfirmation statuses if not allowed. Converting to submitted status + // because it's the closest to the preconfirmation status. + // Having `now()` as timestamp isn't ideal but shouldn't cause much inconsistency. + if !include_preconfirmation + && status.is_preconfirmation() + && !status.is_final() + { + Ok(Some(transaction_status::TransactionStatus::submitted( + Tai64::now(), + ))) + } else { + Ok(Some(status)) + } + } None => Ok(None), } } diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index b6d3c6d2a5e..ffe43c7517e 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -147,12 +147,12 @@ impl FuelService { let (services, shared) = sub_services::init_sub_services( &config, - database, + database.clone(), block_production_ready_signal.clone(), )?; let sub_services = Arc::new(services); - let task = Task::new(sub_services.clone(), shared.clone())?; + let task = Task::new(sub_services.clone(), database, shared.clone())?; let runner = ServiceRunner::new_with_params( task, TaskParams { @@ -409,14 +409,24 @@ pub type SubServices = Vec>; struct Task { /// The list of started sub services. services: Arc, + /// The copies of the databases used by services. + database: CombinedDatabase, /// The address bound by the system for serving the API pub shared: SharedState, } impl Task { /// Private inner method for initializing the fuel service task - pub fn new(services: Arc, shared: SharedState) -> anyhow::Result { - Ok(Task { services, shared }) + pub fn new( + services: Arc, + database: CombinedDatabase, + shared: SharedState, + ) -> anyhow::Result { + Ok(Task { + services, + database, + shared, + }) } } @@ -491,6 +501,7 @@ impl RunnableTask for Task { ); } } + self.database.shutdown(); Ok(()) } } diff --git a/crates/fuel-core/src/service/query.rs b/crates/fuel-core/src/service/query.rs index 2aa3a64a402..443b1aba1ff 100644 --- a/crates/fuel-core/src/service/query.rs +++ b/crates/fuel-core/src/service/query.rs @@ -8,6 +8,7 @@ use fuel_core_types::{ }, fuel_types::Bytes32, services::transaction_status::TransactionStatus as TxPoolTxStatus, + tai64::Tai64, }; use futures::{ Stream, @@ -89,7 +90,7 @@ impl FuelService { db, tx_status_manager, }; - Ok(transaction_status_change(state, rx, id).await) + Ok(transaction_status_change(state, rx, id, true).await) } } @@ -99,11 +100,28 @@ struct StatusChangeState<'a> { } impl<'a> TxnStatusChangeState for StatusChangeState<'a> { - async fn get_tx_status(&self, id: Bytes32) -> StorageResult> { + async fn get_tx_status( + &self, + id: Bytes32, + include_preconfirmation: bool, + ) -> StorageResult> { match self.db.get_tx_status(&id)? { Some(status) => Ok(Some(status.into())), None => { let status = self.tx_status_manager.status(id).await?; + let status = status.map(|status| { + // Filter out preconfirmation statuses if not allowed. Converting to submitted status + // because it's the closest to the preconfirmation status. + // Having `now()` as timestamp isn't ideal but shouldn't cause much inconsistency. + if !include_preconfirmation + && status.is_preconfirmation() + && !status.is_final() + { + TxPoolTxStatus::submitted(Tai64::now()) + } else { + status + } + }); Ok(status) } } diff --git a/crates/fuel-core/src/state.rs b/crates/fuel-core/src/state.rs index 1173342c815..318212d010b 100644 --- a/crates/fuel-core/src/state.rs +++ b/crates/fuel-core/src/state.rs @@ -68,6 +68,10 @@ pub trait TransactableStorage: IterableStore + Debug + Send + Sync { fn latest_view(&self) -> StorageResult>; fn rollback_block_to(&self, height: &Height) -> StorageResult<()>; + + fn shutdown(&self) { + // Do nothing by default + } } // It is used only to allow conversion of the `StorageTransaction` into the `DataSource`. diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index 5114a6482ec..44d3a547b7f 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -621,6 +621,10 @@ where fn rollback_block_to(&self, height: &Description::Height) -> StorageResult<()> { self.rollback_block_to(height.as_u64()) } + + fn shutdown(&self) { + self.db.shutdown() + } } pub fn height_key(key: &[u8], height: &u64) -> Vec { diff --git a/crates/fuel-core/src/state/rocks_db.rs b/crates/fuel-core/src/state/rocks_db.rs index 11734e02c29..32cba614d12 100644 --- a/crates/fuel-core/src/state/rocks_db.rs +++ b/crates/fuel-core/src/state/rocks_db.rs @@ -855,6 +855,10 @@ where Ok(()) } + + pub fn shutdown(&self) { + while Arc::strong_count(&self.db) > 1 {} + } } pub(crate) struct KeyOnly; diff --git a/crates/services/importer/src/importer.rs b/crates/services/importer/src/importer.rs index 9d13d5d94a8..8728e01305a 100644 --- a/crates/services/importer/src/importer.rs +++ b/crates/services/importer/src/importer.rs @@ -72,6 +72,7 @@ enum CommitInput { } enum Commands { + Stop, CommitResult { result: CommitInput, permit: OwnedSemaphorePermit, @@ -94,7 +95,7 @@ struct ImporterInner { verifier: V, chain_id: ChainId, broadcast: broadcast::Sender, - commands: mpsc::Receiver, + commands: mpsc::UnboundedReceiver, /// Enables prometheus metrics for this fuel-service metrics: bool, } @@ -102,7 +103,7 @@ struct ImporterInner { pub struct Importer { broadcast: broadcast::Sender, guard: Semaphore, - commands: mpsc::Sender, + commands: mpsc::UnboundedSender, /// The semaphore tracks the number of unprocessed `SharedImportResult`. /// If the number of unprocessed results is more than the threshold, /// the block importer stops committing new blocks and waits for @@ -113,10 +114,7 @@ pub struct Importer { impl Drop for Importer { fn drop(&mut self) { - // Dropping the sender will close the receiver and stop the inner importer. - let (empty_sender, _) = mpsc::channel(1); - let sender = core::mem::replace(&mut self.commands, empty_sender); - drop(sender); + let _ = self.commands.send(Commands::Stop); let inner = self.inner.take(); @@ -148,7 +146,7 @@ impl Importer { // that will not be processed. let max_block_notify_buffer = config.max_block_notify_buffer; let (broadcast, _) = broadcast::channel(max_block_notify_buffer); - let (sender, receiver) = mpsc::channel(1); + let (sender, receiver) = mpsc::unbounded_channel(); let mut inner = ImporterInner { database, @@ -267,7 +265,7 @@ impl Importer { permit, callback: sender, }; - self.commands.send(command).await?; + self.commands.send(command)?; receiver.await? } @@ -281,7 +279,7 @@ impl Importer { sealed_block, callback: sender, }; - self.commands.send(command).await?; + self.commands.send(command)?; receiver.await? } @@ -294,7 +292,7 @@ impl Importer { sealed_block, callback: sender, }; - self.commands.send(command).await?; + self.commands.send(command)?; receiver.await? } } @@ -494,6 +492,7 @@ where let local_runner = LocalRunner::new().expect("Failed to create the local runner"); while let Some(command) = self.commands.recv().await { match command { + Commands::Stop => break, Commands::CommitResult { result, permit, diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index 01446a7136b..ee2d8f8bec1 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -203,8 +203,8 @@ mod tests { assert_eq!(err, OutOfCapacity); } - #[test] - fn second_spawn_works_when_first_is_finished() { + #[tokio::test] + async fn second_spawn_works_when_first_is_finished() { const NUMBER_OF_PENDING_TASKS: usize = 1; let heavy_task_processor = AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); @@ -215,10 +215,8 @@ mod tests { sleep(Duration::from_secs(1)); sender.send(()).unwrap(); }); - first_spawn.expect("Expected Ok result"); - futures::executor::block_on(async move { - receiver.await.unwrap(); - }); + first_spawn.expect("Expected Ok result").await.unwrap(); + receiver.await.unwrap(); // When let second_spawn = heavy_task_processor.try_spawn(async move { diff --git a/crates/services/txpool_v2/src/pool_worker.rs b/crates/services/txpool_v2/src/pool_worker.rs index 28c2b977580..fbb87ae0b29 100644 --- a/crates/services/txpool_v2/src/pool_worker.rs +++ b/crates/services/txpool_v2/src/pool_worker.rs @@ -29,6 +29,8 @@ use tokio::{ self, Receiver, Sender, + UnboundedReceiver, + UnboundedSender, }, oneshot, }, @@ -62,10 +64,9 @@ const MAX_PENDING_REMOVE_POOL_REQUESTS: usize = 1_000; const SIZE_EXTRACT_BLOCK_TRANSACTIONS_CHANNEL: usize = 100_000; const SIZE_NOTIFICATION_CHANNEL: usize = 10_000_000; -const SIZE_THREAD_MANAGEMENT_CHANNEL: usize = 10; pub(super) struct PoolWorkerInterface { - thread_management_sender: Sender, + thread_management_sender: UnboundedSender, pub(super) request_insert_sender: Sender, pub(super) request_update_sender: Sender, pub(super) request_read_sender: Sender, @@ -74,6 +75,20 @@ pub(super) struct PoolWorkerInterface { handle: Option>, } +impl Drop for PoolWorkerInterface { + fn drop(&mut self) { + let _ = self + .thread_management_sender + .send(ThreadManagementRequest::Stop); + + if let Some(handle) = self.handle.take() { + if handle.join().is_err() { + tracing::error!("Failed to join pool worker thread"); + } + } + } +} + impl PoolWorkerInterface { pub fn new( tx_pool: TxPool, @@ -95,7 +110,7 @@ impl PoolWorkerInterface { let (notification_sender, notification_receiver) = mpsc::channel(SIZE_NOTIFICATION_CHANNEL); let (thread_management_sender, thread_management_receiver) = - mpsc::channel(SIZE_THREAD_MANAGEMENT_CHANNEL); + mpsc::unbounded_channel(); let handle = std::thread::spawn({ let tx_insert_from_pending_sender = request_insert_sender.clone(); @@ -169,20 +184,6 @@ impl PoolWorkerInterface { ) }) } - - pub fn stop(&mut self) { - if let Err(e) = self - .thread_management_sender - .try_send(ThreadManagementRequest::Stop) - { - tracing::error!("Failed to send stop request: {}", e); - } - if let Some(handle) = self.handle.take() { - if handle.join().is_err() { - tracing::error!("Failed to join pool worker thread"); - } - } - } } enum ThreadManagementRequest { @@ -260,7 +261,7 @@ pub(super) enum PoolNotification { pub(super) struct PoolWorker { tx_insert_from_pending_sender: Sender, - thread_management_receiver: Receiver, + thread_management_receiver: UnboundedReceiver, request_remove_receiver: Receiver, request_read_receiver: Receiver, extract_block_transactions_receiver: Receiver, diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index 601a785b562..aac3df800d9 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -283,8 +283,7 @@ where } } - async fn shutdown(mut self) -> anyhow::Result<()> { - self.pool_worker.stop(); + async fn shutdown(self) -> anyhow::Result<()> { Ok(()) } } diff --git a/crates/types/src/services/transaction_status.rs b/crates/types/src/services/transaction_status.rs index 0b4b40dbb3c..2bc97a1520a 100644 --- a/crates/types/src/services/transaction_status.rs +++ b/crates/types/src/services/transaction_status.rs @@ -171,6 +171,19 @@ impl TransactionStatus { } } + /// Returns `true` if the status is pre confirmation. + pub fn is_preconfirmation(&self) -> bool { + match self { + TransactionStatus::Submitted(_) + | TransactionStatus::Success(_) + | TransactionStatus::Failure(_) + | TransactionStatus::SqueezedOut(_) => false, + TransactionStatus::PreConfirmationSuccess(_) + | TransactionStatus::PreConfirmationFailure(_) + | TransactionStatus::PreConfirmationSqueezedOut(_) => true, + } + } + /// Returns `true` if the status is `Submitted`. pub fn is_submitted(&self) -> bool { matches!(self, Self::Submitted { .. }) diff --git a/tests/test-helpers/src/counter_contract.rs b/tests/test-helpers/src/counter_contract.rs index 10a2b586eb5..962d6b47958 100644 --- a/tests/test-helpers/src/counter_contract.rs +++ b/tests/test-helpers/src/counter_contract.rs @@ -84,11 +84,6 @@ pub async fn deploy( intermediate_status, TransactionStatus::Submitted { .. } )); - let preconfirmation_status = status_stream.next().await.unwrap().unwrap(); - match preconfirmation_status { - TransactionStatus::PreconfirmationSuccess { .. } => {} - _ => panic!("Tx wasn't preconfirmed: {:?}", preconfirmation_status), - }; let final_status = status_stream.next().await.unwrap().unwrap(); let TransactionStatus::Success { block_height, .. } = final_status else { panic!("Tx wasn't included in a block: {:?}", final_status); diff --git a/tests/tests/gas_price.rs b/tests/tests/gas_price.rs index 23144691eee..f5a7ffd9c38 100644 --- a/tests/tests/gas_price.rs +++ b/tests/tests/gas_price.rs @@ -542,6 +542,7 @@ async fn startup__can_override_gas_price_values_by_changing_config() { l2_block_height, .. } = new_metadata.try_into().unwrap(); assert_eq!(l2_block_height, new_height); + drop(recovered_view); recovered_driver.kill().await; } diff --git a/tests/tests/poa.rs b/tests/tests/poa.rs index 30873d254f3..6424a9b8e0a 100644 --- a/tests/tests/poa.rs +++ b/tests/tests/poa.rs @@ -106,12 +106,13 @@ async fn starting_node_with_predefined_nodes_produces_these_predefined_blocks( for _ in 0..BLOCK_TO_PRODUCE { produce_block_with_tx(&mut rng, &core.client).await; } - let on_chain_view = core.node.shared.database.on_chain().latest_view()?; // Given let predefined_blocks: Vec<_> = (1..=BLOCK_TO_PRODUCE) .map(|block_height| { let block_height = block_height as u32; + let on_chain_view = + core.node.shared.database.on_chain().latest_view().unwrap(); on_chain_view .get_full_block(&block_height.into()) .unwrap() @@ -145,6 +146,13 @@ async fn starting_node_with_predefined_nodes_produces_these_predefined_blocks( let blocks_from_new_node: Vec<_> = (1..=BLOCK_TO_PRODUCE) .map(|block_height| { let block_height = block_height as u32; + let on_chain_view = new_core + .node + .shared + .database + .on_chain() + .latest_view() + .unwrap(); on_chain_view .get_full_block(&block_height.into()) .unwrap() diff --git a/tests/tests/preconfirmations.rs b/tests/tests/preconfirmations.rs index e8f0227edbd..d6f77bf4a5d 100644 --- a/tests/tests/preconfirmations.rs +++ b/tests/tests/preconfirmations.rs @@ -26,48 +26,37 @@ use fuel_core_types::{ }; use futures::StreamExt; use rand::Rng; +use test_helpers::{ + assemble_tx::AssembleAndRunTx, + config_with_fee, + default_signing_wallet, +}; #[tokio::test] async fn preconfirmation__received_after_successful_execution() { - let mut rng = rand::thread_rng(); - let mut config = Config::local_node(); + let mut config = config_with_fee(); config.block_production = Trigger::Never; - let address = Address::new([0; 32]); - let amount = 10; let srv = FuelService::new_node(config).await.unwrap(); let client = FuelClient::from(srv.bound_address); - let gas_limit = 1_000_000; - let maturity = Default::default(); - // Given - let script = [ + let script = vec![ op::addi(0x10, RegId::ZERO, 0xca), op::addi(0x11, RegId::ZERO, 0xba), op::log(0x10, 0x11, RegId::ZERO, RegId::ZERO), op::ret(RegId::ONE), ]; - let script: Vec = script - .iter() - .flat_map(|op| u32::from(*op).to_be_bytes()) - .collect(); - - let tx = TransactionBuilder::script(script, vec![]) - .script_gas_limit(gas_limit) - .maturity(maturity) - .add_unsigned_coin_input( - SecretKey::random(&mut rng), - rng.gen(), - amount, - AssetId::default(), - Default::default(), - ) - .add_output(Output::change(address, 0, AssetId::default())) - .finalize_as_transaction(); + let tx = client + .assemble_script(script, vec![], default_signing_wallet()) + .await + .unwrap(); let tx_id = tx.id(&Default::default()); - let mut tx_statuses_subscriber = client.submit_and_await_status(&tx).await.unwrap(); + let mut tx_statuses_subscriber = client + .submit_and_await_status_opt(&tx, None, Some(true)) + .await + .unwrap(); // When assert!(matches!( @@ -77,7 +66,7 @@ async fn preconfirmation__received_after_successful_execution() { client.produce_blocks(1, None).await.unwrap(); if let TransactionStatus::PreconfirmationSuccess { tx_pointer, - total_fee, + total_fee: _, total_gas: _, transaction_id, receipts, @@ -86,7 +75,6 @@ async fn preconfirmation__received_after_successful_execution() { { // Then assert_eq!(tx_pointer, TxPointer::new(BlockHeight::new(1), 1)); - assert_eq!(total_fee, 0); assert_eq!(transaction_id, tx_id); let receipts = receipts.unwrap(); assert_eq!(receipts.len(), 3); @@ -101,14 +89,7 @@ async fn preconfirmation__received_after_successful_execution() { } if val == 1)); let outputs = resolved_outputs.unwrap(); assert_eq!(outputs.len(), 1); - assert_eq!( - outputs[0].output, - Output::Change { - to: address, - amount, - asset_id: AssetId::default() - } - ); + assert!(outputs[0].output.is_change()); } else { panic!("Expected preconfirmation status"); } @@ -119,47 +100,112 @@ async fn preconfirmation__received_after_successful_execution() { } #[tokio::test] -async fn preconfirmation__received_after_failed_execution() { - let mut rng = rand::thread_rng(); - let mut config = Config::local_node(); +async fn preconfirmation__received_when_asked() { + let mut config = config_with_fee(); config.block_production = Trigger::Never; - let address = Address::new([0; 32]); - let amount = 10; let srv = FuelService::new_node(config).await.unwrap(); let client = FuelClient::from(srv.bound_address); - let gas_limit = 1_000_000; - let maturity = Default::default(); + // Given + let script = vec![op::ret(RegId::ONE)]; + let tx = client + .assemble_script(script, vec![], default_signing_wallet()) + .await + .unwrap(); + + // When + let mut tx_statuses_subscriber = client + .submit_and_await_status_opt(&tx, None, Some(true)) + .await + .unwrap(); + + // Then + assert!(matches!( + tx_statuses_subscriber.next().await.unwrap().unwrap(), + TransactionStatus::Submitted { .. } + )); + client.produce_blocks(1, None).await.unwrap(); + assert!(matches!( + tx_statuses_subscriber.next().await.unwrap().unwrap(), + TransactionStatus::PreconfirmationSuccess { .. } + )); + assert!(matches!( + tx_statuses_subscriber.next().await.unwrap().unwrap(), + TransactionStatus::Success { block_height, .. } if block_height == BlockHeight::new(1) + )); +} + +#[tokio::test] +async fn preconfirmation__not_received_when_not_asked() { + let mut config = config_with_fee(); + config.block_production = Trigger::Never; + + let srv = FuelService::new_node(config).await.unwrap(); + let client = FuelClient::from(srv.bound_address); // Given - let script = [ + let script = vec![op::ret(RegId::ONE)]; + let tx = client + .assemble_script(script, vec![], default_signing_wallet()) + .await + .unwrap(); + + let tx_id = tx.id(&Default::default()); + // When + let mut tx_statuses_update_subscriber = + client.subscribe_transaction_status(&tx_id).await.unwrap(); + let mut tx_statuses_subscriber = client + .submit_and_await_status_opt(&tx, None, None) + .await + .unwrap(); + + // Then + assert!(matches!( + tx_statuses_subscriber.next().await.unwrap().unwrap(), + TransactionStatus::Submitted { .. } + )); + assert!(matches!( + tx_statuses_update_subscriber.next().await.unwrap().unwrap(), + TransactionStatus::Submitted { .. } + )); + client.produce_blocks(1, None).await.unwrap(); + assert!(matches!( + tx_statuses_subscriber.next().await.unwrap().unwrap(), + TransactionStatus::Success { block_height, .. } if block_height == BlockHeight::new(1) + )); + assert!(matches!( + tx_statuses_update_subscriber.next().await.unwrap().unwrap(), + TransactionStatus::Success { block_height, .. } if block_height == BlockHeight::new(1) + )); +} + +#[tokio::test] +async fn preconfirmation__received_after_failed_execution() { + let mut config = config_with_fee(); + config.block_production = Trigger::Never; + + let srv = FuelService::new_node(config).await.unwrap(); + let client = FuelClient::from(srv.bound_address); + + // Given + let script = vec![ op::addi(0x10, RegId::ZERO, 0xca), op::addi(0x11, RegId::ZERO, 0xba), op::log(0x10, 0x11, RegId::ZERO, RegId::ZERO), op::rvrt(RegId::ONE), op::ret(RegId::ONE), ]; - let script: Vec = script - .iter() - .flat_map(|op| u32::from(*op).to_be_bytes()) - .collect(); - - let tx = TransactionBuilder::script(script, vec![]) - .script_gas_limit(gas_limit) - .maturity(maturity) - .add_unsigned_coin_input( - SecretKey::random(&mut rng), - rng.gen(), - amount, - AssetId::default(), - Default::default(), - ) - .add_output(Output::change(address, 0, AssetId::default())) - .finalize_as_transaction(); + let tx = client + .assemble_script(script, vec![], default_signing_wallet()) + .await + .unwrap(); let tx_id = tx.id(&Default::default()); - let mut tx_statuses_subscriber = client.submit_and_await_status(&tx).await.unwrap(); + let mut tx_statuses_subscriber = client + .submit_and_await_status_opt(&tx, None, Some(true)) + .await + .unwrap(); // When assert!(matches!( @@ -169,7 +215,7 @@ async fn preconfirmation__received_after_failed_execution() { client.produce_blocks(1, None).await.unwrap(); if let TransactionStatus::PreconfirmationFailure { tx_pointer, - total_fee, + total_fee: _, total_gas: _, transaction_id, receipts, @@ -179,7 +225,6 @@ async fn preconfirmation__received_after_failed_execution() { { // Then assert_eq!(tx_pointer, TxPointer::new(BlockHeight::new(1), 1)); - assert_eq!(total_fee, 0); assert_eq!(transaction_id, tx_id); let receipts = receipts.unwrap(); assert_eq!(receipts.len(), 3); @@ -194,14 +239,7 @@ async fn preconfirmation__received_after_failed_execution() { } if ra == 1)); let outputs = resolved_outputs.unwrap(); assert_eq!(outputs.len(), 1); - assert_eq!( - outputs[0].output, - Output::Change { - to: address, - amount, - asset_id: AssetId::default() - } - ); + assert!(outputs[0].output.is_change()); } else { panic!("Expected preconfirmation status"); } @@ -214,9 +252,8 @@ async fn preconfirmation__received_after_failed_execution() { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn preconfirmation__received_tx_inserted_end_block_open_period() { - let mut config = Config::local_node(); + let mut config = config_with_fee(); let block_production_period = Duration::from_secs(1); - let address = Address::new([0; 32]); config.block_production = Trigger::Open { period: block_production_period, @@ -225,18 +262,15 @@ async fn preconfirmation__received_tx_inserted_end_block_open_period() { let client = FuelClient::from(srv.bound_address); // Given - let tx = TransactionBuilder::script( - vec![op::ret(RegId::ONE)].into_iter().collect(), - vec![], - ) - .script_gas_limit(1_000_000) - .add_fee_input() - .add_output(Output::variable(address, 0, AssetId::default())) - .finalize_as_transaction(); + let script = vec![op::ret(RegId::ONE)]; + let tx = client + .assemble_script(script, vec![], default_signing_wallet()) + .await + .unwrap(); // When client - .submit_and_await_status(&tx) + .submit_and_await_status_opt(&tx, None, Some(true)) .await .unwrap() .enumerate() @@ -303,8 +337,14 @@ async fn preconfirmation__received_after_execution__multiple_txs() { .finalize_as_transaction(); // Given - let mut tx_statuses_subscriber1 = client.submit_and_await_status(&tx1).await.unwrap(); - let mut tx_statuses_subscriber2 = client.submit_and_await_status(&tx2).await.unwrap(); + let mut tx_statuses_subscriber1 = client + .submit_and_await_status_opt(&tx1, None, Some(true)) + .await + .unwrap(); + let mut tx_statuses_subscriber2 = client + .submit_and_await_status_opt(&tx2, None, Some(true)) + .await + .unwrap(); // When assert!(matches!( diff --git a/tests/tests/preconfirmations_gossip.rs b/tests/tests/preconfirmations_gossip.rs index 0d71506ab30..e5149849b84 100644 --- a/tests/tests/preconfirmations_gossip.rs +++ b/tests/tests/preconfirmations_gossip.rs @@ -162,7 +162,7 @@ async fn preconfirmation__propagate_p2p_after_successful_execution() { // When let client_sentry = FuelClient::from(sentry.node.bound_address); let mut tx_statuses_subscriber = client_sentry - .submit_and_await_status(&tx) + .submit_and_await_status_opt(&tx, None, Some(true)) .await .expect("Should be able to subscribe for events"); @@ -308,7 +308,7 @@ async fn preconfirmation__propagate_p2p_after_failed_execution() { // When let client_sentry = FuelClient::from(sentry.node.bound_address); let mut tx_statuses_subscriber = client_sentry - .submit_and_await_status(&tx) + .submit_and_await_status_opt(&tx, None, Some(true)) .await .expect("Should be able to subscribe for events"); @@ -452,7 +452,7 @@ async fn preconfirmation__propagate_p2p_after_squeezed_out_on_producer() { // When let client_sentry = FuelClient::from(sentry.node.bound_address); let mut tx_statuses_subscriber = client_sentry - .submit_and_await_status(&tx) + .submit_and_await_status_opt(&tx, None, Some(true)) .await .expect("Should be able to subscribe for events"); @@ -605,7 +605,7 @@ async fn preconfirmation__sentry_allows_usage_of_dynamic_outputs() { .unwrap(); let tx_statuses_subscriber = client_sentry - .submit_and_await_status(&final_script_with_transfers) + .submit_and_await_status_opt(&final_script_with_transfers, None, Some(true)) .await .expect("Should be able to subscribe for events"); @@ -675,7 +675,11 @@ async fn preconfirmation__sentry_allows_usage_of_dynamic_outputs() { // When let mut tx_statuses_subscriber = client_sentry - .submit_and_await_status(&transaction_that_rely_on_variable_inputs) + .submit_and_await_status_opt( + &transaction_that_rely_on_variable_inputs, + None, + Some(true), + ) .await .expect("Should be able to subscribe for events"); diff --git a/tests/tests/state_rewind.rs b/tests/tests/state_rewind.rs index 623b472e171..c6b9aeeb0ea 100644 --- a/tests/tests/state_rewind.rs +++ b/tests/tests/state_rewind.rs @@ -116,10 +116,10 @@ async fn validate_block_at_any_height__only_transfers() -> anyhow::Result<()> { database_modifications.insert(last_block_height, block.changes.as_ref().clone()); } - let view = node.shared.database.on_chain().latest_view().unwrap(); for i in 0..TOTAL_BLOCKS { let height_to_execute = rng.gen_range(1..last_block_height); + let view = node.shared.database.on_chain().latest_view().unwrap(); let block = view .get_full_block(&height_to_execute.into()) .unwrap() @@ -376,11 +376,10 @@ async fn backup_and_restore__should_work_with_state_rewind() -> anyhow::Result<( .unwrap(); let node = &driver.node; - let view = node.shared.database.on_chain().latest_view().unwrap(); - for i in 0..TOTAL_BLOCKS { let height_to_execute = rng.gen_range(1..last_block_height); + let view = node.shared.database.on_chain().latest_view().unwrap(); let block = view .get_full_block(&height_to_execute.into()) .unwrap() diff --git a/tests/tests/tx.rs b/tests/tests/tx.rs index 2bb26922758..9a46d14c0e8 100644 --- a/tests/tests/tx.rs +++ b/tests/tests/tx.rs @@ -390,11 +390,6 @@ async fn submit_and_await_status() { intermediate_status, TransactionStatus::Submitted { .. } )); - let preconfirmation_status = status_stream.next().await.unwrap().unwrap(); - assert!(matches!( - preconfirmation_status, - TransactionStatus::PreconfirmationSuccess { .. } - )); let final_status = status_stream.next().await.unwrap().unwrap(); assert!(matches!(final_status, TransactionStatus::Success { .. })); } diff --git a/tests/tests/tx/txn_status_subscription.rs b/tests/tests/tx/txn_status_subscription.rs index b7e56b64489..6b5c9fa4cd3 100644 --- a/tests/tests/tx/txn_status_subscription.rs +++ b/tests/tests/tx/txn_status_subscription.rs @@ -140,7 +140,7 @@ async fn subscribe_txn_status() { let client = client.clone(); async move { client - .subscribe_transaction_status(&id) + .subscribe_transaction_status_opt(&id, Some(true)) .await .unwrap() .enumerate()