Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Initialize map with `len(keys)` in `NewAllowKeysFilter` and `NewDenyKeysFilter` to avoid unnecessary allocations in `go.opentelemetry.io/otel/attribute`. (#6455)
- `go.opentelemetry.io/otel/log/logtest` is now a separate Go module. (#6465)
- `go.opentelemetry.io/otel/sdk/log/logtest` is now a separate Go module. (#6466)
- `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` now drops the most recently emitted log records instead of the oldest ones when the queue reaches its capacity. (#6569)

### Deprecated

Expand Down
25 changes: 18 additions & 7 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,17 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
global.Warn("dropped log records", "dropped", d)
}

qLen := b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
buf = slices.Clone(buf)
}
return ok
})
qLen := b.q.Len()
// don't copy data from queue unless exporter can accept more, it is very expensive
if !b.exporter.IsQueueFull() {
qLen = b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
buf = slices.Clone(buf)
}
return ok
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
} else {
qLen = b.q.Len()
}

if qLen >= b.batchSize {
// There is another full batch ready. Immediately trigger
// another export attempt.
Expand Down Expand Up @@ -272,6 +276,13 @@ func newQueue(size int) *queue {
}
}

// Len returns the number of records currently held in the queue.
// The queue lock is held while reading so the value is consistent
// with concurrent Enqueue/TryDequeue calls.
func (q *queue) Len() int {
	q.Lock()
	n := q.len
	q.Unlock()
	return n
}

// Dropped returns the number of Records dropped during enqueueing since the
// last time Dropped was called.
func (q *queue) Dropped() uint64 {
Expand Down
17 changes: 17 additions & 0 deletions sdk/log/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import (
"github.com/stretchr/testify/assert"
)

// mockDelayExporter is an Exporter stub used by benchmarks to simulate a
// slow backend: each Export call sleeps briefly before reporting success.
type mockDelayExporter struct{}

// Export pauses for 5ms to mimic real export latency, then succeeds.
func (mockDelayExporter) Export(context.Context, []Record) error {
	const simulatedLatency = 5 * time.Millisecond
	time.Sleep(simulatedLatency)
	return nil
}

// Shutdown is a no-op; the mock holds no resources.
func (mockDelayExporter) Shutdown(context.Context) error { return nil }

// ForceFlush is a no-op; the mock buffers nothing.
func (mockDelayExporter) ForceFlush(context.Context) error { return nil }

func BenchmarkProcessor(b *testing.B) {
for _, tc := range []struct {
name string
Expand All @@ -30,6 +41,12 @@ func BenchmarkProcessor(b *testing.B) {
return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))}
},
},
{
name: "BatchSimulateExport",
f: func() []LoggerProviderOption {
return []LoggerProviderOption{WithProcessor(NewBatchProcessor(mockDelayExporter{}))}
},
},
{
name: "SetTimestampSimple",
f: func() []LoggerProviderOption {
Expand Down
11 changes: 7 additions & 4 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,10 @@ type bufferExporter struct {

// newBufferExporter returns a new bufferExporter that wraps exporter. The
// returned bufferExporter will buffer at most size number of export requests.
// If size is less than zero, zero will be used (i.e. only synchronous
// exporting will be supported).
// If size is less than 1, 1 will be used.
func newBufferExporter(exporter Exporter, size int) *bufferExporter {
if size < 0 {
size = 0
if size < 1 {
size = 1
}
input := make(chan exportData, size)
return &bufferExporter{
Expand All @@ -201,6 +200,10 @@ func newBufferExporter(exporter Exporter, size int) *bufferExporter {
}
}

// IsQueueFull reports whether the exporter's buffered input channel has no
// remaining capacity, i.e. an additional export request could not be
// accepted without blocking or being dropped.
func (e *bufferExporter) IsQueueFull() bool {
	return cap(e.input)-len(e.input) == 0
}

var errStopped = errors.New("exporter stopped")

func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error {
Expand Down
Loading