Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Replace `ProcessorCreateParams` with `ProcessorCreateSettings`. (#3181)
- Replace `ExporterCreateParams` with `ExporterCreateSettings`. (#3164)
- Replace `ReceiverCreateParams` with `ReceiverCreateSettings`. (#3167)
- Change `batchprocessor` logic to limit data points rather than metrics. (#3141)

## 💡 Enhancements 💡

Expand Down
4 changes: 2 additions & 2 deletions processor/batchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ any data drops such as sampling.
Please refer to [config.go](./config.go) for the config spec.

The following configuration options can be modified:
- `send_batch_size` (default = 8192): Number of spans or metrics after which a
batch will be sent regardless of the timeout.
- `send_batch_size` (default = 8192): Number of spans, metric data points, or log
records after which a batch will be sent regardless of the timeout.
- `timeout` (default = 200ms): Time duration after which a batch will be sent
regardless of size.
- `send_batch_max_size` (default = 0): The upper limit of the batch size.
Expand Down
20 changes: 10 additions & 10 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func (bt *batchTraces) size() int {
}

type batchMetrics struct {
nextConsumer consumer.Metrics
metricData pdata.Metrics
metricCount int
nextConsumer consumer.Metrics
metricData pdata.Metrics
dataPointCount int
}

func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
Expand All @@ -274,19 +274,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {

func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
var req pdata.Metrics
if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.metricCount -= sendBatchMaxSize
bm.dataPointCount -= sendBatchMaxSize
} else {
req = bm.metricData
bm.metricData = pdata.NewMetrics()
bm.metricCount = 0
bm.dataPointCount = 0
}
return bm.nextConsumer.ConsumeMetrics(ctx, req)
}

func (bm *batchMetrics) itemCount() int {
return bm.metricCount
return bm.dataPointCount
}

func (bm *batchMetrics) size() int {
Expand All @@ -296,11 +296,11 @@ func (bm *batchMetrics) size() int {
func (bm *batchMetrics) add(item interface{}) {
md := item.(pdata.Metrics)

newMetricsCount := md.MetricCount()
if newMetricsCount == 0 {
_, newDataPointCount := md.MetricAndDataPointCount()
if newDataPointCount == 0 {
return
}
bm.metricCount += newMetricsCount
bm.dataPointCount += newDataPointCount
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}

Expand Down
76 changes: 73 additions & 3 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package batchprocessor
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -320,6 +322,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {

requestCount := 100
metricsPerRequest := 5
dataPointsPerMetric := 2 // Since the int counter uses two datapoints.
dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
sink := new(consumertest.MetricsSink)

creationSet := component.ProcessorCreateSettings{Logger: zap.NewNop()}
Expand All @@ -339,8 +343,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

expectedBatchesNum := requestCount * metricsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / metricsPerRequest
expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest

require.Equal(t, requestCount*metricsPerRequest, sink.MetricsCount())
receivedMds := sink.AllMetrics()
Expand All @@ -357,7 +361,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
assert.Equal(t, 1, len(viewData))
distData := viewData[0].Data.(*view.DistributionData)
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
assert.Equal(t, sink.MetricsCount(), int(distData.Sum()))
assert.Equal(t, sink.MetricsCount()*dataPointsPerMetric, int(distData.Sum()))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))

Expand All @@ -369,6 +373,23 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
assert.Equal(t, size, int(distData.Sum()))
}

func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
ctx := context.Background()
sink := new(metricsSink)
metricsCount := 50
dataPointsPerMetric := 2
sendBatchMaxSize := 99

batchMetrics := newBatchMetrics(sink)
md := testdata.GenerateMetricsManyMetricsSameResource(metricsCount)

batchMetrics.add(md)
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize))
remainingDataPointsCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
require.Equal(t, remainingDataPointsCount, batchMetrics.dataPointCount)
}

func TestBatchMetricsProcessor_Timeout(t *testing.T) {
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
Expand Down Expand Up @@ -502,6 +523,55 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
}
}

// BenchmarkBatchMetricProcessor times ConsumeMetrics on the metrics batch
// processor. Processor construction, start-up, and generation of the b.N
// input payloads (1000 metrics each) happen with the timer stopped, so only
// the ConsumeMetrics calls are measured. After shutdown it verifies that the
// sink received every metric that was sent.
func BenchmarkBatchMetricProcessor(b *testing.B) {
	b.StopTimer()
	cfg := Config{
		ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
		Timeout:           100 * time.Millisecond,
		SendBatchSize:     2000,
	}
	ctx := context.Background()
	sink := new(metricsSink)
	// Use ProcessorCreateSettings, matching the other tests in this file.
	createSettings := component.ProcessorCreateSettings{Logger: zap.NewNop()}
	metricsPerRequest := 1000

	batcher, err := newBatchMetricsProcessor(createSettings, sink, &cfg, configtelemetry.LevelDetailed)
	require.NoError(b, err)
	require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))

	// Build all inputs up front so data generation is not part of the measurement.
	mds := make([]pdata.Metrics, 0, b.N)
	for n := 0; n < b.N; n++ {
		mds = append(mds,
			testdata.GenerateMetricsManyMetricsSameResource(metricsPerRequest),
		)
	}

	b.StartTimer()
	for n := 0; n < b.N; n++ {
		// Fail fast instead of silently discarding a consume error; a plain
		// check keeps assertion overhead out of the timed loop.
		if err := batcher.ConsumeMetrics(ctx, mds[n]); err != nil {
			b.Fatal(err)
		}
	}
	b.StopTimer()

	require.NoError(b, batcher.Shutdown(ctx))
	require.Equal(b, b.N*metricsPerRequest, sink.metricsCount)
}

// metricsSink is a lightweight metrics consumer that keeps only a running
// total of the metrics it has been handed.
type metricsSink struct {
	mu           sync.Mutex // guards metricsCount
	metricsCount int        // sum of MetricCount() over all consumed payloads
}

// Capabilities reports that the sink does not mutate the data it consumes.
func (s *metricsSink) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	caps.MutatesData = false
	return caps
}

// ConsumeMetrics adds md's metric count to the running total under the mutex.
// It always returns nil.
func (s *metricsSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	received := md.MetricCount()
	s.metricsCount += received
	return nil
}

func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
Expand Down
105 changes: 94 additions & 11 deletions processor/batchprocessor/splitmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import (

// splitMetrics removes data points from the input data and returns new data containing at most size data points.
func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
if src.MetricCount() <= size {
_, dataPoints := src.MetricAndDataPointCount()
if dataPoints <= size {
return src
}
totalCopiedMetrics := 0
totalCopiedDataPoints := 0
dest := pdata.NewMetrics()

src.ResourceMetrics().RemoveIf(func(srcRs pdata.ResourceMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}

Expand All @@ -37,29 +38,31 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {

srcRs.InstrumentationLibraryMetrics().RemoveIf(func(srcIlm pdata.InstrumentationLibraryMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}

destIlm := destRs.InstrumentationLibraryMetrics().AppendEmpty()
srcIlm.InstrumentationLibrary().CopyTo(destIlm.InstrumentationLibrary())

// If possible to move all metrics do that.
srcMetricsLen := srcIlm.Metrics().Len()
if size-totalCopiedMetrics >= srcMetricsLen {
totalCopiedMetrics += srcMetricsLen
srcDataPointCount := metricSliceDataPointCount(srcIlm.Metrics())
if size-totalCopiedDataPoints >= srcDataPointCount {
totalCopiedDataPoints += srcDataPointCount
srcIlm.Metrics().MoveAndAppendTo(destIlm.Metrics())
return true
}

srcIlm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}
srcMetric.CopyTo(destIlm.Metrics().AppendEmpty())
totalCopiedMetrics++
return true
// If the metric has more data points than free slots we should split it.
newMetric, remove := splitMetric(srcMetric, size-totalCopiedDataPoints)
newMetric.CopyTo(destIlm.Metrics().AppendEmpty())
totalCopiedDataPoints += metricDataPointCount(newMetric)
return remove
})
return false
})
Expand All @@ -68,3 +71,83 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {

return dest
}

// metricSliceDataPointCount sums metricDataPointCount over every metric in ms.
func metricSliceDataPointCount(ms pdata.MetricSlice) (dataPointCount int) {
	total := 0
	for idx, n := 0, ms.Len(); idx < n; idx++ {
		total += metricDataPointCount(ms.At(idx))
	}
	return total
}

// metricDataPointCount returns the number of data points held by a single
// metric, selected by its data type. Data types not listed below yield 0.
func metricDataPointCount(ms pdata.Metric) (dataPointCount int) {
	switch ms.DataType() {
	case pdata.MetricDataTypeIntGauge:
		return ms.IntGauge().DataPoints().Len()
	case pdata.MetricDataTypeDoubleGauge:
		return ms.DoubleGauge().DataPoints().Len()
	case pdata.MetricDataTypeIntSum:
		return ms.IntSum().DataPoints().Len()
	case pdata.MetricDataTypeDoubleSum:
		return ms.DoubleSum().DataPoints().Len()
	case pdata.MetricDataTypeIntHistogram:
		return ms.IntHistogram().DataPoints().Len()
	case pdata.MetricDataTypeHistogram:
		return ms.Histogram().DataPoints().Len()
	case pdata.MetricDataTypeSummary:
		return ms.Summary().DataPoints().Len()
	default:
		return 0
	}
}

// splitMetric carves up to size data points off the metric ms.
//
// If ms holds no more than size data points it is returned unchanged together
// with true, signalling that the caller should remove it from the source
// slice. Otherwise a copy of ms truncated to size data points is returned
// together with false, and those same leading data points are removed from ms
// so that ms retains only the remainder.
func splitMetric(ms pdata.Metric, size int) (pdata.Metric, bool) {
	if metricDataPointCount(ms) <= size {
		return ms, true
	}

	result := pdata.NewMetric()
	ms.CopyTo(result)

	// result is truncated with Resize(size), which is expected to keep the
	// leading size points. The source must therefore drop exactly its first
	// size points; dropping any other number would lose or duplicate data
	// points between the two halves.
	visited := 0
	dropLeading := func() bool {
		visited++
		return visited <= size
	}

	switch ms.DataType() {
	case pdata.MetricDataTypeIntGauge:
		result.IntGauge().DataPoints().Resize(size)
		ms.IntGauge().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
			return dropLeading()
		})
	case pdata.MetricDataTypeDoubleGauge:
		result.DoubleGauge().DataPoints().Resize(size)
		ms.DoubleGauge().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
			return dropLeading()
		})
	case pdata.MetricDataTypeIntSum:
		result.IntSum().DataPoints().Resize(size)
		ms.IntSum().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
			return dropLeading()
		})
	case pdata.MetricDataTypeDoubleSum:
		result.DoubleSum().DataPoints().Resize(size)
		ms.DoubleSum().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
			return dropLeading()
		})
	case pdata.MetricDataTypeIntHistogram:
		result.IntHistogram().DataPoints().Resize(size)
		ms.IntHistogram().DataPoints().RemoveIf(func(_ pdata.IntHistogramDataPoint) bool {
			return dropLeading()
		})
	case pdata.MetricDataTypeHistogram:
		result.Histogram().DataPoints().Resize(size)
		ms.Histogram().DataPoints().RemoveIf(func(_ pdata.HistogramDataPoint) bool {
			return dropLeading()
		})
	case pdata.MetricDataTypeSummary:
		result.Summary().DataPoints().Resize(size)
		ms.Summary().DataPoints().RemoveIf(func(_ pdata.SummaryDataPoint) bool {
			return dropLeading()
		})
	}
	return result, false
}
Loading