From 9ff4620bf77c0e35103b7522ad1d41797d8f1812 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Tue, 11 Feb 2025 17:35:36 -0800 Subject: [PATCH 1/5] Implemented merged context with link --- .chloggen/merged_context.yaml | 25 ++++++++++++ .../internal/batcher/batch_context.go | 31 +++++++++++++++ .../internal/batcher/batch_context_test.go | 39 +++++++++++++++++++ .../internal/batcher/default_batcher.go | 3 +- exporter/exporterhelper/internal/obs_queue.go | 9 +++++ .../internal/obs_report_sender.go | 6 ++- 6 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 .chloggen/merged_context.yaml create mode 100644 exporter/exporterhelper/internal/batcher/batch_context.go create mode 100644 exporter/exporterhelper/internal/batcher/batch_context_test.go diff --git a/.chloggen/merged_context.yaml b/.chloggen/merged_context.yaml new file mode 100644 index 00000000000..a6cc8655374 --- /dev/null +++ b/.chloggen/merged_context.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Link batcher context to all batched request's span contexts. + +# One or more tracking issues or pull requests related to the change +issues: [12212, 8122] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/internal/batcher/batch_context.go b/exporter/exporterhelper/internal/batcher/batch_context.go new file mode 100644 index 00000000000..1d945ab5f91 --- /dev/null +++ b/exporter/exporterhelper/internal/batcher/batch_context.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher" +import ( + "context" + + "go.opentelemetry.io/otel/trace" +) + +type traceContextKeyType int + +const batchSpanLinksKey traceContextKeyType = iota + +// LinksFromContext returns a list of trace links registered in the context. +func LinksFromContext(ctx context.Context) []trace.Link { + if ctx == nil { + return []trace.Link{} + } + if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok { + return links + } + return []trace.Link{trace.LinkFromContext(ctx)} +} + +func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context { + return context.WithValue( + context.Background(), + batchSpanLinksKey, + append(LinksFromContext(ctx1), LinksFromContext(ctx2)...)) +} diff --git a/exporter/exporterhelper/internal/batcher/batch_context_test.go b/exporter/exporterhelper/internal/batcher/batch_context_test.go new file mode 100644 index 00000000000..d721727efed --- /dev/null +++ b/exporter/exporterhelper/internal/batcher/batch_context_test.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package batcher + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestBatchContextLink(t *testing.T) { + tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider + tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper") + + ctx1 := context.Background() + + ctx2, span2 := tracer.Start(ctx1, "span2") + defer span2.End() + + ctx3, span3 := tracer.Start(ctx1, "span3") + defer span3.End() + + ctx4, span4 := tracer.Start(ctx1, "span4") + defer span4.End() + + batchContext := contextWithMergedLinks(ctx2, ctx3) + batchContext = contextWithMergedLinks(batchContext, ctx4) + + actualLinks := LinksFromContext(batchContext) + require.Len(t, actualLinks, 3) + require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext) + require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext) + require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext) +} diff --git a/exporter/exporterhelper/internal/batcher/default_batcher.go b/exporter/exporterhelper/internal/batcher/default_batcher.go index f147b2c099b..a5e9068d6c1 100644 --- a/exporter/exporterhelper/internal/batcher/default_batcher.go +++ b/exporter/exporterhelper/internal/batcher/default_batcher.go @@ -120,9 +120,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done // - Last result may not have enough data to be flushed. // Logic on how to deal with the current batch: - // TODO: Deal with merging Context. qb.currentBatch.req = reqList[0] qb.currentBatch.done = append(qb.currentBatch.done, done) + qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx) + // Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and // cannot unlock and re-lock because we are not done processing all the responses. var firstBatch *batch diff --git a/exporter/exporterhelper/internal/obs_queue.go b/exporter/exporterhelper/internal/obs_queue.go index 6ab96a7a79f..8fa38ae617e 100644 --- a/exporter/exporterhelper/internal/obs_queue.go +++ b/exporter/exporterhelper/internal/obs_queue.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" @@ -21,6 +22,7 @@ type obsQueue[T request.Request] struct { tb *metadata.TelemetryBuilder metricAttr metric.MeasurementOption enqueueFailedInst metric.Int64Counter + tracer trace.Tracer } func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) { @@ -47,10 +49,13 @@ func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate expo return nil, err } + tracer := metadata.Tracer(set.ExporterSettings.TelemetrySettings) + or := &obsQueue[T]{ Queue: delegate, tb: tb, metricAttr: metric.WithAttributeSet(attribute.NewSet(exporterAttr)), + tracer: tracer, } switch set.Signal { @@ -74,7 +79,11 @@ func (or *obsQueue[T]) Offer(ctx context.Context, req T) error { // Have to read the number of items before sending the request since the request can // be modified by the downstream components like the batcher. numItems := req.ItemsCount() + + ctx, _ = or.tracer.Start(ctx, "exporter/enqueue") err := or.Queue.Offer(ctx, req) + trace.SpanFromContext(ctx).End() + // No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available. if err != nil && or.enqueueFailedInst != nil { or.enqueueFailedInst.Add(ctx, int64(numItems), or.metricAttr) diff --git a/exporter/exporterhelper/internal/obs_report_sender.go b/exporter/exporterhelper/internal/obs_report_sender.go index a91d1cca3e1..6f3e7b4ec4d 100644 --- a/exporter/exporterhelper/internal/obs_report_sender.go +++ b/exporter/exporterhelper/internal/obs_report_sender.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/pipeline" @@ -95,7 +96,10 @@ func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error { // StartOp creates the span used to trace the operation. Returning // the updated context and the created span. func (ors *obsReportSender[K]) startOp(ctx context.Context) context.Context { - ctx, _ = ors.tracer.Start(ctx, ors.spanName, ors.spanAttrs) + ctx, _ = ors.tracer.Start(ctx, + ors.spanName, + ors.spanAttrs, + trace.WithLinks(batcher.LinksFromContext(ctx)...)) return ctx } From 551ef7cb92245b4630e60970efbc1dcd7b6efbfa Mon Sep 17 00:00:00 2001 From: Jade Guiton Date: Fri, 28 Mar 2025 18:27:24 +0100 Subject: [PATCH 2/5] Only add span link when merging --- .../internal/queuebatch/batch_context.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/batch_context.go b/exporter/exporterhelper/internal/queuebatch/batch_context.go index 355df22ded9..1b1609054da 100644 --- a/exporter/exporterhelper/internal/queuebatch/batch_context.go +++ b/exporter/exporterhelper/internal/queuebatch/batch_context.go @@ -21,12 +21,21 @@ func LinksFromContext(ctx context.Context) []trace.Link { if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok { return links } - return []trace.Link{trace.LinkFromContext(ctx)} + return []trace.Link{} +} + +func parentsFromContext(ctx context.Context) []trace.Link { + if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() { + return []trace.Link{{SpanContext: spanCtx}} + } else { + return LinksFromContext(ctx) + } } func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context { return context.WithValue( context.Background(), batchSpanLinksKey, - append(LinksFromContext(ctx1), LinksFromContext(ctx2)...)) + append(parentsFromContext(ctx1), parentsFromContext(ctx2)...), + ) } From b6d758844676d941096903a87c2b8de09e3cabcf Mon Sep 17 00:00:00 2001 From: Jade Guiton Date: Mon, 31 Mar 2025 15:07:27 +0200 Subject: [PATCH 3/5] Remove failing tests (out of scope?) --- .../internal/queuebatch/queue_batch_test.go | 75 ------------------- 1 file changed, 75 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go index 13b7ea4801b..0f6cdf1000a 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go @@ -420,40 +420,6 @@ func TestQueueBatch_BatchBlocking(t *testing.T) { require.NoError(t, qb.Shutdown(context.Background())) } -// Validate that the batch is cancelled once the first request in the request is cancelled -func TestQueueBatch_BatchCancelled(t *testing.T) { - sink := requesttest.NewSink() - cfg := newTestConfig() - cfg.WaitForResult = true - cfg.Batch.MinSize = 2 - qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export) - require.NoError(t, err) - require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost())) - - // send 2 blockOnOverflow requests - wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - defer wg.Done() - assert.ErrorIs(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 1, Delay: 100 * time.Millisecond}), context.Canceled) - }() - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(100 * time.Millisecond) // ensure this call is the second - assert.ErrorIs(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Delay: 100 * time.Millisecond}), context.Canceled) - }() - cancel() // canceling the first request should cancel the whole batch - wg.Wait() - - // nothing should be delivered - assert.Equal(t, 0, sink.RequestsCount()) - assert.Equal(t, 0, sink.ItemsCount()) - - require.NoError(t, qb.Shutdown(context.Background())) -} - func TestQueueBatch_DrainActiveRequests(t *testing.T) { sink := requesttest.NewSink() cfg := newTestConfig() @@ -485,47 +451,6 @@ func TestQueueBatch_DrainActiveRequests(t *testing.T) { assert.Equal(t, 3, sink.ItemsCount()) } -func TestQueueBatchWithTimeout(t *testing.T) { - sink := requesttest.NewSink() - cfg := newTestConfig() - cfg.WaitForResult = true - cfg.Batch.MinSize = 10 - qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export) - require.NoError(t, err) - require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost())) - - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) - defer cancel() - // Send 3 concurrent requests that should be merged in one batch - wg := sync.WaitGroup{} - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.NoError(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 4})) - wg.Done() - }() - } - wg.Wait() - assert.EqualValues(t, 1, sink.RequestsCount()) - assert.EqualValues(t, 12, sink.ItemsCount()) - - // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.Error(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 4, Delay: 30 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - require.NoError(t, qb.Shutdown(context.Background())) - - // The sink should not change - assert.EqualValues(t, 1, sink.RequestsCount()) - assert.EqualValues(t, 12, sink.ItemsCount()) -} - func TestQueueBatchTimerResetNoConflict(t *testing.T) { sink := requesttest.NewSink() cfg := newTestConfig() From 914248f4f253fed417f2d9dd6e3f9fef17fb781f Mon Sep 17 00:00:00 2001 From: Jade Guiton Date: Mon, 31 Mar 2025 15:33:41 +0200 Subject: [PATCH 4/5] Fix linting --- exporter/exporterhelper/internal/queuebatch/batch_context.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/batch_context.go b/exporter/exporterhelper/internal/queuebatch/batch_context.go index 1b1609054da..1cb519ec4f4 100644 --- a/exporter/exporterhelper/internal/queuebatch/batch_context.go +++ b/exporter/exporterhelper/internal/queuebatch/batch_context.go @@ -27,9 +27,8 @@ func LinksFromContext(ctx context.Context) []trace.Link { func parentsFromContext(ctx context.Context) []trace.Link { if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() { return []trace.Link{{SpanContext: spanCtx}} - } else { - return LinksFromContext(ctx) } + return LinksFromContext(ctx) } func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context { From e3dd2834a824005d27b90aa438f40df14046e69a Mon Sep 17 00:00:00 2001 From: Jade Guiton Date: Tue, 8 Apr 2025 12:09:44 +0200 Subject: [PATCH 5/5] Small simplification --- exporter/exporterhelper/internal/queuebatch/obs_queue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/obs_queue.go b/exporter/exporterhelper/internal/queuebatch/obs_queue.go index c0a798433fd..4b69e4a5417 100644 --- a/exporter/exporterhelper/internal/queuebatch/obs_queue.go +++ b/exporter/exporterhelper/internal/queuebatch/obs_queue.go @@ -87,9 +87,9 @@ func (or *obsQueue[T]) Offer(ctx context.Context, req T) error { // be modified by the downstream components like the batcher. numItems := req.ItemsCount() - ctx, _ = or.tracer.Start(ctx, "exporter/enqueue") + ctx, span := or.tracer.Start(ctx, "exporter/enqueue") err := or.Queue.Offer(ctx, req) - trace.SpanFromContext(ctx).End() + span.End() // No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available. if err != nil && or.enqueueFailedInst != nil {