diff --git a/builder/beacon_client.go b/builder/beacon_client.go index aaa584fdd4..f3122e2758 100644 --- a/builder/beacon_client.go +++ b/builder/beacon_client.go @@ -1,6 +1,7 @@ package builder import ( + "context" "encoding/json" "errors" "fmt" @@ -16,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/r3labs/sse" + "golang.org/x/exp/slices" ) type IBeaconClient interface { @@ -64,6 +66,89 @@ func (b *NilBeaconClient) Start() error { return nil } func (b *NilBeaconClient) Stop() {} +type MultiBeaconClient struct { + clients []*BeaconClient + closeCh chan struct{} +} + +func NewMultiBeaconClient(endpoints []string, slotsInEpoch uint64, secondsInSlot uint64) *MultiBeaconClient { + clients := []*BeaconClient{} + for _, endpoint := range endpoints { + client := NewBeaconClient(endpoint, slotsInEpoch, secondsInSlot) + clients = append(clients, client) + } + + return &MultiBeaconClient{ + clients: clients, + closeCh: make(chan struct{}), + } +} + +func (m *MultiBeaconClient) isValidator(pubkey PubkeyHex) bool { + for _, c := range m.clients { + // Pick the first one, always true + return c.isValidator(pubkey) + } + + return false +} + +func (m *MultiBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) { + var allErrs error + for _, c := range m.clients { + pk, err := c.getProposerForNextSlot(requestedSlot) + if err != nil { + allErrs = errors.Join(allErrs, err) + continue + } + + return pk, nil + } + return PubkeyHex(""), allErrs +} + +func payloadAttributesMatch(l types.BuilderPayloadAttributes, r types.BuilderPayloadAttributes) bool { + if l.Timestamp != r.Timestamp || + l.Random != r.Random || + l.SuggestedFeeRecipient != r.SuggestedFeeRecipient || + l.Slot != r.Slot || + l.HeadHash != r.HeadHash || + l.GasLimit != r.GasLimit { + return false + } + + if !slices.Equal(l.Withdrawals, r.Withdrawals) { + return false + } + + return true +} + +func (m *MultiBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) { + for _, c := range m.clients { + go c.SubscribeToPayloadAttributesEvents(payloadAttrC) + } +} + +func (m *MultiBeaconClient) Start() error { + var allErrs error + for _, c := range m.clients { + err := c.Start() + if err != nil { + allErrs = errors.Join(allErrs, err) + } + } + return allErrs +} + +func (m *MultiBeaconClient) Stop() { + for _, c := range m.clients { + c.Stop() + } + + close(m.closeCh) +} + type BeaconClient struct { endpoint string slotsInEpoch uint64 @@ -72,21 +157,24 @@ type BeaconClient struct { mu sync.Mutex slotProposerMap map[uint64]PubkeyHex - closeCh chan struct{} + ctx context.Context + cancelFn context.CancelFunc } func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient { + ctx, cancelFn := context.WithCancel(context.Background()) return &BeaconClient{ endpoint: endpoint, slotsInEpoch: slotsInEpoch, secondsInSlot: secondsInSlot, slotProposerMap: make(map[uint64]PubkeyHex), - closeCh: make(chan struct{}), + ctx: ctx, + cancelFn: cancelFn, } } func (b *BeaconClient) Stop() { - close(b.closeCh) + b.cancelFn() } func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool { @@ -139,7 +227,7 @@ func (b *BeaconClient) UpdateValidatorMapForever() { defer timer.Stop() for true { select { - case <-b.closeCh: + case <-b.ctx.Done(): return case <-timer.C: } @@ -184,6 +272,70 @@ func (b *BeaconClient) UpdateValidatorMapForever() { } } +// PayloadAttributesEvent represents the data of a payload_attributes event +// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}} +type PayloadAttributesEvent struct { + Version string `json:"version"` + Data PayloadAttributesEventData `json:"data"` +} + +type PayloadAttributesEventData struct { + ProposalSlot uint64 `json:"proposal_slot,string"` + ParentBlockHash common.Hash `json:"parent_block_hash"` + PayloadAttributes PayloadAttributes `json:"payload_attributes"` +} + +type PayloadAttributes struct { + Timestamp uint64 `json:"timestamp,string"` + PrevRandao common.Hash `json:"prev_randao"` + SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"` + Withdrawals []*capella.Withdrawal `json:"withdrawals"` +} + +// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals +func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) { + payloadAttributesResp := new(PayloadAttributesEvent) + + eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint) + log.Info("subscribing to payload_attributes events") + + for { + client := sse.NewClient(eventsURL) + err := client.SubscribeRawWithContext(b.ctx, func(msg *sse.Event) { + err := json.Unmarshal(msg.Data, payloadAttributesResp) + if err != nil { + log.Error("could not unmarshal payload_attributes event", "err", err) + } else { + // convert capella.Withdrawal to types.Withdrawal + var withdrawals []*types.Withdrawal + for _, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals { + withdrawals = append(withdrawals, &types.Withdrawal{ + Index: uint64(w.Index), + Validator: uint64(w.ValidatorIndex), + Address: common.Address(w.Address), + Amount: uint64(w.Amount), + }) + } + + data := types.BuilderPayloadAttributes{ + Slot: payloadAttributesResp.Data.ProposalSlot, + HeadHash: payloadAttributesResp.Data.ParentBlockHash, + Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp), + Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao, + SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient, + Withdrawals: withdrawals, + } + payloadAttrC <- data + } + }) + if err != nil { + log.Error("failed to subscribe to payload_attributes events", "err", err) + time.Sleep(1 * time.Second) + } + log.Warn("beaconclient SubscribeRaw ended, reconnecting") + } +} + func fetchCurrentSlot(endpoint string) (uint64, error) { headerRes := &struct { Data []struct { @@ -286,67 +438,3 @@ func fetchBeacon(url string, dst any) error { log.Info("fetched", "url", url, "res", dst) return nil } - -// PayloadAttributesEvent represents the data of a payload_attributes event -// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}} -type PayloadAttributesEvent struct { - Version string `json:"version"` - Data PayloadAttributesEventData `json:"data"` -} - -type PayloadAttributesEventData struct { - ProposalSlot uint64 `json:"proposal_slot,string"` - ParentBlockHash common.Hash `json:"parent_block_hash"` - PayloadAttributes PayloadAttributes `json:"payload_attributes"` -} - -type PayloadAttributes struct { - Timestamp uint64 `json:"timestamp,string"` - PrevRandao common.Hash `json:"prev_randao"` - SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"` - Withdrawals []*capella.Withdrawal `json:"withdrawals"` -} - -// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals -func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) { - payloadAttributesResp := new(PayloadAttributesEvent) - - eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint) - log.Info("subscribing to payload_attributes events") - - for { - client := sse.NewClient(eventsURL) - err := client.SubscribeRaw(func(msg *sse.Event) { - err := json.Unmarshal(msg.Data, payloadAttributesResp) - if err != nil { - log.Error("could not unmarshal payload_attributes event", "err", err) - } else { - // convert capella.Withdrawal to types.Withdrawal - withdrawals := make([]*types.Withdrawal, len(payloadAttributesResp.Data.PayloadAttributes.Withdrawals)) - for i, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals { - withdrawals[i] = &types.Withdrawal{ - Index: uint64(w.Index), - Validator: uint64(w.ValidatorIndex), - Address: common.Address(w.Address), - Amount: uint64(w.Amount), - } - } - - data := types.BuilderPayloadAttributes{ - Slot: payloadAttributesResp.Data.ProposalSlot, - HeadHash: payloadAttributesResp.Data.ParentBlockHash, - Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp), - Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao, - SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient, - Withdrawals: withdrawals, - } - payloadAttrC <- data - } - }) - if err != nil { - log.Error("failed to subscribe to payload_attributes events", "err", err) - time.Sleep(1 * time.Second) - } - log.Warn("beaconclient SubscribeRaw ended, reconnecting") - } -} diff --git a/builder/builder.go b/builder/builder.go index b7c86c40ff..52d87141c5 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -94,6 +94,8 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe slot: 0, slotCtx: slotCtx, slotCtxCancel: slotCtxCancel, + + stop: make(chan struct{}, 1), } } @@ -102,16 +104,27 @@ func (b *Builder) Start() error { go func() { c := make(chan types.BuilderPayloadAttributes) go b.beaconClient.SubscribeToPayloadAttributesEvents(c) - beacon_loop: + + currentSlot := uint64(0) + for { select { case <-b.stop: - break beacon_loop + return case payloadAttributes := <-c: - b.OnPayloadAttribute(&payloadAttributes) + // Right now we are building only on a single head. This might change in the future! + if payloadAttributes.Slot < currentSlot { + continue + } else if payloadAttributes.Slot == currentSlot { + b.OnPayloadAttribute(&payloadAttributes) + } else if payloadAttributes.Slot > currentSlot { + currentSlot = payloadAttributes.Slot + b.OnPayloadAttribute(&payloadAttributes) + } } } }() + return b.relay.Start() } @@ -289,18 +302,19 @@ func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) erro b.slotMu.Lock() defer b.slotMu.Unlock() - if b.slot != attrs.Slot { - if b.slotCtxCancel != nil { - b.slotCtxCancel() - } + // Forcibly cancel previous building job, build on top of reorgable blocks as this is the behaviour relays expect. + // This will change in the future - slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second) - b.slot = attrs.Slot - b.slotAttrs = nil - b.slotCtx = slotCtx - b.slotCtxCancel = slotCtxCancel + if b.slotCtxCancel != nil { + b.slotCtxCancel() } + slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second) + b.slot = attrs.Slot + b.slotAttrs = nil + b.slotCtx = slotCtx + b.slotCtxCancel = slotCtxCancel + for _, currentAttrs := range b.slotAttrs { if attrs.Equal(¤tAttrs) { log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash) diff --git a/builder/config.go b/builder/config.go index cc445d5efc..f9f2bba5b3 100644 --- a/builder/config.go +++ b/builder/config.go @@ -14,7 +14,7 @@ type Config struct { GenesisForkVersion string `toml:",omitempty"` BellatrixForkVersion string `toml:",omitempty"` GenesisValidatorsRoot string `toml:",omitempty"` - BeaconEndpoint string `toml:",omitempty"` + BeaconEndpoints []string `toml:",omitempty"` RemoteRelayEndpoint string `toml:",omitempty"` SecondaryRemoteRelayEndpoints []string `toml:",omitempty"` ValidationBlocklist string `toml:",omitempty"` @@ -35,7 +35,7 @@ var DefaultConfig = Config{ GenesisForkVersion: "0x00000000", BellatrixForkVersion: "0x02000000", GenesisValidatorsRoot: "0x0000000000000000000000000000000000000000000000000000000000000000", - BeaconEndpoint: "http://127.0.0.1:5052", + BeaconEndpoints: []string{"http://127.0.0.1:5052"}, RemoteRelayEndpoint: "", SecondaryRemoteRelayEndpoints: nil, ValidationBlocklist: "", diff --git a/builder/service.go b/builder/service.go index 32d3799dbf..a315553212 100644 --- a/builder/service.go +++ b/builder/service.go @@ -121,10 +121,12 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error { proposerSigningDomain := boostTypes.ComputeDomain(boostTypes.DomainTypeBeaconProposer, bellatrixForkVersion, genesisValidatorsRoot) var beaconClient IBeaconClient - if cfg.BeaconEndpoint != "" { - beaconClient = NewBeaconClient(cfg.BeaconEndpoint, cfg.SlotsInEpoch, cfg.SecondsInSlot) - } else { + if len(cfg.BeaconEndpoints) == 0 { beaconClient = &NilBeaconClient{} + } else if len(cfg.BeaconEndpoints) == 1 { + beaconClient = NewBeaconClient(cfg.BeaconEndpoints[0], cfg.SlotsInEpoch, cfg.SecondsInSlot) + } else { + beaconClient = NewMultiBeaconClient(cfg.BeaconEndpoints, cfg.SlotsInEpoch, cfg.SecondsInSlot) } var localRelay *LocalRelay diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 5c592ee09d..24a9395cf0 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -171,7 +171,7 @@ var ( utils.BuilderGenesisForkVersion, utils.BuilderBellatrixForkVersion, utils.BuilderGenesisValidatorsRoot, - utils.BuilderBeaconEndpoint, + utils.BuilderBeaconEndpoints, utils.BuilderRemoteRelayEndpoint, utils.BuilderSecondaryRemoteRelayEndpoints, } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 2c318ecb20..973580de76 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -778,10 +778,10 @@ var ( Value: "0x0000000000000000000000000000000000000000000000000000000000000000", Category: flags.BuilderCategory, } - BuilderBeaconEndpoint = &cli.StringFlag{ - Name: "builder.beacon_endpoint", - Usage: "Beacon endpoint to connect to for beacon chain data", - EnvVars: []string{"BUILDER_BEACON_ENDPOINT"}, + BuilderBeaconEndpoints = &cli.StringFlag{ + Name: "builder.beacon_endpoints", + Usage: "Comma separated list of beacon endpoints to connect to for beacon chain data", + EnvVars: []string{"BUILDER_BEACON_ENDPOINTS"}, Value: "http://127.0.0.1:5052", Category: flags.BuilderCategory, } @@ -1608,7 +1608,7 @@ func SetBuilderConfig(ctx *cli.Context, cfg *builder.Config) { cfg.GenesisForkVersion = ctx.String(BuilderGenesisForkVersion.Name) cfg.BellatrixForkVersion = ctx.String(BuilderBellatrixForkVersion.Name) cfg.GenesisValidatorsRoot = ctx.String(BuilderGenesisValidatorsRoot.Name) - cfg.BeaconEndpoint = ctx.String(BuilderBeaconEndpoint.Name) + cfg.BeaconEndpoints = strings.Split(ctx.String(BuilderBeaconEndpoints.Name), ",") cfg.RemoteRelayEndpoint = ctx.String(BuilderRemoteRelayEndpoint.Name) cfg.SecondaryRemoteRelayEndpoints = strings.Split(ctx.String(BuilderSecondaryRemoteRelayEndpoints.Name), ",") cfg.ValidationBlocklist = ctx.String(BuilderBlockValidationBlacklistSourceFilePath.Name)