Skip to content

Commit 97d9d86

Browse files
authored
Add batch size metric (#1241)
* Add batch size metric to otel collector Signed-off-by: Pavol Loffay <[email protected]> * Rename variable Signed-off-by: Pavol Loffay <[email protected]>
1 parent 0b23b6e commit 97d9d86

File tree

3 files changed

+41
-1
lines changed

3 files changed

+41
-1
lines changed

processor/batchprocessor/batch_processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ func (bp *batchTraceProcessor) resetTimer() {
171171
func (bp *batchTraceProcessor) sendItems(measure *stats.Int64Measure) {
172172
// Add that it came form the trace pipeline?
173173
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
174-
stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
174+
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
175+
_ = stats.RecordWithTags(context.Background(), statsTags, statBatchSendSize.M(int64(bp.batchTraces.getSpanCount())))
175176

176177
if err := bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData()); err != nil {
177178
bp.logger.Warn("Sender failed", zap.Error(err))
@@ -232,6 +233,7 @@ func (bp *batchMetricProcessor) sendItems(measure *stats.Int64Measure) {
232233
// Add that it came from the metrics pipeline
233234
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
234235
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
236+
_ = stats.RecordWithTags(context.Background(), statsTags, statBatchSendSize.M(int64(bp.batchMetrics.metricData.MetricCount())))
235237

236238
_ = bp.metricsConsumer.ConsumeMetrics(context.Background(), bp.batchMetrics.getData())
237239
bp.batchMetrics.reset()

processor/batchprocessor/batch_processor_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/bmizerany/assert"
2425
"github.com/stretchr/testify/require"
26+
"go.opencensus.io/stats/view"
2527
"go.uber.org/zap"
2628

2729
"go.opentelemetry.io/collector/component"
@@ -73,6 +75,10 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
7375
}
7476

7577
func TestBatchProcessorSentBySize(t *testing.T) {
78+
views := MetricViews()
79+
view.Register(views...)
80+
defer view.Unregister(views...)
81+
7682
sender := newTestSender()
7783
cfg := generateDefaultConfig()
7884
sendBatchSize := 20
@@ -114,6 +120,15 @@ func TestBatchProcessorSentBySize(t *testing.T) {
114120
}
115121
}
116122

123+
data, err := view.RetrieveData(statBatchSendSize.Name())
124+
require.NoError(t, err)
125+
assert.Equal(t, 1, len(data))
126+
distData := data[0].Data.(*view.DistributionData)
127+
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
128+
assert.Equal(t, sender.spansReceived, int(distData.Sum()))
129+
assert.Equal(t, sendBatchSize, int(distData.Min))
130+
assert.Equal(t, sendBatchSize, int(distData.Max))
131+
117132
sender.mtx.RUnlock()
118133
}
119134

@@ -384,6 +399,10 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
384399
}
385400

386401
func TestBatchMetricProcessor_BatchSize(t *testing.T) {
402+
views := MetricViews()
403+
view.Register(views...)
404+
defer view.Unregister(views...)
405+
387406
// Instantiate the batch processor with low config values to test data
388407
// gets sent through the processor.
389408
cfg := Config{
@@ -427,6 +446,15 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
427446
require.Equal(t, metricsPerRequest, md.ResourceMetrics().At(i).InstrumentationLibraryMetrics().At(0).Metrics().Len())
428447
}
429448
}
449+
450+
data, err := view.RetrieveData(statBatchSendSize.Name())
451+
require.NoError(t, err)
452+
assert.Equal(t, 1, len(data))
453+
distData := data[0].Data.(*view.DistributionData)
454+
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
455+
assert.Equal(t, tms.metricsReceived, int(distData.Sum()))
456+
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
457+
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))
430458
tms.mtx.RUnlock()
431459
}
432460

processor/batchprocessor/metrics.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
var (
2727
statBatchSizeTriggerSend = stats.Int64("batch_size_trigger_send", "Number of times the batch was sent due to a size trigger", stats.UnitDimensionless)
2828
statTimeoutTriggerSend = stats.Int64("timeout_trigger_send", "Number of times the batch was sent due to a timeout trigger", stats.UnitDimensionless)
29+
statBatchSendSize = stats.Int64("batch_send_size", "Number of units in the batch", stats.UnitDimensionless)
2930
)
3031

3132
// MetricViews returns the metrics views related to batching
@@ -48,9 +49,18 @@ func MetricViews() []*view.View {
4849
Aggregation: view.Sum(),
4950
}
5051

52+
distributionBatchSendSizeView := &view.View{
53+
Name: statBatchSendSize.Name(),
54+
Measure: statBatchSendSize,
55+
Description: statBatchSendSize.Description(),
56+
TagKeys: processorTagKeys,
57+
Aggregation: view.Distribution(10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000),
58+
}
59+
5160
legacyViews := []*view.View{
5261
countBatchSizeTriggerSendView,
5362
countTimeoutTriggerSendView,
63+
distributionBatchSendSizeView,
5464
}
5565

5666
return obsreport.ProcessorMetricViews(typeStr, legacyViews)

0 commit comments

Comments
 (0)