Skip to content

Commit 810f45d

Browse files
server: split http handling and functionality (part2) (#721)
1 parent c99f1d5 commit 810f45d

2 files changed

Lines changed: 287 additions & 283 deletions

File tree

server/functionality.go

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,20 @@ package server
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net/http"
78
"sync"
9+
"sync/atomic"
810
"time"
911

12+
builderApi "github.com/attestantio/go-builder-client/api"
1013
builderSpec "github.com/attestantio/go-builder-client/spec"
14+
eth2ApiV1Deneb "github.com/attestantio/go-eth2-client/api/v1/deneb"
15+
eth2ApiV1Electra "github.com/attestantio/go-eth2-client/api/v1/electra"
16+
"github.com/attestantio/go-eth2-client/spec"
1117
"github.com/flashbots/mev-boost/config"
18+
"github.com/flashbots/mev-boost/server/params"
1219
"github.com/flashbots/mev-boost/server/types"
1320
"github.com/google/uuid"
1421
"github.com/sirupsen/logrus"
@@ -174,3 +181,275 @@ func (m *BoostService) getHeader(log *logrus.Entry, ua UserAgent, _slot uint64,
174181
result.relays = relays[BlockHashHex(result.bidInfo.blockHash.String())]
175182
return result, nil
176183
}
184+
185+
func (m *BoostService) processDenebPayload(log *logrus.Entry, ua UserAgent, blindedBlock *eth2ApiV1Deneb.SignedBlindedBeaconBlock) (*builderApi.VersionedSubmitBlindedBlockResponse, bidResp) {
186+
// Get the currentSlotUID for this slot
187+
currentSlotUID := ""
188+
m.slotUIDLock.Lock()
189+
if m.slotUID.slot == uint64(blindedBlock.Message.Slot) {
190+
currentSlotUID = m.slotUID.uid.String()
191+
} else {
192+
log.Warnf("latest slotUID is for slot %d rather than payload slot %d", m.slotUID.slot, blindedBlock.Message.Slot)
193+
}
194+
m.slotUIDLock.Unlock()
195+
196+
// Prepare logger
197+
log = log.WithFields(logrus.Fields{
198+
"ua": ua,
199+
"slot": blindedBlock.Message.Slot,
200+
"blockHash": blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash.String(),
201+
"parentHash": blindedBlock.Message.Body.ExecutionPayloadHeader.ParentHash.String(),
202+
"slotUID": currentSlotUID,
203+
})
204+
205+
// Log how late into the slot the request starts
206+
slotStartTimestamp := m.genesisTime + uint64(blindedBlock.Message.Slot)*config.SlotTimeSec
207+
msIntoSlot := uint64(time.Now().UTC().UnixMilli()) - slotStartTimestamp*1000
208+
log.WithFields(logrus.Fields{
209+
"genesisTime": m.genesisTime,
210+
"slotTimeSec": config.SlotTimeSec,
211+
"msIntoSlot": msIntoSlot,
212+
}).Infof("submitBlindedBlock request start - %d milliseconds into slot %d", msIntoSlot, blindedBlock.Message.Slot)
213+
214+
// Get the bid!
215+
m.bidsLock.Lock()
216+
originalBid := m.bids[bidKey(uint64(blindedBlock.Message.Slot), blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash)]
217+
m.bidsLock.Unlock()
218+
if originalBid.response.IsEmpty() {
219+
log.Error("no bid for this getPayload payload found, was getHeader called before?")
220+
} else if len(originalBid.relays) == 0 {
221+
log.Warn("bid found but no associated relays")
222+
}
223+
224+
// Add request headers
225+
headers := map[string]string{
226+
HeaderKeySlotUID: currentSlotUID,
227+
HeaderStartTimeUnixMS: fmt.Sprintf("%d", time.Now().UTC().UnixMilli()),
228+
}
229+
230+
// Prepare for requests
231+
resultCh := make(chan *builderApi.VersionedSubmitBlindedBlockResponse, len(m.relays))
232+
var received atomic.Bool
233+
go func() {
234+
// Make sure we receive a response within the timeout
235+
time.Sleep(m.httpClientGetPayload.Timeout)
236+
resultCh <- nil
237+
}()
238+
239+
// Prepare the request context, which will be cancelled after the first successful response from a relay
240+
requestCtx, requestCtxCancel := context.WithCancel(context.Background())
241+
defer requestCtxCancel()
242+
243+
for _, relay := range m.relays {
244+
go func(relay types.RelayEntry) {
245+
url := relay.GetURI(params.PathGetPayload)
246+
log := log.WithField("url", url)
247+
log.Debug("calling getPayload")
248+
249+
responsePayload := new(builderApi.VersionedSubmitBlindedBlockResponse)
250+
_, err := SendHTTPRequestWithRetries(requestCtx, m.httpClientGetPayload, http.MethodPost, url, ua, headers, blindedBlock, responsePayload, m.requestMaxRetries, log)
251+
if err != nil {
252+
if errors.Is(requestCtx.Err(), context.Canceled) {
253+
log.Info("request was cancelled") // this is expected, if payload has already been received by another relay
254+
} else {
255+
log.WithError(err).Error("error making request to relay")
256+
}
257+
return
258+
}
259+
260+
if responsePayload.Version != spec.DataVersionDeneb {
261+
log.WithFields(logrus.Fields{
262+
"version": responsePayload.Version,
263+
}).Error("response version was not deneb")
264+
return
265+
}
266+
if getPayloadResponseIsEmpty(responsePayload) {
267+
log.Error("response with empty data!")
268+
return
269+
}
270+
271+
payload := responsePayload.Deneb.ExecutionPayload
272+
blobs := responsePayload.Deneb.BlobsBundle
273+
274+
// Ensure the response blockhash matches the request
275+
if blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash != payload.BlockHash {
276+
log.WithFields(logrus.Fields{
277+
"responseBlockHash": payload.BlockHash.String(),
278+
}).Error("requestBlockHash does not equal responseBlockHash")
279+
return
280+
}
281+
282+
commitments := blindedBlock.Message.Body.BlobKZGCommitments
283+
// Ensure that blobs are valid and matches the request
284+
if len(commitments) != len(blobs.Blobs) || len(commitments) != len(blobs.Commitments) || len(commitments) != len(blobs.Proofs) {
285+
log.WithFields(logrus.Fields{
286+
"requestBlobCommitments": len(commitments),
287+
"responseBlobs": len(blobs.Blobs),
288+
"responseBlobCommitments": len(blobs.Commitments),
289+
"responseBlobProofs": len(blobs.Proofs),
290+
}).Error("block KZG commitment length does not equal responseBlobs length")
291+
return
292+
}
293+
294+
for i, commitment := range commitments {
295+
if commitment != blobs.Commitments[i] {
296+
log.WithFields(logrus.Fields{
297+
"requestBlobCommitment": commitment.String(),
298+
"responseBlobCommitment": blobs.Commitments[i].String(),
299+
"index": i,
300+
}).Error("requestBlobCommitment does not equal responseBlobCommitment")
301+
return
302+
}
303+
}
304+
305+
requestCtxCancel()
306+
if received.CompareAndSwap(false, true) {
307+
resultCh <- responsePayload
308+
log.Info("received payload from relay")
309+
} else {
310+
log.Trace("Discarding response, already received a correct response")
311+
}
312+
}(relay)
313+
}
314+
315+
// Wait for the first request to complete
316+
result := <-resultCh
317+
318+
return result, originalBid
319+
}
320+
321+
func (m *BoostService) processElectraPayload(log *logrus.Entry, ua UserAgent, blindedBlock *eth2ApiV1Electra.SignedBlindedBeaconBlock) (*builderApi.VersionedSubmitBlindedBlockResponse, bidResp) {
322+
// Get the currentSlotUID for this slot
323+
currentSlotUID := ""
324+
m.slotUIDLock.Lock()
325+
if m.slotUID.slot == uint64(blindedBlock.Message.Slot) {
326+
currentSlotUID = m.slotUID.uid.String()
327+
} else {
328+
log.Warnf("latest slotUID is for slot %d rather than payload slot %d", m.slotUID.slot, blindedBlock.Message.Slot)
329+
}
330+
m.slotUIDLock.Unlock()
331+
332+
// Prepare logger
333+
log = log.WithFields(logrus.Fields{
334+
"ua": ua,
335+
"slot": blindedBlock.Message.Slot,
336+
"blockHash": blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash.String(),
337+
"parentHash": blindedBlock.Message.Body.ExecutionPayloadHeader.ParentHash.String(),
338+
"slotUID": currentSlotUID,
339+
})
340+
341+
// Log how late into the slot the request starts
342+
slotStartTimestamp := m.genesisTime + uint64(blindedBlock.Message.Slot)*config.SlotTimeSec
343+
msIntoSlot := uint64(time.Now().UTC().UnixMilli()) - slotStartTimestamp*1000
344+
log.WithFields(logrus.Fields{
345+
"genesisTime": m.genesisTime,
346+
"slotTimeSec": config.SlotTimeSec,
347+
"msIntoSlot": msIntoSlot,
348+
}).Infof("submitBlindedBlock request start - %d milliseconds into slot %d", msIntoSlot, blindedBlock.Message.Slot)
349+
350+
// Get the bid!
351+
m.bidsLock.Lock()
352+
originalBid := m.bids[bidKey(uint64(blindedBlock.Message.Slot), blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash)]
353+
m.bidsLock.Unlock()
354+
if originalBid.response.IsEmpty() {
355+
log.Error("no bid for this getPayload payload found, was getHeader called before?")
356+
} else if len(originalBid.relays) == 0 {
357+
log.Warn("bid found but no associated relays")
358+
}
359+
360+
// Add request headers
361+
headers := map[string]string{
362+
HeaderKeySlotUID: currentSlotUID,
363+
HeaderStartTimeUnixMS: fmt.Sprintf("%d", time.Now().UTC().UnixMilli()),
364+
}
365+
366+
// Prepare for requests
367+
resultCh := make(chan *builderApi.VersionedSubmitBlindedBlockResponse, len(m.relays))
368+
var received atomic.Bool
369+
go func() {
370+
// Make sure we receive a response within the timeout
371+
time.Sleep(m.httpClientGetPayload.Timeout)
372+
resultCh <- nil
373+
}()
374+
375+
// Prepare the request context, which will be cancelled after the first successful response from a relay
376+
requestCtx, requestCtxCancel := context.WithCancel(context.Background())
377+
defer requestCtxCancel()
378+
379+
for _, relay := range m.relays {
380+
go func(relay types.RelayEntry) {
381+
url := relay.GetURI(params.PathGetPayload)
382+
log := log.WithField("url", url)
383+
log.Debug("calling getPayload")
384+
385+
responsePayload := new(builderApi.VersionedSubmitBlindedBlockResponse)
386+
_, err := SendHTTPRequestWithRetries(requestCtx, m.httpClientGetPayload, http.MethodPost, url, ua, headers, blindedBlock, responsePayload, m.requestMaxRetries, log)
387+
if err != nil {
388+
if errors.Is(requestCtx.Err(), context.Canceled) {
389+
log.Info("request was cancelled") // this is expected, if payload has already been received by another relay
390+
} else {
391+
log.WithError(err).Error("error making request to relay")
392+
}
393+
return
394+
}
395+
396+
if responsePayload.Version != spec.DataVersionElectra {
397+
log.WithFields(logrus.Fields{
398+
"version": responsePayload.Version,
399+
}).Error("response version was not electra")
400+
return
401+
}
402+
if getPayloadResponseIsEmpty(responsePayload) {
403+
log.Error("response with empty data!")
404+
return
405+
}
406+
407+
payload := responsePayload.Electra.ExecutionPayload
408+
blobs := responsePayload.Electra.BlobsBundle
409+
410+
// Ensure the response blockhash matches the request
411+
if blindedBlock.Message.Body.ExecutionPayloadHeader.BlockHash != payload.BlockHash {
412+
log.WithFields(logrus.Fields{
413+
"responseBlockHash": payload.BlockHash.String(),
414+
}).Error("requestBlockHash does not equal responseBlockHash")
415+
return
416+
}
417+
418+
commitments := blindedBlock.Message.Body.BlobKZGCommitments
419+
// Ensure that blobs are valid and matches the request
420+
if len(commitments) != len(blobs.Blobs) || len(commitments) != len(blobs.Commitments) || len(commitments) != len(blobs.Proofs) {
421+
log.WithFields(logrus.Fields{
422+
"requestBlobCommitments": len(commitments),
423+
"responseBlobs": len(blobs.Blobs),
424+
"responseBlobCommitments": len(blobs.Commitments),
425+
"responseBlobProofs": len(blobs.Proofs),
426+
}).Error("block KZG commitment length does not equal responseBlobs length")
427+
return
428+
}
429+
430+
for i, commitment := range commitments {
431+
if commitment != blobs.Commitments[i] {
432+
log.WithFields(logrus.Fields{
433+
"requestBlobCommitment": commitment.String(),
434+
"responseBlobCommitment": blobs.Commitments[i].String(),
435+
"index": i,
436+
}).Error("requestBlobCommitment does not equal responseBlobCommitment")
437+
return
438+
}
439+
}
440+
441+
requestCtxCancel()
442+
if received.CompareAndSwap(false, true) {
443+
resultCh <- responsePayload
444+
log.Info("received payload from relay")
445+
} else {
446+
log.Trace("Discarding response, already received a correct response")
447+
}
448+
}(relay)
449+
}
450+
451+
// Wait for the first request to complete
452+
result := <-resultCh
453+
454+
return result, originalBid
455+
}

0 commit comments

Comments
 (0)