Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processorhelper

import (
"context"
"errors"

"go.opencensus.io/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
)

// LProcessor is a helper interface that allows avoiding implementing all functions in LogsProcessor by using NewLogsProcessor.
type LProcessor interface {
// ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessLogs(context.Context, pdata.Logs) (pdata.Logs, error)
}

type logProcessor struct {
component.Component
consumer.Logs
}

// NewLogsProcessor creates a LogsProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewLogsProcessor(
cfg config.Processor,
nextConsumer consumer.Logs,
processor LProcessor,
options ...Option,
) (component.LogsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
}

if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}

traceAttributes := spanAttributes(cfg.ID())
bs := fromOptions(options)
logsConsumer, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error {
span := trace.FromContext(ctx)
span.Annotate(traceAttributes, "Start processing.")
var err error
ld, err = processor.ProcessLogs(ctx, ld)
span.Annotate(traceAttributes, "End processing.")
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
return nil
}
return err
}
return nextConsumer.ConsumeLogs(ctx, ld)
}, bs.consumerOptions...)
if err != nil {
return nil, err
}

return &logProcessor{
Component: componenthelper.New(bs.componentOptions...),
Logs: logsConsumer,
}, nil
}
91 changes: 91 additions & 0 deletions processor/processorhelper/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processorhelper

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
)

var testLogsCfg = config.NewProcessorSettings(config.NewID(typeStr))

func TestNewLogsProcessor(t *testing.T) {
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil))
require.NoError(t, err)

assert.True(t, lp.Capabilities().MutatesData)
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
assert.NoError(t, lp.Shutdown(context.Background()))
}

func TestNewLogsProcessor_WithOptions(t *testing.T) {
want := errors.New("my_error")
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(consumer.Capabilities{MutatesData: false}))
assert.NoError(t, err)

assert.Equal(t, want, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, lp.Shutdown(context.Background()))
assert.False(t, lp.Capabilities().MutatesData)
}

func TestNewLogsProcessor_NilRequiredFields(t *testing.T) {
_, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), nil)
assert.Error(t, err)

_, err = NewLogsProcessor(&testLogsCfg, nil, newTestLProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}

func TestNewLogsProcessor_ProcessLogError(t *testing.T) {
want := errors.New("my_error")
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
}

func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) {
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData))
require.NoError(t, err)
assert.Equal(t, nil, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
}

type testLProcessor struct {
retError error
}

func newTestLProcessor(retError error) LProcessor {
return &testLProcessor{retError: retError}
}

func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
return ld, tlp.retError
}
84 changes: 84 additions & 0 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processorhelper

import (
"context"
"errors"

"go.opencensus.io/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
)

// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTracesProcessor.
type MProcessor interface {
// ProcessMetrics is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessMetrics(context.Context, pdata.Metrics) (pdata.Metrics, error)
}

type metricsProcessor struct {
component.Component
consumer.Metrics
}

// NewMetricsProcessor creates a MetricsProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewMetricsProcessor(
cfg config.Processor,
nextConsumer consumer.Metrics,
processor MProcessor,
options ...Option,
) (component.MetricsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
}

if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}

traceAttributes := spanAttributes(cfg.ID())
bs := fromOptions(options)
metricsConsumer, err := consumerhelper.NewMetrics(func(ctx context.Context, md pdata.Metrics) error {
span := trace.FromContext(ctx)
span.Annotate(traceAttributes, "Start processing.")
var err error
md, err = processor.ProcessMetrics(ctx, md)
span.Annotate(traceAttributes, "End processing.")
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
return nil
}
return err
}
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
if err != nil {
return nil, err
}

return &metricsProcessor{
Component: componenthelper.New(bs.componentOptions...),
Metrics: metricsConsumer,
}, nil
}
91 changes: 91 additions & 0 deletions processor/processorhelper/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processorhelper

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
)

var testMetricsCfg = config.NewProcessorSettings(config.NewID(typeStr))

func TestNewMetricsProcessor(t *testing.T) {
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil))
require.NoError(t, err)

assert.True(t, mp.Capabilities().MutatesData)
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestNewMetricsProcessor_WithOptions(t *testing.T) {
want := errors.New("my_error")
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(consumer.Capabilities{MutatesData: false}))
assert.NoError(t, err)

assert.Equal(t, want, mp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, mp.Shutdown(context.Background()))
assert.False(t, mp.Capabilities().MutatesData)
}

func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) {
_, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), nil)
assert.Error(t, err)

_, err = NewMetricsProcessor(&testMetricsCfg, nil, newTestMProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}

func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) {
want := errors.New("my_error")
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
}

func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) {
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData))
require.NoError(t, err)
assert.Equal(t, nil, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
}

type testMProcessor struct {
retError error
}

func newTestMProcessor(retError error) MProcessor {
return &testMProcessor{retError: retError}
}

func (tmp *testMProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
return md, tmp.retError
}
Loading