Skip to content

Commit 73ca69e

Browse files
committed
[exporterhelper] Context unmarshaling along with the request
1 parent c9aaed8 commit 73ca69e

File tree

22 files changed

+343
-49
lines changed

22 files changed

+343
-49
lines changed

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,15 @@ func newFakeQueueBatch() QueueBatchSettings[request.Request] {
162162

163163
type fakeEncoding struct{}
164164

165-
func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
166-
return []byte("mockRequest"), nil
165+
// MarshalTo ignores the request and writes the fixed "mockRequest" payload into b one byte
// at a time, returning the payload length. b must hold at least len("mockRequest") bytes;
// a shorter slice makes the indexed write panic.
func (f fakeEncoding) MarshalTo(_ request.Request, b []byte) (int, error) {
166+
for i, ch := range "mockRequest" {
167+
b[i] = byte(ch)
168+
}
169+
return len("mockRequest"), nil
170+
}
171+
172+
// MarshalSize ignores the request and reports the length of the fixed "mockRequest" payload.
func (f fakeEncoding) MarshalSize(request.Request) int {
173+
return len("mockRequest")
167174
}
168175

169176
func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
5+
6+
import (
7+
"context"
8+
"encoding/binary"
9+
"slices"
10+
11+
"go.opentelemetry.io/otel/propagation"
12+
13+
"go.opentelemetry.io/collector/featuregate"
14+
)
15+
16+
// persistRequestContextFeatureGate controls whether request context should be persisted in the queue.
17+
var persistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister(
18+
"exporter.PersistRequestContext",
19+
featuregate.StageAlpha,
20+
featuregate.WithRegisterFromVersion("v0.128.0"),
21+
featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"),
22+
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/pull/12934"),
23+
)
24+
25+
// Encoding defines how a request of type T is converted to and from its byte representation.
type Encoding[T any] interface {
26+
// MarshalTo marshals a request into a preallocated byte slice.
27+
// The size of the byte slice must be at least MarshalSize(T) bytes.
28+
MarshalTo(T, []byte) (int, error)
29+
30+
// MarshalSize returns the size of the marshaled request.
31+
MarshalSize(T) int
32+
33+
// Unmarshal unmarshals bytes into a request.
34+
Unmarshal([]byte) (T, error)
35+
}
36+
37+
// Encoder marshals and unmarshals requests along with their context.
38+
type Encoder[T any] struct {
39+
encoding Encoding[T]
40+
}
41+
42+
// NewEncoder returns an Encoder that uses the given encoding to marshal and unmarshal requests.
func NewEncoder[T any](encoding Encoding[T]) Encoder[T] {
43+
return Encoder[T]{
44+
encoding: encoding,
45+
}
46+
}
47+
48+
const (
49+
// requestDataKey is the key used to store request data in bytesMap.
50+
requestDataKey = "request_data"
51+
52+
// traceContextBufCap is the extra bytesMap buffer capacity reserved for trace context key-value pairs.
53+
traceContextBufCap = 128
54+
)
55+
56+
var tracePropagator = propagation.TraceContext{}
57+
58+
func (re Encoder[T]) Marshal(ctx context.Context, req T) ([]byte, error) {
59+
reqSize := re.encoding.MarshalSize(req)
60+
61+
if !persistRequestContextFeatureGate.IsEnabled() {
62+
b := make([]byte, reqSize)
63+
_, err := re.encoding.MarshalTo(req, b)
64+
if err != nil {
65+
return nil, err
66+
}
67+
return b, nil
68+
}
69+
70+
bm := newBytesMap(traceContextBufCap + reqSize)
71+
tracePropagator.Inject(ctx, &bytesMapCarrier{bytesMap: *bm})
72+
reqBuf := bm.setEmptyBytes(requestDataKey, reqSize)
73+
_, err := re.encoding.MarshalTo(req, reqBuf)
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
return reqBuf, nil
79+
}
80+
81+
func (re Encoder[T]) Unmarshal(b []byte) (T, context.Context, error) {
82+
if !persistRequestContextFeatureGate.IsEnabled() {
83+
req, err := re.encoding.Unmarshal(b)
84+
return req, context.Background(), err
85+
}
86+
87+
bm := bytesMapFromBytes(b)
88+
if bm == nil {
89+
// fall back to unmarshalling of the request alone
90+
// this can happen if the data stored by old version
91+
req, err := re.encoding.Unmarshal(b)
92+
return req, context.Background(), err
93+
}
94+
ctx := context.Background()
95+
tracePropagator.Extract(ctx, &bytesMapCarrier{bytesMap: *bm})
96+
reqBuf := bm.get(requestDataKey)
97+
req, err := re.encoding.Unmarshal(reqBuf)
98+
return req, ctx, err
99+
}
100+
101+
// bytesMap is a slice of bytes that represents a map-like structure for storing key-value pairs.
102+
// It's optimized for efficient memory usage for low number of key-value pairs with big values.
103+
// The format is a sequence of key-value pairs encoded as:
104+
// - 1 byte length of the key
105+
// - key bytes
106+
// - 4 byte little-endian length of the value
107+
// - value bytes
108+
type bytesMap []byte
109+
110+
// prefix bytes to denote the bytesMap serialization.
111+
const (
112+
magicByte = byte(0x00)
113+
formatV1Byte = byte(0x01)
114+
prefixBytesLen = 2
115+
)
116+
117+
// newBytesMap returns a bytesMap that contains only the two prefix bytes (magic byte and
// format version) and has capacity preallocated for initSize additional bytes.
func newBytesMap(initSize int) *bytesMap {
118+
bm := bytesMap(make([]byte, 0, prefixBytesLen+initSize))
119+
bm = append(bm, magicByte, formatV1Byte)
120+
return &bm
121+
}
122+
123+
// setEmptyBytes sets the specified key in the map, reserves the given number of bytes for the value,
124+
// and returns a byte slice for the value. Must be called only once for each key.
125+
func (bm *bytesMap) setEmptyBytes(key string, size int) []byte {
126+
*bm = append(*bm, byte(len(key)))
127+
*bm = append(*bm, key...)
128+
129+
var lenBuf [4]byte
130+
binary.LittleEndian.PutUint32(lenBuf[:], uint32(size))
131+
*bm = append(*bm, lenBuf[:]...)
132+
133+
start := len(*bm)
134+
*bm = slices.Grow(*bm, size)
135+
*bm = []byte(*bm)[:start+size]
136+
137+
return []byte(*bm)[start:]
138+
}
139+
140+
// get scans sequentially for the first matching key and returns the value as bytes.
141+
func (bm *bytesMap) get(k string) []byte {
142+
for i := prefixBytesLen; i < len(*bm); {
143+
kl := int([]byte(*bm)[i])
144+
i++
145+
146+
key := string([]byte(*bm)[i : i+kl])
147+
i += kl
148+
149+
if i+4 > len(*bm) {
150+
return nil // malformed entry
151+
}
152+
vLen := binary.LittleEndian.Uint32([]byte(*bm)[i:])
153+
i += 4
154+
155+
val := []byte(*bm)[i : i+int(vLen)]
156+
i += int(vLen)
157+
158+
if key == k {
159+
return val
160+
}
161+
}
162+
return nil
163+
}
164+
165+
// keys returns header names in encounter order.
166+
func (bm *bytesMap) keys() []string {
167+
var out []string
168+
for i := prefixBytesLen; i < len(*bm); {
169+
kl := int([]byte(*bm)[i])
170+
i++
171+
out = append(out, string([]byte(*bm)[i:i+kl]))
172+
i += kl
173+
174+
if i+4 > len(*bm) {
175+
break
176+
}
177+
vLen := binary.LittleEndian.Uint32([]byte(*bm)[i:])
178+
i += 4 + int(vLen)
179+
}
180+
return out
181+
}
182+
183+
// bytes returns the serialized form of the map as a plain byte slice. The result is the
// map's own storage, not a copy.
func (bm *bytesMap) bytes() []byte { return *bm }
184+
185+
// bytesMapFromBytes interprets b as a serialized bytesMap without copying it. It returns nil
// when b is shorter than the prefix or does not start with the magic and format-version
// bytes. The entries following the prefix are not validated here.
func bytesMapFromBytes(b []byte) *bytesMap {
186+
if len(b) < prefixBytesLen || b[0] != magicByte || b[1] != formatV1Byte {
187+
return nil
188+
}
189+
return (*bytesMap)(&b)
190+
}
191+
192+
// bytesMapCarrier implements propagation.TextMapCarrier on top of bytesMap.
193+
type bytesMapCarrier struct {
194+
bytesMap
195+
}
196+
197+
var _ propagation.TextMapCarrier = (*bytesMapCarrier)(nil)
198+
199+
// Set appends a new string entry for k without checking whether k is already present.
// Because Get returns the first matching entry, a value appended for an existing key is
// stored but never returned.
200+
func (c *bytesMapCarrier) Set(k, v string) {
201+
buf := c.setEmptyBytes(k, len(v))
202+
copy(buf, v)
203+
}
204+
205+
// Get scans sequentially for the first matching key and returns its value converted to a
// string. It returns an empty string when the underlying lookup yields nil.
206+
func (c *bytesMapCarrier) Get(k string) string {
207+
return string(c.get(k)) // returns string value
208+
}
209+
210+
// Keys returns the keys of all stored entries in encounter order by delegating to the
// embedded bytesMap.
211+
func (c *bytesMapCarrier) Keys() []string {
212+
return c.keys()
213+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ syntax = "proto3";
22

33
package opentelemetry.collector.exporter.exporterhelper.internal.queuebatch.internal.persistentqueue;
44

5-
option go_package = "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue";
5+
option go_package = "go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue";
66

77
// Sizer type configuration
88
enum SizerType {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue"
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
55

66
import (
77
"fmt"

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ import (
1313

1414
"go.uber.org/zap"
1515

16+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
1617
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1920
)
2021

2122
// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
2223
type QueueBatchSettings[T any] struct {
23-
Encoding queuebatch.Encoding[T]
24+
Encoding persistentqueue.Encoding[T]
2425
Sizers map[request.SizerType]request.Sizer[T]
2526
Partitioner queuebatch.Partitioner[T]
2627
}

exporter/exporterhelper/internal/queuebatch/persistent_queue.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
18-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue"
18+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
1919
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2020
"go.opentelemetry.io/collector/extension/xextension/storage"
2121
"go.opentelemetry.io/collector/pipeline"
@@ -57,7 +57,7 @@ type persistentQueueSettings[T any] struct {
5757
blockOnOverflow bool
5858
signal pipeline.Signal
5959
storageID component.ID
60-
encoding Encoding[T]
60+
encoder persistentqueue.Encoder[T]
6161
id component.ID
6262
telemetry component.TelemetrySettings
6363
}
@@ -254,7 +254,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
254254
}
255255
}
256256

257-
reqBuf, err := pq.set.encoding.Marshal(req)
257+
reqBuf, err := pq.set.encoder.Marshal(ctx, req)
258258
if err != nil {
259259
return err
260260
}
@@ -295,7 +295,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
295295

296296
// Read until either a successful retrieved element or no more elements in the storage.
297297
for pq.metadata.ReadIndex != pq.metadata.WriteIndex {
298-
index, req, consumed := pq.getNextItem(ctx)
298+
index, req, reqCtx, consumed := pq.getNextItem(ctx)
299299
// Ensure the used size and the channel size are in sync.
300300
if pq.metadata.ReadIndex == pq.metadata.WriteIndex {
301301
pq.metadata.QueueSize = 0
@@ -304,7 +304,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
304304
if consumed {
305305
id := indexDonePool.Get().(*indexDone)
306306
id.reset(index, pq.set.sizer.Sizeof(req), pq)
307-
return context.Background(), req, id, true
307+
return reqCtx, req, id, true
308308
}
309309
}
310310

@@ -317,7 +317,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
317317
// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
318318
// finished, the index should be called with onDone to clean up the storage. If no new item is available,
319319
// returns false.
320-
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
320+
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, context.Context, bool) {
321321
index := pq.metadata.ReadIndex
322322
// Increase here, so even if errors happen below, it always iterates
323323
pq.metadata.ReadIndex++
@@ -329,8 +329,9 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
329329
getOp)
330330

331331
var request T
332+
restoredCtx := context.Background()
332333
if err == nil {
333-
request, err = pq.set.encoding.Unmarshal(getOp.Value)
334+
request, restoredCtx, err = pq.set.encoder.Unmarshal(getOp.Value)
334335
}
335336

336337
if err != nil {
@@ -340,14 +341,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
340341
pq.logger.Error("Error deleting item from queue", zap.Error(err))
341342
}
342343

343-
return 0, request, false
344+
return 0, request, restoredCtx, false
344345
}
345346

346347
// Increase the reference count, so the client is not closed while the request is being processed.
347348
// The client cannot be closed because we hold the lock since last we checked `stopped`.
348349
pq.refClient++
349350

350-
return index, request, true
351+
return index, request, restoredCtx, true
351352
}
352353

353354
// onDone should be called to remove the item of the given index from the queue once processing is finished.
@@ -439,13 +440,13 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
439440
pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
440441
continue
441442
}
442-
req, err := pq.set.encoding.Unmarshal(op.Value)
443+
req, reqCtx, err := pq.set.encoder.Unmarshal(op.Value)
443444
// If error happened or item is nil, it will be efficiently ignored
444445
if err != nil {
445446
pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
446447
continue
447448
}
448-
if pq.putInternal(ctx, req) != nil {
449+
if pq.putInternal(reqCtx, req) != nil {
449450
errCount++
450451
}
451452
}

0 commit comments

Comments
 (0)