Skip to content
82 changes: 76 additions & 6 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ func (h *HTTPGateway) returnTraces(traces []ptrace.Traces, err error, w http.Res
http.Error(w, string(resp), http.StatusNotFound)
return
}
// TODO: the response should be streamed back to the client
// https://github.com/jaegertracing/jaeger/issues/6467

combinedTrace := ptrace.NewTraces()
for _, t := range traces {
resources := t.ResourceSpans()
Expand All @@ -150,6 +149,79 @@ func (h *HTTPGateway) returnTraces(traces []ptrace.Traces, err error, w http.Res
h.returnTrace(combinedTrace, w)
}

func (h *HTTPGateway) streamTraces(tracesIter func(yield func([]ptrace.Traces, error) bool), w http.ResponseWriter) {
flusher, ok := w.(http.Flusher)
if !ok {
traces, err := jiter.FlattenWithErrors(tracesIter)
h.returnTraces(traces, err, w)
return
}

w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Content-Type-Options", "nosniff")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? add comment

w.Header().Set("Content-Encoding", "identity")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is "identity" encoding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sending data immediately without compression that's why using identity header.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I am not mistaken we have a compression handler defined at the server level. How would that work with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No compression middleware exists for API endpoints
-> Verified in cmd/query/app/server.go (lines 188-196) - handler chain has no compression


tracesFound := false
firstChunk := true
hasError := false

tracesIter(func(traces []ptrace.Traces, err error) bool {
if err != nil {
if firstChunk {
h.tryHandleError(w, err, http.StatusInternalServerError)
} else {
h.Logger.Error("Error while streaming traces", zap.Error(err))
}
hasError = true
return false
}

if len(traces) == 0 {
return true
}
Comment on lines +200 to +202
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current handling of empty trace arrays may lead to ambiguity in the API response. When all iterations return empty arrays, tracesFound remains false and a 404 error is returned. This approach doesn't distinguish between "no traces exist for this query" and "traces exist but contain no data." Consider adding a flag that indicates whether the query matched any traces at all, separate from whether those traces contain spans. This would provide more accurate feedback to API consumers about whether their query parameters matched anything in the system.

Spotted by Graphite Agent

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a reasonable callout. Can we add a test for this use case?


tracesFound = true

for _, td := range traces {
tracesData := jptrace.TracesData(td)
response := &api_v3.GRPCGatewayWrapper{
Result: &tracesData,
}

if firstChunk {
w.WriteHeader(http.StatusOK)
firstChunk = false
}

marshaler := jsonpb.Marshaler{}
if err := marshaler.Marshal(w, response); err != nil {
h.Logger.Error("Failed to marshal trace chunk", zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens to the output stream if we just log error and exit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before streaming if error occur then it will throw error with 500 code and if during streaming error occur then client will receive incomplete data and something like "Connection closes unexpectedly" and a log will also be logged in server logs.

return false
}

if _, err := w.Write([]byte("\n")); err != nil {
h.Logger.Error("Failed to write chunk separator", zap.Error(err))
return false
}

flusher.Flush()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I understand correctly you serialize each chunk and flush it to the client. What happens to content-length header in this case? And how does the client know that there are multiple chunks to be received?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So http client by default will read it till EOF, no content length need to know by client as it is handled automatically.

}

return true
})

if !tracesFound && !hasError {
errorResponse := api_v3.GRPCGatewayError{
Error: &api_v3.GRPCGatewayError_GRPCGatewayErrorDetails{
HttpCode: http.StatusNotFound,
Message: "No traces found",
},
}
resp, _ := json.Marshal(&errorResponse)
http.Error(w, string(resp), http.StatusNotFound)
}
}

func (*HTTPGateway) marshalResponse(response proto.Message, w http.ResponseWriter) {
_ = new(jsonpb.Marshaler).Marshal(w, response)
}
Expand Down Expand Up @@ -193,8 +265,7 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
request.RawTraces = rawTraces
}
getTracesIter := h.QueryService.GetTraces(r.Context(), request)
trc, err := jiter.FlattenWithErrors(getTracesIter)
h.returnTraces(trc, err, w)
h.streamTraces(getTracesIter, w)
}

func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
Expand All @@ -204,8 +275,7 @@ func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
}

findTracesIter := h.QueryService.FindTraces(r.Context(), *queryParams)
traces, err := jiter.FlattenWithErrors(findTracesIter)
h.returnTraces(traces, err, w)
h.streamTraces(findTracesIter, w)
}

func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*querysvc.TraceQueryParams, bool) {
Expand Down
Loading
Loading