Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 156 additions & 68 deletions builder/beacon_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package builder

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/r3labs/sse"
"golang.org/x/exp/slices"
)

type IBeaconClient interface {
Expand Down Expand Up @@ -64,6 +66,89 @@ func (b *NilBeaconClient) Start() error { return nil }

func (b *NilBeaconClient) Stop() {}

type MultiBeaconClient struct {
clients []*BeaconClient
closeCh chan struct{}
}

func NewMultiBeaconClient(endpoints []string, slotsInEpoch uint64, secondsInSlot uint64) *MultiBeaconClient {
clients := []*BeaconClient{}
for _, endpoint := range endpoints {
client := NewBeaconClient(endpoint, slotsInEpoch, secondsInSlot)
clients = append(clients, client)
}

return &MultiBeaconClient{
clients: clients,
closeCh: make(chan struct{}),
}
}

func (m *MultiBeaconClient) isValidator(pubkey PubkeyHex) bool {
for _, c := range m.clients {
// Pick the first one, always true
return c.isValidator(pubkey)
}

return false
}

func (m *MultiBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
var allErrs error
for _, c := range m.clients {
pk, err := c.getProposerForNextSlot(requestedSlot)
if err != nil {
allErrs = errors.Join(allErrs, err)
continue
}

return pk, nil
}
return PubkeyHex(""), allErrs
}

func payloadAttributesMatch(l types.BuilderPayloadAttributes, r types.BuilderPayloadAttributes) bool {
if l.Timestamp != r.Timestamp ||
l.Random != r.Random ||
l.SuggestedFeeRecipient != r.SuggestedFeeRecipient ||
l.Slot != r.Slot ||
l.HeadHash != r.HeadHash ||
l.GasLimit != r.GasLimit {
return false
}

if !slices.Equal(l.Withdrawals, r.Withdrawals) {
return false
}

return true
}

func (m *MultiBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
for _, c := range m.clients {
go c.SubscribeToPayloadAttributesEvents(payloadAttrC)
}
}

func (m *MultiBeaconClient) Start() error {
var allErrs error
for _, c := range m.clients {
err := c.Start()
if err != nil {
allErrs = errors.Join(allErrs, err)
}
}
return allErrs
}

func (m *MultiBeaconClient) Stop() {
for _, c := range m.clients {
c.Stop()
}

close(m.closeCh)
}

type BeaconClient struct {
endpoint string
slotsInEpoch uint64
Expand All @@ -72,21 +157,24 @@ type BeaconClient struct {
mu sync.Mutex
slotProposerMap map[uint64]PubkeyHex

closeCh chan struct{}
ctx context.Context
cancelFn context.CancelFunc
}

func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient {
ctx, cancelFn := context.WithCancel(context.Background())
return &BeaconClient{
endpoint: endpoint,
slotsInEpoch: slotsInEpoch,
secondsInSlot: secondsInSlot,
slotProposerMap: make(map[uint64]PubkeyHex),
closeCh: make(chan struct{}),
ctx: ctx,
cancelFn: cancelFn,
}
}

func (b *BeaconClient) Stop() {
close(b.closeCh)
b.cancelFn()
}

func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool {
Expand Down Expand Up @@ -139,7 +227,7 @@ func (b *BeaconClient) UpdateValidatorMapForever() {
defer timer.Stop()
for true {
select {
case <-b.closeCh:
case <-b.ctx.Done():
return
case <-timer.C:
}
Expand Down Expand Up @@ -184,6 +272,70 @@ func (b *BeaconClient) UpdateValidatorMapForever() {
}
}

// PayloadAttributesEvent represents the data of a payload_attributes event
// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}}
type PayloadAttributesEvent struct {
Version string `json:"version"`
Data PayloadAttributesEventData `json:"data"`
}

type PayloadAttributesEventData struct {
ProposalSlot uint64 `json:"proposal_slot,string"`
ParentBlockHash common.Hash `json:"parent_block_hash"`
PayloadAttributes PayloadAttributes `json:"payload_attributes"`
}

type PayloadAttributes struct {
Timestamp uint64 `json:"timestamp,string"`
PrevRandao common.Hash `json:"prev_randao"`
SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"`
Withdrawals []*capella.Withdrawal `json:"withdrawals"`
}

// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals
func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
payloadAttributesResp := new(PayloadAttributesEvent)

eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint)
log.Info("subscribing to payload_attributes events")

for {
client := sse.NewClient(eventsURL)
err := client.SubscribeRawWithContext(b.ctx, func(msg *sse.Event) {
err := json.Unmarshal(msg.Data, payloadAttributesResp)
if err != nil {
log.Error("could not unmarshal payload_attributes event", "err", err)
} else {
// convert capella.Withdrawal to types.Withdrawal
var withdrawals []*types.Withdrawal
for _, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals {
withdrawals = append(withdrawals, &types.Withdrawal{
Index: uint64(w.Index),
Validator: uint64(w.ValidatorIndex),
Address: common.Address(w.Address),
Amount: uint64(w.Amount),
})
}

data := types.BuilderPayloadAttributes{
Slot: payloadAttributesResp.Data.ProposalSlot,
HeadHash: payloadAttributesResp.Data.ParentBlockHash,
Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp),
Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao,
SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient,
Withdrawals: withdrawals,
}
payloadAttrC <- data
}
})
if err != nil {
log.Error("failed to subscribe to payload_attributes events", "err", err)
time.Sleep(1 * time.Second)
}
log.Warn("beaconclient SubscribeRaw ended, reconnecting")
}
}

func fetchCurrentSlot(endpoint string) (uint64, error) {
headerRes := &struct {
Data []struct {
Expand Down Expand Up @@ -286,67 +438,3 @@ func fetchBeacon(url string, dst any) error {
log.Info("fetched", "url", url, "res", dst)
return nil
}

// PayloadAttributesEvent represents the data of a payload_attributes event
// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}}
type PayloadAttributesEvent struct {
Version string `json:"version"`
Data PayloadAttributesEventData `json:"data"`
}

type PayloadAttributesEventData struct {
ProposalSlot uint64 `json:"proposal_slot,string"`
ParentBlockHash common.Hash `json:"parent_block_hash"`
PayloadAttributes PayloadAttributes `json:"payload_attributes"`
}

type PayloadAttributes struct {
Timestamp uint64 `json:"timestamp,string"`
PrevRandao common.Hash `json:"prev_randao"`
SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"`
Withdrawals []*capella.Withdrawal `json:"withdrawals"`
}

// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals
func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
payloadAttributesResp := new(PayloadAttributesEvent)

eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint)
log.Info("subscribing to payload_attributes events")

for {
client := sse.NewClient(eventsURL)
err := client.SubscribeRaw(func(msg *sse.Event) {
err := json.Unmarshal(msg.Data, payloadAttributesResp)
if err != nil {
log.Error("could not unmarshal payload_attributes event", "err", err)
} else {
// convert capella.Withdrawal to types.Withdrawal
withdrawals := make([]*types.Withdrawal, len(payloadAttributesResp.Data.PayloadAttributes.Withdrawals))
for i, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals {
withdrawals[i] = &types.Withdrawal{
Index: uint64(w.Index),
Validator: uint64(w.ValidatorIndex),
Address: common.Address(w.Address),
Amount: uint64(w.Amount),
}
}

data := types.BuilderPayloadAttributes{
Slot: payloadAttributesResp.Data.ProposalSlot,
HeadHash: payloadAttributesResp.Data.ParentBlockHash,
Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp),
Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao,
SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient,
Withdrawals: withdrawals,
}
payloadAttrC <- data
}
})
if err != nil {
log.Error("failed to subscribe to payload_attributes events", "err", err)
time.Sleep(1 * time.Second)
}
log.Warn("beaconclient SubscribeRaw ended, reconnecting")
}
}
38 changes: 26 additions & 12 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
slot: 0,
slotCtx: slotCtx,
slotCtxCancel: slotCtxCancel,

stop: make(chan struct{}, 1),
}
}

Expand All @@ -102,16 +104,27 @@ func (b *Builder) Start() error {
go func() {
c := make(chan types.BuilderPayloadAttributes)
go b.beaconClient.SubscribeToPayloadAttributesEvents(c)
beacon_loop:

currentSlot := uint64(0)

for {
select {
case <-b.stop:
break beacon_loop
return
case payloadAttributes := <-c:
b.OnPayloadAttribute(&payloadAttributes)
// Right now we are building only on a single head. This might change in the future!
if payloadAttributes.Slot < currentSlot {
continue
} else if payloadAttributes.Slot == currentSlot {
b.OnPayloadAttribute(&payloadAttributes)
} else if payloadAttributes.Slot > currentSlot {
currentSlot = payloadAttributes.Slot
b.OnPayloadAttribute(&payloadAttributes)
}
}
}
}()

return b.relay.Start()
}

Expand Down Expand Up @@ -289,18 +302,19 @@ func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) erro
b.slotMu.Lock()
defer b.slotMu.Unlock()

if b.slot != attrs.Slot {
if b.slotCtxCancel != nil {
b.slotCtxCancel()
}
// Forcibly cancel previous building job, build on top of reorgable blocks as this is the behaviour relays expect.
// This will change in the future

slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
b.slot = attrs.Slot
b.slotAttrs = nil
b.slotCtx = slotCtx
b.slotCtxCancel = slotCtxCancel
if b.slotCtxCancel != nil {
b.slotCtxCancel()
}

slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
b.slot = attrs.Slot
b.slotAttrs = nil
b.slotCtx = slotCtx
b.slotCtxCancel = slotCtxCancel

for _, currentAttrs := range b.slotAttrs {
if attrs.Equal(&currentAttrs) {
log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash)
Expand Down
4 changes: 2 additions & 2 deletions builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Config struct {
GenesisForkVersion string `toml:",omitempty"`
BellatrixForkVersion string `toml:",omitempty"`
GenesisValidatorsRoot string `toml:",omitempty"`
BeaconEndpoint string `toml:",omitempty"`
BeaconEndpoints []string `toml:",omitempty"`
RemoteRelayEndpoint string `toml:",omitempty"`
SecondaryRemoteRelayEndpoints []string `toml:",omitempty"`
ValidationBlocklist string `toml:",omitempty"`
Expand All @@ -35,7 +35,7 @@ var DefaultConfig = Config{
GenesisForkVersion: "0x00000000",
BellatrixForkVersion: "0x02000000",
GenesisValidatorsRoot: "0x0000000000000000000000000000000000000000000000000000000000000000",
BeaconEndpoint: "http://127.0.0.1:5052",
BeaconEndpoints: []string{"http://127.0.0.1:5052"},
RemoteRelayEndpoint: "",
SecondaryRemoteRelayEndpoints: nil,
ValidationBlocklist: "",
Expand Down
8 changes: 5 additions & 3 deletions builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,12 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
proposerSigningDomain := boostTypes.ComputeDomain(boostTypes.DomainTypeBeaconProposer, bellatrixForkVersion, genesisValidatorsRoot)

var beaconClient IBeaconClient
if cfg.BeaconEndpoint != "" {
beaconClient = NewBeaconClient(cfg.BeaconEndpoint, cfg.SlotsInEpoch, cfg.SecondsInSlot)
} else {
if len(cfg.BeaconEndpoints) == 0 {
beaconClient = &NilBeaconClient{}
} else if len(cfg.BeaconEndpoints) == 1 {
beaconClient = NewBeaconClient(cfg.BeaconEndpoints[0], cfg.SlotsInEpoch, cfg.SecondsInSlot)
} else {
beaconClient = NewMultiBeaconClient(cfg.BeaconEndpoints, cfg.SlotsInEpoch, cfg.SecondsInSlot)
}

var localRelay *LocalRelay
Expand Down
2 changes: 1 addition & 1 deletion cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ var (
utils.BuilderGenesisForkVersion,
utils.BuilderBellatrixForkVersion,
utils.BuilderGenesisValidatorsRoot,
utils.BuilderBeaconEndpoint,
utils.BuilderBeaconEndpoints,
utils.BuilderRemoteRelayEndpoint,
utils.BuilderSecondaryRemoteRelayEndpoints,
}
Expand Down
Loading