Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 67 additions & 93 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type batchProcessor struct {
exportCtx context.Context
timer *time.Timer
timeout time.Duration
sendBatchSize uint32
sendBatchMaxSize uint32
sendBatchSize int
sendBatchMaxSize int

newItem chan interface{}
batch batch
Expand All @@ -57,17 +57,14 @@ type batchProcessor struct {

type batch interface {
// export the current batch
export(ctx context.Context) error
export(ctx context.Context, sendBatchMaxSize int) error

// itemCount returns the size of the current batch
itemCount() uint32
itemCount() int

// size returns the size in bytes of the current batch
size() int

// reset the current batch structure with zero/empty values.
reset()

// add item to the current batch
add(item interface{})
}
Expand All @@ -86,8 +83,8 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
exportCtx: exportCtx,
telemetryLevel: telemetryLevel,

sendBatchSize: cfg.SendBatchSize,
sendBatchMaxSize: cfg.SendBatchMaxSize,
sendBatchSize: int(cfg.SendBatchSize),
sendBatchMaxSize: int(cfg.SendBatchMaxSize),
timeout: cfg.Timeout,
newItem: make(chan interface{}, runtime.NumCPU()),
batch: batch,
Expand Down Expand Up @@ -152,40 +149,15 @@ func (bp *batchProcessor) startProcessingCycle() {
}

func (bp *batchProcessor) processItem(item interface{}) {
if bp.sendBatchMaxSize > 0 {
if td, ok := item.(pdata.Traces); ok {
itemCount := bp.batch.itemCount()
if itemCount+uint32(td.SpanCount()) > bp.sendBatchMaxSize {
item = splitTrace(int(bp.sendBatchSize-itemCount), td)
go func() {
bp.newItem <- td
}()
}
}
if td, ok := item.(pdata.Metrics); ok {
itemCount := bp.batch.itemCount()
if itemCount+uint32(td.MetricCount()) > bp.sendBatchMaxSize {
item = splitMetrics(int(bp.sendBatchSize-itemCount), td)
go func() {
bp.newItem <- td
}()
}
}
if td, ok := item.(pdata.Logs); ok {
itemCount := bp.batch.itemCount()
if itemCount+uint32(td.LogRecordCount()) > bp.sendBatchMaxSize {
item = splitLogs(int(bp.sendBatchSize-itemCount), td)
go func() {
bp.newItem <- td
}()
}
}
bp.batch.add(item)
sent := false
for bp.batch.itemCount() >= bp.sendBatchSize {
sent = true
bp.sendItems(statBatchSizeTriggerSend)
}

bp.batch.add(item)
if bp.batch.itemCount() >= bp.sendBatchSize {
if sent {
bp.stopTimer()
bp.sendItems(statBatchSizeTriggerSend)
bp.resetTimer()
}
}
Expand All @@ -208,10 +180,9 @@ func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) {
stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bp.batch.size())))
}

if err := bp.batch.export(bp.exportCtx); err != nil {
if err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize); err != nil {
bp.logger.Warn("Sender failed", zap.Error(err))
}
bp.batch.reset()
}

// ConsumeTraces implements TracesProcessor
Expand Down Expand Up @@ -251,13 +222,11 @@ func newBatchLogsProcessor(params component.ProcessorCreateParams, next consumer
type batchTraces struct {
nextConsumer consumer.Traces
traceData pdata.Traces
spanCount uint32
spanCount int
}

func newBatchTraces(nextConsumer consumer.Traces) *batchTraces {
b := &batchTraces{nextConsumer: nextConsumer}
b.reset()
return b
return &batchTraces{nextConsumer: nextConsumer, traceData: pdata.NewTraces()}
}

// add updates current batchTraces by adding new TraceData object
Expand All @@ -268,106 +237,111 @@ func (bt *batchTraces) add(item interface{}) {
return
}

bt.spanCount += uint32(newSpanCount)
bt.spanCount += newSpanCount
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
}

func (bt *batchTraces) export(ctx context.Context) error {
return bt.nextConsumer.ConsumeTraces(ctx, bt.traceData)
func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int) error {
var req pdata.Traces
if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize {
req = splitTrace(sendBatchMaxSize, bt.traceData)
bt.spanCount -= sendBatchMaxSize
} else {
req = bt.traceData
bt.traceData = pdata.NewTraces()
bt.spanCount = 0
}
return bt.nextConsumer.ConsumeTraces(ctx, req)
}

func (bt *batchTraces) itemCount() uint32 {
func (bt *batchTraces) itemCount() int {
return bt.spanCount
}

func (bt *batchTraces) size() int {
return bt.traceData.OtlpProtoSize()
}

// resets the current batchTraces structure with zero values
func (bt *batchTraces) reset() {
bt.traceData = pdata.NewTraces()
bt.spanCount = 0
}

type batchMetrics struct {
nextConsumer consumer.Metrics
metricData pdata.Metrics
metricCount uint32
metricCount int
}

func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
b := &batchMetrics{nextConsumer: nextConsumer}
b.reset()
return b
}

func (bm *batchMetrics) export(ctx context.Context) error {
return bm.nextConsumer.ConsumeMetrics(ctx, bm.metricData)
return &batchMetrics{nextConsumer: nextConsumer, metricData: pdata.NewMetrics()}
}

func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
var req pdata.Metrics
if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.metricCount -= sendBatchMaxSize
} else {
req = bm.metricData
bm.metricData = pdata.NewMetrics()
bm.metricCount = 0
}
return bm.nextConsumer.ConsumeMetrics(ctx, req)
}

func (bm *batchMetrics) itemCount() uint32 {
func (bm *batchMetrics) itemCount() int {
return bm.metricCount
}

func (bm *batchMetrics) size() int {
return bm.metricData.OtlpProtoSize()
}

// resets the current batchMetrics structure with zero/empty values.
func (bm *batchMetrics) reset() {
bm.metricData = pdata.NewMetrics()
bm.metricCount = 0
}

func (bm *batchMetrics) add(item interface{}) {
md := item.(pdata.Metrics)

newMetricsCount := md.MetricCount()
if newMetricsCount == 0 {
return
}
bm.metricCount += uint32(newMetricsCount)
bm.metricCount += newMetricsCount
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}

type batchLogs struct {
nextConsumer consumer.Logs
logData pdata.Logs
logCount uint32
logCount int
}

func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
b := &batchLogs{nextConsumer: nextConsumer}
b.reset()
return b
}

func (bm *batchLogs) export(ctx context.Context) error {
return bm.nextConsumer.ConsumeLogs(ctx, bm.logData)
}

func (bm *batchLogs) itemCount() uint32 {
return bm.logCount
return &batchLogs{nextConsumer: nextConsumer, logData: pdata.NewLogs()}
}

func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int) error {
var req pdata.Logs
if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize {
req = splitLogs(sendBatchMaxSize, bl.logData)
bl.logCount -= sendBatchMaxSize
} else {
req = bl.logData
bl.logData = pdata.NewLogs()
bl.logCount = 0
}
return bl.nextConsumer.ConsumeLogs(ctx, req)
}

func (bm *batchLogs) size() int {
return bm.logData.OtlpProtoSize()
func (bl *batchLogs) itemCount() int {
return bl.logCount
}

// resets the current batchLogs structure with zero/empty values.
func (bm *batchLogs) reset() {
bm.logData = pdata.NewLogs()
bm.logCount = 0
func (bl *batchLogs) size() int {
return bl.logData.OtlpProtoSize()
}

func (bm *batchLogs) add(item interface{}) {
func (bl *batchLogs) add(item interface{}) {
ld := item.(pdata.Logs)

newLogsCount := ld.LogRecordCount()
if newLogsCount == 0 {
return
}
bm.logCount += uint32(newLogsCount)
ld.ResourceLogs().MoveAndAppendTo(bm.logData.ResourceLogs())
bl.logCount += newLogsCount
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
}
6 changes: 3 additions & 3 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.SendBatchMaxSize = 128
cfg.SendBatchMaxSize = 130
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher, err := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelBasic)
require.NoError(t, err)
Expand Down Expand Up @@ -112,10 +112,10 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {

require.Equal(t, requestCount*spansPerRequest, sink.SpansCount())
for i := 0; i < len(sink.AllTraces())-1; i++ {
assert.Equal(t, cfg.SendBatchSize, uint32(sink.AllTraces()[i].SpanCount()))
assert.Equal(t, int(cfg.SendBatchMaxSize), sink.AllTraces()[i].SpanCount())
}
// the last batch has the remaining size
assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount())
assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchMaxSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount())
}

func TestBatchProcessorSentBySize(t *testing.T) {
Expand Down