Skip to content

Commit 62affdc

Browse files
authored
core/txpool/blobpool: post-crash cleanup and addition/removal metrics (#28914)
* core/txpool/blobpool: clean up resurrected junk after a crash * core/txpool/blobpool: track transaction insertions and rejections * core/txpool/blobpool: linnnnnnnt
1 parent 06a8711 commit 62affdc

File tree

3 files changed

+158
-18
lines changed

3 files changed

+158
-18
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,8 @@ func (p *BlobPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.Addr
386386

387387
if len(fails) > 0 {
388388
log.Warn("Dropping invalidated blob transactions", "ids", fails)
389+
dropInvalidMeter.Mark(int64(len(fails)))
390+
389391
for _, id := range fails {
390392
if err := p.store.Delete(id); err != nil {
391393
p.Close()
@@ -467,7 +469,13 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
467469
}
468470

469471
meta := newBlobTxMeta(id, size, tx)
470-
472+
if _, exists := p.lookup[meta.hash]; exists {
473+
// This path is only possible after a crash, where deleted items are not
474+
// removed via the normal shutdown-startup procedure and thus may get
475+
// partially resurrected.
476+
log.Error("Rejecting duplicate blob pool entry", "id", id, "hash", tx.Hash())
477+
return errors.New("duplicate blob entry")
478+
}
471479
sender, err := p.signer.Sender(tx)
472480
if err != nil {
473481
// This path is impossible unless the signature validity changes across
@@ -537,8 +545,10 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
537545

538546
if gapped {
539547
log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids)
548+
dropDanglingMeter.Mark(int64(len(ids)))
540549
} else {
541550
log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids)
551+
dropFilledMeter.Mark(int64(len(ids)))
542552
}
543553
for _, id := range ids {
544554
if err := p.store.Delete(id); err != nil {
@@ -569,6 +579,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
569579
txs = txs[1:]
570580
}
571581
log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs))
582+
dropOverlappedMeter.Mark(int64(len(ids)))
583+
572584
for _, id := range ids {
573585
if err := p.store.Delete(id); err != nil {
574586
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -600,10 +612,30 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
600612
}
601613
continue
602614
}
603-
// Sanity check that there's no double nonce. This case would be a coding
604-
// error, but better know about it
615+
// Sanity check that there's no double nonce. This case would generally
616+
// be a coding error, so better know about it.
617+
//
618+
// Also, Billy behind the blobpool does not journal deletes. A process
619+
// crash would result in previously deleted entities being resurrected.
620+
// That could potentially cause a duplicate nonce to appear.
605621
if txs[i].nonce == txs[i-1].nonce {
606-
log.Error("Duplicate nonce blob transaction", "from", addr, "nonce", txs[i].nonce)
622+
id := p.lookup[txs[i].hash]
623+
624+
log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id)
625+
dropRepeatedMeter.Mark(1)
626+
627+
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
628+
p.stored -= uint64(txs[i].size)
629+
delete(p.lookup, txs[i].hash)
630+
631+
if err := p.store.Delete(id); err != nil {
632+
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
633+
}
634+
txs = append(txs[:i], txs[i+1:]...)
635+
p.index[addr] = txs
636+
637+
i--
638+
continue
607639
}
608640
// Otherwise if there's a nonce gap evict all later transactions
609641
var (
@@ -621,6 +653,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
621653
txs = txs[:i]
622654

623655
log.Error("Dropping gapped blob transactions", "from", addr, "missing", txs[i-1].nonce+1, "drop", nonces, "ids", ids)
656+
dropGappedMeter.Mark(int64(len(ids)))
657+
624658
for _, id := range ids {
625659
if err := p.store.Delete(id); err != nil {
626660
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -665,6 +699,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
665699
p.index[addr] = txs
666700
}
667701
log.Warn("Dropping overdrafted blob transactions", "from", addr, "balance", balance, "spent", spent, "drop", nonces, "ids", ids)
702+
dropOverdraftedMeter.Mark(int64(len(ids)))
703+
668704
for _, id := range ids {
669705
if err := p.store.Delete(id); err != nil {
670706
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -695,6 +731,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
695731
p.index[addr] = txs
696732

697733
log.Warn("Dropping overcapped blob transactions", "from", addr, "kept", len(txs), "drop", nonces, "ids", ids)
734+
dropOvercappedMeter.Mark(int64(len(ids)))
735+
698736
for _, id := range ids {
699737
if err := p.store.Delete(id); err != nil {
700738
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -952,7 +990,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
952990
return err
953991
}
954992

955-
// Update the indixes and metrics
993+
// Update the indices and metrics
956994
meta := newBlobTxMeta(id, p.store.Size(id), tx)
957995
if _, ok := p.index[addr]; !ok {
958996
if err := p.reserve(addr, true); err != nil {
@@ -1019,6 +1057,8 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
10191057
}
10201058
// Clear out the transactions from the data store
10211059
log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids)
1060+
dropUnderpricedMeter.Mark(int64(len(ids)))
1061+
10221062
for _, id := range ids {
10231063
if err := p.store.Delete(id); err != nil {
10241064
log.Error("Failed to delete dropped transaction", "id", id, "err", err)
@@ -1198,13 +1238,30 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
11981238
// Ensure the transaction is valid from all perspectives
11991239
if err := p.validateTx(tx); err != nil {
12001240
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
1241+
switch {
1242+
case errors.Is(err, txpool.ErrUnderpriced):
1243+
addUnderpricedMeter.Mark(1)
1244+
case errors.Is(err, core.ErrNonceTooLow):
1245+
addStaleMeter.Mark(1)
1246+
case errors.Is(err, core.ErrNonceTooHigh):
1247+
addGappedMeter.Mark(1)
1248+
case errors.Is(err, core.ErrInsufficientFunds):
1249+
addOverdraftedMeter.Mark(1)
1250+
case errors.Is(err, txpool.ErrAccountLimitExceeded):
1251+
addOvercappedMeter.Mark(1)
1252+
case errors.Is(err, txpool.ErrReplaceUnderpriced):
1253+
addNoreplaceMeter.Mark(1)
1254+
default:
1255+
addInvalidMeter.Mark(1)
1256+
}
12011257
return err
12021258
}
12031259
// If the address is not yet known, request exclusivity to track the account
12041260
// only by this subpool until all transactions are evicted
12051261
from, _ := types.Sender(p.signer, tx) // already validated above
12061262
if _, ok := p.index[from]; !ok {
12071263
if err := p.reserve(from, true); err != nil {
1264+
addNonExclusiveMeter.Mark(1)
12081265
return err
12091266
}
12101267
defer func() {
@@ -1244,6 +1301,8 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
12441301
}
12451302
if len(p.index[from]) > offset {
12461303
// Transaction replaces a previously queued one
1304+
dropReplacedMeter.Mark(1)
1305+
12471306
prev := p.index[from][offset]
12481307
if err := p.store.Delete(prev.id); err != nil {
12491308
// Shitty situation, but try to recover gracefully instead of going boom
@@ -1322,6 +1381,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
13221381
}
13231382
p.updateStorageMetrics()
13241383

1384+
addValidMeter.Mark(1)
13251385
return nil
13261386
}
13271387

@@ -1371,7 +1431,9 @@ func (p *BlobPool) drop() {
13711431
}
13721432
}
13731433
// Remove the transaction from the data store
1374-
log.Warn("Evicting overflown blob transaction", "from", from, "evicted", drop.nonce, "id", drop.id)
1434+
log.Debug("Evicting overflown blob transaction", "from", from, "evicted", drop.nonce, "id", drop.id)
1435+
dropOverflownMeter.Mark(1)
1436+
13751437
if err := p.store.Delete(drop.id); err != nil {
13761438
log.Error("Failed to drop evicted transaction", "id", drop.id, "err", err)
13771439
}

core/txpool/blobpool/blobpool_test.go

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,16 @@ func verifyPoolInternals(t *testing.T, pool *BlobPool) {
305305
// - 1. A transaction that cannot be decoded must be dropped
306306
// - 2. A transaction that cannot be recovered (bad signature) must be dropped
307307
// - 3. All transactions after a nonce gap must be dropped
308-
// - 4. All transactions after an underpriced one (including it) must be dropped
308+
// - 4. All transactions after an already included nonce must be dropped
309+
// - 5. All transactions after an underpriced one (including it) must be dropped
310+
// - 6. All transactions after an overdrafting sequence must be dropped
311+
// - 7. All transactions exceeding the per-account limit must be dropped
312+
//
313+
// Furthermore, some strange corner-cases can also occur after a crash, as Billy's
314+
// simplicity also allows it to resurrect past deleted entities:
315+
//
316+
// - 8. Fully duplicate transactions (matching hash) must be dropped
317+
// - 9. Duplicate nonces from the same account must be dropped
309318
func TestOpenDrops(t *testing.T) {
310319
log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
311320

@@ -338,7 +347,7 @@ func TestOpenDrops(t *testing.T) {
338347
badsig, _ := store.Put(blob)
339348

340349
// Insert a sequence of transactions with a nonce gap in between to verify
341-
// that anything gapped will get evicted (case 3)
350+
// that anything gapped will get evicted (case 3).
342351
var (
343352
gapper, _ = crypto.GenerateKey()
344353

@@ -357,7 +366,7 @@ func TestOpenDrops(t *testing.T) {
357366
}
358367
}
359368
// Insert a sequence of transactions with a gapped starting nonce to verify
360-
// that the entire set will get dropped.
369+
// that the entire set will get dropped (case 3).
361370
var (
362371
dangler, _ = crypto.GenerateKey()
363372
dangling = make(map[uint64]struct{})
@@ -370,7 +379,7 @@ func TestOpenDrops(t *testing.T) {
370379
dangling[id] = struct{}{}
371380
}
372381
// Insert a sequence of transactions with already passed nonces to veirfy
373-
// that the entire set will get dropped.
382+
// that the entire set will get dropped (case 4).
374383
var (
375384
filler, _ = crypto.GenerateKey()
376385
filled = make(map[uint64]struct{})
@@ -383,7 +392,7 @@ func TestOpenDrops(t *testing.T) {
383392
filled[id] = struct{}{}
384393
}
385394
// Insert a sequence of transactions with partially passed nonces to veirfy
386-
// that the included part of the set will get dropped
395+
// that the included part of the set will get dropped (case 4).
387396
var (
388397
overlapper, _ = crypto.GenerateKey()
389398
overlapped = make(map[uint64]struct{})
@@ -400,7 +409,7 @@ func TestOpenDrops(t *testing.T) {
400409
}
401410
}
402411
// Insert a sequence of transactions with an underpriced first to verify that
403-
// the entire set will get dropped (case 4).
412+
// the entire set will get dropped (case 5).
404413
var (
405414
underpayer, _ = crypto.GenerateKey()
406415
underpaid = make(map[uint64]struct{})
@@ -419,7 +428,7 @@ func TestOpenDrops(t *testing.T) {
419428
}
420429

421430
// Insert a sequence of transactions with an underpriced in between to verify
422-
// that it and anything newly gapped will get evicted (case 4).
431+
// that it and anything newly gapped will get evicted (case 5).
423432
var (
424433
outpricer, _ = crypto.GenerateKey()
425434
outpriced = make(map[uint64]struct{})
@@ -441,7 +450,7 @@ func TestOpenDrops(t *testing.T) {
441450
}
442451
}
443452
// Insert a sequence of transactions fully overdrafted to verify that the
444-
// entire set will get invalidated.
453+
// entire set will get invalidated (case 6).
445454
var (
446455
exceeder, _ = crypto.GenerateKey()
447456
exceeded = make(map[uint64]struct{})
@@ -459,7 +468,7 @@ func TestOpenDrops(t *testing.T) {
459468
exceeded[id] = struct{}{}
460469
}
461470
// Insert a sequence of transactions partially overdrafted to verify that part
462-
// of the set will get invalidated.
471+
// of the set will get invalidated (case 6).
463472
var (
464473
overdrafter, _ = crypto.GenerateKey()
465474
overdrafted = make(map[uint64]struct{})
@@ -481,7 +490,7 @@ func TestOpenDrops(t *testing.T) {
481490
}
482491
}
483492
// Insert a sequence of transactions overflowing the account cap to verify
484-
// that part of the set will get invalidated.
493+
// that part of the set will get invalidated (case 7).
485494
var (
486495
overcapper, _ = crypto.GenerateKey()
487496
overcapped = make(map[uint64]struct{})
@@ -496,6 +505,42 @@ func TestOpenDrops(t *testing.T) {
496505
overcapped[id] = struct{}{}
497506
}
498507
}
508+
// Insert a batch of duplicated transactions to verify that only one of each
509+
// version will remain (case 8).
510+
var (
511+
duplicater, _ = crypto.GenerateKey()
512+
duplicated = make(map[uint64]struct{})
513+
)
514+
for _, nonce := range []uint64{0, 1, 2} {
515+
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, duplicater))
516+
517+
for i := 0; i < int(nonce)+1; i++ {
518+
id, _ := store.Put(blob)
519+
if i == 0 {
520+
valids[id] = struct{}{}
521+
} else {
522+
duplicated[id] = struct{}{}
523+
}
524+
}
525+
}
526+
// Insert a batch of duplicated nonces to verify that only one of each will
527+
// remain (case 9).
528+
var (
529+
repeater, _ = crypto.GenerateKey()
530+
repeated = make(map[uint64]struct{})
531+
)
532+
for _, nonce := range []uint64{0, 1, 2} {
533+
for i := 0; i < int(nonce)+1; i++ {
534+
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, uint64(i)+1 /* unique hashes */, 1, repeater))
535+
536+
id, _ := store.Put(blob)
537+
if i == 0 {
538+
valids[id] = struct{}{}
539+
} else {
540+
repeated[id] = struct{}{}
541+
}
542+
}
543+
}
499544
store.Close()
500545

501546
// Create a blob pool out of the pre-seeded data
@@ -511,6 +556,8 @@ func TestOpenDrops(t *testing.T) {
511556
statedb.AddBalance(crypto.PubkeyToAddress(exceeder.PublicKey), uint256.NewInt(1000000))
512557
statedb.AddBalance(crypto.PubkeyToAddress(overdrafter.PublicKey), uint256.NewInt(1000000))
513558
statedb.AddBalance(crypto.PubkeyToAddress(overcapper.PublicKey), uint256.NewInt(10000000))
559+
statedb.AddBalance(crypto.PubkeyToAddress(duplicater.PublicKey), uint256.NewInt(1000000))
560+
statedb.AddBalance(crypto.PubkeyToAddress(repeater.PublicKey), uint256.NewInt(1000000))
514561
statedb.Commit(0, true)
515562

516563
chain := &testBlockChain{
@@ -554,6 +601,10 @@ func TestOpenDrops(t *testing.T) {
554601
t.Errorf("partially overdrafted transaction remained in storage: %d", tx.id)
555602
} else if _, ok := overcapped[tx.id]; ok {
556603
t.Errorf("overcapped transaction remained in storage: %d", tx.id)
604+
} else if _, ok := duplicated[tx.id]; ok {
605+
t.Errorf("duplicated transaction remained in storage: %d", tx.id)
606+
} else if _, ok := repeated[tx.id]; ok {
607+
t.Errorf("repeated nonce transaction remained in storage: %d", tx.id)
557608
} else {
558609
alive[tx.id] = struct{}{}
559610
}

core/txpool/blobpool/metrics.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ var (
6565
pooltipGauge = metrics.NewRegisteredGauge("blobpool/pooltip", nil)
6666

6767
// addwait/time, resetwait/time and getwait/time track the rough health of
68-
// the pool and whether or not it's capable of keeping up with the load from
69-
// the network.
68+
// the pool and whether it's capable of keeping up with the load from the
69+
// network.
7070
addwaitHist = metrics.NewRegisteredHistogram("blobpool/addwait", nil, metrics.NewExpDecaySample(1028, 0.015))
7171
addtimeHist = metrics.NewRegisteredHistogram("blobpool/addtime", nil, metrics.NewExpDecaySample(1028, 0.015))
7272
getwaitHist = metrics.NewRegisteredHistogram("blobpool/getwait", nil, metrics.NewExpDecaySample(1028, 0.015))
@@ -75,4 +75,31 @@ var (
7575
pendtimeHist = metrics.NewRegisteredHistogram("blobpool/pendtime", nil, metrics.NewExpDecaySample(1028, 0.015))
7676
resetwaitHist = metrics.NewRegisteredHistogram("blobpool/resetwait", nil, metrics.NewExpDecaySample(1028, 0.015))
7777
resettimeHist = metrics.NewRegisteredHistogram("blobpool/resettime", nil, metrics.NewExpDecaySample(1028, 0.015))
78+
79+
// The below metrics track various cases where transactions are dropped out
80+
// of the pool. Most are exceptional, some are chain progression and some
81+
// threshold cappings.
82+
dropInvalidMeter = metrics.NewRegisteredMeter("blobpool/drop/invalid", nil) // Invalid transaction, consensus change or bugfix, neutral-ish
83+
dropDanglingMeter = metrics.NewRegisteredMeter("blobpool/drop/dangling", nil) // First nonce gapped, bad
84+
dropFilledMeter = metrics.NewRegisteredMeter("blobpool/drop/filled", nil) // State full-overlap, chain progress, ok
85+
dropOverlappedMeter = metrics.NewRegisteredMeter("blobpool/drop/overlapped", nil) // State partial-overlap, chain progress, ok
86+
dropRepeatedMeter = metrics.NewRegisteredMeter("blobpool/drop/repeated", nil) // Repeated nonce, bad
87+
dropGappedMeter = metrics.NewRegisteredMeter("blobpool/drop/gapped", nil) // Non-first nonce gapped, bad
88+
dropOverdraftedMeter = metrics.NewRegisteredMeter("blobpool/drop/overdrafted", nil) // Balance exceeded, bad
89+
dropOvercappedMeter = metrics.NewRegisteredMeter("blobpool/drop/overcapped", nil) // Per-account cap exceeded, bad
90+
dropOverflownMeter = metrics.NewRegisteredMeter("blobpool/drop/overflown", nil) // Global disk cap exceeded, neutral-ish
91+
dropUnderpricedMeter = metrics.NewRegisteredMeter("blobpool/drop/underpriced", nil) // Gas tip changed, neutral
92+
dropReplacedMeter = metrics.NewRegisteredMeter("blobpool/drop/replaced", nil) // Transaction replaced, neutral
93+
94+
// The below metrics track various outcomes of transactions being added to
95+
// the pool.
96+
addInvalidMeter = metrics.NewRegisteredMeter("blobpool/add/invalid", nil) // Invalid transaction, reject, neutral
97+
addUnderpricedMeter = metrics.NewRegisteredMeter("blobpool/add/underpriced", nil) // Gas tip too low, neutral
98+
addStaleMeter = metrics.NewRegisteredMeter("blobpool/add/stale", nil) // Nonce already filled, reject, bad-ish
99+
addGappedMeter = metrics.NewRegisteredMeter("blobpool/add/gapped", nil) // Nonce gapped, reject, bad-ish
100+
addOverdraftedMeter = metrics.NewRegisteredMeter("blobpool/add/overdrafted", nil) // Balance exceeded, reject, neutral
101+
addOvercappedMeter = metrics.NewRegisteredMeter("blobpool/add/overcapped", nil) // Per-account cap exceeded, reject, neutral
102+
addNoreplaceMeter = metrics.NewRegisteredMeter("blobpool/add/noreplace", nil) // Replacement fees or tips too low, neutral
103+
addNonExclusiveMeter = metrics.NewRegisteredMeter("blobpool/add/nonexclusive", nil) // Plain transaction from same account exists, reject, neutral
104+
addValidMeter = metrics.NewRegisteredMeter("blobpool/add/valid", nil) // Valid transaction, add, neutral
78105
)

0 commit comments

Comments
 (0)