Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/fixed/2935.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The change rejects transactions immediately, if they use spent coins. `TxPool` has a SpentInputs LRU cache, storing all spent coins.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions benches/src/bin/tps_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ fn main() {
test_builder.initial_coins.extend(
transactions
.iter()
.flat_map(|t| t.inputs().unwrap())
.flat_map(|t| t.inputs().into_owned())
.filter_map(|input| {
if let Input::CoinSigned(CoinSigned {
amount,
Expand All @@ -174,9 +174,9 @@ fn main() {
output_index: utxo_id.output_index(),
tx_pointer_block_height: tx_pointer.block_height(),
tx_pointer_tx_idx: tx_pointer.tx_index(),
owner: *owner,
amount: *amount,
asset_id: *asset_id,
owner,
amount,
asset_id,
})
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions bin/e2e-test-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub fn main_body(config: SuiteConfig, mut args: Arguments) {
with_cloned(&config, |config| {
async_execute(async {
let ctx = TestContext::new(config).await;
tests::transfers::transfer_back(&ctx).await
tests::script::receipts(&ctx).await
})
}),
),
Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn main_body(config: SuiteConfig, mut args: Arguments) {
),
];

libtest_mimic::run(&args, tests).exit();
libtest_mimic::run(&args, tests).exit_if_failed();
}

pub fn load_config_env() -> SuiteConfig {
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ impl<'a> ContextExt for Context<'a> {
) -> anyhow::Result<FuelTx> {
let mut has_predicates = false;

for input in tx.inputs()? {
for input in tx.inputs().iter() {
if input.predicate().is_some() {
has_predicates = true;
break;
Expand Down
2 changes: 1 addition & 1 deletion crates/services/executor/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl MaybeCheckedTransaction {
}

impl TransactionExt for MaybeCheckedTransaction {
fn inputs(&self) -> ExecutorResult<&Vec<Input>> {
fn inputs(&self) -> Cow<[Input]> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

match self {
MaybeCheckedTransaction::CheckedTransaction(tx, _) => tx.inputs(),
MaybeCheckedTransaction::Transaction(tx) => tx.inputs(),
Expand Down
1 change: 1 addition & 0 deletions crates/services/txpool_v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fuel-core-services = { workspace = true, features = ["sync-processor"] }
fuel-core-storage = { workspace = true, features = ["std"] }
fuel-core-types = { workspace = true, features = ["test-helpers"] }
futures = { workspace = true }
lru = "0.13.0"
num-rational = { workspace = true }
parking_lot = { workspace = true }
petgraph = "0.6.5"
Expand Down
4 changes: 4 additions & 0 deletions crates/services/txpool_v2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub enum Error {
/// The minimum gas price required by TxPool.
minimal_gas_price: Word,
},
#[display(fmt = "The message input {_0:#x} was already spent")]
MessageInputWasAlreadySpent(Nonce),
#[display(fmt = "The UTXO input {_0:#x} was already spent")]
UtxoInputWasAlreadySpent(UtxoId),
}

impl Error {
Expand Down
1 change: 1 addition & 0 deletions crates/services/txpool_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ mod storage;

pub type GasPrice = Word;

mod spent_inputs;
#[cfg(test)]
mod tests;
#[cfg(test)]
Expand Down
23 changes: 21 additions & 2 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod collisions;

use core::num::NonZeroUsize;
use std::{
collections::HashMap,
iter,
Expand Down Expand Up @@ -59,7 +60,10 @@ use crate::{
},
};

use crate::error::RemovedReason;
use crate::{
error::RemovedReason,
spent_inputs::SpentInputs,
};
#[cfg(test)]
use std::collections::HashSet;

Expand All @@ -85,6 +89,8 @@ pub struct Pool<S, SI, CM, SA, TxStatusManager> {
pub(crate) tx_id_to_storage_id: HashMap<TxId, SI>,
/// All sent outputs when transactions are extracted. Clear when processing a block.
pub(crate) extracted_outputs: ExtractedOutputs,
/// The spent inputs cache.
pub(crate) spent_inputs: SpentInputs,
/// Current pool gas stored.
pub(crate) current_gas: u64,
/// Current pool size in bytes.
Expand All @@ -108,13 +114,17 @@ impl<S, SI, CM, SA, TxStatusManager> Pool<S, SI, CM, SA, TxStatusManager> {
new_executable_txs_notifier: tokio::sync::watch::Sender<()>,
tx_status_manager: Arc<TxStatusManager>,
) -> Self {
let capacity = NonZeroUsize::new(config.pool_limits.max_txs.saturating_add(1))
.expect("Max txs is greater than 0");
let spent_inputs = SpentInputs::new(capacity);
Pool {
storage,
collision_manager,
selection_algorithm,
config,
tx_id_to_storage_id: HashMap::new(),
extracted_outputs: ExtractedOutputs::new(),
spent_inputs,
current_gas: 0,
current_bytes_size: 0,
pool_stats_sender,
Expand Down Expand Up @@ -276,6 +286,7 @@ where
&tx,
persistent_storage,
&self.extracted_outputs,
&self.spent_inputs,
self.config.utxo_validation,
)?;

Expand Down Expand Up @@ -376,6 +387,10 @@ where
.map(|storage_entry| {
self.extracted_outputs
.new_extracted_transaction(&storage_entry.transaction);
self.spent_inputs.maybe_spend_inputs(
storage_entry.transaction.id(),
storage_entry.transaction.inputs(),
);
self.update_components_and_caches_on_removal(iter::once(&storage_entry));
storage_entry.transaction
})
Expand Down Expand Up @@ -404,6 +419,7 @@ where
pub fn process_committed_transactions(&mut self, tx_ids: impl Iterator<Item = TxId>) {
let mut transactions_to_promote = vec![];
for tx_id in tx_ids {
self.spent_inputs.spend_inputs_by_tx_id(tx_id);
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let dependents: Vec<S::StorageIndex> =
self.storage.get_direct_dependents(storage_id).collect();
Expand All @@ -417,6 +433,8 @@ where
};
self.extracted_outputs
.new_extracted_transaction(&transaction.transaction);
self.spent_inputs
.spend_inputs(tx_id, transaction.transaction.inputs());
self.update_components_and_caches_on_removal(iter::once(&transaction));

for dependent in dependents {
Expand Down Expand Up @@ -621,14 +639,15 @@ where
if self.tx_id_to_storage_id.contains_key(&tx_id) {
let tx_status = statuses::SqueezedOut {
reason: Error::SkippedTransaction(format!(
"Parent transaction with id: {tx_id}, was removed because of: {reason}"
"Transaction with id: {tx_id}, was removed because of: {reason}"
))
.to_string(),
};
self.remove_transactions_and_dependents(iter::once(tx_id), tx_status);
}

self.extracted_outputs.new_skipped_transaction(&tx_id);
self.spent_inputs.unspend_inputs(tx_id);

let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id);
if !coin_dependents.is_empty() {
Expand Down
Loading
Loading