Skip to content

Commit 2ed261f

Browse files
knusbaum authored and dineshg13 committed
pkg/trace/api: limit simultaneous otlp requests, do not drop payloads (#23085)
pkg/trace/api: limit simultaneous otlp requests, do not drop payloads Co-authored-by: dineshg13 <[email protected]> (cherry picked from commit a396d12)
1 parent 36d7ecd commit 2ed261f

File tree

6 files changed

+87
-7
lines changed

6 files changed

+87
-7
lines changed

comp/otelcol/otlp/map_provider_config_not_serverless.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,8 @@ exporters:
2020
tls:
2121
insecure: true
2222
compression: none
23+
sending_queue:
24+
enabled: false
2325
2426
service:
2527
telemetry:

comp/otelcol/otlp/map_provider_not_serverless_test.go

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,9 @@ func TestNewMap(t *testing.T) {
5555
},
5656
"compression": "none",
5757
"endpoint": "localhost:5003",
58+
"sending_queue": map[string]interface{}{
59+
"enabled": false,
60+
},
5861
},
5962
},
6063
"service": map[string]interface{}{
@@ -111,6 +114,9 @@ func TestNewMap(t *testing.T) {
111114
},
112115
"compression": "none",
113116
"endpoint": "localhost:5003",
117+
"sending_queue": map[string]interface{}{
118+
"enabled": false,
119+
},
114120
},
115121
"serializer": map[string]interface{}{
116122
"metrics": map[string]interface{}{
@@ -184,6 +190,9 @@ func TestNewMap(t *testing.T) {
184190
},
185191
"compression": "none",
186192
"endpoint": "localhost:5003",
193+
"sending_queue": map[string]interface{}{
194+
"enabled": false,
195+
},
187196
},
188197
"serializer": map[string]interface{}{
189198
"metrics": map[string]interface{}{
@@ -244,6 +253,9 @@ func TestNewMap(t *testing.T) {
244253
},
245254
"compression": "none",
246255
"endpoint": "localhost:5003",
256+
"sending_queue": map[string]interface{}{
257+
"enabled": false,
258+
},
247259
},
248260
},
249261
"service": map[string]interface{}{
@@ -345,6 +357,9 @@ func TestNewMap(t *testing.T) {
345357
},
346358
"compression": "none",
347359
"endpoint": "localhost:5003",
360+
"sending_queue": map[string]interface{}{
361+
"enabled": false,
362+
},
348363
},
349364
"logging": map[string]interface{}{
350365
"loglevel": "info",
@@ -465,6 +480,9 @@ func TestNewMap(t *testing.T) {
465480
},
466481
"compression": "none",
467482
"endpoint": "localhost:5003",
483+
"sending_queue": map[string]interface{}{
484+
"enabled": false,
485+
},
468486
},
469487
"serializer": map[string]interface{}{
470488
"metrics": map[string]interface{}{
@@ -525,6 +543,9 @@ func TestNewMap(t *testing.T) {
525543
},
526544
"compression": "none",
527545
"endpoint": "localhost:5003",
546+
"sending_queue": map[string]interface{}{
547+
"enabled": false,
548+
},
528549
},
529550
"logsagent": interface{}(nil),
530551
},
@@ -593,6 +614,9 @@ func TestNewMap(t *testing.T) {
593614
},
594615
"compression": "none",
595616
"endpoint": "localhost:5003",
617+
"sending_queue": map[string]interface{}{
618+
"enabled": false,
619+
},
596620
},
597621
"serializer": map[string]interface{}{
598622
"metrics": map[string]interface{}{
@@ -673,6 +697,9 @@ func TestNewMap(t *testing.T) {
673697
},
674698
"compression": "none",
675699
"endpoint": "localhost:5003",
700+
"sending_queue": map[string]interface{}{
701+
"enabled": false,
702+
},
676703
},
677704
"serializer": map[string]interface{}{
678705
"metrics": map[string]interface{}{
@@ -745,6 +772,9 @@ func TestNewMap(t *testing.T) {
745772
},
746773
"compression": "none",
747774
"endpoint": "localhost:5003",
775+
"sending_queue": map[string]interface{}{
776+
"enabled": false,
777+
},
748778
},
749779
"logsagent": interface{}(nil),
750780
},
@@ -865,6 +895,9 @@ func TestNewMap(t *testing.T) {
865895
},
866896
"compression": "none",
867897
"endpoint": "localhost:5003",
898+
"sending_queue": map[string]interface{}{
899+
"enabled": false,
900+
},
868901
},
869902
"logging": map[string]interface{}{
870903
"loglevel": "info",
@@ -999,6 +1032,9 @@ func TestNewMap(t *testing.T) {
9991032
},
10001033
"compression": "none",
10011034
"endpoint": "localhost:5003",
1035+
"sending_queue": map[string]interface{}{
1036+
"enabled": false,
1037+
},
10021038
},
10031039
"serializer": map[string]interface{}{
10041040
"metrics": map[string]interface{}{

comp/otelcol/otlp/map_provider_serverless_test.go

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,9 @@ func TestNewMap(t *testing.T) {
5151
},
5252
"compression": "none",
5353
"endpoint": "localhost:5003",
54+
"sending_queue": map[string]interface{}{
55+
"enabled": false,
56+
},
5457
},
5558
},
5659
"service": map[string]interface{}{
@@ -102,6 +105,9 @@ func TestNewMap(t *testing.T) {
102105
},
103106
"compression": "none",
104107
"endpoint": "localhost:5003",
108+
"sending_queue": map[string]interface{}{
109+
"enabled": false,
110+
},
105111
},
106112
"serializer": map[string]interface{}{
107113
"metrics": map[string]interface{}{
@@ -169,6 +175,9 @@ func TestNewMap(t *testing.T) {
169175
},
170176
"compression": "none",
171177
"endpoint": "localhost:5003",
178+
"sending_queue": map[string]interface{}{
179+
"enabled": false,
180+
},
172181
},
173182
"serializer": map[string]interface{}{
174183
"metrics": map[string]interface{}{
@@ -228,6 +237,9 @@ func TestNewMap(t *testing.T) {
228237
},
229238
"compression": "none",
230239
"endpoint": "localhost:5003",
240+
"sending_queue": map[string]interface{}{
241+
"enabled": false,
242+
},
231243
},
232244
},
233245
"service": map[string]interface{}{
@@ -323,6 +335,9 @@ func TestNewMap(t *testing.T) {
323335
},
324336
"compression": "none",
325337
"endpoint": "localhost:5003",
338+
"sending_queue": map[string]interface{}{
339+
"enabled": false,
340+
},
326341
},
327342
"logging": map[string]interface{}{
328343
"loglevel": "info",
@@ -432,6 +447,9 @@ func TestNewMap(t *testing.T) {
432447
},
433448
"compression": "none",
434449
"endpoint": "localhost:5003",
450+
"sending_queue": map[string]interface{}{
451+
"enabled": false,
452+
},
435453
},
436454
"serializer": map[string]interface{}{
437455
"metrics": map[string]interface{}{

pkg/trace/api/otlp.go

Lines changed: 6 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,10 @@ func (o *OTLPReceiver) Start() {
7676
if err != nil {
7777
log.Criticalf("Error starting OpenTelemetry gRPC server: %v", err)
7878
} else {
79-
o.grpcsrv = grpc.NewServer(grpc.MaxRecvMsgSize(10 * 1024 * 1024))
79+
o.grpcsrv = grpc.NewServer(
80+
grpc.MaxRecvMsgSize(10*1024*1024),
81+
grpc.MaxConcurrentStreams(1), // Each payload must be sent to processing stage before we decode the next.
82+
)
8083
ptraceotlp.RegisterGRPCServer(o.grpcsrv, o)
8184
o.wg.Add(1)
8285
go func() {
@@ -307,12 +310,8 @@ func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.R
307310
tagContainersTags: payloadTags.String(),
308311
}
309312
}
310-
select {
311-
case o.out <- &p:
312-
// success
313-
default:
314-
log.Warn("Payload in channel full. Dropped 1 payload.")
315-
}
313+
314+
o.out <- &p
316315
return src
317316
}
318317

pkg/trace/api/otlp_test.go

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -167,6 +167,18 @@ func TestOTLPMetrics(t *testing.T) {
167167
},
168168
}).Traces().ResourceSpans()
169169

170+
stop := make(chan struct{})
171+
go func() {
172+
for {
173+
select {
174+
case <-out:
175+
case <-stop:
176+
return
177+
}
178+
}
179+
}()
180+
defer close(stop)
181+
170182
rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{})
171183
rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{})
172184

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,13 @@
1+
# Each section from every release note is combined when the
2+
# CHANGELOG.rst is rendered. So the text needs to be worded so that
3+
# it does not depend on any information only available in another
4+
# section. This may mean repeating some details, but each section
5+
# must be readable independently of the other.
6+
#
7+
# Each section note must be formatted as reStructuredText.
8+
---
9+
fixes:
10+
- |
11+
APM: Stop dropping incoming OTel payloads when the processing channel is full
12+
and eliminate OOM issues in the trace agent and collector component in high
13+
load scenarios, making the OTel pipeline more reliable.

0 commit comments

Comments
 (0)