diff --git a/comp/otelcol/otlp/map_provider_config_not_serverless.go b/comp/otelcol/otlp/map_provider_config_not_serverless.go index da1142ebbd4eaf..c06f6889ff9405 100644 --- a/comp/otelcol/otlp/map_provider_config_not_serverless.go +++ b/comp/otelcol/otlp/map_provider_config_not_serverless.go @@ -20,6 +20,8 @@ exporters: tls: insecure: true compression: none + sending_queue: + enabled: false service: telemetry: diff --git a/comp/otelcol/otlp/map_provider_not_serverless_test.go b/comp/otelcol/otlp/map_provider_not_serverless_test.go index 037f4e00a9bf05..c2ef4b42b78980 100644 --- a/comp/otelcol/otlp/map_provider_not_serverless_test.go +++ b/comp/otelcol/otlp/map_provider_not_serverless_test.go @@ -55,6 +55,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, }, "service": map[string]interface{}{ @@ -111,6 +114,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ @@ -184,6 +190,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ @@ -244,6 +253,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, }, "service": map[string]interface{}{ @@ -345,6 +357,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "logging": map[string]interface{}{ "loglevel": "info", @@ -465,6 +480,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ @@ -525,6 +543,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "logsagent": interface{}(nil), }, @@ -593,6 +614,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ @@ -673,6 +697,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ @@ -745,6 +772,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "logsagent": interface{}(nil), }, @@ -865,6 +895,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "logging": map[string]interface{}{ "loglevel": "info", @@ -999,6 +1032,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ diff --git a/comp/otelcol/otlp/map_provider_serverless_test.go b/comp/otelcol/otlp/map_provider_serverless_test.go index bdf2aa51453efd..5d93d4a7dc8ac5 100644 --- a/comp/otelcol/otlp/map_provider_serverless_test.go +++ b/comp/otelcol/otlp/map_provider_serverless_test.go @@ -51,6 +51,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, }, "service": map[string]interface{}{ @@ -102,6 +105,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ @@ -169,6 +175,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ @@ -228,6 +237,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, }, "service": map[string]interface{}{ @@ -323,6 +335,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "logging": map[string]interface{}{ "loglevel": "info", @@ -432,6 +447,9 @@ func TestNewMap(t *testing.T) { }, "compression": "none", "endpoint": "localhost:5003", + "sending_queue": map[string]interface{}{ + "enabled": false, + }, }, "serializer": map[string]interface{}{ "metrics": map[string]interface{}{ diff --git a/pkg/trace/api/otlp.go b/pkg/trace/api/otlp.go index 867fff5a97ec57..53a50079208269 100644 --- a/pkg/trace/api/otlp.go +++ b/pkg/trace/api/otlp.go @@ -76,7 +76,10 @@ func (o *OTLPReceiver) Start() { if err != nil { log.Criticalf("Error starting OpenTelemetry gRPC server: %v", err) } else { - o.grpcsrv = grpc.NewServer(grpc.MaxRecvMsgSize(10 * 1024 * 1024)) + o.grpcsrv = grpc.NewServer( + grpc.MaxRecvMsgSize(10*1024*1024), + grpc.MaxConcurrentStreams(1), // Each payload must be sent to processing stage before we decode the next. + ) ptraceotlp.RegisterGRPCServer(o.grpcsrv, o) o.wg.Add(1) go func() { @@ -307,12 +310,8 @@ func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.R tagContainersTags: payloadTags.String(), } } - select { - case o.out <- &p: - // success - default: - log.Warn("Payload in channel full. Dropped 1 payload.") - } + + o.out <- &p return src } diff --git a/pkg/trace/api/otlp_test.go b/pkg/trace/api/otlp_test.go index 733f38bac4e661..de2e5fbea8a54f 100644 --- a/pkg/trace/api/otlp_test.go +++ b/pkg/trace/api/otlp_test.go @@ -167,6 +167,18 @@ func TestOTLPMetrics(t *testing.T) { }, }).Traces().ResourceSpans() + stop := make(chan struct{}) + go func() { + for { + select { + case <-out: + case <-stop: + return + } + } + }() + defer close(stop) + rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{}) rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{}) diff --git a/releasenotes/notes/apm-otel-receiver-backpressure-40e301d75d00804d.yaml b/releasenotes/notes/apm-otel-receiver-backpressure-40e301d75d00804d.yaml new file mode 100644 index 00000000000000..3e85e5790fefab --- /dev/null +++ b/releasenotes/notes/apm-otel-receiver-backpressure-40e301d75d00804d.yaml @@ -0,0 +1,13 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +fixes: + - | + APM: Stop dropping incoming OTel payloads when the processing channel is full + and eliminate OOM issues in the trace agent and collector component in high + load scenarios, making the OTel pipeline more reliable.