Skip to content

Commit f7a4418

Browse files
authored
Revert "core, eth, miner: start propagating and consuming blob txs (ethereum#28243)"
This reverts commit 723fd06.
1 parent a785192 commit f7a4418

File tree

14 files changed

+73
-145
lines changed

14 files changed

+73
-145
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ type blobTxMeta struct {
9797
execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
9898
execFeeCap *uint256.Int // Needed to validate replacement price bump
9999
blobFeeCap *uint256.Int // Needed to validate replacement price bump
100-
execGas uint64 // Needed to check inclusion validity before reading the blob
101-
blobGas uint64 // Needed to check inclusion validity before reading the blob
102100

103101
basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
104102
blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap
@@ -120,8 +118,6 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
120118
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
121119
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
122120
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
123-
execGas: tx.Gas(),
124-
blobGas: tx.BlobGas(),
125121
}
126122
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
127123
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
@@ -311,8 +307,8 @@ type BlobPool struct {
311307
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
312308
evict *evictHeap // Heap of cheapest accounts for eviction when full
313309

314-
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
315-
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)
310+
eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
311+
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination
316312

317313
lock sync.RWMutex // Mutex protecting the pool during reorg handling
318314
}
@@ -440,6 +436,8 @@ func (p *BlobPool) Close() error {
440436
if err := p.store.Close(); err != nil {
441437
errs = append(errs, err)
442438
}
439+
p.eventScope.Close()
440+
443441
switch {
444442
case errs == nil:
445443
return nil
@@ -760,21 +758,15 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
760758
// Run the reorg between the old and new head and figure out which accounts
761759
// need to be rechecked and which transactions need to be readded
762760
if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil {
763-
var adds []*types.Transaction
764761
for addr, txs := range reinject {
765762
// Blindly push all the lost transactions back into the pool
766763
for _, tx := range txs {
767-
if err := p.reinject(addr, tx.Hash()); err == nil {
768-
adds = append(adds, tx.WithoutBlobTxSidecar())
769-
}
764+
p.reinject(addr, tx.Hash())
770765
}
771766
// Recheck the account's pooled transactions to drop included and
772767
// invalidated one
773768
p.recheck(addr, inclusions)
774769
}
775-
if len(adds) > 0 {
776-
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
777-
}
778770
}
779771
// Flush out any blobs from limbo that are older than the latest finality
780772
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
@@ -929,13 +921,13 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
929921
// Note, the method will not initialize the eviction cache values as those will
930922
// be done once for all transactions belonging to an account after all individual
931923
// transactions are injected back into the pool.
932-
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
924+
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
933925
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
934926
// add the transaction back into the pool as it is not mineable.
935927
tx, err := p.limbo.pull(txhash)
936928
if err != nil {
937929
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
938-
return err
930+
return
939931
}
940932
// TODO: seems like an easy optimization here would be getting the serialized tx
941933
// from limbo instead of re-serializing it here.
@@ -944,20 +936,20 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
944936
blob, err := rlp.EncodeToBytes(tx)
945937
if err != nil {
946938
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
947-
return err
939+
return
948940
}
949941
id, err := p.store.Put(blob)
950942
if err != nil {
951943
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
952-
return err
944+
return
953945
}
954946

955947
// Update the indixes and metrics
956948
meta := newBlobTxMeta(id, p.store.Size(id), tx)
957949
if _, ok := p.index[addr]; !ok {
958950
if err := p.reserve(addr, true); err != nil {
959951
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
960-
return err
952+
return
961953
}
962954
p.index[addr] = []*blobTxMeta{meta}
963955
p.spent[addr] = meta.costCap
@@ -968,7 +960,6 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
968960
}
969961
p.lookup[meta.hash] = meta.id
970962
p.stored += uint64(meta.size)
971-
return nil
972963
}
973964

974965
// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
@@ -1163,19 +1154,9 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
11631154
// Add inserts a set of blob transactions into the pool if they pass validation (both
11641155
// consensus validity and pool restictions).
11651156
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
1166-
var (
1167-
adds = make([]*types.Transaction, 0, len(txs))
1168-
errs = make([]error, len(txs))
1169-
)
1157+
errs := make([]error, len(txs))
11701158
for i, tx := range txs {
11711159
errs[i] = p.add(tx)
1172-
if errs[i] == nil {
1173-
adds = append(adds, tx.WithoutBlobTxSidecar())
1174-
}
1175-
}
1176-
if len(adds) > 0 {
1177-
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
1178-
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
11791160
}
11801161
return errs
11811162
}
@@ -1403,8 +1384,6 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
14031384
Time: time.Now(), // TODO(karalabe): Maybe save these and use that?
14041385
GasFeeCap: tx.execFeeCap.ToBig(),
14051386
GasTipCap: tx.execTipCap.ToBig(),
1406-
Gas: tx.execGas,
1407-
BlobGas: tx.blobGas,
14081387
})
14091388
}
14101389
if len(lazies) > 0 {
@@ -1489,14 +1468,10 @@ func (p *BlobPool) updateLimboMetrics() {
14891468
limboSlotusedGauge.Update(int64(slotused))
14901469
}
14911470

1492-
// SubscribeTransactions registers a subscription for new transaction events,
1493-
// supporting feeding only newly seen or also resurrected transactions.
1494-
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
1495-
if reorgs {
1496-
return p.insertFeed.Subscribe(ch)
1497-
} else {
1498-
return p.discoverFeed.Subscribe(ch)
1499-
}
1471+
// SubscribeTransactions registers a subscription of NewTxsEvent and
1472+
// starts sending event to the given channel.
1473+
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
1474+
return p.eventScope.Track(p.eventFeed.Subscribe(ch))
15001475
}
15011476

15021477
// Nonce returns the next nonce of an account, with all transactions executable

core/txpool/legacypool/legacypool.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ type LegacyPool struct {
208208
chain BlockChain
209209
gasTip atomic.Pointer[big.Int]
210210
txFeed event.Feed
211+
scope event.SubscriptionScope
211212
signer types.Signer
212213
mu sync.RWMutex
213214

@@ -403,6 +404,9 @@ func (pool *LegacyPool) loop() {
403404

404405
// Close terminates the transaction pool.
405406
func (pool *LegacyPool) Close() error {
407+
// Unsubscribe all subscriptions registered from txpool
408+
pool.scope.Close()
409+
406410
// Terminate the pool reorger and return
407411
close(pool.reorgShutdownCh)
408412
pool.wg.Wait()
@@ -421,14 +425,10 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
421425
<-wait
422426
}
423427

424-
// SubscribeTransactions registers a subscription for new transaction events,
425-
// supporting feeding only newly seen or also resurrected transactions.
426-
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
427-
// The legacy pool has a very messed up internal shuffling, so it's kind of
428-
// hard to separate newly discovered transaction from resurrected ones. This
429-
// is because the new txs are added to the queue, resurrected ones too and
430-
// reorgs run lazily, so separating the two would need a marker.
431-
return pool.txFeed.Subscribe(ch)
428+
// SubscribeTransactions registers a subscription of NewTxsEvent and
429+
// starts sending event to the given channel.
430+
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
431+
return pool.scope.Track(pool.txFeed.Subscribe(ch))
432432
}
433433

434434
// SetGasTip updates the minimum gas tip required by the transaction pool for a
@@ -552,8 +552,6 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
552552
Time: txs[i].Time(),
553553
GasFeeCap: txs[i].GasFeeCap(),
554554
GasTipCap: txs[i].GasTipCap(),
555-
Gas: txs[i].Gas(),
556-
BlobGas: txs[i].BlobGas(),
557555
}
558556
}
559557
pending[addr] = lazies

core/txpool/subpool.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,13 @@ import (
3030
// enough for the miner and other APIs to handle large batches of transactions;
3131
// and supports pulling up the entire transaction when really needed.
3232
type LazyTransaction struct {
33-
Pool LazyResolver // Transaction resolver to pull the real transaction up
33+
Pool SubPool // Transaction subpool to pull the real transaction up
3434
Hash common.Hash // Transaction hash to pull up if needed
3535
Tx *types.Transaction // Transaction if already resolved
3636

3737
Time time.Time // Time when the transaction was first seen
3838
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
3939
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
40-
41-
Gas uint64 // Amount of gas required by the transaction
42-
BlobGas uint64 // Amount of blob gas required by the transaction
4340
}
4441

4542
// Resolve retrieves the full transaction belonging to a lazy handle if it is still
@@ -51,14 +48,6 @@ func (ltx *LazyTransaction) Resolve() *types.Transaction {
5148
return ltx.Tx
5249
}
5350

54-
// LazyResolver is a minimal interface needed for a transaction pool to satisfy
55-
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
56-
// pool being injected into the lazy transaction.
57-
type LazyResolver interface {
58-
// Get returns a transaction if it is contained in the pool, or nil otherwise.
59-
Get(hash common.Hash) *types.Transaction
60-
}
61-
6251
// AddressReserver is passed by the main transaction pool to subpools, so they
6352
// may request (and relinquish) exclusive access to certain addresses.
6453
type AddressReserver func(addr common.Address, reserve bool) error
@@ -110,10 +99,8 @@ type SubPool interface {
11099
// account and sorted by nonce.
111100
Pending(enforceTips bool) map[common.Address][]*LazyTransaction
112101

113-
// SubscribeTransactions subscribes to new transaction events. The subscriber
114-
// can decide whether to receive notifications only for newly seen transactions
115-
// or also for reorged out ones.
116-
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
102+
// SubscribeTransactions subscribes to new transaction events.
103+
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
117104

118105
// Nonce returns the next nonce of an account, with all transactions executable
119106
// by the pool already applied on top.

core/txpool/txpool.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,13 @@ func (p *TxPool) Close() error {
155155
if err := <-errc; err != nil {
156156
errs = append(errs, err)
157157
}
158+
158159
// Terminate each subpool
159160
for _, subpool := range p.subpools {
160161
if err := subpool.Close(); err != nil {
161162
errs = append(errs, err)
162163
}
163164
}
164-
// Unsubscribe anyone still listening for tx events
165-
p.subs.Close()
166-
167165
if len(errs) > 0 {
168166
return fmt.Errorf("subpool close errors: %v", errs)
169167
}
@@ -318,12 +316,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
318316
return txs
319317
}
320318

321-
// SubscribeTransactions registers a subscription for new transaction events,
322-
// supporting feeding only newly seen or also resurrected transactions.
323-
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
319+
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending
320+
// events to the given channel.
321+
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
324322
subs := make([]event.Subscription, len(p.subpools))
325323
for i, subpool := range p.subpools {
326-
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
324+
subs[i] = subpool.SubscribeTransactions(ch)
327325
}
328326
return p.subs.Track(event.JoinSubscriptions(subs...))
329327
}

eth/api_backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
334334
}
335335

336336
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
337-
return b.eth.txPool.SubscribeTransactions(ch, true)
337+
return b.eth.txPool.SubscribeNewTxsEvent(ch)
338338
}
339339

340340
func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {

eth/catalyst/simulated_beacon.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal) error {
199199
func (c *SimulatedBeacon) loopOnDemand() {
200200
var (
201201
newTxs = make(chan core.NewTxsEvent)
202-
sub = c.eth.TxPool().SubscribeTransactions(newTxs, true)
202+
sub = c.eth.TxPool().SubscribeNewTxsEvent(newTxs)
203203
)
204204
defer sub.Unsubscribe()
205205

eth/handler.go

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,9 @@ type txPool interface {
7575
// The slice should be modifiable by the caller.
7676
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction
7777

78-
// SubscribeTransactions subscribes to new transaction events. The subscriber
79-
// can decide whether to receive notifications only for newly seen transactions
80-
// or also for reorged out ones.
81-
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
78+
// SubscribeNewTxsEvent should return an event subscription of
79+
// NewTxsEvent and send events to the given channel.
80+
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
8281
}
8382

8483
// handlerConfig is the collection of initialization parameters to create a full
@@ -510,10 +509,10 @@ func (h *handler) unregisterPeer(id string) {
510509
func (h *handler) Start(maxPeers int) {
511510
h.maxPeers = maxPeers
512511

513-
// broadcast and announce transactions (only new ones, not resurrected ones)
512+
// broadcast transactions
514513
h.wg.Add(1)
515514
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
516-
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
515+
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
517516
go h.txBroadcastLoop()
518517

519518
// broadcast mined blocks
@@ -593,33 +592,26 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
593592
}
594593

595594
// BroadcastTransactions will propagate a batch of transactions
596-
// - To a square root of all peers for non-blob transactions
595+
// - To a square root of all peers
597596
// - And, separately, as announcements to all peers which are not known to
598597
// already have the given transaction.
599598
func (h *handler) BroadcastTransactions(txs types.Transactions) {
600599
var (
601-
blobTxs int // Number of blob transactions to announce only
602-
largeTxs int // Number of large transactions to announce only
603-
604-
directCount int // Number of transactions sent directly to peers (duplicates included)
605-
directPeers int // Number of peers that were sent transactions directly
606-
annCount int // Number of transactions announced across all peers (duplicates included)
607-
annPeers int // Number of peers announced about transactions
600+
annoCount int // Count of announcements made
601+
annoPeers int
602+
directCount int // Count of the txs sent directly to peers
603+
directPeers int // Count of the peers that were sent transactions directly
608604

609605
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
610606
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
607+
611608
)
612609
// Broadcast transactions to a batch of peers not knowing about it
613610
for _, tx := range txs {
614611
peers := h.peers.peersWithoutTransaction(tx.Hash())
615612

616613
var numDirect int
617-
switch {
618-
case tx.Type() == types.BlobTxType:
619-
blobTxs++
620-
case tx.Size() > txMaxBroadcastSize:
621-
largeTxs++
622-
default:
614+
if tx.Size() <= txMaxBroadcastSize {
623615
numDirect = int(math.Sqrt(float64(len(peers))))
624616
}
625617
// Send the tx unconditionally to a subset of our peers
@@ -637,12 +629,13 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
637629
peer.AsyncSendTransactions(hashes)
638630
}
639631
for peer, hashes := range annos {
640-
annPeers++
641-
annCount += len(hashes)
632+
annoPeers++
633+
annoCount += len(hashes)
642634
peer.AsyncSendPooledTransactionHashes(hashes)
643635
}
644-
log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
645-
"bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount)
636+
log.Debug("Transaction broadcast", "txs", len(txs),
637+
"announce packs", annoPeers, "announced hashes", annoCount,
638+
"tx packs", directPeers, "broadcast txs", directCount)
646639
}
647640

648641
// minedBroadcastLoop sends mined blocks to connected peers.

0 commit comments

Comments
 (0)