-
Notifications
You must be signed in to change notification settings - Fork 1.8k
[exporterhelper] Add span links across batcher when merging multiple requests #12768
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9ff4620
8da4377
551ef7c
b6d7588
914248f
25db7c2
e3dd283
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| # Use this changelog template to create an entry for release notes. | ||
|
|
||
| # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
| change_type: enhancement | ||
|
|
||
| # The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) | ||
| component: exporterhelper | ||
|
|
||
| # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
| note: Link batcher context to all batched request's span contexts. | ||
|
|
||
| # One or more tracking issues or pull requests related to the change | ||
| issues: [12212, 8122] | ||
|
|
||
| # (Optional) One or more lines of additional information to render under the primary note. | ||
| # These lines will be padded with 2 spaces and then inserted directly into the document. | ||
| # Use pipe (|) for multiline entries. | ||
| subtext: | ||
|
|
||
| # Optional: The change log or logs in which this entry should be included. | ||
| # e.g. '[user]' or '[user, api]' | ||
| # Include 'user' if the change is relevant to end users. | ||
| # Include 'api' if there is a change to a library API. | ||
| # Default: '[user]' | ||
| change_logs: [user] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "go.opentelemetry.io/otel/trace" | ||
| ) | ||
|
|
||
| type traceContextKeyType int | ||
|
|
||
| const batchSpanLinksKey traceContextKeyType = iota | ||
|
|
||
| // LinksFromContext returns a list of trace links registered in the context. | ||
| func LinksFromContext(ctx context.Context) []trace.Link { | ||
| if ctx == nil { | ||
| return []trace.Link{} | ||
| } | ||
| if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok { | ||
| return links | ||
| } | ||
| return []trace.Link{} | ||
| } | ||
|
|
||
| func parentsFromContext(ctx context.Context) []trace.Link { | ||
| if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() { | ||
| return []trace.Link{{SpanContext: spanCtx}} | ||
| } | ||
| return LinksFromContext(ctx) | ||
| } | ||
|
|
||
| func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context { | ||
| return context.WithValue( | ||
| context.Background(), | ||
| batchSpanLinksKey, | ||
| append(parentsFromContext(ctx1), parentsFromContext(ctx2)...), | ||
| ) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package queuebatch | ||
|
|
||
| import ( | ||
| "context" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| "go.opentelemetry.io/otel/trace" | ||
|
|
||
| "go.opentelemetry.io/collector/component/componenttest" | ||
| ) | ||
|
|
||
| func TestBatchContextLink(t *testing.T) { | ||
| tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider | ||
| tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper") | ||
|
|
||
| ctx1 := context.Background() | ||
|
|
||
| ctx2, span2 := tracer.Start(ctx1, "span2") | ||
| defer span2.End() | ||
|
|
||
| ctx3, span3 := tracer.Start(ctx1, "span3") | ||
| defer span3.End() | ||
|
|
||
| ctx4, span4 := tracer.Start(ctx1, "span4") | ||
| defer span4.End() | ||
|
|
||
| batchContext := contextWithMergedLinks(ctx2, ctx3) | ||
| batchContext = contextWithMergedLinks(batchContext, ctx4) | ||
|
|
||
| actualLinks := LinksFromContext(batchContext) | ||
| require.Len(t, actualLinks, 3) | ||
| require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext) | ||
| require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext) | ||
| require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -449,40 +449,6 @@ func TestQueueBatch_BatchBlocking(t *testing.T) { | |
| require.NoError(t, qb.Shutdown(context.Background())) | ||
| } | ||
|
|
||
| // Validate that the batch is cancelled once the first request in the request is cancelled | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that this test should be removed. It does not make sense, IMO, to cancel a whole batch because one of the inputs timed out, for example. |
||
| func TestQueueBatch_BatchCancelled(t *testing.T) { | ||
| sink := requesttest.NewSink() | ||
| cfg := newTestConfig() | ||
| cfg.WaitForResult = true | ||
| cfg.Batch.MinSize = 2 | ||
| qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export) | ||
| require.NoError(t, err) | ||
| require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost())) | ||
|
|
||
| // send 2 blockOnOverflow requests | ||
| wg := sync.WaitGroup{} | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| assert.ErrorIs(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 1, Delay: 100 * time.Millisecond}), context.Canceled) | ||
| }() | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| time.Sleep(100 * time.Millisecond) // ensure this call is the second | ||
| assert.ErrorIs(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Delay: 100 * time.Millisecond}), context.Canceled) | ||
| }() | ||
| cancel() // canceling the first request should cancel the whole batch | ||
| wg.Wait() | ||
|
|
||
| // nothing should be delivered | ||
| assert.Equal(t, 0, sink.RequestsCount()) | ||
| assert.Equal(t, 0, sink.ItemsCount()) | ||
|
|
||
| require.NoError(t, qb.Shutdown(context.Background())) | ||
| } | ||
|
|
||
| func TestQueueBatch_DrainActiveRequests(t *testing.T) { | ||
| sink := requesttest.NewSink() | ||
| cfg := newTestConfig() | ||
|
|
@@ -514,47 +480,6 @@ func TestQueueBatch_DrainActiveRequests(t *testing.T) { | |
| assert.Equal(t, 3, sink.ItemsCount()) | ||
| } | ||
|
|
||
| func TestQueueBatchWithTimeout(t *testing.T) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with removing this test. I assume other tests cover this behavior, it's not a very strong test condition. |
||
| sink := requesttest.NewSink() | ||
| cfg := newTestConfig() | ||
| cfg.WaitForResult = true | ||
| cfg.Batch.MinSize = 10 | ||
| qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export) | ||
| require.NoError(t, err) | ||
| require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost())) | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) | ||
| defer cancel() | ||
| // Send 3 concurrent requests that should be merged in one batch | ||
| wg := sync.WaitGroup{} | ||
| for i := 0; i < 3; i++ { | ||
| wg.Add(1) | ||
| go func() { | ||
| assert.NoError(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 4})) | ||
| wg.Done() | ||
| }() | ||
| } | ||
| wg.Wait() | ||
| assert.Equal(t, 1, sink.RequestsCount()) | ||
| assert.Equal(t, 12, sink.ItemsCount()) | ||
|
|
||
| // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender | ||
| for i := 0; i < 3; i++ { | ||
| wg.Add(1) | ||
| go func() { | ||
| assert.Error(t, qb.Send(ctx, &requesttest.FakeRequest{Items: 4, Delay: 30 * time.Millisecond})) | ||
| wg.Done() | ||
| }() | ||
| } | ||
| wg.Wait() | ||
|
|
||
| require.NoError(t, qb.Shutdown(context.Background())) | ||
|
|
||
| // The sink should not change | ||
| assert.Equal(t, 1, sink.RequestsCount()) | ||
| assert.Equal(t, 12, sink.ItemsCount()) | ||
| } | ||
|
|
||
| func TestQueueBatchTimerResetNoConflict(t *testing.T) { | ||
| sink := requesttest.NewSink() | ||
| cfg := newTestConfig() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the unit test nag and I am wondering if we have a mental model for when context might be nil. If it is unlikely to happen I'm ok with adding a simple test or test case to exercise that code and calling it good.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think passing in nil Contexts is considered very bad practice, and almost no code in collector/collector-contrib checks for that, so I think any receivers that do that would end up crashing the Collector very early anyway.
So I don't think that check is really needed (it's a holdover from the previous PR); if we really want the test coverage for its own sake, I'd say we remove the check entirely instead.