Skip to content
2 changes: 2 additions & 0 deletions cmd/query/app/apiv3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (gw *testGateway) execRequest(t *testing.T, url string) ([]byte, int) {

func (*testGateway) verifySnapshot(t *testing.T, body []byte) []byte {
// reformat JSON body with indentation, to make diffing easier
// Note: body may contain multiple newline-separated JSON objects for streaming responses
var data any
require.NoError(t, json.Unmarshal(body, &data), "response: %s", string(body))
body, err := json.MarshalIndent(data, "", " ")
Expand Down Expand Up @@ -168,6 +169,7 @@ func (gw *testGateway) getTracesAndVerify(t *testing.T, url string, expectedTrac

var response api_v3.GRPCGatewayWrapper
parseResponse(t, body, &response)

td := response.Result.ToTraces()
assert.Equal(t, 1, td.SpanCount())
traceID := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
Expand Down
112 changes: 106 additions & 6 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
Expand All @@ -31,6 +32,21 @@ import (
"github.com/jaegertracing/jaeger/internal/storage/v2/v1adapter"
)

// enableHTTPStreaming is the feature gate consulted by the HTTP gateway to
// decide whether /api/v3 responses are streamed. It is assigned in init.
var enableHTTPStreaming *featuregate.Gate

// init registers the "jaeger.query.http.streaming" gate at the Alpha stage in
// the global feature gate registry. MustRegister is a Must-style helper, so a
// registration failure surfaces at package initialization time.
func init() {
	const (
		gateID          = "jaeger.query.http.streaming"
		gateDescription = "Enable HTTP streaming for /api/v3 endpoints using NDJSON format with chunked transfer encoding"
		gateReference   = "https://github.com/jaegertracing/jaeger/issues/6467"
	)

	registry := featuregate.GlobalRegistry()
	enableHTTPStreaming = registry.MustRegister(
		gateID,
		featuregate.StageAlpha,
		featuregate.WithRegisterDescription(gateDescription),
		featuregate.WithRegisterReferenceURL(gateReference),
	)
}

const (
paramTraceID = "trace_id" // get trace by ID
paramStartTime = "start_time"
Expand Down Expand Up @@ -137,8 +153,7 @@ func (h *HTTPGateway) returnTraces(traces []ptrace.Traces, err error, w http.Res
http.Error(w, string(resp), http.StatusNotFound)
return
}
// TODO: the response should be streamed back to the client
// https://github.com/jaegertracing/jaeger/issues/6467

combinedTrace := ptrace.NewTraces()
for _, t := range traces {
resources := t.ResourceSpans()
Expand All @@ -150,6 +165,93 @@ func (h *HTTPGateway) returnTraces(traces []ptrace.Traces, err error, w http.Res
h.returnTrace(combinedTrace, w)
}

func (h *HTTPGateway) streamTraces(tracesIter func(yield func([]ptrace.Traces, error) bool), w http.ResponseWriter) {
// Check feature gate FIRST - if disabled, use non-streaming behavior
if !enableHTTPStreaming.IsEnabled() {
traces, err := jiter.FlattenWithErrors(tracesIter)
h.returnTraces(traces, err, w)
return
}

flusher, ok := w.(http.Flusher)
if !ok {
traces, err := jiter.FlattenWithErrors(tracesIter)
h.returnTraces(traces, err, w)
return
}

w.Header().Set("Content-Type", "application/json")

tracesFound := false
hasError := false
headerWritten := false

tracesIter(func(traces []ptrace.Traces, err error) bool {
if err != nil {
if !headerWritten {
h.tryHandleError(w, err, http.StatusInternalServerError)
} else {
h.Logger.Error("Error while streaming traces", zap.Error(err))
}
hasError = true
return false
}

if len(traces) == 0 {
return true
}
Comment on lines +200 to +202
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current handling of empty trace arrays may lead to ambiguity in the API response. When all iterations return empty arrays, tracesFound remains false and a 404 error is returned. This approach doesn't distinguish between "no traces exist for this query" and "traces exist but contain no data." Consider adding a flag that indicates whether the query matched any traces at all, separate from whether those traces contain spans. This would provide more accurate feedback to API consumers about whether their query parameters matched anything in the system.

Spotted by Graphite Agent

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a reasonable callout. Can we add a test for this use case?


tracesFound = true

for _, td := range traces {
combinedTrace := ptrace.NewTraces()
resources := td.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
resource := resources.At(i)
resource.CopyTo(combinedTrace.ResourceSpans().AppendEmpty())
}

tracesData := jptrace.TracesData(combinedTrace)
response := &api_v3.GRPCGatewayWrapper{
Result: &tracesData,
}

if !headerWritten {
w.WriteHeader(http.StatusOK)
headerWritten = true
} else {
// Write newline separator between messages (grpc-web style NDJSON)
if _, err := w.Write([]byte("\n")); err != nil {
h.Logger.Error("Failed to write newline separator", zap.Error(err))
return false
}
}

marshaler := jsonpb.Marshaler{}
if err := marshaler.Marshal(w, response); err != nil {
h.Logger.Error("Failed to marshal trace chunk", zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens to the output stream if we just log error and exit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before streaming if error occur then it will throw error with 500 code and if during streaming error occur then client will receive incomplete data and something like "Connection closes unexpectedly" and a log will also be logged in server logs.

return false
}

flusher.Flush()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I understand correctly you serialize each chunk and flush it to the client. What happens to content-length header in this case? And how does the client know that there are multiple chunks to be received?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So http client by default will read it till EOF, no content length need to know by client as it is handled automatically.

}

return true
})

if !tracesFound && !hasError {
errorResponse := api_v3.GRPCGatewayError{
Error: &api_v3.GRPCGatewayError_GRPCGatewayErrorDetails{
HttpCode: http.StatusNotFound,
Message: "No traces found",
},
}
resp, _ := json.Marshal(&errorResponse)
http.Error(w, string(resp), http.StatusNotFound)
return
}
}

// marshalResponse serializes response as JSON with a default jsonpb.Marshaler
// and writes the result directly to w. The error returned by Marshal is
// discarded, so callers receive no signal when encoding or writing fails.
func (*HTTPGateway) marshalResponse(response proto.Message, w http.ResponseWriter) {
	marshaler := jsonpb.Marshaler{}
	_ = marshaler.Marshal(w, response)
}
Expand Down Expand Up @@ -193,8 +295,7 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
request.RawTraces = rawTraces
}
getTracesIter := h.QueryService.GetTraces(r.Context(), request)
trc, err := jiter.FlattenWithErrors(getTracesIter)
h.returnTraces(trc, err, w)
h.streamTraces(getTracesIter, w)
}

func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
Expand All @@ -204,8 +305,7 @@ func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
}

findTracesIter := h.QueryService.FindTraces(r.Context(), *queryParams)
traces, err := jiter.FlattenWithErrors(findTracesIter)
h.returnTraces(traces, err, w)
h.streamTraces(findTracesIter, w)
}

func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*querysvc.TraceQueryParams, bool) {
Expand Down
Loading
Loading