From 48ea323c3f80b47689bf094043a49b3d4915d2bb Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 21 Mar 2024 17:21:22 +0100 Subject: [PATCH 1/2] beacon/light/api: improve handling of event stream setup failures The StartHeadListener method will only be called once. So it can't just make one attempt to connect to the eventsource endpoint, it has to keep trying. Note that once the stream is established, the eventsource implementation itself will keep retrying. --- beacon/light/api/light_api.go | 111 ++++++++++++++++++++++++---------- 1 file changed, 79 insertions(+), 32 deletions(-) diff --git a/beacon/light/api/light_api.go b/beacon/light/api/light_api.go index 7e5ac38420b2..d768a325d45d 100755 --- a/beacon/light/api/light_api.go +++ b/beacon/light/api/light_api.go @@ -17,11 +17,13 @@ package api import ( + "context" "encoding/json" "errors" "fmt" "io" "net/http" + "sync" "time" "github.com/donovanhide/eventsource" @@ -416,39 +418,34 @@ type HeadEventListener struct { // The callbacks are also called for the current head and optimistic head at startup. // They are never called concurrently. func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() { - closeCh := make(chan struct{}) // initiate closing the stream - closedCh := make(chan struct{}) // stream closed (or failed to create) - stoppedCh := make(chan struct{}) // sync loop stopped - streamCh := make(chan *eventsource.Stream, 1) + var ( + ctx, closeCtx = context.WithCancel(context.Background()) + streamCh = make(chan *eventsource.Stream, 1) + wg sync.WaitGroup + ) + + // When connected to a Lodestar node the subscription blocks until the first actual + // event arrives; therefore we create the subscription in a separate goroutine while + // letting the main goroutine sync up to the current head. + wg.Add(1) go func() { - defer close(closedCh) - // when connected to a Lodestar node the subscription blocks until the - // first actual event arrives; therefore we create the subscription in - // a separate goroutine while letting the main goroutine sync up to the - // current head - req, err := http.NewRequest("GET", api.url+ - "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update", nil) - if err != nil { - listener.OnError(fmt.Errorf("error creating event subscription request: %v", err)) - return - } - for k, v := range api.customHeaders { - req.Header.Set(k, v) - } - stream, err := eventsource.SubscribeWithRequest("", req) - if err != nil { - listener.OnError(fmt.Errorf("error creating event subscription: %v", err)) - close(streamCh) + defer wg.Done() + stream := api.startEventStream(ctx, &listener) + if stream == nil { + // This case happens when the context was closed. return } + // Stream was opened, wait for close signal. streamCh <- stream - <-closeCh + <-ctx.Done() stream.Close() }() + wg.Add(1) go func() { - defer close(stoppedCh) + defer wg.Done() + // Request initial data. if head, err := api.GetHeader(common.Hash{}); err == nil { listener.OnNewHead(head.Slot, head.Hash()) } @@ -458,32 +455,42 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() if finalityUpdate, err := api.GetFinalityUpdate(); err == nil { listener.OnFinality(finalityUpdate) } - stream := <-streamCh - if stream == nil { + + // Receive the stream. + var stream *eventsource.Stream + select { + case stream = <-streamCh: + case <-ctx.Done(): return } for { select { + case <-ctx.Done(): + stream.Close() + case event, ok := <-stream.Events: if !ok { return } switch event.Event() { case "head": - if slot, blockRoot, err := decodeHeadEvent([]byte(event.Data())); err == nil { + slot, blockRoot, err := decodeHeadEvent([]byte(event.Data())) + if err == nil { listener.OnNewHead(slot, blockRoot) } else { listener.OnError(fmt.Errorf("error decoding head event: %v", err)) } case "light_client_optimistic_update": - if signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data())); err == nil { + signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data())) + if err == nil { listener.OnSignedHead(signedHead) } else { listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err)) } case "light_client_finality_update": - if finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data())); err == nil { + finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data())) + if err == nil { listener.OnFinality(finalityUpdate) } else { listener.OnError(fmt.Errorf("error decoding finality update event: %v", err)) @@ -491,6 +498,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() default: listener.OnError(fmt.Errorf("unexpected event: %s", event.Event())) } + case err, ok := <-stream.Errors: if !ok { return @@ -499,9 +507,48 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() } } }() + return func() { - close(closeCh) - <-closedCh - <-stoppedCh + closeCtx() + wg.Wait() + } +} + +// startEventStream establishes an event stream. This will keep retrying until the stream has been +// established. It can only return nil when the context is canceled. +func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream { + for initial := true; ; initial = false { + if !initial { + if ctxSleep(ctx, 5*time.Second) { + return nil + } + } + + path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update" + req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil) + if err != nil { + listener.OnError(fmt.Errorf("error creating event subscription request: %v", err)) + continue + } + for k, v := range api.customHeaders { + req.Header.Set(k, v) + } + stream, err := eventsource.SubscribeWithRequest("", req) + if err != nil { + listener.OnError(fmt.Errorf("error creating event subscription: %v", err)) + continue + } + return stream + } +} + +func ctxSleep(ctx context.Context, timeout time.Duration) (interrupted bool) { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case <-timer.C: + return false + case <-ctx.Done(): + return true } } From 4ab086bf95c24c748d1e927c57dc8420b0232532 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 22 Mar 2024 13:17:09 +0100 Subject: [PATCH 2/2] beacon/light/api: update --- beacon/light/api/light_api.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/beacon/light/api/light_api.go b/beacon/light/api/light_api.go index d768a325d45d..b23a89b319ae 100755 --- a/beacon/light/api/light_api.go +++ b/beacon/light/api/light_api.go @@ -517,13 +517,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() // startEventStream establishes an event stream. This will keep retrying until the stream has been // established. It can only return nil when the context is canceled. func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream { - for initial := true; ; initial = false { - if !initial { - if ctxSleep(ctx, 5*time.Second) { - return nil - } - } - + for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) { path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update" req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil) if err != nil { @@ -540,15 +534,16 @@ func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadE } return stream } + return nil } -func ctxSleep(ctx context.Context, timeout time.Duration) (interrupted bool) { +func ctxSleep(ctx context.Context, timeout time.Duration) (ok bool) { timer := time.NewTimer(timeout) defer timer.Stop() select { case <-timer.C: - return false - case <-ctx.Done(): return true + case <-ctx.Done(): + return false } }