@@ -28,8 +28,7 @@ use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_B
2828use crate :: {
2929 budget:: {
3030 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS ,
31- DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS , DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS ,
32- DEFAULT_BUDGET_TRY_DRAIN_STREAM ,
31+ DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS , DEFAULT_BUDGET_TRY_DRAIN_STREAM ,
3332 } ,
3433 cache:: LruCache ,
3534 duration_metered_exec, metered_poll_nested_stream_with_budget,
@@ -77,7 +76,7 @@ use std::{
7776 time:: { Duration , Instant } ,
7877} ;
7978use tokio:: sync:: { mpsc, oneshot, oneshot:: error:: RecvError } ;
80- use tokio_stream:: wrappers:: { ReceiverStream , UnboundedReceiverStream } ;
79+ use tokio_stream:: wrappers:: UnboundedReceiverStream ;
8180use tracing:: { debug, trace} ;
8281
8382/// The future for importing transactions into the pool.
@@ -339,7 +338,7 @@ pub struct TransactionsManager<
339338 /// - no nonce gaps
340339 /// - all dynamic fee requirements are (currently) met
341340 /// - account has enough balance to cover the transaction's gas
342- pending_transactions : ReceiverStream < TxHash > ,
341+ pending_transactions : mpsc :: Receiver < TxHash > ,
343342 /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
344343 transaction_events : UnboundedMeteredReceiver < NetworkTransactionEvent < N > > ,
345344 /// How the `TransactionsManager` is configured.
@@ -422,7 +421,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
422421 peers : Default :: default ( ) ,
423422 command_tx,
424423 command_rx : UnboundedReceiverStream :: new ( command_rx) ,
425- pending_transactions : ReceiverStream :: new ( pending) ,
424+ pending_transactions : pending,
426425 transaction_events : UnboundedMeteredReceiver :: new (
427426 from_network,
428427 NETWORK_POOL_TRANSACTIONS_SCOPE ,
@@ -1529,14 +1528,16 @@ where
15291528 // We don't expect this buffer to be large, since only pending transactions are
15301529 // emitted here.
15311530 let mut new_txs = Vec :: new ( ) ;
1532- let maybe_more_pending_txns = metered_poll_nested_stream_with_budget ! (
1533- poll_durations. acc_imported_txns,
1534- "net::tx" ,
1535- "Pending transactions stream" ,
1536- DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS ,
1537- this. pending_transactions. poll_next_unpin( cx) ,
1538- |hash| new_txs. push( hash)
1539- ) ;
1531+ let maybe_more_pending_txns = match this. pending_transactions . poll_recv_many (
1532+ cx,
1533+ & mut new_txs,
1534+ SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE ,
1535+ ) {
1536+ Poll :: Ready ( count) => {
1537+ count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE
1538+ }
1539+ Poll :: Pending => false ,
1540+ } ;
15401541 if !new_txs. is_empty ( ) {
15411542 this. on_new_pending_transactions ( new_txs) ;
15421543 }
0 commit comments