Skip to content

Commit efe712a

Browse files
ctlongacrmp
authored andcommitted
Propagate errors in rlp log client streaming
Propogates errors received from the rlp via gRPC when sending requests. Now that we don't block on dialing the server, we need to handle errors at the sending requests level. Signed-off-by: Andrew Crump <[email protected]>
1 parent 368f6ba commit efe712a

File tree

5 files changed

+55
-25
lines changed

5 files changed

+55
-25
lines changed

src/rlp-gateway/internal/ingress/log_client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,14 @@ func NewLogClient(creds credentials.TransportCredentials, logsProviderAddr strin
3333
}
3434

3535
// Stream opens a new stream on the log client.
36-
func (c *LogClient) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) web.Receiver {
36+
func (c *LogClient) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (web.Receiver, error) {
3737
receiver, err := c.c.BatchedReceiver(ctx, req)
3838
if err != nil {
3939
log.Printf("failed to open stream from logs provider: %s", err)
40+
return nil, err
4041
}
4142

42-
return receiver.Recv
43+
return receiver.Recv, nil
4344
}
4445

4546
func (c *LogClient) Close() error {

src/rlp-gateway/internal/web/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type Receiver func() (*loggregator_v2.EnvelopeBatch, error)
1414
// LogsProvder defines the interface for opening streams to the
1515
// logs provider
1616
type LogsProvider interface {
17-
Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) Receiver
17+
Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (Receiver, error)
1818
}
1919

2020
// Handler defines a struct for servering http endpoints

src/rlp-gateway/internal/web/json_error.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ var (
1313
errCounterNamePresentButEmpty = newJSONError(http.StatusBadRequest, "missing_counter_name", "counter.name is invalid without value")
1414
errGaugeNamePresentButEmpty = newJSONError(http.StatusBadRequest, "missing_gauge_name", "gauge.name is invalid without value")
1515
errStreamingUnsupported = newJSONError(http.StatusInternalServerError, "streaming_unsupported", "request does not support streaming")
16+
errStreamingUnavailable = newJSONError(http.StatusServiceUnavailable, "streaming_unavailable", "streaming is temporarily unavailable")
1617
errNotFound = newJSONError(http.StatusNotFound, "not_found", "resource not found")
1718
)
1819

src/rlp-gateway/internal/web/read.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,7 @@ func ReadHandler(
4747
return
4848
}
4949

50-
w.Header().Set("Content-Type", "text/event-stream")
51-
w.Header().Set("Cache-Control", "no-cache")
52-
w.Header().Set("Connection", "keep-alive")
53-
54-
flusher.Flush()
55-
56-
recv := lp.Stream(
50+
recv, err := lp.Stream(
5751
ctx,
5852
&loggregator_v2.EgressBatchRequest{
5953
ShardId: query.Get("shard_id"),
@@ -62,6 +56,16 @@ func ReadHandler(
6256
Selectors: s,
6357
},
6458
)
59+
if err != nil {
60+
errStreamingUnavailable.Write(w)
61+
return
62+
}
63+
64+
w.Header().Set("Content-Type", "text/event-stream")
65+
w.Header().Set("Cache-Control", "no-cache")
66+
w.Header().Set("Connection", "keep-alive")
67+
68+
flusher.Flush()
6569

6670
data := make(chan *loggregator_v2.EnvelopeBatch)
6771
errs := make(chan error, 1)

src/rlp-gateway/internal/web/read_test.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var _ = Describe("Read", func() {
2727

2828
BeforeEach(func() {
2929
lp = newStubLogsProvider()
30-
lp._batchResponse = &loggregator_v2.EnvelopeBatch{
30+
lp.resp = &loggregator_v2.EnvelopeBatch{
3131
Batch: []*loggregator_v2.Envelope{
3232
{
3333
SourceId: "source-id-a",
@@ -150,7 +150,7 @@ var _ = Describe("Read", func() {
150150
})
151151

152152
It("contains zero values for gauge metrics", func() {
153-
lp._batchResponse = &loggregator_v2.EnvelopeBatch{
153+
lp.resp = &loggregator_v2.EnvelopeBatch{
154154
Batch: []*loggregator_v2.Envelope{
155155
{
156156
SourceId: "source-id-a",
@@ -310,9 +310,28 @@ var _ = Describe("Read", func() {
310310
}).Should(Equal(io.EOF))
311311
})
312312

313+
It("returns service unavailable when unable to stream from the logs provider", func() {
314+
lp.streamErr = errors.New("streaming unavailable")
315+
req, err := http.NewRequest(http.MethodGet, server.URL+"/v2/read?log", nil)
316+
Expect(err).ToNot(HaveOccurred())
317+
318+
req = req.WithContext(ctx)
319+
320+
resp, err := server.Client().Do(req)
321+
Expect(err).ToNot(HaveOccurred())
322+
body, err := io.ReadAll(resp.Body)
323+
Expect(err).ToNot(HaveOccurred())
324+
325+
Expect(resp.StatusCode).To(Equal(http.StatusServiceUnavailable))
326+
Expect(body).To(MatchJSON(`{
327+
"error": "streaming_unavailable",
328+
"message": "streaming is temporarily unavailable"
329+
}`))
330+
})
331+
313332
It("closes the SSE stream if the envelope stream returns any error", func() {
314-
lp._batchResponse = nil
315-
lp._errorResponse = errors.New("an error")
333+
lp.resp = nil
334+
lp.respErr = errors.New("an error")
316335

317336
req, err := http.NewRequest(http.MethodGet, server.URL+"/v2/read?log", nil)
318337
Expect(err).ToNot(HaveOccurred())
@@ -379,38 +398,43 @@ var _ = Describe("Read", func() {
379398
})
380399

381400
type stubLogsProvider struct {
382-
mu sync.Mutex
383-
_requests []*loggregator_v2.EgressBatchRequest
384-
_batchResponse *loggregator_v2.EnvelopeBatch
385-
_errorResponse error
386-
block bool
401+
mu sync.Mutex
402+
reqs []*loggregator_v2.EgressBatchRequest
403+
resp *loggregator_v2.EnvelopeBatch
404+
respErr error
405+
block bool
406+
streamErr error
387407
}
388408

389409
func newStubLogsProvider() *stubLogsProvider {
390410
return &stubLogsProvider{}
391411
}
392412

393-
func (s *stubLogsProvider) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) web.Receiver {
413+
func (s *stubLogsProvider) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (web.Receiver, error) {
394414
s.mu.Lock()
395415
defer s.mu.Unlock()
396-
s._requests = append(s._requests, req)
416+
s.reqs = append(s.reqs, req)
417+
418+
if s.streamErr != nil {
419+
return nil, s.streamErr
420+
}
397421

398422
return func() (*loggregator_v2.EnvelopeBatch, error) {
399423
if s.block {
400424
var block chan int
401425
<-block
402426
}
403427

404-
return s._batchResponse, s._errorResponse
405-
}
428+
return s.resp, s.respErr
429+
}, nil
406430
}
407431

408432
func (s *stubLogsProvider) requests() []*loggregator_v2.EgressBatchRequest {
409433
s.mu.Lock()
410434
defer s.mu.Unlock()
411435

412-
result := make([]*loggregator_v2.EgressBatchRequest, len(s._requests))
413-
copy(result, s._requests)
436+
result := make([]*loggregator_v2.EgressBatchRequest, len(s.reqs))
437+
copy(result, s.reqs)
414438

415439
return result
416440
}

0 commit comments

Comments
 (0)