Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .chloggen/improve-loadbalancer-performance.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add batching for each endpoint in the loadbalancingexporter.

# One or more tracking issues related to the change
issues: [17173]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

42 changes: 29 additions & 13 deletions exporter/loadbalancingexporter/log_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,41 @@ func (e *logExporterImp) Shutdown(context.Context) error {

func (e *logExporterImp) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var errs error
endpointToLogData := make(map[string]plog.Logs)
batches := batchpersignal.SplitLogs(ld)
for _, batch := range batches {
errs = multierr.Append(errs, e.consumeLog(ctx, batch))

// Map the logs to their respective endpoints.
for batch := range batches {
// Each batch contains at most one trace ID.
traceID := traceIDFromLogs(ld)
balancingKey := traceID
if traceID == pcommon.NewTraceIDEmpty() {
// every log may not contain a traceID
// generate a random traceID as balancingKey
// so the log can be routed to a random backend
balancingKey = random()
}

endpoint := e.loadBalancer.Endpoint(balancingKey[:])
if _, ok := endpointToLogData[endpoint]; ok {
// append
batches[batch].ResourceLogs().MoveAndAppendTo(endpointToLogData[endpoint].ResourceLogs())
} else {
newLog := plog.NewLogs()
batches[batch].ResourceLogs().MoveAndAppendTo(newLog.ResourceLogs())
endpointToLogData[endpoint] = newLog
}
}

// Send the logs off by endpoint.
for endpoint, logs := range endpointToLogData {
errs = multierr.Append(errs, e.consumeLog(ctx, logs, endpoint))
}

return errs
}

func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error {
traceID := traceIDFromLogs(ld)
balancingKey := traceID
if traceID == pcommon.NewTraceIDEmpty() {
// every log may not contain a traceID
// generate a random traceID as balancingKey
// so the log can be routed to a random backend
balancingKey = random()
}

endpoint := e.loadBalancer.Endpoint(balancingKey[:])
func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs, endpoint string) error {
exp, err := e.loadBalancer.Exporter(endpoint)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion exporter/loadbalancingexporter/log_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ func TestLogBatchWithTwoTraces(t *testing.T) {

// verify
assert.NoError(t, err)
assert.Len(t, sink.AllLogs(), 2)
assert.Len(t, sink.AllLogs(), 1)
assert.Equal(t, sink.LogRecordCount(), 2)
}

func TestNoLogsInBatch(t *testing.T) {
Expand Down
112 changes: 81 additions & 31 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,49 +87,99 @@ func (e *traceExporterImp) Shutdown(context.Context) error {
return nil
}

// If we are sending to only one endpoint, returns that endpoint.
// Otherwise returns empty string.
func (e *traceExporterImp) HasMultipleEndpoints(ctx context.Context, td ptrace.Traces, batches []ptrace.Traces) (string, error) {
var errs error

// First check if we're sending to multiple exporters.
endpoint := ""
hasMultipleEndpoints := false

for batch := range batches {
routingIDs, err := routingIdentifiersFromTraces(batches[batch], e.routingKey)
errs = multierr.Append(errs, err)
if len(routingIDs) > 2 {
hasMultipleEndpoints = true
break
} else {
for rid := range routingIDs {
if endpoint != "" && endpoint != rid {
hasMultipleEndpoints = true
break
} else {
endpoint = rid
}
}
}
}

if hasMultipleEndpoints {
return "", errs
}
return endpoint, errs
}

func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var errs error
batches := batchpersignal.SplitTraces(td)
for _, batch := range batches {
errs = multierr.Append(errs, e.consumeTrace(ctx, batch))
end, err := e.HasMultipleEndpoints(ctx, td, batches)
errs = multierr.Append(errs, err)

if end != "" {
// We don't need to batch; we only send to one backend.
endpoint := e.loadBalancer.Endpoint([]byte(end))
errs = multierr.Append(errs, e.consumeTrace(ctx, td, endpoint))
return errs
}

// Map the trace data to their respective endpoints.
endpointToTraceData := make(map[string]ptrace.Traces)
for batch := range batches {
routingIDs, err := routingIdentifiersFromTraces(batches[batch], e.routingKey)
errs = multierr.Append(errs, err)
for rid := range routingIDs {
endpoint := e.loadBalancer.Endpoint([]byte(rid))
if _, ok := endpointToTraceData[endpoint]; ok {
batches[batch].ResourceSpans().MoveAndAppendTo(endpointToTraceData[endpoint].ResourceSpans())
} else {
newTrace := ptrace.NewTraces()
batches[batch].ResourceSpans().MoveAndAppendTo(newTrace.ResourceSpans())
endpointToTraceData[endpoint] = newTrace
}
}
}

// Send the trace data off by endpoint.
for endpoint, traces := range endpointToTraceData {
errs = multierr.Append(errs, e.consumeTrace(ctx, traces, endpoint))
}
return errs
}

func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error {
var exp component.Component
routingIds, err := routingIdentifiersFromTraces(td, e.routingKey)
func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, endpoint string) error {
exp, err := e.loadBalancer.Exporter(endpoint)
if err != nil {
return err
}
for rid := range routingIds {
endpoint := e.loadBalancer.Endpoint([]byte(rid))
exp, err = e.loadBalancer.Exporter(endpoint)
if err != nil {
return err
}

te, ok := exp.(exporter.Traces)
if !ok {
return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
}

start := time.Now()
err = te.ConsumeTraces(ctx, td)
duration := time.Since(start)

if err == nil {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
mBackendLatency.M(duration.Milliseconds()))
} else {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
mBackendLatency.M(duration.Milliseconds()))
}
te, ok := exp.(exporter.Traces)
if !ok {
return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
}
start := time.Now()
err = te.ConsumeTraces(ctx, td)
duration := time.Since(start)
if err == nil {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
mBackendLatency.M(duration.Milliseconds()))
} else {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
mBackendLatency.M(duration.Milliseconds()))
}
return err
}
Expand Down
3 changes: 2 additions & 1 deletion exporter/loadbalancingexporter/trace_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ func TestBatchWithTwoTraces(t *testing.T) {

// verify
assert.NoError(t, err)
assert.Len(t, sink.AllTraces(), 2)
assert.Len(t, sink.AllTraces(), 1)
assert.Equal(t, sink.SpanCount(), 2)
}

func TestNoTracesInBatch(t *testing.T) {
Expand Down