Skip to content

Commit ac9a987

Browse files
bogdandrutudashpole
authored andcommitted
Refactor processorhelper to use consumerhelper, split by signal type (open-telemetry#3180)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent a72eed5 commit ac9a987

File tree

8 files changed

+533
-349
lines changed

8 files changed

+533
-349
lines changed

processor/processorhelper/logs.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processorhelper
16+
17+
import (
18+
"context"
19+
"errors"
20+
21+
"go.opencensus.io/trace"
22+
23+
"go.opentelemetry.io/collector/component"
24+
"go.opentelemetry.io/collector/component/componenterror"
25+
"go.opentelemetry.io/collector/component/componenthelper"
26+
"go.opentelemetry.io/collector/config"
27+
"go.opentelemetry.io/collector/consumer"
28+
"go.opentelemetry.io/collector/consumer/consumerhelper"
29+
"go.opentelemetry.io/collector/consumer/pdata"
30+
)
31+
32+
// LProcessor is a helper interface that allows avoiding implementing all functions in LogsProcessor by using NewLogsProcessor.
33+
type LProcessor interface {
34+
// ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component.
35+
// If error is returned then returned data are ignored. It MUST not call the next component.
36+
ProcessLogs(context.Context, pdata.Logs) (pdata.Logs, error)
37+
}
38+
39+
type logProcessor struct {
40+
component.Component
41+
consumer.Logs
42+
}
43+
44+
// NewLogsProcessor creates a LogsProcessor that ensure context propagation and the right tags are set.
45+
// TODO: Add observability metrics support
46+
func NewLogsProcessor(
47+
cfg config.Processor,
48+
nextConsumer consumer.Logs,
49+
processor LProcessor,
50+
options ...Option,
51+
) (component.LogsProcessor, error) {
52+
if processor == nil {
53+
return nil, errors.New("nil processor")
54+
}
55+
56+
if nextConsumer == nil {
57+
return nil, componenterror.ErrNilNextConsumer
58+
}
59+
60+
traceAttributes := spanAttributes(cfg.ID())
61+
bs := fromOptions(options)
62+
logsConsumer, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error {
63+
span := trace.FromContext(ctx)
64+
span.Annotate(traceAttributes, "Start processing.")
65+
var err error
66+
ld, err = processor.ProcessLogs(ctx, ld)
67+
span.Annotate(traceAttributes, "End processing.")
68+
if err != nil {
69+
if errors.Is(err, ErrSkipProcessingData) {
70+
return nil
71+
}
72+
return err
73+
}
74+
return nextConsumer.ConsumeLogs(ctx, ld)
75+
}, bs.consumerOptions...)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
return &logProcessor{
81+
Component: componenthelper.New(bs.componentOptions...),
82+
Logs: logsConsumer,
83+
}, nil
84+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processorhelper
16+
17+
import (
18+
"context"
19+
"errors"
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
25+
"go.opentelemetry.io/collector/component"
26+
"go.opentelemetry.io/collector/component/componenterror"
27+
"go.opentelemetry.io/collector/component/componenttest"
28+
"go.opentelemetry.io/collector/config"
29+
"go.opentelemetry.io/collector/consumer"
30+
"go.opentelemetry.io/collector/consumer/consumertest"
31+
"go.opentelemetry.io/collector/consumer/pdata"
32+
"go.opentelemetry.io/collector/internal/testdata"
33+
)
34+
35+
var testLogsCfg = config.NewProcessorSettings(config.NewID(typeStr))
36+
37+
func TestNewLogsProcessor(t *testing.T) {
38+
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil))
39+
require.NoError(t, err)
40+
41+
assert.True(t, lp.Capabilities().MutatesData)
42+
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
43+
assert.NoError(t, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
44+
assert.NoError(t, lp.Shutdown(context.Background()))
45+
}
46+
47+
func TestNewLogsProcessor_WithOptions(t *testing.T) {
48+
want := errors.New("my_error")
49+
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil),
50+
WithStart(func(context.Context, component.Host) error { return want }),
51+
WithShutdown(func(context.Context) error { return want }),
52+
WithCapabilities(consumer.Capabilities{MutatesData: false}))
53+
assert.NoError(t, err)
54+
55+
assert.Equal(t, want, lp.Start(context.Background(), componenttest.NewNopHost()))
56+
assert.Equal(t, want, lp.Shutdown(context.Background()))
57+
assert.False(t, lp.Capabilities().MutatesData)
58+
}
59+
60+
func TestNewLogsProcessor_NilRequiredFields(t *testing.T) {
61+
_, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), nil)
62+
assert.Error(t, err)
63+
64+
_, err = NewLogsProcessor(&testLogsCfg, nil, newTestLProcessor(nil))
65+
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
66+
}
67+
68+
func TestNewLogsProcessor_ProcessLogError(t *testing.T) {
69+
want := errors.New("my_error")
70+
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(want))
71+
require.NoError(t, err)
72+
assert.Equal(t, want, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
73+
}
74+
75+
func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) {
76+
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData))
77+
require.NoError(t, err)
78+
assert.Equal(t, nil, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
79+
}
80+
81+
type testLProcessor struct {
82+
retError error
83+
}
84+
85+
func newTestLProcessor(retError error) LProcessor {
86+
return &testLProcessor{retError: retError}
87+
}
88+
89+
func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
90+
return ld, tlp.retError
91+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processorhelper
16+
17+
import (
18+
"context"
19+
"errors"
20+
21+
"go.opencensus.io/trace"
22+
23+
"go.opentelemetry.io/collector/component"
24+
"go.opentelemetry.io/collector/component/componenterror"
25+
"go.opentelemetry.io/collector/component/componenthelper"
26+
"go.opentelemetry.io/collector/config"
27+
"go.opentelemetry.io/collector/consumer"
28+
"go.opentelemetry.io/collector/consumer/consumerhelper"
29+
"go.opentelemetry.io/collector/consumer/pdata"
30+
)
31+
32+
// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTracesProcessor.
33+
type MProcessor interface {
34+
// ProcessMetrics is a helper function that processes the incoming data and returns the data to be sent to the next component.
35+
// If error is returned then returned data are ignored. It MUST not call the next component.
36+
ProcessMetrics(context.Context, pdata.Metrics) (pdata.Metrics, error)
37+
}
38+
39+
type metricsProcessor struct {
40+
component.Component
41+
consumer.Metrics
42+
}
43+
44+
// NewMetricsProcessor creates a MetricsProcessor that ensure context propagation and the right tags are set.
45+
// TODO: Add observability metrics support
46+
func NewMetricsProcessor(
47+
cfg config.Processor,
48+
nextConsumer consumer.Metrics,
49+
processor MProcessor,
50+
options ...Option,
51+
) (component.MetricsProcessor, error) {
52+
if processor == nil {
53+
return nil, errors.New("nil processor")
54+
}
55+
56+
if nextConsumer == nil {
57+
return nil, componenterror.ErrNilNextConsumer
58+
}
59+
60+
traceAttributes := spanAttributes(cfg.ID())
61+
bs := fromOptions(options)
62+
metricsConsumer, err := consumerhelper.NewMetrics(func(ctx context.Context, md pdata.Metrics) error {
63+
span := trace.FromContext(ctx)
64+
span.Annotate(traceAttributes, "Start processing.")
65+
var err error
66+
md, err = processor.ProcessMetrics(ctx, md)
67+
span.Annotate(traceAttributes, "End processing.")
68+
if err != nil {
69+
if errors.Is(err, ErrSkipProcessingData) {
70+
return nil
71+
}
72+
return err
73+
}
74+
return nextConsumer.ConsumeMetrics(ctx, md)
75+
}, bs.consumerOptions...)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
return &metricsProcessor{
81+
Component: componenthelper.New(bs.componentOptions...),
82+
Metrics: metricsConsumer,
83+
}, nil
84+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processorhelper
16+
17+
import (
18+
"context"
19+
"errors"
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
25+
"go.opentelemetry.io/collector/component"
26+
"go.opentelemetry.io/collector/component/componenterror"
27+
"go.opentelemetry.io/collector/component/componenttest"
28+
"go.opentelemetry.io/collector/config"
29+
"go.opentelemetry.io/collector/consumer"
30+
"go.opentelemetry.io/collector/consumer/consumertest"
31+
"go.opentelemetry.io/collector/consumer/pdata"
32+
"go.opentelemetry.io/collector/internal/testdata"
33+
)
34+
35+
var testMetricsCfg = config.NewProcessorSettings(config.NewID(typeStr))
36+
37+
func TestNewMetricsProcessor(t *testing.T) {
38+
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil))
39+
require.NoError(t, err)
40+
41+
assert.True(t, mp.Capabilities().MutatesData)
42+
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
43+
assert.NoError(t, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
44+
assert.NoError(t, mp.Shutdown(context.Background()))
45+
}
46+
47+
func TestNewMetricsProcessor_WithOptions(t *testing.T) {
48+
want := errors.New("my_error")
49+
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil),
50+
WithStart(func(context.Context, component.Host) error { return want }),
51+
WithShutdown(func(context.Context) error { return want }),
52+
WithCapabilities(consumer.Capabilities{MutatesData: false}))
53+
assert.NoError(t, err)
54+
55+
assert.Equal(t, want, mp.Start(context.Background(), componenttest.NewNopHost()))
56+
assert.Equal(t, want, mp.Shutdown(context.Background()))
57+
assert.False(t, mp.Capabilities().MutatesData)
58+
}
59+
60+
func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) {
61+
_, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), nil)
62+
assert.Error(t, err)
63+
64+
_, err = NewMetricsProcessor(&testMetricsCfg, nil, newTestMProcessor(nil))
65+
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
66+
}
67+
68+
func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) {
69+
want := errors.New("my_error")
70+
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want))
71+
require.NoError(t, err)
72+
assert.Equal(t, want, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
73+
}
74+
75+
func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) {
76+
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData))
77+
require.NoError(t, err)
78+
assert.Equal(t, nil, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
79+
}
80+
81+
type testMProcessor struct {
82+
retError error
83+
}
84+
85+
func newTestMProcessor(retError error) MProcessor {
86+
return &testMProcessor{retError: retError}
87+
}
88+
89+
func (tmp *testMProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
90+
return md, tmp.retError
91+
}

0 commit comments

Comments
 (0)