|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
| 3 | +import json |
3 | 4 | import math |
4 | 5 | import os |
5 | 6 | import queue |
@@ -30,6 +31,8 @@ def export(self, items: list[Trace | Span[Any]]) -> None: |
30 | 31 |
|
31 | 32 | class BackendSpanExporter(TracingExporter): |
32 | 33 | _OPENAI_TRACING_INGEST_ENDPOINT = "https://api.openai.com/v1/traces/ingest" |
| 34 | + _OPENAI_TRACING_MAX_FIELD_BYTES = 100_000 |
| 35 | + _OPENAI_TRACING_STRING_TRUNCATION_SUFFIX = "... [truncated]" |
33 | 36 | _OPENAI_TRACING_ALLOWED_USAGE_KEYS = frozenset( |
34 | 37 | { |
35 | 38 | "input_tokens", |
@@ -181,36 +184,183 @@ def _should_sanitize_for_openai_tracing_api(self) -> bool: |
181 | 184 | return self.endpoint.rstrip("/") == self._OPENAI_TRACING_INGEST_ENDPOINT.rstrip("/") |
182 | 185 |
|
183 | 186 | def _sanitize_for_openai_tracing_api(self, payload_item: dict[str, Any]) -> dict[str, Any]: |
184 | | - """Move unsupported generation usage fields under usage.details for traces ingest.""" |
| 187 | + """Drop or truncate span fields known to be rejected by traces ingest.""" |
185 | 188 | span_data = payload_item.get("span_data") |
186 | 189 | if not isinstance(span_data, dict): |
187 | 190 | return payload_item |
188 | 191 |
|
| 192 | + sanitized_span_data = span_data |
| 193 | + did_mutate = False |
| 194 | + |
| 195 | + for field_name in ("input", "output"): |
| 196 | + if field_name not in span_data: |
| 197 | + continue |
| 198 | + sanitized_field = self._truncate_span_field_value(span_data[field_name]) |
| 199 | + if sanitized_field is span_data[field_name]: |
| 200 | + continue |
| 201 | + if not did_mutate: |
| 202 | + sanitized_span_data = dict(span_data) |
| 203 | + did_mutate = True |
| 204 | + sanitized_span_data[field_name] = sanitized_field |
| 205 | + |
189 | 206 | if span_data.get("type") != "generation": |
190 | | - return payload_item |
| 207 | + if not did_mutate: |
| 208 | + return payload_item |
| 209 | + sanitized_payload_item = dict(payload_item) |
| 210 | + sanitized_payload_item["span_data"] = sanitized_span_data |
| 211 | + return sanitized_payload_item |
191 | 212 |
|
192 | 213 | usage = span_data.get("usage") |
193 | 214 | if not isinstance(usage, dict): |
194 | | - return payload_item |
| 215 | + if not did_mutate: |
| 216 | + return payload_item |
| 217 | + sanitized_payload_item = dict(payload_item) |
| 218 | + sanitized_payload_item["span_data"] = sanitized_span_data |
| 219 | + return sanitized_payload_item |
195 | 220 |
|
196 | 221 | sanitized_usage = self._sanitize_generation_usage_for_openai_tracing_api(usage) |
197 | 222 |
|
198 | 223 | if sanitized_usage is None: |
199 | | - sanitized_span_data = dict(span_data) |
| 224 | + if not did_mutate: |
| 225 | + sanitized_span_data = dict(span_data) |
| 226 | + did_mutate = True |
200 | 227 | sanitized_span_data.pop("usage", None) |
201 | | - sanitized_payload_item = dict(payload_item) |
202 | | - sanitized_payload_item["span_data"] = sanitized_span_data |
203 | | - return sanitized_payload_item |
| 228 | + elif sanitized_usage != usage: |
| 229 | + if not did_mutate: |
| 230 | + sanitized_span_data = dict(span_data) |
| 231 | + did_mutate = True |
| 232 | + sanitized_span_data["usage"] = sanitized_usage |
204 | 233 |
|
205 | | - if sanitized_usage == usage: |
| 234 | + if not did_mutate: |
206 | 235 | return payload_item |
207 | 236 |
|
208 | | - sanitized_span_data = dict(span_data) |
209 | | - sanitized_span_data["usage"] = sanitized_usage |
210 | 237 | sanitized_payload_item = dict(payload_item) |
211 | 238 | sanitized_payload_item["span_data"] = sanitized_span_data |
212 | 239 | return sanitized_payload_item |
213 | 240 |
|
| 241 | + def _value_json_size_bytes(self, value: Any) -> int: |
| 242 | + try: |
| 243 | + serialized = json.dumps(value, ensure_ascii=False, separators=(",", ":")) |
| 244 | + except (TypeError, ValueError): |
| 245 | + return self._OPENAI_TRACING_MAX_FIELD_BYTES + 1 |
| 246 | + return len(serialized.encode("utf-8")) |
| 247 | + |
| 248 | + def _truncate_string_for_json_limit(self, value: str, max_bytes: int) -> str: |
| 249 | + value_size = self._value_json_size_bytes(value) |
| 250 | + if value_size <= max_bytes: |
| 251 | + return value |
| 252 | + |
| 253 | + suffix = self._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX |
| 254 | + suffix_size = self._value_json_size_bytes(suffix) |
| 255 | + if suffix_size > max_bytes: |
| 256 | + return "" |
| 257 | + if suffix_size == max_bytes: |
| 258 | + return suffix |
| 259 | + |
| 260 | + budget_without_suffix = max_bytes - suffix_size |
| 261 | + estimated_chars = int(len(value) * budget_without_suffix / max(value_size, 1)) |
| 262 | + estimated_chars = max(0, min(len(value), estimated_chars)) |
| 263 | + |
| 264 | + best = value[:estimated_chars] + suffix |
| 265 | + best_size = self._value_json_size_bytes(best) |
| 266 | + while best_size > max_bytes and estimated_chars > 0: |
| 267 | + overflow_ratio = (best_size - max_bytes) / max(best_size, 1) |
| 268 | + trim_chars = max(1, int(estimated_chars * overflow_ratio) + 1) |
| 269 | + estimated_chars = max(0, estimated_chars - trim_chars) |
| 270 | + best = value[:estimated_chars] + suffix |
| 271 | + best_size = self._value_json_size_bytes(best) |
| 272 | + |
| 273 | + return best |
| 274 | + |
| 275 | + def _truncate_span_field_value(self, value: Any) -> Any: |
| 276 | + max_bytes = self._OPENAI_TRACING_MAX_FIELD_BYTES |
| 277 | + if self._value_json_size_bytes(value) <= max_bytes: |
| 278 | + return value |
| 279 | + |
| 280 | + sanitized_value = self._sanitize_json_compatible_value(value) |
| 281 | + if sanitized_value is self._UNSERIALIZABLE: |
| 282 | + return self._truncated_preview(value) |
| 283 | + |
| 284 | + return self._truncate_json_value_for_limit(sanitized_value, max_bytes) |
| 285 | + |
| 286 | + def _truncate_json_value_for_limit(self, value: Any, max_bytes: int) -> Any: |
| 287 | + if self._value_json_size_bytes(value) <= max_bytes: |
| 288 | + return value |
| 289 | + |
| 290 | + if isinstance(value, str): |
| 291 | + return self._truncate_string_for_json_limit(value, max_bytes) |
| 292 | + |
| 293 | + if isinstance(value, dict): |
| 294 | + return self._truncate_mapping_for_json_limit(value, max_bytes) |
| 295 | + |
| 296 | + if isinstance(value, list): |
| 297 | + return self._truncate_list_for_json_limit(value, max_bytes) |
| 298 | + |
| 299 | + return self._truncated_preview(value) |
| 300 | + |
| 301 | + def _truncate_mapping_for_json_limit( |
| 302 | + self, value: dict[str, Any], max_bytes: int |
| 303 | + ) -> dict[str, Any]: |
| 304 | + truncated = dict(value) |
| 305 | + current_size = self._value_json_size_bytes(truncated) |
| 306 | + |
| 307 | + while truncated and current_size > max_bytes: |
| 308 | + largest_key = max( |
| 309 | + truncated, key=lambda key: self._value_json_size_bytes(truncated[key]) |
| 310 | + ) |
| 311 | + child = truncated[largest_key] |
| 312 | + child_size = self._value_json_size_bytes(child) |
| 313 | + child_budget = max(0, max_bytes - (current_size - child_size)) |
| 314 | + truncated_child = self._truncate_json_value_for_limit(child, child_budget) |
| 315 | + |
| 316 | + if truncated_child == child: |
| 317 | + truncated.pop(largest_key) |
| 318 | + else: |
| 319 | + truncated[largest_key] = truncated_child |
| 320 | + |
| 321 | + current_size = self._value_json_size_bytes(truncated) |
| 322 | + |
| 323 | + return truncated |
| 324 | + |
| 325 | + def _truncate_list_for_json_limit(self, value: list[Any], max_bytes: int) -> list[Any]: |
| 326 | + truncated = list(value) |
| 327 | + current_size = self._value_json_size_bytes(truncated) |
| 328 | + |
| 329 | + while truncated and current_size > max_bytes: |
| 330 | + largest_index = max( |
| 331 | + range(len(truncated)), |
| 332 | + key=lambda index: self._value_json_size_bytes(truncated[index]), |
| 333 | + ) |
| 334 | + child = truncated[largest_index] |
| 335 | + child_size = self._value_json_size_bytes(child) |
| 336 | + child_budget = max(0, max_bytes - (current_size - child_size)) |
| 337 | + truncated_child = self._truncate_json_value_for_limit(child, child_budget) |
| 338 | + |
| 339 | + if truncated_child == child: |
| 340 | + truncated.pop(largest_index) |
| 341 | + else: |
| 342 | + truncated[largest_index] = truncated_child |
| 343 | + |
| 344 | + current_size = self._value_json_size_bytes(truncated) |
| 345 | + |
| 346 | + return truncated |
| 347 | + |
| 348 | + def _truncated_preview(self, value: Any) -> dict[str, Any]: |
| 349 | + type_name = type(value).__name__ |
| 350 | + preview = f"<{type_name} truncated>" |
| 351 | + if isinstance(value, dict): |
| 352 | + preview = f"<{type_name} len={len(value)} truncated>" |
| 353 | + elif isinstance(value, (list, tuple, set, frozenset)): |
| 354 | + preview = f"<{type_name} len={len(value)} truncated>" |
| 355 | + elif isinstance(value, (bytes, bytearray, memoryview)): |
| 356 | + preview = f"<{type_name} bytes={len(value)} truncated>" |
| 357 | + |
| 358 | + return { |
| 359 | + "truncated": True, |
| 360 | + "original_type": type_name, |
| 361 | + "preview": preview, |
| 362 | + } |
| 363 | + |
214 | 364 | def _sanitize_generation_usage_for_openai_tracing_api( |
215 | 365 | self, usage: dict[str, Any] |
216 | 366 | ) -> dict[str, Any] | None: |
|
0 commit comments