Skip to content

Commit ce816ce

Browse files
committed
[exporterhelper] Context unmarshaling along with the request
1 parent c9aaed8 commit ce816ce

File tree

23 files changed

+419
-49
lines changed

23 files changed

+419
-49
lines changed

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,15 @@ func newFakeQueueBatch() QueueBatchSettings[request.Request] {
162162

163163
// fakeEncoding is a test stub whose marshal methods always produce the fixed
// "mockRequest" payload, whatever request they are given.
type fakeEncoding struct{}
164164

165-
func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
166-
return []byte("mockRequest"), nil
165+
func (f fakeEncoding) MarshalTo(_ request.Request, b []byte) (int, error) {
166+
for i, ch := range "mockRequest" {
167+
b[i] = byte(ch)
168+
}
169+
return len("mockRequest"), nil
170+
}
171+
172+
func (f fakeEncoding) MarshalSize(request.Request) int {
173+
return len("mockRequest")
167174
}
168175

169176
func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
5+
6+
import (
7+
"context"
8+
"encoding/binary"
9+
"errors"
10+
"fmt"
11+
"io"
12+
"math"
13+
"slices"
14+
15+
"go.opentelemetry.io/otel/propagation"
16+
17+
"go.opentelemetry.io/collector/featuregate"
18+
)
19+
20+
// persistRequestContextFeatureGate controls whether request context should be preserved in the persistent queue.
21+
var persistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister(
22+
"exporter.PersistRequestContext",
23+
featuregate.StageAlpha,
24+
featuregate.WithRegisterFromVersion("v0.128.0"),
25+
featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"),
26+
)
27+
28+
// Encoding converts requests of type T to and from their serialized byte form.
type Encoding[T any] interface {
	// MarshalTo serializes req into buf, which the caller has preallocated,
	// and returns the number of bytes written.
	// buf must be at least MarshalSize(req) bytes long.
	MarshalTo(req T, buf []byte) (n int, err error)

	// MarshalSize returns the number of bytes the serialized form of req occupies.
	MarshalSize(req T) int

	// Unmarshal decodes a request from its serialized bytes.
	Unmarshal(data []byte) (T, error)
}
39+
40+
// Encoder provides an interface for marshaling and unmarshaling requests along with their context.
41+
type Encoder[T any] struct {
42+
encoding Encoding[T]
43+
}
44+
45+
func NewEncoder[T any](encoding Encoding[T]) Encoder[T] {
46+
return Encoder[T]{
47+
encoding: encoding,
48+
}
49+
}
50+
51+
const (
52+
// requestDataKey is the key used to store request data in bytesMap.
53+
requestDataKey = "request_data"
54+
55+
// traceContextBufCap is a capacity of the bytesMap buffer that is enough for trace context key-value pairs.
56+
traceContextBufCap = 128
57+
)
58+
59+
var tracePropagator = propagation.TraceContext{}
60+
61+
func (re Encoder[T]) Marshal(ctx context.Context, req T) ([]byte, error) {
62+
reqSize := re.encoding.MarshalSize(req)
63+
64+
if !persistRequestContextFeatureGate.IsEnabled() {
65+
b := make([]byte, reqSize)
66+
_, err := re.encoding.MarshalTo(req, b)
67+
if err != nil {
68+
return nil, fmt.Errorf("failed to marshal request: %w", err)
69+
}
70+
return b, nil
71+
}
72+
73+
bm := newBytesMap(traceContextBufCap + reqSize)
74+
tracePropagator.Inject(ctx, &bytesMapCarrier{bytesMap: *bm})
75+
reqBuf, err := bm.setEmptyBytes(requestDataKey, reqSize)
76+
if err != nil {
77+
return nil, fmt.Errorf("failed to marshal request: %w", err)
78+
}
79+
_, err = re.encoding.MarshalTo(req, reqBuf)
80+
if err != nil {
81+
return nil, fmt.Errorf("failed to marshal request: %w", err)
82+
}
83+
84+
return reqBuf, nil
85+
}
86+
87+
func (re Encoder[T]) Unmarshal(b []byte) (T, context.Context, error) {
88+
if !persistRequestContextFeatureGate.IsEnabled() {
89+
req, err := re.encoding.Unmarshal(b)
90+
return req, context.Background(), err
91+
}
92+
93+
bm := bytesMapFromBytes(b)
94+
if bm == nil {
95+
// Fall back to unmarshalling of the request alone.
96+
// This can happen if the data persisted by the version that doesn't support the context unmarshaling.
97+
req, err := re.encoding.Unmarshal(b)
98+
return req, context.Background(), err
99+
}
100+
ctx := context.Background()
101+
tracePropagator.Extract(ctx, &bytesMapCarrier{bytesMap: *bm})
102+
reqBuf, err := bm.get(requestDataKey)
103+
var req T
104+
if err != nil {
105+
return req, context.Background(), fmt.Errorf("failed to read serialized request data: %w", err)
106+
}
107+
req, err = re.encoding.Unmarshal(reqBuf)
108+
return req, ctx, err
109+
}
110+
111+
// bytesMap is a flat byte slice that stores key-value pairs in a map-like way.
// It is meant for a small number of entries with large values, keeping memory
// overhead low.
//
// The slice starts with two prefix bytes (magicByte, formatV1Byte) followed by
// a sequence of entries, each encoded as:
//   - 1 byte: length of the key
//   - the key bytes
//   - 4 bytes: length of the value, little-endian
//   - the value bytes
type bytesMap []byte

// Prefix bytes that mark a byte slice as a serialized bytesMap.
const (
	magicByte      byte = 0x00
	formatV1Byte   byte = 0x01
	prefixBytesLen      = 2
)
126+
127+
func newBytesMap(initSize int) *bytesMap {
128+
bm := bytesMap(make([]byte, 0, prefixBytesLen+initSize))
129+
bm = append(bm, magicByte, formatV1Byte)
130+
return &bm
131+
}
132+
133+
// setEmptyBytes sets the specified key in the map, reserves the given number of bytes for the value,
134+
// and returns a byte slice for the value. Must be called only once for each key.
135+
func (bm *bytesMap) setEmptyBytes(key string, size int) ([]byte, error) {
136+
if len(key) > math.MaxUint8 {
137+
return nil, errors.New("key param is too long")
138+
}
139+
if size < 0 || uint64(size) > math.MaxUint32 {
140+
return nil, fmt.Errorf("invalid value size %d", size)
141+
}
142+
143+
*bm = append(*bm, byte(len(key)))
144+
*bm = append(*bm, key...)
145+
146+
var lenBuf [4]byte
147+
binary.LittleEndian.PutUint32(lenBuf[:], uint32(size)) //nolint:gosec // disable G115
148+
*bm = append(*bm, lenBuf[:]...)
149+
150+
start := len(*bm)
151+
*bm = slices.Grow(*bm, size)
152+
*bm = []byte(*bm)[:start+size]
153+
154+
return []byte(*bm)[start:], nil
155+
}
156+
157+
// get scans sequentially for the first matching key and returns the value as bytes.
158+
func (bm *bytesMap) get(k string) ([]byte, error) {
159+
for i := prefixBytesLen; i < len(*bm); {
160+
kl := int([]byte(*bm)[i])
161+
i++
162+
163+
if i+kl > len(*bm) {
164+
return nil, io.ErrUnexpectedEOF
165+
}
166+
key := string([]byte(*bm)[i : i+kl])
167+
i += kl
168+
169+
if i+4 > len(*bm) {
170+
return nil, io.ErrUnexpectedEOF
171+
}
172+
vLen := binary.LittleEndian.Uint32([]byte(*bm)[i:])
173+
i += 4
174+
175+
if i+int(vLen) > len(*bm) {
176+
return nil, io.ErrUnexpectedEOF
177+
}
178+
val := []byte(*bm)[i : i+int(vLen)]
179+
i += int(vLen)
180+
181+
if key == k {
182+
return val, nil
183+
}
184+
}
185+
return nil, nil
186+
}
187+
188+
// keys returns header names in encounter order.
189+
func (bm *bytesMap) keys() []string {
190+
var out []string
191+
for i := prefixBytesLen; i < len(*bm); {
192+
kl := int([]byte(*bm)[i])
193+
i++
194+
195+
if i+kl > len(*bm) {
196+
break // malformed entry
197+
}
198+
out = append(out, string([]byte(*bm)[i:i+kl]))
199+
i += kl
200+
201+
if i+4 > len(*bm) {
202+
break // malformed entry
203+
}
204+
vLen := binary.LittleEndian.Uint32([]byte(*bm)[i:])
205+
i += 4 + int(vLen)
206+
}
207+
return out
208+
}
209+
210+
func bytesMapFromBytes(b []byte) *bytesMap {
211+
if len(b) < prefixBytesLen || b[0] != magicByte || b[1] != formatV1Byte {
212+
return nil
213+
}
214+
return (*bytesMap)(&b)
215+
}
216+
217+
// bytesMapCarrier implements propagation.TextMapCarrier on top of bytesMap.
218+
type bytesMapCarrier struct {
219+
bytesMap
220+
}
221+
222+
var _ propagation.TextMapCarrier = (*bytesMapCarrier)(nil)
223+
224+
// Set appends a new string entry; if the key already exists it is left unchanged.
225+
func (c *bytesMapCarrier) Set(k, v string) {
226+
buf, _ := c.setEmptyBytes(k, len(v))
227+
copy(buf, v)
228+
}
229+
230+
// Get scans sequentially for the first matching key.
231+
func (c *bytesMapCarrier) Get(k string) string {
232+
v, _ := c.get(k)
233+
return string(v)
234+
}
235+
236+
// Keys returns header names in encounter order.
237+
func (c *bytesMapCarrier) Keys() []string {
238+
return c.keys()
239+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestBytesMap(t *testing.T) {
13+
data := []struct {
14+
key string
15+
val []byte
16+
}{
17+
{"key1", []byte("value1")},
18+
{"key2", []byte("value2")},
19+
{"key3", []byte("value3")},
20+
{"key4", []byte("value4")},
21+
}
22+
23+
bm := newBytesMap(0)
24+
for _, d := range data {
25+
buf, err := bm.setEmptyBytes(d.key, len(d.val))
26+
require.NoError(t, err)
27+
copy(buf, d.val)
28+
}
29+
30+
assert.Equal(t, []string{"key1", "key2", "key3", "key4"}, bm.keys())
31+
32+
buf, err := bm.get("key2")
33+
assert.NoError(t, err)
34+
assert.Equal(t, []byte("value2"), buf)
35+
buf, err = bm.get("key4")
36+
assert.NoError(t, err)
37+
assert.Equal(t, []byte("value4"), buf)
38+
}
39+
40+
func TestBytesMapCarrier(t *testing.T) {
41+
bm := newBytesMap(0)
42+
carrier := &bytesMapCarrier{bytesMap: *bm}
43+
44+
carrier.Set("key1", "val1")
45+
carrier.Set("key2", "val2")
46+
47+
assert.Equal(t, []string{"key1", "key2"}, carrier.Keys())
48+
assert.Equal(t, "val2", carrier.Get("key2"))
49+
assert.Equal(t, "val1", carrier.Get("key1"))
50+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ syntax = "proto3";
22

33
package opentelemetry.collector.exporter.exporterhelper.internal.queuebatch.internal.persistentqueue;
44

5-
option go_package = "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue";
5+
option go_package = "go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue";
66

77
// Sizer type configuration
88
enum SizerType {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue"
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
55

66
import (
77
"fmt"

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ import (
1313

1414
"go.uber.org/zap"
1515

16+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
1617
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1920
)
2021

2122
// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
2223
type QueueBatchSettings[T any] struct {
23-
Encoding queuebatch.Encoding[T]
24+
Encoding persistentqueue.Encoding[T]
2425
Sizers map[request.SizerType]request.Sizer[T]
2526
Partitioner queuebatch.Partitioner[T]
2627
}

0 commit comments

Comments
 (0)