Skip to content

Commit 3a20793

Browse files
authored
feat: add support for payload sizelimiting for aws-lambda (#7)
1 parent df8a761 commit 3a20793

7 files changed

Lines changed: 89 additions & 56 deletions

File tree

instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def custom_event_context_extractor(lambda_event):
7979
from opentelemetry.context.context import Context
8080
from opentelemetry.instrumentation.aws_lambda.package import _instruments
8181
from opentelemetry.instrumentation.aws_lambda.version import __version__
82+
from opentelemetry.instrumentation.aws_lambda.utils import limit_string_size
8283
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
8384
from opentelemetry.instrumentation.utils import unwrap
8485
from opentelemetry.metrics import MeterProvider, get_meter_provider
@@ -234,7 +235,7 @@ def _set_api_gateway_v1_proxy_attributes(
234235
if lambda_event.get("body"):
235236
span.set_attribute(
236237
"http.request.body",
237-
lambda_event.get("body"),
238+
limit_string_size(lambda_event.get("body")),
238239
)
239240

240241
if lambda_event.get("headers"):
@@ -286,7 +287,7 @@ def _set_api_gateway_v2_proxy_attributes(
286287
if lambda_event.get("body"):
287288
span.set_attribute(
288289
"http.request.body",
289-
lambda_event.get("body"),
290+
limit_string_size(lambda_event.get("body")),
290291
)
291292

292293
span.set_attribute(
@@ -414,7 +415,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
414415
if lambda_event["Records"][0].get("s3"):
415416
s3TriggerSpan.set_attribute(
416417
"rpc.request.body",
417-
json.dumps(lambda_event["Records"][0].get("s3")),
418+
limit_string_size(json.dumps(lambda_event["Records"][0].get("s3"))),
418419
)
419420
except Exception as ex:
420421
pass
@@ -441,7 +442,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
441442
if lambda_event["Records"][0].get("body"):
442443
sqsTriggerSpan.set_attribute(
443444
"rpc.request.body",
444-
lambda_event["Records"][0].get("body"),
445+
limit_string_size(lambda_event["Records"][0].get("body")),
445446
)
446447

447448
except Exception as ex:
@@ -461,7 +462,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
461462
if lambda_event["Records"][0]["Sns"] and lambda_event["Records"][0]["Sns"].get("Message"):
462463
snsTriggerSpan.set_attribute(
463464
"rpc.request.body",
464-
lambda_event["Records"][0]["Sns"].get("Message"),
465+
limit_string_size(lambda_event["Records"][0]["Sns"].get("Message")),
465466
)
466467
except Exception as ex:
467468
pass
@@ -482,7 +483,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
482483
if lambda_event["Records"][0].get("dynamodb"):
483484
dynamoTriggerSpan.set_attribute(
484485
"rpc.request.body",
485-
json.dumps(lambda_event["Records"][0].get("dynamodb")),
486+
limit_string_size(json.dumps(lambda_event["Records"][0].get("dynamodb"))),
486487
)
487488
except Exception as ex:
488489
pass
@@ -503,7 +504,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
503504
if lambda_event["datasetRecords"]:
504505
cognitoTriggerSpan.set_attribute(
505506
"rpc.request.body",
506-
json.dumps(lambda_event["datasetRecords"]),
507+
limit_string_size(json.dumps(lambda_event["datasetRecords"])),
507508
)
508509
except Exception as ex:
509510
pass
@@ -528,7 +529,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
528529

529530
eventBridgeTriggerSpan.set_attribute(
530531
"rpc.request.body",
531-
json.dumps(lambda_event),
532+
limit_string_size(json.dumps(lambda_event)),
532533
)
533534
except Exception as ex:
534535
pass
@@ -578,7 +579,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
578579
if isinstance(result, dict) and result.get("body"):
579580
apiGwSpan.set_attribute(
580581
"http.response.body",
581-
result.get("body"),
582+
limit_string_size(result.get("body")),
582583
)
583584
if lambda_event.get("headers"):
584585
for key, value in lambda_event.get("headers").items():
@@ -594,10 +595,13 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
594595
if lambda_event["Records"][0]["eventSource"] == "aws:sqs":
595596
span.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
596597
span.set_attribute("messaging.message",
597-
lambda_event["Records"])
598-
except Exception:
598+
limit_string_size(lambda_event["Records"]))
599+
except Exception as ex:
600+
#print(traceback.format_exc())
601+
#print("exception")
602+
#print(ex)
599603
pass
600-
except Exception:
604+
except Exception as ex:
601605
# TODO check why we get exception
602606
#print(traceback.format_exc())
603607
#print("exception")
@@ -618,7 +622,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
618622
if isinstance(result, dict) and result.get("body"):
619623
s3TriggerSpan.set_attribute(
620624
"rpc.response.body",
621-
result.get("body"),
625+
limit_string_size(result.get("body")),
622626
)
623627
except Exception:
624628
pass
@@ -635,7 +639,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
635639
if isinstance(result, dict) and result.get("body"):
636640
sqsTriggerSpan.set_attribute(
637641
"rpc.response.body",
638-
result.get("body"),
642+
limit_string_size(result.get("body")),
639643
)
640644
except Exception:
641645
pass
@@ -651,7 +655,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
651655
if isinstance(result, dict) and result.get("body"):
652656
snsTriggerSpan.set_attribute(
653657
"rpc.response.body",
654-
result.get("body"),
658+
limit_string_size(result.get("body")),
655659
)
656660
except Exception:
657661
pass
@@ -668,7 +672,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
668672
if isinstance(result, dict) and result.get("body"):
669673
dynamoTriggerSpan.set_attribute(
670674
"rpc.response.body",
671-
result.get("body"),
675+
limit_string_size(result.get("body")),
672676
)
673677
except Exception:
674678
pass
@@ -684,7 +688,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
684688
if isinstance(result, dict) and result.get("body"):
685689
cognitoTriggerSpan.set_attribute(
686690
"rpc.response.body",
687-
result.get("body"),
691+
limit_string_size(result.get("body")),
688692
)
689693
except Exception:
690694
pass
@@ -700,7 +704,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
700704
if isinstance(result, dict) and result.get("body"):
701705
eventBridgeTriggerSpan.set_attribute(
702706
"rpc.response.body",
703-
result.get("body"),
707+
limit_string_size(result.get("body")),
704708
)
705709
except Exception:
706710
pass
@@ -866,4 +870,4 @@ def keys(
866870
self, carrier: typing.Mapping[str, textmap.CarrierValT]
867871
) -> typing.List[str]:
868872
"""Keys implementation that returns all keys from a dictionary."""
869-
return list(carrier.keys())
873+
return list(carrier.keys())
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import os
2+
import logging
3+
4+
logger = logging.getLogger(__name__)
5+
6+
payload_size_limit = 51200
7+
try:
8+
payload_size_limit = int(os.environ.get("OTEL_PAYLOAD_SIZE_LIMIT", 51200))
9+
except ValueError:
10+
logger.error(
11+
"OTEL_PAYLOAD_SIZE_LIMIT is not a number"
12+
)
13+
14+
def get_payload_size_limit() -> int:
15+
return payload_size_limit
16+
def limit_string_size(s: str) -> str:
17+
if len(s) > payload_size_limit:
18+
return s[:payload_size_limit]
19+
else:
20+
return s

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def response_hook(span, service_name, operation_name, result):
8282
import json
8383
import io
8484
import os
85+
from opentelemetry.instrumentation.botocore.utils import limit_string_size, get_payload_size_limit
8586
from typing import Any, Callable, Collection, Dict, Optional, Tuple
8687

8788
from botocore.client import BaseClient
@@ -134,7 +135,6 @@ def __init__(self):
134135
super().__init__()
135136
self.request_hook = None
136137
self.response_hook = None
137-
self.payload_size_limit = 51200
138138

139139
def instrumentation_dependencies(self) -> Collection[str]:
140140
return _instruments
@@ -147,12 +147,6 @@ def _instrument(self, **kwargs):
147147

148148
self.request_hook = kwargs.get("request_hook")
149149
self.response_hook = kwargs.get("response_hook")
150-
try:
151-
self.payload_size_limit = int(os.environ.get("OTEL_PAYLOAD_SIZE_LIMIT", 51200))
152-
except ValueError:
153-
logger.error(
154-
"OTEL_PAYLOAD_SIZE_LIMIT is not a number"
155-
)
156150
wrap_function_wrapper(
157151
"botocore.client",
158152
"BaseClient._make_api_call",
@@ -201,28 +195,28 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
201195
if call_context.operation == "ListObjects":
202196
bucket = call_context.params.get("Bucket")
203197
if bucket is not None:
204-
attributes["rpc.request.payload"] = bucket
198+
attributes["rpc.request.payload"] = limit_string_size(bucket)
205199
elif call_context.operation == "PutObject":
206200
body = call_context.params.get("Body")
207201
if body is not None:
208-
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, body.decode('ascii'))
202+
attributes["rpc.request.payload"] = limit_string_size(body.decode('ascii'))
209203
elif call_context.operation == "PutItem":
210204
body = call_context.params.get("Item")
211205
if body is not None:
212-
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(body, default=str))
206+
attributes["rpc.request.payload"] = limit_string_size(json.dumps(body, default=str))
213207
elif call_context.operation == "GetItem":
214208
body = call_context.params.get("Key")
215209
if body is not None:
216-
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str))
210+
attributes["rpc.request.payload"] = limit_string_size(json.dumps(body, default=str))
217211
elif call_context.operation == "Publish":
218212
body = call_context.params.get("Message")
219213
if body is not None:
220-
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str))
214+
attributes["rpc.request.payload"] = limit_string_size(json.dumps(body, default=str))
221215
elif call_context.service == "events" and call_context.operation == "PutEvents":
222216
call_context.span_kind = SpanKind.PRODUCER
223-
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str))
217+
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
224218
else:
225-
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str))
219+
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
226220
except Exception as ex:
227221
pass
228222

@@ -303,11 +297,11 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
303297
result = original_func(*args, **kwargs)
304298
except ClientError as error:
305299
result = getattr(error, "response", None)
306-
_apply_response_attributes(span, result, self.payload_size_limit)
300+
_apply_response_attributes(span, result)
307301
_safe_invoke(extension.on_error, span, error)
308302
raise
309303
else:
310-
_apply_response_attributes(span, result, self.payload_size_limit)
304+
_apply_response_attributes(span, result)
311305
_safe_invoke(extension.on_success, span, result)
312306
finally:
313307
context_api.detach(token)
@@ -337,7 +331,7 @@ def _call_response_hook(
337331
)
338332

339333

340-
def _apply_response_attributes(span: Span, result, payload_size_limit):
334+
def _apply_response_attributes(span: Span, result):
341335
if result is None or not span.is_recording():
342336
return
343337

@@ -380,15 +374,15 @@ def _apply_response_attributes(span: Span, result, payload_size_limit):
380374
body = result.get("Body")
381375
if buckets is not None:
382376
span.set_attribute(
383-
"rpc.response.payload", json.dumps([b.get("Name") for b in buckets]))
377+
"rpc.response.payload", limit_string_size(json.dumps([b.get("Name") for b in buckets])))
384378
elif content is not None:
385379
span.set_attribute(
386-
"rpc.response.payload", json.dumps([b.get("Key") for b in content]))
380+
"rpc.response.payload", limit_string_size(json.dumps([b.get("Key") for b in content])))
387381
elif body is not None:
388382
pass
389383
else:
390384
span.set_attribute(
391-
"rpc.response.payload", json.dumps(result, default=str))
385+
"rpc.response.payload", limit_string_size(json.dumps(result, default=str)))
392386
#elif body is not None:
393387
# try:
394388
# d = {x: result[x] for x in result if x != "Body"}
@@ -406,20 +400,20 @@ def _apply_response_attributes(span: Span, result, payload_size_limit):
406400
# except Exception as ex:
407401
# pass
408402
# Lambda Invoke
409-
elif result.get("Payload") is not None and result.get("Payload")._content_length is not None and int(result.get("Payload")._content_length) < payload_size_limit:
403+
elif result.get("Payload") is not None and result.get("Payload")._content_length is not None and int(result.get("Payload")._content_length) < get_payload_size_limit():
410404
length = result.get("Payload")._content_length
411405
strbody = result.get("Payload").read()
412406
result.get("Payload").close()
413407
span.set_attribute(
414-
"rpc.response.payload", strbody)
408+
"rpc.response.payload", limit_string_size(strbody))
415409
result['Payload'] = StreamingBody(io.BytesIO(strbody), content_length=length)
416410
# DynamoDB get item
417411
elif server == "Server":
418412
span.set_attribute(
419-
"rpc.response.payload", json.dumps(result, default=str))
413+
"rpc.response.payload", limit_string_size(json.dumps(result, default=str)))
420414
else:
421415
span.set_attribute(
422-
"rpc.response.payload", json.dumps(result, default=str))
416+
"rpc.response.payload", limit_string_size(json.dumps(result, default=str)))
423417
except Exception as ex:
424418
pass
425419

@@ -471,10 +465,3 @@ def set(
471465
"""
472466
val = {"DataType": "String", "StringValue": value}
473467
carrier[key] = val
474-
475-
def limit_string_size(max_size: int, s: str) -> str:
476-
if len(s) > max_size:
477-
return s[:max_size]
478-
else:
479-
return s
480-

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
import importlib
1616
import logging
17-
1817
from opentelemetry.instrumentation.botocore.extensions.types import (
1918
_AwsSdkCallContext,
2019
_AwsSdkExtension,
@@ -49,4 +48,4 @@ def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension:
4948
return extension_cls(call_context)
5049
except Exception as ex: # pylint: disable=broad-except
5150
_logger.error("Error when loading extension: %s", ex)
52-
return _AwsSdkExtension(call_context)
51+
return _AwsSdkExtension(call_context)

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/lmbd.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import json
1818
import re
1919
from typing import Dict
20+
from opentelemetry.instrumentation.botocore.utils import limit_string_size
21+
import os
2022

2123
from opentelemetry.instrumentation.botocore.extensions.types import (
2224
_AttributeMapT,
@@ -67,7 +69,7 @@ def extract_attributes(
6769
] = cls._parse_function_name(call_context)
6870
attributes[SpanAttributes.FAAS_INVOKED_REGION] = call_context.region
6971
if call_context.params.get("Payload") is not None:
70-
attributes["rpc.request.body"] = call_context.params.get("Payload")
72+
attributes["rpc.request.body"] = limit_string_size(call_context.params.get("Payload"))
7173

7274
@classmethod
7375
def _parse_function_name(cls, call_context: _AwsSdkCallContext):
@@ -124,4 +126,4 @@ def before_service_call(self, span: Span):
124126
if self._op is None:
125127
return
126128

127-
self._op.before_service_call(self._call_context, span)
129+
self._op.before_service_call(self._call_context, span)

0 commit comments

Comments
 (0)