Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/mergectx-esexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Ensure metadata keys are always propagated in client context with batching enabled.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [41937]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 4 additions & 4 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ when `sending_queue` is enabled with batching support and enrich internal teleme
⚠️ This is experimental and may change at any time.

- `metadata_keys` (optional): List of metadata keys that will be used to partition the data
into batches if [sending_queue][exporterhelper] is enabled with batching support OR
`batcher::enabled` is set. The keys will also be used to enrich the exporter's internal
telemetry if defined. The keys are extracted from the client metadata available via the context
and added to the internal telemetry as attributes.
into batches if [sending_queue][exporterhelper] is enabled with batching support. With
batching enabled, only the metadata keys are guaranteed to be propagated. The keys will also
be used to enrich the exporter's internal telemetry if defined. The keys are extracted from
the client metadata available via the context and added to the internal telemetry as attributes.

NOTE: The metadata keys are converted to lower case because key lookups for client metadata are case-insensitive. This means that the metric produced by internal telemetry will also have the attribute in lower case.

Expand Down
266 changes: 252 additions & 14 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"fmt"
"math"
"net/http"
"net/url"
"runtime"
"slices"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -2605,7 +2607,202 @@ func TestExporterBatcher(t *testing.T) {
assert.Equal(t, "value2", requests[1].Context().Value(key{}))
}

// TestExporterSendingQueueContextPropogation verifies that client metadata
// listed in `metadata_keys` survives the sending queue's batching and is
// present in the outgoing request context for every signal type
// (metrics, logs, traces, profiles).
// NOTE(review): "Propogation" is a typo for "Propagation"; the name is kept
// unchanged here to avoid churn — consider renaming in a follow-up.
func TestExporterSendingQueueContextPropogation(t *testing.T) {
	testCtxKey := component.NewID(component.MustNewType("testctxkey"))
	metadata := client.NewMetadata(map[string][]string{
		"key_1": {"val_1"},
		"key_2": {"val_2"},
	})
	configSetupFn := func(cfg *Config) {
		cfg.MetadataKeys = slices.Collect(metadata.Keys())
		cfg.Auth = configoptional.Some(configauth.Config{
			AuthenticatorID: testCtxKey,
		})
		// Configure the sending queue with batching enabled. The batching
		// configuration is chosen so that the test exercises batching and
		// batches mature on age (FlushTimeout) rather than on size.
		cfg.QueueBatchConfig.WaitForResult = false
		cfg.QueueBatchConfig.BlockOnOverflow = true
		cfg.QueueBatchConfig.QueueSize = 100 // big enough to accommodate all requests
		cfg.QueueBatchConfig.NumConsumers = 10
		batchCfg := cfg.QueueBatchConfig.Batch.Get()
		batchCfg.FlushTimeout = 100 * time.Millisecond
		batchCfg.Sizer = exporterhelper.RequestSizerTypeItems
		batchCfg.MinSize = 100 // big enough to accommodate all requests
		batchCfg.MaxSize = 1000
	}
	// setupTestHost builds a mock host whose auth extension asserts the
	// propagated client metadata on each outgoing request and then redirects
	// the request to an in-process test server backed by a bulk recorder.
	setupTestHost := func(t *testing.T) (component.Host, *bulkRecorder) {
		t.Helper()

		rec := newBulkRecorder()
		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
			rec.Record(docs)
			return itemsAllOK(docs)
		})
		esURL, err := url.Parse(server.URL)
		require.NoError(t, err)
		return &mockHost{
			extensions: map[component.ID]component.Component{
				testCtxKey: newMockAuthClient(func(req *http.Request) (*http.Response, error) {
					info := client.FromContext(req.Context())
					for k := range metadata.Keys() {
						assert.Equal(t, metadata.Get(k), info.Metadata.Get(k))
					}
					// Clone before mutating: a RoundTripper must not modify
					// the caller's request. (The original code called
					// req.Clone and discarded the result, a no-op.)
					req = req.Clone(req.Context())
					req.URL.Host = esURL.Host
					req.URL.Scheme = esURL.Scheme
					req.Host = ""
					return http.DefaultTransport.RoundTrip(req)
				}),
			},
		}, rec
	}

	t.Run("metrics", func(t *testing.T) {
		testHost, rec := setupTestHost(t)
		exporter := newUnstartedTestMetricsExporter(t, "https://ignored", configSetupFn)
		require.NoError(t, exporter.Start(t.Context(), testHost))
		defer func() {
			require.NoError(t, exporter.Shutdown(t.Context()))
		}()

		sendMetrics := func(name string) {
			metrics := pmetric.NewMetrics()
			resourceMetric := metrics.ResourceMetrics().AppendEmpty()
			scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
			fooBarMetric := scopeMetric.Metrics().AppendEmpty()
			fooBarMetric.SetName(name)
			fooBarMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)

			ctx := client.NewContext(t.Context(), client.Info{Metadata: metadata})
			mustSendMetricsWithCtx(ctx, t, exporter, metrics)
		}

		sendMetrics("foo.bar.1")
		sendMetrics("foo.bar.2")
		rec.WaitItems(1) // both metrics should be grouped together into a single doc
	})

	t.Run("logs", func(t *testing.T) {
		testHost, rec := setupTestHost(t)
		exporter := newUnstartedTestLogsExporter(t, "https://ignored", configSetupFn)
		require.NoError(t, exporter.Start(t.Context(), testHost))
		defer func() {
			require.NoError(t, exporter.Shutdown(t.Context()))
		}()

		sendLogs := func(log string) {
			logs := plog.NewLogs()
			resourceLog := logs.ResourceLogs().AppendEmpty()
			scopeLog := resourceLog.ScopeLogs().AppendEmpty()
			fooBarLog := scopeLog.LogRecords().AppendEmpty()
			fooBarLog.Body().SetStr(log)

			ctx := client.NewContext(t.Context(), client.Info{Metadata: metadata})
			mustSendLogsWithCtx(ctx, t, exporter, logs)
		}

		sendLogs("log.1")
		sendLogs("log.2")
		rec.WaitItems(2) // 2 log documents are expected
	})

	t.Run("traces", func(t *testing.T) {
		testHost, rec := setupTestHost(t)
		exporter := newUnstartedTestTracesExporter(t, "https://ignored", configSetupFn)
		require.NoError(t, exporter.Start(t.Context(), testHost))
		defer func() {
			require.NoError(t, exporter.Shutdown(t.Context()))
		}()

		sendTraces := func(name string) {
			traces := ptrace.NewTraces()
			resourceSpan := traces.ResourceSpans().AppendEmpty()
			scopeSpan := resourceSpan.ScopeSpans().AppendEmpty()
			fooBarSpan := scopeSpan.Spans().AppendEmpty()
			fooBarSpan.SetName(name)

			ctx := client.NewContext(t.Context(), client.Info{Metadata: metadata})
			mustSendTracesWithCtx(ctx, t, exporter, traces)
		}

		sendTraces("span.1")
		sendTraces("span.2")
		rec.WaitItems(2) // 2 span documents are expected
	})

	t.Run("profiles", func(t *testing.T) {
		testHost, rec := setupTestHost(t)
		exporter := newUnstartedTestProfilesExporter(t, "https://ignored", configSetupFn)
		require.NoError(t, exporter.Start(t.Context(), testHost))
		defer func() {
			require.NoError(t, exporter.Shutdown(t.Context()))
		}()

		// sendProfiles builds a minimal but structurally valid profile:
		// sample/period types, attribute/mapping/location/stack tables, and
		// one sample, then sends it with the client metadata in the context.
		sendProfiles := func() {
			profiles := pprofile.NewProfiles()
			dic := profiles.Dictionary()
			resource := profiles.ResourceProfiles().AppendEmpty()
			scope := resource.ScopeProfiles().AppendEmpty()
			profile := scope.Profiles().AppendEmpty()

			dic.StringTable().Append("samples", "count", "cpu", "nanoseconds")
			st := profile.SampleType()
			st.SetTypeStrindex(0)
			st.SetUnitStrindex(1)
			pt := profile.PeriodType()
			pt.SetTypeStrindex(2)
			pt.SetUnitStrindex(3)

			a := dic.AttributeTable().AppendEmpty()
			a.SetKeyStrindex(4)
			dic.StringTable().Append("process.executable.build_id.htlhash")
			a.Value().SetStr("600DCAFE4A110000F2BF38C493F5FB92")
			a = dic.AttributeTable().AppendEmpty()
			a.SetKeyStrindex(5)
			dic.StringTable().Append("profile.frame.type")
			a.Value().SetStr("native")
			a = dic.AttributeTable().AppendEmpty()
			a.SetKeyStrindex(6)
			dic.StringTable().Append("host.id")
			a.Value().SetStr("localhost")

			profile.AttributeIndices().Append(2)

			sample := profile.Sample().AppendEmpty()
			sample.TimestampsUnixNano().Append(0)

			stack := dic.StackTable().AppendEmpty()
			stack.LocationIndices().Append(0)

			m := dic.MappingTable().AppendEmpty()
			m.AttributeIndices().Append(0)

			l := dic.LocationTable().AppendEmpty()
			l.SetMappingIndex(0)
			l.SetAddress(111)
			l.AttributeIndices().Append(1)

			ctx := client.NewContext(t.Context(), client.Info{Metadata: metadata})
			mustSendProfilesWithCtx(ctx, t, exporter, profiles)
		}

		sendProfiles()
		sendProfiles()
		rec.WaitItems(5) // 5 profile documents are expected in total
	})
}

// newTestTracesExporter returns a traces exporter that has been started
// against a nop host; shutdown is registered via t.Cleanup.
func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Traces {
	exp := newUnstartedTestTracesExporter(t, url, fns...)
	require.NoError(t, exp.Start(t.Context(), componenttest.NewNopHost()))
	t.Cleanup(func() {
		require.NoError(t, exp.Shutdown(context.Background())) //nolint:usetesting
	})
	return exp
}

func newUnstartedTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Traces {
f := NewFactory()
cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
cfg.Endpoints = []string{url}
Expand All @@ -2616,16 +2813,20 @@ func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) expor
require.NoError(t, xconfmap.Validate(cfg))
exp, err := f.CreateTraces(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg)
require.NoError(t, err)
return exp
}

err = exp.Start(t.Context(), componenttest.NewNopHost())
// newTestProfilesExporter returns a profiles exporter that has been started
// against a nop host; shutdown is registered via t.Cleanup.
func newTestProfilesExporter(t *testing.T, url string, fns ...func(*Config)) xexporter.Profiles {
	exp := newUnstartedTestProfilesExporter(t, url, fns...)
	require.NoError(t, exp.Start(t.Context(), componenttest.NewNopHost()))
	t.Cleanup(func() {
		require.NoError(t, exp.Shutdown(context.Background())) //nolint:usetesting
	})
	return exp
}

func newTestProfilesExporter(t *testing.T, url string, fns ...func(*Config)) xexporter.Profiles {
func newUnstartedTestProfilesExporter(t *testing.T, url string, fns ...func(*Config)) xexporter.Profiles {
f := NewFactory().(xexporter.Factory)
cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
cfg.Endpoints = []string{url}
Expand All @@ -2636,16 +2837,19 @@ func newTestProfilesExporter(t *testing.T, url string, fns ...func(*Config)) xex
require.NoError(t, xconfmap.Validate(cfg))
exp, err := f.CreateProfiles(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg)
require.NoError(t, err)
return exp
}

err = exp.Start(t.Context(), componenttest.NewNopHost())
require.NoError(t, err)
// newTestMetricsExporter returns a metrics exporter that has been started
// against a nop host; shutdown is registered via t.Cleanup.
func newTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Metrics {
	exp := newUnstartedTestMetricsExporter(t, url, fns...)
	err := exp.Start(t.Context(), componenttest.NewNopHost())
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, exp.Shutdown(context.Background())) //nolint:usetesting
	})
	return exp
}

func newTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Metrics {
func newUnstartedTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Metrics {
f := NewFactory()
cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) {
cfg.Endpoints = []string{url}
Expand All @@ -2656,12 +2860,6 @@ func newTestMetricsExporter(t *testing.T, url string, fns ...func(*Config)) expo
require.NoError(t, xconfmap.Validate(cfg))
exp, err := f.CreateMetrics(t.Context(), exportertest.NewNopSettings(metadata.Type), cfg)
require.NoError(t, err)

err = exp.Start(t.Context(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background())) //nolint:usetesting
})
return exp
}

Expand Down Expand Up @@ -2700,10 +2898,19 @@ func mustSendLogRecords(t *testing.T, exporter exporter.Logs, records ...plog.Lo
}

// mustSendLogs delivers logs to the exporter using the test's default
// context, failing the test on a consume error.
func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) {
	mustSendLogsWithCtx(t.Context(), t, exporter, logs)
}

// mustSendLogsWithCtx delivers logs to the exporter under the given context,
// marking the payload read-only first when the exporter does not mutate data,
// and fails the test on a consume error.
func mustSendLogsWithCtx(ctx context.Context, t *testing.T, exporter exporter.Logs, logs plog.Logs) {
	if !exporter.Capabilities().MutatesData {
		logs.MarkReadOnly()
	}
	require.NoError(t, exporter.ConsumeLogs(ctx, logs))
}

Expand Down Expand Up @@ -2732,10 +2939,19 @@ func mustSendMetricGaugeDataPoints(t *testing.T, exporter exporter.Metrics, data
}

// mustSendMetrics delivers metrics to the exporter using the test's default
// context, failing the test on a consume error.
func mustSendMetrics(t *testing.T, exporter exporter.Metrics, metrics pmetric.Metrics) {
	mustSendMetricsWithCtx(t.Context(), t, exporter, metrics)
}

// mustSendMetricsWithCtx delivers metrics to the exporter under the given
// context, marking the payload read-only first when the exporter does not
// mutate data, and fails the test on a consume error.
func mustSendMetricsWithCtx(ctx context.Context, t *testing.T, exporter exporter.Metrics, metrics pmetric.Metrics) {
	if !exporter.Capabilities().MutatesData {
		metrics.MarkReadOnly()
	}
	require.NoError(t, exporter.ConsumeMetrics(ctx, metrics))
}

Expand All @@ -2750,10 +2966,32 @@ func mustSendSpans(t *testing.T, exporter exporter.Traces, spans ...ptrace.Span)
}

// mustSendTraces delivers traces to the exporter using the test's default
// context, failing the test on a consume error.
func mustSendTraces(t *testing.T, exporter exporter.Traces, traces ptrace.Traces) {
	mustSendTracesWithCtx(t.Context(), t, exporter, traces)
}

// mustSendTracesWithCtx delivers traces to the exporter under the given
// context, marking the payload read-only first when the exporter does not
// mutate data, and fails the test on a consume error.
func mustSendTracesWithCtx(ctx context.Context, t *testing.T, exporter exporter.Traces, traces ptrace.Traces) {
	if !exporter.Capabilities().MutatesData {
		traces.MarkReadOnly()
	}
	require.NoError(t, exporter.ConsumeTraces(ctx, traces))
}

// mustSendProfilesWithCtx delivers profiles to the exporter under the given
// context, marking the payload read-only first when the exporter does not
// mutate data, and fails the test on a consume error.
func mustSendProfilesWithCtx(ctx context.Context, t *testing.T, exporter xexporter.Profiles, profiles pprofile.Profiles) {
	if !exporter.Capabilities().MutatesData {
		profiles.MarkReadOnly()
	}
	require.NoError(t, exporter.ConsumeProfiles(ctx, profiles))
}

Expand Down
Loading