Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolAccountPendingLimitFlag,
utils.TxPoolLifetimeFlag,
utils.SyncModeFlag,
utils.ExitWhenSyncedFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolAccountPendingLimitFlag,
utils.TxPoolLifetimeFlag,
},
},
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ var (
Usage: "Maximum number of non-executable transaction slots for all accounts",
Value: ethconfig.Defaults.TxPool.GlobalQueue,
}
TxPoolAccountPendingLimitFlag = cli.Uint64Flag{
Name: "txpool.accountpendinglimit",
Usage: "Maximum number of executable transactions allowed per account",
Value: ethconfig.Defaults.TxPool.AccountPendingLimit,
}
TxPoolLifetimeFlag = cli.DurationFlag{
Name: "txpool.lifetime",
Usage: "Maximum amount of time non-executable transaction are queued",
Expand Down Expand Up @@ -1519,6 +1524,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolGlobalQueueFlag.Name) {
cfg.GlobalQueue = ctx.GlobalUint64(TxPoolGlobalQueueFlag.Name)
}
if ctx.GlobalIsSet(TxPoolAccountPendingLimitFlag.Name) {
cfg.AccountPendingLimit = ctx.GlobalUint64(TxPoolAccountPendingLimitFlag.Name)
}
if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
}
Expand Down
58 changes: 58 additions & 0 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ var (
pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
pendingEvictionMeter = metrics.NewRegisteredMeter("txpool/pending/eviction", nil) // Dropped due to lifetime

// Metrics for the queued pool
queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
Expand Down Expand Up @@ -178,6 +179,8 @@ type TxPoolConfig struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

AccountPendingLimit uint64 // Number of executable transactions allowed per account

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

Expand All @@ -195,6 +198,8 @@ var DefaultTxPoolConfig = TxPoolConfig{
AccountQueue: 64,
GlobalQueue: 1024,

AccountPendingLimit: 1024,

Lifetime: 3 * time.Hour,
}

Expand Down Expand Up @@ -230,6 +235,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue)
conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
}
if conf.AccountPendingLimit < 1 {
log.Warn("Sanitizing invalid txpool account pending limit", "provided", conf.AccountPendingLimit, "updated", DefaultTxPoolConfig.AccountPendingLimit)
conf.AccountPendingLimit = DefaultTxPoolConfig.AccountPendingLimit
}
if conf.Lifetime < 1 {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
conf.Lifetime = DefaultTxPoolConfig.Lifetime
Expand Down Expand Up @@ -422,6 +431,7 @@ func (pool *TxPool) loop() {
// Handle inactive account transaction eviction
case <-evict.C:
pool.mu.Lock()
// Evict queued transactions
for addr := range pool.queue {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
Expand All @@ -437,6 +447,22 @@ func (pool *TxPool) loop() {
queuedEvictionMeter.Mark(int64(len(list)))
}
}
// Evict pending transactions
for addr := range pool.pending {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.pending[addr].Flatten()
for _, tx := range list {
log.Trace("Evicting transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds())
pool.removeTx(tx.Hash(), true)
}
pendingEvictionMeter.Mark(int64(len(list)))
}
}
pool.mu.Unlock()

// Handle local transaction journal rotation
Expand Down Expand Up @@ -957,6 +983,15 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
}
list := pool.pending[addr]

// Account pending list is full
if uint64(list.Len()) >= pool.config.AccountPendingLimit {
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())
pool.priced.Removed(1)
pendingDiscardMeter.Mark(1)
return false
}

inserted, old := list.Add(tx, pool.currentState, pool.config.PriceBump, pool.chainconfig, pool.currentHead)
if !inserted {
// An older transaction was better, discard this
Expand Down Expand Up @@ -1574,6 +1609,29 @@ func (pool *TxPool) executableTxFilter(costLimit *big.Int) func(tx *types.Transa
// pending limit. The algorithm tries to reduce transaction counts by an approximately
// equal number for all for accounts with many pending transactions.
func (pool *TxPool) truncatePending() {
// Truncate pending lists to max length
for addr, list := range pool.pending {
if list.Len() > int(pool.config.AccountPendingLimit) {
caps := list.Cap(int(pool.config.AccountPendingLimit))
for _, tx := range caps {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())

// Update the account nonce to the dropped transaction
// note: this will set pending nonce to the min nonce from the discarded txs
pool.pendingNonces.setIfLower(addr, tx.Nonce())
log.Trace("Removed pending transaction to comply with hard limit", "hash", hash.Hex())
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(caps)))
}
}
}

pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
Expand Down
121 changes: 119 additions & 2 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,8 +1097,14 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
// The whole life time pass after last promotion, kick out stale transactions
time.Sleep(2 * config.Lifetime)
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
if nolocals {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
} else {
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
}
}
if nolocals {
if queued != 0 {
Expand Down Expand Up @@ -1203,6 +1209,117 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
}
}

// Tests that the transaction count belonging to any account cannot go
// above the configured hard threshold.
func TestTransactionPendingPerAccountLimiting(t *testing.T) {
t.Parallel()

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}

config := testTxPoolConfig
config.AccountPendingLimit = 16

pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()

// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
}

// Generate and queue a batch of transactions
nonces := make(map[common.Address]uint64)
txs := types.Transactions{}
for _, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
// Add limit + 1 transactions per account
for j := 0; j < int(config.AccountPendingLimit)+1; j++ {
txs = append(txs, transaction(nonces[addr], 100000, key))
nonces[addr]++
}
}
// Import the batch and verify that limits have been enforced
pool.AddRemotesSync(txs)

// Check that limits are enforced
for _, list := range pool.pending {
pending := list.Len()
if pending > int(config.AccountPendingLimit) {
t.Fatalf("pending transactions for account overflow allowance: %d > %d", pending, config.AccountPendingLimit)
}
}
globalPending, globalQueued := pool.Stats()
if globalPending != int(config.AccountPendingLimit)*5 {
t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5)
}
if globalQueued != 0 {
t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, 0)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

// Save dropped nonce for future use
pendingNonces := make(map[common.Address]uint64)
for addr, nonce := range nonces {
pendingNonces[addr] = nonce - 1
}

// Generate and queue a batch of transactions (with nonce gap)
txs = types.Transactions{}
for _, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
for j := 0; j < int(config.AccountPendingLimit); j++ {
txs = append(txs, transaction(nonces[addr], 100000, key))
nonces[addr]++
}
}
// Import the batch and verify that limits have been enforced
pool.AddRemotesSync(txs)

globalPending, globalQueued = pool.Stats()
if globalPending != int(config.AccountPendingLimit)*5 {
t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5)
}
if globalQueued != int(config.AccountPendingLimit)*5 {
t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, int(config.AccountPendingLimit)*5)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}

// Generate and queue a batch of transactions (fill the nonce gap)
txs = types.Transactions{}
for _, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
txs = append(txs, transaction(pendingNonces[addr], 100000, key))
}
// Import the batch and verify that limits have been enforced
pool.AddRemotesSync(txs)

// Check that limits are enforced
for _, list := range pool.pending {
pending := list.Len()
if pending > int(config.AccountPendingLimit) {
t.Fatalf("pending transactions for account overflow allowance: %d > %d", pending, config.AccountPendingLimit)
}
}
globalPending, globalQueued = pool.Stats()
if globalPending != int(config.AccountPendingLimit)*5 {
t.Fatalf("unexpected global pending transaction count: %d != %d", globalPending, int(config.AccountPendingLimit)*5)
}
if globalQueued != 0 {
t.Fatalf("unexpected global queued transaction count: %d != %d", globalQueued, 0)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
}

// Test the limit on transaction size is enforced correctly.
// This test verifies every transaction having allowed size
// is added to the pool, and longer transactions are rejected.
Expand Down