Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions client/transaction-pool/src/graph/base_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//!
//! For a more full-featured pool, have a look at the `pool` module.

use std::{collections::HashSet, fmt, hash, sync::Arc};
use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc};

use log::{debug, trace, warn};
use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
Expand All @@ -36,7 +36,7 @@ use sp_runtime::{

use super::{
future::{FutureTransactions, WaitingTransaction},
ready::{BestIterator, ReadyTransactions},
ready::{BestIterator, ReadyTransactions, TransactionRef},
};

/// Successful import result.
Expand Down Expand Up @@ -384,8 +384,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
///
/// Removes and returns worst transactions from the queues and all transactions that depend on
/// them. Technically the worst transaction should be evaluated by computing the entire pending
/// set. We use a simplified approach to remove the transaction that occupies the pool for the
/// longest time.
/// set. We use a simplified approach to remove transactions with the lowest priority first or
/// those that occupy the pool for the longest time in case priority is the same.
pub fn enforce_limits(
&mut self,
ready: &Limit,
Expand All @@ -395,33 +395,45 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,

while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
// find the worst transaction
let minimal = self.ready.fold(|minimal, current| {
let worst = self.ready.fold::<TransactionRef<Hash, Ex>, _>(|worst, current| {
let transaction = &current.transaction;
match minimal {
None => Some(transaction.clone()),
Some(ref tx) if tx.insertion_id > transaction.insertion_id =>
Some(transaction.clone()),
other => other,
}
worst
.map(|worst| {
// Here we don't use `TransactionRef`'s ordering implementation because
// while it prefers priority like need here, it also prefers older
// transactions for inclusion purposes and limit enforcement needs to prefer
// newer transactions instead and drop the older ones.
match worst.transaction.priority.cmp(&transaction.transaction.priority) {
Ordering::Less => worst,
Ordering::Equal =>
if worst.insertion_id > transaction.insertion_id {
transaction.clone()
} else {
worst
},
Ordering::Greater => transaction.clone(),
}
})
.or_else(|| Some(transaction.clone()))
});

if let Some(minimal) = minimal {
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
if let Some(worst) = worst {
removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
} else {
break
}
}

while future.is_exceeded(self.future.len(), self.future.bytes()) {
// find the worst transaction
let minimal = self.future.fold(|minimal, current| match minimal {
let worst = self.future.fold(|worst, current| match worst {
None => Some(current.clone()),
Some(ref tx) if tx.imported_at > current.imported_at => Some(current.clone()),
other => other,
});

if let Some(minimal) = minimal {
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
if let Some(worst) = worst {
removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
} else {
break
}
Expand Down
70 changes: 69 additions & 1 deletion client/transaction-pool/src/graph/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,13 @@ mod tests {
longevity: 9001,
propagate: false,
}),
Extrinsic::Store(_) => Ok(ValidTransaction {
priority: 9001,
requires: vec![],
provides: vec![vec![43]],
longevity: 9001,
propagate: false,
}),
_ => unimplemented!(),
};

Expand Down Expand Up @@ -1044,7 +1051,7 @@ mod tests {
}

#[test]
fn should_trigger_dropped() {
fn should_trigger_dropped_older() {
// given
let limit = Limit { count: 1, total_bytes: 1000 };
let options =
Expand Down Expand Up @@ -1077,6 +1084,67 @@ mod tests {
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}

#[test]
fn should_trigger_dropped_lower_priority() {
{
// given
let limit = Limit { count: 1, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let pool = Pool::new(options, true.into(), TestApi::default().into());

let xt = Extrinsic::IncludeData(Vec::new());
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);

// then
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(2)),
to: AccountId::from_h256(H256::from_low_u64_be(1)),
amount: 4,
nonce: 1,
});
let result = block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt));
assert!(matches!(
result,
Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
));
}
{
// given
let limit = Limit { count: 2, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let pool = Pool::new(options, true.into(), TestApi::default().into());

let xt = Extrinsic::IncludeData(Vec::new());
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);

let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);

// when
let xt = Extrinsic::Store(Vec::new());
block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);

// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}
}

#[test]
fn should_handle_pruning_in_the_middle_of_import() {
// given
Expand Down