Skip to content

Commit 01f44f9

Browse files
committed
prometheusremotewriteexporter: Remove unnecessary buffer copy in proto conversion
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent ed57e60 commit 01f44f9

File tree

3 files changed

+59
-38
lines changed

3 files changed

+59
-38
lines changed

exporter/prometheusremotewriteexporter/exporter.go

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ import (
1717
"time"
1818

1919
"github.com/cenkalti/backoff/v5"
20-
"github.com/gogo/protobuf/proto"
21-
"github.com/golang/snappy"
20+
"github.com/klauspost/compress/snappy"
2221
"github.com/prometheus/otlptranslator"
2322
"github.com/prometheus/prometheus/config"
2423
"github.com/prometheus/prometheus/prompb"
@@ -80,16 +79,46 @@ func (p *prwTelemetryOtel) recordWrittenExemplars(ctx context.Context, numExempl
8079
p.telemetryBuilder.ExporterPrometheusremotewriteWrittenExemplars.Add(ctx, numExemplars, metric.WithAttributes(p.otelAttrs...))
8180
}
8281

82+
type gogoProto interface {
83+
Size() int
84+
MarshalToSizedBuffer([]byte) (int, error)
85+
}
86+
8387
type buffer struct {
84-
protobuf *proto.Buffer
88+
protobuf []byte
8589
snappy []byte
8690
}
8791

92+
func (b *buffer) MarshalAndEncode(req gogoProto) ([]byte, error) {
93+
sizePb := req.Size()
94+
if sizePb > cap(b.protobuf) {
95+
b.protobuf = make([]byte, sizePb)
96+
}
97+
b.protobuf = b.protobuf[:sizePb]
98+
n, err := req.MarshalToSizedBuffer(b.protobuf)
99+
if err != nil {
100+
return nil, err
101+
}
102+
b.protobuf = b.protobuf[:n]
103+
104+
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
105+
// Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards.
106+
maxCompressedLen := snappy.MaxEncodedLen(len(b.protobuf))
107+
if maxCompressedLen > len(b.snappy) {
108+
if cap(b.snappy) < maxCompressedLen {
109+
b.snappy = make([]byte, maxCompressedLen)
110+
} else {
111+
b.snappy = b.snappy[:maxCompressedLen]
112+
}
113+
}
114+
return snappy.Encode(b.snappy, b.protobuf), nil
115+
}
116+
88117
// A reusable buffer pool for serializing protobufs and compressing them with Snappy.
89118
var bufferPool = sync.Pool{
90119
New: func() any {
91120
return &buffer{
92-
protobuf: proto.NewBuffer(nil),
121+
protobuf: nil,
93122
snappy: nil,
94123
}
95124
},
@@ -340,6 +369,8 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
340369
for i := 0; i < concurrencyLimit; i++ {
341370
go func() {
342371
defer wg.Done()
372+
buf := bufferPool.Get().(*buffer)
373+
defer bufferPool.Put(buf)
343374
for {
344375
select {
345376
case <-ctx.Done(): // Check firstly to ensure that the context wasn't cancelled.
@@ -350,18 +381,15 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
350381
return
351382
}
352383

353-
buf := bufferPool.Get().(*buffer)
354-
buf.protobuf.Reset()
355-
defer bufferPool.Put(buf)
356-
357-
if errMarshal := buf.protobuf.Marshal(request); errMarshal != nil {
384+
reqBuf, errMarshal := buf.MarshalAndEncode(request)
385+
if errMarshal != nil {
358386
mu.Lock()
359387
errs = multierr.Append(errs, consumererror.NewPermanent(errMarshal))
360388
mu.Unlock()
361389
return
362390
}
363391

364-
if errExecute := prwe.execute(ctx, buf); errExecute != nil {
392+
if errExecute := prwe.execute(ctx, reqBuf); errExecute != nil {
365393
mu.Lock()
366394
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
367395
mu.Unlock()
@@ -375,19 +403,7 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
375403
return errs
376404
}
377405

378-
func (prwe *prwExporter) execute(ctx context.Context, buf *buffer) error {
379-
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
380-
// Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards.
381-
maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
382-
if maxCompressedLen > len(buf.snappy) {
383-
if cap(buf.snappy) < maxCompressedLen {
384-
buf.snappy = make([]byte, maxCompressedLen)
385-
} else {
386-
buf.snappy = buf.snappy[:maxCompressedLen]
387-
}
388-
}
389-
compressedData := snappy.Encode(buf.snappy, buf.protobuf.Bytes())
390-
406+
func (prwe *prwExporter) execute(ctx context.Context, buf []byte) error {
391407
retryCount := 0
392408
// executeFunc can be used for backoff and non backoff scenarios.
393409
executeFunc := func() (int, error) {
@@ -402,7 +418,7 @@ func (prwe *prwExporter) execute(ctx context.Context, buf *buffer) error {
402418
}
403419

404420
// Create the HTTP POST request to send to the endpoint
405-
req, err := http.NewRequestWithContext(ctx, http.MethodPost, prwe.endpointURL.String(), bytes.NewReader(compressedData))
421+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, prwe.endpointURL.String(), bytes.NewReader(buf))
406422
if err != nil {
407423
return http.StatusBadRequest, backoff.Permanent(consumererror.NewPermanent(err))
408424
}

exporter/prometheusremotewriteexporter/exporter_test.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,16 +1195,15 @@ func TestRetries(t *testing.T) {
11951195
telemetry: telemetry,
11961196
}
11971197
buf := bufferPool.Get().(*buffer)
1198-
buf.protobuf.Reset()
11991198
defer bufferPool.Put(buf)
12001199

1201-
errMarshal := buf.protobuf.Marshal(&prompb.WriteRequest{})
1200+
reqBuf, errMarshal := buf.MarshalAndEncode(&prompb.WriteRequest{})
12021201
if errMarshal != nil {
12031202
require.NoError(t, errMarshal)
12041203
return
12051204
}
12061205

1207-
err = exporter.execute(tt.ctx, buf)
1206+
err = exporter.execute(tt.ctx, reqBuf)
12081207
tt.assertError(t, err)
12091208
tt.assertErrorType(t, err)
12101209
assert.Equal(t, tt.expectedAttempts, totalAttempts)
@@ -1228,10 +1227,17 @@ func benchmarkExecute(b *testing.B, numSample int) {
12281227
endpointURL, err := url.Parse(mockServer.URL)
12291228
require.NoError(b, err)
12301229

1230+
// Create the telemetry
1231+
testTel := componenttest.NewTelemetry()
1232+
telemetry, err := newPRWTelemetry(exporter.Settings{TelemetrySettings: testTel.NewTelemetrySettings()}, endpointURL)
1233+
require.NoError(b, err)
1234+
12311235
// Create the prwExporter
12321236
exporter := &prwExporter{
12331237
endpointURL: endpointURL,
12341238
client: http.DefaultClient,
1239+
settings: testTel.NewTelemetrySettings(),
1240+
telemetry: telemetry,
12351241
}
12361242

12371243
generateSamples := func(n int) []prompb.Sample {
@@ -1297,18 +1303,18 @@ func benchmarkExecute(b *testing.B, numSample int) {
12971303
ctx := b.Context()
12981304
b.ReportAllocs()
12991305
b.ResetTimer()
1306+
13001307
for _, req := range reqs {
13011308
buf := bufferPool.Get().(*buffer)
1302-
buf.protobuf.Reset()
1303-
defer bufferPool.Put(buf)
1304-
1305-
errMarshal := buf.protobuf.Marshal(req)
1309+
reqBuf, errMarshal := buf.MarshalAndEncode(req)
13061310
if errMarshal != nil {
13071311
require.NoError(b, errMarshal)
13081312
return
13091313
}
1310-
err := exporter.execute(ctx, buf)
1311-
require.NoError(b, err)
1314+
if err = exporter.execute(ctx, reqBuf); err != nil {
1315+
b.Fatal(err)
1316+
}
1317+
bufferPool.Put(buf)
13121318
}
13131319
}
13141320

exporter/prometheusremotewriteexporter/exporter_v2.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Reque
5151
for i := 0; i < concurrencyLimit; i++ {
5252
go func() {
5353
defer wg.Done()
54+
buf := bufferPool.Get().(*buffer)
55+
defer bufferPool.Put(buf)
5456
for {
5557
select {
5658
case <-ctx.Done(): // Check firstly to ensure that the context wasn't cancelled.
@@ -61,10 +63,7 @@ func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Reque
6163
return
6264
}
6365

64-
buf := bufferPool.Get().(*buffer)
65-
buf.protobuf.Reset()
66-
67-
errMarshal := buf.protobuf.Marshal(request)
66+
reqBuf, errMarshal := buf.MarshalAndEncode(request)
6867
if errMarshal != nil {
6968
mu.Lock()
7069
errs = multierr.Append(errs, errMarshal)
@@ -73,7 +72,7 @@ func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Reque
7372
return
7473
}
7574

76-
if errExecute := prwe.execute(ctx, buf); errExecute != nil {
75+
if errExecute := prwe.execute(ctx, reqBuf); errExecute != nil {
7776
mu.Lock()
7877
errs = multierr.Append(errs, errExecute)
7978
mu.Unlock()

0 commit comments

Comments
 (0)