Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3521db9
Add FilterProcessor
MrAlias Aug 7, 2024
8727522
Update the LoggerProvider and Logger
MrAlias Aug 7, 2024
38abe56
Update the ContextFilterProcessor example
MrAlias Aug 7, 2024
f0ba53f
Add changelog entry
MrAlias Aug 7, 2024
4031e03
Add BenchmarkLoggerEnabled
MrAlias Aug 7, 2024
53876b0
Update provider and logger tests
MrAlias Aug 7, 2024
2a3fa43
Mention FilterProcessor from the Processor docs
MrAlias Aug 8, 2024
b3f073f
Remove Enabled method from BatchProcessor
MrAlias Aug 8, 2024
6c9ba9b
Remove Enabled method from SimpleProcessor
MrAlias Aug 8, 2024
6fc46a0
Add sdk/log/internal/x pacakge
MrAlias Aug 8, 2024
9555d9f
Doc experimental features in sdk/log pkg
MrAlias Aug 8, 2024
2271040
Replace FilterProcessor with x.FilterProcessor
MrAlias Aug 8, 2024
d4e9e0f
Fix import comment for sdk/log/internal/x
MrAlias Aug 8, 2024
a8ee3b4
Update changelog
MrAlias Aug 8, 2024
51ad24f
Update changelog with Enabled method removals
MrAlias Aug 8, 2024
0535da8
Add minsev example to x README
MrAlias Aug 8, 2024
ba537c4
Apply suggestions from code review
MrAlias Aug 9, 2024
ecd21f8
Clarify the SDK Logger.Enabled behavior
MrAlias Aug 9, 2024
7079a4d
Merge branch 'main' into FilterProcessor
MrAlias Aug 9, 2024
98978ad
Merge branch 'main' into FilterProcessor
XSAM Aug 15, 2024
67deb25
Merge branch 'main' into FilterProcessor
XSAM Aug 15, 2024
3faedf8
Merge branch 'main' into FilterProcessor
MrAlias Aug 19, 2024
e308557
Merge branch 'main' into FilterProcessor
MrAlias Aug 19, 2024
844fe0c
Merge branch 'main' into FilterProcessor
MrAlias Aug 20, 2024
ec06ff8
Merge branch 'main' into FilterProcessor
MrAlias Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
See our [versioning policy](VERSIONING.md) for more information about these stability guarantees. (#5629)
- Add `InstrumentationScope` field to `SpanStub` in `go.opentelemetry.io/otel/sdk/trace/tracetest`, as a replacement for the deprecated `InstrumentationLibrary`. (#5627)
- Zero value of `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` no longer panics. (#5665)
- The `FilterProcessor` interface type is added in `go.opentelemetry.io/otel/sdk/log`.
This is an optional interface that log `Processor`s can implement to instruct the `Logger` if a `Record` will be processed or not.
This replaces the existing `Enabled` method that is removed from the `Processor` interface itself. (#5692)

### Changed

- `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636)
- `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665)
- The `Processor` interface in `go.opentelemetry.io/otel/sdk/log` no longer includes the `Enabled` method.
See the added `FilterProcessor` interface type to the same package to continue providing this functionality. (#5692)

### Fixed

Expand Down
13 changes: 11 additions & 2 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strings"
"sync"

logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/global"
Expand Down Expand Up @@ -58,7 +59,7 @@ func ExampleProcessor_filtering() {
// Wrap the processor so that it ignores processing log records
// when a context deriving from WithIgnoreLogs is passed
// to the logging methods.
processor = &ContextFilterProcessor{processor}
processor = &ContextFilterProcessor{Processor: processor}

// The created processor can then be registered with
// the OpenTelemetry Logs SDK using the WithProcessor option.
Expand All @@ -81,6 +82,9 @@ func WithIgnoreLogs(ctx context.Context) context.Context {
// [WithIgnoreLogs] is passed to its methods.
type ContextFilterProcessor struct {
log.Processor

lazyFilter sync.Once
filter log.FilterProcessor
}

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
Expand All @@ -91,7 +95,12 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, record log.Record) bool {
return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, record)
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(log.FilterProcessor); ok {
p.filter = f
}
})
return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, record))
}

func ignoreLogs(ctx context.Context) bool {
Expand Down
26 changes: 22 additions & 4 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,31 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
}
}

func (l *logger) Enabled(ctx context.Context, r log.Record) bool {
newRecord := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if enabled := p.Enabled(ctx, newRecord); enabled {
// Enabled returns true if at least one Processor held by the LoggerProvider
// that created the logger will process the record for the provided context.
//
// If it is not possible to definitively determine the record will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process the
// record.
func (l *logger) Enabled(ctx context.Context, record log.Record) bool {
fltrs := l.provider.filterProcessors()
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(fltrs) ||
anyEnabled(ctx, l.newRecord(ctx, record), fltrs)
}

func anyEnabled(ctx context.Context, r Record, fltrs []FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, r) {
// At least one Processor will process the Record.
return true
}
}
// No Processor will process the record
return false
}

Expand Down
26 changes: 24 additions & 2 deletions sdk/log/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ func TestLoggerEmit(t *testing.T) {
}

func TestLoggerEnabled(t *testing.T) {
p0, p1, p2WithDisabled := newProcessor("0"), newProcessor("1"), newProcessor("2")
p2WithDisabled.enabled = false
p0 := newFilterProcessor("0", true)
p1 := newFilterProcessor("1", true)
p2WithDisabled := newFilterProcessor("2", false)

testCases := []struct {
name string
Expand Down Expand Up @@ -273,3 +274,24 @@ func TestLoggerEnabled(t *testing.T) {
})
}
}

func BenchmarkLoggerEnabled(b *testing.B) {
provider := NewLoggerProvider(
WithProcessor(newFilterProcessor("0", false)),
WithProcessor(newFilterProcessor("1", true)),
)
logger := provider.Logger("BenchmarkLoggerEnabled")
ctx, r := context.Background(), log.Record{}
r.SetSeverityText("test")

var enabled bool

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
enabled = logger.Enabled(ctx, r)
}

_ = enabled
}
54 changes: 34 additions & 20 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,40 @@ type Processor interface {
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error

// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
// should perform no operation and return nil error.
Shutdown(ctx context.Context) error

// ForceFlush exports log records to the configured Exporter that have not yet
// been exported.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
ForceFlush(ctx context.Context) error
}

// FilterProcessor is a Processor that knows, and can identify, what Record it
// will process or drop when it passed to OnEmit.
//
// This is useful for users of logging APIs that want to know if they should
// perform complex operations to emit a Record if it will be processed, or if
// not if it will be dropped.
//
// This is an optional interface, and Processor implementations are not
// required to support it. Implementations that choose to support this are
// expected to re-evaluate the Records passed to OnEmit, it is not expected
// that the caller to OnEmit will check if that Record will be processed or
// dropped.
//
// This should only be implemented for Processors that can make reliable enough
// determination of this prior to processing a Record.
type FilterProcessor interface {
// Enabled returns whether the Processor will process for the given context
// and record.
//
Expand All @@ -50,26 +84,6 @@ type Processor interface {
// indeterminate state, but may return false if valid reasons in particular
// circumstances exist (e.g. performance, correctness).
//
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor] until any processor returns true.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record Record) bool

// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
// should perform no operation and return nil error.
Shutdown(ctx context.Context) error

// ForceFlush exports log records to the configured Exporter that have not yet
// been exported.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
ForceFlush(ctx context.Context) error
}
14 changes: 14 additions & 0 deletions sdk/log/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type LoggerProvider struct {
attributeCountLimit int
attributeValueLengthLimit int

fltrProcessorsOnce sync.Once
fltrProcessors []FilterProcessor

loggersMu sync.Mutex
loggers map[instrumentation.Scope]*logger

Expand All @@ -90,6 +93,17 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
}
}

func (p *LoggerProvider) filterProcessors() []FilterProcessor {
p.fltrProcessorsOnce.Do(func() {
for _, proc := range p.processors {
if f, ok := proc.(FilterProcessor); ok {
p.fltrProcessors = append(p.fltrProcessors, f)
}
}
})
return p.fltrProcessors
}

// Logger returns a new [log.Logger] with the provided name and configuration.
//
// If p is shut down, a [noop.Logger] instance is returned.
Expand Down
24 changes: 18 additions & 6 deletions sdk/log/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ type processor struct {
forceFlushCalls int

records []Record
enabled bool
}

func newProcessor(name string) *processor {
return &processor{Name: name, enabled: true}
return &processor{Name: name}
}

func (p *processor) OnEmit(ctx context.Context, r *Record) error {
Expand All @@ -47,10 +46,6 @@ func (p *processor) OnEmit(ctx context.Context, r *Record) error {
return nil
}

func (p *processor) Enabled(context.Context, Record) bool {
return p.enabled
}

func (p *processor) Shutdown(context.Context) error {
p.shutdownCalls++
return p.Err
Expand All @@ -61,6 +56,23 @@ func (p *processor) ForceFlush(context.Context) error {
return p.Err
}

type filterProcessor struct {
*processor

enabled bool
}

func newFilterProcessor(name string, enabled bool) *filterProcessor {
return &filterProcessor{
processor: newProcessor(name),
enabled: enabled,
}
}

func (p *filterProcessor) Enabled(context.Context, Record) bool {
return p.enabled
}

func TestNewLoggerProviderConfiguration(t *testing.T) {
t.Cleanup(func(orig otel.ErrorHandler) func() {
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
Expand Down