Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/added/2845.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New status to manage the pre confirmation status send in `TxUpdateSender`.
49 changes: 33 additions & 16 deletions crates/services/tx_status_manager/src/tests/tests_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,44 @@ pub(crate) enum StateTransitions {
Next,
}

pub(crate) fn validate_tx_update_stream_state(
pub(crate) fn apply_tx_state_transition(
state: State,
transition: StateTransitions,
) -> State {
use State::*;
use StateTransitions::*;
match (state, transition) {
(Empty, AddMsg(TxStatusMessage::Status(s))) => {
if s.is_submitted() {
Initial(s)
} else {
// If not Submitted, it's an early success.
EarlySuccess(s)
(Empty, AddMsg(TxStatusMessage::Status(s))) => match s {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty convoluted piece of code now :-/

I tried to create a flowchart based on the state transitions and it looks ✔️ for me. I'm attaching it here for future reviewers (I can't guarantee it's perfectly correct):

stateDiagram-v2
    [*] --> Empty

    Empty --> Submitted: AddMsg(Submitted)
    Empty --> Preconfirmed: AddMsg(PreConfAny)
    Empty --> EarlySuccess: AddMsg(Other)
    Empty --> Failed: AddMsg(Failed/Failure)
    Empty --> Empty: Next

    Submitted --> Submitted: AddMsg(Submitted)
    Submitted --> Preconfirmed: AddMsg(PreConfAny)
    Submitted --> Success: AddMsg(Other)
    Submitted --> LateFailed: AddMsg(Failed/Failure)
    Submitted --> Empty: Next

    Preconfirmed --> Success: AddMsg(Any)
    Preconfirmed --> LateFailed: AddMsg(Failed/Failure)
    Preconfirmed --> Empty: Next

    EarlySuccess --> Closed: Next
    EarlySuccess --> EarlySuccess: Any

    Success --> SenderClosed: Next
    Success --> Success: Any

    Failed --> Closed: Next
    Failed --> Failed: Any

    LateFailed --> Failed: Next
    LateFailed --> LateFailed: Any

    SenderClosed --> SenderClosed: Any

    Closed --> Closed: Any

    Empty --> Closed: CloseRecv
    Submitted --> Closed: CloseRecv
    Preconfirmed --> Closed: CloseRecv
    EarlySuccess --> Closed: CloseRecv
    Success --> Closed: CloseRecv
    Failed --> Closed: CloseRecv
    LateFailed --> Closed: CloseRecv
    SenderClosed --> Closed: CloseRecv
Loading

TransactionStatus::Submitted(s) => Submitted(TransactionStatus::Submitted(s)),
TransactionStatus::PreConfirmationSuccess(s) => {
Preconfirmed(TransactionStatus::PreConfirmationSuccess(s))
}
}
TransactionStatus::PreConfirmationFailure(s) => {
Preconfirmed(TransactionStatus::PreConfirmationFailure(s))
}
s => EarlySuccess(s),
},
(Empty, AddMsg(TxStatusMessage::FailedStatus)) => Failed,
(Empty, AddFailure) => Failed,
(Empty | Initial(_), Next) => Empty,
(Initial(s1), AddMsg(TxStatusMessage::Status(s2))) => Success(s1, s2),
(Initial(s1), AddMsg(TxStatusMessage::FailedStatus)) => LateFailed(s1),
(Initial(s), AddFailure) => LateFailed(s),
(Empty | Submitted(_) | Preconfirmed(_), Next) => Empty,
(
Submitted(_),
AddMsg(TxStatusMessage::Status(TransactionStatus::Submitted(s))),
) => Submitted(TransactionStatus::Submitted(s)),
(
Submitted(_),
AddMsg(TxStatusMessage::Status(TransactionStatus::PreConfirmationSuccess(s))),
) => Preconfirmed(TransactionStatus::PreConfirmationSuccess(s)),
(
Submitted(_),
AddMsg(TxStatusMessage::Status(TransactionStatus::PreConfirmationFailure(s))),
) => Preconfirmed(TransactionStatus::PreConfirmationFailure(s)),
(Submitted(s1), AddMsg(TxStatusMessage::Status(s2))) => Success(s1, s2),
(Submitted(s1), AddMsg(TxStatusMessage::FailedStatus)) => LateFailed(s1),
(Submitted(s), AddFailure) => LateFailed(s),
(Preconfirmed(s1), AddMsg(TxStatusMessage::Status(s2))) => Success(s1, s2),
(Preconfirmed(s1), AddMsg(TxStatusMessage::FailedStatus)) => LateFailed(s1),
(Preconfirmed(s), AddFailure) => LateFailed(s),
(_, CloseRecv) => Closed,
(EarlySuccess(_) | Failed | SenderClosed(_), Next) => Closed,
(LateFailed(_), Next) => Failed,
Expand All @@ -98,22 +115,22 @@ pub(super) fn validate_send(
msg: TxStatusMessage,
) -> State {
// Add the message to the stream.
let state = validate_tx_update_stream_state(state, StateTransitions::AddMsg(msg));
let state = apply_tx_state_transition(state, StateTransitions::AddMsg(msg));

// Try to get the next message from the stream.
let state = validate_tx_update_stream_state(state, StateTransitions::Next);
let state = apply_tx_state_transition(state, StateTransitions::Next);

// Try to send the message to the receiver.
match tx {
// If ok, then use this state.
Ok(()) => state,
// If the receiver is closed, then update the state.
Err(SendError::Closed) => {
validate_tx_update_stream_state(state, StateTransitions::CloseRecv)
apply_tx_state_transition(state, StateTransitions::CloseRecv)
}
// If the receiver is full, then update the state.
Err(SendError::Full) => {
validate_tx_update_stream_state(state, StateTransitions::AddFailure)
apply_tx_state_transition(state, StateTransitions::AddFailure)
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions crates/services/tx_status_manager/src/tests/tests_sending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::{
},
};

use super::tests_update_stream_state::{
validate_tx_update_stream_state,
use super::tests_e2e::{
apply_tx_state_transition,
StateTransitions,
};

Expand All @@ -45,22 +45,22 @@ pub(super) fn validate_send(
msg: TxStatusMessage,
) -> State {
// Add the message to the stream.
let state = validate_tx_update_stream_state(state, StateTransitions::AddMsg(msg));
let state = apply_tx_state_transition(state, StateTransitions::AddMsg(msg));

// Try to get the next message from the stream.
let state = validate_tx_update_stream_state(state, StateTransitions::Next);
let state = apply_tx_state_transition(state, StateTransitions::Next);

// Try to send the message to the receiver.
match tx {
// If ok, then use this state.
Ok(()) => state,
// If the receiver is closed, then update the state.
Err(SendError::Closed) => {
validate_tx_update_stream_state(state, StateTransitions::CloseRecv)
apply_tx_state_transition(state, StateTransitions::CloseRecv)
}
// If the receiver is full, then update the state.
Err(SendError::Full) => {
validate_tx_update_stream_state(state, StateTransitions::AddFailure)
apply_tx_state_transition(state, StateTransitions::AddFailure)
}
}
}
Expand Down Expand Up @@ -92,7 +92,9 @@ fn test_send_reg() {
TransactionStatus::Submitted(Default::default()),
TransactionStatus::Submitted(Default::default()),
)),
SenderData::ok(Initial(TransactionStatus::Submitted(Default::default()))),
SenderData::ok(Submitted(TransactionStatus::Submitted(
Default::default(),
))),
],
)]),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,20 @@

#![allow(clippy::arithmetic_side_effects)]

use test_strategy::{
proptest,
Arbitrary,
};
use test_strategy::proptest;

use crate::{
tests::utils,
tests::{
tests_e2e::apply_tx_state_transition,
utils,
},
tx_status_stream::{
State,
TxStatusMessage,
TxUpdateStream,
},
};

/// Represents the possible state transitions in TxUpdateStream.
#[derive(Debug, PartialEq, Eq, Clone, Arbitrary)]
pub(crate) enum StateTransitions {
AddMsg(#[strategy(utils::tx_status_message_strategy())] TxStatusMessage),
AddFailure,
CloseRecv,
Next,
}

/// Returns the new state after applying the given `transition` to the current `state`.
pub(crate) fn validate_tx_update_stream_state(
state: State,
transition: StateTransitions,
) -> State {
use State::*;
use StateTransitions::*;

match (state, transition) {
// If not Submitted, it's an early success.
(Empty, AddMsg(TxStatusMessage::Status(s))) => {
if s.is_submitted() {
Initial(s)
} else {
EarlySuccess(s)
}
}
(Empty, AddMsg(TxStatusMessage::FailedStatus)) => Failed,
(Empty, AddFailure) => Failed,
(Empty | Initial(_), Next) => Empty,
(Initial(s1), AddMsg(TxStatusMessage::Status(s2))) => Success(s1, s2),
(Initial(s1), AddMsg(TxStatusMessage::FailedStatus)) => LateFailed(s1),
(Initial(s), AddFailure) => LateFailed(s),
(_, CloseRecv) => Closed,
(EarlySuccess(_) | Failed | SenderClosed(_), Next) => Closed,
(LateFailed(_), Next) => Failed,
(Success(_, s2), Next) => SenderClosed(s2),
// Final states.
(Closed, _) => Closed,
(EarlySuccess(s), _) => EarlySuccess(s),
(Success(s1, s2), _) => Success(s1, s2),
(Failed, _) => Failed,
(LateFailed(s), _) => LateFailed(s),
(SenderClosed(s), _) => SenderClosed(s),
}
}
use super::tests_e2e::StateTransitions;

/// Proptest for validating TxUpdateStream state transitions.
#[proptest]
Expand All @@ -69,7 +24,7 @@ fn test_tx_update_stream_state(
transition: StateTransitions,
) {
let mut stream = TxUpdateStream::with_state(state.clone());
let new_state = validate_tx_update_stream_state(state, transition.clone());
let new_state = apply_tx_state_transition(state, transition.clone());
match transition {
StateTransitions::AddMsg(s) => {
stream.add_msg(s);
Expand Down
2 changes: 1 addition & 1 deletion crates/services/tx_status_manager/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub fn tx_status_message_strategy() -> impl Strategy<Value = TxStatusMessage> {
pub(super) fn state_strategy() -> impl Strategy<Value = State> {
prop_oneof![
Just(State::Empty),
transaction_status_strategy().prop_map(State::Initial),
transaction_status_strategy().prop_map(State::Submitted),
transaction_status_strategy().prop_map(State::EarlySuccess),
(transaction_status_strategy(), transaction_status_strategy())
.prop_map(|(s1, s2)| State::Success(s1, s2)),
Expand Down
45 changes: 38 additions & 7 deletions crates/services/tx_status_manager/src/tx_status_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ impl From<TransactionStatus> for TxStatusMessage {
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum State {
Empty,
Initial(TransactionStatus),
// The transaction has been submitted to the txpool
Submitted(TransactionStatus),
// The transaction has been preconfirmed by execution
Preconfirmed(TransactionStatus),
// We received a final status for the transaction without receiving anything before
EarlySuccess(TransactionStatus),
// We received a final status for the transaction after receiving an other status first
Success(TransactionStatus, TransactionStatus),
// We received a failed status
Failed,
// We received a failed status after receiving an other status first
LateFailed(TransactionStatus),
SenderClosed(TransactionStatus),
Closed,
Expand Down Expand Up @@ -79,19 +86,42 @@ impl TxUpdateStream {
&self.state
}

// TODO: Update the code to handle new pre-confirmation statuses
// https://github.com/FuelLabs/fuel-core/issues/2827
pub fn add_msg(&mut self, msg: TxStatusMessage) {
let state = std::mem::replace(&mut self.state, State::Empty);
self.state = match state {
State::Empty => match msg {
TxStatusMessage::Status(TransactionStatus::Submitted(s)) => {
State::Initial(TransactionStatus::Submitted(s))
State::Submitted(TransactionStatus::Submitted(s))
}

TxStatusMessage::Status(TransactionStatus::PreConfirmationSuccess(s)) => {
State::Preconfirmed(TransactionStatus::PreConfirmationSuccess(s))
}
TxStatusMessage::Status(TransactionStatus::PreConfirmationFailure(s)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like another state machine, so again, here's the diagram which looks ✔️

stateDiagram-v2
    [*] --> Empty
    
    Empty --> Submitted : Msg-Status(Submitted(s))
    Empty --> Preconfirmed : Msg-Status(PreConfirmationSuccess(s))
    Empty --> Preconfirmed : Msg-Status(PreConfirmationFailure(s))
    Empty --> Preconfirmed : Msg-Status(PreConfirmationSqueezedOut(s))
    Empty --> EarlySuccess : Msg-Status(other s)
    Empty --> Failed : Msg-FailedStatus
    
    Submitted --> Submitted : Msg-Status(Submitted(s2))
    Submitted --> Preconfirmed : Msg-Status(PreConfirmationSuccess(s2))
    Submitted --> Preconfirmed : Msg-Status(PreConfirmationFailure(s2))
    Submitted --> Preconfirmed : Msg-Status(PreConfirmationSqueezedOut(s2))
    Submitted --> Success : Msg-Status(other s2)
    Submitted --> LateFailed : Msg-FailedStatus
    
    Preconfirmed --> Success : Msg-Status(s2)
    Preconfirmed --> LateFailed : Msg-FailedStatus
    
    EarlySuccess --> EarlySuccess : any message
    Success --> Success : any message
    Failed --> Failed : any message
    LateFailed --> LateFailed : any message
Loading

State::Preconfirmed(TransactionStatus::PreConfirmationFailure(s))
}

TxStatusMessage::Status(s) => State::EarlySuccess(s),

TxStatusMessage::FailedStatus => State::Failed,
},
State::Initial(s1) => {
State::Submitted(s1) => match msg {
TxStatusMessage::Status(TransactionStatus::Submitted(s2)) => {
State::Submitted(TransactionStatus::Submitted(s2))
}

TxStatusMessage::Status(TransactionStatus::PreConfirmationSuccess(
s2,
)) => State::Preconfirmed(TransactionStatus::PreConfirmationSuccess(s2)),
TxStatusMessage::Status(TransactionStatus::PreConfirmationFailure(
s2,
)) => State::Preconfirmed(TransactionStatus::PreConfirmationFailure(s2)),

TxStatusMessage::Status(s2) => State::Success(s1, s2),

TxStatusMessage::FailedStatus => State::LateFailed(s1),
},
State::Preconfirmed(s1) => {
if let TxStatusMessage::Status(s2) = msg {
State::Success(s1, s2)
} else {
Expand All @@ -105,7 +135,7 @@ impl TxUpdateStream {
pub fn add_failure(&mut self) {
let state = std::mem::replace(&mut self.state, State::Empty);
self.state = match state {
State::Initial(s) => State::LateFailed(s),
State::Submitted(s) | State::Preconfirmed(s) => State::LateFailed(s),
State::Empty => State::Failed,
s => s,
};
Expand All @@ -118,7 +148,8 @@ impl TxUpdateStream {
pub fn try_next(&mut self) -> Option<TxStatusMessage> {
let state = std::mem::replace(&mut self.state, State::Empty);
match state {
State::Initial(s) => Some(TxStatusMessage::Status(s)),
State::Submitted(s) => Some(TxStatusMessage::Status(s)),
State::Preconfirmed(s) => Some(TxStatusMessage::Status(s)),
State::Empty => None,
State::EarlySuccess(s) | State::SenderClosed(s) => {
self.state = State::Closed;
Expand Down
Loading