Skip to content
Merged
27 changes: 27 additions & 0 deletions .chloggen/shared-sampler-core-prototype.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Only allocate one set of internal log sampling counters

# One or more tracking issues or pull requests related to the change
issues: [13014]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The case where logs are only exported to stdout was fixed in v0.126.0;
this new fix also covers the case where logs are exported through OTLP.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
125 changes: 111 additions & 14 deletions internal/telemetry/componentattribute/logger_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componentattribute_test
package componentattribute

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -14,7 +15,6 @@ import (
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
"go.opentelemetry.io/collector/pipeline"
)

Expand All @@ -31,7 +31,7 @@ type test struct {
func createZapCore() (zapcore.Core, *observer.ObservedLogs) {
core, observed := observer.New(zap.DebugLevel)
core = core.With([]zapcore.Field{zap.String("preexisting", "value")})
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
core = NewConsoleCoreWithAttributes(core, attribute.NewSet())
return core, observed
}

Expand All @@ -40,13 +40,13 @@ func checkZapLogs(t *testing.T, observed *observer.ObservedLogs) {
require.Len(t, observedLogs, 3)

parentContext := map[string]string{
"preexisting": "value",
componentattribute.SignalKey: pipeline.SignalLogs.String(),
componentattribute.ComponentIDKey: "filelog",
"preexisting": "value",
SignalKey: pipeline.SignalLogs.String(),
ComponentIDKey: "filelog",
}
childContext := map[string]string{
"preexisting": "value",
componentattribute.ComponentIDKey: "filelog",
"preexisting": "value",
ComponentIDKey: "filelog",
}

require.Equal(t, "test parent before child", observedLogs[0].Message)
Expand All @@ -70,8 +70,8 @@ func checkZapLogs(t *testing.T, observed *observer.ObservedLogs) {

func TestCore(t *testing.T) {
attrs := attribute.NewSet(
attribute.String(componentattribute.SignalKey, pipeline.SignalLogs.String()),
attribute.String(componentattribute.ComponentIDKey, "filelog"),
attribute.String(SignalKey, pipeline.SignalLogs.String()),
attribute.String(ComponentIDKey, "filelog"),
)

tests := []test{
Expand All @@ -90,7 +90,7 @@ func TestCore(t *testing.T) {
createLogger: func() (*zap.Logger, logRecorder) {
core, observed := createZapCore()
recorder := logtest.NewRecorder()
core = componentattribute.NewOTelTeeCoreWithAttributes(core, recorder, "testinstr", zap.DebugLevel, attribute.NewSet(), func(c zapcore.Core) zapcore.Core { return c })
core = NewOTelTeeCoreWithAttributes(core, recorder, "testinstr", zap.DebugLevel, attribute.NewSet())
return zap.New(core), logRecorder{zapLogs: observed, otelLogs: recorder}
},
check: func(t *testing.T, rec logRecorder) {
Expand All @@ -107,7 +107,7 @@ func TestCore(t *testing.T) {
}

childAttrs := attribute.NewSet(
attribute.String(componentattribute.ComponentIDKey, "filelog"),
attribute.String(ComponentIDKey, "filelog"),
)

assert.Equal(t, map[string]attribute.Set{
Expand All @@ -122,13 +122,110 @@ func TestCore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
logger, state := test.createLogger()

parent := componentattribute.ZapLoggerWithAttributes(logger, attrs)
parent := ZapLoggerWithAttributes(logger, attrs)
parent.Info("test parent before child")
child := componentattribute.ZapLoggerWithAttributes(parent, componentattribute.RemoveAttributes(attrs, componentattribute.SignalKey))
child := ZapLoggerWithAttributes(parent, RemoveAttributes(attrs, SignalKey))
child.Info("test child")
parent.Info("test parent after child")

test.check(t, state)
})
}
}

func TestSamplerCore(t *testing.T) {
tick := time.Second
// Drop identical messages after the first two
first := 2
thereafter := 0

type testCase struct {
name string
withAttributes func(inner zapcore.Core, sampler zapcore.Core, attrs attribute.Set) zapcore.Core
expectedAttrs []string
}
testCases := []testCase{
{
name: "new-sampler",
withAttributes: func(inner zapcore.Core, _ zapcore.Core, attrs attribute.Set) zapcore.Core {
return zapcore.NewSamplerWithOptions(tryWithAttributeSet(inner, attrs), tick, first, thereafter)
},
expectedAttrs: []string{"foo", "bar", "foo", "bar"},
},
{
name: "cloned-sampler",
withAttributes: func(_ zapcore.Core, sampler zapcore.Core, attrs attribute.Set) zapcore.Core {
return tryWithAttributeSet(sampler, attrs)
},
expectedAttrs: []string{"foo", "bar"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
inner, obs := observer.New(zapcore.DebugLevel)
inner = NewConsoleCoreWithAttributes(inner, attribute.NewSet(attribute.String("test", "foo")))

sampler1 := NewSamplerCoreWithAttributes(inner, tick, first, thereafter)
loggerFoo := zap.New(sampler1)

sampler2 := tc.withAttributes(inner, sampler1, attribute.NewSet(attribute.String("test", "bar")))
loggerBar := zap.New(sampler2)

// If the two samplers share their counters, only the first two messages will go through.
// If they are independent, the first three and the fifth will go through.
loggerFoo.Info("test")
loggerBar.Info("test")
loggerFoo.Info("test")
loggerFoo.Info("test")
loggerBar.Info("test")
loggerBar.Info("test")

var attrs []string
for _, log := range obs.All() {
var fooValue string
for _, field := range log.Context {
if field.Key == "test" {
fooValue = field.String
}
}
attrs = append(attrs, fooValue)
}
assert.Equal(t, tc.expectedAttrs, attrs)
})
}
}

// Worst case scenario for the reflect spell in samplerCoreWithAttributes
type crazySampler struct {
Core int
}

var _ zapcore.Core = (*crazySampler)(nil)

func (s *crazySampler) Enabled(zapcore.Level) bool {
return true
}

func (s *crazySampler) With([]zapcore.Field) zapcore.Core {
return s
}

func (s *crazySampler) Check(_ zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
return ce
}

func (s *crazySampler) Write(zapcore.Entry, []zapcore.Field) error {
return nil
}

func (s *crazySampler) Sync() error {
return nil
}

func TestSamplerCorePanic(t *testing.T) {
sampler := NewSamplerCoreWithAttributes(zapcore.NewNopCore(), 1, 1, 1)
sampler.(*samplerCoreWithAttributes).Core = &crazySampler{}
assert.PanicsWithValue(t, "Unexpected Zap sampler type; see github.com/open-telemetry/opentelemetry-collector/issues/13014", func() {
tryWithAttributeSet(sampler, attribute.NewSet())
})
}
63 changes: 58 additions & 5 deletions internal/telemetry/componentattribute/logger_zap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
package componentattribute // import "go.opentelemetry.io/collector/internal/telemetry/componentattribute"

import (
"reflect"
"time"

"go.opentelemetry.io/contrib/bridges/otelzap"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/log"
Expand Down Expand Up @@ -76,7 +79,6 @@
lp log.LoggerProvider
scopeName string
level zapcore.Level
wrapper func(zapcore.Core) zapcore.Core
}

var _ coreWithAttributes = (*otelTeeCoreWithAttributes)(nil)
Expand All @@ -85,7 +87,7 @@
// logs, component attributes are injected as instrumentation scope attributes.
//
// This is used when service::telemetry::logs::processors is configured.
func NewOTelTeeCoreWithAttributes(consoleCore zapcore.Core, lp log.LoggerProvider, scopeName string, level zapcore.Level, attrs attribute.Set, wrapper func(zapcore.Core) zapcore.Core) zapcore.Core {
func NewOTelTeeCoreWithAttributes(consoleCore zapcore.Core, lp log.LoggerProvider, scopeName string, level zapcore.Level, attrs attribute.Set) zapcore.Core {
otelCore, err := zapcore.NewIncreaseLevelCore(otelzap.NewCore(
scopeName,
otelzap.WithLoggerProvider(lp),
Expand All @@ -96,17 +98,68 @@
}

return &otelTeeCoreWithAttributes{
Core: zapcore.NewTee(consoleCore, wrapper(otelCore)),
Core: zapcore.NewTee(consoleCore, otelCore),
consoleCore: consoleCore,
lp: lp,
scopeName: scopeName,
level: level,
wrapper: wrapper,
}
}

func (ocwa *otelTeeCoreWithAttributes) withAttributeSet(attrs attribute.Set) zapcore.Core {
return NewOTelTeeCoreWithAttributes(tryWithAttributeSet(ocwa.consoleCore, attrs), ocwa.lp, ocwa.scopeName, ocwa.level, attrs, ocwa.wrapper)
return NewOTelTeeCoreWithAttributes(tryWithAttributeSet(ocwa.consoleCore, attrs), ocwa.lp, ocwa.scopeName, ocwa.level, attrs)
}

type samplerCoreWithAttributes struct {
zapcore.Core
from zapcore.Core
}

var _ coreWithAttributes = (*samplerCoreWithAttributes)(nil)

func NewSamplerCoreWithAttributes(inner zapcore.Core, tick time.Duration, first int, thereafter int) zapcore.Core {
return &samplerCoreWithAttributes{
Core: zapcore.NewSamplerWithOptions(inner, tick, first, thereafter),
from: inner,
}
}

func checkSamplerType(ty reflect.Type) bool {
if ty.Kind() != reflect.Pointer {
return false
}

Check warning on line 130 in internal/telemetry/componentattribute/logger_zap.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/componentattribute/logger_zap.go#L129-L130

Added lines #L129 - L130 were not covered by tests
ty = ty.Elem()
if ty.Kind() != reflect.Struct {
return false
}

Check warning on line 134 in internal/telemetry/componentattribute/logger_zap.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/componentattribute/logger_zap.go#L133-L134

Added lines #L133 - L134 were not covered by tests
innerField, ok := ty.FieldByName("Core")
if !ok {
return false
}

Check warning on line 138 in internal/telemetry/componentattribute/logger_zap.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/componentattribute/logger_zap.go#L137-L138

Added lines #L137 - L138 were not covered by tests
return reflect.TypeFor[zapcore.Core]().AssignableTo(innerField.Type)
}

func (ssc *samplerCoreWithAttributes) withAttributeSet(attrs attribute.Set) zapcore.Core {
newInner := tryWithAttributeSet(ssc.from, attrs)

// https://github.com/uber-go/zap/blob/fcf8ee58669e358bbd6460bef5c2ee7a53c0803a/zapcore/sampler.go#L168
// We need to create a new Zap sampler core with the same settings but with a new inner core,
// while reusing the very RAM-intensive `counters` data structure.
// The `With` method does something similar, but it's not quite what we want, so we use `reflect`.
// This hack can be removed once Zap supports this use case.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jade-guiton-dd would be good if we can file an issue for this on their side and link it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed an issue a while back, added it to the comment.

val1 := reflect.ValueOf(ssc.Core)
if !checkSamplerType(val1.Type()) { // To avoid a more esoteric panic message below
panic("Unexpected Zap sampler type; see github.com/open-telemetry/opentelemetry-collector/issues/13014")
}
val2 := reflect.New(val1.Type().Elem()) // core2 := new(sampler)
val2.Elem().Set(val1.Elem()) // *core2 = *core1
val2.Elem().FieldByName("Core").Set(reflect.ValueOf(newInner)) // core2.Core = newInner
newSampler := val2.Interface().(zapcore.Core)

return samplerCoreWithAttributes{
Core: newSampler,
from: newInner,
}
}

// ZapLoggerWithAttributes creates a Zap Logger with a new set of injected component attributes.
Expand Down
20 changes: 5 additions & 15 deletions service/telemetry/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,23 @@ func newLogger(set Settings, cfg Config) (*zap.Logger, log.LoggerProvider, error

var lp log.LoggerProvider
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
core = newSampledCore(core, cfg.Logs.Sampling)
}

core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())

if len(cfg.Logs.Processors) > 0 && set.SDK != nil {
lp = set.SDK.LoggerProvider()
wrapper := func(c zapcore.Core) zapcore.Core {
return c
}
if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
wrapper = func(c zapcore.Core) zapcore.Core {
return newSampledCore(c, cfg.Logs.Sampling)
}
}

core = componentattribute.NewOTelTeeCoreWithAttributes(
core,
lp,
"go.opentelemetry.io/collector/service/telemetry",
cfg.Logs.Level,
attribute.NewSet(),
wrapper,
)
}

if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
core = newSampledCore(core, cfg.Logs.Sampling)
}

return core
}))

Expand All @@ -93,7 +83,7 @@ func newLogger(set Settings, cfg Config) (*zap.Logger, log.LoggerProvider, error
func newSampledCore(core zapcore.Core, sc *LogsSamplingConfig) zapcore.Core {
// Create a logger that samples every Nth message after the first M messages every S seconds
// where N = sc.Thereafter, M = sc.Initial, S = sc.Tick.
return zapcore.NewSamplerWithOptions(
return componentattribute.NewSamplerCoreWithAttributes(
core,
sc.Tick,
sc.Initial,
Expand Down
2 changes: 1 addition & 1 deletion service/telemetry/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestNewLogger(t *testing.T) {
InitialFields: map[string]any(nil),
},
},
wantCoreType: "*componentattribute.consoleCoreWithAttributes",
wantCoreType: "*componentattribute.samplerCoreWithAttributes",
},
}
for _, tt := range tests {
Expand Down
Loading