From a488fa71162a04c01bf80c20a7f2bebf031ec355 Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 20 Dec 2023 12:36:03 +0100 Subject: [PATCH 1/9] feat: add metrics&traces merge fns --- exporter/loadbalancingexporter/helpers.go | 85 ++++++++++++ .../loadbalancingexporter/helpers_test.go | 121 ++++++++++++++++++ 2 files changed, 206 insertions(+) create mode 100644 exporter/loadbalancingexporter/helpers.go create mode 100644 exporter/loadbalancingexporter/helpers_test.go diff --git a/exporter/loadbalancingexporter/helpers.go b/exporter/loadbalancingexporter/helpers.go new file mode 100644 index 0000000000000..34e5d1ff7ed34 --- /dev/null +++ b/exporter/loadbalancingexporter/helpers.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// mergeTraces concatenates multiple ptrace.Traces into a single ptrace.Traces. +func mergeTraces(traces ...ptrace.Traces) ptrace.Traces { + merged := ptrace.NewTraces() + empty := ptrace.Traces{} + + for _, trace := range traces { + + if trace == empty { + continue + } + + for i := 0; i < trace.ResourceSpans().Len(); i++ { + rs := trace.ResourceSpans().At(i) + + newRS := merged.ResourceSpans().AppendEmpty() + rs.Resource().CopyTo(newRS.Resource()) + newRS.SetSchemaUrl(rs.SchemaUrl()) + + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ils := rs.ScopeSpans().At(j) + + newILS := newRS.ScopeSpans().AppendEmpty() + ils.Scope().CopyTo(newILS.Scope()) + newILS.SetSchemaUrl(ils.SchemaUrl()) + + for k := 0; k < ils.Spans().Len(); k++ { + span := ils.Spans().At(k) + newSpan := newILS.Spans().AppendEmpty() + span.CopyTo(newSpan) + } + } + } + + } + + return merged +} + +// mergeTraces concatenates multiple pmetric.Metrics into a single pmetric.Metrics. +func mergeMetrics(metrics ...pmetric.Metrics) pmetric.Metrics { + merged := pmetric.NewMetrics() + empty := pmetric.Metrics{} + + for _, metric := range metrics { + + if metric == empty { + continue + } + + for i := 0; i < metric.ResourceMetrics().Len(); i++ { + rs := metric.ResourceMetrics().At(i) + + newRM := merged.ResourceMetrics().AppendEmpty() + rs.Resource().CopyTo(newRM.Resource()) + newRM.SetSchemaUrl(rs.SchemaUrl()) + + for j := 0; j < rs.ScopeMetrics().Len(); j++ { + ilm := rs.ScopeMetrics().At(j) + + newILM := newRM.ScopeMetrics().AppendEmpty() + ilm.Scope().CopyTo(newILM.Scope()) + newILM.SetSchemaUrl(ilm.SchemaUrl()) + + for k := 0; k < ilm.Metrics().Len(); k++ { + m := ilm.Metrics().At(k) + newMetric := newILM.Metrics().AppendEmpty() + m.CopyTo(newMetric) + } + } + } + + } + + return merged +} diff --git a/exporter/loadbalancingexporter/helpers_test.go b/exporter/loadbalancingexporter/helpers_test.go new file mode 100644 index 0000000000000..b1ca073ac8c59 --- /dev/null +++ b/exporter/loadbalancingexporter/helpers_test.go @@ -0,0 +1,121 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.6.1" +) + +func TestMergeTracesTwoEmpty(t *testing.T) { + expectedEmpty := ptrace.NewTraces() + trace1 := ptrace.Traces{} + trace2 := ptrace.Traces{} + + mergedTraces := mergeTraces(trace1, trace2) + + require.Equal(t, expectedEmpty, mergedTraces) +} + +func TestMergeTracesSingleEmpty(t *testing.T) { + expectedTraces := simpleTraces() + + trace1 := ptrace.Traces{} + trace2 := simpleTraces() + + mergedTraces := mergeTraces(trace1, trace2) + + require.Equal(t, expectedTraces, mergedTraces) +} + +func TestMergeTraces(t *testing.T) { + expectedTraces := ptrace.NewTraces() + expectedTraces.ResourceSpans().EnsureCapacity(3) + aspans := expectedTraces.ResourceSpans().AppendEmpty() + aspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-1") + aspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4}) + bspans := expectedTraces.ResourceSpans().AppendEmpty() + bspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-2") + bspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 2}) + cspans := expectedTraces.ResourceSpans().AppendEmpty() + cspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-3") + cspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 3}) + + trace1 := ptrace.NewTraces() + trace1.ResourceSpans().EnsureCapacity(2) + t1aspans := trace1.ResourceSpans().AppendEmpty() + t1aspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-1") + t1aspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4}) + t1bspans := trace1.ResourceSpans().AppendEmpty() + t1bspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-2") + t1bspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 2}) + + trace2 := ptrace.NewTraces() + trace2.ResourceSpans().EnsureCapacity(1) + t2cspans := trace2.ResourceSpans().AppendEmpty() + t2cspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-3") + t2cspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 3}) + + mergedTraces := mergeTraces(trace1, trace2) + + require.Equal(t, expectedTraces, mergedTraces) +} + +func TestMergeMetricsTwoEmpty(t *testing.T) { + expectedEmpty := pmetric.NewMetrics() + metric1 := pmetric.Metrics{} + metric2 := pmetric.Metrics{} + + mergedMetrics := mergeMetrics(metric1, metric2) + + require.Equal(t, expectedEmpty, mergedMetrics) +} + +func TestMergeMetricsSingleEmpty(t *testing.T) { + expectedMetrics := simpleMetricsWithResource() + + metric1 := pmetric.Metrics{} + metric2 := simpleMetricsWithResource() + + mergedMetrics := mergeMetrics(metric1, metric2) + + require.Equal(t, expectedMetrics, mergedMetrics) +} + +func TestMergeMetrics(t *testing.T) { + expectedMetrics := pmetric.NewMetrics() + expectedMetrics.ResourceMetrics().EnsureCapacity(3) + ametrics := expectedMetrics.ResourceMetrics().AppendEmpty() + ametrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-1") + ametrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m1") + bmetrics := expectedMetrics.ResourceMetrics().AppendEmpty() + bmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-2") + bmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m1") + cmetrics := expectedMetrics.ResourceMetrics().AppendEmpty() + cmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-3") + cmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m2") + + metric1 := pmetric.NewMetrics() + metric1.ResourceMetrics().EnsureCapacity(2) + m1ametrics := metric1.ResourceMetrics().AppendEmpty() + m1ametrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-1") + m1ametrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m1") + m1bmetrics := metric1.ResourceMetrics().AppendEmpty() + m1bmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-2") + m1bmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m1") + + metric2 := pmetric.NewMetrics() + metric2.ResourceMetrics().EnsureCapacity(1) + m2cmetrics := metric2.ResourceMetrics().AppendEmpty() + m2cmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-3") + m2cmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m2") + + mergedMetrics := mergeMetrics(metric1, metric2) + + require.Equal(t, expectedMetrics, mergedMetrics) +} From cbe24793980a137142ccf84810cf9535ae82a79c Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 20 Dec 2023 12:36:39 +0100 Subject: [PATCH 2/9] feat: optimize traces sent --- .../loadbalancingexporter/trace_exporter.go | 90 ++++++++++++------- .../trace_exporter_test.go | 15 +++- 2 files changed, 72 insertions(+), 33 deletions(-) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 06a67b1426382..1c10f225506b7 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -24,6 +24,9 @@ import ( var _ exporter.Traces = (*traceExporterImp)(nil) +type exporterTraces map[component.Component]map[string]ptrace.Traces +type endpointTraces map[string]ptrace.Traces + type traceExporterImp struct { loadBalancer loadBalancer routingKey routingKey @@ -78,48 +81,73 @@ func (e *traceExporterImp) Shutdown(context.Context) error { func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error + var exp component.Component + batches := batchpersignal.SplitTraces(td) - for _, batch := range batches { - errs = multierr.Append(errs, e.consumeTrace(ctx, batch)) - } - return errs -} + exporterSegregatedTraces := make(exporterTraces) + endpointSegregatedTraces := make(endpointTraces) -func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error { - var exp component.Component - routingIds, err := routingIdentifiersFromTraces(td, e.routingKey) - if err != nil { - return err - } - for rid := range routingIds { - endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) + for _, batch := range batches { + routingID, err := routingIdentifiersFromTraces(batch, e.routingKey) if err != nil { return err } - te, ok := exp.(exporter.Traces) - if !ok { - return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) + for rid := range routingID { + endpoint := e.loadBalancer.Endpoint([]byte(rid)) + exp, err = e.loadBalancer.Exporter(endpoint) + if err != nil { + return err + } + + _, ok := endpointSegregatedTraces[endpoint] + if !ok { + endpointSegregatedTraces[endpoint] = ptrace.Traces{} + } + endpointSegregatedTraces[endpoint] = mergeTraces(endpointSegregatedTraces[endpoint], batch) + + _, ok = exporterSegregatedTraces[exp] + if !ok { + exporterSegregatedTraces[exp] = endpointSegregatedTraces + } + exporterSegregatedTraces[exp][endpoint] = endpointSegregatedTraces[endpoint] } + } + + errs = multierr.Append(errs, e.consumeTrace(ctx, exporterSegregatedTraces)) + + return errs +} + +func (e *traceExporterImp) consumeTrace(ctx context.Context, exporterSegregatedTraces exporterTraces) error { + var err error - start := time.Now() - err = te.ConsumeTraces(ctx, td) - duration := time.Since(start) - - if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) + for exp, endpointTraces := range exporterSegregatedTraces { + for endpoint, td := range endpointTraces { + te, ok := exp.(exporter.Traces) + if !ok { + return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) + } + + start := time.Now() + err = te.ConsumeTraces(ctx, td) + duration := time.Since(start) + + if err == nil { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, + mBackendLatency.M(duration.Milliseconds())) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, + mBackendLatency.M(duration.Milliseconds())) + } } } + return err } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index ff0f606144ffb..f078808b5d6be 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -170,10 +170,11 @@ func TestConsumeTracesServiceBased(t *testing.T) { // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) + lb.addMissingExporters(context.Background(), []string{"endpoint-2"}) lb.res = &mockResolver{ triggerCallbacks: true, onResolve: func(ctx context.Context) ([]string, error) { - return []string{"endpoint-1"}, nil + return []string{"endpoint-1", "endpoint-2"}, nil }, } p.loadBalancer = lb @@ -551,9 +552,19 @@ func simpleTraces() ptrace.Traces { func simpleTracesWithServiceName() ptrace.Traces { traces := ptrace.NewTraces() traces.ResourceSpans().EnsureCapacity(1) + rspans := traces.ResourceSpans().AppendEmpty() rspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-1") rspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4}) + + bspans := traces.ResourceSpans().AppendEmpty() + bspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-2") + bspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4}) + + aspans := traces.ResourceSpans().AppendEmpty() + aspans.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-3") + aspans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 5}) + return traces } @@ -584,7 +595,7 @@ func simpleConfig() *Config { func serviceBasedRoutingConfig() *Config { return &Config{ Resolver: ResolverSettings{ - Static: &StaticResolver{Hostnames: []string{"endpoint-1"}}, + Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}}, }, RoutingKey: "service", } From 6a4d7170d5f928b4212c888aed6690e6699c0ceb Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 20 Dec 2023 12:37:01 +0100 Subject: [PATCH 3/9] feat: optimize metrics sent --- .../loadbalancingexporter/metrics_exporter.go | 88 ++++++++++++------- .../metrics_exporter_test.go | 13 +-- 2 files changed, 64 insertions(+), 37 deletions(-) diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index f036bbd5cb208..5e7a0147ed2bd 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -28,6 +28,9 @@ import ( var _ exporter.Metrics = (*metricExporterImp)(nil) +type exporterMetrics map[component.Component]map[string]pmetric.Metrics +type endpointMetrics map[string]pmetric.Metrics + type metricExporterImp struct { loadBalancer loadBalancer routingKey routingKey @@ -80,46 +83,69 @@ func (e *metricExporterImp) Shutdown(context.Context) error { func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { var errs error + var exp component.Component + batches := batchpersignal.SplitMetrics(md) - for _, batch := range batches { - errs = multierr.Append(errs, e.consumeMetric(ctx, batch)) - } - return errs -} + exporterSegregatedMetrics := make(exporterMetrics) + endpointSegregatedMetrics := make(endpointMetrics) -func (e *metricExporterImp) consumeMetric(ctx context.Context, md pmetric.Metrics) error { - var exp component.Component - routingIds, err := routingIdentifiersFromMetrics(md, e.routingKey) - if err != nil { - return err - } - for rid := range routingIds { - endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) + for _, batch := range batches { + routingIds, err := routingIdentifiersFromMetrics(batch, e.routingKey) if err != nil { return err } - te, ok := exp.(exporter.Metrics) - if !ok { - return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", exp) + for rid := range routingIds { + endpoint := e.loadBalancer.Endpoint([]byte(rid)) + exp, err = e.loadBalancer.Exporter(endpoint) + if err != nil { + return err + } + + _, ok := endpointSegregatedMetrics[endpoint] + if !ok { + endpointSegregatedMetrics[endpoint] = pmetric.Metrics{} + } + endpointSegregatedMetrics[endpoint] = mergeMetrics(endpointSegregatedMetrics[endpoint], batch) + + _, ok = exporterSegregatedMetrics[exp] + if !ok { + exporterSegregatedMetrics[exp] = endpointSegregatedMetrics + } + exporterSegregatedMetrics[exp][endpoint] = endpointSegregatedMetrics[endpoint] } - start := time.Now() - err = te.ConsumeMetrics(ctx, md) - duration := time.Since(start) - - if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) + errs = multierr.Append(errs, e.consumeMetric(ctx, exporterSegregatedMetrics)) + } + + return errs +} + +func (e *metricExporterImp) consumeMetric(ctx context.Context, exporterSegregatedMetrics exporterMetrics) error { + var err error + for exp, endpointMetrics := range exporterSegregatedMetrics { + for endpoint, md := range endpointMetrics { + te, ok := exp.(exporter.Metrics) + if !ok { + return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", exp) + } + + start := time.Now() + err = te.ConsumeMetrics(ctx, md) + duration := time.Since(start) + + if err == nil { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, + mBackendLatency.M(duration.Milliseconds())) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, + mBackendLatency.M(duration.Milliseconds())) + } } } diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index 510a1cac31de8..f22139d656c0e 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -176,11 +176,11 @@ func TestConsumeMetrics(t *testing.T) { assert.Equal(t, p.routingKey, svcRouting) // pre-load an exporter here, so that we don't use the actual OTLP exporter - lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) + lb.addMissingExporters(context.Background(), []string{"endpoint-1", "endpoint-2"}) lb.res = &mockResolver{ triggerCallbacks: true, onResolve: func(ctx context.Context) ([]string, error) { - return []string{"endpoint-1"}, nil + return []string{"endpoint-1", "endpoint-2"}, nil }, } p.loadBalancer = lb @@ -378,10 +378,11 @@ func TestConsumeMetricsUnexpectedExporterType(t *testing.T) { // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) + lb.addMissingExporters(context.Background(), []string{"endpoint-2"}) lb.res = &mockResolver{ triggerCallbacks: true, onResolve: func(ctx context.Context) ([]string, error) { - return []string{"endpoint-1"}, nil + return []string{"endpoint-1", "endpoint-2"}, nil }, } p.loadBalancer = lb @@ -680,7 +681,7 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { func endpoint2Config() *Config { return &Config{ Resolver: ResolverSettings{ - Static: &StaticResolver{Hostnames: []string{"endpoint-2"}}, + Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}}, }, RoutingKey: "service", } @@ -689,7 +690,7 @@ func endpoint2Config() *Config { func resourceBasedRoutingConfig() *Config { return &Config{ Resolver: ResolverSettings{ - Static: &StaticResolver{Hostnames: []string{"endpoint-1"}}, + Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}}, }, RoutingKey: resourceRouteKey, } @@ -698,7 +699,7 @@ func resourceBasedRoutingConfig() *Config { func metricNameBasedRoutingConfig() *Config { return &Config{ Resolver: ResolverSettings{ - Static: &StaticResolver{Hostnames: []string{"endpoint-1"}}, + Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}}, }, RoutingKey: metricRouteKey, } From bf8222a74db8b7bc498ee0f19ad5a2ee9ba8d27a Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 20 Dec 2023 12:48:39 +0100 Subject: [PATCH 4/9] chore: add changelog --- .chloggen/mat-rumian-lbe-imp.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100755 .chloggen/mat-rumian-lbe-imp.yaml diff --git a/.chloggen/mat-rumian-lbe-imp.yaml b/.chloggen/mat-rumian-lbe-imp.yaml new file mode 100755 index 0000000000000..caa81910a768c --- /dev/null +++ b/.chloggen/mat-rumian-lbe-imp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: loadbalancingexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Optimize metrics and traces export" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30141] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From bfb28370a33c82918e4fc64a1b9c2f2f73f1f3ec Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 20 Dec 2023 12:52:09 +0100 Subject: [PATCH 5/9] fix: checks --- exporter/loadbalancingexporter/helpers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/loadbalancingexporter/helpers.go b/exporter/loadbalancingexporter/helpers.go index 34e5d1ff7ed34..42dd732f92cfe 100644 --- a/exporter/loadbalancingexporter/helpers.go +++ b/exporter/loadbalancingexporter/helpers.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package loadbalancingexporter +package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" import ( "go.opentelemetry.io/collector/pdata/pmetric" From d9d9ef73801413e00b50ea69530c3262ea70f59d Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 20 Dec 2023 14:11:16 +0100 Subject: [PATCH 6/9] fix: exporter type check --- .../loadbalancingexporter/metrics_exporter.go | 16 +++++++++------- exporter/loadbalancingexporter/trace_exporter.go | 11 ++++++----- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index 5e7a0147ed2bd..eb2abf4b7729d 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -102,8 +102,12 @@ func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metri if err != nil { return err } + _, ok := exp.(exporter.Metrics) + if !ok { + return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", exp) + } - _, ok := endpointSegregatedMetrics[endpoint] + _, ok = endpointSegregatedMetrics[endpoint] if !ok { endpointSegregatedMetrics[endpoint] = pmetric.Metrics{} } @@ -115,21 +119,19 @@ func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metri } exporterSegregatedMetrics[exp][endpoint] = endpointSegregatedMetrics[endpoint] } - - errs = multierr.Append(errs, e.consumeMetric(ctx, exporterSegregatedMetrics)) } + errs = multierr.Append(errs, e.consumeMetric(ctx, exporterSegregatedMetrics)) + return errs } func (e *metricExporterImp) consumeMetric(ctx context.Context, exporterSegregatedMetrics exporterMetrics) error { var err error + for exp, endpointMetrics := range exporterSegregatedMetrics { for endpoint, md := range endpointMetrics { - te, ok := exp.(exporter.Metrics) - if !ok { - return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", exp) - } + te, _ := exp.(exporter.Metrics) start := time.Now() err = te.ConsumeMetrics(ctx, md) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 1c10f225506b7..18d80cca2ba59 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -100,8 +100,12 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) if err != nil { return err } + _, ok := exp.(exporter.Traces) + if !ok { + return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) + } - _, ok := endpointSegregatedTraces[endpoint] + _, ok = endpointSegregatedTraces[endpoint] if !ok { endpointSegregatedTraces[endpoint] = ptrace.Traces{} } @@ -125,10 +129,7 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, exporterSegregatedT for exp, endpointTraces := range exporterSegregatedTraces { for endpoint, td := range endpointTraces { - te, ok := exp.(exporter.Traces) - if !ok { - return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) - } + te, _ := exp.(exporter.Traces) start := time.Now() err = te.ConsumeTraces(ctx, td) From b445cb6038ae6344b73ad340e594b4737179e631 Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 20 Dec 2023 15:10:39 +0100 Subject: [PATCH 7/9] fix: tests and segregation assignments --- exporter/loadbalancingexporter/metrics_exporter.go | 2 +- exporter/loadbalancingexporter/metrics_exporter_test.go | 1 - exporter/loadbalancingexporter/trace_exporter.go | 3 +-- exporter/loadbalancingexporter/trace_exporter_test.go | 3 ++- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index eb2abf4b7729d..f8004e448082d 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -115,7 +115,7 @@ func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metri _, ok = exporterSegregatedMetrics[exp] if !ok { - exporterSegregatedMetrics[exp] = endpointSegregatedMetrics + exporterSegregatedMetrics[exp] = endpointMetrics{} } exporterSegregatedMetrics[exp][endpoint] = endpointSegregatedMetrics[endpoint] } diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index f22139d656c0e..cd6d984523497 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -729,7 +729,6 @@ func simpleMetricsWithServiceName() pmetric.Metrics { } func simpleMetricsWithResource() pmetric.Metrics { - metrics := pmetric.NewMetrics() metrics.ResourceMetrics().EnsureCapacity(1) rmetrics := metrics.ResourceMetrics().AppendEmpty() diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 18d80cca2ba59..2b05c6e088505 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -87,7 +87,6 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) exporterSegregatedTraces := make(exporterTraces) endpointSegregatedTraces := make(endpointTraces) - for _, batch := range batches { routingID, err := routingIdentifiersFromTraces(batch, e.routingKey) if err != nil { @@ -113,7 +112,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) _, ok = exporterSegregatedTraces[exp] if !ok { - exporterSegregatedTraces[exp] = endpointSegregatedTraces + exporterSegregatedTraces[exp] = endpointTraces{} } exporterSegregatedTraces[exp][endpoint] = endpointSegregatedTraces[endpoint] } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index f078808b5d6be..f77464a31744b 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -346,7 +346,8 @@ func TestBatchWithTwoTraces(t *testing.T) { // verify assert.NoError(t, err) - assert.Len(t, sink.AllTraces(), 2) + assert.Len(t, sink.AllTraces(), 1) + assert.Equal(t, sink.AllTraces()[0].SpanCount(), 2) } func TestNoTracesInBatch(t *testing.T) { From fd6a12ce493bae2104b6a96464d6d09a2d5f06d3 Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 3 Jan 2024 13:58:28 +0100 Subject: [PATCH 8/9] fix: mergering functions --- exporter/loadbalancingexporter/helpers.go | 134 +++++++++++------- .../loadbalancingexporter/helpers_test.go | 12 +- 2 files changed, 92 insertions(+), 54 deletions(-) diff --git a/exporter/loadbalancingexporter/helpers.go b/exporter/loadbalancingexporter/helpers.go index 42dd732f92cfe..b275ebd52fbcd 100644 --- a/exporter/loadbalancingexporter/helpers.go +++ b/exporter/loadbalancingexporter/helpers.go @@ -8,78 +8,116 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -// mergeTraces concatenates multiple ptrace.Traces into a single ptrace.Traces. -func mergeTraces(traces ...ptrace.Traces) ptrace.Traces { - merged := ptrace.NewTraces() - empty := ptrace.Traces{} +// mergeTraces concatenates two ptrace.Traces into a single ptrace.Traces. +func mergeTraces(t1 ptrace.Traces, t2 ptrace.Traces) ptrace.Traces { + mergedTraces := ptrace.NewTraces() - for _, trace := range traces { + if t1.SpanCount() == 0 && t2.SpanCount() == 0 { + return mergedTraces + } + + // Iterate over the first trace and append spans to the merged traces + for i := 0; i < t1.ResourceSpans().Len(); i++ { + rs := t1.ResourceSpans().At(i) + newRS := mergedTraces.ResourceSpans().AppendEmpty() + + rs.Resource().MoveTo(newRS.Resource()) + newRS.SetSchemaUrl(rs.SchemaUrl()) + + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ils := rs.ScopeSpans().At(j) - if trace == empty { - continue + newILS := newRS.ScopeSpans().AppendEmpty() + ils.Scope().MoveTo(newILS.Scope()) + newILS.SetSchemaUrl(ils.SchemaUrl()) + + for k := 0; k < ils.Spans().Len(); k++ { + span := ils.Spans().At(k) + newSpan := newILS.Spans().AppendEmpty() + span.MoveTo(newSpan) + } } + } - for i := 0; i < trace.ResourceSpans().Len(); i++ { - rs := trace.ResourceSpans().At(i) + // Iterate over the second trace and append spans to the merged traces + for i := 0; i < t2.ResourceSpans().Len(); i++ { + rs := t2.ResourceSpans().At(i) + newRS := mergedTraces.ResourceSpans().AppendEmpty() - newRS := merged.ResourceSpans().AppendEmpty() - rs.Resource().CopyTo(newRS.Resource()) - newRS.SetSchemaUrl(rs.SchemaUrl()) + rs.Resource().MoveTo(newRS.Resource()) + newRS.SetSchemaUrl(rs.SchemaUrl()) - for j := 0; j < rs.ScopeSpans().Len(); j++ { - ils := rs.ScopeSpans().At(j) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ils := rs.ScopeSpans().At(j) - newILS := newRS.ScopeSpans().AppendEmpty() - ils.Scope().CopyTo(newILS.Scope()) - newILS.SetSchemaUrl(ils.SchemaUrl()) + newILS := newRS.ScopeSpans().AppendEmpty() + ils.Scope().MoveTo(newILS.Scope()) + newILS.SetSchemaUrl(ils.SchemaUrl()) - for k := 0; k < ils.Spans().Len(); k++ { - span := ils.Spans().At(k) - newSpan := newILS.Spans().AppendEmpty() - span.CopyTo(newSpan) - } + for k := 0; k < ils.Spans().Len(); k++ { + span := ils.Spans().At(k) + newSpan := newILS.Spans().AppendEmpty() + span.MoveTo(newSpan) } } - } - return merged + return mergedTraces } -// mergeTraces concatenates multiple pmetric.Metrics into a single pmetric.Metrics. -func mergeMetrics(metrics ...pmetric.Metrics) pmetric.Metrics { - merged := pmetric.NewMetrics() - empty := pmetric.Metrics{} +// mergeMetrics concatenates two pmetric.Metrics into a single pmetric.Metrics. +func mergeMetrics(m1 pmetric.Metrics, m2 pmetric.Metrics) pmetric.Metrics { + mergedMetrics := pmetric.NewMetrics() - for _, metric := range metrics { + if m1.MetricCount() == 0 && m2.MetricCount() == 0 { + return mergedMetrics + } + + // Iterate over the first metric and append metrics to the merged metrics + for i := 0; i < m1.ResourceMetrics().Len(); i++ { + rs := m1.ResourceMetrics().At(i) + newRS := mergedMetrics.ResourceMetrics().AppendEmpty() + + rs.Resource().MoveTo(newRS.Resource()) + newRS.SetSchemaUrl(rs.SchemaUrl()) + + for j := 0; j < rs.ScopeMetrics().Len(); j++ { + ils := rs.ScopeMetrics().At(j) - if metric == empty { - continue + newILS := newRS.ScopeMetrics().AppendEmpty() + ils.Scope().MoveTo(newILS.Scope()) + newILS.SetSchemaUrl(ils.SchemaUrl()) + + for k := 0; k < ils.Metrics().Len(); k++ { + metric := ils.Metrics().At(k) + newMetric := newILS.Metrics().AppendEmpty() + metric.MoveTo(newMetric) + } } + } - for i := 0; i < metric.ResourceMetrics().Len(); i++ { - rs := metric.ResourceMetrics().At(i) + // Iterate over the second metric and append metrics to the merged metrics + for i := 0; i < m2.ResourceMetrics().Len(); i++ { + rs := m2.ResourceMetrics().At(i) + newRS := mergedMetrics.ResourceMetrics().AppendEmpty() - newRM := merged.ResourceMetrics().AppendEmpty() - rs.Resource().CopyTo(newRM.Resource()) - newRM.SetSchemaUrl(rs.SchemaUrl()) + rs.Resource().MoveTo(newRS.Resource()) + newRS.SetSchemaUrl(rs.SchemaUrl()) - for j := 0; j < rs.ScopeMetrics().Len(); j++ { - ilm := rs.ScopeMetrics().At(j) + for j := 0; j < rs.ScopeMetrics().Len(); j++ { + ils := rs.ScopeMetrics().At(j) - newILM := newRM.ScopeMetrics().AppendEmpty() - ilm.Scope().CopyTo(newILM.Scope()) - newILM.SetSchemaUrl(ilm.SchemaUrl()) + newILS := newRS.ScopeMetrics().AppendEmpty() + ils.Scope().MoveTo(newILS.Scope()) + newILS.SetSchemaUrl(ils.SchemaUrl()) - for k := 0; k < ilm.Metrics().Len(); k++ { - m := ilm.Metrics().At(k) - newMetric := newILM.Metrics().AppendEmpty() - m.CopyTo(newMetric) - } + for k := 0; k < ils.Metrics().Len(); k++ { + metric := ils.Metrics().At(k) + newMetric := newILS.Metrics().AppendEmpty() + metric.MoveTo(newMetric) } } - } - return merged + return mergedMetrics } diff --git a/exporter/loadbalancingexporter/helpers_test.go b/exporter/loadbalancingexporter/helpers_test.go index b1ca073ac8c59..768999b4ad4d5 100644 --- a/exporter/loadbalancingexporter/helpers_test.go +++ b/exporter/loadbalancingexporter/helpers_test.go @@ -14,8 +14,8 @@ import ( func TestMergeTracesTwoEmpty(t *testing.T) { expectedEmpty := ptrace.NewTraces() - trace1 := ptrace.Traces{} - trace2 := ptrace.Traces{} + trace1 := ptrace.NewTraces() + trace2 := ptrace.NewTraces() mergedTraces := mergeTraces(trace1, trace2) @@ -25,7 +25,7 @@ func TestMergeTracesTwoEmpty(t *testing.T) { func TestMergeTracesSingleEmpty(t *testing.T) { expectedTraces := simpleTraces() - trace1 := ptrace.Traces{} + trace1 := ptrace.NewTraces() trace2 := simpleTraces() mergedTraces := mergeTraces(trace1, trace2) @@ -68,8 +68,8 @@ func TestMergeTraces(t *testing.T) { func TestMergeMetricsTwoEmpty(t *testing.T) { expectedEmpty := pmetric.NewMetrics() - metric1 := pmetric.Metrics{} - metric2 := pmetric.Metrics{} + metric1 := pmetric.NewMetrics() + metric2 := pmetric.NewMetrics() mergedMetrics := mergeMetrics(metric1, metric2) @@ -79,7 +79,7 @@ func TestMergeMetricsTwoEmpty(t *testing.T) { func TestMergeMetricsSingleEmpty(t *testing.T) { expectedMetrics := simpleMetricsWithResource() - metric1 := pmetric.Metrics{} + metric1 := pmetric.NewMetrics() metric2 := simpleMetricsWithResource() mergedMetrics := mergeMetrics(metric1, metric2) From b94780b4bedeae29ce4931a42bd27b868524fe41 Mon Sep 17 00:00:00 2001 From: Mateusz Rumian Date: Wed, 3 Jan 2024 14:50:06 +0100 Subject: [PATCH 9/9] fix tests --- exporter/loadbalancingexporter/metrics_exporter.go | 2 +- exporter/loadbalancingexporter/trace_exporter.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index f8004e448082d..0b685801e3463 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -109,7 +109,7 @@ func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metri _, ok = endpointSegregatedMetrics[endpoint] if !ok { - endpointSegregatedMetrics[endpoint] = pmetric.Metrics{} + endpointSegregatedMetrics[endpoint] = pmetric.NewMetrics() } endpointSegregatedMetrics[endpoint] = mergeMetrics(endpointSegregatedMetrics[endpoint], batch) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 2b05c6e088505..d3ce46d8e2f6f 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -106,7 +106,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) _, ok = endpointSegregatedTraces[endpoint] if !ok { - endpointSegregatedTraces[endpoint] = ptrace.Traces{} + endpointSegregatedTraces[endpoint] = ptrace.NewTraces() } endpointSegregatedTraces[endpoint] = mergeTraces(endpointSegregatedTraces[endpoint], batch)