@@ -200,11 +200,11 @@ type TxPool struct {
200200 locals * accountSet // Set of local transaction to exempt from eviction rules
201201 journal * txJournal // Journal of local transaction to back up to disk
202202
203- pending map [common.Address ]* txList // All currently processable transactions
204- queue map [common.Address ]* txList // Queued but non-processable transactions
205- beats map [common.Address ]time.Time // Last heartbeat from each known account
206- all map [common. Hash ] * types. Transaction // All transactions to allow lookups
207- priced * txPricedList // All transactions sorted by price
203+ pending map [common.Address ]* txList // All currently processable transactions
204+ queue map [common.Address ]* txList // Queued but non-processable transactions
205+ beats map [common.Address ]time.Time // Last heartbeat from each known account
206+ all * txLookup // All transactions to allow lookups
207+ priced * txPricedList // All transactions sorted by price
208208
209209 wg sync.WaitGroup // for shutdown sync
210210
@@ -226,12 +226,12 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
226226 pending : make (map [common.Address ]* txList ),
227227 queue : make (map [common.Address ]* txList ),
228228 beats : make (map [common.Address ]time.Time ),
229- all : make ( map [common. Hash ] * types. Transaction ),
229+ all : newTxLookup ( ),
230230 chainHeadCh : make (chan ChainHeadEvent , chainHeadChanSize ),
231231 gasPrice : new (big.Int ).SetUint64 (config .PriceLimit ),
232232 }
233233 pool .locals = newAccountSet (pool .signer )
234- pool .priced = newTxPricedList (& pool .all )
234+ pool .priced = newTxPricedList (pool .all )
235235 pool .reset (nil , chain .CurrentBlock ().Header ())
236236
237237 // If local transactions and journaling is enabled, load from disk
@@ -605,7 +605,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
605605func (pool * TxPool ) add (tx * types.Transaction , local bool ) (bool , error ) {
606606 // If the transaction is already known, discard it
607607 hash := tx .Hash ()
608- if pool .all [ hash ] != nil {
608+ if pool .all . Get ( hash ) != nil {
609609 log .Trace ("Discarding already known transaction" , "hash" , hash )
610610 return false , fmt .Errorf ("known transaction: %x" , hash )
611611 }
@@ -616,15 +616,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
616616 return false , err
617617 }
618618 // If the transaction pool is full, discard underpriced transactions
619- if uint64 (len ( pool .all )) >= pool .config .GlobalSlots + pool .config .GlobalQueue {
619+ if uint64 (pool .all . Count ( )) >= pool .config .GlobalSlots + pool .config .GlobalQueue {
620620 // If the new transaction is underpriced, don't accept it
621621 if ! local && pool .priced .Underpriced (tx , pool .locals ) {
622622 log .Trace ("Discarding underpriced transaction" , "hash" , hash , "price" , tx .GasPrice ())
623623 underpricedTxCounter .Inc (1 )
624624 return false , ErrUnderpriced
625625 }
626626 // New transaction is better than our worse ones, make room for it
627- drop := pool .priced .Discard (len ( pool .all )- int (pool .config .GlobalSlots + pool .config .GlobalQueue - 1 ), pool .locals )
627+ drop := pool .priced .Discard (pool .all . Count ( )- int (pool .config .GlobalSlots + pool .config .GlobalQueue - 1 ), pool .locals )
628628 for _ , tx := range drop {
629629 log .Trace ("Discarding freshly underpriced transaction" , "hash" , tx .Hash (), "price" , tx .GasPrice ())
630630 underpricedTxCounter .Inc (1 )
@@ -642,11 +642,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
642642 }
643643 // New transaction is better, replace old one
644644 if old != nil {
645- delete ( pool .all , old .Hash ())
645+ pool .all . Remove ( old .Hash ())
646646 pool .priced .Removed ()
647647 pendingReplaceCounter .Inc (1 )
648648 }
649- pool .all [ tx . Hash ()] = tx
649+ pool .all . Add ( tx )
650650 pool .priced .Put (tx )
651651 pool .journalTx (from , tx )
652652
@@ -689,12 +689,12 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
689689 }
690690 // Discard any previous transaction and mark this
691691 if old != nil {
692- delete ( pool .all , old .Hash ())
692+ pool .all . Remove ( old .Hash ())
693693 pool .priced .Removed ()
694694 queuedReplaceCounter .Inc (1 )
695695 }
696- if pool .all [ hash ] == nil {
697- pool .all [ hash ] = tx
696+ if pool .all . Get ( hash ) == nil {
697+ pool .all . Add ( tx )
698698 pool .priced .Put (tx )
699699 }
700700 return old != nil , nil
@@ -726,22 +726,22 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
726726 inserted , old := list .Add (tx , pool .config .PriceBump )
727727 if ! inserted {
728728 // An older transaction was better, discard this
729- delete ( pool .all , hash )
729+ pool .all . Remove ( hash )
730730 pool .priced .Removed ()
731731
732732 pendingDiscardCounter .Inc (1 )
733733 return false
734734 }
735735 // Otherwise discard any previous transaction and mark this
736736 if old != nil {
737- delete ( pool .all , old .Hash ())
737+ pool .all . Remove ( old .Hash ())
738738 pool .priced .Removed ()
739739
740740 pendingReplaceCounter .Inc (1 )
741741 }
742742 // Failsafe to work around direct pending inserts (tests)
743- if pool .all [ hash ] == nil {
744- pool .all [ hash ] = tx
743+ if pool .all . Get ( hash ) == nil {
744+ pool .all . Add ( tx )
745745 pool .priced .Put (tx )
746746 }
747747 // Set the potentially new pending nonce and notify any subsystems of the new tx
@@ -840,7 +840,7 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
840840
841841 status := make ([]TxStatus , len (hashes ))
842842 for i , hash := range hashes {
843- if tx := pool .all [ hash ] ; tx != nil {
843+ if tx := pool .all . Get ( hash ) ; tx != nil {
844844 from , _ := types .Sender (pool .signer , tx ) // already validated
845845 if pool .pending [from ] != nil && pool .pending [from ].txs .items [tx .Nonce ()] != nil {
846846 status [i ] = TxStatusPending
@@ -855,24 +855,21 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
855855// Get returns a transaction if it is contained in the pool
856856// and nil otherwise.
857857func (pool * TxPool ) Get (hash common.Hash ) * types.Transaction {
858- pool .mu .RLock ()
859- defer pool .mu .RUnlock ()
860-
861- return pool .all [hash ]
858+ return pool .all .Get (hash )
862859}
863860
864861// removeTx removes a single transaction from the queue, moving all subsequent
865862// transactions back to the future queue.
866863func (pool * TxPool ) removeTx (hash common.Hash , outofbound bool ) {
867864 // Fetch the transaction we wish to delete
868- tx , ok := pool .all [ hash ]
869- if ! ok {
865+ tx := pool .all . Get ( hash )
866+ if tx == nil {
870867 return
871868 }
872869 addr , _ := types .Sender (pool .signer , tx ) // already validated during insertion
873870
874871 // Remove it from the list of known transactions
875- delete ( pool .all , hash )
872+ pool .all . Remove ( hash )
876873 if outofbound {
877874 pool .priced .Removed ()
878875 }
@@ -928,15 +925,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
928925 for _ , tx := range list .Forward (pool .currentState .GetNonce (addr )) {
929926 hash := tx .Hash ()
930927 log .Trace ("Removed old queued transaction" , "hash" , hash )
931- delete ( pool .all , hash )
928+ pool .all . Remove ( hash )
932929 pool .priced .Removed ()
933930 }
934931 // Drop all transactions that are too costly (low balance or out of gas)
935932 drops , _ := list .Filter (pool .currentState .GetBalance (addr ), pool .currentMaxGas )
936933 for _ , tx := range drops {
937934 hash := tx .Hash ()
938935 log .Trace ("Removed unpayable queued transaction" , "hash" , hash )
939- delete ( pool .all , hash )
936+ pool .all . Remove ( hash )
940937 pool .priced .Removed ()
941938 queuedNofundsCounter .Inc (1 )
942939 }
@@ -952,7 +949,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
952949 if ! pool .locals .contains (addr ) {
953950 for _ , tx := range list .Cap (int (pool .config .AccountQueue )) {
954951 hash := tx .Hash ()
955- delete ( pool .all , hash )
952+ pool .all . Remove ( hash )
956953 pool .priced .Removed ()
957954 queuedRateLimitCounter .Inc (1 )
958955 log .Trace ("Removed cap-exceeding queued transaction" , "hash" , hash )
@@ -1001,7 +998,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
1001998 for _ , tx := range list .Cap (list .Len () - 1 ) {
1002999 // Drop the transaction from the global pools too
10031000 hash := tx .Hash ()
1004- delete ( pool .all , hash )
1001+ pool .all . Remove ( hash )
10051002 pool .priced .Removed ()
10061003
10071004 // Update the account nonce to the dropped transaction
@@ -1023,7 +1020,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
10231020 for _ , tx := range list .Cap (list .Len () - 1 ) {
10241021 // Drop the transaction from the global pools too
10251022 hash := tx .Hash ()
1026- delete ( pool .all , hash )
1023+ pool .all . Remove ( hash )
10271024 pool .priced .Removed ()
10281025
10291026 // Update the account nonce to the dropped transaction
@@ -1092,15 +1089,15 @@ func (pool *TxPool) demoteUnexecutables() {
10921089 for _ , tx := range list .Forward (nonce ) {
10931090 hash := tx .Hash ()
10941091 log .Trace ("Removed old pending transaction" , "hash" , hash )
1095- delete ( pool .all , hash )
1092+ pool .all . Remove ( hash )
10961093 pool .priced .Removed ()
10971094 }
10981095 // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
10991096 drops , invalids := list .Filter (pool .currentState .GetBalance (addr ), pool .currentMaxGas )
11001097 for _ , tx := range drops {
11011098 hash := tx .Hash ()
11021099 log .Trace ("Removed unpayable pending transaction" , "hash" , hash )
1103- delete ( pool .all , hash )
1100+ pool .all . Remove ( hash )
11041101 pool .priced .Removed ()
11051102 pendingNofundsCounter .Inc (1 )
11061103 }
@@ -1172,3 +1169,68 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool {
11721169func (as * accountSet ) add (addr common.Address ) {
11731170 as .accounts [addr ] = struct {}{}
11741171}
1172+
1173+ // txLookup is used internally by TxPool to track transactions while allowing lookup without
1174+ // mutex contention.
1175+ //
1176+ // Note, although this type is properly protected against concurrent access, it
1177+ // is **not** a type that should ever be mutated or even exposed outside of the
1178+ // transaction pool, since its internal state is tightly coupled with the pools
1179+ // internal mechanisms. The sole purpose of the type is to permit out-of-bound
1180+ // peeking into the pool in TxPool.Get without having to acquire the widely scoped
1181+ // TxPool.mu mutex.
1182+ type txLookup struct {
1183+ all map [common.Hash ]* types.Transaction
1184+ lock sync.RWMutex
1185+ }
1186+
1187+ // newTxLookup returns a new txLookup structure.
1188+ func newTxLookup () * txLookup {
1189+ return & txLookup {
1190+ all : make (map [common.Hash ]* types.Transaction ),
1191+ }
1192+ }
1193+
1194+ // Range calls f on each key and value present in the map.
1195+ func (t * txLookup ) Range (f func (hash common.Hash , tx * types.Transaction ) bool ) {
1196+ t .lock .RLock ()
1197+ defer t .lock .RUnlock ()
1198+
1199+ for key , value := range t .all {
1200+ if ! f (key , value ) {
1201+ break
1202+ }
1203+ }
1204+ }
1205+
1206+ // Get returns a transaction if it exists in the lookup, or nil if not found.
1207+ func (t * txLookup ) Get (hash common.Hash ) * types.Transaction {
1208+ t .lock .RLock ()
1209+ defer t .lock .RUnlock ()
1210+
1211+ return t .all [hash ]
1212+ }
1213+
1214+ // Count returns the current number of items in the lookup.
1215+ func (t * txLookup ) Count () int {
1216+ t .lock .RLock ()
1217+ defer t .lock .RUnlock ()
1218+
1219+ return len (t .all )
1220+ }
1221+
1222+ // Add adds a transaction to the lookup.
1223+ func (t * txLookup ) Add (tx * types.Transaction ) {
1224+ t .lock .Lock ()
1225+ defer t .lock .Unlock ()
1226+
1227+ t .all [tx .Hash ()] = tx
1228+ }
1229+
1230+ // Remove removes a transaction from the lookup.
1231+ func (t * txLookup ) Remove (hash common.Hash ) {
1232+ t .lock .Lock ()
1233+ defer t .lock .Unlock ()
1234+
1235+ delete (t .all , hash )
1236+ }
0 commit comments