Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type QueryOptions struct {
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
// MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span.
MaxClockSkewAdjust time.Duration `mapstructure:"max_clock_skew_adjust" valid:"optional"`
// MaxTraceSize is the maximum number of spans allowed per trace.
// If a trace has more spans than this, it will be truncated and a warning will be added
Comment thread
yurishkuro marked this conversation as resolved.
Outdated
Comment thread
yurishkuro marked this conversation as resolved.
Outdated
MaxTraceSize int `mapstructure:"max_trace_size" valid:"optional"`
// EnableTracing determines whether traces will be emitted by jaeger-query.
EnableTracing bool `mapstructure:"enable_tracing"`
// HTTP holds the HTTP configuration that the query service uses to serve requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type QueryServiceOptions struct {
ArchiveTraceWriter tracestore.Writer
// MaxClockSkewAdjust is the maximum duration by which to adjust a span.
MaxClockSkewAdjust time.Duration
// MaxTraceSize is the maximum number of spans allowed per trace.
// If a trace has more spans than this, it will be truncated and a warning will be added
Comment thread
yurishkuro marked this conversation as resolved.
Outdated
Comment thread
yurishkuro marked this conversation as resolved.
Outdated
MaxTraceSize int
}

// StorageCapabilities is a feature flag for query service
Expand Down Expand Up @@ -234,7 +237,7 @@ func (qs QueryService) receiveTraces(
if rawTraces {
seq(processTraces)
} else {
jptrace.AggregateTraces(seq)(func(trace ptrace.Traces, err error) bool {
jptrace.AggregateTracesWithLimit(seq, qs.options.MaxTraceSize)(func(trace ptrace.Traces, err error) bool {
return processTraces([]ptrace.Traces{trace}, err)
})
}
Expand Down
124 changes: 124 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/querysvc/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,130 @@ func TestGetCapabilities(t *testing.T) {
}
}

// Consolidate Underlimit, Overlimit and Exactly at limit tests
func TestMaxTraceSize(t *testing.T) {
tests := []struct {
name string
maxTraceSize int
createTraces func() []ptrace.Traces
expectedSpans int
expectWarning bool
warningPattern string
}{
{
name: "under_limit",
maxTraceSize: 5,
createTraces: func() []ptrace.Traces {
trace := ptrace.NewTraces()
scopes := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}
return []ptrace.Traces{trace}
},
expectedSpans: 3,
expectWarning: false,
},
{
name: "over_limit",
maxTraceSize: 3,
createTraces: func() []ptrace.Traces {
trace1 := ptrace.NewTraces()
scopes1 := trace1.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes1.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}

trace2 := ptrace.NewTraces()
scopes2 := trace2.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
for i := 3; i < 5; i++ {
span := scopes2.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}
return []ptrace.Traces{trace1, trace2}
},
expectedSpans: 3,
expectWarning: true,
warningPattern: "trace has more than 3 spans",
},
{
name: "exactly_at_limit",
maxTraceSize: 3,
createTraces: func() []ptrace.Traces {
trace := ptrace.NewTraces()
scopes := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
for i := 0; i < 3; i++ {
span := scopes.Spans().AppendEmpty()
span.SetTraceID(testTraceID)
span.SetSpanID(pcommon.SpanID([8]byte{byte(i + 1)}))
span.SetName(fmt.Sprintf("span-%d", i))
}
return []ptrace.Traces{trace}
},
expectedSpans: 3,
expectWarning: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
traces := tt.createTraces()
responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
for _, trace := range traces {
if !yield([]ptrace.Traces{trace}, nil) {
return
}
}
})

traceReader := &tracestoremocks.Reader{}
dependencyStorage := &depstoremocks.Reader{}
options := QueryServiceOptions{
MaxTraceSize: tt.maxTraceSize,
}
tqs := &testQueryService{}
tqs.queryService = NewQueryService(traceReader, dependencyStorage, options)

params := GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{{TraceID: testTraceID}},
}
traceReader.On("GetTraces", mock.Anything, params.TraceIDs).
Return(responseIter).Once()

getTracesIter := tqs.queryService.GetTraces(context.Background(), params)
gotTraces, err := jiter.FlattenWithErrors(getTracesIter)
require.NoError(t, err)
require.Len(t, gotTraces, 1)

// count total spans
actualSpans := gotTraces[0].SpanCount()
require.Equal(t, tt.expectedSpans, actualSpans)

// check warning
firstSpan := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
warningsAttr, hasWarning := firstSpan.Attributes().Get("@jaeger@warnings")

if tt.expectWarning {
require.True(t, hasWarning, "expected warning but none found")
require.Equal(t, pcommon.ValueTypeSlice, warningsAttr.Type())
warnings := warningsAttr.Slice()
require.Positive(t, warnings.Len())
require.Contains(t, warnings.At(warnings.Len()-1).Str(), tt.warningPattern)
} else {
require.False(t, hasWarning, "unexpected warning found")
}
})
}
}

func TestQueryServiceGetServicesReturnsEmptySlice(t *testing.T) {
reader := new(tracestoremocks.Reader)
reader.
Expand Down
1 change: 1 addition & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {

opts := querysvc.QueryServiceOptions{
MaxClockSkewAdjust: s.config.MaxClockSkewAdjust,
MaxTraceSize: s.config.MaxTraceSize,
}
if err := s.addArchiveStorage(&opts, host); err != nil {
return err
Expand Down
100 changes: 96 additions & 4 deletions internal/jptrace/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package jptrace

import (
"fmt"
"iter"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -14,9 +15,19 @@ import (
//
// The `tracesSeq` input must adhere to the chunking requirements of tracestore.Reader.GetTraces.
func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptrace.Traces, error] {
return AggregateTracesWithLimit(tracesSeq, 0)
}

// AggregateTracesWithLimit aggregates a sequence of trace batches into individual traces
// but limits each trace size to maxSize spans. If maxSize is 0 or negative, there is no
// limit and all spans will be included in the aggregated trace.
//
// The `tracesSeq` input must adhere to the chunking requirements of tracestore.Reader.GetTraces.
func AggregateTracesWithLimit(tracesSeq iter.Seq2[[]ptrace.Traces, error], maxSize int) iter.Seq2[ptrace.Traces, error] {
return func(yield func(trace ptrace.Traces, err error) bool) {
currentTrace := ptrace.NewTraces()
currentTraceID := pcommon.NewTraceIDEmpty()
spanCount := 0
cont := true

tracesSeq(func(traces []ptrace.Traces, err error) bool {
Expand All @@ -28,19 +39,26 @@ func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptra
if trace.SpanCount() == 0 {
continue
}
resources := trace.ResourceSpans()
traceID := resources.At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
traceID := GetTraceID(trace)
if currentTraceID == traceID {
MergeTraces(currentTrace, trace)
spanCount = mergeTracesWithLimit(currentTrace, trace, maxSize, spanCount)
} else {
if currentTrace.SpanCount() > 0 {
if !yield(currentTrace, nil) {
cont = false
return false
}
}
currentTrace = trace
currentTraceID = traceID
if maxSize > 0 && trace.SpanCount() > maxSize {
currentTrace = ptrace.NewTraces()
copySpansUpToLimit(currentTrace, trace, maxSize)
spanCount = maxSize
markTraceTruncated(currentTrace, maxSize)
} else {
currentTrace = trace
spanCount = trace.SpanCount()
}
}
}
return true
Expand All @@ -51,6 +69,72 @@ func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptra
}
}

func mergeTracesWithLimit(dest, src ptrace.Traces, maxSize int, spanCount int) int {
// early exit if already at max; trace was already marked truncated when the limit was first hit
if maxSize > 0 && spanCount >= maxSize {
return spanCount
}
Comment thread
yurishkuro marked this conversation as resolved.

incomingCount := src.SpanCount()
// check if we can merge all spans without exceeding limit
if maxSize <= 0 || spanCount+incomingCount <= maxSize {
MergeTraces(dest, src)
spanCount += incomingCount
return spanCount
}

// partial copy
remaining := maxSize - spanCount
if remaining > 0 {
copySpansUpToLimit(dest, src, remaining)
spanCount = maxSize
}
markTraceTruncated(dest, maxSize)
return spanCount
}

func copySpansUpToLimit(dest, src ptrace.Traces, limit int) {
copied := 0

for _, srcResource := range src.ResourceSpans().All() {
if copied >= limit {
return
}
var destResource ptrace.ResourceSpans
resourceAdded := false

for _, srcScope := range srcResource.ScopeSpans().All() {
if copied >= limit {
break
}
var destScope ptrace.ScopeSpans
scopeAdded := false

for _, span := range srcScope.Spans().All() {
if copied >= limit {
break
}
// Lazily create resource and scope containers only when a span is actually copied,
// to avoid leaving empty container artifacts in dest.
if !resourceAdded {
destResource = dest.ResourceSpans().AppendEmpty()
srcResource.Resource().CopyTo(destResource.Resource())
destResource.SetSchemaUrl(srcResource.SchemaUrl())
resourceAdded = true
}
if !scopeAdded {
destScope = destResource.ScopeSpans().AppendEmpty()
srcScope.Scope().CopyTo(destScope.Scope())
destScope.SetSchemaUrl(srcScope.SchemaUrl())
scopeAdded = true
}
span.CopyTo(destScope.Spans().AppendEmpty())
copied++
}
}
}
Comment thread
yurishkuro marked this conversation as resolved.
}

// MergeTraces merges src trace into dest trace.
// This is useful when multiple iterations return parts of the same trace.
func MergeTraces(dest, src ptrace.Traces) {
Expand All @@ -60,3 +144,11 @@ func MergeTraces(dest, src ptrace.Traces) {
resource.CopyTo(dest.ResourceSpans().AppendEmpty())
}
}

func markTraceTruncated(trace ptrace.Traces, maxSize int) {
SpanIter(trace)(func(_ SpanIterPos, span ptrace.Span) bool {
AddWarnings(span,
fmt.Sprintf("trace has more than %d spans, showing first %d spans only", maxSize, maxSize))
return false // stop after first span
})
}
Loading
Loading