Skip to content

Commit 60085ec

Browse files
karalabetcrypt25519
authored andcommitted
core, eth, miner: start propagating and consuming blob txs (ethereum#28243)
* core, eth, miner: start propagating and consuming blob txs * eth/protocols/eth: disable eth/67 if Cancun is enabled * core/txpool, eth, miner: pass gas limit infos in lazy tx for mienr filtering * core/txpool, miner: add lazy resolver for pending txs too * core, eth: fix review noticed bugs * eth, miner: minor polishes in the mining and announcing logs * core/expool: unsubscribe the event scope
1 parent 9689f74 commit 60085ec

File tree

14 files changed

+167
-94
lines changed

14 files changed

+167
-94
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ import (
2929
"sync"
3030
"time"
3131

32+
"github.com/holiman/billy"
33+
"github.com/holiman/uint256"
34+
3235
"github.com/ethereum/go-ethereum/common"
3336
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
3437
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
@@ -41,8 +44,6 @@ import (
4144
"github.com/ethereum/go-ethereum/metrics"
4245
"github.com/ethereum/go-ethereum/params"
4346
"github.com/ethereum/go-ethereum/rlp"
44-
"github.com/holiman/billy"
45-
"github.com/holiman/uint256"
4647
)
4748

4849
const (
@@ -97,6 +98,8 @@ type blobTxMeta struct {
9798
execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
9899
execFeeCap *uint256.Int // Needed to validate replacement price bump
99100
blobFeeCap *uint256.Int // Needed to validate replacement price bump
101+
execGas uint64 // Needed to check inclusion validity before reading the blob
102+
blobGas uint64 // Needed to check inclusion validity before reading the blob
100103

101104
basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
102105
blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap
@@ -118,6 +121,8 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
118121
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
119122
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
120123
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
124+
execGas: tx.Gas(),
125+
blobGas: tx.BlobGas(),
121126
}
122127
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
123128
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
@@ -307,10 +312,12 @@ type BlobPool struct {
307312
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
308313
evict *evictHeap // Heap of cheapest accounts for eviction when full
309314

310-
eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
315+
eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
311316
dropTxFeed event.Feed
312317
rejectTxFeed event.Feed
313-
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination
318+
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination
319+
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
320+
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)
314321

315322
lock sync.RWMutex // Mutex protecting the pool during reorg handling
316323
}
@@ -438,8 +445,6 @@ func (p *BlobPool) Close() error {
438445
if err := p.store.Close(); err != nil {
439446
errs = append(errs, err)
440447
}
441-
p.eventScope.Close()
442-
443448
switch {
444449
case errs == nil:
445450
return nil
@@ -760,15 +765,21 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
760765
// Run the reorg between the old and new head and figure out which accounts
761766
// need to be rechecked and which transactions need to be readded
762767
if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil {
768+
var adds []*types.Transaction
763769
for addr, txs := range reinject {
764770
// Blindly push all the lost transactions back into the pool
765771
for _, tx := range txs {
766-
p.reinject(addr, tx.Hash())
772+
if err := p.reinject(addr, tx.Hash()); err == nil {
773+
adds = append(adds, tx.WithoutBlobTxSidecar())
774+
}
767775
}
768776
// Recheck the account's pooled transactions to drop included and
769777
// invalidated one
770778
p.recheck(addr, inclusions)
771779
}
780+
if len(adds) > 0 {
781+
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
782+
}
772783
}
773784
// Flush out any blobs from limbo that are older than the latest finality
774785
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
@@ -923,13 +934,13 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
923934
// Note, the method will not initialize the eviction cache values as those will
924935
// be done once for all transactions belonging to an account after all individual
925936
// transactions are injected back into the pool.
926-
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
937+
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
927938
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
928939
// add the transaction back into the pool as it is not mineable.
929940
tx, err := p.limbo.pull(txhash)
930941
if err != nil {
931942
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
932-
return
943+
return err
933944
}
934945
// TODO: seems like an easy optimization here would be getting the serialized tx
935946
// from limbo instead of re-serializing it here.
@@ -938,20 +949,20 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
938949
blob, err := rlp.EncodeToBytes(tx)
939950
if err != nil {
940951
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
941-
return
952+
return err
942953
}
943954
id, err := p.store.Put(blob)
944955
if err != nil {
945956
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
946-
return
957+
return err
947958
}
948959

949960
// Update the indixes and metrics
950961
meta := newBlobTxMeta(id, p.store.Size(id), tx)
951962
if _, ok := p.index[addr]; !ok {
952963
if err := p.reserve(addr, true); err != nil {
953964
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
954-
return
965+
return err
955966
}
956967
p.index[addr] = []*blobTxMeta{meta}
957968
p.spent[addr] = meta.costCap
@@ -962,6 +973,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
962973
}
963974
p.lookup[meta.hash] = meta.id
964975
p.stored += uint64(meta.size)
976+
return nil
965977
}
966978

967979
// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
@@ -1156,9 +1168,19 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
11561168
// Add inserts a set of blob transactions into the pool if they pass validation (both
11571169
// consensus validity and pool restictions).
11581170
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
1159-
errs := make([]error, len(txs))
1171+
var (
1172+
adds = make([]*types.Transaction, 0, len(txs))
1173+
errs = make([]error, len(txs))
1174+
)
11601175
for i, tx := range txs {
11611176
errs[i] = p.add(tx)
1177+
if errs[i] == nil {
1178+
adds = append(adds, tx.WithoutBlobTxSidecar())
1179+
}
1180+
}
1181+
if len(adds) > 0 {
1182+
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
1183+
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
11621184
}
11631185
return errs
11641186
}
@@ -1386,6 +1408,8 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
13861408
Time: time.Now(), // TODO(karalabe): Maybe save these and use that?
13871409
GasFeeCap: tx.execFeeCap.ToBig(),
13881410
GasTipCap: tx.execTipCap.ToBig(),
1411+
Gas: tx.execGas,
1412+
BlobGas: tx.blobGas,
13891413
})
13901414
}
13911415
if len(lazies) > 0 {
@@ -1470,10 +1494,14 @@ func (p *BlobPool) updateLimboMetrics() {
14701494
limboSlotusedGauge.Update(int64(slotused))
14711495
}
14721496

1473-
// SubscribeTransactions registers a subscription of NewTxsEvent and
1474-
// starts sending event to the given channel.
1475-
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
1476-
return p.eventScope.Track(p.eventFeed.Subscribe(ch))
1497+
// SubscribeTransactions registers a subscription for new transaction events,
1498+
// supporting feeding only newly seen or also resurrected transactions.
1499+
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
1500+
if reorgs {
1501+
return p.insertFeed.Subscribe(ch)
1502+
} else {
1503+
return p.discoverFeed.Subscribe(ch)
1504+
}
14771505
}
14781506

14791507
// Nonce returns the next nonce of an account, with all transactions executable
@@ -1535,7 +1563,6 @@ func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
15351563
return txpool.TxStatusUnknown
15361564
}
15371565

1538-
15391566
// SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and
15401567
// starts sending event to the given channel.
15411568
func (pool *BlobPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.Subscription {

core/txpool/legacypool/legacypool.go

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -203,16 +203,16 @@ func (config *Config) sanitize() Config {
203203
// current state) and future transactions. Transactions move between those
204204
// two states over time as they are received and processed.
205205
type LegacyPool struct {
206-
config Config
207-
chainconfig *params.ChainConfig
208-
chain BlockChain
209-
gasTip atomic.Pointer[big.Int]
210-
txFeed event.Feed
206+
config Config
207+
chainconfig *params.ChainConfig
208+
chain BlockChain
209+
gasTip atomic.Pointer[big.Int]
210+
txFeed event.Feed
211211
dropTxFeed event.Feed
212212
rejectTxFeed event.Feed
213-
scope event.SubscriptionScope
214-
signer types.Signer
215-
mu sync.RWMutex
213+
scope event.SubscriptionScope
214+
signer types.Signer
215+
mu sync.RWMutex
216216

217217
currentHead atomic.Pointer[types.Header] // Current head of the blockchain
218218
currentState *state.StateDB // Current state in the blockchain head
@@ -410,9 +410,6 @@ func (pool *LegacyPool) loop() {
410410

411411
// Close terminates the transaction pool.
412412
func (pool *LegacyPool) Close() error {
413-
// Unsubscribe all subscriptions registered from txpool
414-
pool.scope.Close()
415-
416413
// Terminate the pool reorger and return
417414
close(pool.reorgShutdownCh)
418415
pool.wg.Wait()
@@ -431,10 +428,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
431428
<-wait
432429
}
433430

434-
// SubscribeTransactions registers a subscription of NewTxsEvent and
435-
// starts sending event to the given channel.
436-
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
437-
return pool.scope.Track(pool.txFeed.Subscribe(ch))
431+
// SubscribeTransactions registers a subscription for new transaction events,
432+
// supporting feeding only newly seen or also resurrected transactions.
433+
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
434+
// The legacy pool has a very messed up internal shuffling, so it's kind of
435+
// hard to separate newly discovered transaction from resurrected ones. This
436+
// is because the new txs are added to the queue, resurrected ones too and
437+
// reorgs run lazily, so separating the two would need a marker.
438+
return pool.txFeed.Subscribe(ch)
438439
}
439440

440441
// SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and
@@ -449,7 +450,6 @@ func (pool *LegacyPool) SubscribeRejectedTxEvent(ch chan<- core.RejectedTxEvent)
449450
return pool.scope.Track(pool.rejectTxFeed.Subscribe(ch))
450451
}
451452

452-
453453
// SetGasTip updates the minimum gas tip required by the transaction pool for a
454454
// new transaction, and drops all transactions below this threshold.
455455
func (pool *LegacyPool) SetGasTip(tip *big.Int) {
@@ -575,6 +575,8 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
575575
Time: txs[i].Time(),
576576
GasFeeCap: txs[i].GasFeeCap(),
577577
GasTipCap: txs[i].GasTipCap(),
578+
Gas: txs[i].Gas(),
579+
BlobGas: txs[i].BlobGas(),
578580
}
579581
}
580582
pending[addr] = lazies
@@ -2027,16 +2029,15 @@ func numSlots(tx *types.Transaction) int {
20272029
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
20282030
}
20292031

2030-
20312032
const (
20322033
dropUnderpriced = "underpriced-txs"
2033-
dropLowNonce = "low-nonce-txs"
2034-
dropUnpayable = "unpayable-txs"
2035-
2036-
dropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
2037-
dropReplaced = "replaced-txs"
2038-
dropUnexecutable = "unexecutable-txs"
2039-
dropTruncating = "truncating-txs"
2040-
dropOld = "old-txs"
2034+
dropLowNonce = "low-nonce-txs"
2035+
dropUnpayable = "unpayable-txs"
2036+
2037+
dropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
2038+
dropReplaced = "replaced-txs"
2039+
dropUnexecutable = "unexecutable-txs"
2040+
dropTruncating = "truncating-txs"
2041+
dropOld = "old-txs"
20412042
dropGasPriceUpdated = "updated-gas-price"
2042-
)
2043+
)

core/txpool/subpool.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@ import (
3030
// enough for the miner and other APIs to handle large batches of transactions;
3131
// and supports pulling up the entire transaction when really needed.
3232
type LazyTransaction struct {
33-
Pool SubPool // Transaction subpool to pull the real transaction up
33+
Pool LazyResolver // Transaction resolver to pull the real transaction up
3434
Hash common.Hash // Transaction hash to pull up if needed
3535
Tx *types.Transaction // Transaction if already resolved
3636

3737
Time time.Time // Time when the transaction was first seen
3838
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
3939
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
40+
41+
Gas uint64 // Amount of gas required by the transaction
42+
BlobGas uint64 // Amount of blob gas required by the transaction
4043
}
4144

4245
// Resolve retrieves the full transaction belonging to a lazy handle if it is still
@@ -48,6 +51,14 @@ func (ltx *LazyTransaction) Resolve() *types.Transaction {
4851
return ltx.Tx
4952
}
5053

54+
// LazyResolver is a minimal interface needed for a transaction pool to satisfy
55+
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
56+
// pool being injected into the lazy transaction.
57+
type LazyResolver interface {
58+
// Get returns a transaction if it is contained in the pool, or nil otherwise.
59+
Get(hash common.Hash) *types.Transaction
60+
}
61+
5162
// AddressReserver is passed by the main transaction pool to subpools, so they
5263
// may request (and relinquish) exclusive access to certain addresses.
5364
type AddressReserver func(addr common.Address, reserve bool) error
@@ -99,8 +110,10 @@ type SubPool interface {
99110
// account and sorted by nonce.
100111
Pending(enforceTips bool) map[common.Address][]*LazyTransaction
101112

102-
// SubscribeTransactions subscribes to new transaction events.
103-
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
113+
// SubscribeTransactions subscribes to new transaction events. The subscriber
114+
// can decide whether to receive notifications only for newly seen transactions
115+
// or also for reorged out ones.
116+
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
104117

105118
// Nonce returns the next nonce of an account, with all transactions executable
106119
// by the pool already applied on top.

core/txpool/txpool.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,15 @@ func (p *TxPool) Close() error {
155155
if err := <-errc; err != nil {
156156
errs = append(errs, err)
157157
}
158-
159158
// Terminate each subpool
160159
for _, subpool := range p.subpools {
161160
if err := subpool.Close(); err != nil {
162161
errs = append(errs, err)
163162
}
164163
}
164+
// Unsubscribe anyone still listening for tx events
165+
p.subs.Close()
166+
165167
if len(errs) > 0 {
166168
return fmt.Errorf("subpool close errors: %v", errs)
167169
}
@@ -316,12 +318,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
316318
return txs
317319
}
318320

319-
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending
320-
// events to the given channel.
321-
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
321+
// SubscribeTransactions registers a subscription for new transaction events,
322+
// supporting feeding only newly seen or also resurrected transactions.
323+
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
322324
subs := make([]event.Subscription, len(p.subpools))
323325
for i, subpool := range p.subpools {
324-
subs[i] = subpool.SubscribeTransactions(ch)
326+
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
325327
}
326328
return p.subs.Track(event.JoinSubscriptions(subs...))
327329
}

eth/api_backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
334334
}
335335

336336
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
337-
return b.eth.txPool.SubscribeNewTxsEvent(ch)
337+
return b.eth.txPool.SubscribeTransactions(ch, true)
338338
}
339339

340340
func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {

eth/catalyst/simulated_beacon.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal) error {
199199
func (c *SimulatedBeacon) loopOnDemand() {
200200
var (
201201
newTxs = make(chan core.NewTxsEvent)
202-
sub = c.eth.TxPool().SubscribeNewTxsEvent(newTxs)
202+
sub = c.eth.TxPool().SubscribeTransactions(newTxs, true)
203203
)
204204
defer sub.Unsubscribe()
205205

0 commit comments

Comments
 (0)