17 changes: 17 additions & 0 deletions .chloggen/improve-loadbalancer-performance.yaml
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add batching for each endpoint in the loadbalancingexporter.

# One or more tracking issues related to the change
issues: [17173]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

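In short, the exporter still splits the incoming payload per trace (via batchpersignal), but instead of exporting each split batch on its own it now groups the batches by the backend they are routed to and makes one export call per backend. Below is a minimal sketch of that grouping pattern for logs; the endpointFor callback is a stand-in for the exporter's load-balancer lookup and is an assumption of this example, not code from the PR.

package sketch

import (
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
	"go.opentelemetry.io/collector/pdata/plog"
)

// groupByEndpoint splits ld per trace and merges the pieces that share a backend,
// so the caller can make a single export call per endpoint.
// Sketch only: endpointFor stands in for the exporter's load-balancer Endpoint lookup.
func groupByEndpoint(ld plog.Logs, endpointFor func(plog.Logs) string) map[string]plog.Logs {
	grouped := make(map[string]plog.Logs)
	for _, batch := range batchpersignal.SplitLogs(ld) {
		endpoint := endpointFor(batch)
		out, ok := grouped[endpoint]
		if !ok {
			out = plog.NewLogs()
			grouped[endpoint] = out
		}
		// Copy this batch's ResourceLogs into the payload batched for its endpoint.
		for i := 0; i < batch.ResourceLogs().Len(); i++ {
			batch.ResourceLogs().At(i).CopyTo(out.ResourceLogs().AppendEmpty())
		}
	}
	return grouped
}

The trace path in the diff below follows the same shape, with routingIdentifiersFromTraces supplying the keys instead of the trace ID.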
46 changes: 33 additions & 13 deletions exporter/loadbalancingexporter/log_exporter.go
@@ -76,25 +76,45 @@ func (e *logExporterImp) Shutdown(context.Context) error {
 
 func (e *logExporterImp) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
 	var errs error
+	endpointToLogData := make(map[string]plog.Logs)
 	batches := batchpersignal.SplitLogs(ld)
-	for _, batch := range batches {
-		errs = multierr.Append(errs, e.consumeLog(ctx, batch))
+
+	// Map the logs to their respective endpoints.
+	for batch := range batches {
+		// Each batch contains at most one trace ID.
+		traceID := traceIDFromLogs(batches[batch])
+		balancingKey := traceID
+		if traceID == pcommon.NewTraceIDEmpty() {
+			// Not every log contains a trace ID; in that case,
+			// generate a random trace ID as the balancing key
+			// so the log can be routed to a random backend.
+			balancingKey = random()
+		}
+
+		endpoint := e.loadBalancer.Endpoint(balancingKey[:])
+		if _, ok := endpointToLogData[endpoint]; ok {
+			// Append to the payload already batched for this endpoint.
+			for i := 0; i < batches[batch].ResourceLogs().Len(); i++ {
+				batches[batch].ResourceLogs().At(i).CopyTo(endpointToLogData[endpoint].ResourceLogs().AppendEmpty())
+			}
+		} else {
+			newLog := plog.NewLogs()
+			for i := 0; i < batches[batch].ResourceLogs().Len(); i++ {
+				batches[batch].ResourceLogs().At(i).CopyTo(newLog.ResourceLogs().AppendEmpty())
+			}
+			endpointToLogData[endpoint] = newLog
+		}
+	}
+
+	// Send the logs off by endpoint.
+	for endpoint, logs := range endpointToLogData {
+		errs = multierr.Append(errs, e.consumeLog(ctx, logs, endpoint))
 	}
 
 	return errs
 }
 
-func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error {
-	traceID := traceIDFromLogs(ld)
-	balancingKey := traceID
-	if traceID == pcommon.NewTraceIDEmpty() {
-		// every log may not contain a traceID
-		// generate a random traceID as balancingKey
-		// so the log can be routed to a random backend
-		balancingKey = random()
-	}
-
-	endpoint := e.loadBalancer.Endpoint(balancingKey[:])
+func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs, endpoint string) error {
	exp, err := e.loadBalancer.Exporter(endpoint)
 	if err != nil {
 		return err
3 changes: 2 additions & 1 deletion exporter/loadbalancingexporter/log_exporter_test.go
@@ -231,7 +231,8 @@ func TestLogBatchWithTwoTraces(t *testing.T) {
 
 	// verify
 	assert.NoError(t, err)
-	assert.Len(t, sink.AllLogs(), 2)
+	assert.Len(t, sink.AllLogs(), 1)
+	assert.Equal(t, sink.LogRecordCount(), 2)
 }
 
 func TestNoLogsInBatch(t *testing.T) {
75 changes: 44 additions & 31 deletions exporter/loadbalancingexporter/trace_exporter.go
@@ -90,46 +90,59 @@ func (e *traceExporterImp) Shutdown(context.Context) error {
 func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
 	var errs error
 	batches := batchpersignal.SplitTraces(td)
-	for _, batch := range batches {
-		errs = multierr.Append(errs, e.consumeTrace(ctx, batch))
+	endpointToTraceData := make(map[string]ptrace.Traces)
+
+	// Map the trace data to their respective endpoints.
+	for batch := range batches {
+		routingIDs, err := routingIdentifiersFromTraces(batches[batch], e.routingKey)
+		errs = multierr.Append(errs, err)
+		for rid := range routingIDs {
+			endpoint := e.loadBalancer.Endpoint([]byte(rid))
+			if _, ok := endpointToTraceData[endpoint]; ok {
+				// Append to the payload already batched for this endpoint.
+				for i := 0; i < batches[batch].ResourceSpans().Len(); i++ {
+					batches[batch].ResourceSpans().At(i).CopyTo(endpointToTraceData[endpoint].ResourceSpans().AppendEmpty())
+				}
Review thread on this hunk:

Contributor:
Actually, here we could also benefit from moving, after first checking that routingIDs has a length of 1.

Author:
Why would we check that routingIDs has a length of 1? Shouldn't we just copy the trace to each backend it is supposed to be routed to, even if that is multiple backends?

Contributor:
Well, I mean that when there is only one routing backend, why would we need to copy? With two or more, each backend would share the same traces structure, so in that case we should copy for safety. Sure, we can always copy, but there is room for optimization; that is my point.

Author:
I see what you mean. I think we should check for a single endpoint outside of that for loop and routingIDs, though. The point of the optimization is that different batches (and different calls that produce routingIDs) may share endpoints, and those should be sent in the same batched call. I added a function that checks whether there is just one endpoint.

Contributor:
Good call about this optimization; it could indeed save some RPC calls (not actually what I meant :). But I have one concern: with this code we could send one pdata.Traces containing spans with different TraceIDs, which could be a breaking change. @jpkrohling, does this exporter guarantee that a sent pdata.Traces contains only one TraceID, or could it contain spans from different traces belonging to that backend?

About my previous point: I meant the situation where there is, for example, only one trace and it is routed to multiple backends. In that case it should be fully copied for each of those backends. And if we see that there is only one routingID for a batch, then I think we could safely move it. Maybe we could create a test case for this to make clear what I am trying to say?

Member:
> I meant the situation when there is for example only one trace, which is routed to multiple backends

Right now, one trace can be sent to only one backend.

Contributor:
> Right now, one trace can be sent to only one backend.

Is that so? Why is routingIDs a slice then?

Member:
One pdata.Traces may contain multiple traces (each represented as a ResourceSpans), whose routing keys (service names or trace IDs) are mapped to backends.

Contributor:
But we get several routing IDs for one batch, which already contains one trace. With a custom routing key (for example, by the service of some span in the trace) we actually can route one trace to several backends.

Contributor:
Returning to my main point: I believe we can move traces only if len(routingIDs) is 1. For the simplicity of this PR, let's just copy everything as it was in the first place.

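To illustrate the copy-versus-move idea debated above, here is a minimal sketch that is not part of the PR: a split batch can be handed over as-is when it maps to exactly one endpoint that has nothing batched yet, and its spans are copied whenever they would otherwise be shared by more than one payload. The helper name addToEndpoint and its signature are assumptions made for the example.

package sketch

import "go.opentelemetry.io/collector/pdata/ptrace"

// addToEndpoint merges one split batch into the per-endpoint payloads.
// Sketch only (not part of the PR): endpoints holds every backend the batch's
// routing IDs resolved to.
func addToEndpoint(dst map[string]ptrace.Traces, endpoints []string, batch ptrace.Traces) {
	if len(endpoints) == 1 {
		if _, ok := dst[endpoints[0]]; !ok {
			// Single destination and nothing batched yet: reuse the batch directly (move).
			dst[endpoints[0]] = batch
			return
		}
	}
	for _, endpoint := range endpoints {
		out, ok := dst[endpoint]
		if !ok {
			out = ptrace.NewTraces()
			dst[endpoint] = out
		}
		// The spans are shared with another payload (or appended to an existing one),
		// so copy them for safety.
		for i := 0; i < batch.ResourceSpans().Len(); i++ {
			batch.ResourceSpans().At(i).CopyTo(out.ResourceSpans().AppendEmpty())
		}
	}
}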
+			} else {
+				newTrace := ptrace.NewTraces()
+				for i := 0; i < batches[batch].ResourceSpans().Len(); i++ {
+					batches[batch].ResourceSpans().At(i).CopyTo(newTrace.ResourceSpans().AppendEmpty())
+				}
+				endpointToTraceData[endpoint] = newTrace
+			}
+		}
+	}
+
+	// Send the trace data off by endpoint.
+	for endpoint, traces := range endpointToTraceData {
+		errs = multierr.Append(errs, e.consumeTrace(ctx, traces, endpoint))
 	}
 	return errs
 }
 
-func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error {
-	var exp component.Component
-	routingIds, err := routingIdentifiersFromTraces(td, e.routingKey)
+func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, endpoint string) error {
+	exp, err := e.loadBalancer.Exporter(endpoint)
 	if err != nil {
 		return err
 	}
-	for rid := range routingIds {
-		endpoint := e.loadBalancer.Endpoint([]byte(rid))
-		exp, err = e.loadBalancer.Exporter(endpoint)
-		if err != nil {
-			return err
-		}
-
-		te, ok := exp.(exporter.Traces)
-		if !ok {
-			return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
-		}
-
-		start := time.Now()
-		err = te.ConsumeTraces(ctx, td)
-		duration := time.Since(start)
-
-		if err == nil {
-			_ = stats.RecordWithTags(
-				ctx,
-				[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
-				mBackendLatency.M(duration.Milliseconds()))
-		} else {
-			_ = stats.RecordWithTags(
-				ctx,
-				[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
-				mBackendLatency.M(duration.Milliseconds()))
-		}
-	}
+	te, ok := exp.(exporter.Traces)
+	if !ok {
+		return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
+	}
+	start := time.Now()
+	err = te.ConsumeTraces(ctx, td)
+	duration := time.Since(start)
+	if err == nil {
+		_ = stats.RecordWithTags(
+			ctx,
+			[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
+			mBackendLatency.M(duration.Milliseconds()))
+	} else {
+		_ = stats.RecordWithTags(
+			ctx,
+			[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
+			mBackendLatency.M(duration.Milliseconds()))
+	}
 	return err
 }
3 changes: 2 additions & 1 deletion exporter/loadbalancingexporter/trace_exporter_test.go
@@ -354,7 +354,8 @@ func TestBatchWithTwoTraces(t *testing.T) {
 
 	// verify
 	assert.NoError(t, err)
-	assert.Len(t, sink.AllTraces(), 2)
+	assert.Len(t, sink.AllTraces(), 1)
+	assert.Equal(t, sink.SpanCount(), 2)
 }
 
 func TestNoTracesInBatch(t *testing.T) {