Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ var (
queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
queuedEvictionMeter = metrics.NewRegisteredMeter("txpool/queued/eviction", nil) // Dropped due to lifetime

// General tx metrics
knownTxMeter = metrics.NewRegisteredMeter("txpool/known", nil)
Expand Down Expand Up @@ -231,11 +232,12 @@ type TxPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
queuedTs map[common.Hash]time.Time // Timestamp for when queued transactions were added
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price

chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
Expand Down Expand Up @@ -266,6 +268,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
queuedTs: make(map[common.Hash]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
Expand Down Expand Up @@ -363,7 +366,10 @@ func (pool *TxPool) loop() {
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
for _, tx := range pool.queue[addr].Flatten() {
pool.removeTx(tx.Hash(), true)
if time.Since(pool.queuedTs[tx.Hash()]) > pool.config.Lifetime {
queuedEvictionMeter.Mark(1)
pool.removeTx(tx.Hash(), true)
}
}
}
}
Expand Down Expand Up @@ -616,6 +622,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
pool.all.Add(tx)
pool.priced.Put(tx)
pool.journalTx(from, tx)
pool.beats[from] = time.Now()
pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
return old != nil, nil
Expand Down Expand Up @@ -658,16 +665,20 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
old_hash := old.Hash()
pool.all.Remove(old_hash)
pool.priced.Removed(1)
delete(pool.queuedTs, old_hash)
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
pool.queuedTs[hash] = time.Now()
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
pool.queuedTs[hash] = time.Now()
}
return old != nil, nil
}
Expand Down Expand Up @@ -700,15 +711,14 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// An older transaction was better, discard this
pool.all.Remove(hash)
pool.priced.Removed(1)

delete(pool.queuedTs, hash)
pendingDiscardMeter.Mark(1)
return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)

pendingReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the pending counter
Expand All @@ -721,6 +731,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
delete(pool.queuedTs, hash)
pool.pendingNonces.set(addr, tx.Nonce()+1)

return true
Expand Down Expand Up @@ -895,7 +906,6 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// If no more pending transactions are left, remove the list
if pending.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
// Postpone any invalidated transactions
for _, tx := range invalids {
Expand All @@ -913,6 +923,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
if removed, _ := future.Remove(tx); removed {
// Reduce the queued counter
queuedGauge.Dec(1)
delete(pool.queuedTs, hash)
}
if future.Empty() {
delete(pool.queue, addr)
Expand Down Expand Up @@ -1191,13 +1202,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
delete(pool.queuedTs, hash)
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
delete(pool.queuedTs, hash)
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
Expand All @@ -1220,6 +1233,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
delete(pool.queuedTs, hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
Expand Down Expand Up @@ -1414,7 +1428,7 @@ func (pool *TxPool) demoteUnexecutables() {
}
pendingGauge.Dec(int64(len(gapped)))
}
// Delete the entire queue entry if it became empty.
// Delete the entire pending entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
Expand Down
133 changes: 132 additions & 1 deletion core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func validateTxPoolInternals(pool *TxPool) error {
if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued)
}
if queued != len(pool.queuedTs) {
return fmt.Errorf("total queued transaction count %d != %d queuedTs length", queued, len(pool.queuedTs))
}

// Ensure the next nonce to assign is the correct one
for addr, txs := range pool.pending {
// Find the last transaction
Expand Down Expand Up @@ -868,7 +872,7 @@ func TestTransactionQueueTimeLimitingNoLocals(t *testing.T) {
func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
// Reduce the eviction interval to a testable amount
defer func(old time.Duration) { evictionInterval = old }(evictionInterval)
evictionInterval = time.Second
evictionInterval = time.Millisecond * 100

// Create the pool to test the non-expiration enforcement
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
Expand Down Expand Up @@ -905,6 +909,22 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

// Allow the eviction interval to run
time.Sleep(2 * evictionInterval)

// Transactions should not be evicted from the queue yet since lifetime duration has not passed
pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

// Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains
time.Sleep(2 * config.Lifetime)

Expand All @@ -924,6 +944,117 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

// remove current transactions and increase nonce to prepare for a reset and cleanup
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2)
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)

<-pool.requestReset(nil, nil)

// make sure queue, pending are cleared
pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
if err := pool.AddLocal(pricedTransaction(4, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}

// wait a short amount of time to add an additional future queued item to test proper eviction when
// pending is removed
time.Sleep(2 * evictionInterval)
if err := pool.addRemoteSync(pricedTransaction(5, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}

// Make sure future queue and pending have transactions
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
if queued != 3 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

// Trigger a reset to make sure queued items are not evicted
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 3)
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 3)
<-pool.requestReset(nil, nil)

// Wait for eviction to run
time.Sleep(evictionInterval * 2)

// a pool reset, empty pending list, or demotion of pending transactions should maintain
// queued transactions for non locals and locals alike if the lifetime duration has not passed yet
pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queued != 3 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

// Wait for the lifetime to run for all transactions except the one that was added later
time.Sleep(evictionInterval * 7)
pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if nolocals {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
} else {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
}

if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

// lifetime should pass for the final transaction
time.Sleep(evictionInterval * 2)

pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if nolocals {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
} else {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
}

// Tests that even if the transaction count belonging to a single account goes
Expand Down