Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/added/2802.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a new cache with outputs extracted from the pool for the duration of the block.
1 change: 1 addition & 0 deletions .changes/changed/2802.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Change new txs notifier to be notified only on executable transactions
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl ConsensusModulePort for PoAAdapter {

impl TransactionPool for TxPoolAdapter {
fn new_txs_watcher(&self) -> watch::Receiver<()> {
self.service.get_new_txs_notifier()
self.service.get_new_executable_txs_notifier()
}

fn notify_skipped_txs(&self, tx_ids_and_reasons: Vec<(Bytes32, String)>) {
Expand Down
81 changes: 73 additions & 8 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
mod collisions;

use std::{
collections::HashMap,
collections::{
HashMap,
HashSet,
},
iter,
time::{
Instant,
Expand All @@ -14,7 +17,13 @@ use fuel_core_metrics::txpool_metrics::txpool_metrics;
use fuel_core_types::{
fuel_tx::{
field::BlobId,
Address,
AssetId,
ContractId,
Output,
TxId,
UtxoId,
Word,
},
services::txpool::{
ArcPoolTx,
Expand Down Expand Up @@ -47,16 +56,27 @@ use crate::{
},
};

#[cfg(test)]
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, Default)]
pub struct TxPoolStats {
pub tx_count: u64,
pub total_size: u64,
pub total_gas: u64,
}

#[derive(Debug, Clone, Copy, Hash, PartialEq, PartialOrd, Eq, Ord)]
pub(crate) struct SavedCoinOutput {
pub utxo_id: UtxoId,
pub to: Address,
pub amount: Word,
pub asset_id: AssetId,
}

#[derive(Debug, Clone, Copy, Hash, PartialEq, PartialOrd, Eq, Ord)]
pub(crate) enum SavedOutput {
Coin(SavedCoinOutput),
Contract(ContractId),
}

/// The pool is the main component of the txpool service. It is responsible for storing transactions
/// and allowing the selection of transactions for inclusion in a block.
pub struct Pool<S, SI, CM, SA> {
Expand All @@ -70,6 +90,8 @@ pub struct Pool<S, SI, CM, SA> {
pub(crate) selection_algorithm: SA,
/// Mapping from tx_id to storage_id.
pub(crate) tx_id_to_storage_id: HashMap<TxId, SI>,
/// All sent outputs when transactions are extracted. Clear when processing a block.
pub(crate) extracted_outputs: HashSet<SavedOutput>,
/// Current pool gas stored.
pub(crate) current_gas: u64,
/// Current pool size in bytes.
Expand All @@ -93,6 +115,7 @@ impl<S, SI, CM, SA> Pool<S, SI, CM, SA> {
selection_algorithm,
config,
tx_id_to_storage_id: HashMap::new(),
extracted_outputs: HashSet::new(),
current_gas: 0,
current_bytes_size: 0,
pool_stats_sender,
Expand All @@ -119,7 +142,7 @@ where
&mut self,
tx: ArcPoolTx,
persistent_storage: &impl TxPoolPersistentStorage,
) -> Result<Vec<ArcPoolTx>, InsertionErrorType> {
) -> Result<(Vec<ArcPoolTx>, bool), InsertionErrorType> {
let insertion_result = self.insert_inner(tx, persistent_storage);
self.register_transaction_counts();
insertion_result
Expand All @@ -129,7 +152,7 @@ where
&mut self,
tx: std::sync::Arc<PoolTransaction>,
persistent_storage: &impl TxPoolPersistentStorage,
) -> Result<Vec<std::sync::Arc<PoolTransaction>>, InsertionErrorType> {
) -> Result<(Vec<std::sync::Arc<PoolTransaction>>, bool), InsertionErrorType> {
let CanStoreTransaction {
checked_transaction,
transactions_to_remove,
Expand Down Expand Up @@ -189,7 +212,7 @@ where
.map(|data| data.transaction)
.collect::<Vec<_>>();
self.update_stats();
Ok(removed_transactions)
Ok((removed_transactions, !has_dependencies))
}

fn update_stats(&self) {
Expand Down Expand Up @@ -228,6 +251,7 @@ where
self.storage.validate_inputs(
&tx,
persistent_storage,
&self.extracted_outputs,
self.config.utxo_validation,
)?;

Expand Down Expand Up @@ -300,6 +324,43 @@ where
}
}

fn populate_saved_outputs_cache(&mut self, best_txs: &[StorageData]) {
self.extracted_outputs.clear();
for tx in best_txs {
for (idx, output) in tx.transaction.outputs().iter().enumerate() {
match output {
Output::Coin {
to,
amount,
asset_id,
} => {
self.extracted_outputs.insert(SavedOutput::Coin(
SavedCoinOutput {
utxo_id: UtxoId::new(
tx.transaction.id(),
u16::try_from(idx)
.expect("Outputs count is less than u16::MAX"),
),
to: *to,
amount: *amount,
asset_id: *asset_id,
},
));
}
Output::ContractCreated { contract_id, .. } => {
self.extracted_outputs
.insert(SavedOutput::Contract(*contract_id));
}
Output::Contract { .. }
| Output::Change { .. }
| Output::Variable { .. } => {
continue;
}
}
}
}
}

// TODO: Use block space also (https://github.com/FuelLabs/fuel-core/issues/2133)
/// Extract transactions for a block.
/// Returns a list of transactions that were selected for the block
Expand All @@ -313,6 +374,9 @@ where
let best_txs = self
.selection_algorithm
.gather_best_txs(constraints, &mut self.storage);

self.populate_saved_outputs_cache(&best_txs);

if let Some(start) = maybe_start {
Self::record_select_transaction_time(start)
};
Expand Down Expand Up @@ -350,7 +414,8 @@ where

/// Remove transaction but keep its dependents.
/// The dependents become executables.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment could be updated with the new stuff as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

pub fn remove_transactions(&mut self, tx_ids: impl Iterator<Item = TxId>) {
pub fn process_block(&mut self, tx_ids: impl Iterator<Item = TxId>) {
self.extracted_outputs.clear();
for tx_id in tx_ids {
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let dependents: Vec<S::StorageIndex> =
Expand Down
9 changes: 5 additions & 4 deletions crates/services/txpool_v2/src/pool_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ pub(super) enum PoolNotification {
time: SystemTime,
expiration: BlockHeight,
source: ExtendedInsertionSource,
executable: bool,
},
ErrorInsertion {
tx_id: TxId,
Expand Down Expand Up @@ -357,7 +358,7 @@ where
let res = self.pool.insert(tx.clone(), &view);

match res {
Ok(removed_txs) => {
Ok((removed_txs, executable)) => {
let extended_source = match source {
InsertionSource::P2P { from_peer_info } => {
ExtendedInsertionSource::P2P { from_peer_info }
Expand All @@ -384,6 +385,7 @@ where
expiration,
time: SystemTime::now(),
source: extended_source,
executable,
})
{
tracing::error!("Failed to send inserted notification: {}", e);
Expand Down Expand Up @@ -470,9 +472,8 @@ where
}

fn process_block(&mut self, block_result: SharedImportResult) {
self.pool.remove_transactions(
block_result.tx_status.iter().map(|tx_status| tx_status.id),
);
self.pool
.process_block(block_result.tx_status.iter().map(|tx_status| tx_status.id));
let resolved_txs = self.pending_pool.new_known_txs(
block_result
.sealed_block
Expand Down
11 changes: 8 additions & 3 deletions crates/services/txpool_v2/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ where
time,
expiration,
source,
executable,
} => {
let duration = time
.duration_since(SystemTime::UNIX_EPOCH)
Expand Down Expand Up @@ -411,7 +412,11 @@ where
.or_default();
block_height_expiration.push(tx_id);
}
self.shared_state.new_txs_notifier.send_replace(());
if executable {
self.shared_state
.new_executable_txs_notifier
.send_replace(());
}
}
PoolNotification::ErrorInsertion {
tx_id,
Expand Down Expand Up @@ -752,7 +757,7 @@ where
// But we still want to drop subscribers after `2 * TxPool_TTL`.
config.max_txs_ttl.saturating_mul(2),
);
let (new_txs_notifier, _) = watch::channel(());
let (new_executable_txs_notifier, _) = watch::channel(());

let subscriptions = Subscriptions {
new_tx_source: new_peers_subscribed_stream,
Expand Down Expand Up @@ -821,7 +826,7 @@ where
select_transactions_requests_sender: pool_worker
.extract_block_transactions_sender
.clone(),
new_txs_notifier,
new_executable_txs_notifier,
latest_stats: pool_stats_receiver,
};

Expand Down
8 changes: 4 additions & 4 deletions crates/services/txpool_v2/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct SharedState {
mpsc::Sender<pool_worker::PoolExtractBlockTransactions>,
pub(crate) request_read_sender: mpsc::Sender<PoolReadRequest>,
pub(crate) tx_status_sender: TxStatusChange,
pub(crate) new_txs_notifier: tokio::sync::watch::Sender<()>,
pub(crate) new_executable_txs_notifier: tokio::sync::watch::Sender<()>,
pub(crate) latest_stats: tokio::sync::watch::Receiver<TxPoolStats>,
}

Expand Down Expand Up @@ -155,9 +155,9 @@ impl SharedState {
.map_err(|_| Error::ServiceCommunicationFailed)
}

/// Get a notifier that is notified when new transactions are added to the pool.
pub fn get_new_txs_notifier(&self) -> watch::Receiver<()> {
self.new_txs_notifier.subscribe()
/// Get a notifier that is notified when new executable transactions are added to the pool.
pub fn get_new_executable_txs_notifier(&self) -> watch::Receiver<()> {
self.new_executable_txs_notifier.subscribe()
}

/// Subscribe to new transaction notifications.
Expand Down
36 changes: 34 additions & 2 deletions crates/services/txpool_v2/src/storage/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ use crate::{
InputValidationErrorType,
},
pending_pool::MissingInput,
pool::{
SavedCoinOutput,
SavedOutput,
},
ports::TxPoolPersistentStorage,
selection_algorithms::ratio_tip_gas::RatioTipGasSelectionAlgorithmStorage,
storage::checked_collision::CheckedTransaction,
Expand Down Expand Up @@ -616,15 +620,28 @@ impl Storage for GraphStorage {
&self,
transaction: &PoolTransaction,
persistent_storage: &impl TxPoolPersistentStorage,
saved_outputs: &HashSet<SavedOutput>,
utxo_validation: bool,
) -> Result<(), InputValidationErrorType> {
let mut missing_inputs = Vec::new();
for input in transaction.inputs() {
match input {
// If the utxo is created in the pool, need to check if we don't spend too much (utxo can still be unresolved)
// If the utxo_validation is active, we need to check if the utxo exists in the database and is valid
Input::CoinSigned(CoinSigned { utxo_id, .. })
| Input::CoinPredicate(CoinPredicate { utxo_id, .. }) => {
Input::CoinSigned(CoinSigned {
utxo_id,
owner,
amount,
asset_id,
..
})
| Input::CoinPredicate(CoinPredicate {
utxo_id,
owner,
amount,
asset_id,
..
}) => {
if let Some(node_id) = self.coins_creators.get(utxo_id) {
let Some(node) = self.graph.node_weight(*node_id) else {
return Err(InputValidationErrorType::Inconsistency(
Expand Down Expand Up @@ -654,6 +671,16 @@ impl Storage for GraphStorage {
}
}
Ok(None) => {
if saved_outputs.contains(&SavedOutput::Coin(
SavedCoinOutput {
utxo_id: *utxo_id,
to: *owner,
amount: *amount,
asset_id: *asset_id,
},
)) {
continue;
}
missing_inputs.push(MissingInput::Utxo(*utxo_id));
continue;
}
Expand Down Expand Up @@ -704,6 +731,11 @@ impl Storage for GraphStorage {
match persistent_storage.contract_exist(contract_id) {
Ok(true) => {}
Ok(false) => {
if saved_outputs
.contains(&SavedOutput::Contract(*contract_id))
{
continue;
}
missing_inputs.push(MissingInput::Contract(*contract_id));
continue;
}
Expand Down
2 changes: 2 additions & 0 deletions crates/services/txpool_v2/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
Error,
InputValidationErrorType,
},
pool::SavedOutput,
ports::TxPoolPersistentStorage,
};
use fuel_core_types::services::txpool::{
Expand Down Expand Up @@ -92,6 +93,7 @@ pub trait Storage {
&self,
transaction: &PoolTransaction,
persistent_storage: &impl TxPoolPersistentStorage,
saved_outputs: &HashSet<SavedOutput>,
utxo_validation: bool,
) -> Result<(), InputValidationErrorType>;

Expand Down
5 changes: 4 additions & 1 deletion crates/services/txpool_v2/src/tests/universe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ impl TestPoolUniverse {
.map_err(|e| match e {
InsertionErrorType::Error(e) => e,
InsertionErrorType::MissingInputs(e) => e.first().unwrap().into(),
})?,
})?
.0,
))
} else {
panic!("Pool needs to be built first");
Expand Down Expand Up @@ -290,6 +291,7 @@ impl TestPoolUniverse {
InsertionErrorType::Error(e) => e,
InsertionErrorType::MissingInputs(e) => e.first().unwrap().into(),
})
.map(|(removed_txs, _)| removed_txs)
} else {
panic!("Pool needs to be built first");
}
Expand Down Expand Up @@ -325,6 +327,7 @@ impl TestPoolUniverse {
InsertionErrorType::Error(e) => e,
InsertionErrorType::MissingInputs(e) => e.first().unwrap().into(),
})
.map(|(removed_txs, _)| removed_txs)
} else {
panic!("Pool needs to be built first");
}
Expand Down
Loading