Skip to content

Commit c372214

Browse files
committed
eth/catalyst: ensure period zero mode leaves no pending txs in pool
1 parent 142c94d commit c372214

File tree

3 files changed

+159
-52
lines changed

3 files changed

+159
-52
lines changed

eth/catalyst/simulated_beacon.go

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/ethereum/go-ethereum/core/types"
3131
"github.com/ethereum/go-ethereum/crypto/kzg4844"
3232
"github.com/ethereum/go-ethereum/eth"
33+
"github.com/ethereum/go-ethereum/event"
3334
"github.com/ethereum/go-ethereum/log"
3435
"github.com/ethereum/go-ethereum/node"
3536
"github.com/ethereum/go-ethereum/params"
@@ -41,36 +42,47 @@ const devEpochLength = 32
4142
// withdrawalQueue implements a FIFO queue which holds withdrawals that are
4243
// pending inclusion.
4344
type withdrawalQueue struct {
44-
pending chan *types.Withdrawal
45+
pending types.Withdrawals
46+
mu sync.Mutex
47+
feed event.Feed
48+
subs event.SubscriptionScope
4549
}
4650

51+
type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals }
52+
4753
// add queues a withdrawal for future inclusion.
48-
func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error {
49-
select {
50-
case w.pending <- withdrawal:
51-
break
52-
default:
53-
return errors.New("withdrawal queue full")
54-
}
54+
func (w *withdrawalQueue) Add(withdrawal *types.Withdrawal) error {
55+
w.mu.Lock()
56+
defer w.mu.Unlock()
57+
58+
w.pending = append(w.pending, withdrawal)
59+
w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}})
60+
5561
return nil
5662
}
5763

58-
// gatherPending returns a number of queued withdrawals up to a maximum count.
59-
func (w *withdrawalQueue) gatherPending(maxCount int) []*types.Withdrawal {
60-
withdrawals := []*types.Withdrawal{}
61-
for {
62-
select {
63-
case withdrawal := <-w.pending:
64-
withdrawals = append(withdrawals, withdrawal)
65-
if len(withdrawals) == maxCount {
66-
return withdrawals
67-
}
68-
default:
69-
return withdrawals
70-
}
71-
}
64+
// pop dequeues the specified number of withdrawals from the queue.
65+
func (w *withdrawalQueue) Pop(count int) types.Withdrawals {
66+
w.mu.Lock()
67+
defer w.mu.Unlock()
68+
69+
count = min(count, len(w.pending))
70+
popped := w.pending[0:count]
71+
w.pending = w.pending[count:]
72+
73+
return popped
7274
}
7375

76+
// subscribe allows a listener to be updated when new withdrawals are added to
77+
// the queue.
78+
func (w *withdrawalQueue) Subscribe(ch chan<- newWithdrawalsEvent) event.Subscription {
79+
sub := w.feed.Subscribe(ch)
80+
return w.subs.Track(sub)
81+
}
82+
83+
// SimulatedBeacon drives an Ethereum instance as if it were a real beacon
84+
// client. It can run in period mode where it mines a new block every period
85+
// (seconds) or on every transaction via Commit, Fork and AdjustTime.
7486
type SimulatedBeacon struct {
7587
shutdownCh chan struct{}
7688
eth *eth.Ethereum
@@ -86,10 +98,6 @@ type SimulatedBeacon struct {
8698
}
8799

88100
// NewSimulatedBeacon constructs a new simulated beacon chain.
89-
// Period sets the period in which blocks should be produced.
90-
//
91-
// - If period is set to 0, a block is produced on every transaction.
92-
// via Commit, Fork and AdjustTime.
93101
func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, error) {
94102
block := eth.BlockChain().CurrentBlock()
95103
current := engine.ForkchoiceStateV1{
@@ -112,7 +120,6 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err
112120
engineAPI: engineAPI,
113121
lastBlockTime: block.Time,
114122
curForkchoiceState: current,
115-
withdrawals: withdrawalQueue{make(chan *types.Withdrawal, 20)},
116123
}, nil
117124
}
118125

@@ -171,6 +178,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
171178
if fcResponse == engine.STATUS_SYNCING {
172179
return errors.New("chain rewind prevented invocation of payload creation")
173180
}
181+
174182
envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true)
175183
if err != nil {
176184
return err
@@ -223,8 +231,7 @@ func (c *SimulatedBeacon) loop() {
223231
case <-c.shutdownCh:
224232
return
225233
case <-timer.C:
226-
withdrawals := c.withdrawals.gatherPending(10)
227-
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
234+
if err := c.sealBlock(c.withdrawals.Pop(10), uint64(time.Now().Unix())); err != nil {
228235
log.Warn("Error performing sealing work", "err", err)
229236
} else {
230237
timer.Reset(time.Second * time.Duration(c.period))
@@ -260,7 +267,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) {
260267

261268
// Commit seals a block on demand.
262269
func (c *SimulatedBeacon) Commit() common.Hash {
263-
withdrawals := c.withdrawals.gatherPending(10)
270+
withdrawals := c.withdrawals.Pop(10)
264271
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
265272
log.Warn("Error performing sealing work", "err", err)
266273
}
@@ -301,12 +308,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
301308
if parent == nil {
302309
return errors.New("parent not found")
303310
}
304-
withdrawals := c.withdrawals.gatherPending(10)
311+
withdrawals := c.withdrawals.Pop(10)
305312
return c.sealBlock(withdrawals, parent.Time+uint64(adjustment/time.Second))
306313
}
307314

315+
// RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the
316+
// stack.
308317
func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) {
309-
api := &api{sim}
318+
api := &simulatedBeaconAPI{sim: sim, doCommit: make(chan struct{}, 1)}
310319
if sim.period == 0 {
311320
// mine on demand if period is set to 0
312321
go api.loop()

eth/catalyst/simulated_beacon_api.go

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,44 +18,77 @@ package catalyst
1818

1919
import (
2020
"context"
21-
"time"
2221

2322
"github.com/ethereum/go-ethereum/common"
2423
"github.com/ethereum/go-ethereum/core"
2524
"github.com/ethereum/go-ethereum/core/types"
26-
"github.com/ethereum/go-ethereum/log"
2725
)
2826

29-
type api struct {
30-
sim *SimulatedBeacon
27+
// simulatedBeaconAPI provides a RPC API for SimulatedBeacon.
28+
type simulatedBeaconAPI struct {
29+
sim *SimulatedBeacon
30+
doCommit chan struct{}
3131
}
3232

33-
func (a *api) loop() {
33+
// loop is the main loop for the API when it's running in period = 0 mode. It
34+
// ensures that block production is triggered as soon as a new withdrawal or
35+
// transaction is received.
36+
func (a *simulatedBeaconAPI) loop() {
3437
var (
35-
newTxs = make(chan core.NewTxsEvent)
36-
sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
38+
newTxs = make(chan core.NewTxsEvent)
39+
newWxs = make(chan newWithdrawalsEvent)
40+
newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
41+
newWxsSub = a.sim.withdrawals.Subscribe(newWxs)
3742
)
38-
defer sub.Unsubscribe()
43+
defer newTxsSub.Unsubscribe()
44+
defer newWxsSub.Unsubscribe()
45+
46+
go a.worker()
3947

4048
for {
4149
select {
4250
case <-a.sim.shutdownCh:
4351
return
44-
case w := <-a.sim.withdrawals.pending:
45-
withdrawals := append(a.sim.withdrawals.gatherPending(9), w)
46-
if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
47-
log.Warn("Error performing sealing work", "err", err)
48-
}
52+
case <-newWxs:
53+
a.commit()
4954
case <-newTxs:
55+
a.commit()
56+
}
57+
}
58+
}
59+
60+
// commit is a non-blocking method to initate Commit() on the simulator.
61+
func (a *simulatedBeaconAPI) commit() {
62+
select {
63+
case a.doCommit <- struct{}{}:
64+
default:
65+
}
66+
}
67+
68+
// worker runs in the background and signals to the simulator when to commit
69+
// based on messages over doCommit.
70+
func (a *simulatedBeaconAPI) worker() {
71+
for {
72+
select {
73+
case <-a.sim.shutdownCh:
74+
return
75+
case <-a.doCommit:
5076
a.sim.Commit()
77+
a.sim.eth.TxPool().Sync()
78+
executable, _ := a.sim.eth.TxPool().Stats()
79+
if executable != 0 {
80+
a.commit()
81+
}
5182
}
5283
}
5384
}
5485

55-
func (a *api) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
56-
return a.sim.withdrawals.add(withdrawal)
86+
// AddWithdrawal adds a withdrawal to the pending queue.
87+
func (a *simulatedBeaconAPI) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
88+
return a.sim.withdrawals.Add(withdrawal)
5789
}
5890

59-
func (a *api) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
91+
// SetFeeRecipient sets the fee recipient for block building purposes.
92+
func (a *simulatedBeaconAPI) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
6093
a.sim.setFeeRecipient(feeRecipient)
6194
}

eth/catalyst/simulated_beacon_test.go

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
"github.com/ethereum/go-ethereum/params"
3636
)
3737

38-
func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node.Node, *eth.Ethereum, *SimulatedBeacon) {
38+
func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis, period uint64) (*node.Node, *eth.Ethereum, *SimulatedBeacon) {
3939
t.Helper()
4040

4141
n, err := node.New(&node.Config{
@@ -55,7 +55,7 @@ func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node.
5555
t.Fatal("can't create eth service:", err)
5656
}
5757

58-
simBeacon, err := NewSimulatedBeacon(1, ethservice)
58+
simBeacon, err := NewSimulatedBeacon(period, ethservice)
5959
if err != nil {
6060
t.Fatal("can't create simulated beacon:", err)
6161
}
@@ -87,7 +87,7 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
8787
// short period (1 second) for testing purposes
8888
var gasLimit uint64 = 10_000_000
8989
genesis := core.DeveloperGenesisBlock(gasLimit, &testAddr)
90-
node, ethService, mock := startSimulatedBeaconEthService(t, genesis)
90+
node, ethService, mock := startSimulatedBeaconEthService(t, genesis, 1)
9191
_ = mock
9292
defer node.Close()
9393

@@ -98,7 +98,7 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
9898
// generate some withdrawals
9999
for i := 0; i < 20; i++ {
100100
withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)})
101-
if err := mock.withdrawals.add(&withdrawals[i]); err != nil {
101+
if err := mock.withdrawals.Add(&withdrawals[i]); err != nil {
102102
t.Fatal("addWithdrawal failed", err)
103103
}
104104
}
@@ -140,3 +140,68 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
140140
}
141141
}
142142
}
143+
144+
// Tests that zero-period dev mode can handle a lot of simultaneous
145+
// transactions/withdrawals
146+
func TestOnDemandSpam(t *testing.T) {
147+
var (
148+
withdrawals []types.Withdrawal
149+
txs = make(map[common.Hash]*types.Transaction)
150+
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
151+
testAddr = crypto.PubkeyToAddress(testKey.PublicKey)
152+
gasLimit uint64 = 10_000_000
153+
genesis = core.DeveloperGenesisBlock(gasLimit, &testAddr)
154+
node, eth, mock = startSimulatedBeaconEthService(t, genesis, 0)
155+
signer = types.LatestSigner(eth.BlockChain().Config())
156+
chainHeadCh = make(chan core.ChainHeadEvent, 100)
157+
sub = eth.BlockChain().SubscribeChainHeadEvent(chainHeadCh)
158+
)
159+
defer node.Close()
160+
defer sub.Unsubscribe()
161+
162+
// start simulated beacon
163+
api := &simulatedBeaconAPI{sim: mock, doCommit: make(chan struct{}, 1)}
164+
go api.loop()
165+
166+
// generate some withdrawals
167+
for i := 0; i < 20; i++ {
168+
withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)})
169+
if err := mock.withdrawals.Add(&withdrawals[i]); err != nil {
170+
t.Fatal("addWithdrawal failed", err)
171+
}
172+
}
173+
174+
// generate a bunch of transactions
175+
for i := 0; i < 20000; i++ {
176+
tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey)
177+
if err != nil {
178+
t.Fatal("error signing transaction", err)
179+
}
180+
txs[tx.Hash()] = tx
181+
if err := eth.APIBackend.SendTx(context.Background(), tx); err != nil {
182+
t.Fatal("error adding txs to pool", err)
183+
}
184+
}
185+
186+
var (
187+
includedTxs = make(map[common.Hash]struct{})
188+
includedWxs []uint64
189+
)
190+
for {
191+
select {
192+
case evt := <-chainHeadCh:
193+
for _, itx := range evt.Block.Transactions() {
194+
includedTxs[itx.Hash()] = struct{}{}
195+
}
196+
for _, iwx := range evt.Block.Withdrawals() {
197+
includedWxs = append(includedWxs, iwx.Index)
198+
}
199+
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
200+
if len(includedTxs) == len(txs) && len(includedWxs) == len(withdrawals) {
201+
return
202+
}
203+
case <-time.After(10 * time.Second):
204+
t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", len(includedTxs), len(txs), len(includedWxs), len(withdrawals))
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)