Skip to content

Commit 9df19a1

Browse files
authored
Fix batchprocessor to avoid reordering and send max size (#3029)
* The reordering could happen if, during processing of an item, other items were added to the newItems channel. This is fixed by not re-adding the leftover items to the channel. * The splitting logic was wrong: it forced sendBatchSize instead of sendBatchMaxSize when split was called. * Fix the logic for a very large message (> 2x sendBatchMaxSize) by calling export multiple times instead of once, and resetting the timer. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 25da8cf commit 9df19a1

File tree

2 files changed

+70
-96
lines changed

2 files changed

+70
-96
lines changed

processor/batchprocessor/batch_processor.go

Lines changed: 67 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ type batchProcessor struct {
4343
exportCtx context.Context
4444
timer *time.Timer
4545
timeout time.Duration
46-
sendBatchSize uint32
47-
sendBatchMaxSize uint32
46+
sendBatchSize int
47+
sendBatchMaxSize int
4848

4949
newItem chan interface{}
5050
batch batch
@@ -57,17 +57,14 @@ type batchProcessor struct {
5757

5858
type batch interface {
5959
// export the current batch
60-
export(ctx context.Context) error
60+
export(ctx context.Context, sendBatchMaxSize int) error
6161

6262
// itemCount returns the size of the current batch
63-
itemCount() uint32
63+
itemCount() int
6464

6565
// size returns the size in bytes of the current batch
6666
size() int
6767

68-
// reset the current batch structure with zero/empty values.
69-
reset()
70-
7168
// add item to the current batch
7269
add(item interface{})
7370
}
@@ -86,8 +83,8 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
8683
exportCtx: exportCtx,
8784
telemetryLevel: telemetryLevel,
8885

89-
sendBatchSize: cfg.SendBatchSize,
90-
sendBatchMaxSize: cfg.SendBatchMaxSize,
86+
sendBatchSize: int(cfg.SendBatchSize),
87+
sendBatchMaxSize: int(cfg.SendBatchMaxSize),
9188
timeout: cfg.Timeout,
9289
newItem: make(chan interface{}, runtime.NumCPU()),
9390
batch: batch,
@@ -152,40 +149,15 @@ func (bp *batchProcessor) startProcessingCycle() {
152149
}
153150

154151
func (bp *batchProcessor) processItem(item interface{}) {
155-
if bp.sendBatchMaxSize > 0 {
156-
if td, ok := item.(pdata.Traces); ok {
157-
itemCount := bp.batch.itemCount()
158-
if itemCount+uint32(td.SpanCount()) > bp.sendBatchMaxSize {
159-
item = splitTrace(int(bp.sendBatchSize-itemCount), td)
160-
go func() {
161-
bp.newItem <- td
162-
}()
163-
}
164-
}
165-
if td, ok := item.(pdata.Metrics); ok {
166-
itemCount := bp.batch.itemCount()
167-
if itemCount+uint32(td.MetricCount()) > bp.sendBatchMaxSize {
168-
item = splitMetrics(int(bp.sendBatchSize-itemCount), td)
169-
go func() {
170-
bp.newItem <- td
171-
}()
172-
}
173-
}
174-
if td, ok := item.(pdata.Logs); ok {
175-
itemCount := bp.batch.itemCount()
176-
if itemCount+uint32(td.LogRecordCount()) > bp.sendBatchMaxSize {
177-
item = splitLogs(int(bp.sendBatchSize-itemCount), td)
178-
go func() {
179-
bp.newItem <- td
180-
}()
181-
}
182-
}
152+
bp.batch.add(item)
153+
sent := false
154+
for bp.batch.itemCount() >= bp.sendBatchSize {
155+
sent = true
156+
bp.sendItems(statBatchSizeTriggerSend)
183157
}
184158

185-
bp.batch.add(item)
186-
if bp.batch.itemCount() >= bp.sendBatchSize {
159+
if sent {
187160
bp.stopTimer()
188-
bp.sendItems(statBatchSizeTriggerSend)
189161
bp.resetTimer()
190162
}
191163
}
@@ -208,10 +180,9 @@ func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) {
208180
stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bp.batch.size())))
209181
}
210182

211-
if err := bp.batch.export(bp.exportCtx); err != nil {
183+
if err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize); err != nil {
212184
bp.logger.Warn("Sender failed", zap.Error(err))
213185
}
214-
bp.batch.reset()
215186
}
216187

217188
// ConsumeTraces implements TracesProcessor
@@ -251,13 +222,11 @@ func newBatchLogsProcessor(params component.ProcessorCreateParams, next consumer
251222
type batchTraces struct {
252223
nextConsumer consumer.Traces
253224
traceData pdata.Traces
254-
spanCount uint32
225+
spanCount int
255226
}
256227

257228
func newBatchTraces(nextConsumer consumer.Traces) *batchTraces {
258-
b := &batchTraces{nextConsumer: nextConsumer}
259-
b.reset()
260-
return b
229+
return &batchTraces{nextConsumer: nextConsumer, traceData: pdata.NewTraces()}
261230
}
262231

263232
// add updates current batchTraces by adding new TraceData object
@@ -268,106 +237,111 @@ func (bt *batchTraces) add(item interface{}) {
268237
return
269238
}
270239

271-
bt.spanCount += uint32(newSpanCount)
240+
bt.spanCount += newSpanCount
272241
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
273242
}
274243

275-
func (bt *batchTraces) export(ctx context.Context) error {
276-
return bt.nextConsumer.ConsumeTraces(ctx, bt.traceData)
244+
func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int) error {
245+
var req pdata.Traces
246+
if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize {
247+
req = splitTrace(sendBatchMaxSize, bt.traceData)
248+
bt.spanCount -= sendBatchMaxSize
249+
} else {
250+
req = bt.traceData
251+
bt.traceData = pdata.NewTraces()
252+
bt.spanCount = 0
253+
}
254+
return bt.nextConsumer.ConsumeTraces(ctx, req)
277255
}
278256

279-
func (bt *batchTraces) itemCount() uint32 {
257+
func (bt *batchTraces) itemCount() int {
280258
return bt.spanCount
281259
}
282260

283261
func (bt *batchTraces) size() int {
284262
return bt.traceData.OtlpProtoSize()
285263
}
286264

287-
// resets the current batchTraces structure with zero values
288-
func (bt *batchTraces) reset() {
289-
bt.traceData = pdata.NewTraces()
290-
bt.spanCount = 0
291-
}
292-
293265
type batchMetrics struct {
294266
nextConsumer consumer.Metrics
295267
metricData pdata.Metrics
296-
metricCount uint32
268+
metricCount int
297269
}
298270

299271
func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
300-
b := &batchMetrics{nextConsumer: nextConsumer}
301-
b.reset()
302-
return b
303-
}
304-
305-
func (bm *batchMetrics) export(ctx context.Context) error {
306-
return bm.nextConsumer.ConsumeMetrics(ctx, bm.metricData)
272+
return &batchMetrics{nextConsumer: nextConsumer, metricData: pdata.NewMetrics()}
273+
}
274+
275+
func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
276+
var req pdata.Metrics
277+
if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
278+
req = splitMetrics(sendBatchMaxSize, bm.metricData)
279+
bm.metricCount -= sendBatchMaxSize
280+
} else {
281+
req = bm.metricData
282+
bm.metricData = pdata.NewMetrics()
283+
bm.metricCount = 0
284+
}
285+
return bm.nextConsumer.ConsumeMetrics(ctx, req)
307286
}
308287

309-
func (bm *batchMetrics) itemCount() uint32 {
288+
func (bm *batchMetrics) itemCount() int {
310289
return bm.metricCount
311290
}
312291

313292
func (bm *batchMetrics) size() int {
314293
return bm.metricData.OtlpProtoSize()
315294
}
316295

317-
// resets the current batchMetrics structure with zero/empty values.
318-
func (bm *batchMetrics) reset() {
319-
bm.metricData = pdata.NewMetrics()
320-
bm.metricCount = 0
321-
}
322-
323296
func (bm *batchMetrics) add(item interface{}) {
324297
md := item.(pdata.Metrics)
325298

326299
newMetricsCount := md.MetricCount()
327300
if newMetricsCount == 0 {
328301
return
329302
}
330-
bm.metricCount += uint32(newMetricsCount)
303+
bm.metricCount += newMetricsCount
331304
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
332305
}
333306

334307
type batchLogs struct {
335308
nextConsumer consumer.Logs
336309
logData pdata.Logs
337-
logCount uint32
310+
logCount int
338311
}
339312

340313
func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
341-
b := &batchLogs{nextConsumer: nextConsumer}
342-
b.reset()
343-
return b
344-
}
345-
346-
func (bm *batchLogs) export(ctx context.Context) error {
347-
return bm.nextConsumer.ConsumeLogs(ctx, bm.logData)
348-
}
349-
350-
func (bm *batchLogs) itemCount() uint32 {
351-
return bm.logCount
314+
return &batchLogs{nextConsumer: nextConsumer, logData: pdata.NewLogs()}
315+
}
316+
317+
func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int) error {
318+
var req pdata.Logs
319+
if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize {
320+
req = splitLogs(sendBatchMaxSize, bl.logData)
321+
bl.logCount -= sendBatchMaxSize
322+
} else {
323+
req = bl.logData
324+
bl.logData = pdata.NewLogs()
325+
bl.logCount = 0
326+
}
327+
return bl.nextConsumer.ConsumeLogs(ctx, req)
352328
}
353329

354-
func (bm *batchLogs) size() int {
355-
return bm.logData.OtlpProtoSize()
330+
func (bl *batchLogs) itemCount() int {
331+
return bl.logCount
356332
}
357333

358-
// resets the current batchLogs structure with zero/empty values.
359-
func (bm *batchLogs) reset() {
360-
bm.logData = pdata.NewLogs()
361-
bm.logCount = 0
334+
func (bl *batchLogs) size() int {
335+
return bl.logData.OtlpProtoSize()
362336
}
363337

364-
func (bm *batchLogs) add(item interface{}) {
338+
func (bl *batchLogs) add(item interface{}) {
365339
ld := item.(pdata.Logs)
366340

367341
newLogsCount := ld.LogRecordCount()
368342
if newLogsCount == 0 {
369343
return
370344
}
371-
bm.logCount += uint32(newLogsCount)
372-
ld.ResourceLogs().MoveAndAppendTo(bm.logData.ResourceLogs())
345+
bl.logCount += newLogsCount
346+
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
373347
}

processor/batchprocessor/batch_processor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
7979
sink := new(consumertest.TracesSink)
8080
cfg := createDefaultConfig().(*Config)
8181
cfg.SendBatchSize = 128
82-
cfg.SendBatchMaxSize = 128
82+
cfg.SendBatchMaxSize = 130
8383
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
8484
batcher, err := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelBasic)
8585
require.NoError(t, err)
@@ -112,10 +112,10 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
112112

113113
require.Equal(t, requestCount*spansPerRequest, sink.SpansCount())
114114
for i := 0; i < len(sink.AllTraces())-1; i++ {
115-
assert.Equal(t, cfg.SendBatchSize, uint32(sink.AllTraces()[i].SpanCount()))
115+
assert.Equal(t, int(cfg.SendBatchMaxSize), sink.AllTraces()[i].SpanCount())
116116
}
117117
// the last batch has the remaining size
118-
assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount())
118+
assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchMaxSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount())
119119
}
120120

121121
func TestBatchProcessorSentBySize(t *testing.T) {

0 commit comments

Comments
 (0)