Skip to content

Commit 61c9ec2

Browse files
committed
Make batch processor limit number of data points rather than number of metrics.
This change introduces a BenchmarkBatchMetricProcessor that stress-tests the batching logic. Results before: `BenchmarkBatchMetricProcessor-12 20000 80614 ns/op` Results after the change: `BenchmarkBatchMetricProcessor-12 20000 96184 ns/op`
1 parent 827f5e1 commit 61c9ec2

File tree

6 files changed

+254
-33
lines changed

6 files changed

+254
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
- Move BigEndian helper functions in `tracetranslator` to an internal package.(#3298)
1111
- Rename `configtest.LoadConfigFile` to `configtest.LoadConfigAndValidate` (#3306)
1212
- Replace `ExtensionCreateParams` with `ExtensionCreateSettings` (#3294)
13+
- Change `batchprocessor` logic to limit data points rather than metrics (#3141)
1314

1415
## 💡 Enhancements 💡
1516

processor/batchprocessor/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ any data drops such as sampling.
1515
Please refer to [config.go](./config.go) for the config spec.
1616

1717
The following configuration options can be modified:
18-
- `send_batch_size` (default = 8192): Number of spans or metrics after which a
19-
batch will be sent regardless of the timeout.
18+
- `send_batch_size` (default = 8192): Number of spans, metric data points, or log
19+
records after which a batch will be sent regardless of the timeout.
2020
- `timeout` (default = 200ms): Time duration after which a batch will be sent
2121
regardless of size.
2222
- `send_batch_max_size` (default = 0): The upper limit of the batch size.

processor/batchprocessor/batch_processor.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,9 @@ func (bt *batchTraces) size() int {
263263
}
264264

265265
type batchMetrics struct {
266-
nextConsumer consumer.Metrics
267-
metricData pdata.Metrics
268-
metricCount int
266+
nextConsumer consumer.Metrics
267+
metricData pdata.Metrics
268+
dataPointCount int
269269
}
270270

271271
func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
@@ -274,19 +274,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
274274

275275
func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
276276
var req pdata.Metrics
277-
if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
277+
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
278278
req = splitMetrics(sendBatchMaxSize, bm.metricData)
279-
bm.metricCount -= sendBatchMaxSize
279+
bm.dataPointCount -= sendBatchMaxSize
280280
} else {
281281
req = bm.metricData
282282
bm.metricData = pdata.NewMetrics()
283-
bm.metricCount = 0
283+
bm.dataPointCount = 0
284284
}
285285
return bm.nextConsumer.ConsumeMetrics(ctx, req)
286286
}
287287

288288
func (bm *batchMetrics) itemCount() int {
289-
return bm.metricCount
289+
return bm.dataPointCount
290290
}
291291

292292
func (bm *batchMetrics) size() int {
@@ -296,11 +296,11 @@ func (bm *batchMetrics) size() int {
296296
func (bm *batchMetrics) add(item interface{}) {
297297
md := item.(pdata.Metrics)
298298

299-
newMetricsCount := md.MetricCount()
300-
if newMetricsCount == 0 {
299+
_, newDataPointCount := md.MetricAndDataPointCount()
300+
if newDataPointCount == 0 {
301301
return
302302
}
303-
bm.metricCount += newMetricsCount
303+
bm.dataPointCount += newDataPointCount
304304
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
305305
}
306306

processor/batchprocessor/batch_processor_test.go

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package batchprocessor
1717
import (
1818
"context"
1919
"fmt"
20+
"sync"
2021
"testing"
2122
"time"
2223

@@ -29,6 +30,7 @@ import (
2930
"go.opentelemetry.io/collector/component/componenttest"
3031
"go.opentelemetry.io/collector/config"
3132
"go.opentelemetry.io/collector/config/configtelemetry"
33+
"go.opentelemetry.io/collector/consumer"
3234
"go.opentelemetry.io/collector/consumer/consumertest"
3335
"go.opentelemetry.io/collector/consumer/pdata"
3436
"go.opentelemetry.io/collector/internal/testdata"
@@ -320,6 +322,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
320322

321323
requestCount := 100
322324
metricsPerRequest := 5
325+
dataPointsPerMetric := 2 // Since the int counter uses two data points.
326+
dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
323327
sink := new(consumertest.MetricsSink)
324328

325329
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
@@ -339,8 +343,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
339343
elapsed := time.Since(start)
340344
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
341345

342-
expectedBatchesNum := requestCount * metricsPerRequest / int(cfg.SendBatchSize)
343-
expectedBatchingFactor := int(cfg.SendBatchSize) / metricsPerRequest
346+
expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize)
347+
expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest
344348

345349
require.Equal(t, requestCount*metricsPerRequest, sink.MetricsCount())
346350
receivedMds := sink.AllMetrics()
@@ -357,7 +361,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
357361
assert.Equal(t, 1, len(viewData))
358362
distData := viewData[0].Data.(*view.DistributionData)
359363
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
360-
assert.Equal(t, sink.MetricsCount(), int(distData.Sum()))
364+
assert.Equal(t, sink.MetricsCount()*dataPointsPerMetric, int(distData.Sum()))
361365
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
362366
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))
363367

@@ -369,6 +373,23 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
369373
assert.Equal(t, size, int(distData.Sum()))
370374
}
371375

376+
func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
377+
ctx := context.Background()
378+
sink := new(metricsSink)
379+
metricsCount := 50
380+
dataPointsPerMetric := 2
381+
sendBatchMaxSize := 99
382+
383+
batchMetrics := newBatchMetrics(sink)
384+
md := testdata.GenerateMetricsManyMetricsSameResource(metricsCount)
385+
386+
batchMetrics.add(md)
387+
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
388+
require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize))
389+
remainingDataPointsCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
390+
require.Equal(t, remainingDataPointsCount, batchMetrics.dataPointCount)
391+
}
392+
372393
func TestBatchMetricsProcessor_Timeout(t *testing.T) {
373394
cfg := Config{
374395
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
@@ -502,6 +523,55 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
502523
}
503524
}
504525

526+
func BenchmarkBatchMetricProcessor(b *testing.B) {
527+
b.StopTimer()
528+
cfg := Config{
529+
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
530+
Timeout: 100 * time.Millisecond,
531+
SendBatchSize: 2000,
532+
}
533+
ctx := context.Background()
534+
sink := new(metricsSink)
535+
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
536+
metricsPerRequest := 1000
537+
538+
batcher, err := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
539+
require.NoError(b, err)
540+
require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))
541+
542+
mds := make([]pdata.Metrics, 0, b.N)
543+
for n := 0; n < b.N; n++ {
544+
mds = append(mds,
545+
testdata.GenerateMetricsManyMetricsSameResource(metricsPerRequest),
546+
)
547+
}
548+
b.StartTimer()
549+
for n := 0; n < b.N; n++ {
550+
batcher.ConsumeMetrics(ctx, mds[n])
551+
}
552+
b.StopTimer()
553+
require.NoError(b, batcher.Shutdown(ctx))
554+
require.Equal(b, b.N*metricsPerRequest, sink.metricsCount)
555+
}
556+
557+
type metricsSink struct {
558+
mu sync.Mutex
559+
metricsCount int
560+
}
561+
562+
func (sme *metricsSink) Capabilities() consumer.Capabilities {
563+
return consumer.Capabilities{
564+
MutatesData: false,
565+
}
566+
}
567+
568+
func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
569+
sme.mu.Lock()
570+
defer sme.mu.Unlock()
571+
sme.metricsCount += md.MetricCount()
572+
return nil
573+
}
574+
505575
func TestBatchLogProcessor_ReceivingData(t *testing.T) {
506576
// Instantiate the batch processor with low config values to test data
507577
// gets sent through the processor.

processor/batchprocessor/splitmetrics.go

Lines changed: 94 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ import (
2020

2121
// splitMetrics removes metrics from the input data and returns a new data of the specified size.
2222
func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
23-
if src.MetricCount() <= size {
23+
_, dataPoints := src.MetricAndDataPointCount()
24+
if dataPoints <= size {
2425
return src
2526
}
26-
totalCopiedMetrics := 0
27+
totalCopiedDataPoints := 0
2728
dest := pdata.NewMetrics()
2829

2930
src.ResourceMetrics().RemoveIf(func(srcRs pdata.ResourceMetrics) bool {
3031
// If we are done skip everything else.
31-
if totalCopiedMetrics == size {
32+
if totalCopiedDataPoints == size {
3233
return false
3334
}
3435

@@ -37,29 +38,31 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
3738

3839
srcRs.InstrumentationLibraryMetrics().RemoveIf(func(srcIlm pdata.InstrumentationLibraryMetrics) bool {
3940
// If we are done skip everything else.
40-
if totalCopiedMetrics == size {
41+
if totalCopiedDataPoints == size {
4142
return false
4243
}
4344

4445
destIlm := destRs.InstrumentationLibraryMetrics().AppendEmpty()
4546
srcIlm.InstrumentationLibrary().CopyTo(destIlm.InstrumentationLibrary())
4647

4748
// If possible to move all metrics do that.
48-
srcMetricsLen := srcIlm.Metrics().Len()
49-
if size-totalCopiedMetrics >= srcMetricsLen {
50-
totalCopiedMetrics += srcMetricsLen
49+
srcDataPointCount := metricSliceDataPointCount(srcIlm.Metrics())
50+
if size-totalCopiedDataPoints >= srcDataPointCount {
51+
totalCopiedDataPoints += srcDataPointCount
5152
srcIlm.Metrics().MoveAndAppendTo(destIlm.Metrics())
5253
return true
5354
}
5455

5556
srcIlm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool {
5657
// If we are done skip everything else.
57-
if totalCopiedMetrics == size {
58+
if totalCopiedDataPoints == size {
5859
return false
5960
}
60-
srcMetric.CopyTo(destIlm.Metrics().AppendEmpty())
61-
totalCopiedMetrics++
62-
return true
61+
// If the metric has more data points than free slots we should split it.
62+
newMetric, remove := splitMetric(srcMetric, size-totalCopiedDataPoints)
63+
newMetric.CopyTo(destIlm.Metrics().AppendEmpty())
64+
totalCopiedDataPoints += metricDataPointCount(newMetric)
65+
return remove
6366
})
6467
return false
6568
})
@@ -68,3 +71,83 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
6871

6972
return dest
7073
}
74+
75+
// metricSliceDataPointCount calculates the total number of data points.
76+
func metricSliceDataPointCount(ms pdata.MetricSlice) (dataPointCount int) {
77+
for k := 0; k < ms.Len(); k++ {
78+
dataPointCount += metricDataPointCount(ms.At(k))
79+
}
80+
return
81+
}
82+
83+
// metricDataPointCount calculates the total number of data points.
84+
func metricDataPointCount(ms pdata.Metric) (dataPointCount int) {
85+
switch ms.DataType() {
86+
case pdata.MetricDataTypeIntGauge:
87+
dataPointCount = ms.IntGauge().DataPoints().Len()
88+
case pdata.MetricDataTypeDoubleGauge:
89+
dataPointCount = ms.DoubleGauge().DataPoints().Len()
90+
case pdata.MetricDataTypeIntSum:
91+
dataPointCount = ms.IntSum().DataPoints().Len()
92+
case pdata.MetricDataTypeDoubleSum:
93+
dataPointCount = ms.DoubleSum().DataPoints().Len()
94+
case pdata.MetricDataTypeIntHistogram:
95+
dataPointCount = ms.IntHistogram().DataPoints().Len()
96+
case pdata.MetricDataTypeHistogram:
97+
dataPointCount = ms.Histogram().DataPoints().Len()
98+
case pdata.MetricDataTypeSummary:
99+
dataPointCount = ms.Summary().DataPoints().Len()
100+
}
101+
return
102+
}
103+
104+
// splitMetric removes metric points from the input data and returns new data of the specified size
105+
// and a boolean describing whether the metric should be removed from the original slice.
106+
func splitMetric(ms pdata.Metric, size int) (pdata.Metric, bool) {
107+
if metricDataPointCount(ms) <= size {
108+
return ms, true
109+
}
110+
111+
result := pdata.NewMetric()
112+
ms.CopyTo(result)
113+
msSize, i := metricDataPointCount(ms)-size, 0
114+
filterDataPoints := func() bool { i++; return i <= msSize }
115+
switch ms.DataType() {
116+
case pdata.MetricDataTypeIntGauge:
117+
result.IntGauge().DataPoints().Resize(size)
118+
ms.IntGauge().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
119+
return filterDataPoints()
120+
})
121+
case pdata.MetricDataTypeDoubleGauge:
122+
result.DoubleGauge().DataPoints().Resize(size)
123+
ms.DoubleGauge().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
124+
return filterDataPoints()
125+
})
126+
case pdata.MetricDataTypeIntSum:
127+
result.IntSum().DataPoints().Resize(size)
128+
ms.IntSum().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
129+
return filterDataPoints()
130+
})
131+
case pdata.MetricDataTypeDoubleSum:
132+
result.DoubleSum().DataPoints().Resize(size)
133+
ms.DoubleSum().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
134+
return filterDataPoints()
135+
})
136+
case pdata.MetricDataTypeIntHistogram:
137+
result.IntHistogram().DataPoints().Resize(size)
138+
ms.IntHistogram().DataPoints().RemoveIf(func(_ pdata.IntHistogramDataPoint) bool {
139+
return filterDataPoints()
140+
})
141+
case pdata.MetricDataTypeHistogram:
142+
result.Histogram().DataPoints().Resize(size)
143+
ms.Histogram().DataPoints().RemoveIf(func(_ pdata.HistogramDataPoint) bool {
144+
return filterDataPoints()
145+
})
146+
case pdata.MetricDataTypeSummary:
147+
result.Summary().DataPoints().Resize(size)
148+
ms.Summary().DataPoints().RemoveIf(func(_ pdata.SummaryDataPoint) bool {
149+
return filterDataPoints()
150+
})
151+
}
152+
return result, false
153+
}

0 commit comments

Comments
 (0)