Skip to content

Commit 6d44f0d

Browse files
authored
Make the batch processor limit data points rather than metrics. (#3141)
**Link to tracking Issue:** #2754. This change introduces a `BenchmarkBatchMetricProcessor` that stress-tests the batching logic. Results before: `BenchmarkBatchMetricProcessor-12 20000 80614 ns/op`. Results after the change: `BenchmarkBatchMetricProcessor-12 20000 96184 ns/op`.
1 parent 5b73ece commit 6d44f0d

File tree

6 files changed

+252
-33
lines changed

6 files changed

+252
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
- Replace `ProcessorCreateParams` with `ProcessorCreateSettings`. (#3181)
1414
- Replace `ExporterCreateParams` with `ExporterCreateSettings` (#3164)
1515
- Replace `ReceiverCreateParams` with `ReceiverCreateSettings`. (#3167)
16+
- Change `batchprocessor` logic to limit data points rather than metrics (#3141)
1617

1718
## 💡 Enhancements 💡
1819

processor/batchprocessor/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ any data drops such as sampling.
1515
Please refer to [config.go](./config.go) for the config spec.
1616

1717
The following configuration options can be modified:
18-
- `send_batch_size` (default = 8192): Number of spans or metrics after which a
19-
batch will be sent regardless of the timeout.
18+
- `send_batch_size` (default = 8192): Number of spans, metric data points, or log
19+
records after which a batch will be sent regardless of the timeout.
2020
- `timeout` (default = 200ms): Time duration after which a batch will be sent
2121
regardless of size.
2222
- `send_batch_max_size` (default = 0): The upper limit of the batch size.

processor/batchprocessor/batch_processor.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,9 @@ func (bt *batchTraces) size() int {
263263
}
264264

265265
type batchMetrics struct {
266-
nextConsumer consumer.Metrics
267-
metricData pdata.Metrics
268-
metricCount int
266+
nextConsumer consumer.Metrics
267+
metricData pdata.Metrics
268+
dataPointCount int
269269
}
270270

271271
func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
@@ -274,19 +274,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
274274

275275
func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
276276
var req pdata.Metrics
277-
if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
277+
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
278278
req = splitMetrics(sendBatchMaxSize, bm.metricData)
279-
bm.metricCount -= sendBatchMaxSize
279+
bm.dataPointCount -= sendBatchMaxSize
280280
} else {
281281
req = bm.metricData
282282
bm.metricData = pdata.NewMetrics()
283-
bm.metricCount = 0
283+
bm.dataPointCount = 0
284284
}
285285
return bm.nextConsumer.ConsumeMetrics(ctx, req)
286286
}
287287

288288
func (bm *batchMetrics) itemCount() int {
289-
return bm.metricCount
289+
return bm.dataPointCount
290290
}
291291

292292
func (bm *batchMetrics) size() int {
@@ -296,11 +296,11 @@ func (bm *batchMetrics) size() int {
296296
func (bm *batchMetrics) add(item interface{}) {
297297
md := item.(pdata.Metrics)
298298

299-
newMetricsCount := md.MetricCount()
300-
if newMetricsCount == 0 {
299+
_, newDataPointCount := md.MetricAndDataPointCount()
300+
if newDataPointCount == 0 {
301301
return
302302
}
303-
bm.metricCount += newMetricsCount
303+
bm.dataPointCount += newDataPointCount
304304
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
305305
}
306306

processor/batchprocessor/batch_processor_test.go

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package batchprocessor
1717
import (
1818
"context"
1919
"fmt"
20+
"sync"
2021
"testing"
2122
"time"
2223

@@ -29,6 +30,7 @@ import (
2930
"go.opentelemetry.io/collector/component/componenttest"
3031
"go.opentelemetry.io/collector/config"
3132
"go.opentelemetry.io/collector/config/configtelemetry"
33+
"go.opentelemetry.io/collector/consumer"
3234
"go.opentelemetry.io/collector/consumer/consumertest"
3335
"go.opentelemetry.io/collector/consumer/pdata"
3436
"go.opentelemetry.io/collector/internal/testdata"
@@ -320,6 +322,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
320322

321323
requestCount := 100
322324
metricsPerRequest := 5
325+
dataPointsPerMetric := 2 // Since the int counter uses two datapoints.
326+
dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
323327
sink := new(consumertest.MetricsSink)
324328

325329
creationSet := component.ProcessorCreateSettings{Logger: zap.NewNop()}
@@ -339,8 +343,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
339343
elapsed := time.Since(start)
340344
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
341345

342-
expectedBatchesNum := requestCount * metricsPerRequest / int(cfg.SendBatchSize)
343-
expectedBatchingFactor := int(cfg.SendBatchSize) / metricsPerRequest
346+
expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize)
347+
expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest
344348

345349
require.Equal(t, requestCount*metricsPerRequest, sink.MetricsCount())
346350
receivedMds := sink.AllMetrics()
@@ -357,7 +361,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
357361
assert.Equal(t, 1, len(viewData))
358362
distData := viewData[0].Data.(*view.DistributionData)
359363
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
360-
assert.Equal(t, sink.MetricsCount(), int(distData.Sum()))
364+
assert.Equal(t, sink.MetricsCount()*dataPointsPerMetric, int(distData.Sum()))
361365
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
362366
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))
363367

@@ -369,6 +373,23 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
369373
assert.Equal(t, size, int(distData.Sum()))
370374
}
371375

376+
func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
377+
ctx := context.Background()
378+
sink := new(metricsSink)
379+
metricsCount := 50
380+
dataPointsPerMetric := 2
381+
sendBatchMaxSize := 99
382+
383+
batchMetrics := newBatchMetrics(sink)
384+
md := testdata.GenerateMetricsManyMetricsSameResource(metricsCount)
385+
386+
batchMetrics.add(md)
387+
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
388+
require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize))
389+
remainingDataPointsCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
390+
require.Equal(t, remainingDataPointsCount, batchMetrics.dataPointCount)
391+
}
392+
372393
func TestBatchMetricsProcessor_Timeout(t *testing.T) {
373394
cfg := Config{
374395
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
@@ -502,6 +523,55 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
502523
}
503524
}
504525

526+
func BenchmarkBatchMetricProcessor(b *testing.B) {
527+
b.StopTimer()
528+
cfg := Config{
529+
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
530+
Timeout: 100 * time.Millisecond,
531+
SendBatchSize: 2000,
532+
}
533+
ctx := context.Background()
534+
sink := new(metricsSink)
535+
creationSet := component.ProcessorCreateSettings{Logger: zap.NewNop()}
536+
metricsPerRequest := 1000
537+
538+
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed)
539+
require.NoError(b, err)
540+
require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))
541+
542+
mds := make([]pdata.Metrics, 0, b.N)
543+
for n := 0; n < b.N; n++ {
544+
mds = append(mds,
545+
testdata.GenerateMetricsManyMetricsSameResource(metricsPerRequest),
546+
)
547+
}
548+
b.StartTimer()
549+
for n := 0; n < b.N; n++ {
550+
batcher.ConsumeMetrics(ctx, mds[n])
551+
}
552+
b.StopTimer()
553+
require.NoError(b, batcher.Shutdown(ctx))
554+
require.Equal(b, b.N*metricsPerRequest, sink.metricsCount)
555+
}
556+
557+
// metricsSink is a minimal consumer.Metrics implementation that only counts
// the metrics it receives, so the benchmark's sink bookkeeping stays cheap.
type metricsSink struct {
558+
mu sync.Mutex // guards metricsCount against concurrent ConsumeMetrics calls
559+
metricsCount int // running total of metrics consumed
560+
}
561+
562+
func (sme *metricsSink) Capabilities() consumer.Capabilities {
563+
return consumer.Capabilities{
564+
MutatesData: false,
565+
}
566+
}
567+
568+
func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
569+
sme.mu.Lock()
570+
defer sme.mu.Unlock()
571+
sme.metricsCount += md.MetricCount()
572+
return nil
573+
}
574+
505575
func TestBatchLogProcessor_ReceivingData(t *testing.T) {
506576
// Instantiate the batch processor with low config values to test data
507577
// gets sent through the processor.

processor/batchprocessor/splitmetrics.go

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ import (
2020

2121
// splitMetrics removes metrics from the input data and returns a new data of the specified size.
2222
func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
23-
if src.MetricCount() <= size {
23+
_, dataPoints := src.MetricAndDataPointCount()
24+
if dataPoints <= size {
2425
return src
2526
}
26-
totalCopiedMetrics := 0
27+
totalCopiedDataPoints := 0
2728
dest := pdata.NewMetrics()
2829

2930
src.ResourceMetrics().RemoveIf(func(srcRs pdata.ResourceMetrics) bool {
3031
// If we are done skip everything else.
31-
if totalCopiedMetrics == size {
32+
if totalCopiedDataPoints == size {
3233
return false
3334
}
3435

@@ -37,29 +38,30 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
3738

3839
srcRs.InstrumentationLibraryMetrics().RemoveIf(func(srcIlm pdata.InstrumentationLibraryMetrics) bool {
3940
// If we are done skip everything else.
40-
if totalCopiedMetrics == size {
41+
if totalCopiedDataPoints == size {
4142
return false
4243
}
4344

4445
destIlm := destRs.InstrumentationLibraryMetrics().AppendEmpty()
4546
srcIlm.InstrumentationLibrary().CopyTo(destIlm.InstrumentationLibrary())
4647

4748
// If possible to move all metrics do that.
48-
srcMetricsLen := srcIlm.Metrics().Len()
49-
if size-totalCopiedMetrics >= srcMetricsLen {
50-
totalCopiedMetrics += srcMetricsLen
49+
srcDataPointCount := metricSliceDataPointCount(srcIlm.Metrics())
50+
if size-totalCopiedDataPoints >= srcDataPointCount {
51+
totalCopiedDataPoints += srcDataPointCount
5152
srcIlm.Metrics().MoveAndAppendTo(destIlm.Metrics())
5253
return true
5354
}
5455

5556
srcIlm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool {
5657
// If we are done skip everything else.
57-
if totalCopiedMetrics == size {
58+
if totalCopiedDataPoints == size {
5859
return false
5960
}
60-
srcMetric.CopyTo(destIlm.Metrics().AppendEmpty())
61-
totalCopiedMetrics++
62-
return true
61+
// If the metric has more data points than free slots we should split it.
62+
copiedDataPoints, remove := splitMetric(srcMetric, destIlm.Metrics().AppendEmpty(), size-totalCopiedDataPoints)
63+
totalCopiedDataPoints += copiedDataPoints
64+
return remove
6365
})
6466
return false
6567
})
@@ -68,3 +70,82 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
6870

6971
return dest
7072
}
73+
74+
// metricSliceDataPointCount calculates the total number of data points.
75+
func metricSliceDataPointCount(ms pdata.MetricSlice) (dataPointCount int) {
76+
for k := 0; k < ms.Len(); k++ {
77+
dataPointCount += metricDataPointCount(ms.At(k))
78+
}
79+
return
80+
}
81+
82+
// metricDataPointCount calculates the total number of data points.
83+
func metricDataPointCount(ms pdata.Metric) (dataPointCount int) {
84+
switch ms.DataType() {
85+
case pdata.MetricDataTypeIntGauge:
86+
dataPointCount = ms.IntGauge().DataPoints().Len()
87+
case pdata.MetricDataTypeDoubleGauge:
88+
dataPointCount = ms.DoubleGauge().DataPoints().Len()
89+
case pdata.MetricDataTypeIntSum:
90+
dataPointCount = ms.IntSum().DataPoints().Len()
91+
case pdata.MetricDataTypeDoubleSum:
92+
dataPointCount = ms.DoubleSum().DataPoints().Len()
93+
case pdata.MetricDataTypeIntHistogram:
94+
dataPointCount = ms.IntHistogram().DataPoints().Len()
95+
case pdata.MetricDataTypeHistogram:
96+
dataPointCount = ms.Histogram().DataPoints().Len()
97+
case pdata.MetricDataTypeSummary:
98+
dataPointCount = ms.Summary().DataPoints().Len()
99+
}
100+
return
101+
}
102+
103+
// splitMetric removes metric points from the input data and moves data of the specified size to destination.
104+
// Returns size of moved data and boolean describing, whether the metric should be removed from original slice.
105+
func splitMetric(ms, dest pdata.Metric, size int) (int, bool) {
106+
ms.CopyTo(dest)
107+
if metricDataPointCount(ms) <= size {
108+
return metricDataPointCount(ms), true
109+
}
110+
111+
msSize, i := metricDataPointCount(ms)-size, 0
112+
filterDataPoints := func() bool { i++; return i <= msSize }
113+
switch ms.DataType() {
114+
case pdata.MetricDataTypeIntGauge:
115+
dest.IntGauge().DataPoints().Resize(size)
116+
ms.IntGauge().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
117+
return filterDataPoints()
118+
})
119+
case pdata.MetricDataTypeDoubleGauge:
120+
dest.DoubleGauge().DataPoints().Resize(size)
121+
ms.DoubleGauge().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
122+
return filterDataPoints()
123+
})
124+
case pdata.MetricDataTypeIntSum:
125+
dest.IntSum().DataPoints().Resize(size)
126+
ms.IntSum().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
127+
return filterDataPoints()
128+
})
129+
case pdata.MetricDataTypeDoubleSum:
130+
dest.DoubleSum().DataPoints().Resize(size)
131+
ms.DoubleSum().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
132+
return filterDataPoints()
133+
})
134+
case pdata.MetricDataTypeIntHistogram:
135+
dest.IntHistogram().DataPoints().Resize(size)
136+
ms.IntHistogram().DataPoints().RemoveIf(func(_ pdata.IntHistogramDataPoint) bool {
137+
return filterDataPoints()
138+
})
139+
case pdata.MetricDataTypeHistogram:
140+
dest.Histogram().DataPoints().Resize(size)
141+
ms.Histogram().DataPoints().RemoveIf(func(_ pdata.HistogramDataPoint) bool {
142+
return filterDataPoints()
143+
})
144+
case pdata.MetricDataTypeSummary:
145+
dest.Summary().DataPoints().Resize(size)
146+
ms.Summary().DataPoints().RemoveIf(func(_ pdata.SummaryDataPoint) bool {
147+
return filterDataPoints()
148+
})
149+
}
150+
return size, false
151+
}

0 commit comments

Comments
 (0)