From 82ac846485c3bc4aa1eb783f4a84966fda1cfa43 Mon Sep 17 00:00:00 2001 From: Giles Odigwe Date: Tue, 27 Jan 2026 12:25:57 -0800 Subject: [PATCH 1/2] Add observability unit tests to improve coverage from 72% to 86% (#3356) --- .../core/tests/core/test_observability.py | 1166 +++++++++++++++++ 1 file changed, 1166 insertions(+) diff --git a/python/packages/core/tests/core/test_observability.py b/python/packages/core/tests/core/test_observability.py index 88245cfa52..d2677fbddf 100644 --- a/python/packages/core/tests/core/test_observability.py +++ b/python/packages/core/tests/core/test_observability.py @@ -1035,3 +1035,1169 @@ def test_parse_headers_invalid_format(): headers = _parse_headers("key1=value1,invalid,key2=value2") # Should only include valid pairs assert headers == {"key1": "value1", "key2": "value2"} + + +# region Test OtelAttr enum + + +def test_otel_attr_repr_and_str(): + """Test OtelAttr __repr__ and __str__ return the string value.""" + assert repr(OtelAttr.OPERATION) == "gen_ai.operation.name" + assert str(OtelAttr.OPERATION) == "gen_ai.operation.name" + assert str(OtelAttr.TOOL_EXECUTION_OPERATION) == "execute_tool" + + +# region Test create_metric_views + + +def test_create_metric_views(): + """Test create_metric_views returns expected views.""" + from agent_framework.observability import create_metric_views + + views = create_metric_views() + + assert len(views) == 3 + # Check that views are View objects + from opentelemetry.sdk.metrics.view import View + + for view in views: + assert isinstance(view, View) + + +# region Test ObservabilitySettings.is_setup + + +def test_observability_settings_is_setup_initial(monkeypatch): + """Test is_setup returns False initially.""" + from agent_framework.observability import ObservabilitySettings + + monkeypatch.delenv("ENABLE_INSTRUMENTATION", raising=False) + settings = ObservabilitySettings(env_file_path="test.env") + assert settings.is_setup is False + + +# region Test enable_instrumentation function + + +def test_enable_instrumentation_function(monkeypatch): + """Test enable_instrumentation function enables instrumentation.""" + import importlib + + import agent_framework.observability as observability + + monkeypatch.delenv("ENABLE_INSTRUMENTATION", raising=False) + monkeypatch.delenv("ENABLE_SENSITIVE_DATA", raising=False) + importlib.reload(observability) + + assert observability.OBSERVABILITY_SETTINGS.enable_instrumentation is False + + observability.enable_instrumentation() + assert observability.OBSERVABILITY_SETTINGS.enable_instrumentation is True + + +def test_enable_instrumentation_with_sensitive_data(monkeypatch): + """Test enable_instrumentation function with sensitive_data parameter.""" + import importlib + + import agent_framework.observability as observability + + monkeypatch.delenv("ENABLE_INSTRUMENTATION", raising=False) + monkeypatch.delenv("ENABLE_SENSITIVE_DATA", raising=False) + importlib.reload(observability) + + observability.enable_instrumentation(enable_sensitive_data=True) + assert observability.OBSERVABILITY_SETTINGS.enable_instrumentation is True + assert observability.OBSERVABILITY_SETTINGS.enable_sensitive_data is True + + +# region Test _to_otel_part content types + + +def test_to_otel_part_text(): + """Test _to_otel_part with text content.""" + from agent_framework import Content + from agent_framework.observability import _to_otel_part + + content = Content(type="text", text="Hello world") + result = _to_otel_part(content) + + assert result == {"type": "text", "content": "Hello world"} + + +def 
test_to_otel_part_text_reasoning(): + """Test _to_otel_part with text_reasoning content.""" + from agent_framework import Content + from agent_framework.observability import _to_otel_part + + content = Content(type="text_reasoning", text="Thinking about this...") + result = _to_otel_part(content) + + assert result == {"type": "reasoning", "content": "Thinking about this..."} + + +def test_to_otel_part_uri(): + """Test _to_otel_part with uri content.""" + from agent_framework import Content + from agent_framework.observability import _to_otel_part + + content = Content(type="uri", uri="https://example.com/image.png", media_type="image/png") + result = _to_otel_part(content) + + assert result == { + "type": "uri", + "uri": "https://example.com/image.png", + "mime_type": "image/png", + "modality": "image", + } + + +def test_to_otel_part_uri_no_media_type(): + """Test _to_otel_part with uri content without media_type.""" + from agent_framework import Content + from agent_framework.observability import _to_otel_part + + content = Content(type="uri", uri="https://example.com/file") + result = _to_otel_part(content) + + assert result == { + "type": "uri", + "uri": "https://example.com/file", + "mime_type": None, + "modality": None, + } + + +def test_to_otel_part_data(): + """Test _to_otel_part with data content.""" + from agent_framework import Content + from agent_framework.observability import _to_otel_part + + data = b"binary data" + content = Content.from_data(data=data, media_type="application/octet-stream") + result = _to_otel_part(content) + + assert result["type"] == "blob" + assert result["mime_type"] == "application/octet-stream" + assert result["modality"] == "application" + + +def test_to_otel_part_function_call(): + """Test _to_otel_part with function_call content.""" + from agent_framework import Content + from agent_framework.observability import _to_otel_part + + content = Content(type="function_call", call_id="call_123", name="test_function", arguments='{"arg1": "value1"}') + result = _to_otel_part(content) + + assert result == { + "type": "tool_call", + "id": "call_123", + "name": "test_function", + "arguments": '{"arg1": "value1"}', + } + + +def test_to_otel_part_function_result(): + """Test _to_otel_part with function_result content.""" + from agent_framework import Content + from agent_framework.observability import _to_otel_part + + content = Content(type="function_result", call_id="call_123", result="Success") + result = _to_otel_part(content) + + assert result["type"] == "tool_call_response" + assert result["id"] == "call_123" + + +# region Test workflow observability functions + + +def test_workflow_tracer_disabled(monkeypatch): + """Test workflow_tracer returns NoOpTracer when disabled.""" + import importlib + + from opentelemetry import trace + + import agent_framework.observability as observability + + monkeypatch.setenv("ENABLE_INSTRUMENTATION", "false") + importlib.reload(observability) + + tracer = observability.workflow_tracer() + assert isinstance(tracer, trace.NoOpTracer) + + +def test_create_workflow_span(span_exporter): + """Test create_workflow_span creates a span.""" + from agent_framework.observability import create_workflow_span + + span_exporter.clear() + with create_workflow_span("test_workflow", attributes={"key": "value"}): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "test_workflow" + assert spans[0].attributes["key"] == "value" + + +def test_create_processing_span(span_exporter): + """Test 
create_processing_span creates a span with correct attributes.""" + from agent_framework.observability import OtelAttr, create_processing_span + + span_exporter.clear() + with create_processing_span( + executor_id="exec_1", + executor_type="TestExecutor", + message_type="standard", + payload_type="str", + ): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert OtelAttr.EXECUTOR_PROCESS_SPAN in spans[0].name + assert spans[0].attributes[OtelAttr.EXECUTOR_ID] == "exec_1" + assert spans[0].attributes[OtelAttr.EXECUTOR_TYPE] == "TestExecutor" + + +def test_create_edge_group_processing_span(span_exporter): + """Test create_edge_group_processing_span creates correct span.""" + from agent_framework.observability import OtelAttr, create_edge_group_processing_span + + span_exporter.clear() + with create_edge_group_processing_span( + edge_group_type="ConditionalEdge", + edge_group_id="edge_1", + message_source_id="source_1", + message_target_id="target_1", + ): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert OtelAttr.EDGE_GROUP_PROCESS_SPAN in spans[0].name + assert spans[0].attributes[OtelAttr.EDGE_GROUP_TYPE] == "ConditionalEdge" + assert spans[0].attributes[OtelAttr.EDGE_GROUP_ID] == "edge_1" + assert spans[0].attributes[OtelAttr.MESSAGE_SOURCE_ID] == "source_1" + assert spans[0].attributes[OtelAttr.MESSAGE_TARGET_ID] == "target_1" + + +def test_create_edge_group_processing_span_invalid_link(span_exporter): + """Test create_edge_group_processing_span handles invalid trace context gracefully.""" + from agent_framework.observability import create_edge_group_processing_span + + span_exporter.clear() + # Invalid trace context should be handled gracefully + trace_contexts = [{"traceparent": "invalid-format"}] + span_ids = ["invalid"] + + with create_edge_group_processing_span( + edge_group_type="ConditionalEdge", + source_trace_contexts=trace_contexts, + source_span_ids=span_ids, + ): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 # Should still create the span + + +# region Test EdgeGroupDeliveryStatus enum + + +def test_edge_group_delivery_status_str_and_repr(): + """Test EdgeGroupDeliveryStatus __str__ and __repr__ return the value.""" + from agent_framework.observability import EdgeGroupDeliveryStatus + + assert str(EdgeGroupDeliveryStatus.DELIVERED) == "delivered" + assert repr(EdgeGroupDeliveryStatus.DELIVERED) == "delivered" + assert str(EdgeGroupDeliveryStatus.EXCEPTION) == "exception" + + +# region Test _create_otlp_exporters with no endpoints + + +def test_create_otlp_exporters_no_endpoints(): + """Test _create_otlp_exporters returns empty list when no endpoints provided.""" + from agent_framework.observability import _create_otlp_exporters + + exporters = _create_otlp_exporters(protocol="grpc") + assert exporters == [] + + +# region Test exception handling in chat client traces + + +@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True) +async def test_chat_client_observability_exception(mock_chat_client, span_exporter: InMemorySpanExporter): + """Test that exceptions are captured in spans.""" + + class FailingChatClient(mock_chat_client): + async def _inner_get_response(self, *, messages, options, **kwargs): + raise ValueError("Test error") + + client = use_instrumentation(FailingChatClient)() + messages = [ChatMessage(role=Role.USER, text="Test")] + + span_exporter.clear() + with pytest.raises(ValueError, match="Test error"): + await 
client.get_response(messages=messages, model_id="Test") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.status.status_code == StatusCode.ERROR + + +@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True) +async def test_chat_client_streaming_observability_exception(mock_chat_client, span_exporter: InMemorySpanExporter): + """Test that exceptions in streaming are captured in spans.""" + + class FailingStreamingChatClient(mock_chat_client): + async def _inner_get_streaming_response(self, *, messages, options, **kwargs): + yield ChatResponseUpdate(text="Hello", role=Role.ASSISTANT) + raise ValueError("Streaming error") + + client = use_instrumentation(FailingStreamingChatClient)() + messages = [ChatMessage(role=Role.USER, text="Test")] + + span_exporter.clear() + with pytest.raises(ValueError, match="Streaming error"): + async for _ in client.get_streaming_response(messages=messages, model_id="Test"): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.status.status_code == StatusCode.ERROR + + +# region Test get_meter and get_tracer + + +def test_get_meter(): + """Test get_meter returns a meter with various parameters.""" + from agent_framework.observability import get_meter + + # Basic call + meter = get_meter() + assert meter is not None + + # With custom parameters + meter = get_meter(name="custom_meter", version="1.0.0", attributes={"custom": "attribute"}) + assert meter is not None + + +def test_get_tracer(): + """Test get_tracer returns a tracer with various parameters.""" + from agent_framework.observability import get_tracer + + # Basic call + tracer = get_tracer() + assert tracer is not None + + # With custom parameters + tracer = get_tracer( + instrumenting_module_name="custom_module", + instrumenting_library_version="2.0.0", + attributes={"custom": "attr"}, + ) + assert tracer is not None + + +# region Test _get_response_attributes + + +def test_get_response_attributes_with_response_id(): + """Test _get_response_attributes includes response_id.""" + from unittest.mock import Mock + + from agent_framework.observability import OtelAttr, _get_response_attributes + + response = Mock() + response.response_id = "resp_123" + response.finish_reason = None + response.raw_representation = None + response.usage_details = None + + attrs = {} + result = _get_response_attributes(attrs, response) + + assert result[OtelAttr.RESPONSE_ID] == "resp_123" + + +def test_get_response_attributes_with_finish_reason(): + """Test _get_response_attributes includes finish_reason.""" + from unittest.mock import Mock + + from agent_framework import FinishReason + from agent_framework.observability import OtelAttr, _get_response_attributes + + response = Mock() + response.response_id = None + response.finish_reason = FinishReason.STOP + response.raw_representation = None + response.usage_details = None + + attrs = {} + result = _get_response_attributes(attrs, response) + + assert OtelAttr.FINISH_REASONS in result + + +def test_get_response_attributes_with_model_id(): + """Test _get_response_attributes includes model_id.""" + from unittest.mock import Mock + + from opentelemetry.semconv_ai import SpanAttributes + + from agent_framework.observability import _get_response_attributes + + response = Mock() + response.response_id = None + response.finish_reason = None + response.raw_representation = None + response.usage_details = None + response.model_id = "gpt-4" + + attrs = {} + result 
= _get_response_attributes(attrs, response) + + assert result[SpanAttributes.LLM_RESPONSE_MODEL] == "gpt-4" + + +def test_get_response_attributes_with_usage(): + """Test _get_response_attributes includes usage details.""" + from unittest.mock import Mock + + from agent_framework.observability import OtelAttr, _get_response_attributes + + response = Mock() + response.response_id = None + response.finish_reason = None + response.raw_representation = None + response.usage_details = {"input_token_count": 100, "output_token_count": 50} + + attrs = {} + result = _get_response_attributes(attrs, response) + + assert result[OtelAttr.INPUT_TOKENS] == 100 + assert result[OtelAttr.OUTPUT_TOKENS] == 50 + + +def test_get_response_attributes_with_duration(): + """Test _get_response_attributes includes duration.""" + from unittest.mock import Mock + + from opentelemetry.semconv_ai import Meters + + from agent_framework.observability import _get_response_attributes + + response = Mock() + response.response_id = None + response.finish_reason = None + response.raw_representation = None + response.usage_details = None + + attrs = {} + result = _get_response_attributes(attrs, response, duration=1.5) + + assert result[Meters.LLM_OPERATION_DURATION] == 1.5 + + +def test_get_response_attributes_capture_usage_false(): + """Test _get_response_attributes skips usage when capture_usage is False.""" + from unittest.mock import Mock + + from agent_framework.observability import OtelAttr, _get_response_attributes + + response = Mock() + response.response_id = None + response.finish_reason = None + response.raw_representation = None + response.usage_details = {"input_token_count": 100, "output_token_count": 50} + + attrs = {} + result = _get_response_attributes(attrs, response, capture_usage=False) + + assert OtelAttr.INPUT_TOKENS not in result + assert OtelAttr.OUTPUT_TOKENS not in result + + +# region Test _get_exporters_from_env + + +def test_get_exporters_from_env_no_endpoints(monkeypatch): + """Test _get_exporters_from_env returns empty list when no endpoints set.""" + from agent_framework.observability import _get_exporters_from_env + + # Clear all OTEL env vars + for key in [ + "OTEL_EXPORTER_OTLP_ENDPOINT", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + ]: + monkeypatch.delenv(key, raising=False) + + exporters = _get_exporters_from_env() + assert exporters == [] + + +# region Test ObservabilitySettings._configure + + +def test_observability_settings_configure_not_enabled(monkeypatch): + """Test _configure does nothing when instrumentation is not enabled.""" + from agent_framework.observability import ObservabilitySettings + + monkeypatch.setenv("ENABLE_INSTRUMENTATION", "false") + settings = ObservabilitySettings(env_file_path="test.env") + + # Should not raise, should just return early + settings._configure() + assert settings.is_setup is False + + +def test_observability_settings_configure_already_setup(monkeypatch): + """Test _configure does nothing when already set up.""" + from agent_framework.observability import ObservabilitySettings + + monkeypatch.setenv("ENABLE_INSTRUMENTATION", "true") + # Clear OTEL endpoints to avoid import errors + for key in [ + "OTEL_EXPORTER_OTLP_ENDPOINT", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + ]: + monkeypatch.delenv(key, raising=False) + + settings = ObservabilitySettings(env_file_path="test.env") + + # Manually mark as set 
up + settings._executed_setup = True + + # Should not re-configure + settings._configure() + assert settings.is_setup is True + + +# region Test _to_otel_part edge cases + + +def test_to_otel_part_generic(): + """Test _to_otel_part with unknown content type uses to_dict fallback.""" + from agent_framework import Content + from agent_framework.observability import _to_otel_part + + # Create a content with type that falls to default case + content = Content(type="annotations", text="some text") + result = _to_otel_part(content) + + # Should return result from to_dict + assert result is not None + assert isinstance(result, dict) + + +# region Test finish_reason from raw_representation + + +def test_get_response_attributes_finish_reason_from_raw(): + """Test _get_response_attributes gets finish_reason from raw_representation.""" + from unittest.mock import Mock + + from agent_framework import FinishReason + from agent_framework.observability import OtelAttr, _get_response_attributes + + raw_rep = Mock() + raw_rep.finish_reason = FinishReason.LENGTH + + response = Mock() + response.response_id = None + response.finish_reason = None # No direct finish_reason + response.raw_representation = raw_rep + response.usage_details = None + + attrs = {} + result = _get_response_attributes(attrs, response) + + assert OtelAttr.FINISH_REASONS in result + + +# region Test agent instrumentation + + +@pytest.mark.parametrize("enable_sensitive_data", [True, False], indirect=True) +async def test_agent_observability(span_exporter: InMemorySpanExporter, enable_sensitive_data): + """Test use_agent_instrumentation decorator with a mock agent.""" + + from agent_framework.observability import use_agent_instrumentation + + class MockAgent(AgentProtocol): + AGENT_PROVIDER_NAME = "test_provider" + + def __init__(self): + self._id = "test_agent" + self._name = "Test Agent" + self._description = "A test agent" + self._default_options = {} + + @property + def id(self): + return self._id + + @property + def name(self): + return self._name + + @property + def description(self): + return self._description + + @property + def default_options(self): + return self._default_options + + async def run( + self, + messages=None, + *, + thread=None, + **kwargs, + ): + return AgentResponse( + messages=[ChatMessage(role=Role.ASSISTANT, text="Test response")], + thread=thread, + ) + + async def run_stream( + self, + messages=None, + *, + thread=None, + **kwargs, + ): + from agent_framework import AgentResponseUpdate + + yield AgentResponseUpdate(text="Test", role=Role.ASSISTANT) + + decorated_agent = use_agent_instrumentation(MockAgent) + agent = decorated_agent() + + span_exporter.clear() + response = await agent.run(messages="Hello") + + assert response is not None + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + +@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True) +async def test_agent_observability_with_exception(span_exporter: InMemorySpanExporter, enable_sensitive_data): + """Test agent instrumentation captures exceptions.""" + from agent_framework.observability import use_agent_instrumentation + + class FailingAgent(AgentProtocol): + AGENT_PROVIDER_NAME = "test_provider" + + def __init__(self): + self._id = "failing_agent" + self._name = "Failing Agent" + self._description = "An agent that fails" + self._default_options = {} + + @property + def id(self): + return self._id + + @property + def name(self): + return self._name + + @property + def description(self): + return 
self._description + + @property + def default_options(self): + return self._default_options + + async def run(self, messages=None, *, thread=None, **kwargs): + raise RuntimeError("Agent failed") + + async def run_stream(self, messages=None, *, thread=None, **kwargs): + raise RuntimeError("Agent failed") + yield # Make it a generator + + decorated_agent = use_agent_instrumentation(FailingAgent) + agent = decorated_agent() + + span_exporter.clear() + with pytest.raises(RuntimeError, match="Agent failed"): + await agent.run(messages="Hello") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + + +# region Test agent streaming observability + + +@pytest.mark.parametrize("enable_sensitive_data", [True, False], indirect=True) +async def test_agent_streaming_observability(span_exporter: InMemorySpanExporter, enable_sensitive_data): + """Test agent streaming instrumentation.""" + from agent_framework import AgentResponseUpdate + from agent_framework.observability import use_agent_instrumentation + + class StreamingAgent(AgentProtocol): + AGENT_PROVIDER_NAME = "test_provider" + + def __init__(self): + self._id = "streaming_agent" + self._name = "Streaming Agent" + self._description = "A streaming test agent" + self._default_options = {} + + @property + def id(self): + return self._id + + @property + def name(self): + return self._name + + @property + def description(self): + return self._description + + @property + def default_options(self): + return self._default_options + + async def run(self, messages=None, *, thread=None, **kwargs): + return AgentResponse( + messages=[ChatMessage(role=Role.ASSISTANT, text="Test")], + thread=thread, + ) + + async def run_stream(self, messages=None, *, thread=None, **kwargs): + yield AgentResponseUpdate(text="Hello ", role=Role.ASSISTANT) + yield AgentResponseUpdate(text="World", role=Role.ASSISTANT) + + decorated_agent = use_agent_instrumentation(StreamingAgent) + agent = decorated_agent() + + span_exporter.clear() + updates = [] + async for update in agent.run_stream(messages="Hello"): + updates.append(update) + + assert len(updates) == 2 + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + +# region Test use_agent_instrumentation error cases + + +def test_use_agent_instrumentation_missing_run(): + """Test use_agent_instrumentation raises error when run method is missing.""" + from agent_framework.observability import use_agent_instrumentation + + class InvalidAgent: + AGENT_PROVIDER_NAME = "test" + + @property + def id(self): + return "test" + + @property + def name(self): + return "test" + + @property + def description(self): + return "test" + + with pytest.raises(AgentInitializationError): + use_agent_instrumentation(InvalidAgent) + + +# region Test _capture_messages with finish_reason + + +@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True) +async def test_capture_messages_with_finish_reason(mock_chat_client, span_exporter: InMemorySpanExporter): + """Test that finish_reason is captured in output messages.""" + import json + + from agent_framework import FinishReason + + class ClientWithFinishReason(mock_chat_client): + async def _inner_get_response(self, *, messages, options, **kwargs): + return ChatResponse( + messages=[ChatMessage(role=Role.ASSISTANT, text="Done")], + usage_details=UsageDetails(input_token_count=5, output_token_count=10), + finish_reason=FinishReason.STOP, + ) + + client = use_instrumentation(ClientWithFinishReason)() + 
messages = [ChatMessage(role=Role.USER, text="Test")] + + span_exporter.clear() + response = await client.get_response(messages=messages, model_id="Test") + + assert response is not None + assert response.finish_reason == FinishReason.STOP + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + + # Check output messages include finish_reason + output_messages = json.loads(span.attributes[OtelAttr.OUTPUT_MESSAGES]) + assert output_messages[-1].get("finish_reason") == "stop" + + +# region Test agent streaming exception + + +@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True) +async def test_agent_streaming_exception(span_exporter: InMemorySpanExporter, enable_sensitive_data): + """Test agent streaming captures exceptions.""" + from agent_framework import AgentResponseUpdate + from agent_framework.observability import use_agent_instrumentation + + class FailingStreamingAgent(AgentProtocol): + AGENT_PROVIDER_NAME = "test_provider" + + def __init__(self): + self._id = "failing_stream" + self._name = "Failing Stream" + self._description = "A failing streaming agent" + self._default_options = {} + + @property + def id(self): + return self._id + + @property + def name(self): + return self._name + + @property + def description(self): + return self._description + + @property + def default_options(self): + return self._default_options + + async def run(self, messages=None, *, thread=None, **kwargs): + return AgentResponse(messages=[], thread=thread) + + async def run_stream(self, messages=None, *, thread=None, **kwargs): + yield AgentResponseUpdate(text="Starting", role=Role.ASSISTANT) + raise RuntimeError("Stream failed") + + decorated_agent = use_agent_instrumentation(FailingStreamingAgent) + agent = decorated_agent() + + span_exporter.clear() + with pytest.raises(RuntimeError, match="Stream failed"): + async for _ in agent.run_stream(messages="Hello"): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + + +# region Test instrumentation when disabled + + +@pytest.mark.parametrize("enable_instrumentation", [False], indirect=True) +async def test_chat_client_when_disabled(mock_chat_client, span_exporter: InMemorySpanExporter): + """Test that no spans are created when instrumentation is disabled.""" + client = use_instrumentation(mock_chat_client)() + messages = [ChatMessage(role=Role.USER, text="Test")] + + span_exporter.clear() + response = await client.get_response(messages=messages, model_id="Test") + + assert response is not None + spans = span_exporter.get_finished_spans() + # No spans should be created when disabled + assert len(spans) == 0 + + +@pytest.mark.parametrize("enable_instrumentation", [False], indirect=True) +async def test_chat_client_streaming_when_disabled(mock_chat_client, span_exporter: InMemorySpanExporter): + """Test streaming creates no spans when instrumentation is disabled.""" + client = use_instrumentation(mock_chat_client)() + messages = [ChatMessage(role=Role.USER, text="Test")] + + span_exporter.clear() + updates = [] + async for update in client.get_streaming_response(messages=messages, model_id="Test"): + updates.append(update) + + assert len(updates) == 2 # Still works functionally + spans = span_exporter.get_finished_spans() + assert len(spans) == 0 + + +@pytest.mark.parametrize("enable_instrumentation", [False], indirect=True) +async def test_agent_when_disabled(span_exporter: InMemorySpanExporter): + """Test agent creates no 
spans when instrumentation is disabled.""" + from agent_framework.observability import use_agent_instrumentation + + class TestAgent(AgentProtocol): + AGENT_PROVIDER_NAME = "test" + + def __init__(self): + self._id = "test" + self._name = "Test" + self._description = "Test" + self._default_options = {} + + @property + def id(self): + return self._id + + @property + def name(self): + return self._name + + @property + def description(self): + return self._description + + @property + def default_options(self): + return self._default_options + + async def run(self, messages=None, *, thread=None, **kwargs): + return AgentResponse(messages=[], thread=thread) + + async def run_stream(self, messages=None, *, thread=None, **kwargs): + from agent_framework import AgentResponseUpdate + + yield AgentResponseUpdate(text="test", role=Role.ASSISTANT) + + decorated = use_agent_instrumentation(TestAgent) + agent = decorated() + + span_exporter.clear() + await agent.run(messages="Hello") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 0 + + +@pytest.mark.parametrize("enable_instrumentation", [False], indirect=True) +async def test_agent_streaming_when_disabled(span_exporter: InMemorySpanExporter): + """Test agent streaming creates no spans when disabled.""" + from agent_framework import AgentResponseUpdate + from agent_framework.observability import use_agent_instrumentation + + class TestAgent(AgentProtocol): + AGENT_PROVIDER_NAME = "test" + + def __init__(self): + self._id = "test" + self._name = "Test" + self._description = "Test" + self._default_options = {} + + @property + def id(self): + return self._id + + @property + def name(self): + return self._name + + @property + def description(self): + return self._description + + @property + def default_options(self): + return self._default_options + + async def run(self, messages=None, *, thread=None, **kwargs): + return AgentResponse(messages=[], thread=thread) + + async def run_stream(self, messages=None, *, thread=None, **kwargs): + yield AgentResponseUpdate(text="test", role=Role.ASSISTANT) + + decorated = use_agent_instrumentation(TestAgent) + agent = decorated() + + span_exporter.clear() + updates = [] + async for u in agent.run_stream(messages="Hello"): + updates.append(u) + + assert len(updates) == 1 + spans = span_exporter.get_finished_spans() + assert len(spans) == 0 + + +# region Test _configure_providers + + +def test_configure_providers_with_span_exporters(monkeypatch): + """Test _configure_providers correctly handles span exporters.""" + from unittest.mock import Mock, patch + + from opentelemetry.sdk.trace.export import SpanExporter + + from agent_framework.observability import ObservabilitySettings + + monkeypatch.setenv("ENABLE_INSTRUMENTATION", "true") + for key in [ + "OTEL_EXPORTER_OTLP_ENDPOINT", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + ]: + monkeypatch.delenv(key, raising=False) + + settings = ObservabilitySettings(env_file_path="test.env") + + # Create mock span exporter + mock_span_exporter = Mock(spec=SpanExporter) + + with patch("opentelemetry.trace.set_tracer_provider") as mock_set_tracer: + settings._configure_providers([mock_span_exporter]) + + mock_set_tracer.assert_called_once() + + +# region Test histograms + + +def test_get_duration_histogram(): + """Test _get_duration_histogram creates histogram.""" + from agent_framework.observability import _get_duration_histogram + + histogram = _get_duration_histogram() + assert 
histogram is not None + + +def test_get_token_usage_histogram(): + """Test _get_token_usage_histogram creates histogram.""" + from agent_framework.observability import _get_token_usage_histogram + + histogram = _get_token_usage_histogram() + assert histogram is not None + + +# region Test capture_exception + + +def test_capture_exception(): + """Test capture_exception adds exception info to span.""" + from unittest.mock import Mock + + from agent_framework.observability import capture_exception + + mock_span = Mock() + exception = ValueError("Test error") + timestamp = 12345 + + capture_exception(span=mock_span, exception=exception, timestamp=timestamp) + + mock_span.set_status.assert_called_once() + mock_span.record_exception.assert_called_once() + + +# region Test _get_span + + +def test_get_span_creates_span(span_exporter: InMemorySpanExporter): + """Test _get_span creates a span with correct attributes.""" + from agent_framework.observability import OtelAttr, _get_span + + span_exporter.clear() + attributes = { + OtelAttr.OPERATION: "test_operation", + OtelAttr.TOOL_NAME: "test_tool", + } + + with _get_span(attributes=attributes, span_name_attribute=OtelAttr.TOOL_NAME): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert "test_tool" in spans[0].name + + +# region Test _get_span_attributes + + +def test_get_span_attributes(): + """Test _get_span_attributes creates correct attribute dict.""" + from agent_framework.observability import OtelAttr, _get_span_attributes + + attrs = _get_span_attributes( + operation_name="chat", + provider_name="openai", + model="gpt-4", + service_url="https://api.openai.com", + ) + + assert attrs[OtelAttr.OPERATION] == "chat" + assert OtelAttr.ADDRESS in attrs + + +def test_get_span_attributes_with_agent_info(): + """Test _get_span_attributes with agent-specific info.""" + from agent_framework.observability import OtelAttr, _get_span_attributes + + attrs = _get_span_attributes( + operation_name="invoke_agent", + provider_name="test", + agent_id="agent_1", + agent_name="Test Agent", + agent_description="A test agent", + thread_id="thread_123", + ) + + assert attrs[OtelAttr.AGENT_ID] == "agent_1" + assert attrs[OtelAttr.AGENT_NAME] == "Test Agent" + assert attrs[OtelAttr.AGENT_DESCRIPTION] == "A test agent" + + +# region Test _capture_response + + +def test_capture_response(span_exporter: InMemorySpanExporter): + """Test _capture_response records to histograms.""" + from unittest.mock import Mock + + from agent_framework.observability import OtelAttr, _capture_response + + mock_span = Mock() + mock_token_histogram = Mock() + mock_duration_histogram = Mock() + + attrs = { + "gen_ai.request.model": "test-model", + OtelAttr.INPUT_TOKENS: 100, + OtelAttr.OUTPUT_TOKENS: 50, + } + + _capture_response( + span=mock_span, + attributes=attrs, + token_usage_histogram=mock_token_histogram, + operation_duration_histogram=mock_duration_histogram, + ) + + # Token histogram should be called for input and output tokens + assert mock_token_histogram.record.call_count >= 0 # May or may not be called depending on implementation From d36a2f8e7c94d9b9cbc9323b2a1c684cfed0f0b0 Mon Sep 17 00:00:00 2001 From: Giles Odigwe Date: Tue, 27 Jan 2026 14:16:33 -0800 Subject: [PATCH 2/2] Address PR review comments for observability tests --- .../core/tests/core/test_observability.py | 75 +++++++++++-------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/python/packages/core/tests/core/test_observability.py 
b/python/packages/core/tests/core/test_observability.py index d2677fbddf..dddc947bd2 100644 --- a/python/packages/core/tests/core/test_observability.py +++ b/python/packages/core/tests/core/test_observability.py @@ -1083,10 +1083,10 @@ def test_enable_instrumentation_function(monkeypatch): """Test enable_instrumentation function enables instrumentation.""" import importlib - import agent_framework.observability as observability - monkeypatch.delenv("ENABLE_INSTRUMENTATION", raising=False) monkeypatch.delenv("ENABLE_SENSITIVE_DATA", raising=False) + + observability = importlib.import_module("agent_framework.observability") importlib.reload(observability) assert observability.OBSERVABILITY_SETTINGS.enable_instrumentation is False @@ -1099,10 +1099,10 @@ def test_enable_instrumentation_with_sensitive_data(monkeypatch): """Test enable_instrumentation function with sensitive_data parameter.""" import importlib - import agent_framework.observability as observability - monkeypatch.delenv("ENABLE_INSTRUMENTATION", raising=False) monkeypatch.delenv("ENABLE_SENSITIVE_DATA", raising=False) + + observability = importlib.import_module("agent_framework.observability") importlib.reload(observability) observability.enable_instrumentation(enable_sensitive_data=True) @@ -1218,9 +1218,9 @@ def test_workflow_tracer_disabled(monkeypatch): from opentelemetry import trace - import agent_framework.observability as observability - monkeypatch.setenv("ENABLE_INSTRUMENTATION", "false") + + observability = importlib.import_module("agent_framework.observability") importlib.reload(observability) tracer = observability.workflow_tracer() @@ -1697,6 +1697,7 @@ async def run_stream( @pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True) async def test_agent_observability_with_exception(span_exporter: InMemorySpanExporter, enable_sensitive_data): """Test agent instrumentation captures exceptions.""" + from agent_framework import AgentResponseUpdate from agent_framework.observability import use_agent_instrumentation class FailingAgent(AgentProtocol): @@ -1728,8 +1729,9 @@ async def run(self, messages=None, *, thread=None, **kwargs): raise RuntimeError("Agent failed") async def run_stream(self, messages=None, *, thread=None, **kwargs): + # yield before raise to make this an async generator + yield AgentResponseUpdate(text="", role=Role.ASSISTANT) raise RuntimeError("Agent failed") - yield # Make it a generator decorated_agent = use_agent_instrumentation(FailingAgent) agent = decorated_agent() @@ -2100,20 +2102,26 @@ def test_get_token_usage_histogram(): # region Test capture_exception -def test_capture_exception(): +def test_capture_exception(span_exporter: InMemorySpanExporter): """Test capture_exception adds exception info to span.""" - from unittest.mock import Mock + from time import time_ns - from agent_framework.observability import capture_exception + from opentelemetry.trace import StatusCode - mock_span = Mock() - exception = ValueError("Test error") - timestamp = 12345 + from agent_framework.observability import capture_exception, get_tracer - capture_exception(span=mock_span, exception=exception, timestamp=timestamp) + span_exporter.clear() + tracer = get_tracer() - mock_span.set_status.assert_called_once() - mock_span.record_exception.assert_called_once() + with tracer.start_as_current_span("test_span") as span: + exception = ValueError("Test error") + capture_exception(span=span, exception=exception, timestamp=time_ns()) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + 
assert spans[0].status.status_code == StatusCode.ERROR + # Verify exception was recorded + assert len(spans[0].events) > 0 # region Test _get_span @@ -2177,14 +2185,17 @@ def test_get_span_attributes_with_agent_info(): def test_capture_response(span_exporter: InMemorySpanExporter): - """Test _capture_response records to histograms.""" - from unittest.mock import Mock + """Test _capture_response sets span attributes and records to histograms.""" + from agent_framework.observability import OtelAttr, _capture_response, get_tracer + + span_exporter.clear() + tracer = get_tracer() - from agent_framework.observability import OtelAttr, _capture_response + # Create real histograms + from agent_framework.observability import _get_duration_histogram, _get_token_usage_histogram - mock_span = Mock() - mock_token_histogram = Mock() - mock_duration_histogram = Mock() + token_histogram = _get_token_usage_histogram() + duration_histogram = _get_duration_histogram() attrs = { "gen_ai.request.model": "test-model", @@ -2192,12 +2203,16 @@ def test_capture_response(span_exporter: InMemorySpanExporter): OtelAttr.OUTPUT_TOKENS: 50, } - _capture_response( - span=mock_span, - attributes=attrs, - token_usage_histogram=mock_token_histogram, - operation_duration_histogram=mock_duration_histogram, - ) + with tracer.start_as_current_span("test_span") as span: + _capture_response( + span=span, + attributes=attrs, + token_usage_histogram=token_histogram, + operation_duration_histogram=duration_histogram, + ) - # Token histogram should be called for input and output tokens - assert mock_token_histogram.record.call_count >= 0 # May or may not be called depending on implementation + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + # Verify attributes were set on the span + assert spans[0].attributes.get(OtelAttr.INPUT_TOKENS) == 100 + assert spans[0].attributes.get(OtelAttr.OUTPUT_TOKENS) == 50
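+    # The histogram record() calls are only observable through a configured
+    # MetricReader, which this test does not set up, so the span attributes
+    # above are the assertable surface here.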