Skip to content

Commit 04ac71a

Browse files
dropping transaction - size limit is properly obeyed now
1 parent e1dce5a commit 04ac71a

File tree

6 files changed

+193
-64
lines changed

6 files changed

+193
-64
lines changed

substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ where
643643
/// are reduced to single result. Refer to `reduce_multiview_result` for more details.
644644
async fn submit_at(
645645
&self,
646-
_: <Self::Block as BlockT>::Hash,
646+
at: <Self::Block as BlockT>::Hash,
647647
source: TransactionSource,
648648
xts: Vec<TransactionFor<Self>>,
649649
) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
@@ -660,6 +660,21 @@ where
660660
.collect::<Vec<_>>())
661661
}
662662

663+
//todo: review + test maybe?
664+
let retries = mempool_results
665+
.into_iter()
666+
.zip(xts.clone())
667+
.map(|(result, xt)| async move {
668+
match result {
669+
Err(TxPoolApiError::ImmediatelyDropped) =>
670+
self.attempt_transaction_replacement(at, source, false, xt).await,
671+
result @ _ => result,
672+
}
673+
})
674+
.collect::<Vec<_>>();
675+
676+
let mempool_results = futures::future::join_all(retries).await;
677+
663678
let to_be_submitted = mempool_results
664679
.iter()
665680
.zip(xts)
@@ -726,7 +741,7 @@ where
726741
log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
727742
let xt = Arc::from(xt);
728743

729-
let InsertionInfo { hash: xt_hash, source: timed_source } =
744+
let InsertionInfo { hash: xt_hash, source: timed_source, .. } =
730745
match self.mempool.push_watched(source, xt.clone()) {
731746
Ok(result) => result,
732747
Err(TxPoolApiError::ImmediatelyDropped) =>
@@ -1338,33 +1353,31 @@ where
13381353
)
13391354
.await;
13401355

1341-
// 3. check if most_recent_view contains a transaction with lower priority (actually worse -
1342-
// do we want to check timestamp too - no: see #4609?)
1343-
// Would be perfect to choose transaction with lowest the number of dependant txs in its
1344-
// subtree.
1345-
if let Some(worst_tx_hash) =
1346-
best_view.pool.validated_pool().find_transaction_with_lower_prio(validated_tx)
1347-
{
1356+
if let Some(priority) = validated_tx.priority() {
1357+
// 3. check if mempool contains a transaction with lower priority (actually worse - do
1358+
// we want to check timestamp too - no: see #4609?) Would be perfect to choose
1359+
// transaction with lowest the number of dependant txs in its subtree.
13481360
// 4. if yes - remove worse transaction from mempool and add new one.
1349-
log::trace!(target: LOG_TARGET, "found candidate for removal: {worst_tx_hash:?} replaced by {tx_hash:?}");
13501361
let insertion_info =
1351-
self.mempool.try_replace_transaction(xt, source, watched, worst_tx_hash)?;
1352-
1353-
// 5. notify listner
1354-
self.view_store
1355-
.listener
1356-
.transaction_dropped(DroppedTransaction::new_enforced_by_limts(worst_tx_hash));
1357-
1358-
// 6. remove transaction from the view_store
1359-
self.view_store.remove_transaction_subtree(
1360-
worst_tx_hash,
1361-
|listener, removed_tx_hash| {
1362-
listener.limits_enforced(&removed_tx_hash);
1363-
},
1364-
);
1362+
self.mempool.try_replace_transaction(xt, priority, source, watched)?;
1363+
1364+
for worst_hash in &insertion_info.removed {
1365+
log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}");
1366+
// 5. notify listner
1367+
self.view_store
1368+
.listener
1369+
.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
1370+
1371+
// 6. remove transaction from the view_store
1372+
self.view_store.remove_transaction_subtree(
1373+
*worst_hash,
1374+
|listener, removed_tx_hash| {
1375+
listener.limits_enforced(&removed_tx_hash);
1376+
},
1377+
);
1378+
}
13651379

13661380
// 8. add to pending_replacements - make sure it will not sneak back via cloned view
1367-
13681381
// 9. subemit new one to the view, this will be done upon in the caller
13691382
return Ok(insertion_info)
13701383
}

substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs

Lines changed: 123 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,19 @@ use crate::{
3838
};
3939
use futures::FutureExt;
4040
use itertools::Itertools;
41-
use sc_transaction_pool_api::TransactionSource;
41+
use sc_transaction_pool_api::{TransactionPriority, TransactionSource};
4242
use sp_blockchain::HashAndNumber;
4343
use sp_runtime::{
4444
traits::Block as BlockT,
4545
transaction_validity::{InvalidTransaction, TransactionValidityError},
4646
};
4747
use std::{
48+
cmp::Ordering,
4849
collections::HashMap,
49-
sync::{atomic, atomic::AtomicU64, Arc},
50+
sync::{
51+
atomic::{self, AtomicU64},
52+
Arc,
53+
},
5054
time::Instant,
5155
};
5256

@@ -80,6 +84,9 @@ where
8084
source: TimedTransactionSource,
8185
/// When the transaction was revalidated, used to periodically revalidate the mem pool buffer.
8286
validated_at: AtomicU64,
87+
/// Priority of transaction at some block. It is assumed it will not be changed often.
88+
//todo: Option is needed here. This means lock. So maybe +AtomicBool?
89+
priority: AtomicU64,
8390
//todo: we need to add future / ready status at finalized block.
8491
//If future transactions are stuck in tx_mem_pool (due to limits being hit), we need a means
8592
// to replace them somehow with newly coming transactions.
@@ -112,19 +119,31 @@ where
112119
Self::new(true, source, tx, bytes)
113120
}
114121

115-
/// Creates a new instance of wrapper for a transaction.
122+
/// Creates a new instance of wrapper for a transaction with no priority.
116123
fn new(
117124
watched: bool,
118125
source: TransactionSource,
119126
tx: ExtrinsicFor<ChainApi>,
120127
bytes: usize,
128+
) -> Self {
129+
Self::new_with_priority(watched, source, tx, bytes, 0)
130+
}
131+
132+
/// Creates a new instance of wrapper for a transaction with given priority.
133+
fn new_with_priority(
134+
watched: bool,
135+
source: TransactionSource,
136+
tx: ExtrinsicFor<ChainApi>,
137+
bytes: usize,
138+
priority: TransactionPriority,
121139
) -> Self {
122140
Self {
123141
watched,
124142
tx,
125143
source: TimedTransactionSource::from_transaction_source(source, true),
126144
validated_at: AtomicU64::new(0),
127145
bytes,
146+
priority: AtomicU64::new(priority),
128147
}
129148
}
130149

@@ -139,6 +158,11 @@ where
139158
pub(crate) fn source(&self) -> TimedTransactionSource {
140159
self.source.clone()
141160
}
161+
162+
/// Returns the priority of the transaction.
163+
pub(crate) fn priority(&self) -> TransactionPriority {
164+
self.priority.load(atomic::Ordering::Relaxed)
165+
}
142166
}
143167

144168
impl<ChainApi, Block> Size for Arc<TxInMemPool<ChainApi, Block>>
@@ -198,11 +222,15 @@ where
198222
pub(super) struct InsertionInfo<Hash> {
199223
pub(super) hash: Hash,
200224
pub(super) source: TimedTransactionSource,
225+
pub(super) removed: Vec<Hash>,
201226
}
202227

203228
impl<Hash> InsertionInfo<Hash> {
204229
fn new(hash: Hash, source: TimedTransactionSource) -> Self {
205-
Self { hash, source }
230+
Self::new_with_removed(hash, source, Default::default())
231+
}
232+
fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec<Hash>) -> Self {
233+
Self { hash, source, removed }
206234
}
207235
}
208236

@@ -286,14 +314,9 @@ where
286314
&self,
287315
hash: ExtrinsicHash<ChainApi>,
288316
tx: TxInMemPool<ChainApi, Block>,
289-
tx_to_be_removed: Option<ExtrinsicHash<ChainApi>>,
290317
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
291318
let mut transactions = self.transactions.write();
292319

293-
tx_to_be_removed.inspect(|hash| {
294-
transactions.remove(hash);
295-
});
296-
297320
let bytes = self.transactions.bytes();
298321

299322
let result = match (
@@ -314,22 +337,100 @@ where
314337
result
315338
}
316339

340+
/// Attempts to insert a new transaction in the memory pool and drop some worse existing
341+
/// transactions.
342+
///
343+
/// This operation will not overflow the limit of the mempool. It means that cumulative
344+
/// size of removed transactions will be equal (or greated) then size of newly inserted
345+
/// transaction.
346+
///
347+
/// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully
348+
/// inserted; otherwise, returns an appropriate error indicating the failure.
349+
pub(super) fn try_insert_with_dropping(
350+
&self,
351+
hash: ExtrinsicHash<ChainApi>,
352+
new_tx: TxInMemPool<ChainApi, Block>,
353+
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
354+
let mut transactions = self.transactions.write();
355+
356+
if transactions.contains_key(&hash) {
357+
return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)));
358+
}
359+
360+
let mut sorted =
361+
transactions.iter().map(|(h, v)| (h.clone(), v.clone())).collect::<Vec<_>>();
362+
363+
// When pushing higher prio transaction, we need to find a number of lower prio txs, such
364+
// that the sum of their bytes is ge then size of new tx. Otherwise we could overflow size
365+
// limits. Naive way to do it - rev-sort by priority and eat the tail.
366+
367+
// reverse (lowest prio last)
368+
sorted.sort_by(|(_, a), (_, b)| match b.priority().cmp(&a.priority()) {
369+
Ordering::Equal => match (a.source.timestamp, b.source.timestamp) {
370+
(Some(a), Some(b)) => b.cmp(&a),
371+
_ => Ordering::Equal,
372+
},
373+
ordering @ _ => ordering,
374+
});
375+
376+
let required_size = new_tx.bytes;
377+
let mut total_size = 0usize;
378+
let mut to_be_removed = vec![];
379+
380+
while total_size < required_size {
381+
let Some((worst_hash, worst_tx)) = sorted.pop() else {
382+
return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
383+
};
384+
385+
if worst_tx.priority() >= new_tx.priority() {
386+
return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
387+
}
388+
389+
total_size += worst_tx.bytes;
390+
to_be_removed.push(worst_hash);
391+
}
392+
393+
let source = new_tx.source();
394+
transactions.insert(hash, Arc::from(new_tx));
395+
for worst_hash in &to_be_removed {
396+
transactions.remove(worst_hash);
397+
}
398+
debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes()));
399+
400+
Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed))
401+
402+
// let result = match (
403+
// self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
404+
// transactions.contains_key(&hash),
405+
// ) {
406+
// (false, false) => {
407+
// let source = tx.source();
408+
// transactions.insert(hash, Arc::from(tx));
409+
// Ok(InsertionInfo::new(hash, source))
410+
// },
411+
// (_, true) =>
412+
// Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))),
413+
// (true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
414+
// };
415+
// log::trace!(target: LOG_TARGET, "[{:?}] mempool::try_insert: {:?}", hash,
416+
// result.as_ref().map(|r| r.hash));
417+
}
418+
317419
/// Attempts to replace an existing transaction in the memory pool with a new one.
318420
///
319421
/// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully
320422
/// inserted; otherwise, returns an appropriate error indicating the failure.
321423
pub(super) fn try_replace_transaction(
322424
&self,
323425
new_xt: ExtrinsicFor<ChainApi>,
426+
priority: TransactionPriority,
324427
source: TransactionSource,
325428
watched: bool,
326-
replaced_tx_hash: ExtrinsicHash<ChainApi>,
327429
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
328430
let (hash, length) = self.api.hash_and_length(&new_xt);
329-
self.try_insert(
431+
self.try_insert_with_dropping(
330432
hash,
331-
TxInMemPool::new(watched, source, new_xt, length),
332-
Some(replaced_tx_hash),
433+
TxInMemPool::new_with_priority(watched, source, new_xt, length, priority),
333434
)
334435
}
335436

@@ -347,7 +448,7 @@ where
347448
.iter()
348449
.map(|xt| {
349450
let (hash, length) = self.api.hash_and_length(&xt);
350-
self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length), None)
451+
self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length))
351452
})
352453
.collect::<Vec<_>>();
353454
result
@@ -361,7 +462,7 @@ where
361462
xt: ExtrinsicFor<ChainApi>,
362463
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
363464
let (hash, length) = self.api.hash_and_length(&xt);
364-
self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length), None)
465+
self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
365466
}
366467

367468
/// Clones and returns a `HashMap` of references to all unwatched transactions in the memory
@@ -493,7 +594,11 @@ where
493594
}
494595

495596
pub(super) fn update_transaction(&self, outcome: &ViewStoreSubmitOutcome<ChainApi>) {
496-
// todo!()
597+
if let Some(priority) = outcome.priority() {
598+
if let Some(tx) = self.transactions.write().get_mut(&outcome.hash()) {
599+
tx.priority.store(priority, atomic::Ordering::Relaxed);
600+
}
601+
}
497602
}
498603
}
499604

@@ -650,4 +755,6 @@ mod tx_mem_pool_tests {
650755
sc_transaction_pool_api::error::Error::ImmediatelyDropped
651756
));
652757
}
758+
759+
//add some test for try_insert_with_dropping
653760
}

substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ where
262262
log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
263263
Err(err)
264264
},
265-
None => Ok(ViewStoreSubmitOutcome::new_no_priority(tx_hash)),
265+
None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)),
266266
Some(Ok(r)) => Ok(r.into()),
267267
}
268268
}
@@ -320,8 +320,7 @@ where
320320
},
321321
Some(Ok(result)) =>
322322
Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)),
323-
None =>
324-
Ok(ViewStoreSubmitOutcome::new_no_priority(tx_hash).with_watcher(external_watcher)),
323+
None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)),
325324
}
326325
}
327326

substrate/client/transaction-pool/src/graph/tracked_map.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,16 @@ where
173173
pub fn len(&mut self) -> usize {
174174
self.inner_guard.len()
175175
}
176+
177+
/// Returns an iterator over all values.
178+
pub fn values(&self) -> std::collections::hash_map::Values<K, V> {
179+
self.inner_guard.values()
180+
}
181+
182+
/// Returns an iterator over all key-value pairs.
183+
pub fn iter(&self) -> Iter<'_, K, V> {
184+
self.inner_guard.iter()
185+
}
176186
}
177187

178188
#[cfg(test)]

0 commit comments

Comments
 (0)