Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from .trace import decode_v07 as trace_decode_v07
from .trace import pprint_trace
from .trace import v04TracePayload
from .trace_checks import CheckMetaEventsIsValidJSON
from .trace_checks import CheckMetaTracerVersionHeader
from .trace_checks import CheckTraceContentLength
from .trace_checks import CheckTraceCountHeader
Expand Down Expand Up @@ -1090,11 +1091,12 @@ async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5"
except ValueError:
log.info("Chunk %d could not be displayed (might be incomplete).", i)

# perform peer service check on span
# perform per-span checks
for span in trace:
await checks.check(
"trace_peer_service", span=span, dd_config_env=request.get("_dd_trace_env_variables", {})
)
await checks.check("meta_events_is_valid_json", span=span)

await checks.check(
"trace_dd_service", trace=trace, dd_config_env=request.get("_dd_trace_env_variables", {})
Expand Down Expand Up @@ -1869,6 +1871,7 @@ async def _cleanup_claude_proxy(app: web.Application) -> None:
CheckTraceStallAsync,
CheckTracePeerService,
CheckTraceDDService,
CheckMetaEventsIsValidJSON,
],
enabled=enabled_checks,
)
Expand Down
16 changes: 16 additions & 0 deletions ddapm_test_agent/trace_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,22 @@ def check(self, span: Span, dd_config_env: Dict[str, str]) -> None:
)


class CheckMetaEventsIsValidJSON(Check):
    name = "meta_events_is_valid_json"
    description = "meta.events must contain valid JSON when present"

    def check(self, span: Span) -> None:
        """Validate that the span's ``meta.events`` tag, if set, decodes to a JSON array.

        Spans without a ``meta.events`` tag pass trivially; a tag that fails to
        decode, or decodes to anything other than a list, fails the check.
        """
        meta = span.get("meta", {})
        try:
            raw_events = meta["events"]
        except KeyError:
            # No events tag on this span -> nothing to validate.
            return
        try:
            decoded = json.loads(raw_events)
        except (json.JSONDecodeError, TypeError) as e:
            self.fail(f"meta.events is not valid JSON: {e}")
            return
        if not isinstance(decoded, list):
            self.fail(f"meta.events is not a JSON array: {type(decoded)}")


class CheckTraceDDService(Check):
name = "trace_dd_service"
description = """
Expand Down
37 changes: 33 additions & 4 deletions ddapm_test_agent/trace_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
log = logging.getLogger(__name__)


def _normalize_meta_events(events_json_str: str) -> str:
"""Parse meta.events, sort attribute keys, re-encode as canonical JSON."""
events = json.loads(events_json_str)
for event in events:
if "attributes" in event:
event["attributes"] = dict(sorted(event["attributes"].items()))
return json.dumps(events, separators=(",", ": "))


DEFAULT_SNAPSHOT_IGNORES = "span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,metrics._dd.tracer_kr,meta.runtime-id,span_links.trace_id_high,span_events.time_unix_nano,meta.pathway.hash,meta._dd.p.tid"


Expand All @@ -52,6 +61,15 @@ def _normalize_span_for_comparison(span: Span) -> Span:
if normalized.get("service") is None:
normalized["service"] = ""

span_meta = span.get("meta")
if span_meta is not None and "events" in span_meta:
try:
new_meta = dict(span_meta)
new_meta["events"] = _normalize_meta_events(span_meta["events"])
normalized["meta"] = new_meta
except (json.JSONDecodeError, TypeError):
pass # leave as-is; the check will catch invalid JSON separately

return cast(Span, normalized)


Expand Down Expand Up @@ -514,8 +532,19 @@ def _ordered_span(s: Span) -> OrderedDictType[str, TopLevelSpanValue]:
]
for k in order:
if k in s:
if k in ["meta", "metrics"]:
# Sort the meta and metrics dictionaries alphanumerically
if k == "meta":
ordered_meta: OrderedDictType[str, Any] = OrderedDict()
for mk, mv in sorted(s[k].items(), key=operator.itemgetter(0)): # type: ignore
if mk == "events":
try:
ordered_meta[mk] = _normalize_meta_events(mv)
except (json.JSONDecodeError, TypeError):
ordered_meta[mk] = mv
else:
ordered_meta[mk] = mv
d[k] = ordered_meta
elif k == "metrics":
# Sort the metrics dictionary alphanumerically
d[k] = OrderedDict(sorted(s[k].items(), key=operator.itemgetter(0))) # type: ignore
else:
d[k] = s[k] # type: ignore
Expand All @@ -527,12 +556,12 @@ def _ordered_span(s: Span) -> OrderedDictType[str, TopLevelSpanValue]:
if "span_links" in d:
for link in d["span_links"]:
if "attributes" in link:
link["attributes"] = OrderedDict(sorted(link["attributes"].items(), key=operator.itemgetter(0)))
link["attributes"] = OrderedDict(sorted(link["attributes"].items(), key=operator.itemgetter(0))) # type: ignore

if "span_events" in d:
for event in d["span_events"]:
if "attributes" in event:
event["attributes"] = OrderedDict(sorted(event["attributes"].items(), key=operator.itemgetter(0)))
event["attributes"] = OrderedDict(sorted(event["attributes"].items(), key=operator.itemgetter(0))) # type: ignore

for k in ["meta", "metrics"]:
if k in d and len(d[k]) == 0:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
Add a new ``meta_events_is_valid_json`` check that validates ``meta.events``
contains a valid JSON array when present. Normalize ``meta.events`` attribute
key ordering during snapshot generation and comparison to ensure snapshot tests
pass regardless of the iteration order of tracer-side hash maps.
39 changes: 39 additions & 0 deletions tests/test_checks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import platform
import time

Expand Down Expand Up @@ -121,3 +122,41 @@ async def test_trace_stall(
assert resp.status == 200, await resp.text()
end = time.monotonic_ns()
assert (end - start) / 1e9 >= 0.8


@pytest.mark.parametrize("agent_enabled_checks", [["meta_events_is_valid_json"], []])
async def test_meta_events_is_valid_json_invalid(agent, agent_enabled_checks):
    """Invalid JSON in meta.events fails the check only when the check is enabled."""
    bad_span = span(meta={"events": "not valid json"})
    resp = await v04_trace(agent, [[bad_span]], "msgpack")
    body = await resp.text()
    if "meta_events_is_valid_json" in agent_enabled_checks:
        assert resp.status == 400, body
        assert "Check 'meta_events_is_valid_json' failed" in body
    else:
        assert resp.status == 200, body


@pytest.mark.parametrize("agent_enabled_checks", [["meta_events_is_valid_json"]])
async def test_meta_events_is_valid_json_not_array(agent, agent_enabled_checks):
    """A JSON object (rather than an array) in meta.events fails the enabled check."""
    non_array = json.dumps({"name": "not-an-array"})
    resp = await v04_trace(agent, [[span(meta={"events": non_array})]], "msgpack")
    body = await resp.text()
    assert resp.status == 400, body
    assert "meta.events is not a JSON array" in body


@pytest.mark.parametrize("agent_enabled_checks", [["meta_events_is_valid_json"]])
async def test_meta_events_is_valid_json_valid(agent, agent_enabled_checks):
    """A well-formed JSON array in meta.events passes the enabled check."""
    payload = json.dumps([{"name": "my.event", "time_unix_nano": 1234, "attributes": {"key": "val"}}])
    resp = await v04_trace(agent, [[span(meta={"events": payload})]], "msgpack")
    assert resp.status == 200, await resp.text()


@pytest.mark.parametrize("agent_enabled_checks", [["meta_events_is_valid_json"]])
async def test_meta_events_is_valid_json_missing(agent, agent_enabled_checks):
    """The check is a no-op for spans that carry no meta.events tag at all."""
    resp = await v04_trace(agent, [[span()]], "msgpack")
    assert resp.status == 200, await resp.text()
85 changes: 85 additions & 0 deletions tests/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,91 @@ async def test_snapshot_trace_differences_removed_start(agent, expected_traces,
assert resp.status == 200, resp_text


def test_normalize_meta_events_sorts_attribute_keys():
    """Attribute keys within each event come back in sorted order."""
    raw = json.dumps([{"name": "evt", "time_unix_nano": 1, "attributes": {"z": "last", "a": "first", "m": "mid"}}])
    normalized = json.loads(trace_snapshot._normalize_meta_events(raw))
    assert list(normalized[0]["attributes"]) == ["a", "m", "z"]


def test_normalize_meta_events_no_attributes():
    """Events that carry no attributes round-trip through normalization unchanged."""
    original = [{"name": "evt", "time_unix_nano": 1}]
    round_tripped = json.loads(trace_snapshot._normalize_meta_events(json.dumps(original)))
    assert round_tripped == original


def test_normalize_meta_events_multiple_events():
    """Each event in a multi-event payload has its attributes sorted independently."""
    raw = json.dumps(
        [
            {"name": "e1", "time_unix_nano": 1, "attributes": {"b": 2, "a": 1}},
            {"name": "e2", "time_unix_nano": 2, "attributes": {"y": "y", "x": "x"}},
        ]
    )
    normalized = json.loads(trace_snapshot._normalize_meta_events(raw))
    assert [list(e["attributes"]) for e in normalized] == [["a", "b"], ["x", "y"]]


def test_normalize_meta_events_already_sorted():
    """Normalization is idempotent when attribute keys are already sorted."""
    raw = json.dumps([{"name": "evt", "time_unix_nano": 1, "attributes": {"a": 1, "b": 2, "c": 3}}])
    normalized = trace_snapshot._normalize_meta_events(raw)
    assert json.loads(normalized) == json.loads(raw)


@pytest.mark.parametrize("snapshot_ci_mode", [False])
async def test_snapshot_meta_events_attribute_order_independent(agent, snapshot_ci_mode):
    """Snapshot comparison succeeds when meta.events attribute key order differs."""
    token = "test_events_order"

    def _span_with_events(events):
        # Identical span except for the serialized key order inside meta.events.
        return {
            "name": "s",
            "span_id": 1,
            "trace_id": 1,
            "parent_id": 0,
            "resource": "/",
            "duration": 1,
            "type": "web",
            "error": 0,
            "meta": {"events": json.dumps(events)},
            "metrics": {},
        }

    first = _span_with_events([{"name": "evt", "time_unix_nano": 100, "attributes": {"a": "1", "b": "2"}}])
    second = _span_with_events([{"name": "evt", "time_unix_nano": 100, "attributes": {"b": "2", "a": "1"}}])

    # Generate the snapshot from the first key ordering.
    resp = await v04_trace(agent, [[first]], token=token)
    assert resp.status == 200, await resp.text()
    resp = await agent.get("/test/session/snapshot", params={"test_session_token": token})
    assert resp.status == 200, await resp.text()

    # Clear the session and replay the trace with the alternate key ordering.
    resp = await agent.get("/test/session/clear", params={"test_session_token": token})
    assert resp.status == 200, await resp.text()
    resp = await v04_trace(agent, [[second]], token=token)
    assert resp.status == 200, await resp.text()

    # Comparison must pass despite the differing attribute key order.
    resp = await agent.get("/test/session/snapshot", params={"test_session_token": token})
    assert resp.status == 200, await resp.text()


@pytest.mark.parametrize("snapshot_removed_attrs", [{"span_id"}])
async def test_removed_attributes_fails_span_id(agent, tmp_path, snapshot_removed_attrs, do_reference_v04_http_trace):
resp = await do_reference_v04_http_trace(token="test_case")
Expand Down