diff --git a/builder/beacon_client.go b/builder/beacon_client.go index 84b30b585c..bb97ebfa6c 100644 --- a/builder/beacon_client.go +++ b/builder/beacon_client.go @@ -30,8 +30,7 @@ func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool { func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) { return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil } - -func (b *testBeaconClient) updateValidatorsMap() error { +func (b *testBeaconClient) Start() error { return nil } @@ -41,7 +40,6 @@ type BeaconClient struct { secondsInSlot uint64 mu sync.Mutex - currentEpoch uint64 slotProposerMap map[uint64]PubkeyHex closeCh chan struct{} @@ -77,65 +75,83 @@ func (b *BeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, return nextSlotProposer, nil } -func (b *BeaconClient) updateValidatorsMap() error { - // Start regular slot updates - slotsPerEpoch := b.slotsInEpoch +func (b *BeaconClient) Start() error { + go b.UpdateValidatorMapForever() + return nil +} + +func (b *BeaconClient) UpdateValidatorMapForever() { durationPerSlot := time.Duration(b.secondsInSlot) * time.Second - durationPerEpoch := durationPerSlot * time.Duration(slotsPerEpoch) - // Every half epoch request validators map - timer := time.NewTicker(durationPerEpoch / 2) + + prevFetchSlot := uint64(0) + + // fetch current epoch if beacon is online + currentSlot, err := fetchCurrentSlot(b.endpoint) + if err != nil { + log.Error("could not get current slot", "err", err) + } else { + currentEpoch := currentSlot / b.slotsInEpoch + slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch) + if err != nil { + log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err) + } else { + b.mu.Lock() + b.slotProposerMap = slotProposerMap + b.mu.Unlock() + } + } + + retryDelay := time.Second + + // Every half epoch request validators map, polling for the slot + // more frequently to avoid missing updates on errors + timer := time.NewTimer(retryDelay) defer timer.Stop() - for { + for true { select { + case <-b.closeCh: + return case <-timer.C: - currentSlot, err := fetchCurrentSlot(b.endpoint) - if err != nil { - log.Error("could not get current slot", "err", err) - continue - } + } - currentEpoch := currentSlot / b.slotsInEpoch - if len(b.slotProposerMap) == 0 { - slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch) - if err != nil { - log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err) - continue - } - - b.mu.Lock() - b.slotProposerMap = slotProposerMap - b.mu.Unlock() - } + currentSlot, err := fetchCurrentSlot(b.endpoint) + if err != nil { + log.Error("could not get current slot", "err", err) + timer.Reset(retryDelay) + continue + } - if currentEpoch > b.currentEpoch { - slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1) - if err != nil { - log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err) - continue - } - - prevEpoch := currentEpoch - 1 - b.mu.Lock() - b.currentEpoch = currentEpoch - // remove previous epoch slots - for k := range b.slotProposerMap { - if k/b.slotsInEpoch == prevEpoch { - delete(b.slotProposerMap, k) - } - } - // update the slot proposer map for next epoch - for k, v := range slotProposerMap { - b.slotProposerMap[k] = v - } - b.mu.Unlock() - } + // TODO: should poll after consistent slot within the epoch (slot % slotsInEpoch/2 == 0) + nextFetchSlot := prevFetchSlot + b.slotsInEpoch/2 + if currentSlot < nextFetchSlot { + timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot) + continue + } - timer.Reset(durationPerEpoch / 2) - case <-b.closeCh: - return nil + currentEpoch := currentSlot / b.slotsInEpoch + slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1) + if err != nil { + log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err) + timer.Reset(retryDelay) + continue + } + + prevFetchSlot = currentSlot + b.mu.Lock() + // remove previous epoch slots + for k := range b.slotProposerMap { + if k < currentEpoch*b.slotsInEpoch { + delete(b.slotProposerMap, k) + } } + // update the slot proposer map for next epoch + for k, v := range slotProposerMap { + b.slotProposerMap[k] = v + } + b.mu.Unlock() + + timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot) } - return nil } func fetchCurrentSlot(endpoint string) (uint64, error) { diff --git a/builder/builder.go b/builder/builder.go index eca067a717..c732bf0553 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -32,7 +32,7 @@ type ValidatorData struct { type IBeaconClient interface { isValidator(pubkey PubkeyHex) bool getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) - updateValidatorsMap() error + Start() error Stop() } diff --git a/builder/local_relay.go b/builder/local_relay.go index 02a6ef3afe..d07d463231 100644 --- a/builder/local_relay.go +++ b/builder/local_relay.go @@ -85,7 +85,7 @@ func NewLocalRelay(sk *bls.SecretKey, beaconClient IBeaconClient, builderSigning } func (r *LocalRelay) Start() error { - go r.beaconClient.updateValidatorsMap() + r.beaconClient.Start() return nil }