Skip to content

Commit 083822b

Browse files
MariusVanDerWijdenrjl493456442
authored andcommitted
eth/catalyst, miner: build the execution payload async (ethereum#24866)
* eth/catalyst: build the execution payload async * miner: added comment, added test case * eth/catalyst: miner: move async block production to miner * eth/catalyst, miner: support generate seal block async * miner: rework GetSealingBlockAsync to use a passed channel * miner: apply rjl's diff * eth/catalyst: nitpicks Co-authored-by: Gary Rong <[email protected]>
1 parent 61aa943 commit 083822b

File tree

6 files changed

+186
-53
lines changed

6 files changed

+186
-53
lines changed

eth/catalyst/api.go

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -204,18 +204,19 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
204204
// sealed by the beacon client. The payload will be requested later, and we
205205
// might replace it arbitrarily many times in between.
206206
if payloadAttributes != nil {
207-
log.Info("Creating new payload for sealing")
208-
start := time.Now()
209-
210-
data, err := api.assembleBlock(update.HeadBlockHash, payloadAttributes)
207+
// Create an empty block first which can be used as a fallback
208+
empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true)
209+
if err != nil {
210+
return valid(nil), err
211+
}
212+
// Send a request to generate a full block in the background.
213+
// The result can be obtained via the returned channel.
214+
resCh, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false)
211215
if err != nil {
212-
log.Error("Failed to create sealing payload", "err", err)
213-
return valid(nil), err // valid setHead, invalid payload
216+
return valid(nil), err
214217
}
215218
id := computePayloadId(update.HeadBlockHash, payloadAttributes)
216-
api.localBlocks.put(id, data)
217-
218-
log.Info("Created payload for sealing", "id", id, "elapsed", time.Since(start))
219+
api.localBlocks.put(id, &payload{empty: empty, result: resCh})
219220
return valid(&id), nil
220221
}
221222
return valid(nil), nil
@@ -351,14 +352,3 @@ func (api *ConsensusAPI) invalid(err error) beacon.PayloadStatusV1 {
351352
errorMsg := err.Error()
352353
return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: &currentHash, ValidationError: &errorMsg}
353354
}
354-
355-
// assembleBlock creates a new block and returns the "execution
356-
// data" required for beacon clients to process the new block.
357-
func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
358-
log.Info("Producing block", "parentHash", parentHash)
359-
block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random)
360-
if err != nil {
361-
return nil, err
362-
}
363-
return beacon.BlockToExecutableData(block), nil
364-
}

eth/catalyst/api_test.go

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func TestEth2AssembleBlock(t *testing.T) {
9393
blockParams := beacon.PayloadAttributesV1{
9494
Timestamp: blocks[9].Time() + 5,
9595
}
96-
execData, err := api.assembleBlock(blocks[9].Hash(), &blockParams)
96+
execData, err := assembleBlock(api, blocks[9].Hash(), &blockParams)
9797
if err != nil {
9898
t.Fatalf("error producing block, err=%v", err)
9999
}
@@ -114,7 +114,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
114114
blockParams := beacon.PayloadAttributesV1{
115115
Timestamp: blocks[8].Time() + 5,
116116
}
117-
execData, err := api.assembleBlock(blocks[8].Hash(), &blockParams)
117+
execData, err := assembleBlock(api, blocks[8].Hash(), &blockParams)
118118
if err != nil {
119119
t.Fatalf("error producing block, err=%v", err)
120120
}
@@ -273,7 +273,7 @@ func TestEth2NewBlock(t *testing.T) {
273273
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
274274
ethservice.TxPool().AddLocal(tx)
275275

276-
execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
276+
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
277277
Timestamp: parent.Time() + 5,
278278
})
279279
if err != nil {
@@ -313,7 +313,7 @@ func TestEth2NewBlock(t *testing.T) {
313313
)
314314
parent = preMergeBlocks[len(preMergeBlocks)-1]
315315
for i := 0; i < 10; i++ {
316-
execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
316+
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
317317
Timestamp: parent.Time() + 6,
318318
})
319319
if err != nil {
@@ -530,3 +530,77 @@ func TestExchangeTransitionConfig(t *testing.T) {
530530
t.Fatalf("expected no error on valid config, got %v", err)
531531
}
532532
}
533+
534+
func TestEmptyBlocks(t *testing.T) {
535+
genesis, preMergeBlocks := generatePreMergeChain(10)
536+
n, ethservice := startEthService(t, genesis, preMergeBlocks)
537+
ethservice.Merger().ReachTTD()
538+
defer n.Close()
539+
var (
540+
api = NewConsensusAPI(ethservice)
541+
parent = ethservice.BlockChain().CurrentBlock()
542+
// This EVM code generates a log when the contract is created.
543+
logCode = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
544+
)
545+
for i := 0; i < 10; i++ {
546+
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
547+
nonce := statedb.GetNonce(testAddr)
548+
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
549+
ethservice.TxPool().AddLocal(tx)
550+
551+
params := beacon.PayloadAttributesV1{
552+
Timestamp: parent.Time() + 1,
553+
Random: crypto.Keccak256Hash([]byte{byte(i)}),
554+
SuggestedFeeRecipient: parent.Coinbase(),
555+
}
556+
557+
fcState := beacon.ForkchoiceStateV1{
558+
HeadBlockHash: parent.Hash(),
559+
SafeBlockHash: common.Hash{},
560+
FinalizedBlockHash: common.Hash{},
561+
}
562+
resp, err := api.ForkchoiceUpdatedV1(fcState, &params)
563+
if err != nil {
564+
t.Fatalf("error preparing payload, err=%v", err)
565+
}
566+
if resp.PayloadStatus.Status != beacon.VALID {
567+
t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status)
568+
}
569+
payload, err := api.GetPayloadV1(*resp.PayloadID)
570+
if err != nil {
571+
t.Fatalf("can't get payload: %v", err)
572+
}
573+
// TODO(493456442, marius) this test can be flaky since we rely on a 100ms
574+
// allowance for block generation internally.
575+
if len(payload.Transactions) == 0 {
576+
t.Fatalf("payload should not be empty")
577+
}
578+
execResp, err := api.NewPayloadV1(*payload)
579+
if err != nil {
580+
t.Fatalf("can't execute payload: %v", err)
581+
}
582+
if execResp.Status != beacon.VALID {
583+
t.Fatalf("invalid status: %v", execResp.Status)
584+
}
585+
fcState = beacon.ForkchoiceStateV1{
586+
HeadBlockHash: payload.BlockHash,
587+
SafeBlockHash: payload.ParentHash,
588+
FinalizedBlockHash: payload.ParentHash,
589+
}
590+
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
591+
t.Fatalf("Failed to insert block: %v", err)
592+
}
593+
if ethservice.BlockChain().CurrentBlock().NumberU64() != payload.Number {
594+
t.Fatalf("Chain head should be updated")
595+
}
596+
parent = ethservice.BlockChain().CurrentBlock()
597+
}
598+
}
599+
600+
func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
601+
block, err := api.eth.Miner().GetSealingBlockSync(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random, false)
602+
if err != nil {
603+
return nil, err
604+
}
605+
return beacon.BlockToExecutableData(block), nil
606+
}

eth/catalyst/queue.go

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package catalyst
1818

1919
import (
2020
"sync"
21+
"time"
2122

2223
"github.com/ethereum/go-ethereum/common"
2324
"github.com/ethereum/go-ethereum/core/beacon"
@@ -34,11 +35,52 @@ const maxTrackedPayloads = 10
3435
// latest one; but have a slight wiggle room for non-ideal conditions.
3536
const maxTrackedHeaders = 10
3637

38+
// payload wraps the miner's block production channel, allowing the mined block
39+
// to be retrieved later upon the GetPayload engine API call.
40+
type payload struct {
41+
lock sync.Mutex
42+
done bool
43+
empty *types.Block
44+
block *types.Block
45+
result chan *types.Block
46+
}
47+
48+
// resolve extracts the generated full block from the given channel if possible
49+
// or fallback to empty block as an alternative.
50+
func (req *payload) resolve() *beacon.ExecutableDataV1 {
51+
// this function can be called concurrently, prevent any
52+
// concurrency issue in the first place.
53+
req.lock.Lock()
54+
defer req.lock.Unlock()
55+
56+
// Try to resolve the full block first if it's not obtained
57+
// yet. The returned block can be nil if the generation fails.
58+
59+
if !req.done {
60+
timeout := time.NewTimer(500 * time.Millisecond)
61+
defer timeout.Stop()
62+
63+
select {
64+
case req.block = <-req.result:
65+
req.done = true
66+
case <-timeout.C:
67+
// TODO(rjl49345642, Marius), should we keep this
68+
// 100ms timeout allowance? Why not just use the
69+
// default and then fallback to empty directly?
70+
}
71+
}
72+
73+
if req.block != nil {
74+
return beacon.BlockToExecutableData(req.block)
75+
}
76+
return beacon.BlockToExecutableData(req.empty)
77+
}
78+
3779
// payloadQueueItem represents an id->payload tuple to store until it's retrieved
3880
// or evicted.
3981
type payloadQueueItem struct {
40-
id beacon.PayloadID
41-
payload *beacon.ExecutableDataV1
82+
id beacon.PayloadID
83+
data *payload
4284
}
4385

4486
// payloadQueue tracks the latest handful of constructed payloads to be retrieved
@@ -57,14 +99,14 @@ func newPayloadQueue() *payloadQueue {
5799
}
58100

59101
// put inserts a new payload into the queue at the given id.
60-
func (q *payloadQueue) put(id beacon.PayloadID, data *beacon.ExecutableDataV1) {
102+
func (q *payloadQueue) put(id beacon.PayloadID, data *payload) {
61103
q.lock.Lock()
62104
defer q.lock.Unlock()
63105

64106
copy(q.payloads[1:], q.payloads)
65107
q.payloads[0] = &payloadQueueItem{
66-
id: id,
67-
payload: data,
108+
id: id,
109+
data: data,
68110
}
69111
}
70112

@@ -78,7 +120,7 @@ func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 {
78120
return nil // no more items
79121
}
80122
if item.id == id {
81-
return item.payload
123+
return item.data.resolve()
82124
}
83125
}
84126
return nil

miner/miner.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,32 @@ func (miner *Miner) DisablePreseal() {
235235
miner.worker.disablePreseal()
236236
}
237237

238-
// GetSealingBlock retrieves a sealing block based on the given parameters.
239-
// The returned block is not sealed but all other fields should be filled.
240-
func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
241-
return miner.worker.getSealingBlock(parent, timestamp, coinbase, random)
242-
}
243-
244238
// SubscribePendingLogs starts delivering logs from pending transactions
245239
// to the given channel.
246240
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
247241
return miner.worker.pendingLogsFeed.Subscribe(ch)
248242
}
243+
244+
// GetSealingBlockAsync requests to generate a sealing block according to the
245+
// given parameters. Regardless of whether the generation is successful or not,
246+
// there is always a result that will be returned through the result channel.
247+
// The difference is that if the execution fails, the returned result is nil
248+
// and the concrete error is dropped silently.
249+
func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) {
250+
resCh, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
251+
if err != nil {
252+
return nil, err
253+
}
254+
return resCh, nil
255+
}
256+
257+
// GetSealingBlockSync creates a sealing block according to the given parameters.
258+
// If the generation is failed or the underlying work is already closed, an error
259+
// will be returned.
260+
func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) {
261+
resCh, errCh, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
262+
if err != nil {
263+
return nil, err
264+
}
265+
return <-resCh, <-errCh
266+
}

miner/worker.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,8 @@ type newWorkReq struct {
178178
// getWorkReq represents a request for getting a new sealing work with provided parameters.
179179
type getWorkReq struct {
180180
params *generateParams
181-
err error
182-
result chan *types.Block
181+
result chan *types.Block // non-blocking channel
182+
err chan error
183183
}
184184

185185
// intervalAdjust represents a resubmitting interval adjustment.
@@ -605,12 +605,12 @@ func (w *worker) mainLoop() {
605605
case req := <-w.getWorkCh:
606606
block, err := w.generateWork(req.params)
607607
if err != nil {
608-
req.err = err
608+
req.err <- err
609609
req.result <- nil
610610
} else {
611+
req.err <- nil
611612
req.result <- block
612613
}
613-
614614
case ev := <-w.chainSideCh:
615615
// Short circuit for duplicate side blocks
616616
if _, exist := w.localUncles[ev.Block.Hash()]; exist {
@@ -1288,6 +1288,7 @@ type generateParams struct {
12881288
random common.Hash // The randomness generated by beacon chain, empty before the merge
12891289
noUncle bool // Flag whether the uncle block inclusion is allowed
12901290
noExtra bool // Flag whether the extra field assignment is allowed
1291+
noTxs bool // Flag whether an empty block without any transaction is expected
12911292
}
12921293

12931294
// prepareWork constructs the sealing task according to the given parameters,
@@ -1485,8 +1486,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
14851486
}
14861487
defer work.discard()
14871488

1488-
w.fillTransactions(nil, work)
1489-
1489+
if !params.noTxs {
1490+
w.fillTransactions(nil, work)
1491+
}
14901492
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
14911493
}
14921494

@@ -1647,7 +1649,6 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
16471649
work.discard()
16481650
return
16491651
}
1650-
16511652
w.commit(work.copy(), w.fullTaskHook, true, start)
16521653

16531654
// Swap out the old work with the new one, terminating any leftover
@@ -1792,7 +1793,13 @@ func (w *worker) commitEx(env *environment, interval func(), update bool, start
17921793
}
17931794

17941795
// getSealingBlock generates the sealing block based on the given parameters.
1795-
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
1796+
// The generation result will be passed back via the given channel no matter
1797+
// the generation itself succeeds or not.
1798+
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) {
1799+
var (
1800+
resCh = make(chan *types.Block, 1)
1801+
errCh = make(chan error, 1)
1802+
)
17961803
req := &getWorkReq{
17971804
params: &generateParams{
17981805
timestamp: timestamp,
@@ -1802,18 +1809,16 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase
18021809
random: random,
18031810
noUncle: true,
18041811
noExtra: true,
1812+
noTxs: noTxs,
18051813
},
1806-
result: make(chan *types.Block, 1),
1814+
result: resCh,
1815+
err: errCh,
18071816
}
18081817
select {
18091818
case w.getWorkCh <- req:
1810-
block := <-req.result
1811-
if block == nil {
1812-
return nil, req.err
1813-
}
1814-
return block, nil
1819+
return resCh, errCh, nil
18151820
case <-w.exitCh:
1816-
return nil, errors.New("miner closed")
1821+
return nil, nil, errors.New("miner closed")
18171822
}
18181823
}
18191824

0 commit comments

Comments
 (0)