Skip to content

Commit a194bd9

Browse files
authored
[refactor] move root span handler into aggregator (#5478)
## Which problem is this PR solving? - #5389 (comment) ## Description of the changes - Refactored `handleRootSpan` logic into a helper method in aggregator.go.
1 parent 584b6ff commit a194bd9

File tree

7 files changed

+88
-113
lines changed

7 files changed

+88
-113
lines changed

cmd/collector/app/collector.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
3131
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
3232
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
33+
"github.com/jaegertracing/jaeger/model"
3334
"github.com/jaegertracing/jaeger/pkg/healthcheck"
3435
"github.com/jaegertracing/jaeger/pkg/metrics"
3536
"github.com/jaegertracing/jaeger/pkg/tenancy"
@@ -103,7 +104,9 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
103104

104105
var additionalProcessors []ProcessSpan
105106
if c.aggregator != nil {
106-
additionalProcessors = append(additionalProcessors, handleRootSpan(c.aggregator, c.logger))
107+
additionalProcessors = append(additionalProcessors, func(span *model.Span, tenant string) {
108+
c.aggregator.HandleRootSpan(span, c.logger)
109+
})
107110
}
108111

109112
c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)

cmd/collector/app/collector_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package app
1717
import (
1818
"context"
1919
"io"
20+
"sync/atomic"
2021
"testing"
2122
"time"
2223

@@ -47,6 +48,26 @@ func optionsForEphemeralPorts() *flags.CollectorOptions {
4748
return collectorOpts
4849
}
4950

51+
type mockAggregator struct {
52+
callCount atomic.Int32
53+
closeCount atomic.Int32
54+
}
55+
56+
func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) {
57+
t.callCount.Add(1)
58+
}
59+
60+
func (t *mockAggregator) HandleRootSpan(span *model.Span, logger *zap.Logger) {
61+
t.callCount.Add(1)
62+
}
63+
64+
func (t *mockAggregator) Start() {}
65+
66+
func (t *mockAggregator) Close() error {
67+
t.closeCount.Add(1)
68+
return nil
69+
}
70+
5071
func TestNewCollector(t *testing.T) {
5172
// prepare
5273
hc := healthcheck.New()

cmd/collector/app/root_span_handler.go

Lines changed: 0 additions & 42 deletions
This file was deleted.

cmd/collector/app/root_span_handler_test.go

Lines changed: 0 additions & 70 deletions
This file was deleted.

cmd/collector/app/sampling/strategystore/interface.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"context"
1919
"io"
2020

21+
"go.uber.org/zap"
22+
2123
"github.com/jaegertracing/jaeger/model"
2224
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
2325
)
@@ -36,6 +38,10 @@ type Aggregator interface {
3638
// Close() from io.Closer stops the aggregator from aggregating throughput.
3739
io.Closer
3840

41+
// The HandleRootSpan function processes a span, checking if it's a root span.
42+
// If it is, it extracts sampler parameters, then calls RecordThroughput.
43+
HandleRootSpan(span *model.Span, logger *zap.Logger)
44+
3945
// RecordThroughput records throughput for an operation for aggregation.
4046
RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64)
4147

plugin/sampling/strategystore/adaptive/aggregator.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,20 @@ func (a *aggregator) Close() error {
144144
a.bgFinished.Wait()
145145
return nil
146146
}
147+
148+
func (a *aggregator) HandleRootSpan(span *span_model.Span, logger *zap.Logger) {
149+
// simply checking parentId to determine if a span is a root span is not sufficient. However,
150+
// we can be sure that only a root span will have sampler tags.
151+
if span.ParentSpanID() != span_model.NewSpanID(0) {
152+
return
153+
}
154+
service := span.Process.ServiceName
155+
if service == "" || span.OperationName == "" {
156+
return
157+
}
158+
samplerType, samplerParam := span.GetSamplerParams(logger)
159+
if samplerType == span_model.SamplerTypeUnrecognized {
160+
return
161+
}
162+
a.RecordThroughput(service, span.OperationName, samplerType, samplerParam)
163+
}

plugin/sampling/strategystore/adaptive/aggregator_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,43 @@ func TestLowerboundThroughput(t *testing.T) {
115115
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
116116
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
117117
}
118+
119+
func TestRecordThroughput(t *testing.T) {
120+
metricsFactory := metricstest.NewFactory(0)
121+
mockStorage := &mocks.Store{}
122+
mockEP := &epmocks.ElectionParticipant{}
123+
testOpts := Options{
124+
CalculationInterval: 1 * time.Second,
125+
AggregationBuckets: 1,
126+
BucketsForCalculation: 1,
127+
}
128+
logger := zap.NewNop()
129+
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
130+
require.NoError(t, err)
131+
132+
// Testing non-root span
133+
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
134+
a.HandleRootSpan(span, logger)
135+
require.Empty(t, a.(*aggregator).currentThroughput)
136+
137+
// Testing span with service name but no operation
138+
span.References = []model.SpanRef{}
139+
span.Process = &model.Process{
140+
ServiceName: "A",
141+
}
142+
a.HandleRootSpan(span, logger)
143+
require.Empty(t, a.(*aggregator).currentThroughput)
144+
145+
// Testing span with service name and operation but no probabilistic sampling tags
146+
span.OperationName = "GET"
147+
a.HandleRootSpan(span, logger)
148+
require.Empty(t, a.(*aggregator).currentThroughput)
149+
150+
// Testing span with service name, operation, and probabilistic sampling tags
151+
span.Tags = model.KeyValues{
152+
model.String("sampler.type", "probabilistic"),
153+
model.String("sampler.param", "0.001"),
154+
}
155+
a.HandleRootSpan(span, logger)
156+
assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count)
157+
}

0 commit comments

Comments
 (0)