Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .chloggen/consumererror-downstream.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: consumererror

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new "Downstream" error marker

# One or more tracking issues or pull requests related to the change
issues: [13234]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This new error wrapper type indicates that the error returned by a component's
`Consume` method is not an internal failure of the component, but instead
was passed through from another component further downstream.
This is used internally by the new pipeline instrumentation feature to
determine the `outcome` of a component call. This wrapper is not intended to
be used by components directly.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
30 changes: 30 additions & 0 deletions .chloggen/obsconsumer-downstream-refused.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: New pipeline instrumentation now differentiates internal failures from downstream errors

# One or more tracking issues or pull requests related to the change
issues: [13234]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
With the telemetry.newPipelineTelemetry feature gate enabled, the "received" and "produced"
metrics related to a component now distinguish between two types of errors:
- "outcome = failure" indicates that the component returned an internal error;
- "outcome = refused" indicates that the component successfully emitted data, but returned an
error coming from a downstream component processing that data.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
38 changes: 38 additions & 0 deletions consumer/consumererror/downstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"

import "errors"

type downstreamError struct {
inner error
}

var _ error = downstreamError{}

func (de downstreamError) Error() string {
return de.inner.Error()
}

func (de downstreamError) Unwrap() error {
return de.inner
}

// NewDownstream wraps an error to indicate that it is a downstream error, i.e. an
// error that does not come from the current component, but from one further downstream.
// This is used by pipeline instrumentation to determine whether an operation's outcome
// was an internal failure, or if it successfully produced data that was later refused.
// This wrapper is not intended to be used manually inside components.
func NewDownstream(err error) error {
return downstreamError{
inner: err,
}
}

// IsDownstream checks if an error was wrapped with the NewDownstream function,
// or if it contains one such error in its Unwrap() tree.
func IsDownstream(err error) bool {
var de downstreamError
return errors.As(err, &de)
}
44 changes: 44 additions & 0 deletions consumer/consumererror/downstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumererror

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

//nolint:testifylint // Testing properties of errors, no reason to use require
func TestDownstream(t *testing.T) {
err1 := errors.New("test error")
assert.False(t, IsDownstream(err1))
err2 := errors.New("test error 2")
assert.False(t, IsDownstream(err2))

errDownstream1 := NewDownstream(err1)
assert.True(t, IsDownstream(errDownstream1))
assert.Equal(t, err1.Error(), errDownstream1.Error())
assert.ErrorIs(t, errDownstream1, err1)
assert.NotErrorIs(t, errDownstream1, err2)

// we can access downstream wrappers through other wrappers
errWrapDownstream := NewRetryableError(errDownstream1)
assert.True(t, IsDownstream(errWrapDownstream))
errorStruct := new(Error)
assert.ErrorAs(t, errWrapDownstream, &errorStruct)

// we can access other wrappers through downstream wrappers
errDownstreamWrap := NewDownstream(NewRetryableError(err1))
assert.True(t, IsDownstream(errDownstreamWrap))
assert.ErrorAs(t, errDownstreamWrap, &errorStruct)

// downstream + downstream = downstream
errJoin2 := errors.Join(errDownstream1, NewDownstream(err2))
assert.True(t, IsDownstream(errJoin2))

// downstream + not downstream = downstream
errJoin1 := errors.Join(errDownstream1, err2)
assert.True(t, IsDownstream(errJoin1))
}
2 changes: 1 addition & 1 deletion service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
go.opentelemetry.io/collector/connector/connectortest v0.130.1
go.opentelemetry.io/collector/connector/xconnector v0.130.1
go.opentelemetry.io/collector/consumer v1.36.1
go.opentelemetry.io/collector/consumer/consumererror v0.130.1
go.opentelemetry.io/collector/consumer/consumertest v0.130.1
go.opentelemetry.io/collector/consumer/xconsumer v0.130.1
go.opentelemetry.io/collector/exporter v0.130.1
Expand Down Expand Up @@ -111,7 +112,6 @@ require (
go.opentelemetry.io/collector/config/configopaque v1.36.1 // indirect
go.opentelemetry.io/collector/config/configoptional v0.130.1 // indirect
go.opentelemetry.io/collector/config/configtls v1.36.1 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.130.1 // indirect
go.opentelemetry.io/collector/extension/extensionauth v1.36.1 // indirect
go.opentelemetry.io/collector/extension/extensionmiddleware v0.130.1 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
Expand Down
198 changes: 198 additions & 0 deletions service/internal/obsconsumer/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package obsconsumer_test

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/service/internal/obsconsumer"
)

type failingConsumer struct {
err error
}

var (
_ consumer.Metrics = (*failingConsumer)(nil)
_ consumer.Logs = (*failingConsumer)(nil)
_ consumer.Traces = (*failingConsumer)(nil)
_ xconsumer.Profiles = (*failingConsumer)(nil)
)

func (*failingConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (fc *failingConsumer) ConsumeMetrics(_ context.Context, _ pmetric.Metrics) error {
return fc.err
}

func (fc *failingConsumer) ConsumeLogs(_ context.Context, _ plog.Logs) error {
return fc.err
}

func (fc *failingConsumer) ConsumeTraces(_ context.Context, _ ptrace.Traces) error {
return fc.err
}

func (fc *failingConsumer) ConsumeProfiles(_ context.Context, _ pprofile.Profiles) error {
return fc.err
}

func TestConsumeRefused(t *testing.T) {
setGateForTest(t, true)

ctx := context.Background()
originalErr := errors.New("test error")
expectedErr := consumererror.NewDownstream(originalErr)
mockConsumer := &failingConsumer{err: originalErr}

// Use delta temporality so sums don't accumulate across tests
reader := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(func(_ sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
}))
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
meter := mp.Meter("test")

receivedItemsCounter, err := meter.Int64Counter("received.items")
require.NoError(t, err)
receivedSizeCounter, err := meter.Int64Counter("received.size")
require.NoError(t, err)

producedItemsCounter, err := meter.Int64Counter("produced.items")
require.NoError(t, err)
producedSizeConter, err := meter.Int64Counter("produced.size")
require.NoError(t, err)

type testCase struct {
name string
testConsumer func() error
}

testCases := []testCase{
{
name: "metrics",
testConsumer: func() error {
consumer1 := obsconsumer.NewMetrics(mockConsumer, receivedItemsCounter, receivedSizeCounter)
consumer2 := obsconsumer.NewMetrics(consumer1, producedItemsCounter, producedSizeConter)
md := pmetric.NewMetrics()
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty()
return consumer2.ConsumeMetrics(ctx, md)
},
},
{
name: "logs",
testConsumer: func() error {
consumer1 := obsconsumer.NewLogs(mockConsumer, receivedItemsCounter, receivedSizeCounter)
consumer2 := obsconsumer.NewLogs(consumer1, producedItemsCounter, producedSizeConter)
ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
return consumer2.ConsumeLogs(ctx, ld)
},
},
{
name: "traces",
testConsumer: func() error {
consumer1 := obsconsumer.NewTraces(mockConsumer, receivedItemsCounter, receivedSizeCounter)
consumer2 := obsconsumer.NewTraces(consumer1, producedItemsCounter, producedSizeConter)
td := ptrace.NewTraces()
td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
return consumer2.ConsumeTraces(ctx, td)
},
},
{
name: "profiles",
testConsumer: func() error {
consumer1 := obsconsumer.NewProfiles(mockConsumer, receivedItemsCounter, receivedSizeCounter)
consumer2 := obsconsumer.NewProfiles(consumer1, producedItemsCounter, producedSizeConter)
pd := pprofile.NewProfiles()
pd.ResourceProfiles().AppendEmpty().ScopeProfiles().AppendEmpty().Profiles().AppendEmpty().Sample().AppendEmpty()
return consumer2.ConsumeProfiles(ctx, pd)
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.testConsumer()
assert.Equal(t, expectedErr, err)

var rm metricdata.ResourceMetrics
err = reader.Collect(ctx, &rm)
require.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 4)

var receivedItemMetric, receivedSizeMetric metricdata.Metrics
var producedItemMetric, producedSizeMetric metricdata.Metrics
for _, m := range rm.ScopeMetrics[0].Metrics {
switch m.Name {
case "received.items":
receivedItemMetric = m
case "received.size":
receivedSizeMetric = m
case "produced.items":
producedItemMetric = m
case "produced.size":
producedSizeMetric = m
}
}
require.NotNil(t, receivedItemMetric)
require.NotNil(t, receivedSizeMetric)
require.NotNil(t, producedItemMetric)
require.NotNil(t, producedSizeMetric)

data := receivedItemMetric.Data.(metricdata.Sum[int64])
require.Len(t, data.DataPoints, 1)
require.Equal(t, int64(1), data.DataPoints[0].Value)
attrs := data.DataPoints[0].Attributes
require.Equal(t, 1, attrs.Len())
val, ok := attrs.Value(attribute.Key(obsconsumer.ComponentOutcome))
require.True(t, ok)
require.Equal(t, "failure", val.Emit())

data = receivedSizeMetric.Data.(metricdata.Sum[int64])
require.Len(t, data.DataPoints, 1)
require.Positive(t, data.DataPoints[0].Value)
attrs = data.DataPoints[0].Attributes
require.Equal(t, 1, attrs.Len())
val, ok = attrs.Value(attribute.Key(obsconsumer.ComponentOutcome))
require.True(t, ok)
require.Equal(t, "failure", val.Emit())

data = producedItemMetric.Data.(metricdata.Sum[int64])
require.Len(t, data.DataPoints, 1)
require.Equal(t, int64(1), data.DataPoints[0].Value)
attrs = data.DataPoints[0].Attributes
require.Equal(t, 1, attrs.Len())
val, ok = attrs.Value(attribute.Key(obsconsumer.ComponentOutcome))
require.True(t, ok)
require.Equal(t, "refused", val.Emit())

data = producedSizeMetric.Data.(metricdata.Sum[int64])
require.Len(t, data.DataPoints, 1)
require.Positive(t, data.DataPoints[0].Value)
attrs = data.DataPoints[0].Attributes
require.Equal(t, 1, attrs.Len())
val, ok = attrs.Value(attribute.Key(obsconsumer.ComponentOutcome))
require.True(t, ok)
require.Equal(t, "refused", val.Emit())
})
}
}
8 changes: 7 additions & 1 deletion service/internal/obsconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/pdata/plog"
)
Expand Down Expand Up @@ -62,7 +63,12 @@ func (c obsLogs) ConsumeLogs(ctx context.Context, ld plog.Logs) error {

err := c.consumer.ConsumeLogs(ctx, ld)
if err != nil {
attrs = &c.withFailureAttrs
if consumererror.IsDownstream(err) {
attrs = &c.withRefusedAttrs
} else {
attrs = &c.withFailureAttrs
err = consumererror.NewDownstream(err)
}
}
return err
}
Expand Down
Loading
Loading