Skip to content

Commit a492600

Browse files
committed
merge context
1 parent d513c73 commit a492600

File tree

7 files changed

+68
-86
lines changed

7 files changed

+68
-86
lines changed

exporter/exporterhelper/internal/queuebatch/batch_context.go

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,32 +31,9 @@ func parentsFromContext(ctx context.Context) []trace.Link {
3131
return LinksFromContext(ctx)
3232
}
3333

34-
func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
35-
return mergedContext{
36-
context.WithValue(context.Background(),
37-
batchSpanLinksKey,
38-
append(parentsFromContext(ctx1), parentsFromContext(ctx2)...)),
39-
ctx1,
40-
ctx2,
41-
}
42-
}
43-
44-
type mergedContext struct {
45-
context.Context
46-
ctx1 context.Context
47-
ctx2 context.Context
48-
}
49-
50-
func (c mergedContext) Value(key any) any {
51-
if c.ctx1 != nil {
52-
if val := c.ctx1.Value(key); val != nil {
53-
return val
54-
}
55-
}
56-
if c.ctx2 != nil {
57-
if val := c.ctx2.Value(key); val != nil {
58-
return val
59-
}
60-
}
61-
return nil
34+
func contextWithMergedLinks(mergedCtx context.Context, ctx1 context.Context, ctx2 context.Context) context.Context {
35+
return context.WithValue(
36+
mergedCtx,
37+
batchSpanLinksKey,
38+
append(parentsFromContext(ctx1), parentsFromContext(ctx2)...))
6239
}

exporter/exporterhelper/internal/queuebatch/batch_context_test.go

Lines changed: 35 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,31 @@ import (
1313
"go.opentelemetry.io/collector/component/componenttest"
1414
)
1515

16-
type testContextKey string
16+
type testTimestampKeyType int
17+
18+
const testTimestampKey testTimestampKeyType = iota
19+
20+
// mergeCtxFunc corresponds to user specified mergeCtx function in the batcher settings.
21+
// This specific merge Context function keeps the greater of timestamps from two contexts.
22+
func mergeCtxFunc(ctx1, ctx2 context.Context) context.Context {
23+
timestamp1 := ctx1.Value(testTimestampKey)
24+
timestamp2 := ctx2.Value(testTimestampKey)
25+
if timestamp1 != nil && timestamp2 != nil {
26+
if timestamp1.(int) > timestamp2.(int) {
27+
return context.WithValue(context.Background(), testTimestampKey, timestamp1)
28+
}
29+
return context.WithValue(context.Background(), testTimestampKey, timestamp2)
30+
}
31+
if timestamp1 != nil {
32+
return context.WithValue(context.Background(), testTimestampKey, timestamp1)
33+
}
34+
return context.WithValue(context.Background(), testTimestampKey, timestamp2)
35+
}
36+
37+
// mergeContextHelper performs the same operation done during batching.
38+
func mergeContextHelper(ctx1, ctx2 context.Context) context.Context {
39+
return contextWithMergedLinks(mergeCtxFunc(ctx1, ctx2), ctx1, ctx2)
40+
}
1741

1842
func TestBatchContextLink(t *testing.T) {
1943
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
@@ -30,53 +54,19 @@ func TestBatchContextLink(t *testing.T) {
3054
ctx4, span4 := tracer.Start(ctx1, "span4")
3155
defer span4.End()
3256

33-
batchContext := contextWithMergedLinks(ctx2, ctx3)
34-
batchContext = contextWithMergedLinks(batchContext, ctx4)
57+
batchContext := mergeContextHelper(ctx2, ctx3)
58+
batchContext = mergeContextHelper(batchContext, ctx4)
3559

3660
actualLinks := LinksFromContext(batchContext)
37-
// require.Len(t, actualLinks, 3)
38-
require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[0].SpanContext)
39-
// require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
40-
// require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
61+
require.Len(t, actualLinks, 3)
62+
require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
63+
require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
64+
require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
4165
}
4266

4367
func TestMergedContext_GetValue(t *testing.T) {
44-
ctx1 := context.WithValue(context.Background(), testContextKey("key1"), "value1")
45-
ctx2 := context.WithValue(context.Background(), testContextKey("key1"), "value2")
46-
ctx2 = context.WithValue(ctx2, testContextKey("key2"), "value2")
47-
ctx3 := context.WithValue(context.Background(), testContextKey("key2"), "value3")
48-
49-
var mergedCtx context.Context
50-
mergedCtx = contextWithMergedLinks(ctx1, ctx2)
51-
mergedCtx = contextWithMergedLinks(mergedCtx, ctx3)
52-
53-
require.Equal(t, "value1", mergedCtx.Value(testContextKey("key1")))
54-
require.Equal(t, "value2", mergedCtx.Value(testContextKey("key2")))
55-
require.Nil(t, mergedCtx.Value("nonexistent_key"))
56-
}
57-
58-
func TestMergedValues_GetValue_NilContext(t *testing.T) {
59-
ctx1 := context.WithValue(context.Background(), testContextKey("key1"), "value1")
60-
var ctx2 context.Context // nil context
61-
62-
var mergedCtx context.Context
63-
mergedCtx = contextWithMergedLinks(ctx1, ctx2)
64-
65-
require.Equal(t, "value1", mergedCtx.Value(testContextKey("key1")))
66-
require.Nil(t, mergedCtx.Value(testContextKey("key2")))
67-
require.Nil(t, mergedCtx.Value("nonexistent_key"))
68-
}
69-
70-
func TestMergedValues_GetValue_CanceledContext(t *testing.T) {
71-
ctx1 := context.WithValue(context.Background(), testContextKey("key1"), "value1")
72-
ctx2, cancel := context.WithCancel(context.WithValue(context.Background(), testContextKey("key2"), "value2"))
73-
74-
var mergedCtx context.Context
75-
mergedCtx = contextWithMergedLinks(ctx1, ctx2)
76-
77-
cancel()
78-
79-
require.Equal(t, "value1", mergedCtx.Value(testContextKey("key1")))
80-
require.Equal(t, "value2", mergedCtx.Value(testContextKey("key2")))
81-
require.Nil(t, mergedCtx.Value("nonexistent_key"))
68+
ctx1 := context.WithValue(context.Background(), testTimestampKey, 1234)
69+
ctx2 := context.WithValue(context.Background(), testTimestampKey, 2345)
70+
batchContext := mergeContextHelper(ctx1, ctx2)
71+
require.Equal(t, 2345, batchContext.Value(testTimestampKey))
8272
}

exporter/exporterhelper/internal/queuebatch/batcher.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type batcherSettings[T any] struct {
2626
itemsSizer request.Sizer[T]
2727
bytesSizer request.Sizer[T]
2828
partitioner Partitioner[T]
29+
mergeCtx func(context.Context, context.Context) context.Context
2930
next sender.SendFunc[T]
3031
maxWorkers int
3132
logger *zap.Logger
@@ -42,10 +43,10 @@ func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[re
4243
}
4344

4445
if set.partitioner == nil {
45-
return newPartitionBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.next, set.logger), nil
46+
return newPartitionBatcher(*cfg.Get(), sizer, set.mergeCtx, newWorkerPool(set.maxWorkers), set.next, set.logger), nil
4647
}
4748

48-
return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.next, set.logger), nil
49+
return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.mergeCtx, set.next, set.logger), nil
4950
}
5051

5152
func activeSizer[T any](sizerType request.SizerType, itemsSizer, bytesSizer request.Sizer[T]) request.Sizer[T] {

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type multiBatcher struct {
2121
wp *workerPool
2222
sizer request.Sizer[request.Request]
2323
partitioner Partitioner[request.Request]
24+
mergeCtx func(context.Context, context.Context) context.Context
2425
consumeFunc sender.SendFunc[request.Request]
2526
shards sync.Map
2627
logger *zap.Logger
@@ -31,6 +32,7 @@ func newMultiBatcher(
3132
sizer request.Sizer[request.Request],
3233
wp *workerPool,
3334
partitioner Partitioner[request.Request],
35+
mergeCtx func(context.Context, context.Context) context.Context,
3436
next sender.SendFunc[request.Request],
3537
logger *zap.Logger,
3638
) *multiBatcher {
@@ -39,6 +41,7 @@ func newMultiBatcher(
3941
wp: wp,
4042
sizer: sizer,
4143
partitioner: partitioner,
44+
mergeCtx: mergeCtx,
4245
consumeFunc: next,
4346
logger: logger,
4447
}
@@ -51,7 +54,8 @@ func (mb *multiBatcher) getPartition(ctx context.Context, req request.Request) *
5154
if found {
5255
return s.(*partitionBatcher)
5356
}
54-
newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.wp, mb.consumeFunc, mb.logger)
57+
58+
newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.mergeCtx, mb.wp, mb.consumeFunc, mb.logger)
5559
_ = newS.Start(ctx, nil)
5660
s, loaded := mb.shards.LoadOrStore(key, newS)
5761
// If not loaded, there was a race condition in adding the new shard. Shutdown the newly created shard.

exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func TestMultiBatcher_NoTimeout(t *testing.T) {
3333
NewPartitioner(func(ctx context.Context, _ request.Request) string {
3434
return ctx.Value(partitionKey{}).(string)
3535
}),
36+
nil,
3637
sink.Export,
3738
zap.NewNop(),
3839
)
@@ -85,6 +86,7 @@ func TestMultiBatcher_Timeout(t *testing.T) {
8586
NewPartitioner(func(ctx context.Context, _ request.Request) string {
8687
return ctx.Value(partitionKey{}).(string)
8788
}),
89+
nil,
8890
sink.Export,
8991
zap.NewNop(),
9092
)

exporter/exporterhelper/internal/queuebatch/partition_batcher.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type partitionBatcher struct {
3030
cfg BatchConfig
3131
wp *workerPool
3232
sizer request.Sizer[request.Request]
33+
mergeCtx func(context.Context, context.Context) context.Context
3334
consumeFunc sender.SendFunc[request.Request]
3435
stopWG sync.WaitGroup
3536
currentBatchMu sync.Mutex
@@ -42,6 +43,7 @@ type partitionBatcher struct {
4243
func newPartitionBatcher(
4344
cfg BatchConfig,
4445
sizer request.Sizer[request.Request],
46+
mergeCtx func(context.Context, context.Context) context.Context,
4547
wp *workerPool,
4648
next sender.SendFunc[request.Request],
4749
logger *zap.Logger,
@@ -50,6 +52,7 @@ func newPartitionBatcher(
5052
cfg: cfg,
5153
wp: wp,
5254
sizer: sizer,
55+
mergeCtx: mergeCtx,
5356
consumeFunc: next,
5457
shutdownCh: make(chan struct{}, 1),
5558
logger: logger,
@@ -147,7 +150,12 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do
147150
// Logic on how to deal with the current batch:
148151
qb.currentBatch.req = reqList[0]
149152
qb.currentBatch.done = append(qb.currentBatch.done, done)
150-
qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)
153+
154+
if qb.mergeCtx != nil {
155+
qb.currentBatch.ctx = contextWithMergedLinks(qb.mergeCtx(qb.currentBatch.ctx, ctx), qb.currentBatch.ctx, ctx)
156+
} else {
157+
qb.currentBatch.ctx = contextWithMergedLinks(context.Background(), qb.currentBatch.ctx, ctx)
158+
}
151159

152160
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
153161
// cannot unlock and re-lock because we are not done processing all the responses.

exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T)
6262
}
6363

6464
sink := requesttest.NewSink()
65-
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
65+
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
6666
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
6767
t.Cleanup(func() {
6868
require.NoError(t, ba.Shutdown(context.Background()))
@@ -128,7 +128,7 @@ func TestPartitionBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
128128
}
129129

130130
sink := requesttest.NewSink()
131-
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
131+
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
132132
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
133133

134134
done := newFakeDone()
@@ -209,7 +209,7 @@ func TestPartitionBatcher_NoSplit_WithTimeout(t *testing.T) {
209209
}
210210

211211
sink := requesttest.NewSink()
212-
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
212+
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
213213
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
214214
t.Cleanup(func() {
215215
require.NoError(t, ba.Shutdown(context.Background()))
@@ -281,7 +281,7 @@ func TestPartitionBatcher_Split_TimeoutDisabled(t *testing.T) {
281281
}
282282

283283
sink := requesttest.NewSink()
284-
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
284+
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
285285
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
286286

287287
done := newFakeDone()
@@ -329,7 +329,7 @@ func TestPartitionBatcher_Shutdown(t *testing.T) {
329329
}
330330

331331
sink := requesttest.NewSink()
332-
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export, zap.NewNop())
332+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop())
333333
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
334334

335335
done := newFakeDone()
@@ -358,7 +358,7 @@ func TestPartitionBatcher_MergeError(t *testing.T) {
358358
}
359359

360360
sink := requesttest.NewSink()
361-
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export, zap.NewNop())
361+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop())
362362
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
363363
t.Cleanup(func() {
364364
require.NoError(t, ba.Shutdown(context.Background()))
@@ -392,7 +392,7 @@ func TestPartitionBatcher_PartialSuccessError(t *testing.T) {
392392
core, observed := observer.New(zap.WarnLevel)
393393
logger := zap.New(core)
394394
sink := requesttest.NewSink()
395-
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
395+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger)
396396
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
397397

398398
done := newFakeDone()
@@ -434,7 +434,7 @@ func TestSPartitionBatcher_PartialSuccessError_AfterOkRequest(t *testing.T) {
434434
core, observed := observer.New(zap.WarnLevel)
435435
logger := zap.New(core)
436436
sink := requesttest.NewSink()
437-
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
437+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger)
438438
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
439439

440440
done := newFakeDone()
@@ -494,7 +494,7 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) {
494494
}
495495

496496
sink := requesttest.NewSink()
497-
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, zap.NewNop())
497+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, zap.NewNop())
498498
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
499499
t.Cleanup(func() {
500500
require.NoError(t, ba.Shutdown(context.Background()))

0 commit comments

Comments
 (0)