Skip to content

Commit b61bcf6

Browse files
[processor/tailsampling] Simplify tsp locking (open-telemetry#43671)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR changes the tailsampling processor to do all work (append traces to a batch, decide on which traces from a batch are sampled, forward to the next processor) in a single goroutine. At Grafana Labs we have found that there is significant contention between the various mutexes such that little performance is gained by allowing traces to be updated while a policy evaluation is occurring. In addition, the locking adds a lot of complexity which makes future improvements more challenging, and introduces subtle concurrency issues such as seen in the tracking issue linked below. We (mostly @Logiraptor) also experimented with using multiple workers, each with their own goroutine to process traces but we found little improvement as yet again various contention issues showed up. See Logiraptor#1 (or the first commit in this PR) for more details. If additional performance is necessary for slow decision tick latencies I would suggest adding parallelism just to the call to `makeDecision`, which would allow additional CPUs to be used without adding contention on `idToTrace` or other locations. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#41656 Working towards open-telemetry#43876 <!--Describe what testing was performed and which tests were added.--> #### Testing Tests were updated to make sure the tick behavior remains correct and consistent. In addition, the syncBatcher needed to be updated to properly implement `Stop`. --------- Co-authored-by: Patrick Oyarzun <[email protected]>
1 parent 676f0b3 commit b61bcf6

28 files changed

+563
-519
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: processor/tail_sampling
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Simplify the locking used by the tail sampling
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [41656, 43671]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
There are two small breaking changes as part of this work:
20+
1. Pending traces are now passed through the decision logic during shutdown by default. If this is not desired it can be turned off using `drop_pending_traces_on_shutdown`.
21+
2. The mutex in `samplingpolicy.TraceData` has been removed and `samplingpolicy.SpanCount` is now an `int64` instead of `*atomic.Int64`. Custom extensions using these fields will need to be updated.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [api]

processor/tailsamplingprocessor/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ The following configuration options can also be modified:
6161
persisting the "drop" decisions for traces that may have already been released from memory.
6262
By default, the size is 0 and the cache is inactive.
6363
- `sample_on_first_match`: Make decision as soon as a policy matches
64+
- `drop_pending_traces_on_shutdown`: Drop pending traces on shutdown instead of making a decision with the partial data
65+
already ingested.
6466

6567

6668
Each policy will result in a decision, and the processor will evaluate them to make a final decision:

processor/tailsamplingprocessor/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,4 +291,7 @@ type Config struct {
291291
Options []Option `mapstructure:"-"`
292292
// Make decision as soon as a policy matches
293293
SampleOnFirstMatch bool `mapstructure:"sample_on_first_match"`
294+
// DropPendingTracesOnShutdown will drop all traces that are part of batches that have not yet reached the decision
295+
// wait when the processor is shutdown.
296+
DropPendingTracesOnShutdown bool `mapstructure:"drop_pending_traces_on_shutdown"`
294297
}

processor/tailsamplingprocessor/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
77
github.com/google/uuid v1.6.0
88
github.com/hashicorp/golang-lru/v2 v2.0.7
9-
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.140.1
9+
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.140.1 // indirect
1010
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.140.1
1111
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.140.1
1212
github.com/stretchr/testify v1.11.1

processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ func NewBooleanAttributeFilter(settings component.TelemetrySettings, key string,
3636

3737
// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
3838
func (baf *booleanAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID, trace *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
39-
trace.Lock()
40-
defer trace.Unlock()
4139
batches := trace.ReceivedBatches
4240

4341
if baf.invertMatch {

processor/tailsamplingprocessor/internal/sampling/composite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace
111111
// The subpolicy made a decision to Sample. Now we need to make our decision.
112112

113113
// Calculate resulting SPS counter if we decide to sample this trace
114-
spansInSecondIfSampled := sub.sampledSPS + trace.SpanCount.Load()
114+
spansInSecondIfSampled := sub.sampledSPS + trace.SpanCount
115115

116116
// Check if the rate will be within the allocated bandwidth.
117117
if spansInSecondIfSampled <= sub.allocatedSPS && spansInSecondIfSampled <= c.maxTotalSPS {

processor/tailsamplingprocessor/internal/sampling/composite_test.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
package sampling
44

55
import (
6-
"sync/atomic"
76
"testing"
87
"time"
98

@@ -28,9 +27,7 @@ func (f FakeTimeProvider) getCurSecond() int64 {
2827
var traceID = pcommon.TraceID([16]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x52, 0x96, 0x9A, 0x89, 0x55, 0x57, 0x1A, 0x3F})
2928

3029
func createTrace() *samplingpolicy.TraceData {
31-
spanCount := &atomic.Int64{}
32-
spanCount.Store(1)
33-
trace := &samplingpolicy.TraceData{SpanCount: spanCount, ReceivedBatches: ptrace.NewTraces()}
30+
trace := &samplingpolicy.TraceData{SpanCount: 1, ReceivedBatches: ptrace.NewTraces()}
3431
return trace
3532
}
3633

@@ -49,11 +46,9 @@ func newTraceWithKV(traceID pcommon.TraceID, key string, val int64) *samplingpol
4946
))
5047
span.Attributes().PutInt(key, val)
5148

52-
spanCount := &atomic.Int64{}
53-
spanCount.Store(1)
5449
return &samplingpolicy.TraceData{
5550
ReceivedBatches: traces,
56-
SpanCount: spanCount,
51+
SpanCount: 1,
5752
}
5853
}
5954

processor/tailsamplingprocessor/internal/sampling/latency.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ func NewLatency(settings component.TelemetrySettings, thresholdMs, upperThreshol
3535
func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
3636
l.logger.Debug("Evaluating spans in latency filter")
3737

38-
traceData.Lock()
39-
defer traceData.Unlock()
4038
batches := traceData.ReceivedBatches
4139

4240
var minTime pcommon.Timestamp

processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@ func NewNumericAttributeFilter(settings component.TelemetrySettings, key string,
4444

4545
// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
4646
func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID, trace *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
47-
trace.Lock()
48-
defer trace.Unlock()
4947
batches := trace.ReceivedBatches
5048

5149
// Get the effective min/max values

processor/tailsamplingprocessor/internal/sampling/ottl.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ func (ocf *ottlConditionFilter) Evaluate(ctx context.Context, traceID pcommon.Tr
6464
return samplingpolicy.NotSampled, nil
6565
}
6666

67-
trace.Lock()
68-
defer trace.Unlock()
6967
batches := trace.ReceivedBatches
7068

7169
for i := 0; i < batches.ResourceSpans().Len(); i++ {

0 commit comments

Comments
 (0)