Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e93324b
Add allow preconfirmation parameter to end point
AurelienFT Apr 4, 2025
c1c1a04
changelog
AurelienFT Apr 4, 2025
3922b3d
Update status changes subscription
AurelienFT Apr 4, 2025
e052e10
SImplify implementation for transaction_status_change and modify subm…
AurelienFT Apr 4, 2025
2c6ff01
Change status type
AurelienFT Apr 4, 2025
ff1d2ac
format
AurelienFT Apr 4, 2025
be78e5b
Update all tests
AurelienFT Apr 4, 2025
d1cedac
Update crates/fuel-core/src/schema/tx/types.rs
AurelienFT Apr 4, 2025
deb4c78
Add new test and avoid changes in submit_and_await_status client func…
AurelienFT Apr 7, 2025
5d7b07b
Add new client method and change behavior of transaction_status_change
AurelienFT Apr 7, 2025
79ce8b0
Fix clippy & tests
AurelienFT Apr 7, 2025
ad46cde
Merge branch 'master' into make_preconfirmation_api_optional
AurelienFT Apr 7, 2025
fc91841
Merge branch 'master' into make_preconfirmation_api_optional
AurelienFT Apr 7, 2025
ec3c92f
fix test
AurelienFT Apr 7, 2025
f4591cd
fix test
AurelienFT Apr 7, 2025
e94b818
Simplified integration tests to use `assemble_tx`.
xgreenx Apr 7, 2025
b5f5a63
Fix the test after latest modifications
xgreenx Apr 7, 2025
6a75a49
Merge branch 'master' into make_preconfirmation_api_optional
xgreenx Apr 7, 2025
d9d1cf7
Fix flakiness
xgreenx Apr 7, 2025
c42573d
Merge branch 'master' into make_preconfirmation_api_optional
xgreenx Apr 7, 2025
1e47f97
Fix flakiness with dropped DB
xgreenx Apr 8, 2025
cf7fd60
Merge branch 'master' into make_preconfirmation_api_optional
xgreenx Apr 8, 2025
1075737
Change naming parameter
AurelienFT Apr 8, 2025
114d0ac
Merge branch 'master' into make_preconfirmation_api_optional
AurelienFT Apr 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/changed/2925.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make preconfirmation optional on API endpoints.
1 change: 1 addition & 0 deletions bin/e2e-test-client/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl Wallet {
let tx_id = tx.id(&self.consensus_params.chain_id());
println!("submitting tx... {:?}", tx_id);
let status = self.client.submit_and_await_commit(&tx).await?;
println!("Status for {:?} is {:?}", tx_id, status);

// we know the transferred coin should be output 0 from above
let transferred_utxo = UtxoId::new(tx_id, 0);
Expand Down
12 changes: 8 additions & 4 deletions crates/client/assets/schema.sdl
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,11 @@ type Subscription {
"""
The ID of the transaction
"""
id: TransactionId!
id: TransactionId!,
"""
If true, accept to receive the preconfirmation status
"""
includePreconfirmation: Boolean
): TransactionStatus!
"""
Submits transaction to the `TxPool` and await either success or failure.
Expand All @@ -1357,9 +1361,9 @@ type Subscription {
"""
Submits the transaction to the `TxPool` and returns a stream of events.
Compared to the `submitAndAwait`, the stream also contains
`SubmittedStatus` as an intermediate state.
`SubmittedStatus` and potentially preconfirmation as an intermediate state.
"""
submitAndAwaitStatus(tx: HexString!, estimatePredicates: Boolean): TransactionStatus!
submitAndAwaitStatus(tx: HexString!, estimatePredicates: Boolean, includePreconfirmation: Boolean): TransactionStatus!
contractStorageSlots(contractId: ContractId!): StorageSlot!
contractStorageBalances(contractId: ContractId!): ContractBalance!
}
Expand Down Expand Up @@ -1401,7 +1405,7 @@ type Transaction {
outputContract: ContractOutput
witnesses: [HexString!]
receiptsRoot: Bytes32
status: TransactionStatus
status(includePreconfirmation: Boolean): TransactionStatus
script: HexString
scriptData: HexString
bytecodeWitnessIndex: U16
Expand Down
26 changes: 22 additions & 4 deletions crates/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ impl FuelClient {
&'a self,
tx: &'a Transaction,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
self.submit_and_await_status_opt(tx, None).await
self.submit_and_await_status_opt(tx, None, None).await
}

/// Similar to [`Self::submit_and_await_commit_opt`], but includes all intermediate states.
Expand All @@ -985,13 +985,16 @@ impl FuelClient {
&'a self,
tx: &'a Transaction,
estimate_predicates: Option<bool>,
include_preconfirmation: Option<bool>,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
use cynic::SubscriptionBuilder;
use schema::tx::SubmitAndAwaitStatusArg;
let tx = tx.clone().to_bytes();
let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
TxWithEstimatedPredicatesArg {
SubmitAndAwaitStatusArg {
tx: HexString(Bytes(tx)),
estimate_predicates,
include_preconfirmation,
},
);

Expand Down Expand Up @@ -1243,14 +1246,29 @@ impl FuelClient {

#[tracing::instrument(skip(self), level = "debug")]
#[cfg(feature = "subscriptions")]
/// Subscribe to the status of a transaction
/// Similar to [`Self::subscribe_transaction_status_opt`], but with default options.
pub async fn subscribe_transaction_status<'a>(
&'a self,
id: &'a TxId,
) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + 'a> {
self.subscribe_transaction_status_opt(id, None).await
}

#[cfg(feature = "subscriptions")]
/// Subscribe to the status of a transaction
pub async fn subscribe_transaction_status_opt<'a>(
&'a self,
id: &'a TxId,
include_preconfirmation: Option<bool>,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
use cynic::SubscriptionBuilder;
use schema::tx::StatusChangeSubscriptionArgs;
let tx_id: TransactionId = (*id).into();
let s = schema::tx::StatusChangeSubscription::build(TxIdArgs { id: tx_id });
let s =
schema::tx::StatusChangeSubscription::build(StatusChangeSubscriptionArgs {
id: tx_id,
include_preconfirmation,
});

tracing::debug!("subscribing");
let stream = self.subscribe(s).await?.map(|tx| {
Expand Down
24 changes: 20 additions & 4 deletions crates/client/src/client/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,21 @@ pub struct TransactionsByOwnerQuery {
pub transactions_by_owner: TransactionConnection,
}

#[derive(cynic::QueryVariables, Debug)]
pub struct StatusChangeSubscriptionArgs {
pub id: TransactionId,
#[cynic(skip_serializing_if = "Option::is_none")]
pub include_preconfirmation: Option<bool>,
}

#[derive(cynic::QueryFragment, Clone, Debug)]
#[cynic(
schema_path = "./assets/schema.sdl",
graphql_type = "Subscription",
variables = "TxIdArgs"
variables = "StatusChangeSubscriptionArgs"
)]
pub struct StatusChangeSubscription {
#[arguments(id: $id)]
#[arguments(id: $id, includePreconfirmation: $include_preconfirmation)]
pub status_change: TransactionStatus,
}

Expand All @@ -497,6 +504,15 @@ pub struct TxWithEstimatedPredicatesArg {
pub estimate_predicates: Option<bool>,
}

#[derive(cynic::QueryVariables)]
pub struct SubmitAndAwaitStatusArg {
pub tx: HexString,
#[cynic(skip_serializing_if = "Option::is_none")]
pub estimate_predicates: Option<bool>,
#[cynic(skip_serializing_if = "Option::is_none")]
pub include_preconfirmation: Option<bool>,
}

#[derive(cynic::QueryFragment, Clone, Debug)]
#[cynic(
schema_path = "./assets/schema.sdl",
Expand Down Expand Up @@ -661,10 +677,10 @@ pub struct SubmitAndAwaitSubscriptionWithTransaction {
#[cynic(
schema_path = "./assets/schema.sdl",
graphql_type = "Subscription",
variables = "TxWithEstimatedPredicatesArg"
variables = "SubmitAndAwaitStatusArg"
)]
pub struct SubmitAndAwaitStatusSubscription {
#[arguments(tx: $tx, estimatePredicates: $estimate_predicates)]
#[arguments(tx: $tx, estimatePredicates: $estimate_predicates, includePreconfirmation: $include_preconfirmation)]
pub submit_and_await_status: TransactionStatus,
}

Expand Down
8 changes: 8 additions & 0 deletions crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,14 @@ impl CombinedDatabase {

Ok(())
}

pub fn shutdown(self) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems unrelated to the main PR. Does this have a specific purpose compared to just dropping the handles? I'm fine with doing this here, but maybe the changelog should mention it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @xgreenx that added this change

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CI was failing constantly with SIGSEG, so I fixed that by adding it=D

self.on_chain.shutdown();
self.off_chain.shutdown();
self.relayer.shutdown();
self.gas_price.shutdown();
self.compression.shutdown();
}
}

/// A trait for listening to shutdown signals.
Expand Down
6 changes: 6 additions & 0 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ where
self.iter_all_filtered::<T, _>(prefix, None, Some(direction))
.map_ok(|(key, value)| TableEntry { key, value })
}

pub fn shutdown(self) {
let (storage, _) = self.into_inner();

storage.data.shutdown()
}
}

impl<Description> GenesisDatabase<Description>
Expand Down
14 changes: 10 additions & 4 deletions crates/fuel-core/src/query/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub(crate) trait TxnStatusChangeState {
async fn get_tx_status(
&self,
id: Bytes32,
include_preconfirmation: bool,
) -> StorageResult<Option<TransactionStatus>>;
}

Expand All @@ -28,14 +29,15 @@ pub(crate) async fn transaction_status_change<'a, State>(
state: State,
stream: BoxStream<'a, TxStatusMessage>,
transaction_id: Bytes32,
include_preconfirmation: bool,
) -> impl Stream<Item = anyhow::Result<ApiTxStatus>> + 'a
where
State: TxnStatusChangeState + Send + Sync + 'a,
{
// Check the database first to see if the transaction already
// has a status.
let maybe_db_status = state
.get_tx_status(transaction_id)
.get_tx_status(transaction_id, include_preconfirmation)
.await
.transpose()
.map(TxStatusMessage::from);
Expand All @@ -50,7 +52,7 @@ where
.chain(stream)
// Keep taking the stream until the oneshot channel is closed.
.take_until(closed)
.map(move |status| {
.filter_map(move |status | {
if status.is_final() {
if let Some(close) = close.take() {
let _ = close.send(());
Expand All @@ -60,11 +62,15 @@ where
match status {
TxStatusMessage::Status(status) => {
let status = ApiTxStatus::new(transaction_id, status);
Ok(status)
if !include_preconfirmation && status.is_preconfirmation() {
futures::future::ready(None)
} else {
futures::future::ready(Some(Ok(status)))
}
},
// Map a failed status to an error for the api.
TxStatusMessage::FailedStatus => {
Err(anyhow::anyhow!("Failed to get transaction status"))
futures::future::ready(Some(Err(anyhow::anyhow!("Failed to get transaction status"))))
}
}
})
Expand Down
10 changes: 5 additions & 5 deletions crates/fuel-core/src/query/subscriptions/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,16 @@ fn test_tsc_inner(
let out = RT.with(|rt| {
rt.block_on(async {
let mut mock_state = super::MockTxnStatusChangeState::new();
mock_state
.expect_get_tx_status()
.returning(move |_| match state.clone() {
mock_state.expect_get_tx_status().returning(move |_, _| {
match state.clone() {
Ok(Some(t)) => Ok(Some(t)),
Ok(None) => Ok(None),
Err(_) => Err(StorageError::NotFound("", "")),
});
}
});

let stream = futures::stream::iter(stream).boxed();
super::transaction_status_change(mock_state, stream, txn_id(0))
super::transaction_status_change(mock_state, stream, txn_id(0), true)
.await
.collect::<Vec<_>>()
.await
Expand Down
65 changes: 44 additions & 21 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
AssembleTx,
},
types::{
get_tx_status,
AssembleTransactionResult,
TransactionStatus,
},
Expand Down Expand Up @@ -695,6 +696,8 @@ impl TxStatusSubscription {
&self,
ctx: &'a Context<'a>,
#[graphql(desc = "The ID of the transaction")] id: TransactionId,
#[graphql(desc = "If true, accept to receive the preconfirmation status")]
include_preconfirmation: Option<bool>,
) -> anyhow::Result<impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a>
{
let tx_status_manager = ctx.data_unchecked::<DynTxStatusManager>();
Expand All @@ -705,11 +708,14 @@ impl TxStatusSubscription {
tx_status_manager,
query,
};
Ok(
transaction_status_change(status_change_state, rx, id.into())
.await
.map_err(async_graphql::Error::from),
Ok(transaction_status_change(
status_change_state,
rx,
id.into(),
include_preconfirmation.unwrap_or(false),
)
.await
.map_err(async_graphql::Error::from))
}

/// Submits transaction to the `TxPool` and await either success or failure.
Expand All @@ -724,7 +730,7 @@ impl TxStatusSubscription {
> {
use tokio_stream::StreamExt;
let subscription =
submit_and_await_status(ctx, tx, estimate_predicates.unwrap_or(false))
submit_and_await_status(ctx, tx, estimate_predicates.unwrap_or(false), false)
.await?;

Ok(subscription
Expand All @@ -734,24 +740,32 @@ impl TxStatusSubscription {

/// Submits the transaction to the `TxPool` and returns a stream of events.
/// Compared to the `submitAndAwait`, the stream also contains
/// `SubmittedStatus` as an intermediate state.
/// `SubmittedStatus` and potentially preconfirmation as an intermediate state.
#[graphql(complexity = "query_costs().submit_and_await + child_complexity")]
async fn submit_and_await_status<'a>(
&self,
ctx: &'a Context<'a>,
tx: HexString,
estimate_predicates: Option<bool>,
include_preconfirmation: Option<bool>,
) -> async_graphql::Result<
impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a,
> {
submit_and_await_status(ctx, tx, estimate_predicates.unwrap_or(false)).await
submit_and_await_status(
ctx,
tx,
estimate_predicates.unwrap_or(false),
include_preconfirmation.unwrap_or(false),
)
.await
}
}

async fn submit_and_await_status<'a>(
ctx: &'a Context<'a>,
tx: HexString,
estimate_predicates: bool,
include_preconfirmation: bool,
) -> async_graphql::Result<
impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a,
> {
Expand All @@ -774,13 +788,21 @@ async fn submit_and_await_status<'a>(
txpool.insert(tx).await?;

Ok(subscription
.map(move |event| match event {
TxStatusMessage::Status(status) => {
let status = TransactionStatus::new(tx_id, status);
Ok(status)
}
TxStatusMessage::FailedStatus => {
Err(anyhow::anyhow!("Failed to get transaction status").into())
.filter_map(move |status| {
match status {
TxStatusMessage::Status(status) => {
let status = TransactionStatus::new(tx_id, status);
if !include_preconfirmation && status.is_preconfirmation() {
None
} else {
Some(Ok(status))
}
}
// Map a failed status to an error for the api.
TxStatusMessage::FailedStatus => Some(Err(anyhow::anyhow!(
"Failed to get transaction status"
)
.into())),
}
})
.take(3))
Expand All @@ -795,14 +817,15 @@ impl<'a> TxnStatusChangeState for StatusChangeState<'a> {
async fn get_tx_status(
&self,
id: Bytes32,
include_preconfirmation: bool,
) -> StorageResult<Option<transaction_status::TransactionStatus>> {
match self.query.tx_status(&id) {
Ok(status) => Ok(Some(status.into())),
Err(StorageError::NotFound(_, _)) => {
Ok(self.tx_status_manager.status(id).await?)
}
Err(err) => Err(err),
}
get_tx_status(
&id,
self.query.as_ref(),
self.tx_status_manager,
include_preconfirmation,
)
.await
}
}

Expand Down
Loading
Loading