@@ -21,6 +21,10 @@ import (
2121 "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
2222)
2323
24+ type testContextKey string
25+
26+ const timestampKey testContextKey = "timestamp"
27+
2428func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled (t * testing.T ) {
2529 tests := []struct {
2630 name string
@@ -62,7 +66,7 @@ func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T)
6266 }
6367
6468 sink := requesttest .NewSink ()
65- ba := newPartitionBatcher (cfg , tt .sizer , newWorkerPool (tt .maxWorkers ), sink .Export , zap .NewNop ())
69+ ba := newPartitionBatcher (cfg , tt .sizer , nil , newWorkerPool (tt .maxWorkers ), sink .Export , zap .NewNop ())
6670 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
6771 t .Cleanup (func () {
6872 require .NoError (t , ba .Shutdown (context .Background ()))
@@ -128,7 +132,7 @@ func TestPartitionBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
128132 }
129133
130134 sink := requesttest .NewSink ()
131- ba := newPartitionBatcher (cfg , tt .sizer , newWorkerPool (tt .maxWorkers ), sink .Export , zap .NewNop ())
135+ ba := newPartitionBatcher (cfg , tt .sizer , nil , newWorkerPool (tt .maxWorkers ), sink .Export , zap .NewNop ())
132136 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
133137
134138 done := newFakeDone ()
@@ -209,7 +213,7 @@ func TestPartitionBatcher_NoSplit_WithTimeout(t *testing.T) {
209213 }
210214
211215 sink := requesttest .NewSink ()
212- ba := newPartitionBatcher (cfg , tt .sizer , newWorkerPool (tt .maxWorkers ), sink .Export , zap .NewNop ())
216+ ba := newPartitionBatcher (cfg , tt .sizer , nil , newWorkerPool (tt .maxWorkers ), sink .Export , zap .NewNop ())
213217 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
214218 t .Cleanup (func () {
215219 require .NoError (t , ba .Shutdown (context .Background ()))
@@ -281,7 +285,7 @@ func TestPartitionBatcher_Split_TimeoutDisabled(t *testing.T) {
281285 }
282286
283287 sink := requesttest .NewSink ()
284- ba := newPartitionBatcher (cfg , tt .sizer , newWorkerPool (tt .maxWorkers ), sink .Export , zap .NewNop ())
288+ ba := newPartitionBatcher (cfg , tt .sizer , nil , newWorkerPool (tt .maxWorkers ), sink .Export , zap .NewNop ())
285289 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
286290
287291 done := newFakeDone ()
@@ -329,7 +333,7 @@ func TestPartitionBatcher_Shutdown(t *testing.T) {
329333 }
330334
331335 sink := requesttest .NewSink ()
332- ba := newPartitionBatcher (cfg , request .NewItemsSizer (), newWorkerPool (2 ), sink .Export , zap .NewNop ())
336+ ba := newPartitionBatcher (cfg , request .NewItemsSizer (), nil , newWorkerPool (2 ), sink .Export , zap .NewNop ())
333337 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
334338
335339 done := newFakeDone ()
@@ -358,7 +362,7 @@ func TestPartitionBatcher_MergeError(t *testing.T) {
358362 }
359363
360364 sink := requesttest .NewSink ()
361- ba := newPartitionBatcher (cfg , request .NewItemsSizer (), newWorkerPool (2 ), sink .Export , zap .NewNop ())
365+ ba := newPartitionBatcher (cfg , request .NewItemsSizer (), nil , newWorkerPool (2 ), sink .Export , zap .NewNop ())
362366 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
363367 t .Cleanup (func () {
364368 require .NoError (t , ba .Shutdown (context .Background ()))
@@ -392,7 +396,7 @@ func TestPartitionBatcher_PartialSuccessError(t *testing.T) {
392396 core , observed := observer .New (zap .WarnLevel )
393397 logger := zap .New (core )
394398 sink := requesttest .NewSink ()
395- ba := newPartitionBatcher (cfg , request .NewItemsSizer (), newWorkerPool (1 ), sink .Export , logger )
399+ ba := newPartitionBatcher (cfg , request .NewItemsSizer (), nil , newWorkerPool (1 ), sink .Export , logger )
396400 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
397401
398402 done := newFakeDone ()
@@ -434,7 +438,7 @@ func TestSPartitionBatcher_PartialSuccessError_AfterOkRequest(t *testing.T) {
434438 core , observed := observer .New (zap .WarnLevel )
435439 logger := zap .New (core )
436440 sink := requesttest .NewSink ()
437- ba := newPartitionBatcher (cfg , request .NewItemsSizer (), newWorkerPool (1 ), sink .Export , logger )
441+ ba := newPartitionBatcher (cfg , request .NewItemsSizer (), nil , newWorkerPool (1 ), sink .Export , logger )
438442 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
439443
440444 done := newFakeDone ()
@@ -494,7 +498,7 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) {
494498 }
495499
496500 sink := requesttest .NewSink ()
497- ba := newPartitionBatcher (cfg , request .NewItemsSizer (), newWorkerPool (1 ), sink .Export , zap .NewNop ())
501+ ba := newPartitionBatcher (cfg , request .NewItemsSizer (), nil , newWorkerPool (1 ), sink .Export , zap .NewNop ())
498502 require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
499503 t .Cleanup (func () {
500504 require .NoError (t , ba .Shutdown (context .Background ()))
@@ -513,3 +517,46 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) {
513517 assert .Equal (t , int64 (0 ), done .success .Load ())
514518 assert .Equal (t , 0 , sink .RequestsCount ())
515519}
520+
521+ func TestPartitionBatcher_ContextMerging (t * testing.T ) {
522+ tests := []struct {
523+ name string
524+ mergeCtxFunc func (ctx1 context.Context , ctx2 context.Context ) context.Context
525+ }{
526+ {
527+ name : "merge_context_with_timestamp" ,
528+ mergeCtxFunc : func (ctx1 context.Context , _ context.Context ) context.Context {
529+ return context .WithValue (ctx1 , timestampKey , 1234 )
530+ },
531+ },
532+ {
533+ name : "merge_context_returns_background" ,
534+ mergeCtxFunc : func (context.Context , context.Context ) context.Context {
535+ return context .Background ()
536+ },
537+ },
538+ {
539+ name : "nil_merge_context" ,
540+ mergeCtxFunc : nil ,
541+ },
542+ }
543+ for _ , tt := range tests {
544+ t .Run (tt .name , func (t * testing.T ) {
545+ cfg := BatchConfig {
546+ FlushTimeout : 0 ,
547+ Sizer : request .SizerTypeItems ,
548+ MinSize : 10 ,
549+ }
550+ sink := requesttest .NewSink ()
551+ ba := newPartitionBatcher (cfg , request .NewItemsSizer (), tt .mergeCtxFunc , newWorkerPool (1 ), sink .Export , zap .NewNop ())
552+ require .NoError (t , ba .Start (context .Background (), componenttest .NewNopHost ()))
553+
554+ done := newFakeDone ()
555+ ba .Consume (context .Background (), & requesttest.FakeRequest {Items : 8 , Bytes : 8 }, done )
556+ ba .Consume (context .Background (), & requesttest.FakeRequest {Items : 8 , Bytes : 8 }, done )
557+ <- time .After (10 * time .Millisecond )
558+ assert .Equal (t , 1 , sink .RequestsCount ())
559+ assert .EqualValues (t , 2 , done .success .Load ())
560+ })
561+ }
562+ }
0 commit comments