Skip to content

Commit 9ff4620

Browse files
committed
Implemented merged context with link
1 parent df36fd7 commit 9ff4620

File tree

6 files changed

+111
-2
lines changed

6 files changed

+111
-2
lines changed

.chloggen/merged_context.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Link batcher context to all batched request's span contexts.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12212, 8122]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
5+
import (
6+
"context"
7+
8+
"go.opentelemetry.io/otel/trace"
9+
)
10+
11+
type traceContextKeyType int
12+
13+
const batchSpanLinksKey traceContextKeyType = iota
14+
15+
// LinksFromContext returns a list of trace links registered in the context.
16+
func LinksFromContext(ctx context.Context) []trace.Link {
17+
if ctx == nil {
18+
return []trace.Link{}
19+
}
20+
if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok {
21+
return links
22+
}
23+
return []trace.Link{trace.LinkFromContext(ctx)}
24+
}
25+
26+
func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
27+
return context.WithValue(
28+
context.Background(),
29+
batchSpanLinksKey,
30+
append(LinksFromContext(ctx1), LinksFromContext(ctx2)...))
31+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package batcher
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
"go.opentelemetry.io/otel/trace"
12+
13+
"go.opentelemetry.io/collector/component/componenttest"
14+
)
15+
16+
func TestBatchContextLink(t *testing.T) {
17+
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
18+
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
19+
20+
ctx1 := context.Background()
21+
22+
ctx2, span2 := tracer.Start(ctx1, "span2")
23+
defer span2.End()
24+
25+
ctx3, span3 := tracer.Start(ctx1, "span3")
26+
defer span3.End()
27+
28+
ctx4, span4 := tracer.Start(ctx1, "span4")
29+
defer span4.End()
30+
31+
batchContext := contextWithMergedLinks(ctx2, ctx3)
32+
batchContext = contextWithMergedLinks(batchContext, ctx4)
33+
34+
actualLinks := LinksFromContext(batchContext)
35+
require.Len(t, actualLinks, 3)
36+
require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
37+
require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
38+
require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
39+
}

exporter/exporterhelper/internal/batcher/default_batcher.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
120120
// - Last result may not have enough data to be flushed.
121121

122122
// Logic on how to deal with the current batch:
123-
// TODO: Deal with merging Context.
124123
qb.currentBatch.req = reqList[0]
125124
qb.currentBatch.done = append(qb.currentBatch.done, done)
125+
qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)
126+
126127
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
127128
// cannot unlock and re-lock because we are not done processing all the responses.
128129
var firstBatch *batch

exporter/exporterhelper/internal/obs_queue.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"go.opentelemetry.io/otel/attribute"
1010
"go.opentelemetry.io/otel/metric"
11+
"go.opentelemetry.io/otel/trace"
1112

1213
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
1314
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -21,6 +22,7 @@ type obsQueue[T request.Request] struct {
2122
tb *metadata.TelemetryBuilder
2223
metricAttr metric.MeasurementOption
2324
enqueueFailedInst metric.Int64Counter
25+
tracer trace.Tracer
2426
}
2527

2628
func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
@@ -47,10 +49,13 @@ func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate expo
4749
return nil, err
4850
}
4951

52+
tracer := metadata.Tracer(set.ExporterSettings.TelemetrySettings)
53+
5054
or := &obsQueue[T]{
5155
Queue: delegate,
5256
tb: tb,
5357
metricAttr: metric.WithAttributeSet(attribute.NewSet(exporterAttr)),
58+
tracer: tracer,
5459
}
5560

5661
switch set.Signal {
@@ -74,7 +79,11 @@ func (or *obsQueue[T]) Offer(ctx context.Context, req T) error {
7479
// Have to read the number of items before sending the request since the request can
7580
// be modified by the downstream components like the batcher.
7681
numItems := req.ItemsCount()
82+
83+
ctx, _ = or.tracer.Start(ctx, "exporter/enqueue")
7784
err := or.Queue.Offer(ctx, req)
85+
trace.SpanFromContext(ctx).End()
86+
7887
// No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available.
7988
if err != nil && or.enqueueFailedInst != nil {
8089
or.enqueueFailedInst.Add(ctx, int64(numItems), or.metricAttr)

exporter/exporterhelper/internal/obs_report_sender.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"go.opentelemetry.io/collector/component"
1515
"go.opentelemetry.io/collector/exporter"
16+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
1617
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819
"go.opentelemetry.io/collector/pipeline"
@@ -95,7 +96,10 @@ func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
9596
// StartOp creates the span used to trace the operation. Returning
9697
// the updated context and the created span.
9798
func (ors *obsReportSender[K]) startOp(ctx context.Context) context.Context {
98-
ctx, _ = ors.tracer.Start(ctx, ors.spanName, ors.spanAttrs)
99+
ctx, _ = ors.tracer.Start(ctx,
100+
ors.spanName,
101+
ors.spanAttrs,
102+
trace.WithLinks(batcher.LinksFromContext(ctx)...))
99103
return ctx
100104
}
101105

0 commit comments

Comments
 (0)