@@ -20,6 +20,7 @@ import (
2020 "crypto/rand"
2121 "crypto/sha256"
2222 "errors"
23+ "fmt"
2324 "math/big"
2425 "sync"
2526 "time"
@@ -30,6 +31,7 @@ import (
3031 "github.com/ethereum/go-ethereum/core/types"
3132 "github.com/ethereum/go-ethereum/crypto/kzg4844"
3233 "github.com/ethereum/go-ethereum/eth"
34+ "github.com/ethereum/go-ethereum/event"
3335 "github.com/ethereum/go-ethereum/log"
3436 "github.com/ethereum/go-ethereum/node"
3537 "github.com/ethereum/go-ethereum/params"
@@ -41,36 +43,46 @@ const devEpochLength = 32
4143// withdrawalQueue implements a FIFO queue which holds withdrawals that are
4244// pending inclusion.
4345type withdrawalQueue struct {
44- pending chan * types.Withdrawal
46+ pending types.Withdrawals
47+ mu sync.Mutex
48+ feed event.Feed
49+ subs event.SubscriptionScope
4550}
4651
52+ type newWithdrawalsEvent struct { Withdrawals types.Withdrawals }
53+
4754// add queues a withdrawal for future inclusion.
4855func (w * withdrawalQueue ) add (withdrawal * types.Withdrawal ) error {
49- select {
50- case w .pending <- withdrawal :
51- break
52- default :
53- return errors .New ("withdrawal queue full" )
54- }
56+ w .mu .Lock ()
57+ w .pending = append (w .pending , withdrawal )
58+ w .mu .Unlock ()
59+
60+ w .feed .Send (newWithdrawalsEvent {types.Withdrawals {withdrawal }})
5561 return nil
5662}
5763
58- // gatherPending returns a number of queued withdrawals up to a maximum count.
59- func (w * withdrawalQueue ) gatherPending (maxCount int ) []* types.Withdrawal {
60- withdrawals := []* types.Withdrawal {}
61- for {
62- select {
63- case withdrawal := <- w .pending :
64- withdrawals = append (withdrawals , withdrawal )
65- if len (withdrawals ) == maxCount {
66- return withdrawals
67- }
68- default :
69- return withdrawals
70- }
71- }
64+ // pop dequeues the specified number of withdrawals from the queue.
65+ func (w * withdrawalQueue ) pop (count int ) types.Withdrawals {
66+ w .mu .Lock ()
67+ defer w .mu .Unlock ()
68+
69+ count = min (count , len (w .pending ))
70+ popped := w .pending [0 :count ]
71+ w .pending = w .pending [count :]
72+
73+ return popped
74+ }
75+
76+ // subscribe allows a listener to be updated when new withdrawals are added to
77+ // the queue.
78+ func (w * withdrawalQueue ) subscribe (ch chan <- newWithdrawalsEvent ) event.Subscription {
79+ sub := w .feed .Subscribe (ch )
80+ return w .subs .Track (sub )
7281}
7382
83+ // SimulatedBeacon drives an Ethereum instance as if it were a real beacon
84+ // client. It can run in period mode where it mines a new block every period
85+ // (seconds) or on every transaction via Commit, Fork and AdjustTime.
7486type SimulatedBeacon struct {
7587 shutdownCh chan struct {}
7688 eth * eth.Ethereum
@@ -86,10 +98,6 @@ type SimulatedBeacon struct {
8698}
8799
88100// NewSimulatedBeacon constructs a new simulated beacon chain.
89- // Period sets the period in which blocks should be produced.
90- //
91- // - If period is set to 0, a block is produced on every transaction.
92- // via Commit, Fork and AdjustTime.
93101func NewSimulatedBeacon (period uint64 , eth * eth.Ethereum ) (* SimulatedBeacon , error ) {
94102 block := eth .BlockChain ().CurrentBlock ()
95103 current := engine.ForkchoiceStateV1 {
@@ -112,7 +120,6 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err
112120 engineAPI : engineAPI ,
113121 lastBlockTime : block .Time ,
114122 curForkchoiceState : current ,
115- withdrawals : withdrawalQueue {make (chan * types.Withdrawal , 20 )},
116123 }, nil
117124}
118125
@@ -156,6 +163,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
156163 c .setCurrentState (header .Hash (), * finalizedHash )
157164 }
158165
166+ // Because transaction insertion, block insertion, and block production will
167+ // happen without any timing delay between them in simulator mode and the
168+ // transaction pool will be running its internal reset operation on a
169+ // background thread, flaky executions can happen. To avoid the racey
170+ // behavior, the pool will be explicitly blocked on its reset before
171+ // continuing to the block production below.
172+ if err := c .eth .APIBackend .TxPool ().Sync (); err != nil {
173+ return fmt .Errorf ("failed to sync txpool: %w" , err )
174+ }
175+
159176 var random [32 ]byte
160177 rand .Read (random [:])
161178 fcResponse , err := c .engineAPI .forkchoiceUpdated (c .curForkchoiceState , & engine.PayloadAttributes {
@@ -164,13 +181,14 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
164181 Withdrawals : withdrawals ,
165182 Random : random ,
166183 BeaconRoot : & common.Hash {},
167- }, engine .PayloadV3 , true )
184+ }, engine .PayloadV3 )
168185 if err != nil {
169186 return err
170187 }
171188 if fcResponse == engine .STATUS_SYNCING {
172189 return errors .New ("chain rewind prevented invocation of payload creation" )
173190 }
191+
174192 envelope , err := c .engineAPI .getPayload (* fcResponse .PayloadID , true )
175193 if err != nil {
176194 return err
@@ -223,8 +241,7 @@ func (c *SimulatedBeacon) loop() {
223241 case <- c .shutdownCh :
224242 return
225243 case <- timer .C :
226- withdrawals := c .withdrawals .gatherPending (10 )
227- if err := c .sealBlock (withdrawals , uint64 (time .Now ().Unix ())); err != nil {
244+ if err := c .sealBlock (c .withdrawals .pop (10 ), uint64 (time .Now ().Unix ())); err != nil {
228245 log .Warn ("Error performing sealing work" , "err" , err )
229246 } else {
230247 timer .Reset (time .Second * time .Duration (c .period ))
@@ -260,7 +277,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) {
260277
261278// Commit seals a block on demand.
262279func (c * SimulatedBeacon ) Commit () common.Hash {
263- withdrawals := c .withdrawals .gatherPending (10 )
280+ withdrawals := c .withdrawals .pop (10 )
264281 if err := c .sealBlock (withdrawals , uint64 (time .Now ().Unix ())); err != nil {
265282 log .Warn ("Error performing sealing work" , "err" , err )
266283 }
@@ -301,16 +318,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
301318 if parent == nil {
302319 return errors .New ("parent not found" )
303320 }
304- withdrawals := c .withdrawals .gatherPending (10 )
321+ withdrawals := c .withdrawals .pop (10 )
305322 return c .sealBlock (withdrawals , parent .Time + uint64 (adjustment / time .Second ))
306323}
307324
325+ // RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the
326+ // stack.
308327func RegisterSimulatedBeaconAPIs (stack * node.Node , sim * SimulatedBeacon ) {
309- api := & api {sim }
310- if sim .period == 0 {
311- // mine on demand if period is set to 0
312- go api .loop ()
313- }
328+ api := newSimulatedBeaconAPI (sim )
314329 stack .RegisterAPIs ([]rpc.API {
315330 {
316331 Namespace : "dev" ,
0 commit comments