Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 74 additions & 32 deletions beacon/light/api/light_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/donovanhide/eventsource"
Expand Down Expand Up @@ -416,39 +418,34 @@ type HeadEventListener struct {
// The callbacks are also called for the current head and optimistic head at startup.
// They are never called concurrently.
func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() {
closeCh := make(chan struct{}) // initiate closing the stream
closedCh := make(chan struct{}) // stream closed (or failed to create)
stoppedCh := make(chan struct{}) // sync loop stopped
streamCh := make(chan *eventsource.Stream, 1)
var (
ctx, closeCtx = context.WithCancel(context.Background())
streamCh = make(chan *eventsource.Stream, 1)
wg sync.WaitGroup
)

// When connected to a Lodestar node the subscription blocks until the first actual
// event arrives; therefore we create the subscription in a separate goroutine while
// letting the main goroutine sync up to the current head.
wg.Add(1)
go func() {
defer close(closedCh)
// when connected to a Lodestar node the subscription blocks until the
// first actual event arrives; therefore we create the subscription in
// a separate goroutine while letting the main goroutine sync up to the
// current head
req, err := http.NewRequest("GET", api.url+
"/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update", nil)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
return
}
for k, v := range api.customHeaders {
req.Header.Set(k, v)
}
stream, err := eventsource.SubscribeWithRequest("", req)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
close(streamCh)
defer wg.Done()
stream := api.startEventStream(ctx, &listener)
if stream == nil {
// This case happens when the context was closed.
return
}
// Stream was opened, wait for close signal.
streamCh <- stream
<-closeCh
<-ctx.Done()
stream.Close()
}()

wg.Add(1)
go func() {
defer close(stoppedCh)
defer wg.Done()

// Request initial data.
if head, err := api.GetHeader(common.Hash{}); err == nil {
listener.OnNewHead(head.Slot, head.Hash())
}
Expand All @@ -458,39 +455,50 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
if finalityUpdate, err := api.GetFinalityUpdate(); err == nil {
listener.OnFinality(finalityUpdate)
}
stream := <-streamCh
if stream == nil {

// Receive the stream.
var stream *eventsource.Stream
select {
case stream = <-streamCh:
case <-ctx.Done():
return
}

for {
select {
case <-ctx.Done():
stream.Close()

case event, ok := <-stream.Events:
if !ok {
return
}
switch event.Event() {
case "head":
if slot, blockRoot, err := decodeHeadEvent([]byte(event.Data())); err == nil {
slot, blockRoot, err := decodeHeadEvent([]byte(event.Data()))
if err == nil {
listener.OnNewHead(slot, blockRoot)
} else {
listener.OnError(fmt.Errorf("error decoding head event: %v", err))
}
case "light_client_optimistic_update":
if signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data())); err == nil {
signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data()))
if err == nil {
listener.OnSignedHead(signedHead)
} else {
listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err))
}
case "light_client_finality_update":
if finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data())); err == nil {
finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data()))
if err == nil {
listener.OnFinality(finalityUpdate)
} else {
listener.OnError(fmt.Errorf("error decoding finality update event: %v", err))
}
default:
listener.OnError(fmt.Errorf("unexpected event: %s", event.Event()))
}

case err, ok := <-stream.Errors:
if !ok {
return
Expand All @@ -499,9 +507,43 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
}
}
}()

return func() {
close(closeCh)
<-closedCh
<-stoppedCh
closeCtx()
wg.Wait()
}
}

// startEventStream establishes an event stream. This will keep retrying until the stream has been
// established. It can only return nil when the context is canceled.
func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) {
path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update"
req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
continue
}
for k, v := range api.customHeaders {
req.Header.Set(k, v)
}
stream, err := eventsource.SubscribeWithRequest("", req)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
continue
}
return stream
}
return nil
}

func ctxSleep(ctx context.Context, timeout time.Duration) (ok bool) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-timer.C:
return true
case <-ctx.Done():
return false
}
}