Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewSimulatedBackendChain(database ethdb.Database, blockchain *core.BlockCha
filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

header := backend.blockchain.CurrentBlock()
block := backend.blockchain.GetBlock(header.Hash(), header.Number.Uint64())

Expand Down
194 changes: 188 additions & 6 deletions builder/beacon_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package builder

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -10,11 +11,23 @@ import (
"sync"
"time"

"github.com/attestantio/go-eth2-client/spec/capella"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/r3labs/sse"
"golang.org/x/exp/slices"
)

type IBeaconClient interface {
isValidator(pubkey PubkeyHex) bool
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes)
Start() error
Stop()
}

type testBeaconClient struct {
validator *ValidatorPrivateData
slot uint64
Expand All @@ -30,8 +43,110 @@ func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool {
func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil
}
func (b *testBeaconClient) Start() error {
return nil

func (b *testBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
}

func (b *testBeaconClient) Start() error { return nil }

type NilBeaconClient struct{}

func (b *NilBeaconClient) isValidator(pubkey PubkeyHex) bool {
return false
}

func (b *NilBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
return PubkeyHex(""), nil
}

func (b *NilBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
}

func (b *NilBeaconClient) Start() error { return nil }

func (b *NilBeaconClient) Stop() {}

type MultiBeaconClient struct {
clients []*BeaconClient
closeCh chan struct{}
}

func NewMultiBeaconClient(endpoints []string, slotsInEpoch uint64, secondsInSlot uint64) *MultiBeaconClient {
clients := []*BeaconClient{}
for _, endpoint := range endpoints {
client := NewBeaconClient(endpoint, slotsInEpoch, secondsInSlot)
clients = append(clients, client)
}

return &MultiBeaconClient{
clients: clients,
closeCh: make(chan struct{}),
}
}

func (m *MultiBeaconClient) isValidator(pubkey PubkeyHex) bool {
for _, c := range m.clients {
// Pick the first one, always true
return c.isValidator(pubkey)
}

return false
}

func (m *MultiBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
var allErrs error
for _, c := range m.clients {
pk, err := c.getProposerForNextSlot(requestedSlot)
if err != nil {
allErrs = errors.Join(allErrs, err)
continue
}

return pk, nil
}
return PubkeyHex(""), allErrs
}

func payloadAttributesMatch(l types.BuilderPayloadAttributes, r types.BuilderPayloadAttributes) bool {
if l.Timestamp != r.Timestamp ||
l.Random != r.Random ||
l.SuggestedFeeRecipient != r.SuggestedFeeRecipient ||
l.Slot != r.Slot ||
l.HeadHash != r.HeadHash ||
l.GasLimit != r.GasLimit {
return false
}

if !slices.Equal(l.Withdrawals, r.Withdrawals) {
return false
}

return true
}

func (m *MultiBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
for _, c := range m.clients {
go c.SubscribeToPayloadAttributesEvents(payloadAttrC)
}
}

func (m *MultiBeaconClient) Start() error {
var allErrs error
for _, c := range m.clients {
err := c.Start()
if err != nil {
allErrs = errors.Join(allErrs, err)
}
}
return allErrs
}

func (m *MultiBeaconClient) Stop() {
for _, c := range m.clients {
c.Stop()
}

close(m.closeCh)
}

type BeaconClient struct {
Expand All @@ -42,21 +157,24 @@ type BeaconClient struct {
mu sync.Mutex
slotProposerMap map[uint64]PubkeyHex

closeCh chan struct{}
ctx context.Context
cancelFn context.CancelFunc
}

func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient {
ctx, cancelFn := context.WithCancel(context.Background())
return &BeaconClient{
endpoint: endpoint,
slotsInEpoch: slotsInEpoch,
secondsInSlot: secondsInSlot,
slotProposerMap: make(map[uint64]PubkeyHex),
closeCh: make(chan struct{}),
ctx: ctx,
cancelFn: cancelFn,
}
}

func (b *BeaconClient) Stop() {
close(b.closeCh)
b.cancelFn()
}

func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool {
Expand Down Expand Up @@ -109,7 +227,7 @@ func (b *BeaconClient) UpdateValidatorMapForever() {
defer timer.Stop()
for true {
select {
case <-b.closeCh:
case <-b.ctx.Done():
return
case <-timer.C:
}
Expand Down Expand Up @@ -154,6 +272,70 @@ func (b *BeaconClient) UpdateValidatorMapForever() {
}
}

// PayloadAttributesEvent represents the data of a payload_attributes event
// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}}
type PayloadAttributesEvent struct {
Version string `json:"version"`
Data PayloadAttributesEventData `json:"data"`
}

type PayloadAttributesEventData struct {
ProposalSlot uint64 `json:"proposal_slot,string"`
ParentBlockHash common.Hash `json:"parent_block_hash"`
PayloadAttributes PayloadAttributes `json:"payload_attributes"`
}

type PayloadAttributes struct {
Timestamp uint64 `json:"timestamp,string"`
PrevRandao common.Hash `json:"prev_randao"`
SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"`
Withdrawals []*capella.Withdrawal `json:"withdrawals"`
}

// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals
func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
payloadAttributesResp := new(PayloadAttributesEvent)

eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint)
log.Info("subscribing to payload_attributes events")

for {
client := sse.NewClient(eventsURL)
err := client.SubscribeRawWithContext(b.ctx, func(msg *sse.Event) {
err := json.Unmarshal(msg.Data, payloadAttributesResp)
if err != nil {
log.Error("could not unmarshal payload_attributes event", "err", err)
} else {
// convert capella.Withdrawal to types.Withdrawal
var withdrawals []*types.Withdrawal
for _, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals {
withdrawals = append(withdrawals, &types.Withdrawal{
Index: uint64(w.Index),
Validator: uint64(w.ValidatorIndex),
Address: common.Address(w.Address),
Amount: uint64(w.Amount),
})
}

data := types.BuilderPayloadAttributes{
Slot: payloadAttributesResp.Data.ProposalSlot,
HeadHash: payloadAttributesResp.Data.ParentBlockHash,
Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp),
Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao,
SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient,
Withdrawals: withdrawals,
}
payloadAttrC <- data
}
})
if err != nil {
log.Error("failed to subscribe to payload_attributes events", "err", err)
time.Sleep(1 * time.Second)
}
log.Warn("beaconclient SubscribeRaw ended, reconnecting")
}
}

func fetchCurrentSlot(endpoint string) (uint64, error) {
headerRes := &struct {
Data []struct {
Expand Down
60 changes: 43 additions & 17 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ type ValidatorData struct {
GasLimit uint64
}

type IBeaconClient interface {
isValidator(pubkey PubkeyHex) bool
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
Start() error
Stop()
}

type IRelay interface {
SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, vd ValidatorData) error
SubmitBlockCapella(msg *capellaapi.SubmitBlockRequest, vd ValidatorData) error
Expand All @@ -64,6 +57,7 @@ type Builder struct {
eth IEthereumService
dryRun bool
validator *blockvalidation.BlockValidationAPI
beaconClient IBeaconClient
builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
builderSigningDomain boostTypes.Domain
Expand All @@ -75,9 +69,11 @@ type Builder struct {
slotAttrs []types.BuilderPayloadAttributes
slotCtx context.Context
slotCtxCancel context.CancelFunc

stop chan struct{}
}

func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService, dryRun bool, validator *blockvalidation.BlockValidationAPI) *Builder {
func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService, dryRun bool, validator *blockvalidation.BlockValidationAPI, beaconClient IBeaconClient) *Builder {
pkBytes := bls.PublicKeyFromSecretKey(sk).Compress()
pk := boostTypes.PublicKey{}
pk.FromSlice(pkBytes)
Expand All @@ -89,6 +85,7 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
eth: eth,
dryRun: dryRun,
validator: validator,
beaconClient: beaconClient,
builderSecretKey: sk,
builderPublicKey: pk,
builderSigningDomain: builderSigningDomain,
Expand All @@ -97,14 +94,42 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
slot: 0,
slotCtx: slotCtx,
slotCtxCancel: slotCtxCancel,

stop: make(chan struct{}, 1),
}
}

func (b *Builder) Start() error {
// Start regular payload attributes updates
go func() {
c := make(chan types.BuilderPayloadAttributes)
go b.beaconClient.SubscribeToPayloadAttributesEvents(c)

currentSlot := uint64(0)

for {
select {
case <-b.stop:
return
case payloadAttributes := <-c:
// Right now we are building only on a single head. This might change in the future!
if payloadAttributes.Slot < currentSlot {
continue
} else if payloadAttributes.Slot == currentSlot {
b.OnPayloadAttribute(&payloadAttributes)
} else if payloadAttributes.Slot > currentSlot {
currentSlot = payloadAttributes.Slot
b.OnPayloadAttribute(&payloadAttributes)
}
}
}
}()

return b.relay.Start()
}

func (b *Builder) Stop() error {
close(b.stop)
return nil
}

Expand Down Expand Up @@ -277,18 +302,19 @@ func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) erro
b.slotMu.Lock()
defer b.slotMu.Unlock()

if b.slot != attrs.Slot {
if b.slotCtxCancel != nil {
b.slotCtxCancel()
}
// Forcibly cancel previous building job, build on top of reorgable blocks as this is the behaviour relays expect.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain why "buidling on multiple tips" is no longer necessary?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, it just doesn't work with how relays are set up. This will come back shortly after Capella.

// This will change in the future

slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
b.slot = attrs.Slot
b.slotAttrs = nil
b.slotCtx = slotCtx
b.slotCtxCancel = slotCtxCancel
if b.slotCtxCancel != nil {
b.slotCtxCancel()
}

slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
b.slot = attrs.Slot
b.slotAttrs = nil
b.slotCtx = slotCtx
b.slotCtxCancel = slotCtxCancel

for _, currentAttrs := range b.slotAttrs {
if attrs.Equal(&currentAttrs) {
log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash)
Expand Down
2 changes: 1 addition & 1 deletion builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestOnPayloadAttributes(t *testing.T) {

testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock, testBlockValue: big.NewInt(10)}

builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testRelay, bDomain, testEthService, false, nil)
builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testRelay, bDomain, testEthService, false, nil, &testBeacon)
builder.Start()
defer builder.Stop()

Expand Down
Loading