Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions server/functionality.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ package server

import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"

builderApi "github.com/attestantio/go-builder-client/api"
builderSpec "github.com/attestantio/go-builder-client/spec"
eth2ApiV1Deneb "github.com/attestantio/go-eth2-client/api/v1/deneb"
eth2ApiV1Electra "github.com/attestantio/go-eth2-client/api/v1/electra"
"github.com/attestantio/go-eth2-client/spec"
"github.com/flashbots/mev-boost/config"
"github.com/flashbots/mev-boost/server/params"
"github.com/flashbots/mev-boost/server/types"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -174,3 +181,275 @@ func (m *BoostService) getHeader(log *logrus.Entry, ua UserAgent, _slot uint64,
result.relays = relays[BlockHashHex(result.bidInfo.blockHash.String())]
return result, nil
}

func (m *BoostService) processDenebPayload(log *logrus.Entry, ua UserAgent, blindedBlock *eth2ApiV1Deneb.SignedBlindedBeaconBlock) (*builderApi.VersionedSubmitBlindedBlockResponse, bidResp) {
// Get the currentSlotUID for this slot
currentSlotUID := ""
m.slotUIDLock.Lock()
if m.slotUID.slot == uint64(blindedBlock.Message.Slot) {
currentSlotUID = m.slotUID.uid.String()
} else {
log.Warnf("latest slotUID is for slot %d rather than payload slot %d", m.slotUID.slot, blindedBlock.Message.Slot)
}
m.slotUIDLock.Unlock()

// Prepare logger
log = log.WithFields(logrus.Fields{
"ua": ua,
"slot": blindedBlock.Message.Slot,
"blockHash": blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash.String(),
"parentHash": blindedBlock.Message.Body.ExecutionPayloadHeader.ParentHash.String(),
"slotUID": currentSlotUID,
})

// Log how late into the slot the request starts
slotStartTimestamp := m.genesisTime + uint64(blindedBlock.Message.Slot)*config.SlotTimeSec
msIntoSlot := uint64(time.Now().UTC().UnixMilli()) - slotStartTimestamp*1000
log.WithFields(logrus.Fields{
"genesisTime": m.genesisTime,
"slotTimeSec": config.SlotTimeSec,
"msIntoSlot": msIntoSlot,
}).Infof("submitBlindedBlock request start - %d milliseconds into slot %d", msIntoSlot, blindedBlock.Message.Slot)

// Get the bid!
m.bidsLock.Lock()
originalBid := m.bids[bidKey(uint64(blindedBlock.Message.Slot), blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash)]
m.bidsLock.Unlock()
if originalBid.response.IsEmpty() {
log.Error("no bid for this getPayload payload found, was getHeader called before?")
} else if len(originalBid.relays) == 0 {
log.Warn("bid found but no associated relays")
}

// Add request headers
headers := map[string]string{
HeaderKeySlotUID: currentSlotUID,
HeaderStartTimeUnixMS: fmt.Sprintf("%d", time.Now().UTC().UnixMilli()),
}

// Prepare for requests
resultCh := make(chan *builderApi.VersionedSubmitBlindedBlockResponse, len(m.relays))
var received atomic.Bool
go func() {
// Make sure we receive a response within the timeout
time.Sleep(m.httpClientGetPayload.Timeout)
resultCh <- nil
}()

// Prepare the request context, which will be cancelled after the first successful response from a relay
requestCtx, requestCtxCancel := context.WithCancel(context.Background())
defer requestCtxCancel()

for _, relay := range m.relays {
go func(relay types.RelayEntry) {
url := relay.GetURI(params.PathGetPayload)
log := log.WithField("url", url)
log.Debug("calling getPayload")

responsePayload := new(builderApi.VersionedSubmitBlindedBlockResponse)
_, err := SendHTTPRequestWithRetries(requestCtx, m.httpClientGetPayload, http.MethodPost, url, ua, headers, blindedBlock, responsePayload, m.requestMaxRetries, log)
if err != nil {
if errors.Is(requestCtx.Err(), context.Canceled) {
log.Info("request was cancelled") // this is expected, if payload has already been received by another relay
} else {
log.WithError(err).Error("error making request to relay")
}
return
}

if responsePayload.Version != spec.DataVersionDeneb {
log.WithFields(logrus.Fields{
"version": responsePayload.Version,
}).Error("response version was not deneb")
return
}
if getPayloadResponseIsEmpty(responsePayload) {
log.Error("response with empty data!")
return
}

payload := responsePayload.Deneb.ExecutionPayload
blobs := responsePayload.Deneb.BlobsBundle

// Ensure the response blockhash matches the request
if blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash != payload.BlockHash {
log.WithFields(logrus.Fields{
"responseBlockHash": payload.BlockHash.String(),
}).Error("requestBlockHash does not equal responseBlockHash")
return
}

commitments := blindedBlock.Message.Body.BlobKZGCommitments
// Ensure that blobs are valid and matches the request
if len(commitments) != len(blobs.Blobs) || len(commitments) != len(blobs.Commitments) || len(commitments) != len(blobs.Proofs) {
log.WithFields(logrus.Fields{
"requestBlobCommitments": len(commitments),
"responseBlobs": len(blobs.Blobs),
"responseBlobCommitments": len(blobs.Commitments),
"responseBlobProofs": len(blobs.Proofs),
}).Error("block KZG commitment length does not equal responseBlobs length")
return
}

for i, commitment := range commitments {
if commitment != blobs.Commitments[i] {
log.WithFields(logrus.Fields{
"requestBlobCommitment": commitment.String(),
"responseBlobCommitment": blobs.Commitments[i].String(),
"index": i,
}).Error("requestBlobCommitment does not equal responseBlobCommitment")
return
}
}

requestCtxCancel()
if received.CompareAndSwap(false, true) {
resultCh <- responsePayload
log.Info("received payload from relay")
} else {
log.Trace("Discarding response, already received a correct response")
}
}(relay)
}

// Wait for the first request to complete
result := <-resultCh

return result, originalBid
}

func (m *BoostService) processElectraPayload(log *logrus.Entry, ua UserAgent, blindedBlock *eth2ApiV1Electra.SignedBlindedBeaconBlock) (*builderApi.VersionedSubmitBlindedBlockResponse, bidResp) {
// Get the currentSlotUID for this slot
currentSlotUID := ""
m.slotUIDLock.Lock()
if m.slotUID.slot == uint64(blindedBlock.Message.Slot) {
currentSlotUID = m.slotUID.uid.String()
} else {
log.Warnf("latest slotUID is for slot %d rather than payload slot %d", m.slotUID.slot, blindedBlock.Message.Slot)
}
m.slotUIDLock.Unlock()

// Prepare logger
log = log.WithFields(logrus.Fields{
"ua": ua,
"slot": blindedBlock.Message.Slot,
"blockHash": blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash.String(),
"parentHash": blindedBlock.Message.Body.ExecutionPayloadHeader.ParentHash.String(),
"slotUID": currentSlotUID,
})

// Log how late into the slot the request starts
slotStartTimestamp := m.genesisTime + uint64(blindedBlock.Message.Slot)*config.SlotTimeSec
msIntoSlot := uint64(time.Now().UTC().UnixMilli()) - slotStartTimestamp*1000
log.WithFields(logrus.Fields{
"genesisTime": m.genesisTime,
"slotTimeSec": config.SlotTimeSec,
"msIntoSlot": msIntoSlot,
}).Infof("submitBlindedBlock request start - %d milliseconds into slot %d", msIntoSlot, blindedBlock.Message.Slot)

// Get the bid!
m.bidsLock.Lock()
originalBid := m.bids[bidKey(uint64(blindedBlock.Message.Slot), blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash)]
m.bidsLock.Unlock()
if originalBid.response.IsEmpty() {
log.Error("no bid for this getPayload payload found, was getHeader called before?")
} else if len(originalBid.relays) == 0 {
log.Warn("bid found but no associated relays")
}

// Add request headers
headers := map[string]string{
HeaderKeySlotUID: currentSlotUID,
HeaderStartTimeUnixMS: fmt.Sprintf("%d", time.Now().UTC().UnixMilli()),
}

// Prepare for requests
resultCh := make(chan *builderApi.VersionedSubmitBlindedBlockResponse, len(m.relays))
var received atomic.Bool
go func() {
// Make sure we receive a response within the timeout
time.Sleep(m.httpClientGetPayload.Timeout)
resultCh <- nil
}()

// Prepare the request context, which will be cancelled after the first successful response from a relay
requestCtx, requestCtxCancel := context.WithCancel(context.Background())
defer requestCtxCancel()

for _, relay := range m.relays {
go func(relay types.RelayEntry) {
url := relay.GetURI(params.PathGetPayload)
log := log.WithField("url", url)
log.Debug("calling getPayload")

responsePayload := new(builderApi.VersionedSubmitBlindedBlockResponse)
_, err := SendHTTPRequestWithRetries(requestCtx, m.httpClientGetPayload, http.MethodPost, url, ua, headers, blindedBlock, responsePayload, m.requestMaxRetries, log)
if err != nil {
if errors.Is(requestCtx.Err(), context.Canceled) {
log.Info("request was cancelled") // this is expected, if payload has already been received by another relay
} else {
log.WithError(err).Error("error making request to relay")
}
return
}

if responsePayload.Version != spec.DataVersionElectra {
log.WithFields(logrus.Fields{
"version": responsePayload.Version,
}).Error("response version was not electra")
return
}
if getPayloadResponseIsEmpty(responsePayload) {
log.Error("response with empty data!")
return
}

payload := responsePayload.Electra.ExecutionPayload
blobs := responsePayload.Electra.BlobsBundle

// Ensure the response blockhash matches the request
if blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash != payload.BlockHash {
log.WithFields(logrus.Fields{
"responseBlockHash": payload.BlockHash.String(),
}).Error("requestBlockHash does not equal responseBlockHash")
return
}

commitments := blindedBlock.Message.Body.BlobKZGCommitments
// Ensure that blobs are valid and matches the request
if len(commitments) != len(blobs.Blobs) || len(commitments) != len(blobs.Commitments) || len(commitments) != len(blobs.Proofs) {
log.WithFields(logrus.Fields{
"requestBlobCommitments": len(commitments),
"responseBlobs": len(blobs.Blobs),
"responseBlobCommitments": len(blobs.Commitments),
"responseBlobProofs": len(blobs.Proofs),
}).Error("block KZG commitment length does not equal responseBlobs length")
return
}

for i, commitment := range commitments {
if commitment != blobs.Commitments[i] {
log.WithFields(logrus.Fields{
"requestBlobCommitment": commitment.String(),
"responseBlobCommitment": blobs.Commitments[i].String(),
"index": i,
}).Error("requestBlobCommitment does not equal responseBlobCommitment")
return
}
}

requestCtxCancel()
if received.CompareAndSwap(false, true) {
resultCh <- responsePayload
log.Info("received payload from relay")
} else {
log.Trace("Discarding response, already received a correct response")
}
}(relay)
}

// Wait for the first request to complete
result := <-resultCh

return result, originalBid
}
Loading
Loading