diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go
index 7e9609a3dc7..720dfcf6d89 100644
--- a/processor/batchprocessor/batch_processor.go
+++ b/processor/batchprocessor/batch_processor.go
@@ -43,8 +43,8 @@ type batchProcessor struct {
 	exportCtx        context.Context
 	timer            *time.Timer
 	timeout          time.Duration
-	sendBatchSize    uint32
-	sendBatchMaxSize uint32
+	sendBatchSize    int
+	sendBatchMaxSize int
 
 	newItem chan interface{}
 	batch   batch
@@ -57,17 +57,14 @@ type batchProcessor struct {
 
 type batch interface {
 	// export the current batch
-	export(ctx context.Context) error
+	export(ctx context.Context, sendBatchMaxSize int) error
 
 	// itemCount returns the size of the current batch
-	itemCount() uint32
+	itemCount() int
 
 	// size returns the size in bytes of the current batch
 	size() int
 
-	// reset the current batch structure with zero/empty values.
-	reset()
-
 	// add item to the current batch
 	add(item interface{})
 }
@@ -86,8 +83,8 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
 		exportCtx:      exportCtx,
 		telemetryLevel: telemetryLevel,
 
-		sendBatchSize:    cfg.SendBatchSize,
-		sendBatchMaxSize: cfg.SendBatchMaxSize,
+		sendBatchSize:    int(cfg.SendBatchSize),
+		sendBatchMaxSize: int(cfg.SendBatchMaxSize),
 		timeout:          cfg.Timeout,
 		newItem:          make(chan interface{}, runtime.NumCPU()),
 		batch:            batch,
@@ -152,40 +149,15 @@ func (bp *batchProcessor) startProcessingCycle() {
 }
 
 func (bp *batchProcessor) processItem(item interface{}) {
-	if bp.sendBatchMaxSize > 0 {
-		if td, ok := item.(pdata.Traces); ok {
-			itemCount := bp.batch.itemCount()
-			if itemCount+uint32(td.SpanCount()) > bp.sendBatchMaxSize {
-				item = splitTrace(int(bp.sendBatchSize-itemCount), td)
-				go func() {
-					bp.newItem <- td
-				}()
-			}
-		}
-		if td, ok := item.(pdata.Metrics); ok {
-			itemCount := bp.batch.itemCount()
-			if itemCount+uint32(td.MetricCount()) > bp.sendBatchMaxSize {
-				item = splitMetrics(int(bp.sendBatchSize-itemCount), td)
-				go func() {
-					bp.newItem <- td
-				}()
-			}
-		}
-		if td, ok := item.(pdata.Logs); ok {
-			itemCount := bp.batch.itemCount()
-			if itemCount+uint32(td.LogRecordCount()) > bp.sendBatchMaxSize {
-				item = splitLogs(int(bp.sendBatchSize-itemCount), td)
-				go func() {
-					bp.newItem <- td
-				}()
-			}
-		}
+	bp.batch.add(item)
+	sent := false
+	for bp.batch.itemCount() >= bp.sendBatchSize {
+		sent = true
+		bp.sendItems(statBatchSizeTriggerSend)
 	}
 
-	bp.batch.add(item)
-	if bp.batch.itemCount() >= bp.sendBatchSize {
+	if sent {
 		bp.stopTimer()
-		bp.sendItems(statBatchSizeTriggerSend)
 		bp.resetTimer()
 	}
 }
@@ -208,10 +180,9 @@ func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) {
 		stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bp.batch.size())))
 	}
 
-	if err := bp.batch.export(bp.exportCtx); err != nil {
+	if err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize); err != nil {
 		bp.logger.Warn("Sender failed", zap.Error(err))
 	}
-	bp.batch.reset()
 }
 
 // ConsumeTraces implements TracesProcessor
@@ -251,13 +222,11 @@ func newBatchLogsProcessor(params component.ProcessorCreateParams, next consumer
 type batchTraces struct {
 	nextConsumer consumer.Traces
 	traceData    pdata.Traces
-	spanCount    uint32
+	spanCount    int
 }
 
 func newBatchTraces(nextConsumer consumer.Traces) *batchTraces {
-	b := &batchTraces{nextConsumer: nextConsumer}
-	b.reset()
-	return b
+	return &batchTraces{nextConsumer: nextConsumer, traceData: pdata.NewTraces()}
 }
 
 // add updates current batchTraces by adding new TraceData object
@@ -268,15 +237,24 @@ func (bt *batchTraces) add(item interface{}) {
 		return
 	}
 
-	bt.spanCount += uint32(newSpanCount)
+	bt.spanCount += newSpanCount
 	td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
 }
 
-func (bt *batchTraces) export(ctx context.Context) error {
-	return bt.nextConsumer.ConsumeTraces(ctx, bt.traceData)
+func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int) error {
+	var req pdata.Traces
+	if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize {
+		req = splitTrace(sendBatchMaxSize, bt.traceData)
+		bt.spanCount -= sendBatchMaxSize
+	} else {
+		req = bt.traceData
+		bt.traceData = pdata.NewTraces()
+		bt.spanCount = 0
+	}
+	return bt.nextConsumer.ConsumeTraces(ctx, req)
 }
 
-func (bt *batchTraces) itemCount() uint32 {
+func (bt *batchTraces) itemCount() int {
 	return bt.spanCount
 }
 
@@ -284,29 +262,30 @@ func (bt *batchTraces) size() int {
 	return bt.traceData.OtlpProtoSize()
 }
 
-// resets the current batchTraces structure with zero values
-func (bt *batchTraces) reset() {
-	bt.traceData = pdata.NewTraces()
-	bt.spanCount = 0
-}
-
 type batchMetrics struct {
 	nextConsumer consumer.Metrics
 	metricData   pdata.Metrics
-	metricCount  uint32
+	metricCount  int
 }
 
 func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
-	b := &batchMetrics{nextConsumer: nextConsumer}
-	b.reset()
-	return b
-}
-
-func (bm *batchMetrics) export(ctx context.Context) error {
-	return bm.nextConsumer.ConsumeMetrics(ctx, bm.metricData)
+	return &batchMetrics{nextConsumer: nextConsumer, metricData: pdata.NewMetrics()}
+}
+
+func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
+	var req pdata.Metrics
+	if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
+		req = splitMetrics(sendBatchMaxSize, bm.metricData)
+		bm.metricCount -= sendBatchMaxSize
+	} else {
+		req = bm.metricData
+		bm.metricData = pdata.NewMetrics()
+		bm.metricCount = 0
+	}
+	return bm.nextConsumer.ConsumeMetrics(ctx, req)
 }
 
-func (bm *batchMetrics) itemCount() uint32 {
+func (bm *batchMetrics) itemCount() int {
 	return bm.metricCount
 }
 
@@ -314,12 +293,6 @@ func (bm *batchMetrics) size() int {
 	return bm.metricData.OtlpProtoSize()
 }
 
-// resets the current batchMetrics structure with zero/empty values.
-func (bm *batchMetrics) reset() {
-	bm.metricData = pdata.NewMetrics()
-	bm.metricCount = 0
-}
-
 func (bm *batchMetrics) add(item interface{}) {
 	md := item.(pdata.Metrics)
 
@@ -327,47 +300,48 @@ func (bm *batchMetrics) add(item interface{}) {
 	if newMetricsCount == 0 {
 		return
 	}
-	bm.metricCount += uint32(newMetricsCount)
+	bm.metricCount += newMetricsCount
 	md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
 }
 
 type batchLogs struct {
 	nextConsumer consumer.Logs
 	logData      pdata.Logs
-	logCount     uint32
+	logCount     int
 }
 
 func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
-	b := &batchLogs{nextConsumer: nextConsumer}
-	b.reset()
-	return b
-}
-
-func (bm *batchLogs) export(ctx context.Context) error {
-	return bm.nextConsumer.ConsumeLogs(ctx, bm.logData)
-}
-
-func (bm *batchLogs) itemCount() uint32 {
-	return bm.logCount
+	return &batchLogs{nextConsumer: nextConsumer, logData: pdata.NewLogs()}
+}
+
+func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int) error {
+	var req pdata.Logs
+	if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize {
+		req = splitLogs(sendBatchMaxSize, bl.logData)
+		bl.logCount -= sendBatchMaxSize
+	} else {
+		req = bl.logData
+		bl.logData = pdata.NewLogs()
+		bl.logCount = 0
+	}
+	return bl.nextConsumer.ConsumeLogs(ctx, req)
 }
 
-func (bm *batchLogs) size() int {
-	return bm.logData.OtlpProtoSize()
+func (bl *batchLogs) itemCount() int {
+	return bl.logCount
 }
 
-// resets the current batchLogs structure with zero/empty values.
-func (bm *batchLogs) reset() {
-	bm.logData = pdata.NewLogs()
-	bm.logCount = 0
+func (bl *batchLogs) size() int {
+	return bl.logData.OtlpProtoSize()
 }
 
-func (bm *batchLogs) add(item interface{}) {
+func (bl *batchLogs) add(item interface{}) {
 	ld := item.(pdata.Logs)
 
 	newLogsCount := ld.LogRecordCount()
 	if newLogsCount == 0 {
 		return
 	}
-	bm.logCount += uint32(newLogsCount)
-	ld.ResourceLogs().MoveAndAppendTo(bm.logData.ResourceLogs())
+	bl.logCount += newLogsCount
+	ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
 }
diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go
index ea9bfdac039..541b25260b5 100644
--- a/processor/batchprocessor/batch_processor_test.go
+++ b/processor/batchprocessor/batch_processor_test.go
@@ -79,7 +79,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
 	sink := new(consumertest.TracesSink)
 	cfg := createDefaultConfig().(*Config)
 	cfg.SendBatchSize = 128
-	cfg.SendBatchMaxSize = 128
+	cfg.SendBatchMaxSize = 130
 	creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
 	batcher, err := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelBasic)
 	require.NoError(t, err)
@@ -112,10 +112,10 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
 	require.Equal(t, requestCount*spansPerRequest, sink.SpansCount())
 
 	for i := 0; i < len(sink.AllTraces())-1; i++ {
-		assert.Equal(t, cfg.SendBatchSize, uint32(sink.AllTraces()[i].SpanCount()))
+		assert.Equal(t, int(cfg.SendBatchMaxSize), sink.AllTraces()[i].SpanCount())
 	}
 	// the last batch has the remaining size
-	assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount())
+	assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchMaxSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount())
}
 
 func TestBatchProcessorSentBySize(t *testing.T) {
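Reviewer note, not part of the patch: the change moves enforcement of the size cap out of processItem (which previously split incoming pdata before adding it to the batch) and into export, which now sends at most sendBatchMaxSize items per call and keeps the remainder buffered, while processItem simply adds and then drains in a loop. The following is a minimal, self-contained sketch of that pattern; it uses a plain []int and a hypothetical batch type rather than the collector's pdata API, so treat it as an illustration under those assumptions only.

// Illustrative sketch of the split-on-export behavior introduced above.
// The batch type, add, itemCount, and export here are simplified stand-ins;
// sendBatchSize and sendBatchMaxSize mirror the real config field names.
package main

import "fmt"

type batch struct {
	items []int
}

func (b *batch) add(items []int) {
	b.items = append(b.items, items...)
}

func (b *batch) itemCount() int {
	return len(b.items)
}

// export returns at most sendBatchMaxSize items and keeps the remainder in
// the batch, mirroring how batchTraces.export keeps the tail after splitTrace.
// A non-positive sendBatchMaxSize means "no cap": the whole batch is drained.
func (b *batch) export(sendBatchMaxSize int) []int {
	if sendBatchMaxSize > 0 && b.itemCount() > sendBatchMaxSize {
		out := b.items[:sendBatchMaxSize]
		b.items = b.items[sendBatchMaxSize:]
		return out
	}
	out := b.items
	b.items = nil
	return out
}

func main() {
	const (
		sendBatchSize    = 5 // reaching this size triggers sending
		sendBatchMaxSize = 3 // hard cap per exported request
	)
	b := &batch{}

	// As in the new processItem code path: add first, then drain in a loop
	// while the batch is still at or above the trigger size.
	b.add([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
	for b.itemCount() >= sendBatchSize {
		fmt.Println("export:", b.export(sendBatchMaxSize))
	}
	fmt.Println("left in batch:", b.items)
	// Prints:
	//   export: [1 2 3]
	//   export: [4 5 6]
	//   export: [7 8 9]
	//   left in batch: [10 11 12]
}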