@@ -18,6 +18,7 @@ package txpool
1818
1919import (
2020 "container/heap"
21+ "context"
2122 "errors"
2223 "fmt"
2324 "math"
@@ -37,6 +38,7 @@ import (
3738 "github.com/ethereum/go-ethereum/log"
3839 "github.com/ethereum/go-ethereum/metrics"
3940 "github.com/ethereum/go-ethereum/params"
41+ "github.com/google/uuid"
4042 "golang.org/x/crypto/sha3"
4143)
4244
@@ -291,6 +293,8 @@ type TxPool struct {
291293 initDoneCh chan struct {} // is closed once the pool is initialized (for tests)
292294
293295 changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
296+
297+ bundleFetcher IFetcher
294298}
295299
296300type txpoolResetRequest struct {
@@ -354,6 +358,17 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
354358 return pool
355359}
356360
361+ type IFetcher interface {
362+ GetLatestUuidBundles (ctx context.Context , blockNum int64 ) ([]types.LatestUuidBundle , error )
363+ }
364+
365+ func (pool * TxPool ) RegisterBundleFetcher (fetcher IFetcher ) {
366+ pool .mu .Lock ()
367+ defer pool .mu .Unlock ()
368+
369+ pool .bundleFetcher = fetcher
370+ }
371+
357372// loop is the transaction pool's main event loop, waiting for and reacting to
358373// outside blockchain events as well as for various reporting and transaction
359374// eviction events.
@@ -591,21 +606,79 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti
591606 return pending
592607}
593608
594- /// AllMevBundles returns all the MEV Bundles currently in the pool
595- func (pool * TxPool ) AllMevBundles () []types.MevBundle {
596- return pool .mevBundles
609+ type uuidBundleKey struct {
610+ Uuid uuid.UUID
611+ SigningAddress common.Address
612+ }
613+
614+ func (pool * TxPool ) fetchLatestCancellableBundles (ctx context.Context , blockNumber * big.Int ) (chan []types.LatestUuidBundle , chan error ) {
615+ if pool .bundleFetcher == nil {
616+ return nil , nil
617+ }
618+ errCh := make (chan error , 1 )
619+ lubCh := make (chan []types.LatestUuidBundle , 1 )
620+ go func (blockNum int64 ) {
621+ lub , err := pool .bundleFetcher .GetLatestUuidBundles (ctx , blockNum )
622+ errCh <- err
623+ lubCh <- lub
624+ }(blockNumber .Int64 ())
625+ return lubCh , errCh
626+ }
627+
628+ func resolveCancellableBundles (lubCh chan []types.LatestUuidBundle , errCh chan error , uuidBundles map [uuidBundleKey ][]types.MevBundle ) []types.MevBundle {
629+ if lubCh == nil || errCh == nil {
630+ return nil
631+ }
632+
633+ if len (uuidBundles ) == 0 {
634+ return nil
635+ }
636+
637+ err := <- errCh
638+ if err != nil {
639+ log .Error ("could not fetch latest bundles uuid map" , "err" , err )
640+ return nil
641+ }
642+
643+ currentCancellableBundles := []types.MevBundle {}
644+
645+ log .Trace ("Processing uuid bundles" , "uuidBundles" , uuidBundles )
646+
647+ lubs := <- lubCh
648+ for _ , lub := range lubs {
649+ ubk := uuidBundleKey {lub .Uuid , lub .SigningAddress }
650+ bundles , found := uuidBundles [ubk ]
651+ if ! found {
652+ log .Trace ("missing uuid bundle" , "ubk" , ubk )
653+ continue
654+ }
655+ for _ , bundle := range bundles {
656+ if bundle .Hash == lub .BundleHash {
657+ log .Trace ("adding uuid bundle" , "bundle hash" , bundle .Hash .String (), "lub" , lub )
658+ currentCancellableBundles = append (currentCancellableBundles , bundle )
659+ break
660+ }
661+ }
662+ }
663+ return currentCancellableBundles
597664}
598665
599666// MevBundles returns a list of bundles valid for the given blockNumber/blockTimestamp
600667// also prunes bundles that are outdated
601- func (pool * TxPool ) MevBundles (blockNumber * big.Int , blockTimestamp uint64 ) []types.MevBundle {
668+ // Returns regular bundles and a function resolving to current cancellable bundles
669+ func (pool * TxPool ) MevBundles (blockNumber * big.Int , blockTimestamp uint64 ) ([]types.MevBundle , chan []types.MevBundle ) {
602670 pool .mu .Lock ()
603671 defer pool .mu .Unlock ()
604672
673+ ctx , cancel := context .WithTimeout (context .Background (), 500 * time .Millisecond )
674+ lubCh , errCh := pool .fetchLatestCancellableBundles (ctx , blockNumber )
675+
605676 // returned values
606677 var ret []types.MevBundle
607678 // rolled over values
608679 var bundles []types.MevBundle
680+ // (uuid, signingAddress) -> list of bundles
681+ var uuidBundles = make (map [uuidBundleKey ][]types.MevBundle )
609682
610683 for _ , bundle := range pool .mevBundles {
611684 // Prune outdated bundles
@@ -619,14 +692,31 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []ty
619692 continue
620693 }
621694
622- // return the ones which are in time
623- ret = append (ret , bundle )
624695 // keep the bundles around internally until they need to be pruned
625696 bundles = append (bundles , bundle )
697+
698+ // TODO: omit duplicates
699+
700+ // do not append to the return quite yet, check the DB for the latest bundle for that uuid
701+ if bundle .Uuid != types .EmptyUUID {
702+ ubk := uuidBundleKey {bundle .Uuid , bundle .SigningAddress }
703+ uuidBundles [ubk ] = append (uuidBundles [ubk ], bundle )
704+ continue
705+ }
706+
707+ // return the ones which are in time
708+ ret = append (ret , bundle )
626709 }
627710
628711 pool .mevBundles = bundles
629- return ret
712+
713+ cancellableBundlesCh := make (chan []types.MevBundle , 1 )
714+ go func () {
715+ cancellableBundlesCh <- resolveCancellableBundles (lubCh , errCh , uuidBundles )
716+ cancel ()
717+ }()
718+
719+ return ret , cancellableBundlesCh
630720}
631721
632722// AddMevBundles adds a mev bundles to the pool
@@ -639,7 +729,7 @@ func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error {
639729}
640730
641731// AddMevBundle adds a mev bundle to the pool
642- func (pool * TxPool ) AddMevBundle (txs types.Transactions , blockNumber * big.Int , minTimestamp , maxTimestamp uint64 , revertingTxHashes []common.Hash ) error {
732+ func (pool * TxPool ) AddMevBundle (txs types.Transactions , blockNumber * big.Int , replacementUuid uuid. UUID , signingAddress common. Address , minTimestamp , maxTimestamp uint64 , revertingTxHashes []common.Hash ) error {
643733 bundleHasher := sha3 .NewLegacyKeccak256 ()
644734 for _ , tx := range txs {
645735 bundleHasher .Write (tx .Hash ().Bytes ())
@@ -652,6 +742,8 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
652742 pool .mevBundles = append (pool .mevBundles , types.MevBundle {
653743 Txs : txs ,
654744 BlockNumber : blockNumber ,
745+ Uuid : replacementUuid ,
746+ SigningAddress : signingAddress ,
655747 MinTimestamp : minTimestamp ,
656748 MaxTimestamp : maxTimestamp ,
657749 RevertingTxHashes : revertingTxHashes ,
0 commit comments