From 2c0331807d94d0c308c7670d925b15ca48d61882 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 13 Oct 2025 09:57:53 -0700 Subject: [PATCH] Avoid unnecessary mutex in collector logs, replace by atomic pointer Signed-off-by: Bogdan Drutu --- .chloggen/avoid-mutex.yaml | 25 +++++++++++++++++++ otelcol/collector.go | 2 +- otelcol/collector_core.go | 45 ++++++++++++++++------------------ otelcol/collector_core_test.go | 32 ++++++++++++------------ 4 files changed, 62 insertions(+), 42 deletions(-) create mode 100644 .chloggen/avoid-mutex.yaml diff --git a/.chloggen/avoid-mutex.yaml b/.chloggen/avoid-mutex.yaml new file mode 100644 index 00000000000..065d6d968fa --- /dev/null +++ b/.chloggen/avoid-mutex.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pkg/otelcol + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Avoid unnecessary mutex in collector logs, replace by atomic pointer + +# One or more tracking issues or pull requests related to the change +issues: [14008] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/otelcol/collector.go b/otelcol/collector.go index c6783472041..eff64d6275b 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -122,7 +122,7 @@ type Collector struct { // NewCollector creates and returns a new instance of Collector. func NewCollector(set CollectorSettings) (*Collector, error) { bc := newBufferedCore(zapcore.DebugLevel) - cc := &collectorCore{core: bc} + cc := newCollectorCore(bc) options := append([]zap.Option{zap.WithCaller(true)}, set.LoggingOptions...) logger := zap.New(cc, options...) set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: logger} diff --git a/otelcol/collector_core.go b/otelcol/collector_core.go index b0a379fedc9..399b58eba5c 100644 --- a/otelcol/collector_core.go +++ b/otelcol/collector_core.go @@ -4,7 +4,7 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( - "sync" + "sync/atomic" "go.uber.org/zap/zapcore" ) @@ -12,47 +12,44 @@ import ( var _ zapcore.Core = (*collectorCore)(nil) type collectorCore struct { - core zapcore.Core - rw sync.RWMutex + delegate atomic.Pointer[zapcore.Core] +} + +func newCollectorCore(core zapcore.Core) *collectorCore { + cc := &collectorCore{} + cc.SetCore(core) + return cc } func (c *collectorCore) Enabled(l zapcore.Level) bool { - c.rw.RLock() - defer c.rw.RUnlock() - return c.core.Enabled(l) + return c.loadDelegate().Enabled(l) } func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { - c.rw.RLock() - defer c.rw.RUnlock() - return &collectorCore{ - core: c.core.With(f), - } + return newCollectorCore(c.loadDelegate().With(f)) } func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { - c.rw.RLock() - defer c.rw.RUnlock() - if c.core.Enabled(e.Level) { - return ce.AddCore(e, c) + core := c.loadDelegate() + if core.Enabled(e.Level) { + return ce.AddCore(e, core) } return ce } func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { - c.rw.RLock() - defer c.rw.RUnlock() - return c.core.Write(e, f) + return c.loadDelegate().Write(e, f) } func (c *collectorCore) Sync() error { - c.rw.RLock() - defer c.rw.RUnlock() - return c.core.Sync() + return c.loadDelegate().Sync() } func (c *collectorCore) SetCore(core zapcore.Core) { - c.rw.Lock() - defer c.rw.Unlock() - c.core = core + c.delegate.Store(&core) +} + +// loadDelegate returns the delegate. +func (c *collectorCore) loadDelegate() zapcore.Core { + return *c.delegate.Load() } diff --git a/otelcol/collector_core_test.go b/otelcol/collector_core_test.go index 304eb25d2ad..f96be74f593 100644 --- a/otelcol/collector_core_test.go +++ b/otelcol/collector_core_test.go @@ -12,7 +12,7 @@ import ( ) func Test_collectorCore_Enabled(t *testing.T) { - cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel)) assert.True(t, cc.Enabled(zapcore.ErrorLevel)) assert.False(t, cc.Enabled(zapcore.DebugLevel)) } @@ -20,29 +20,27 @@ func Test_collectorCore_Enabled(t *testing.T) { func Test_collectorCore_Check(t *testing.T) { t.Run("check passed", func(t *testing.T) { bc := newBufferedCore(zapcore.InfoLevel) - cc := collectorCore{core: bc} + cc := newCollectorCore(bc) e := zapcore.Entry{ Level: zapcore.InfoLevel, } expected := &zapcore.CheckedEntry{} - expected = expected.AddCore(e, &cc) - ce := cc.Check(e, nil) - assert.Equal(t, expected, ce) + expected = expected.AddCore(e, bc) + assert.Equal(t, expected, cc.Check(e, nil)) }) t.Run("check did not pass", func(t *testing.T) { - cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel)) e := zapcore.Entry{ Level: zapcore.DebugLevel, } - ce := cc.Check(e, nil) - assert.Nil(t, ce) + assert.Nil(t, cc.Check(e, nil)) }) } func Test_collectorCore_With(t *testing.T) { - cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} - cc.core.(*bufferedCore).context = []zapcore.Field{ + cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel)) + cc.loadDelegate().(*bufferedCore).context = []zapcore.Field{ {Key: "original", String: "context"}, } inputs := []zapcore.Field{ @@ -53,11 +51,11 @@ func Test_collectorCore_With(t *testing.T) { {Key: "test", String: "passed"}, } newCC := cc.With(inputs) - assert.Equal(t, expected, newCC.(*collectorCore).core.(*bufferedCore).context) + assert.Equal(t, expected, newCC.(*collectorCore).loadDelegate().(*bufferedCore).context) } func Test_collectorCore_Write(t *testing.T) { - cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel)) e := zapcore.Entry{ Level: zapcore.DebugLevel, Message: "test", @@ -72,18 +70,18 @@ func Test_collectorCore_Write(t *testing.T) { e, fields, } - require.Len(t, cc.core.(*bufferedCore).logs, 1) - require.Equal(t, expected, cc.core.(*bufferedCore).logs[0]) + require.Len(t, cc.loadDelegate().(*bufferedCore).logs, 1) + require.Equal(t, expected, cc.loadDelegate().(*bufferedCore).logs[0]) } func Test_collectorCore_Sync(t *testing.T) { - cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel)) assert.NoError(t, cc.Sync()) } func Test_collectorCore_SetCore(t *testing.T) { - cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel)) newCore := newBufferedCore(zapcore.DebugLevel) cc.SetCore(newCore) - assert.Equal(t, zapcore.DebugLevel, cc.core.(*bufferedCore).LevelEnabler) + assert.Equal(t, zapcore.DebugLevel, cc.loadDelegate().(*bufferedCore).LevelEnabler) }