diff --git a/.chloggen/improve-loadbalancer-performance.yaml b/.chloggen/improve-loadbalancer-performance.yaml
new file mode 100644
index 0000000000000..49e0420dc00b3
--- /dev/null
+++ b/.chloggen/improve-loadbalancer-performance.yaml
@@ -0,0 +1,17 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: loadbalancingexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add batching for each endpoint in the loadbalancingexporter.
+
+# One or more tracking issues related to the change
+issues: [17173]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
diff --git a/exporter/loadbalancingexporter/log_exporter.go b/exporter/loadbalancingexporter/log_exporter.go
index fa49a320c4c39..5e7518ea915fb 100644
--- a/exporter/loadbalancingexporter/log_exporter.go
+++ b/exporter/loadbalancingexporter/log_exporter.go
@@ -76,25 +76,43 @@ func (e *logExporterImp) Shutdown(context.Context) error {
 
+// ConsumeLogs splits ld into batches, groups the batches by the backend
+// endpoint selected for each batch's balancing key, and sends one combined
+// payload to every endpoint. Errors from all endpoints are combined.
 func (e *logExporterImp) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
 	var errs error
+	endpointToLogData := make(map[string]plog.Logs)
 	batches := batchpersignal.SplitLogs(ld)
-	for _, batch := range batches {
-		errs = multierr.Append(errs, e.consumeLog(ctx, batch))
+
+	// Map the logs to their respective endpoints.
+	for _, batch := range batches {
+		// Each batch contains at most one trace ID, so the key must be derived
+		// from the batch itself rather than from the whole incoming payload.
+		traceID := traceIDFromLogs(batch)
+		balancingKey := traceID
+		if traceID == pcommon.NewTraceIDEmpty() {
+			// Not every log carries a trace ID; fall back to a random
+			// balancing key so the log is routed to a random backend.
+			balancingKey = random()
+		}
+
+		endpoint := e.loadBalancer.Endpoint(balancingKey[:])
+		dst, ok := endpointToLogData[endpoint]
+		if !ok {
+			dst = plog.NewLogs()
+			endpointToLogData[endpoint] = dst
+		}
+		batch.ResourceLogs().MoveAndAppendTo(dst.ResourceLogs())
+	}
+
+	// Send the logs off by endpoint.
+	for endpoint, logs := range endpointToLogData {
+		errs = multierr.Append(errs, e.consumeLog(ctx, logs, endpoint))
 	}
 
 	return errs
 }
 
-func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error {
-	traceID := traceIDFromLogs(ld)
-	balancingKey := traceID
-	if traceID == pcommon.NewTraceIDEmpty() {
-		// every log may not contain a traceID
-		// generate a random traceID as balancingKey
-		// so the log can be routed to a random backend
-		balancingKey = random()
-	}
-
-	endpoint := e.loadBalancer.Endpoint(balancingKey[:])
+// consumeLog hands ld to the exporter that the load balancer resolves for endpoint.
+func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs, endpoint string) error {
 	exp, err := e.loadBalancer.Exporter(endpoint)
 	if err != nil {
 		return err
diff --git a/exporter/loadbalancingexporter/log_exporter_test.go b/exporter/loadbalancingexporter/log_exporter_test.go
index ea66819231799..ec0be9e3da67c 100644
--- a/exporter/loadbalancingexporter/log_exporter_test.go
+++ b/exporter/loadbalancingexporter/log_exporter_test.go
@@ -231,7 +231,8 @@ func TestLogBatchWithTwoTraces(t *testing.T) {
 
 	// verify
 	assert.NoError(t, err)
-	assert.Len(t, sink.AllLogs(), 2)
+	assert.Len(t, sink.AllLogs(), 1)
+	assert.Equal(t, 2, sink.LogRecordCount())
 }
 
 func TestNoLogsInBatch(t *testing.T) {
diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go
index 327bb6ca8f180..4561eb43494c8 100644
--- a/exporter/loadbalancingexporter/trace_exporter.go
+++ b/exporter/loadbalancingexporter/trace_exporter.go
@@ -87,49 +87,106 @@ func (e *traceExporterImp) Shutdown(context.Context) error {
 	return nil
 }
 
+// HasMultipleEndpoints returns the routing identifier shared by all batches.
+// It returns the empty string when the batches resolve to more than one
+// routing identifier, or to none at all. ctx and td are currently unused.
+// Errors from resolving the routing identifiers are combined and returned.
+func (e *traceExporterImp) HasMultipleEndpoints(ctx context.Context, td ptrace.Traces, batches []ptrace.Traces) (string, error) {
+	var errs error
+	routingID := ""
+
+	for _, batch := range batches {
+		routingIDs, err := routingIdentifiersFromTraces(batch, e.routingKey)
+		errs = multierr.Append(errs, err)
+		for rid := range routingIDs {
+			if routingID != "" && routingID != rid {
+				// A second, different identifier settles the answer.
+				return "", errs
+			}
+			routingID = rid
+		}
+	}
+	return routingID, errs
+}
+
+// ConsumeTraces splits td into batches and forwards them to the backends
+// selected by their routing identifiers. When all batches share one routing
+// identifier td is sent as-is; otherwise the batches are grouped so that each
+// backend receives a single combined payload.
 func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
 	var errs error
 	batches := batchpersignal.SplitTraces(td)
-	for _, batch := range batches {
-		errs = multierr.Append(errs, e.consumeTrace(ctx, batch))
+	end, err := e.HasMultipleEndpoints(ctx, td, batches)
+	if end != "" {
+		// We don't need to batch; we only send to one backend.
+		endpoint := e.loadBalancer.Endpoint([]byte(end))
+		return multierr.Append(err, e.consumeTrace(ctx, td, endpoint))
 	}
 
+	// The loop below resolves the routing identifiers of every batch again and
+	// reports the same errors, so err is deliberately not added to errs here.
+
+	// Map the trace data to their respective endpoints.
+	endpointToTraceData := make(map[string]ptrace.Traces)
+	for _, batch := range batches {
+		routingIDs, err := routingIdentifiersFromTraces(batch, e.routingKey)
+		errs = multierr.Append(errs, err)
+
+		// MoveAndAppendTo empties its source, so when a batch has several
+		// routing identifiers only the last one may take the spans by move;
+		// every earlier one receives a copy.
+		remaining := len(routingIDs)
+		for rid := range routingIDs {
+			endpoint := e.loadBalancer.Endpoint([]byte(rid))
+			dst, ok := endpointToTraceData[endpoint]
+			if !ok {
+				dst = ptrace.NewTraces()
+				endpointToTraceData[endpoint] = dst
+			}
+
+			remaining--
+			src := batch
+			if remaining > 0 {
+				src = ptrace.NewTraces()
+				batch.CopyTo(src)
+			}
+			src.ResourceSpans().MoveAndAppendTo(dst.ResourceSpans())
+		}
+	}
+
+	// Send the trace data off by endpoint.
+	for endpoint, traces := range endpointToTraceData {
+		errs = multierr.Append(errs, e.consumeTrace(ctx, traces, endpoint))
+	}
 	return errs
 }
 
-func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error {
-	var exp component.Component
-	routingIds, err := routingIdentifiersFromTraces(td, e.routingKey)
+// consumeTrace sends td to the exporter the load balancer holds for endpoint
+// and records the backend latency tagged with the endpoint and the outcome.
+func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, endpoint string) error {
+	exp, err := e.loadBalancer.Exporter(endpoint)
 	if err != nil {
 		return err
 	}
 
-	for rid := range routingIds {
-		endpoint := e.loadBalancer.Endpoint([]byte(rid))
-		exp, err = e.loadBalancer.Exporter(endpoint)
-		if err != nil {
-			return err
-		}
-		te, ok := exp.(exporter.Traces)
-		if !ok {
-			return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
-		}
-
-		start := time.Now()
-		err = te.ConsumeTraces(ctx, td)
-		duration := time.Since(start)
-
-		if err == nil {
-			_ = stats.RecordWithTags(
-				ctx,
-				[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
-				mBackendLatency.M(duration.Milliseconds()))
-		} else {
-			_ = stats.RecordWithTags(
-				ctx,
-				[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
-				mBackendLatency.M(duration.Milliseconds()))
-		}
+	te, ok := exp.(exporter.Traces)
+	if !ok {
+		return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
+	}
+	start := time.Now()
+	err = te.ConsumeTraces(ctx, td)
+	duration := time.Since(start)
+	// The result of recording the latency metric is intentionally discarded.
+	if err == nil {
+		_ = stats.RecordWithTags(
+			ctx,
+			[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
+			mBackendLatency.M(duration.Milliseconds()))
+	} else {
+		_ = stats.RecordWithTags(
+			ctx,
+			[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
+			mBackendLatency.M(duration.Milliseconds()))
 	}
 	return err
 }
diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go
index d386b6ec0e2c9..177bde3482bfb 100644
--- a/exporter/loadbalancingexporter/trace_exporter_test.go
+++ b/exporter/loadbalancingexporter/trace_exporter_test.go
@@ -354,7 +354,8 @@ func TestBatchWithTwoTraces(t *testing.T) {
 
 	// verify
 	assert.NoError(t, err)
-	assert.Len(t, sink.AllTraces(), 2)
+	assert.Len(t, sink.AllTraces(), 1)
+	assert.Equal(t, 2, sink.SpanCount())
 }
 
 func TestNoTracesInBatch(t *testing.T) {