Skip to content

Commit 9e8d71e

Browse files
AurelienFTxgreenx
andauthored
Add new pre confirmed intermediate state in TxUpdateSender (#2845)
## Linked Issues/PRs #2827 ## Description Add new intermediate state for pre confirmed state. Adapt the tests to work with the new state ### Side notes I think this code should be heavily refactored using more readable code using crates such as https://github.com/rust-bakery/machine but this is not related to this PR. (follow up issue #2846) ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [x] I have created follow-up issues caused by this PR and linked them here --------- Co-authored-by: Green Baneling <[email protected]>
1 parent 9a7d6d9 commit 9e8d71e

File tree

6 files changed

+89
-83
lines changed

6 files changed

+89
-83
lines changed

.changes/added/2845.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
New status to manage the pre confirmation status send in `TxUpdateSender`.

crates/services/tx_status_manager/src/tests/tests_e2e.rs

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,27 +57,44 @@ pub(crate) enum StateTransitions {
5757
Next,
5858
}
5959

60-
pub(crate) fn validate_tx_update_stream_state(
60+
pub(crate) fn apply_tx_state_transition(
6161
state: State,
6262
transition: StateTransitions,
6363
) -> State {
6464
use State::*;
6565
use StateTransitions::*;
6666
match (state, transition) {
67-
(Empty, AddMsg(TxStatusMessage::Status(s))) => {
68-
if s.is_submitted() {
69-
Initial(s)
70-
} else {
71-
// If not Submitted, it's an early success.
72-
EarlySuccess(s)
67+
(Empty, AddMsg(TxStatusMessage::Status(s))) => match s {
68+
TransactionStatus::Submitted(s) => Submitted(TransactionStatus::Submitted(s)),
69+
TransactionStatus::PreConfirmationSuccess(s) => {
70+
Preconfirmed(TransactionStatus::PreConfirmationSuccess(s))
7371
}
74-
}
72+
TransactionStatus::PreConfirmationFailure(s) => {
73+
Preconfirmed(TransactionStatus::PreConfirmationFailure(s))
74+
}
75+
s => EarlySuccess(s),
76+
},
7577
(Empty, AddMsg(TxStatusMessage::FailedStatus)) => Failed,
7678
(Empty, AddFailure) => Failed,
77-
(Empty | Initial(_), Next) => Empty,
78-
(Initial(s1), AddMsg(TxStatusMessage::Status(s2))) => Success(s1, s2),
79-
(Initial(s1), AddMsg(TxStatusMessage::FailedStatus)) => LateFailed(s1),
80-
(Initial(s), AddFailure) => LateFailed(s),
79+
(Empty | Submitted(_) | Preconfirmed(_), Next) => Empty,
80+
(
81+
Submitted(_),
82+
AddMsg(TxStatusMessage::Status(TransactionStatus::Submitted(s))),
83+
) => Submitted(TransactionStatus::Submitted(s)),
84+
(
85+
Submitted(_),
86+
AddMsg(TxStatusMessage::Status(TransactionStatus::PreConfirmationSuccess(s))),
87+
) => Preconfirmed(TransactionStatus::PreConfirmationSuccess(s)),
88+
(
89+
Submitted(_),
90+
AddMsg(TxStatusMessage::Status(TransactionStatus::PreConfirmationFailure(s))),
91+
) => Preconfirmed(TransactionStatus::PreConfirmationFailure(s)),
92+
(Submitted(s1), AddMsg(TxStatusMessage::Status(s2))) => Success(s1, s2),
93+
(Submitted(s1), AddMsg(TxStatusMessage::FailedStatus)) => LateFailed(s1),
94+
(Submitted(s), AddFailure) => LateFailed(s),
95+
(Preconfirmed(s1), AddMsg(TxStatusMessage::Status(s2))) => Success(s1, s2),
96+
(Preconfirmed(s1), AddMsg(TxStatusMessage::FailedStatus)) => LateFailed(s1),
97+
(Preconfirmed(s), AddFailure) => LateFailed(s),
8198
(_, CloseRecv) => Closed,
8299
(EarlySuccess(_) | Failed | SenderClosed(_), Next) => Closed,
83100
(LateFailed(_), Next) => Failed,
@@ -98,22 +115,22 @@ pub(super) fn validate_send(
98115
msg: TxStatusMessage,
99116
) -> State {
100117
// Add the message to the stream.
101-
let state = validate_tx_update_stream_state(state, StateTransitions::AddMsg(msg));
118+
let state = apply_tx_state_transition(state, StateTransitions::AddMsg(msg));
102119

103120
// Try to get the next message from the stream.
104-
let state = validate_tx_update_stream_state(state, StateTransitions::Next);
121+
let state = apply_tx_state_transition(state, StateTransitions::Next);
105122

106123
// Try to send the message to the receiver.
107124
match tx {
108125
// If ok, then use this state.
109126
Ok(()) => state,
110127
// If the receiver is closed, then update the state.
111128
Err(SendError::Closed) => {
112-
validate_tx_update_stream_state(state, StateTransitions::CloseRecv)
129+
apply_tx_state_transition(state, StateTransitions::CloseRecv)
113130
}
114131
// If the receiver is full, then update the state.
115132
Err(SendError::Full) => {
116-
validate_tx_update_stream_state(state, StateTransitions::AddFailure)
133+
apply_tx_state_transition(state, StateTransitions::AddFailure)
117134
}
118135
}
119136
}

crates/services/tx_status_manager/src/tests/tests_sending.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use crate::{
3333
},
3434
};
3535

36-
use super::tests_update_stream_state::{
37-
validate_tx_update_stream_state,
36+
use super::tests_e2e::{
37+
apply_tx_state_transition,
3838
StateTransitions,
3939
};
4040

@@ -45,22 +45,22 @@ pub(super) fn validate_send(
4545
msg: TxStatusMessage,
4646
) -> State {
4747
// Add the message to the stream.
48-
let state = validate_tx_update_stream_state(state, StateTransitions::AddMsg(msg));
48+
let state = apply_tx_state_transition(state, StateTransitions::AddMsg(msg));
4949

5050
// Try to get the next message from the stream.
51-
let state = validate_tx_update_stream_state(state, StateTransitions::Next);
51+
let state = apply_tx_state_transition(state, StateTransitions::Next);
5252

5353
// Try to send the message to the receiver.
5454
match tx {
5555
// If ok, then use this state.
5656
Ok(()) => state,
5757
// If the receiver is closed, then update the state.
5858
Err(SendError::Closed) => {
59-
validate_tx_update_stream_state(state, StateTransitions::CloseRecv)
59+
apply_tx_state_transition(state, StateTransitions::CloseRecv)
6060
}
6161
// If the receiver is full, then update the state.
6262
Err(SendError::Full) => {
63-
validate_tx_update_stream_state(state, StateTransitions::AddFailure)
63+
apply_tx_state_transition(state, StateTransitions::AddFailure)
6464
}
6565
}
6666
}
@@ -92,7 +92,9 @@ fn test_send_reg() {
9292
TransactionStatus::Submitted(Default::default()),
9393
TransactionStatus::Submitted(Default::default()),
9494
)),
95-
SenderData::ok(Initial(TransactionStatus::Submitted(Default::default()))),
95+
SenderData::ok(Submitted(TransactionStatus::Submitted(
96+
Default::default(),
97+
))),
9698
],
9799
)]),
98100
);

crates/services/tx_status_manager/src/tests/tests_update_stream_state.rs

Lines changed: 7 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,65 +2,20 @@
22
33
#![allow(clippy::arithmetic_side_effects)]
44

5-
use test_strategy::{
6-
proptest,
7-
Arbitrary,
8-
};
5+
use test_strategy::proptest;
96

107
use crate::{
11-
tests::utils,
8+
tests::{
9+
tests_e2e::apply_tx_state_transition,
10+
utils,
11+
},
1212
tx_status_stream::{
1313
State,
14-
TxStatusMessage,
1514
TxUpdateStream,
1615
},
1716
};
1817

19-
/// Represents the possible state transitions in TxUpdateStream.
20-
#[derive(Debug, PartialEq, Eq, Clone, Arbitrary)]
21-
pub(crate) enum StateTransitions {
22-
AddMsg(#[strategy(utils::tx_status_message_strategy())] TxStatusMessage),
23-
AddFailure,
24-
CloseRecv,
25-
Next,
26-
}
27-
28-
/// Returns the new state after applying the given `transition` to the current `state`.
29-
pub(crate) fn validate_tx_update_stream_state(
30-
state: State,
31-
transition: StateTransitions,
32-
) -> State {
33-
use State::*;
34-
use StateTransitions::*;
35-
36-
match (state, transition) {
37-
// If not Submitted, it's an early success.
38-
(Empty, AddMsg(TxStatusMessage::Status(s))) => {
39-
if s.is_submitted() {
40-
Initial(s)
41-
} else {
42-
EarlySuccess(s)
43-
}
44-
}
45-
(Empty, AddMsg(TxStatusMessage::FailedStatus)) => Failed,
46-
(Empty, AddFailure) => Failed,
47-
(Empty | Initial(_), Next) => Empty,
48-
(Initial(s1), AddMsg(TxStatusMessage::Status(s2))) => Success(s1, s2),
49-
(Initial(s1), AddMsg(TxStatusMessage::FailedStatus)) => LateFailed(s1),
50-
(Initial(s), AddFailure) => LateFailed(s),
51-
(_, CloseRecv) => Closed,
52-
(EarlySuccess(_) | Failed | SenderClosed(_), Next) => Closed,
53-
(LateFailed(_), Next) => Failed,
54-
(Success(_, s2), Next) => SenderClosed(s2),
55-
// Final states.
56-
(Closed, _) => Closed,
57-
(EarlySuccess(s), _) => EarlySuccess(s),
58-
(Success(s1, s2), _) => Success(s1, s2),
59-
(Failed, _) => Failed,
60-
(LateFailed(s), _) => LateFailed(s),
61-
(SenderClosed(s), _) => SenderClosed(s),
62-
}
63-
}
18+
use super::tests_e2e::StateTransitions;
6419

6520
/// Proptest for validating TxUpdateStream state transitions.
6621
#[proptest]
@@ -69,7 +24,7 @@ fn test_tx_update_stream_state(
6924
transition: StateTransitions,
7025
) {
7126
let mut stream = TxUpdateStream::with_state(state.clone());
72-
let new_state = validate_tx_update_stream_state(state, transition.clone());
27+
let new_state = apply_tx_state_transition(state, transition.clone());
7328
match transition {
7429
StateTransitions::AddMsg(s) => {
7530
stream.add_msg(s);

crates/services/tx_status_manager/src/tests/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub fn tx_status_message_strategy() -> impl Strategy<Value = TxStatusMessage> {
6969
pub(super) fn state_strategy() -> impl Strategy<Value = State> {
7070
prop_oneof![
7171
Just(State::Empty),
72-
transaction_status_strategy().prop_map(State::Initial),
72+
transaction_status_strategy().prop_map(State::Submitted),
7373
transaction_status_strategy().prop_map(State::EarlySuccess),
7474
(transaction_status_strategy(), transaction_status_strategy())
7575
.prop_map(|(s1, s2)| State::Success(s1, s2)),

crates/services/tx_status_manager/src/tx_status_stream.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,17 @@ impl From<TransactionStatus> for TxStatusMessage {
4848
#[derive(Debug, Clone, PartialEq, Eq)]
4949
pub(super) enum State {
5050
Empty,
51-
Initial(TransactionStatus),
51+
// The transaction has been submitted to the txpool
52+
Submitted(TransactionStatus),
53+
// The transaction has been preconfirmed by execution
54+
Preconfirmed(TransactionStatus),
55+
// We received a final status for the transaction without receiving anything before
5256
EarlySuccess(TransactionStatus),
57+
// We received a final status for the transaction after receiving an other status first
5358
Success(TransactionStatus, TransactionStatus),
59+
// We received a failed status
5460
Failed,
61+
// We received a failed status after receiving an other status first
5562
LateFailed(TransactionStatus),
5663
SenderClosed(TransactionStatus),
5764
Closed,
@@ -79,19 +86,42 @@ impl TxUpdateStream {
7986
&self.state
8087
}
8188

82-
// TODO: Update the code to handle new pre-confirmation statuses
83-
// https://github.com/FuelLabs/fuel-core/issues/2827
8489
pub fn add_msg(&mut self, msg: TxStatusMessage) {
8590
let state = std::mem::replace(&mut self.state, State::Empty);
8691
self.state = match state {
8792
State::Empty => match msg {
8893
TxStatusMessage::Status(TransactionStatus::Submitted(s)) => {
89-
State::Initial(TransactionStatus::Submitted(s))
94+
State::Submitted(TransactionStatus::Submitted(s))
9095
}
96+
97+
TxStatusMessage::Status(TransactionStatus::PreConfirmationSuccess(s)) => {
98+
State::Preconfirmed(TransactionStatus::PreConfirmationSuccess(s))
99+
}
100+
TxStatusMessage::Status(TransactionStatus::PreConfirmationFailure(s)) => {
101+
State::Preconfirmed(TransactionStatus::PreConfirmationFailure(s))
102+
}
103+
91104
TxStatusMessage::Status(s) => State::EarlySuccess(s),
105+
92106
TxStatusMessage::FailedStatus => State::Failed,
93107
},
94-
State::Initial(s1) => {
108+
State::Submitted(s1) => match msg {
109+
TxStatusMessage::Status(TransactionStatus::Submitted(s2)) => {
110+
State::Submitted(TransactionStatus::Submitted(s2))
111+
}
112+
113+
TxStatusMessage::Status(TransactionStatus::PreConfirmationSuccess(
114+
s2,
115+
)) => State::Preconfirmed(TransactionStatus::PreConfirmationSuccess(s2)),
116+
TxStatusMessage::Status(TransactionStatus::PreConfirmationFailure(
117+
s2,
118+
)) => State::Preconfirmed(TransactionStatus::PreConfirmationFailure(s2)),
119+
120+
TxStatusMessage::Status(s2) => State::Success(s1, s2),
121+
122+
TxStatusMessage::FailedStatus => State::LateFailed(s1),
123+
},
124+
State::Preconfirmed(s1) => {
95125
if let TxStatusMessage::Status(s2) = msg {
96126
State::Success(s1, s2)
97127
} else {
@@ -105,7 +135,7 @@ impl TxUpdateStream {
105135
pub fn add_failure(&mut self) {
106136
let state = std::mem::replace(&mut self.state, State::Empty);
107137
self.state = match state {
108-
State::Initial(s) => State::LateFailed(s),
138+
State::Submitted(s) | State::Preconfirmed(s) => State::LateFailed(s),
109139
State::Empty => State::Failed,
110140
s => s,
111141
};
@@ -118,7 +148,8 @@ impl TxUpdateStream {
118148
pub fn try_next(&mut self) -> Option<TxStatusMessage> {
119149
let state = std::mem::replace(&mut self.state, State::Empty);
120150
match state {
121-
State::Initial(s) => Some(TxStatusMessage::Status(s)),
151+
State::Submitted(s) => Some(TxStatusMessage::Status(s)),
152+
State::Preconfirmed(s) => Some(TxStatusMessage::Status(s)),
122153
State::Empty => None,
123154
State::EarlySuccess(s) | State::SenderClosed(s) => {
124155
self.state = State::Closed;

0 commit comments

Comments
 (0)