Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 69 additions & 53 deletions builder/beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool {
func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil
}

func (b *testBeaconClient) updateValidatorsMap() error {
func (b *testBeaconClient) Start() error {
return nil
}

Expand All @@ -41,7 +40,6 @@ type BeaconClient struct {
secondsInSlot uint64

mu sync.Mutex
currentEpoch uint64
slotProposerMap map[uint64]PubkeyHex

closeCh chan struct{}
Expand Down Expand Up @@ -77,65 +75,83 @@ func (b *BeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex,
return nextSlotProposer, nil
}

func (b *BeaconClient) updateValidatorsMap() error {
// Start regular slot updates
slotsPerEpoch := b.slotsInEpoch
func (b *BeaconClient) Start() error {
go b.UpdateValidatorMapForever()
return nil
}

func (b *BeaconClient) UpdateValidatorMapForever() {
durationPerSlot := time.Duration(b.secondsInSlot) * time.Second
durationPerEpoch := durationPerSlot * time.Duration(slotsPerEpoch)
// Every half epoch request validators map
timer := time.NewTicker(durationPerEpoch / 2)

prevFetchSlot := uint64(0)

// fetch current epoch if beacon is online
currentSlot, err := fetchCurrentSlot(b.endpoint)
if err != nil {
log.Error("could not get current slot", "err", err)
} else {
currentEpoch := currentSlot / b.slotsInEpoch
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch)
if err != nil {
log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err)
} else {
b.mu.Lock()
b.slotProposerMap = slotProposerMap
b.mu.Unlock()
}
}

retryDelay := time.Second

// Every half epoch request validators map, polling for the slot
// more frequently to avoid missing updates on errors
timer := time.NewTimer(retryDelay)
defer timer.Stop()
for {
for true {
select {
case <-b.closeCh:
return
case <-timer.C:
currentSlot, err := fetchCurrentSlot(b.endpoint)
if err != nil {
log.Error("could not get current slot", "err", err)
continue
}
}

currentEpoch := currentSlot / b.slotsInEpoch
if len(b.slotProposerMap) == 0 {
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch)
if err != nil {
log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err)
continue
}

b.mu.Lock()
b.slotProposerMap = slotProposerMap
b.mu.Unlock()
}
currentSlot, err := fetchCurrentSlot(b.endpoint)
if err != nil {
log.Error("could not get current slot", "err", err)
timer.Reset(retryDelay)
continue
}

if currentEpoch > b.currentEpoch {
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1)
if err != nil {
log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err)
continue
}

prevEpoch := currentEpoch - 1
b.mu.Lock()
b.currentEpoch = currentEpoch
// remove previous epoch slots
for k := range b.slotProposerMap {
if k/b.slotsInEpoch == prevEpoch {
delete(b.slotProposerMap, k)
}
}
// update the slot proposer map for next epoch
for k, v := range slotProposerMap {
b.slotProposerMap[k] = v
}
b.mu.Unlock()
}
// TODO: should poll after consistent slot within the epoch (slot % slotsInEpoch/2 == 0)
nextFetchSlot := prevFetchSlot + b.slotsInEpoch/2
if currentSlot < nextFetchSlot {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if currentSlot < nextFetchSlot {
if currentSlot < nextFetchSlot || (currentSlot % b.slotsInEpoch) < b.slotsInEpoch/2 {

this works fine except that it requests the same validator map twice in the same epoch. Should only request it once.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is actually what we want - since we are trying to avoid corner cases where the BN is not ready with the proposers map for the next epoch (we can ask for what the BN thinks is epoch+2)

timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot)
continue
}

timer.Reset(durationPerEpoch / 2)
case <-b.closeCh:
return nil
currentEpoch := currentSlot / b.slotsInEpoch
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1)
if err != nil {
log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err)
timer.Reset(retryDelay)
continue
}

prevFetchSlot = currentSlot
b.mu.Lock()
// remove previous epoch slots
for k := range b.slotProposerMap {
if k < currentEpoch*b.slotsInEpoch {
delete(b.slotProposerMap, k)
}
}
// update the slot proposer map for next epoch
for k, v := range slotProposerMap {
b.slotProposerMap[k] = v
}
b.mu.Unlock()

timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot)
}
return nil
}

func fetchCurrentSlot(endpoint string) (uint64, error) {
Expand Down
2 changes: 1 addition & 1 deletion builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type ValidatorData struct {
type IBeaconClient interface {
isValidator(pubkey PubkeyHex) bool
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
updateValidatorsMap() error
Start() error
Stop()
}

Expand Down
2 changes: 1 addition & 1 deletion builder/local_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewLocalRelay(sk *bls.SecretKey, beaconClient IBeaconClient, builderSigning
}

func (r *LocalRelay) Start() error {
go r.beaconClient.updateValidatorsMap()
r.beaconClient.Start()
return nil
}

Expand Down