Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package builder
import (
"context"
"errors"
"math/big"
_ "os"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
blockvalidation "github.com/ethereum/go-ethereum/eth/block-validation"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -236,23 +236,22 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy
var (
queueSignal = make(chan struct{}, 1)

queueMu sync.Mutex
queueLastSubmittedProfit = new(big.Int)
queueBestProfit = new(big.Int)
queueBestEntry blockQueueEntry
queueMu sync.Mutex
queueLastSubmittedHash common.Hash
queueBestEntry blockQueueEntry
)

log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash)

submitBestBlock := func() {
queueMu.Lock()
if queueLastSubmittedProfit.Cmp(queueBestProfit) < 0 {
if queueBestEntry.block.Hash() != queueLastSubmittedHash {
err := b.onSealedBlock(queueBestEntry.block, queueBestEntry.ordersCloseTime, queueBestEntry.sealedAt, queueBestEntry.commitedBundles, queueBestEntry.allBundles, proposerPubkey, vd, attrs)

if err != nil {
log.Error("could not run sealed block hook", "err", err)
} else {
queueLastSubmittedProfit.Set(queueBestProfit)
queueLastSubmittedHash = queueBestEntry.block.Hash()
}
}
queueMu.Unlock()
Expand All @@ -271,15 +270,14 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy

queueMu.Lock()
defer queueMu.Unlock()
if block.Profit.Cmp(queueBestProfit) > 0 {
if block.Hash() != queueLastSubmittedHash {
queueBestEntry = blockQueueEntry{
block: block,
ordersCloseTime: ordersCloseTime,
sealedAt: sealedAt,
commitedBundles: commitedBundles,
allBundles: allBundles,
}
queueBestProfit.Set(block.Profit)

select {
case queueSignal <- struct{}{}:
Expand Down
13 changes: 10 additions & 3 deletions builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,20 @@ func TestOnPayloadAttributes(t *testing.T) {

require.Equal(t, uint64(25), testRelay.requestedSlot)

// Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the profit is the same
// Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the hash is the same
testBlock.Profit = big.NewInt(10)
testRelay.submittedMsg = nil
time.Sleep(2200 * time.Millisecond)
require.Nil(t, testRelay.submittedMsg)

// Up the profit, expect to get the block
testEthService.testBlock.Profit.SetInt64(11)
// Change the hash, expect to get the block
testExecutableData.ExtraData = hexutil.MustDecode("0x0042fafd")
testExecutableData.BlockHash = common.HexToHash("0x0579b1aaca5c079c91e5774bac72c7f9bc2ddf2b126e9c632be68a1cb8f3fc71")
testBlock, err = beacon.ExecutableDataToBlock(*testExecutableData)
testBlock.Profit = big.NewInt(10)
require.NoError(t, err)
testEthService.testBlock = testBlock

time.Sleep(2200 * time.Millisecond)
require.NotNil(t, testRelay.submittedMsg)
}
1 change: 1 addition & 0 deletions builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
mevBundleCh := make(chan []types.MevBundle)
blockNumCh := make(chan int64)
bundleFetcher := flashbotsextra.NewBundleFetcher(backend, ds, blockNumCh, mevBundleCh, true)
backend.RegisterBundleFetcher(bundleFetcher)
go bundleFetcher.Run()
}

Expand Down
108 changes: 100 additions & 8 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"context"
"errors"
"math"
"math/big"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/google/uuid"
"golang.org/x/crypto/sha3"
)

Expand Down Expand Up @@ -280,6 +282,8 @@ type TxPool struct {
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

bundleFetcher IFetcher
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -344,6 +348,17 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
return pool
}

type IFetcher interface {
GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error)
}

func (pool *TxPool) RegisterBundleFetcher(fetcher IFetcher) {
pool.mu.Lock()
defer pool.mu.Unlock()

pool.bundleFetcher = fetcher
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -581,21 +596,79 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti
return pending
}

/// AllMevBundles returns all the MEV Bundles currently in the pool
func (pool *TxPool) AllMevBundles() []types.MevBundle {
return pool.mevBundles
type uuidBundleKey struct {
Uuid uuid.UUID
SigningAddress common.Address
}

func (pool *TxPool) fetchLatestCancellableBundles(ctx context.Context, blockNumber *big.Int) (chan []types.LatestUuidBundle, chan error) {
if pool.bundleFetcher == nil {
return nil, nil
}
errCh := make(chan error, 1)
lubCh := make(chan []types.LatestUuidBundle, 1)
go func(blockNum int64) {
lub, err := pool.bundleFetcher.GetLatestUuidBundles(ctx, blockNum)
errCh <- err
lubCh <- lub
}(blockNumber.Int64())
return lubCh, errCh
}

func resolveCancellableBundles(lubCh chan []types.LatestUuidBundle, errCh chan error, uuidBundles map[uuidBundleKey][]types.MevBundle) []types.MevBundle {
if lubCh == nil || errCh == nil {
return nil
}

if len(uuidBundles) == 0 {
return nil
}

err := <-errCh
if err != nil {
log.Error("could not fetch latest bundles uuid map", "err", err)
return nil
}

currentCancellableBundles := []types.MevBundle{}

log.Trace("Processing uuid bundles", "uuidBundles", uuidBundles)

lubs := <-lubCh
for _, lub := range lubs {
ubk := uuidBundleKey{lub.Uuid, lub.SigningAddress}
bundles, found := uuidBundles[ubk]
if !found {
log.Trace("missing uuid bundle", "ubk", ubk)
continue
}
for _, bundle := range bundles {
if bundle.Hash == lub.BundleHash {
log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub)
currentCancellableBundles = append(currentCancellableBundles, bundle)
break
}
}
}
return currentCancellableBundles
}

// MevBundles returns a list of bundles valid for the given blockNumber/blockTimestamp
// also prunes bundles that are outdated
func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []types.MevBundle {
// Returns regular bundles and a function resolving to current cancellable bundles
func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) ([]types.MevBundle, chan []types.MevBundle) {
pool.mu.Lock()
defer pool.mu.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
lubCh, errCh := pool.fetchLatestCancellableBundles(ctx, blockNumber)

// returned values
var ret []types.MevBundle
// rolled over values
var bundles []types.MevBundle
// (uuid, signingAddress) -> list of bundles
var uuidBundles = make(map[uuidBundleKey][]types.MevBundle)

for _, bundle := range pool.mevBundles {
// Prune outdated bundles
Expand All @@ -609,14 +682,31 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []ty
continue
}

// return the ones which are in time
ret = append(ret, bundle)
// keep the bundles around internally until they need to be pruned
bundles = append(bundles, bundle)

// TODO: omit duplicates

// do not append to the return quite yet, check the DB for the latest bundle for that uuid
if bundle.Uuid != types.EmptyUUID {
ubk := uuidBundleKey{bundle.Uuid, bundle.SigningAddress}
uuidBundles[ubk] = append(uuidBundles[ubk], bundle)
continue
}

// return the ones which are in time
ret = append(ret, bundle)
}

pool.mevBundles = bundles
return ret

cancellableBundlesCh := make(chan []types.MevBundle, 1)
go func() {
cancellableBundlesCh <- resolveCancellableBundles(lubCh, errCh, uuidBundles)
cancel()
}()

return ret, cancellableBundlesCh
}

// AddMevBundles adds a mev bundles to the pool
Expand All @@ -629,7 +719,7 @@ func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error {
}

// AddMevBundle adds a mev bundle to the pool
func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, replacementUuid uuid.UUID, signingAddress common.Address, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
bundleHasher := sha3.NewLegacyKeccak256()
for _, tx := range txs {
bundleHasher.Write(tx.Hash().Bytes())
Expand All @@ -642,6 +732,8 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
pool.mevBundles = append(pool.mevBundles, types.MevBundle{
Txs: txs,
BlockNumber: blockNumber,
Uuid: replacementUuid,
SigningAddress: signingAddress,
MinTimestamp: minTimestamp,
MaxTimestamp: maxTimestamp,
RevertingTxHashes: revertingTxHashes,
Expand Down
Loading