Skip to content

Commit c128c75

Browse files
AurelienFTxgreenx
andauthored
Txpool v2 insertion (#2193)
REVIEWER NOTE : Review #2162 first for context ## Linked Issues/PRs Closes #2186 ## Description This PR allows you to insert `Transaction` in the `TxPool v2` service and so perform all the necessary verifications and the insertion. ### Changes in code logic from `TxPool v1` - The verifications are performed in a new order specified in #2186. The goal is to avoid making the computation heavy work if the simple checks aren't valid. In this new version we also ensure that verifications are done in order by having wrapper type around each step to allow only one verification path. - The insertion is performed in a separate thread pool, the goal is to not block the pool on any verifications/insertions and to manage the ressources we allocate to these works - The insertion rules and conditions has change to the following : - A transaction with dependencies can collide only with one other transaction - A transaction without dependencies can collide with multiple transaction - Rules to free up space for new transaction - If a transaction is colliding with another verify if deleting the colliding transaction and dependents subtree is enough otherwise refuses the tx - If a transaction is dependent and not enough space, don't accept transaction - If a transaction is executable, try to free has much space used by less profitable transactions as possible in the pool to include it - New limits on the size of the pool : `max_pool_bytes_size and max_pool_gas` ### Changes from the base branch `txpool-v2` All the tests has been refactored to be way more clearer to understand and easier to read. They also now all follow the naming and the GWT convention. ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [x] I have created follow-up issues caused by this PR and linked them here --------- Co-authored-by: Green Baneling <[email protected]>
1 parent 0bbdaf5 commit c128c75

File tree

23 files changed

+2301
-1660
lines changed

23 files changed

+2301
-1660
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1111
- [2182](https://github.com/FuelLabs/fuel-core/pull/2151): Limit number of transactions that can be fetched via TxSource::next
1212
- [2189](https://github.com/FuelLabs/fuel-core/pull/2151): Select next DA height to never include more than u16::MAX -1 transactions from L1.
1313
- [2162](https://github.com/FuelLabs/fuel-core/pull/2162): Pool structure with dependencies, etc.. for the next transaction pool module.
14+
- [2193](https://github.com/FuelLabs/fuel-core/pull/2193): Insertion in PoolV2 and tests refactoring
15+
1416

1517
### Changed
1618

Cargo.lock

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/services/txpool_v2/Cargo.toml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,18 @@ derive_more = { workspace = true }
1818
fuel-core-services = { workspace = true }
1919
fuel-core-storage = { workspace = true, features = ["std"] }
2020
fuel-core-types = { workspace = true, features = ["test-helpers"] }
21+
futures = { workspace = true }
22+
mockall = { workspace = true, optional = true }
2123
num-rational = { workspace = true }
24+
parking_lot = { workspace = true }
2225
petgraph = "0.6.5"
26+
rayon = { workspace = true }
2327
tokio = { workspace = true, default-features = false, features = ["sync"] }
24-
tokio-rayon = { workspace = true }
2528
tracing = { workspace = true }
29+
30+
[features]
31+
test-helpers = [
32+
"dep:mockall",
33+
"fuel-core-types/test-helpers",
34+
"fuel-core-storage/test-helpers",
35+
]

crates/services/txpool_v2/src/collision_manager/basic.rs

Lines changed: 76 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,14 @@ use fuel_core_types::{
3131
use num_rational::Ratio;
3232

3333
use crate::{
34-
error::Error,
34+
error::{
35+
CollisionReason,
36+
Error,
37+
},
3538
storage::StorageData,
3639
};
3740

38-
use super::{
39-
CollisionManager,
40-
CollisionReason,
41-
Collisions,
42-
};
41+
use super::CollisionManager;
4342

4443
pub trait BasicCollisionManagerStorage {
4544
type StorageIndex: Copy + Debug + Hash + PartialEq + Eq;
@@ -76,104 +75,113 @@ impl<S: BasicCollisionManagerStorage> Default for BasicCollisionManager<S> {
7675
}
7776

7877
impl<S: BasicCollisionManagerStorage> BasicCollisionManager<S> {
79-
fn gather_colliding_txs(
78+
fn is_better_than_collision(
8079
&self,
8180
tx: &PoolTransaction,
82-
) -> Result<Collisions<S::StorageIndex>, Error> {
83-
let mut collisions = Collisions::new();
84-
if let PoolTransaction::Blob(checked_tx, _) = tx {
81+
collision: S::StorageIndex,
82+
storage: &S,
83+
) -> bool {
84+
let new_tx_ratio = Ratio::new(tx.tip(), tx.max_gas());
85+
let colliding_tx = storage
86+
.get(&collision)
87+
.expect("Transaction always should exist in storage");
88+
let colliding_tx_ratio = Ratio::new(
89+
colliding_tx.dependents_cumulative_tip,
90+
colliding_tx.dependents_cumulative_gas,
91+
);
92+
new_tx_ratio > colliding_tx_ratio
93+
}
94+
}
95+
96+
impl<S: BasicCollisionManagerStorage> CollisionManager for BasicCollisionManager<S> {
97+
type Storage = S;
98+
type StorageIndex = S::StorageIndex;
99+
100+
fn collect_colliding_transactions(
101+
&self,
102+
transaction: &PoolTransaction,
103+
) -> Result<HashMap<Self::StorageIndex, Vec<CollisionReason>>, Error> {
104+
let mut collisions = HashMap::new();
105+
if let PoolTransaction::Blob(checked_tx, _) = transaction {
85106
let blob_id = checked_tx.transaction().blob_id();
86107
if let Some(state) = self.blobs_users.get(blob_id) {
87-
collisions.reasons.insert(CollisionReason::Blob(*blob_id));
88-
collisions.colliding_txs.insert(*state);
108+
collisions.insert(*state, vec![CollisionReason::Blob(*blob_id)]);
89109
}
90110
}
91-
for input in tx.inputs() {
111+
for input in transaction.inputs() {
92112
match input {
93113
Input::CoinSigned(CoinSigned { utxo_id, .. })
94114
| Input::CoinPredicate(CoinPredicate { utxo_id, .. }) => {
95115
// Check if the utxo is already spent by another transaction in the pool
96-
if let Some(tx_id) = self.coins_spenders.get(utxo_id) {
97-
collisions.reasons.insert(CollisionReason::Coin(*utxo_id));
98-
collisions.colliding_txs.insert(*tx_id);
116+
if let Some(storage_id) = self.coins_spenders.get(utxo_id) {
117+
let entry = collisions.entry(*storage_id).or_default();
118+
entry.push(CollisionReason::Utxo(*utxo_id));
99119
}
100120
}
101121
Input::MessageCoinSigned(MessageCoinSigned { nonce, .. })
102122
| Input::MessageCoinPredicate(MessageCoinPredicate { nonce, .. })
103123
| Input::MessageDataSigned(MessageDataSigned { nonce, .. })
104124
| Input::MessageDataPredicate(MessageDataPredicate { nonce, .. }) => {
105125
// Check if the message is already spent by another transaction in the pool
106-
if let Some(tx_id) = self.messages_spenders.get(nonce) {
107-
collisions.reasons.insert(CollisionReason::Message(*nonce));
108-
collisions.colliding_txs.insert(*tx_id);
126+
if let Some(storage_id) = self.messages_spenders.get(nonce) {
127+
let entry = collisions.entry(*storage_id).or_default();
128+
entry.push(CollisionReason::Message(*nonce));
109129
}
110130
}
111131
// No collision for contract inputs
112132
_ => {}
113133
}
114134
}
115135

116-
for output in tx.outputs() {
136+
for output in transaction.outputs() {
117137
if let Output::ContractCreated { contract_id, .. } = output {
118138
// Check if the contract is already created by another transaction in the pool
119-
if let Some(tx_id) = self.contracts_creators.get(contract_id) {
120-
collisions
121-
.reasons
122-
.insert(CollisionReason::ContractCreation(*contract_id));
123-
collisions.colliding_txs.insert(*tx_id);
139+
if let Some(storage_id) = self.contracts_creators.get(contract_id) {
140+
let entry = collisions.entry(*storage_id).or_default();
141+
entry.push(CollisionReason::ContractCreation(*contract_id));
124142
}
125143
}
126144
}
127145
Ok(collisions)
128146
}
129147

130-
fn is_better_than_collisions(
131-
&self,
132-
tx: &PoolTransaction,
133-
collisions: &Collisions<S::StorageIndex>,
134-
storage: &S,
135-
) -> bool {
136-
let new_tx_ratio = Ratio::new(tx.tip(), tx.max_gas());
137-
let (total_tip, total_gas) = collisions.colliding_txs.iter().fold(
138-
(0u64, 0u64),
139-
|(total_tip, total_gas), node_id| {
140-
let dependent_tx = storage
141-
.get(node_id)
142-
.expect("Transaction always should exist in storage");
143-
let total_tip =
144-
total_tip.saturating_add(dependent_tx.dependents_cumulative_tip);
145-
let total_gas =
146-
total_gas.saturating_add(dependent_tx.dependents_cumulative_gas);
147-
(total_tip, total_gas)
148-
},
149-
);
150-
151-
let collision_tx_ratio = Ratio::new(total_tip, total_gas);
152-
153-
new_tx_ratio > collision_tx_ratio
154-
}
155-
}
156-
157-
impl<S: BasicCollisionManagerStorage> CollisionManager for BasicCollisionManager<S> {
158-
type Storage = S;
159-
type StorageIndex = S::StorageIndex;
160-
161-
fn collect_colliding_transactions(
148+
/// Rules:
149+
// A transaction with dependencies can collide only with one other transaction if it is less worth it
150+
// A transaction without dependencies can collide with multiple transaction if they are less worth it
151+
fn can_store_transaction(
162152
&self,
163153
transaction: &PoolTransaction,
164-
storage: &S,
165-
) -> Result<Collisions<S::StorageIndex>, Error> {
166-
let collisions = self.gather_colliding_txs(transaction)?;
167-
if collisions.colliding_txs.is_empty() {
168-
Ok(Collisions::new())
169-
} else if self.is_better_than_collisions(transaction, &collisions, storage) {
170-
Ok(collisions)
154+
has_dependencies: bool,
155+
colliding_transactions: &HashMap<Self::StorageIndex, Vec<CollisionReason>>,
156+
storage: &Self::Storage,
157+
) -> Result<(), CollisionReason> {
158+
if colliding_transactions.is_empty() {
159+
return Ok(());
160+
}
161+
if has_dependencies {
162+
if colliding_transactions.len() > 1 {
163+
return Err(CollisionReason::MultipleCollisions);
164+
}
165+
let (collision, reason) = colliding_transactions.iter().next().unwrap();
166+
if !self.is_better_than_collision(transaction, *collision, storage) {
167+
if let Some(reason) = reason.first() {
168+
return Err(reason.clone());
169+
} else {
170+
return Err(CollisionReason::Unknown);
171+
}
172+
}
171173
} else {
172-
Err(Error::Collided(format!(
173-
"Transaction collides with other transactions: {:?}",
174-
collisions.colliding_txs
175-
)))
174+
for (collision, reason) in colliding_transactions.iter() {
175+
if !self.is_better_than_collision(transaction, *collision, storage) {
176+
if let Some(reason) = reason.first() {
177+
return Err(reason.clone());
178+
} else {
179+
return Err(CollisionReason::Unknown);
180+
}
181+
}
182+
}
176183
}
184+
Ok(())
177185
}
178186

179187
fn on_stored_transaction(

crates/services/txpool_v2/src/collision_manager/mod.rs

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use std::{
2-
collections::HashSet,
2+
collections::{
3+
HashMap,
4+
HashSet,
5+
},
36
fmt::Debug,
47
};
58

@@ -13,61 +16,35 @@ use fuel_core_types::{
1316
services::txpool::PoolTransaction,
1417
};
1518

16-
use crate::error::Error;
19+
use crate::error::{
20+
CollisionReason,
21+
Error,
22+
};
1723

1824
pub mod basic;
1925

20-
/// The reason why a transaction collides with another.
21-
/// It also contains additional information about the collision.
22-
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
23-
pub enum CollisionReason {
24-
Coin(UtxoId),
25-
Blob(BlobId),
26-
Message(Nonce),
27-
ContractCreation(ContractId),
28-
}
29-
30-
/// Contains all the information about the collisions of a transaction.
31-
#[derive(Default, Debug)]
32-
pub struct Collisions<Idx> {
33-
reasons: HashSet<CollisionReason>,
34-
colliding_txs: HashSet<Idx>,
35-
}
36-
37-
impl<Idx> Collisions<Idx> {
38-
/// Create a new empty collision information.
39-
pub fn new() -> Self {
40-
Self {
41-
reasons: HashSet::default(),
42-
colliding_txs: HashSet::default(),
43-
}
44-
}
45-
46-
/// Get the reasons of the collisions.
47-
pub fn reasons(&self) -> &HashSet<CollisionReason> {
48-
&self.reasons
49-
}
50-
51-
/// Get the transactions that collide with the transaction.
52-
pub fn colliding_txs(&self) -> &HashSet<Idx> {
53-
&self.colliding_txs
54-
}
55-
}
56-
5726
pub trait CollisionManager {
5827
/// Storage type of the collision manager.
5928
type Storage;
6029
/// Index that identifies a transaction in the storage.
6130
type StorageIndex;
6231

63-
/// Collect all the transactions that collide with the given transaction.
64-
/// It returns an error if the transaction is less worthy than the colliding transactions.
65-
/// It returns the information about the collisions.
32+
/// Collect the transaction that collide with the given transaction.
33+
/// Returns a list of storage indexes of the colliding transactions and the reason of the collision.
6634
fn collect_colliding_transactions(
6735
&self,
6836
transaction: &PoolTransaction,
37+
) -> Result<HashMap<Self::StorageIndex, Vec<CollisionReason>>, Error>;
38+
39+
/// Determine if the collisions allow the transaction to be stored.
40+
/// Returns the reason of the collision if the transaction cannot be stored.
41+
fn can_store_transaction(
42+
&self,
43+
transaction: &PoolTransaction,
44+
has_dependencies: bool,
45+
colliding_transactions: &HashMap<Self::StorageIndex, Vec<CollisionReason>>,
6946
storage: &Self::Storage,
70-
) -> Result<Collisions<Self::StorageIndex>, Error>;
47+
) -> Result<(), CollisionReason>;
7148

7249
/// Inform the collision manager that a transaction was stored.
7350
fn on_stored_transaction(

crates/services/txpool_v2/src/config.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ impl BlackList {
115115
}
116116
}
117117

118+
#[derive(Clone)]
118119
pub struct Config {
119120
/// Enable UTXO validation (will check if UTXO exists in the database and has correct data).
120121
pub utxo_validation: bool,
@@ -124,14 +125,34 @@ pub struct Config {
124125
pub max_block_gas: u64,
125126
/// Maximum transactions per dependencies chain.
126127
pub max_txs_chain_count: usize,
127-
/// Maximum transactions in the pool.
128-
pub max_txs: u64,
128+
/// Pool limits
129+
pub pool_limits: PoolLimits,
129130
/// Maximum transaction time to live.
130131
pub max_txs_ttl: Duration,
132+
/// Heavy async processing configuration.
133+
pub heavy_work: HeavyWorkConfig,
131134
/// Blacklist. Transactions with blacklisted inputs will not be accepted.
132135
pub black_list: BlackList,
133136
}
134137

138+
#[derive(Clone)]
139+
pub struct PoolLimits {
140+
/// Maximum number of transactions in the pool.
141+
pub max_txs: usize,
142+
/// Maximum number of gas in the pool.
143+
pub max_gas: u64,
144+
/// Maximum number of bytes in the pool.
145+
pub max_bytes_size: usize,
146+
}
147+
148+
#[derive(Clone)]
149+
pub struct HeavyWorkConfig {
150+
/// Maximum of threads for managing verifications/insertions.
151+
pub number_threads_to_verify_transactions: usize,
152+
/// Maximum of tasks in the heavy async processing queue.
153+
pub size_of_verification_queue: usize,
154+
}
155+
135156
#[cfg(test)]
136157
impl Default for Config {
137158
fn default() -> Self {
@@ -140,9 +161,17 @@ impl Default for Config {
140161
max_block_gas: 100000000,
141162
max_block_size: 1000000000,
142163
max_txs_chain_count: 1000,
143-
max_txs: 10000,
164+
pool_limits: PoolLimits {
165+
max_txs: 10000,
166+
max_gas: 100_000_000_000,
167+
max_bytes_size: 10_000_000_000,
168+
},
144169
max_txs_ttl: Duration::from_secs(60 * 10),
145170
black_list: BlackList::default(),
171+
heavy_work: HeavyWorkConfig {
172+
number_threads_to_verify_transactions: 4,
173+
size_of_verification_queue: 100,
174+
},
146175
}
147176
}
148177
}

0 commit comments

Comments
 (0)