Skip to content

Commit a7e8f00

Browse files
pmm-sumopmalek
authored and committed
Introduce cascading filter processor (#359)
* Introduce cascading filter processor
1 parent 54d44f2 commit a7e8f00

27 files changed

+4930
-0
lines changed

cmd/otelcontribcol/components.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder"
4747
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver"
4848
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
49+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor"
4950
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"
5051
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor"
5152
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
@@ -174,6 +175,7 @@ func components() (component.Factories, error) {
174175

175176
processors := []component.ProcessorFactory{
176177
groupbyattrsprocessor.NewFactory(),
178+
cascadingfilterprocessor.NewFactory(),
177179
sourceprocessor.NewFactory(),
178180
groupbytraceprocessor.NewFactory(),
179181
k8sprocessor.NewFactory(),

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ require (
3333
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver v0.0.0-00010101000000-000000000000
3434
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver v0.0.0-00010101000000-000000000000
3535
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.0.0-00010101000000-000000000000
36+
github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor v0.0.0-00010101000000-000000000000
3637
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor v0.0.0-00010101000000-000000000000
3738
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor v0.0.0-00010101000000-000000000000
3839
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor v0.0.0-00010101000000-000000000000
@@ -69,6 +70,8 @@ require (
6970
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4
7071
)
7172

73+
replace go.opentelemetry.io/collector => github.com/SumoLogic/opentelemetry-collector v0.16.0-sumo
74+
7275
// Replace references to modules that are in this repository with their relative paths
7376
// so that we always build with current (latest) version of the source code.
7477

@@ -196,6 +199,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/memca
196199

197200
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor => ./processor/groupbyattrsprocessor
198201

202+
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor => ./processor/cascadingfilterprocessor
203+
199204
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor => ./processor/groupbytraceprocessor
200205

201206
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor => ./processor/k8sprocessor/
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../Makefile.Common
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# Cascading Filter Processor
2+
3+
Supported pipeline types: traces
4+
5+
The Cascading Filter processor is a [fork of tailsamplingprocessor](../tailsamplingprocessor) which
6+
allows for defining smart cascading filtering rules with preset limits.
7+
8+
## Processor configuration
9+
10+
The following configuration options should be configured as desired:
11+
- `policies` (no default): Policies used to make a sampling decision
12+
- `spans_per_second` (default = 1500): Maximum total number of emitted spans per second
13+
- `probabilistic_filtering_ratio` (default = 0.2): Ratio of spans that are always probabilistically filtered
14+
(hence might be used for metrics calculation)
15+
16+
The following configuration options can also be modified:
17+
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a filtering decision
18+
- `num_traces` (default = 50000): Number of traces kept in memory
19+
- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures)
20+
21+
## Policy configuration
22+
23+
Each defined policy is evaluated with order as specified in config. There are several properties:
24+
- `name` (required): identifies the policy
25+
- `spans_per_second` (default = 0): defines maximum number of spans per second that could be handled by this policy. When set to `-1`,
26+
it selects the traces only if the global limit is not exceeded by other policies (however, without further limitations)
27+
28+
Additionally, each of the policy might have any of the following filtering criteria defined. They are evaluated for
29+
each of the trace spans. If at least one span matching all defined criteria is found, the trace is selected:
30+
- `numeric_attribute: {key: <name>, min_value: <min_value>, max_value: <max_value>}`: selects span by matching numeric
31+
attribute (either at resource or span level)
32+
- `string_attribute: {key: <name>, values: [<value1>, <value2>]}`: selects span by matching string attribute that is one
33+
of the provided values (either at resource or span level)
34+
- `properties: { min_number_of_spans: <number>}`: selects the trace if it has at least the provided number of spans
35+
- `properties: { min_duration: <duration>}`: selects the span if its duration is greater than or equal to the given value
36+
(use `s` or `ms` as the suffix to indicate unit)
37+
- `properties: { name_pattern: <regex>}`: selects the span if its operation name matches the provided regular expression
38+
39+
## Limiting the number of spans
40+
41+
There are two `spans_per_second` settings: the global one and the per-policy one.
42+
43+
While evaluating traces, the limit is evaluated first on the policy level and then on the global level. The sum
44+
of all `spans_per_second` rates might be actually higher than the global limit, but the latter will never be
45+
exceeded (so some of the traces will not be included).
46+
47+
For example, we have 3 policies: `A, B, C`. Each of them has a limit of `300` spans per second and the global limit
48+
is `500` spans per second. Now, let's say that for each of the policies there were 5 distinct traces, each
49+
having `100` spans and matching policy criteria (let's call them `A1, A2, ... B1, B2...` and so forth):
50+
51+
`Policy A`: `A1, A2, A3`
52+
`Policy B`: `B1, B2, B3`
53+
`Policy C`: `C1, C2, C3`
54+
55+
However, in total, this is `900` spans, which is more than the global limit of `500` spans/second. The processor
56+
will take care of that and randomly select only the spans up to the global limit. So eventually, it might
57+
for example send further only following traces: `A1, A2, B1, C2, C5` and filter out the others.
58+
59+
## Example
60+
61+
```yaml
62+
processors:
63+
cascading_filter:
64+
decision_wait: 10s
65+
num_traces: 100
66+
expected_new_traces_per_sec: 10
67+
spans_per_second: 1000
68+
probabilistic_filtering_ratio: 0.1
69+
policies:
70+
[
71+
{
72+
name: test-policy-1,
73+
},
74+
{
75+
name: test-policy-2,
76+
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
77+
},
78+
{
79+
name: test-policy-3,
80+
string_attribute: {key: key2, values: [value1, value2]}
81+
},
82+
{
83+
name: test-policy-4,
84+
spans_per_second: 35,
85+
},
86+
{
87+
name: test-policy-5,
88+
spans_per_second: 123,
89+
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
90+
},
91+
{
92+
name: test-policy-6,
93+
spans_per_second: 50,
94+
properties: {min_duration: 9s }
95+
},
96+
{
97+
name: test-policy-7,
98+
properties: {
99+
name_pattern: "foo.*",
100+
min_number_of_spans: 10,
101+
min_duration: 9s
102+
}
103+
},
104+
{
105+
name: everything_else,
106+
spans_per_second: -1
107+
},
108+
]
109+
```
110+
111+
Refer to [cascading_filter_config.yaml](./testdata/cascading_filter_config.yaml) for detailed
112+
examples on using the processor.
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package cascadingfilterprocessor
16+
17+
import (
18+
"sync"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
"go.opentelemetry.io/collector/config/configmodels"
25+
"go.opentelemetry.io/collector/consumer/pdata"
26+
"go.uber.org/zap"
27+
28+
tsconfig "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor/config"
29+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor/sampling"
30+
)
31+
32+
var testValue = 10 * time.Millisecond
33+
var cfg = tsconfig.Config{
34+
ProcessorSettings: configmodels.ProcessorSettings{},
35+
DecisionWait: 2 * time.Second,
36+
NumTraces: 100,
37+
ExpectedNewTracesPerSec: 100,
38+
SpansPerSecond: 1000,
39+
PolicyCfgs: []tsconfig.PolicyCfg{
40+
{
41+
Name: "duration",
42+
SpansPerSecond: 10,
43+
PropertiesCfg: tsconfig.PropertiesCfg{
44+
MinDuration: &testValue,
45+
},
46+
},
47+
{
48+
Name: "everything else",
49+
SpansPerSecond: -1,
50+
},
51+
},
52+
}
53+
54+
func fillSpan(span *pdata.Span, durationMicros int64) {
55+
nowTs := time.Now().UnixNano()
56+
startTime := nowTs - durationMicros*1000
57+
58+
span.Attributes().InsertInt("foo", 55)
59+
span.SetStartTime(pdata.TimestampUnixNano(startTime))
60+
span.SetEndTime(pdata.TimestampUnixNano(nowTs))
61+
}
62+
63+
func createTrace(fsp *cascadingFilterSpanProcessor, numSpans int, durationMicros int64) *sampling.TraceData {
64+
var traceBatches []pdata.Traces
65+
66+
traces := pdata.NewTraces()
67+
traces.ResourceSpans().Resize(1)
68+
rs := traces.ResourceSpans().At(0)
69+
rs.InstrumentationLibrarySpans().Resize(1)
70+
ils := rs.InstrumentationLibrarySpans().At(0)
71+
72+
ils.Spans().Resize(numSpans)
73+
74+
for i := 0; i < numSpans; i++ {
75+
span := ils.Spans().At(i)
76+
77+
fillSpan(&span, durationMicros)
78+
}
79+
80+
traceBatches = append(traceBatches, traces)
81+
82+
return &sampling.TraceData{
83+
Mutex: sync.Mutex{},
84+
Decisions: make([]sampling.Decision, len(fsp.policies)),
85+
ArrivalTime: time.Time{},
86+
DecisionTime: time.Time{},
87+
SpanCount: int64(numSpans),
88+
ReceivedBatches: traceBatches,
89+
}
90+
}
91+
92+
func createCascadingEvaluator(t *testing.T) *cascadingFilterSpanProcessor {
93+
cascading, err := newCascadingFilterSpanProcessor(zap.NewNop(), nil, cfg)
94+
assert.NoError(t, err)
95+
return cascading
96+
}
97+
98+
var (
99+
metrics = &policyMetrics{}
100+
)
101+
102+
func TestSampling(t *testing.T) {
103+
cascading := createCascadingEvaluator(t)
104+
105+
decision, _ := cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{0}), createTrace(cascading, 8, 1000000), metrics)
106+
require.Equal(t, sampling.Sampled, decision)
107+
108+
decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(cascading, 1000, 1000), metrics)
109+
require.Equal(t, sampling.SecondChance, decision)
110+
}
111+
112+
func TestSecondChanceEvaluation(t *testing.T) {
113+
cascading := createCascadingEvaluator(t)
114+
115+
decision, _ := cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{0}), createTrace(cascading, 8, 1000), metrics)
116+
require.Equal(t, sampling.SecondChance, decision)
117+
118+
decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(cascading, 8, 1000), metrics)
119+
require.Equal(t, sampling.SecondChance, decision)
120+
121+
// TODO: This could be optimized to make a decision within the cascadingfilter processor, as such a span would never fit anyway
122+
//decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(8000, 1000), metrics)
123+
//require.Equal(t, sampling.NotSampled, decision)
124+
}
125+
126+
func TestProbabilisticFilter(t *testing.T) {
127+
ratio := float32(0.5)
128+
cfg.ProbabilisticFilteringRatio = &ratio
129+
cascading := createCascadingEvaluator(t)
130+
131+
trace1 := createTrace(cascading, 8, 1000000)
132+
decision, _ := cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{0}), trace1, metrics)
133+
require.Equal(t, sampling.Sampled, decision)
134+
require.True(t, trace1.SelectedByProbabilisticFilter)
135+
136+
trace2 := createTrace(cascading, 800, 1000000)
137+
decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), trace2, metrics)
138+
require.Equal(t, sampling.SecondChance, decision)
139+
require.False(t, trace2.SelectedByProbabilisticFilter)
140+
141+
ratio = float32(0.0)
142+
cfg.ProbabilisticFilteringRatio = &ratio
143+
}
144+
145+
//func TestSecondChanceReevaluation(t *testing.T) {
146+
// cascading := createCascadingEvaluator()
147+
//
148+
// decision, _ := cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(100, 1000), metrics)
149+
// require.Equal(t, sampling.Sampled, decision)
150+
//
151+
// // Too much
152+
// decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(1000, 1000), metrics)
153+
// require.Equal(t, sampling.NotSampled, decision)
154+
//
155+
// // Just right
156+
// decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(900, 1000), metrics)
157+
// require.Equal(t, sampling.Sampled, decision)
158+
//}

0 commit comments

Comments
 (0)