diff --git a/.chloggen/merged_context.yaml b/.chloggen/merged_context.yaml
new file mode 100644
index 00000000000..a6cc8655374
--- /dev/null
+++ b/.chloggen/merged_context.yaml
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Link batcher context to all batched requests' span contexts.
+
+# One or more tracking issues or pull requests related to the change
+issues: [12212, 8122]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/exporterhelper/internal/obs_report_sender.go b/exporter/exporterhelper/internal/obs_report_sender.go
index 29e013e4d52..3438863ff51 100644
--- a/exporter/exporterhelper/internal/obs_report_sender.go
+++ b/exporter/exporterhelper/internal/obs_report_sender.go
@@ -14,6 +14,7 @@ import (
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/exporter"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
+    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
     "go.opentelemetry.io/collector/pipeline"
@@ -96,7 +97,10 @@ func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
 // StartOp creates the span used to trace the operation. Returning
 // the updated context and the created span.
 func (ors *obsReportSender[K]) startOp(ctx context.Context) context.Context {
-    ctx, _ = ors.tracer.Start(ctx, ors.spanName, ors.spanAttrs)
+    ctx, _ = ors.tracer.Start(ctx,
+        ors.spanName,
+        ors.spanAttrs,
+        trace.WithLinks(queuebatch.LinksFromContext(ctx)...))
     return ctx
 }
 
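For context, a minimal self-contained sketch of the pattern used above: links stored in a context value are attached to a new span via trace.WithLinks when the span is started. This sketch sits outside the collector codebase and is illustrative only; the names linksKey, linksFrom, and startExportSpan are hypothetical stand-ins for the queuebatch helpers introduced below.

package main

import (
    "context"

    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    "go.opentelemetry.io/otel/trace"
)

type linksKeyType struct{}

var linksKey linksKeyType

// linksFrom returns any links previously stored in ctx, or nil.
func linksFrom(ctx context.Context) []trace.Link {
    links, _ := ctx.Value(linksKey).([]trace.Link)
    return links
}

// startExportSpan starts an export span and links it to every span context
// recorded in ctx, mirroring obsReportSender.startOp above.
func startExportSpan(ctx context.Context, tracer trace.Tracer) (context.Context, trace.Span) {
    return tracer.Start(ctx, "exporter/example", trace.WithLinks(linksFrom(ctx)...))
}

func main() {
    tp := sdktrace.NewTracerProvider()
    defer func() { _ = tp.Shutdown(context.Background()) }()
    tracer := tp.Tracer("example")

    // Two producer spans whose requests end up in the same batch.
    _, span1 := tracer.Start(context.Background(), "producer-1")
    span1.End()
    _, span2 := tracer.Start(context.Background(), "producer-2")
    span2.End()

    // The batch context carries links to both producers instead of a single parent.
    batchCtx := context.WithValue(context.Background(), linksKey, []trace.Link{
        {SpanContext: span1.SpanContext()},
        {SpanContext: span2.SpanContext()},
    })

    _, exportSpan := startExportSpan(batchCtx, tracer)
    exportSpan.End()
}
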
diff --git a/exporter/exporterhelper/internal/queuebatch/batch_context.go b/exporter/exporterhelper/internal/queuebatch/batch_context.go
new file mode 100644
index 00000000000..1cb519ec4f4
--- /dev/null
+++ b/exporter/exporterhelper/internal/queuebatch/batch_context.go
@@ -0,0 +1,40 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
+
+import (
+    "context"
+
+    "go.opentelemetry.io/otel/trace"
+)
+
+type traceContextKeyType int
+
+const batchSpanLinksKey traceContextKeyType = iota
+
+// LinksFromContext returns a list of trace links registered in the context.
+func LinksFromContext(ctx context.Context) []trace.Link {
+    if ctx == nil {
+        return []trace.Link{}
+    }
+    if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok {
+        return links
+    }
+    return []trace.Link{}
+}
+
+func parentsFromContext(ctx context.Context) []trace.Link {
+    if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() {
+        return []trace.Link{{SpanContext: spanCtx}}
+    }
+    return LinksFromContext(ctx)
+}
+
+func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
+    return context.WithValue(
+        context.Background(),
+        batchSpanLinksKey,
+        append(parentsFromContext(ctx1), parentsFromContext(ctx2)...),
+    )
+}
diff --git a/exporter/exporterhelper/internal/queuebatch/batch_context_test.go b/exporter/exporterhelper/internal/queuebatch/batch_context_test.go
new file mode 100644
index 00000000000..be8a9e4637d
--- /dev/null
+++ b/exporter/exporterhelper/internal/queuebatch/batch_context_test.go
@@ -0,0 +1,39 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queuebatch
+
+import (
+    "context"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+    "go.opentelemetry.io/otel/trace"
+
+    "go.opentelemetry.io/collector/component/componenttest"
+)
+
+func TestBatchContextLink(t *testing.T) {
+    tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
+    tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
+
+    ctx1 := context.Background()
+
+    ctx2, span2 := tracer.Start(ctx1, "span2")
+    defer span2.End()
+
+    ctx3, span3 := tracer.Start(ctx1, "span3")
+    defer span3.End()
+
+    ctx4, span4 := tracer.Start(ctx1, "span4")
+    defer span4.End()
+
+    batchContext := contextWithMergedLinks(ctx2, ctx3)
+    batchContext = contextWithMergedLinks(batchContext, ctx4)
+
+    actualLinks := LinksFromContext(batchContext)
+    require.Len(t, actualLinks, 3)
+    require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
+    require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
+    require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
+}
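One consequence worth noting: contextWithMergedLinks roots the batch context in context.Background(), so the merged context carries only span links, not the cancellation or deadline of either incoming request context. The sketch below is illustrative and not part of the patch; mergedLinks and parentLinks are hypothetical stand-ins for the helpers above. It shows that cancelling one request's context leaves the merged batch context untouched, which is consistent with the removal of TestQueueBatch_BatchCancelled later in this diff.

package main

import (
    "context"
    "fmt"

    "go.opentelemetry.io/otel/trace"
)

type linksKeyType struct{}

var linksKey linksKeyType

// parentLinks mirrors parentsFromContext: prefer the active span context,
// otherwise fall back to links already stored in the context.
func parentLinks(ctx context.Context) []trace.Link {
    if sc := trace.SpanContextFromContext(ctx); sc.IsValid() {
        return []trace.Link{{SpanContext: sc}}
    }
    links, _ := ctx.Value(linksKey).([]trace.Link)
    return links
}

// mergedLinks mirrors contextWithMergedLinks: the result is rooted in
// context.Background(), so it is detached from both parents' cancellation.
func mergedLinks(ctx1, ctx2 context.Context) context.Context {
    return context.WithValue(context.Background(), linksKey,
        append(parentLinks(ctx1), parentLinks(ctx2)...))
}

func main() {
    ctx1, cancel := context.WithCancel(context.Background())
    ctx2 := context.Background()

    batchCtx := mergedLinks(ctx1, ctx2)
    cancel() // cancel the first request's context

    fmt.Println(ctx1.Err())     // context canceled
    fmt.Println(batchCtx.Err()) // <nil>: the batch context is unaffected
}
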
diff --git a/exporter/exporterhelper/internal/queuebatch/default_batcher.go b/exporter/exporterhelper/internal/queuebatch/default_batcher.go
index 061e7570d64..03a12d87a09 100644
--- a/exporter/exporterhelper/internal/queuebatch/default_batcher.go
+++ b/exporter/exporterhelper/internal/queuebatch/default_batcher.go
@@ -125,9 +125,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
     //   - Last result may not have enough data to be flushed.
 
     // Logic on how to deal with the current batch:
-    // TODO: Deal with merging Context.
     qb.currentBatch.req = reqList[0]
     qb.currentBatch.done = append(qb.currentBatch.done, done)
+    qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)
+
     // Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
     // cannot unlock and re-lock because we are not done processing all the responses.
     var firstBatch *batch
diff --git a/exporter/exporterhelper/internal/queuebatch/obs_queue.go b/exporter/exporterhelper/internal/queuebatch/obs_queue.go
index ac87b316b6e..4b69e4a5417 100644
--- a/exporter/exporterhelper/internal/queuebatch/obs_queue.go
+++ b/exporter/exporterhelper/internal/queuebatch/obs_queue.go
@@ -8,6 +8,7 @@ import (
 
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/metric"
+    "go.opentelemetry.io/otel/trace"
 
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -28,6 +29,7 @@ type obsQueue[T request.Request] struct {
     tb                *metadata.TelemetryBuilder
     metricAttr        metric.MeasurementOption
     enqueueFailedInst metric.Int64Counter
+    tracer            trace.Tracer
 }
 
 func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T], error) {
@@ -54,10 +56,13 @@ func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T
         return nil, err
     }
 
+    tracer := metadata.Tracer(set.Telemetry)
+
     or := &obsQueue[T]{
         Queue:      delegate,
         tb:         tb,
         metricAttr: metric.WithAttributeSet(attribute.NewSet(exporterAttr)),
+        tracer:     tracer,
     }
 
     switch set.Signal {
@@ -81,7 +86,11 @@ func (or *obsQueue[T]) Offer(ctx context.Context, req T) error {
     // Have to read the number of items before sending the request since the request can
     // be modified by the downstream components like the batcher.
     numItems := req.ItemsCount()
+
+    ctx, span := or.tracer.Start(ctx, "exporter/enqueue")
     err := or.Queue.Offer(ctx, req)
+    span.End()
+
     // No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available.
     if err != nil && or.enqueueFailedInst != nil {
         or.enqueueFailedInst.Add(ctx, int64(numItems), or.metricAttr)
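The obs_queue change above wraps the Offer call in a short-lived "exporter/enqueue" span and ends it as soon as Offer returns, so the time spent enqueuing shows up under the caller's trace and the context handed to the underlying queue carries the enqueue span. A minimal sketch of that wrapping pattern follows; it is illustrative only, and the queue type and tracedOffer helper are hypothetical.

package main

import (
    "context"

    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    "go.opentelemetry.io/otel/trace"
)

// queue is a stand-in for the exporterhelper queue being instrumented.
type queue struct{ ch chan string }

func (q *queue) Offer(ctx context.Context, item string) error {
    select {
    case q.ch <- item:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// tracedOffer records the enqueue as its own span, ending it as soon as
// Offer returns, similar to obsQueue.Offer above.
func tracedOffer(ctx context.Context, tracer trace.Tracer, q *queue, item string) error {
    ctx, span := tracer.Start(ctx, "exporter/enqueue")
    defer span.End()
    return q.Offer(ctx, item)
}

func main() {
    tp := sdktrace.NewTracerProvider()
    defer func() { _ = tp.Shutdown(context.Background()) }()
    tracer := tp.Tracer("example")

    q := &queue{ch: make(chan string, 1)}
    _ = tracedOffer(context.Background(), tracer, q, "payload")
}
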
diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go
index 9de60661760..b2876ff7aee 100644
--- a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go
+++ b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go
@@ -449,40 +449,6 @@ func TestQueueBatch_BatchBlocking(t *testing.T) {
     require.NoError(t, qb.Shutdown(context.Background()))
 }
 
-// Validate that the batch is cancelled once the first request in the request is cancelled
-func TestQueueBatch_BatchCancelled(t *testing.T) {
-    sink := requesttest.NewSink()
-    cfg := newTestConfig()
-    cfg.WaitForResult = true
-    cfg.Batch.MinSize = 2
-    qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export)
-    require.NoError(t, err)
-    require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost()))
-
-    // send 2 blockOnOverflow requests
-    wg := sync.WaitGroup{}
-    ctx, cancel := context.WithCancel(context.Background())
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
-        assert.ErrorIs(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 1, Delay: 100 * time.Millisecond}), context.Canceled)
-    }()
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
-        time.Sleep(100 * time.Millisecond) // ensure this call is the second
-        assert.ErrorIs(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Delay: 100 * time.Millisecond}), context.Canceled)
-    }()
-    cancel() // canceling the first request should cancel the whole batch
-    wg.Wait()
-
-    // nothing should be delivered
-    assert.Equal(t, 0, sink.RequestsCount())
-    assert.Equal(t, 0, sink.ItemsCount())
-
-    require.NoError(t, qb.Shutdown(context.Background()))
-}
-
 func TestQueueBatch_DrainActiveRequests(t *testing.T) {
     sink := requesttest.NewSink()
     cfg := newTestConfig()
@@ -514,47 +480,6 @@ func TestQueueBatch_DrainActiveRequests(t *testing.T) {
     assert.Equal(t, 3, sink.ItemsCount())
 }
 
-func TestQueueBatchWithTimeout(t *testing.T) {
-    sink := requesttest.NewSink()
-    cfg := newTestConfig()
-    cfg.WaitForResult = true
-    cfg.Batch.MinSize = 10
-    qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export)
-    require.NoError(t, err)
-    require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost()))
-
-    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
-    defer cancel()
-    // Send 3 concurrent requests that should be merged in one batch
-    wg := sync.WaitGroup{}
-    for i := 0; i < 3; i++ {
-        wg.Add(1)
-        go func() {
-            assert.NoError(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 4}))
-            wg.Done()
-        }()
-    }
-    wg.Wait()
-    assert.Equal(t, 1, sink.RequestsCount())
-    assert.Equal(t, 12, sink.ItemsCount())
-
-    // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender
-    for i := 0; i < 3; i++ {
-        wg.Add(1)
-        go func() {
-            assert.Error(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 4, Delay: 30 * time.Millisecond}))
-            wg.Done()
-        }()
-    }
-    wg.Wait()
-
-    require.NoError(t, qb.Shutdown(context.Background()))
-
-    // The sink should not change
-    assert.Equal(t, 1, sink.RequestsCount())
-    assert.Equal(t, 12, sink.ItemsCount())
-}
-
 func TestQueueBatchTimerResetNoConflict(t *testing.T) {
     sink := requesttest.NewSink()
     cfg := newTestConfig()