Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/added/2855.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an expiration interval check for pending pool and refactor extracted_outputs to not rely on block creation/process sequence.
129 changes: 129 additions & 0 deletions crates/services/txpool_v2/src/extracted_outputs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//! Temporary module to extract outputs until we know that they have been settled in database or skipped.

use std::collections::HashMap;

use fuel_core_types::{
fuel_tx::{
input::coin::{
CoinPredicate,
CoinSigned,
},
Address,
AssetId,
ContractId,
Input,
Output,
TxId,
UtxoId,
},
services::txpool::ArcPoolTx,
};

pub struct ExtractedOutputs {
contract_created: HashMap<ContractId, TxId>,
contract_created_by_tx: HashMap<TxId, Vec<ContractId>>,
coins_created: HashMap<TxId, HashMap<u16, (Address, u64, AssetId)>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nit:
For clarity, HashMap<TxId, HashMap<u16, (Address, u64, AssetId)>> maybe deserves a dedicated type to be introduced.

It'll also allow to make code like this more lean: a == address && am == amount && asid == asset_id

}

impl ExtractedOutputs {
pub fn new() -> Self {
Self {
contract_created: HashMap::new(),
contract_created_by_tx: HashMap::new(),
coins_created: HashMap::new(),
}
}
}

impl Default for ExtractedOutputs {
fn default() -> Self {
Self::new()
}
}

impl ExtractedOutputs {
pub fn new_extracted_transaction(&mut self, tx: &ArcPoolTx) {
let tx_id = tx.id();
for (idx, output) in tx.outputs().iter().enumerate() {
match output {
Output::ContractCreated { contract_id, .. } => {
self.contract_created.insert(*contract_id, tx_id);
self.contract_created_by_tx
.entry(tx_id)
.or_default()
.push(*contract_id);
}
Output::Coin {
to,
amount,
asset_id,
} => {
self.coins_created.entry(tx_id).or_default().insert(
u16::try_from(idx).expect("Outputs count is less than u16::MAX"),
(*to, *amount, *asset_id),
);
}
Output::Contract { .. }
| Output::Change { .. }
| Output::Variable { .. } => {
continue;
}
}
}
for input in tx.inputs() {
match input {
Input::CoinSigned(CoinSigned { utxo_id, .. })
| Input::CoinPredicate(CoinPredicate { utxo_id, .. }) => {
self.coins_created
.entry(*utxo_id.tx_id())
.and_modify(|coins| {
coins.remove(&utxo_id.output_index());
});
}
Input::Contract(_)
| Input::MessageCoinPredicate(_)
| Input::MessageCoinSigned(_)
| Input::MessageDataPredicate(_)
| Input::MessageDataSigned(_) => {
continue;
}
}
}
}

pub fn new_skipped_transaction(&mut self, tx_id: &TxId) {
self.new_executed_transaction(tx_id);
}
Comment on lines +94 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new_skipped_transaction() just calling new_executed_transaction is confusing when you don't know the context. Maybe some renaming or at least additional comment could help

At first glance this code is similar to something like this 😄

    fn insert(&self, val) { self.remove(val) }


pub fn new_executed_transaction(&mut self, tx_id: &TxId) {
let contract_ids = self.contract_created_by_tx.remove(tx_id);
if let Some(contract_ids) = contract_ids {
for contract_id in contract_ids {
self.contract_created.remove(&contract_id);
}
}
self.coins_created.remove(tx_id);
}

pub fn contract_exists(&self, contract_id: &ContractId) -> bool {
self.contract_created.contains_key(contract_id)
}

pub fn coin_exists(
&self,
utxo_id: &UtxoId,
address: &Address,
amount: &u64,
asset_id: &AssetId,
) -> bool {
self.coins_created
.get(utxo_id.tx_id())
.map_or(false, |coins| {
coins
.get(&utxo_id.output_index())
.map_or(false, |(a, am, asid)| {
a == address && am == amount && asid == asset_id
})
})
}
}
1 change: 1 addition & 0 deletions crates/services/txpool_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
mod collision_manager;
pub mod config;
pub mod error;
mod extracted_outputs;
mod pending_pool;
mod pool;
mod pool_worker;
Expand Down
78 changes: 12 additions & 66 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
mod collisions;

use std::{
collections::{
HashMap,
HashSet,
},
collections::HashMap,
iter,
time::{
Instant,
Expand All @@ -17,13 +14,7 @@ use fuel_core_metrics::txpool_metrics::txpool_metrics;
use fuel_core_types::{
fuel_tx::{
field::BlobId,
Address,
AssetId,
ContractId,
Output,
TxId,
UtxoId,
Word,
},
services::txpool::{
ArcPoolTx,
Expand All @@ -44,6 +35,7 @@ use crate::{
InputValidationError,
InsertionErrorType,
},
extracted_outputs::ExtractedOutputs,
ports::TxPoolPersistentStorage,
selection_algorithms::{
Constraints,
Expand All @@ -56,27 +48,16 @@ use crate::{
},
};

#[cfg(test)]
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, Default)]
pub struct TxPoolStats {
pub tx_count: u64,
pub total_size: u64,
pub total_gas: u64,
}

#[derive(Debug, Clone, Copy, Hash, PartialEq, PartialOrd, Eq, Ord)]
pub(crate) struct SavedCoinOutput {
pub utxo_id: UtxoId,
pub to: Address,
pub amount: Word,
pub asset_id: AssetId,
}

#[derive(Debug, Clone, Copy, Hash, PartialEq, PartialOrd, Eq, Ord)]
pub(crate) enum SavedOutput {
Coin(SavedCoinOutput),
Contract(ContractId),
}

/// The pool is the main component of the txpool service. It is responsible for storing transactions
/// and allowing the selection of transactions for inclusion in a block.
pub struct Pool<S, SI, CM, SA> {
Expand All @@ -91,7 +72,7 @@ pub struct Pool<S, SI, CM, SA> {
/// Mapping from tx_id to storage_id.
pub(crate) tx_id_to_storage_id: HashMap<TxId, SI>,
/// All sent outputs when transactions are extracted. Clear when processing a block.
pub(crate) extracted_outputs: HashSet<SavedOutput>,
pub(crate) extracted_outputs: ExtractedOutputs,
/// Current pool gas stored.
pub(crate) current_gas: u64,
/// Current pool size in bytes.
Expand All @@ -118,7 +99,7 @@ impl<S, SI, CM, SA> Pool<S, SI, CM, SA> {
selection_algorithm,
config,
tx_id_to_storage_id: HashMap::new(),
extracted_outputs: HashSet::new(),
extracted_outputs: ExtractedOutputs::new(),
current_gas: 0,
current_bytes_size: 0,
pool_stats_sender,
Expand Down Expand Up @@ -329,42 +310,6 @@ where
}
}

fn populate_saved_outputs_cache(&mut self, best_txs: &[StorageData]) {
for tx in best_txs {
for (idx, output) in tx.transaction.outputs().iter().enumerate() {
match output {
Output::Coin {
to,
amount,
asset_id,
} => {
self.extracted_outputs.insert(SavedOutput::Coin(
SavedCoinOutput {
utxo_id: UtxoId::new(
tx.transaction.id(),
u16::try_from(idx)
.expect("Outputs count is less than u16::MAX"),
),
to: *to,
amount: *amount,
asset_id: *asset_id,
},
));
}
Output::ContractCreated { contract_id, .. } => {
self.extracted_outputs
.insert(SavedOutput::Contract(*contract_id));
}
Output::Contract { .. }
| Output::Change { .. }
| Output::Variable { .. } => {
continue;
}
}
}
}
}

/// Extract transactions for a block.
/// Returns a list of transactions that were selected for the block
/// based on the constraints given in the configuration and the selection algorithm used.
Expand All @@ -378,8 +323,6 @@ where
.selection_algorithm
.gather_best_txs(constraints, &mut self.storage);

self.populate_saved_outputs_cache(&best_txs);

if let Some(start) = maybe_start {
Self::record_select_transaction_time(start)
};
Expand All @@ -393,6 +336,8 @@ where
let txs = best_txs
.into_iter()
.map(|storage_entry| {
self.extracted_outputs
.new_extracted_transaction(&storage_entry.transaction);
self.update_components_and_caches_on_removal(iter::once(&storage_entry));
storage_entry.transaction
})
Expand All @@ -419,9 +364,9 @@ where
/// - Remove transaction but keep its dependents and the dependents become executables.
/// - Notify about possible new executable transactions.
pub fn process_block(&mut self, tx_ids: impl Iterator<Item = TxId>) {
self.extracted_outputs.clear();
let mut transactions_to_promote = vec![];
for tx_id in tx_ids {
self.extracted_outputs.new_executed_transaction(&tx_id);
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let dependents: Vec<S::StorageIndex> =
self.storage.get_direct_dependents(storage_id).collect();
Expand Down Expand Up @@ -624,7 +569,8 @@ where
removed_transactions
}

pub fn remove_coin_dependents(&mut self, tx_id: TxId) -> Vec<ArcPoolTx> {
pub fn remove_skipped_transaction(&mut self, tx_id: TxId) -> Vec<ArcPoolTx> {
self.extracted_outputs.new_skipped_transaction(&tx_id);
let mut txs_removed = vec![];
let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id);
for dependent in coin_dependents {
Expand Down
Loading
Loading