From 8a50b5342d4673d4ef2b1bae5cab758e8f4fc66c Mon Sep 17 00:00:00 2001 From: claude89757 <138977524+claude89757@users.noreply.github.com> Date: Sat, 29 Nov 2025 01:23:30 +0800 Subject: [PATCH 1/7] Fix MCP tool result serialization for list[TextContent] When MCP tools return results containing list[TextContent], they were incorrectly serialized to object repr strings like: '[<agent_framework._types.TextContent object at 0x...>]' This fix properly extracts text content from list items by: 1. Checking if items have a 'text' attribute (TextContent) 2. Using model_dump() for items that support it 3. Falling back to str() for other types 4. Joining single items as plain text, multiple items as JSON array Fixes #2509 --- .../packages/ag-ui/agent_framework_ag_ui/_events.py | 11 +++++++++++ .../agent_framework_ag_ui/_message_adapters.py | 13 +++++++++++++ 2 files changed, 24 insertions(+) diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_events.py b/python/packages/ag-ui/agent_framework_ag_ui/_events.py index 184da0239e..e06e39a5a8 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_events.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_events.py @@ -393,6 +393,17 @@ def _handle_function_result_content(self, content: FunctionResultContent) -> lis result_message_id = generate_event_id() if isinstance(content.result, dict): result_content = json.dumps(content.result) # type: ignore[arg-type] + elif isinstance(content.result, list): + # Handle Contents list (e.g., from MCP tool results) + texts = [] + for item in content.result: + if hasattr(item, "text"): # TextContent + texts.append(item.text) + elif hasattr(item, "model_dump"): + texts.append(json.dumps(item.model_dump(mode="json"))) + else: + texts.append(str(item)) + result_content = "\n".join(texts) if len(texts) == 1 else json.dumps(texts) elif content.result is not None: result_content = str(content.result) else: diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py 
b/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py index 11d2977f90..e12883da6c 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py @@ -242,6 +242,19 @@ def agent_framework_messages_to_agui(messages: list[ChatMessage] | list[dict[str import json content_text = json.dumps(content.result) # type: ignore + elif isinstance(content.result, list): + # Handle Contents list (e.g., from MCP tool results) + import json + + texts = [] + for item in content.result: + if hasattr(item, "text"): # TextContent + texts.append(item.text) + elif hasattr(item, "model_dump"): + texts.append(json.dumps(item.model_dump(mode="json"))) + else: + texts.append(str(item)) + content_text = "\n".join(texts) if len(texts) == 1 else json.dumps(texts) elif content.result is not None: content_text = str(content.result) From b8086f25166404e98a5f61837ae9f07e63e0030e Mon Sep 17 00:00:00 2001 From: claude89757 <138977524+claude89757@users.noreply.github.com> Date: Wed, 3 Dec 2025 23:56:41 +0800 Subject: [PATCH 2/7] Address PR review feedback for MCP tool result serialization - Extract serialize_content_result() to shared _utils.py - Fix logic: use texts[0] instead of join for single item - Add type annotation: texts: list[str] = [] - Return empty string for empty list instead of '[]' - Move import json to file top level - Add comprehensive unit tests for serialization --- .../ag-ui/agent_framework_ag_ui/_events.py | 20 +-- .../_message_adapters.py | 29 +---- .../ag-ui/agent_framework_ag_ui/_utils.py | 32 +++++ .../ag-ui/tests/test_message_adapters.py | 114 +++++++++++++++++- python/packages/ag-ui/tests/test_utils.py | 114 +++++++++++++++++- 5 files changed, 265 insertions(+), 44 deletions(-) diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_events.py b/python/packages/ag-ui/agent_framework_ag_ui/_events.py index e06e39a5a8..17d8cda23b 100644 --- 
a/python/packages/ag-ui/agent_framework_ag_ui/_events.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_events.py @@ -33,7 +33,7 @@ TextContent, ) -from ._utils import generate_event_id +from ._utils import generate_event_id, serialize_content_result logger = logging.getLogger(__name__) @@ -391,23 +391,7 @@ def _handle_function_result_content(self, content: FunctionResultContent) -> lis self.state_delta_count = 0 result_message_id = generate_event_id() - if isinstance(content.result, dict): - result_content = json.dumps(content.result) # type: ignore[arg-type] - elif isinstance(content.result, list): - # Handle Contents list (e.g., from MCP tool results) - texts = [] - for item in content.result: - if hasattr(item, "text"): # TextContent - texts.append(item.text) - elif hasattr(item, "model_dump"): - texts.append(json.dumps(item.model_dump(mode="json"))) - else: - texts.append(str(item)) - result_content = "\n".join(texts) if len(texts) == 1 else json.dumps(texts) - elif content.result is not None: - result_content = str(content.result) - else: - result_content = "" + result_content = serialize_content_result(content.result) result_event = ToolCallResultEvent( message_id=result_message_id, diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py b/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py index e12883da6c..f2848c69f4 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py @@ -2,6 +2,7 @@ """Message format conversion between AG-UI and Agent Framework.""" +import json from typing import Any, cast from agent_framework import ( @@ -59,10 +60,8 @@ def agui_messages_to_agent_framework(messages: list[dict[str, Any]]) -> list[Cha # Distinguish approval payloads from actual tool results is_approval = False if isinstance(result_content, str) and result_content: - import json as _json - try: - parsed = _json.loads(result_content) + 
parsed = json.loads(result_content) is_approval = isinstance(parsed, dict) and "accepted" in parsed except Exception: is_approval = False @@ -190,7 +189,7 @@ def agent_framework_messages_to_agui(messages: list[ChatMessage] | list[dict[str Returns: List of AG-UI message dictionaries """ - from ._utils import generate_event_id + from ._utils import generate_event_id, serialize_content_result result: list[dict[str, Any]] = [] for msg in messages: @@ -237,26 +236,8 @@ def agent_framework_messages_to_agui(messages: list[ChatMessage] | list[dict[str elif isinstance(content, FunctionResultContent): # Tool result content - extract call_id and result tool_result_call_id = content.call_id - # Serialize result to string - if isinstance(content.result, dict): - import json - - content_text = json.dumps(content.result) # type: ignore - elif isinstance(content.result, list): - # Handle Contents list (e.g., from MCP tool results) - import json - - texts = [] - for item in content.result: - if hasattr(item, "text"): # TextContent - texts.append(item.text) - elif hasattr(item, "model_dump"): - texts.append(json.dumps(item.model_dump(mode="json"))) - else: - texts.append(str(item)) - content_text = "\n".join(texts) if len(texts) == 1 else json.dumps(texts) - elif content.result is not None: - content_text = str(content.result) + # Serialize result to string using shared utility + content_text = serialize_content_result(content.result) agui_msg: dict[str, Any] = { "id": msg.message_id if msg.message_id else generate_event_id(), # Always include id diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_utils.py b/python/packages/ag-ui/agent_framework_ag_ui/_utils.py index 8b271988dc..5222f21290 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_utils.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_utils.py @@ -3,6 +3,7 @@ """Utility functions for AG-UI integration.""" import copy +import json import uuid from collections.abc import Callable, MutableMapping, Sequence 
from dataclasses import asdict, is_dataclass @@ -164,3 +165,34 @@ def convert_tools_to_agui_format( continue return results if results else None + + +def serialize_content_result(result: Any) -> str: # noqa: ANN401 + """Serialize content result to string for AG-UI. + + Handles various result types from tool execution, including dict, list + (e.g., list[TextContent] from MCP tools), and other types. + + Args: + result: The result to serialize (dict, list, or other types). + + Returns: + Serialized string representation. + """ + if result is None: + return "" + if isinstance(result, dict): + return json.dumps(result) + if isinstance(result, list): + if not result: # Empty list returns empty string + return "" + texts: list[str] = [] + for item in result: + if hasattr(item, "text"): # TextContent + texts.append(item.text) + elif hasattr(item, "model_dump"): + texts.append(json.dumps(item.model_dump(mode="json"))) + else: + texts.append(str(item)) + return texts[0] if len(texts) == 1 else json.dumps(texts) + return str(result) diff --git a/python/packages/ag-ui/tests/test_message_adapters.py b/python/packages/ag-ui/tests/test_message_adapters.py index a21375b87b..7351a41d5d 100644 --- a/python/packages/ag-ui/tests/test_message_adapters.py +++ b/python/packages/ag-ui/tests/test_message_adapters.py @@ -3,7 +3,7 @@ """Tests for message adapters.""" import pytest -from agent_framework import ChatMessage, FunctionCallContent, Role, TextContent +from agent_framework import ChatMessage, FunctionCallContent, FunctionResultContent, Role, TextContent from agent_framework_ag_ui._message_adapters import ( agent_framework_messages_to_agui, @@ -278,3 +278,115 @@ def test_extract_text_from_custom_contents(): result = extract_text_from_contents(contents) assert result == "Custom Mixed" + + +# Tests for FunctionResultContent serialization in agent_framework_messages_to_agui + + +def test_agent_framework_to_agui_function_result_dict(): + """Test converting FunctionResultContent with 
dict result to AG-UI.""" + msg = ChatMessage( + role=Role.TOOL, + contents=[FunctionResultContent(call_id="call-123", result={"key": "value", "count": 42})], + message_id="msg-789", + ) + + messages = agent_framework_messages_to_agui([msg]) + + assert len(messages) == 1 + agui_msg = messages[0] + assert agui_msg["role"] == "tool" + assert agui_msg["toolCallId"] == "call-123" + assert agui_msg["content"] == '{"key": "value", "count": 42}' + + +def test_agent_framework_to_agui_function_result_none(): + """Test converting FunctionResultContent with None result to AG-UI.""" + msg = ChatMessage( + role=Role.TOOL, + contents=[FunctionResultContent(call_id="call-123", result=None)], + message_id="msg-789", + ) + + messages = agent_framework_messages_to_agui([msg]) + + assert len(messages) == 1 + agui_msg = messages[0] + assert agui_msg["content"] == "" + + +def test_agent_framework_to_agui_function_result_string(): + """Test converting FunctionResultContent with string result to AG-UI.""" + msg = ChatMessage( + role=Role.TOOL, + contents=[FunctionResultContent(call_id="call-123", result="plain text result")], + message_id="msg-789", + ) + + messages = agent_framework_messages_to_agui([msg]) + + assert len(messages) == 1 + agui_msg = messages[0] + assert agui_msg["content"] == "plain text result" + + +def test_agent_framework_to_agui_function_result_empty_list(): + """Test converting FunctionResultContent with empty list result to AG-UI.""" + msg = ChatMessage( + role=Role.TOOL, + contents=[FunctionResultContent(call_id="call-123", result=[])], + message_id="msg-789", + ) + + messages = agent_framework_messages_to_agui([msg]) + + assert len(messages) == 1 + agui_msg = messages[0] + assert agui_msg["content"] == "" + + +def test_agent_framework_to_agui_function_result_single_text_content(): + """Test converting FunctionResultContent with single TextContent-like item.""" + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + msg = ChatMessage( + 
role=Role.TOOL, + contents=[FunctionResultContent(call_id="call-123", result=[MockTextContent("Hello from MCP!")])], + message_id="msg-789", + ) + + messages = agent_framework_messages_to_agui([msg]) + + assert len(messages) == 1 + agui_msg = messages[0] + # Single item should return plain text, not JSON + assert agui_msg["content"] == "Hello from MCP!" + + +def test_agent_framework_to_agui_function_result_multiple_text_contents(): + """Test converting FunctionResultContent with multiple TextContent-like items.""" + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + msg = ChatMessage( + role=Role.TOOL, + contents=[ + FunctionResultContent( + call_id="call-123", + result=[MockTextContent("First result"), MockTextContent("Second result")], + ) + ], + message_id="msg-789", + ) + + messages = agent_framework_messages_to_agui([msg]) + + assert len(messages) == 1 + agui_msg = messages[0] + # Multiple items should return JSON array + assert agui_msg["content"] == '["First result", "Second result"]' diff --git a/python/packages/ag-ui/tests/test_utils.py b/python/packages/ag-ui/tests/test_utils.py index 4a6d0360bd..dc01479327 100644 --- a/python/packages/ag-ui/tests/test_utils.py +++ b/python/packages/ag-ui/tests/test_utils.py @@ -5,7 +5,12 @@ from dataclasses import dataclass from datetime import date, datetime -from agent_framework_ag_ui._utils import generate_event_id, make_json_safe, merge_state +from agent_framework_ag_ui._utils import ( + generate_event_id, + make_json_safe, + merge_state, + serialize_content_result, +) def test_generate_event_id(): @@ -303,3 +308,110 @@ def tool2(y: str) -> str: assert len(result) == 2 assert result[0]["name"] == "tool1" assert result[1]["name"] == "tool2" + + +# Tests for serialize_content_result + + +def test_serialize_content_result_none(): + """Test serializing None returns empty string.""" + result = serialize_content_result(None) + assert result == "" + + +def test_serialize_content_result_dict(): 
+ """Test serializing dict returns JSON string.""" + result = serialize_content_result({"key": "value", "number": 42}) + assert result == '{"key": "value", "number": 42}' + + +def test_serialize_content_result_empty_list(): + """Test serializing empty list returns empty string.""" + result = serialize_content_result([]) + assert result == "" + + +def test_serialize_content_result_single_text_content(): + """Test serializing single TextContent-like object returns plain text.""" + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + result = serialize_content_result([MockTextContent("Hello, world!")]) + assert result == "Hello, world!" + + +def test_serialize_content_result_multiple_text_contents(): + """Test serializing multiple TextContent-like objects returns JSON array.""" + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + result = serialize_content_result([MockTextContent("First"), MockTextContent("Second")]) + assert result == '["First", "Second"]' + + +def test_serialize_content_result_model_dump_object(): + """Test serializing object with model_dump method.""" + + class MockModel: + def model_dump(self, mode: str = "python"): + return {"type": "model", "value": 123} + + result = serialize_content_result([MockModel()]) + # Single item should be the JSON string of the model dump + assert result == '{"type": "model", "value": 123}' + + +def test_serialize_content_result_multiple_model_dump_objects(): + """Test serializing multiple objects with model_dump method.""" + + class MockModel: + def __init__(self, value: int): + self._value = value + + def model_dump(self, mode: str = "python"): + return {"value": self._value} + + result = serialize_content_result([MockModel(1), MockModel(2)]) + assert result == '["{\\"value\\": 1}", "{\\"value\\": 2}"]' + + +def test_serialize_content_result_string_fallback(): + """Test serializing objects without text or model_dump falls back to str().""" + + class 
PlainObject: + def __str__(self): + return "plain_object_str" + + result = serialize_content_result([PlainObject()]) + assert result == "plain_object_str" + + +def test_serialize_content_result_mixed_list(): + """Test serializing list with mixed content types.""" + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + class PlainObject: + def __str__(self): + return "plain" + + result = serialize_content_result([MockTextContent("text1"), PlainObject()]) + assert result == '["text1", "plain"]' + + +def test_serialize_content_result_string(): + """Test serializing plain string returns the string.""" + result = serialize_content_result("just a string") + assert result == "just a string" + + +def test_serialize_content_result_number(): + """Test serializing number returns string representation.""" + result = serialize_content_result(42) + assert result == "42" From e4239b391296a50390f8ee896e4f7d1675dbdc77 Mon Sep 17 00:00:00 2001 From: claude89757 <138977524+claude89757@users.noreply.github.com> Date: Tue, 16 Dec 2025 00:28:21 +0800 Subject: [PATCH 3/7] Address PR review feedback: fix type checking and double serialization - Add isinstance(item.text, str) check to ensure text attribute is a string - Fix double-serialization issue by keeping model_dump results as dicts until final json.dumps (removes escaped JSON strings in arrays) - Improve docstring with detailed return value documentation - Add test for non-string text attribute handling - Add tests for list type tool results in _events.py path --- .../ag-ui/agent_framework_ag_ui/_events.py | 987 +++++++++--------- .../ag-ui/agent_framework_ag_ui/_utils.py | 24 +- .../ag-ui/tests/test_events_comprehensive.py | 92 ++ python/packages/ag-ui/tests/test_utils.py | 23 +- 4 files changed, 647 insertions(+), 479 deletions(-) diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_events.py b/python/packages/ag-ui/agent_framework_ag_ui/_events.py index 17d8cda23b..76fc7c33b5 100644 --- 
a/python/packages/ag-ui/agent_framework_ag_ui/_events.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_events.py @@ -5,7 +5,6 @@ import json import logging import re -from copy import deepcopy from typing import Any from ag_ui.core import ( @@ -105,516 +104,566 @@ async def from_agent_run_update(self, update: AgentRunResponseUpdate) -> list[Ba for idx, content in enumerate(update.contents): logger.info(f" Content {idx}: type={type(content).__name__}") if isinstance(content, TextContent): - events.extend(self._handle_text_content(content)) - elif isinstance(content, FunctionCallContent): - events.extend(self._handle_function_call_content(content)) - elif isinstance(content, FunctionResultContent): - events.extend(self._handle_function_result_content(content)) - elif isinstance(content, FunctionApprovalRequestContent): - events.extend(self._handle_function_approval_request_content(content)) - - return events - - def _handle_text_content(self, content: TextContent) -> list[BaseEvent]: - events: list[BaseEvent] = [] - logger.info(f" TextContent found: length={len(content.text)}") - logger.info( - " Flags: skip_text_content=%s, should_stop_after_confirm=%s", - self.skip_text_content, - self.should_stop_after_confirm, - ) - - if self.skip_text_content: - logger.info(" SKIPPING TextContent: skip_text_content is True") - return events - - if self.should_stop_after_confirm: - logger.info(" SKIPPING TextContent: waiting for confirm_changes response") - self.suppressed_summary += content.text - logger.info(f" Suppressed summary length={len(self.suppressed_summary)}") - return events - - # Skip empty text chunks to avoid emitting - # TextMessageContentEvent with an empty `delta` which fails - # Pydantic validation (AG-UI requires non-empty strings). 
- if not content.text: - logger.info(" SKIPPING TextContent: empty chunk") - return events - - if not self.current_message_id: - self.current_message_id = generate_event_id() - start_event = TextMessageStartEvent( - message_id=self.current_message_id, - role="assistant", - ) - logger.info(f" EMITTING TextMessageStartEvent with message_id={self.current_message_id}") - events.append(start_event) - - event = TextMessageContentEvent( - message_id=self.current_message_id, - delta=content.text, - ) - self.accumulated_text_content += content.text - logger.info(f" EMITTING TextMessageContentEvent with text_len={len(content.text)}") - events.append(event) - return events - - def _handle_function_call_content(self, content: FunctionCallContent) -> list[BaseEvent]: - events: list[BaseEvent] = [] - if content.name: - logger.debug(f"Tool call: {content.name} (call_id: {content.call_id})") - - if not content.name and not content.call_id and not self.current_tool_call_name: - args_length = len(str(content.arguments)) if content.arguments else 0 - logger.warning(f"FunctionCallContent missing name and call_id. 
args_length={args_length}") - - tool_call_id = self._coalesce_tool_call_id(content) - if content.name and tool_call_id != self.current_tool_call_id: - self.streaming_tool_args = "" - self.state_delta_count = 0 - if content.name: - self.current_tool_call_id = tool_call_id - self.current_tool_call_name = content.name - - tool_start_event = ToolCallStartEvent( - tool_call_id=tool_call_id, - tool_call_name=content.name, - parent_message_id=self.current_message_id, - ) - logger.info(f"Emitting ToolCallStartEvent with name='{content.name}', id='{tool_call_id}'") - events.append(tool_start_event) - - self.pending_tool_calls.append( - { - "id": tool_call_id, - "type": "function", - "function": { - "name": content.name, - "arguments": "", - }, - } - ) - elif tool_call_id: - self.current_tool_call_id = tool_call_id - - if content.arguments: - delta_str = content.arguments if isinstance(content.arguments, str) else json.dumps(content.arguments) - logger.info(f"Emitting ToolCallArgsEvent with delta_length={len(delta_str)}, id='{tool_call_id}'") - args_event = ToolCallArgsEvent( - tool_call_id=tool_call_id, - delta=delta_str, - ) - events.append(args_event) - - for tool_call in self.pending_tool_calls: - if tool_call["id"] == tool_call_id: - tool_call["function"]["arguments"] += delta_str - break - - events.extend(self._emit_predictive_state_deltas(delta_str)) - events.extend(self._legacy_predictive_state(content)) - - return events - - def _coalesce_tool_call_id(self, content: FunctionCallContent) -> str: - if content.call_id: - return content.call_id - if self.current_tool_call_id: - return self.current_tool_call_id - return generate_event_id() - - def _emit_predictive_state_deltas(self, argument_chunk: str) -> list[BaseEvent]: - events: list[BaseEvent] = [] - if not self.current_tool_call_name or not self.predict_state_config: - return events - - self.streaming_tool_args += argument_chunk - logger.debug( - "Predictive state: accumulated %s chars for tool '%s'", - 
len(self.streaming_tool_args), - self.current_tool_call_name, - ) + logger.info( + f" TextContent found: text_length={len(content.text)}, text_preview='{content.text[:100]}'" + ) + logger.info( + f" Flags: skip_text_content={self.skip_text_content}, should_stop_after_confirm={self.should_stop_after_confirm}" + ) - parsed_args = None - try: - parsed_args = json.loads(self.streaming_tool_args) - except json.JSONDecodeError: - for state_key, config in self.predict_state_config.items(): - if config["tool"] != self.current_tool_call_name: + # Skip text content if using structured outputs (it's just the JSON) + if self.skip_text_content: + logger.info(" SKIPPING TextContent: skip_text_content is True") continue - tool_arg_name = config["tool_argument"] - pattern = rf'"{re.escape(tool_arg_name)}":\s*"([^"]*)' - match = re.search(pattern, self.streaming_tool_args) - - if match: - partial_value = match.group(1).replace("\\n", "\n").replace('\\"', '"').replace("\\\\", "\\") - - if state_key not in self.last_emitted_state or self.last_emitted_state[state_key] != partial_value: - state_delta_event = StateDeltaEvent( - delta=[ - { - "op": "replace", - "path": f"/{state_key}", - "value": partial_value, - } - ], - ) - self.state_delta_count += 1 - if self.state_delta_count % 10 == 1: - logger.info( - "StateDeltaEvent #%s for '%s': op=replace, path=/%s, value_length=%s", - self.state_delta_count, - state_key, - state_key, - len(str(partial_value)), - ) - elif self.state_delta_count % 100 == 0: - logger.info(f"StateDeltaEvent #{self.state_delta_count} emitted") + # Skip text content if we're about to emit confirm_changes + # The summary should only appear after user confirms + if self.should_stop_after_confirm: + logger.info(" SKIPPING TextContent: waiting for confirm_changes response") + # Save the summary text to show after confirmation + self.suppressed_summary += content.text + logger.info(f" Suppressed summary now has {len(self.suppressed_summary)} chars") + continue - 
events.append(state_delta_event) - self.last_emitted_state[state_key] = partial_value - self.pending_state_updates[state_key] = partial_value + if not self.current_message_id: + self.current_message_id = generate_event_id() + start_event = TextMessageStartEvent( + message_id=self.current_message_id, + role="assistant", + ) + logger.info(f" EMITTING TextMessageStartEvent with message_id={self.current_message_id}") + events.append(start_event) - if parsed_args: - for state_key, config in self.predict_state_config.items(): - if config["tool"] != self.current_tool_call_name: - continue - tool_arg_name = config["tool_argument"] + event = TextMessageContentEvent( + message_id=self.current_message_id, + delta=content.text, + ) + # Accumulate text content for final MessagesSnapshotEvent + self.accumulated_text_content += content.text + logger.info(f" EMITTING TextMessageContentEvent with delta: '{content.text}'") + events.append(event) - if tool_arg_name == "*": - state_value = parsed_args - elif tool_arg_name in parsed_args: - state_value = parsed_args[tool_arg_name] + elif isinstance(content, FunctionCallContent): + # Log tool calls for debugging + if content.name: + logger.debug(f"Tool call: {content.name} (call_id: {content.call_id})") + + if not content.name and not content.call_id and not self.current_tool_call_name: + args_preview = str(content.arguments)[:50] if content.arguments else "None" + logger.warning(f"FunctionCallContent missing name and call_id. 
Args: {args_preview}") + + # Get or use existing tool call ID - all chunks of same tool call share the same call_id + # Important: the first chunk might have name but no call_id yet + if content.call_id: + tool_call_id = content.call_id + elif self.current_tool_call_id: + tool_call_id = self.current_tool_call_id else: - continue + # Generate a new ID for this tool call + tool_call_id = ( + generate_event_id() + ) # Handle streaming tool calls - name comes in first chunk, arguments in subsequent chunks + if content.name: + # This is a new tool call or the first chunk with the name + self.current_tool_call_id = tool_call_id + self.current_tool_call_name = content.name + + tool_start_event = ToolCallStartEvent( + tool_call_id=tool_call_id, + tool_call_name=content.name, + parent_message_id=self.current_message_id, + ) + logger.info(f"Emitting ToolCallStartEvent with name='{content.name}', id='{tool_call_id}'") + events.append(tool_start_event) + + # Track tool call for MessagesSnapshotEvent + # Initialize a new tool call entry + self.pending_tool_calls.append( + { + "id": tool_call_id, + "type": "function", + "function": { + "name": content.name, + "arguments": "", # Will accumulate as we get argument chunks + }, + } + ) + else: + # Subsequent chunk without name - update our tracked ID if needed + if tool_call_id: + self.current_tool_call_id = tool_call_id + + # Emit arguments if present + if content.arguments: + # content.arguments is already a JSON string from the LLM for streaming calls + # For non-streaming it could be a dict, so we need to handle both + if isinstance(content.arguments, str): + delta_str = content.arguments + else: + # If it's a dict, convert to JSON + delta_str = json.dumps(content.arguments) + + logger.info(f"Emitting ToolCallArgsEvent with delta: {delta_str!r}..., id='{tool_call_id}'") + args_event = ToolCallArgsEvent( + tool_call_id=tool_call_id, + delta=delta_str, + ) + events.append(args_event) + + # Accumulate arguments for 
MessagesSnapshotEvent + if self.pending_tool_calls: + # Find the matching tool call and append the delta + for tool_call in self.pending_tool_calls: + if tool_call["id"] == tool_call_id: + tool_call["function"]["arguments"] += delta_str + break + + # Predictive state updates - accumulate streaming arguments and emit deltas + # Use current_tool_call_name since content.name is only present on first chunk + if self.current_tool_call_name and self.predict_state_config: + # Accumulate the argument string + if isinstance(content.arguments, str): + self.streaming_tool_args += content.arguments + else: + self.streaming_tool_args += json.dumps(content.arguments) + + logger.debug( + f"Predictive state: accumulated {len(self.streaming_tool_args)} chars for tool '{self.current_tool_call_name}'" + ) - if state_key not in self.last_emitted_state or self.last_emitted_state[state_key] != state_value: - state_delta_event = StateDeltaEvent( - delta=[ - { - "op": "replace", - "path": f"/{state_key}", - "value": state_value, - } - ], + # Try to parse accumulated arguments (may be incomplete JSON) + # We use a lenient approach: try standard parsing first, then try to extract partial values + parsed_args = None + try: + parsed_args = json.loads(self.streaming_tool_args) + except json.JSONDecodeError: + # JSON is incomplete - try to extract partial string values + # For streaming "document" field, we can extract: {"document": "text... 
+ # Look for pattern: {"field": "value (incomplete) + for state_key, config in self.predict_state_config.items(): + if config["tool"] == self.current_tool_call_name: + tool_arg_name = config["tool_argument"] + + # Try to extract partial string value for this argument + # Pattern: "argument_name": "partial text + pattern = rf'"{re.escape(tool_arg_name)}":\s*"([^"]*)' + match = re.search(pattern, self.streaming_tool_args) + + if match: + partial_value = match.group(1) + # Unescape common sequences + partial_value = ( + partial_value.replace("\\n", "\n").replace('\\"', '"').replace("\\\\", "\\") + ) + + # Emit delta if we have new content + if ( + state_key not in self.last_emitted_state + or self.last_emitted_state[state_key] != partial_value + ): + state_delta_event = StateDeltaEvent( + delta=[ + { + "op": "replace", + "path": f"/{state_key}", + "value": partial_value, + } + ], + ) + + self.state_delta_count += 1 + if self.state_delta_count % 10 == 1: + value_preview = ( + str(partial_value)[:100] + "..." 
+ if len(str(partial_value)) > 100 + else str(partial_value) + ) + logger.info( + f"StateDeltaEvent #{self.state_delta_count} for '{state_key}': " + f"op=replace, path=/{state_key}, value={value_preview}" + ) + elif self.state_delta_count % 100 == 0: + logger.info(f"StateDeltaEvent #{self.state_delta_count} emitted") + + events.append(state_delta_event) + self.last_emitted_state[state_key] = partial_value + self.pending_state_updates[state_key] = partial_value + + # If we successfully parsed complete JSON, process it + if parsed_args: + # Check if this tool matches any predictive state config + for state_key, config in self.predict_state_config.items(): + if config["tool"] == self.current_tool_call_name: + tool_arg_name = config["tool_argument"] + + # Extract the state value + if tool_arg_name == "*": + state_value = parsed_args + elif tool_arg_name in parsed_args: + state_value = parsed_args[tool_arg_name] + else: + continue + + # Only emit if state has changed from last emission + if ( + state_key not in self.last_emitted_state + or self.last_emitted_state[state_key] != state_value + ): + # Emit StateDeltaEvent for real-time UI updates (JSON Patch format) + state_delta_event = StateDeltaEvent( + delta=[ + { + "op": "replace", # Use replace since field exists in schema + "path": f"/{state_key}", # JSON Pointer path with leading slash + "value": state_value, + } + ], + ) + + # Increment counter and log every 10th emission with sample data + self.state_delta_count += 1 + if self.state_delta_count % 10 == 1: # Log 1st, 11th, 21st, etc. + value_preview = ( + str(state_value)[:100] + "..." 
+ if len(str(state_value)) > 100 + else str(state_value) + ) + logger.info( + f"StateDeltaEvent #{self.state_delta_count} for '{state_key}': " + f"op=replace, path=/{state_key}, value={value_preview}" + ) + elif self.state_delta_count % 100 == 0: # Also log every 100th + logger.info(f"StateDeltaEvent #{self.state_delta_count} emitted") + + events.append(state_delta_event) + + # Track what we emitted + self.last_emitted_state[state_key] = state_value + self.pending_state_updates[state_key] = state_value + + # Legacy predictive state check (for when arguments are complete) + if content.name and content.arguments: + parsed_args = content.parse_arguments() + + if parsed_args: + logger.info(f"Checking predict_state_config: {self.predict_state_config}") + for state_key, config in self.predict_state_config.items(): + logger.info(f"Checking state_key='{state_key}', config={config}") + if config["tool"] == content.name: + tool_arg_name = config["tool_argument"] + logger.info( + f"MATCHED tool '{content.name}' for state key '{state_key}', arg='{tool_arg_name}'" + ) + + # If tool_argument is "*", use all arguments as the state value + if tool_arg_name == "*": + state_value = parsed_args + logger.info(f"Using all args as state value, keys: {list(state_value.keys())}") + elif tool_arg_name in parsed_args: + state_value = parsed_args[tool_arg_name] + logger.info(f"Using specific arg '{tool_arg_name}' as state value") + else: + logger.warning(f"Tool argument '{tool_arg_name}' not found in parsed args") + continue + + # Emit predictive delta (JSON Patch format) + state_delta_event = StateDeltaEvent( + delta=[ + { + "op": "replace", # Use replace since field exists in schema + "path": f"/{state_key}", # JSON Pointer path with leading slash + "value": state_value, + } + ], + ) + logger.info( + f"Emitting StateDeltaEvent for key '{state_key}', value type: {type(state_value)}" + ) + events.append(state_delta_event) + + # Track pending update for later snapshot + 
self.pending_state_updates[state_key] = state_value + + # Note: ToolCallEndEvent is emitted when we receive FunctionResultContent, + # not here during streaming, since we don't know when the stream is complete + + elif isinstance(content, FunctionResultContent): + # First emit ToolCallEndEvent to close the tool call + if content.call_id: + end_event = ToolCallEndEvent( + tool_call_id=content.call_id, ) + logger.info(f"Emitting ToolCallEndEvent for completed tool call '{content.call_id}'") + events.append(end_event) + self.tool_calls_ended.add(content.call_id) # Track that we emitted end event - self.state_delta_count += 1 - if self.state_delta_count % 10 == 1: + # Log total StateDeltaEvent count for this tool call + if self.state_delta_count > 0: logger.info( - "StateDeltaEvent #%s for '%s': op=replace, path=/%s, value_length=%s", - self.state_delta_count, - state_key, - state_key, - len(str(state_value)), + f"Tool call '{content.call_id}' complete: emitted {self.state_delta_count} StateDeltaEvents total" ) - elif self.state_delta_count % 100 == 0: - logger.info(f"StateDeltaEvent #{self.state_delta_count} emitted") - events.append(state_delta_event) - self.last_emitted_state[state_key] = state_value - self.pending_state_updates[state_key] = state_value - return events + # Reset streaming accumulator and counter for next tool call + self.streaming_tool_args = "" + self.state_delta_count = 0 - def _legacy_predictive_state(self, content: FunctionCallContent) -> list[BaseEvent]: - events: list[BaseEvent] = [] - if not (content.name and content.arguments): - return events - parsed_args = content.parse_arguments() - if not parsed_args: - return events - - logger.info( - "Checking predict_state_config keys: %s", - list(self.predict_state_config.keys()) if self.predict_state_config else "None", - ) - for state_key, config in self.predict_state_config.items(): - logger.info(f"Checking state_key='{state_key}'") - if config["tool"] != content.name: - continue - tool_arg_name 
= config["tool_argument"] - logger.info(f"MATCHED tool '{content.name}' for state key '{state_key}', arg='{tool_arg_name}'") - - state_value: Any - if tool_arg_name == "*": - state_value = parsed_args - logger.info(f"Using all args as state value, keys: {list(state_value.keys())}") - elif tool_arg_name in parsed_args: - state_value = parsed_args[tool_arg_name] - logger.info(f"Using specific arg '{tool_arg_name}' as state value") - else: - logger.warning(f"Tool argument '{tool_arg_name}' not found in parsed args") - continue - - previous_value = self.last_emitted_state.get(state_key, object()) - if previous_value == state_value: - logger.info( - "Skipping duplicate StateDeltaEvent for key '%s' - value unchanged", - state_key, + # Tool result - emit ToolCallResultEvent + result_message_id = generate_event_id() + result_content = serialize_content_result(content.result) + + result_event = ToolCallResultEvent( + message_id=result_message_id, + tool_call_id=content.call_id, + content=result_content, + role="tool", ) - continue + events.append(result_event) - state_delta_event = StateDeltaEvent( - delta=[ + # Track tool result for MessagesSnapshotEvent + # AG-UI protocol expects: { role: "tool", toolCallId: ..., content: ... 
} + # Use camelCase for Pydantic's alias_generator=to_camel + self.tool_results.append( { - "op": "replace", - "path": f"/{state_key}", - "value": state_value, + "id": result_message_id, + "role": "tool", + "toolCallId": content.call_id, + "content": result_content, } - ], - ) - logger.info(f"Emitting StateDeltaEvent for key '{state_key}', value type: {type(state_value)}") # type: ignore - events.append(state_delta_event) - self.pending_state_updates[state_key] = state_value - self.last_emitted_state[state_key] = state_value - return events - - def _handle_function_result_content(self, content: FunctionResultContent) -> list[BaseEvent]: - events: list[BaseEvent] = [] - if content.call_id: - end_event = ToolCallEndEvent( - tool_call_id=content.call_id, - ) - logger.info(f"Emitting ToolCallEndEvent for completed tool call '{content.call_id}'") - events.append(end_event) - self.tool_calls_ended.add(content.call_id) - - if self.state_delta_count > 0: - logger.info( - "Tool call '%s' complete: emitted %s StateDeltaEvents total", - content.call_id, - self.state_delta_count, ) - self.streaming_tool_args = "" - self.state_delta_count = 0 + # Emit MessagesSnapshotEvent with the complete conversation including tool calls and results + # This is required for CopilotKit's useCopilotAction to detect tool result + # HOWEVER: Skip this for predictive tools when require_confirmation=False, because + # the agent will generate a follow-up text message and we'll emit a complete snapshot at the end. + # Emitting here would create an incomplete snapshot that gets replaced, causing UI flicker. 
+ should_emit_snapshot = self.pending_tool_calls and self.tool_results + + # Check if this is a predictive tool that will have a follow-up message + is_predictive_without_confirmation = False + if should_emit_snapshot and self.current_tool_call_name and self.predict_state_config: + for state_key, config in self.predict_state_config.items(): + if config["tool"] == self.current_tool_call_name and not self.require_confirmation: + is_predictive_without_confirmation = True + logger.info( + f"Skipping intermediate MessagesSnapshotEvent for predictive tool '{self.current_tool_call_name}' " + "- will emit complete snapshot after follow-up message" + ) + break - result_message_id = generate_event_id() - result_content = serialize_content_result(content.result) + if should_emit_snapshot and not is_predictive_without_confirmation: + # Import message adapter + from ._message_adapters import agent_framework_messages_to_agui - result_event = ToolCallResultEvent( - message_id=result_message_id, - tool_call_id=content.call_id, - content=result_content, - role="tool", - ) - events.append(result_event) - - self.tool_results.append( - { - "id": result_message_id, - "role": "tool", - "toolCallId": content.call_id, - "content": result_content, - } - ) + # Build assistant message with tool_calls + assistant_message = { + "id": generate_event_id(), + "role": "assistant", + "tool_calls": self.pending_tool_calls.copy(), # Copy the accumulated tool calls + } - events.extend(self._emit_snapshot_for_tool_result()) - events.extend(self._emit_state_snapshot_and_confirmation()) + # Convert Agent Framework messages to AG-UI format (adds required 'id' field) + converted_input_messages = agent_framework_messages_to_agui(self.input_messages) - return events + # Build complete messages array: input messages + assistant message + tool results + all_messages = converted_input_messages + [assistant_message] + self.tool_results.copy() - def _emit_snapshot_for_tool_result(self) -> list[BaseEvent]: - 
events: list[BaseEvent] = [] - should_emit_snapshot = self.pending_tool_calls and self.tool_results - - is_predictive_without_confirmation = False - if should_emit_snapshot and self.current_tool_call_name and self.predict_state_config: - for _, config in self.predict_state_config.items(): - if config["tool"] == self.current_tool_call_name and not self.require_confirmation: - is_predictive_without_confirmation = True - logger.info( - "Skipping intermediate MessagesSnapshotEvent for predictive tool '%s' - delaying until summary", - self.current_tool_call_name, + # Emit MessagesSnapshotEvent using the proper event type + # Note: messages are dict[str, Any] but Pydantic will validate them as Message types + messages_snapshot_event = MessagesSnapshotEvent( + type=EventType.MESSAGES_SNAPSHOT, + messages=all_messages, # type: ignore[arg-type] ) - break - - if should_emit_snapshot and not is_predictive_without_confirmation: - from ._message_adapters import agent_framework_messages_to_agui - - assistant_message = { - "id": generate_event_id(), - "role": "assistant", - "tool_calls": self.pending_tool_calls.copy(), - } - converted_input_messages = agent_framework_messages_to_agui(self.input_messages) - all_messages = converted_input_messages + [assistant_message] + self.tool_results.copy() - - messages_snapshot_event = MessagesSnapshotEvent( - type=EventType.MESSAGES_SNAPSHOT, - messages=all_messages, # type: ignore[arg-type] - ) - logger.info(f"Emitting MessagesSnapshotEvent with {len(all_messages)} messages") - events.append(messages_snapshot_event) - return events - - def _emit_state_snapshot_and_confirmation(self) -> list[BaseEvent]: - events: list[BaseEvent] = [] - if self.pending_state_updates: - for key, value in self.pending_state_updates.items(): - self.current_state[key] = value - - logger.info(f"Emitting StateSnapshotEvent with keys: {list(self.current_state.keys())}") - if "recipe" in self.current_state: - recipe = self.current_state["recipe"] - logger.info( - 
"Recipe fields: title=%s, skill_level=%s, ingredients_count=%s, instructions_count=%s", - recipe.get("title"), - recipe.get("skill_level"), - len(recipe.get("ingredients", [])), - len(recipe.get("instructions", [])), - ) + logger.info(f"Emitting MessagesSnapshotEvent with {len(all_messages)} messages") + events.append(messages_snapshot_event) + + # After tool execution, emit StateSnapshotEvent if we have pending state updates + if self.pending_state_updates: + # Update the current state with pending updates + for key, value in self.pending_state_updates.items(): + self.current_state[key] = value + + # Log the state structure for debugging + logger.info(f"Emitting StateSnapshotEvent with keys: {list(self.current_state.keys())}") + if "recipe" in self.current_state: + recipe = self.current_state["recipe"] + logger.info( + f"Recipe fields: title={recipe.get('title')}, " + f"skill_level={recipe.get('skill_level')}, " + f"ingredients_count={len(recipe.get('ingredients', []))}, " + f"instructions_count={len(recipe.get('instructions', []))}" + ) - state_snapshot_event = StateSnapshotEvent( - snapshot=self.current_state, - ) - events.append(state_snapshot_event) - - tool_was_predictive = False - logger.debug( - "Checking predictive state: current_tool='%s', predict_config=%s", - self.current_tool_call_name, - list(self.predict_state_config.keys()) if self.predict_state_config else "None", - ) - for state_key, config in self.predict_state_config.items(): - if self.current_tool_call_name and config["tool"] == self.current_tool_call_name: - logger.info( - "Tool '%s' matches predictive config for state key '%s'", - self.current_tool_call_name, - state_key, + # Emit complete state snapshot + state_snapshot_event = StateSnapshotEvent( + snapshot=self.current_state, + ) + events.append(state_snapshot_event) + + # Check if this was a predictive state update tool (e.g., write_document_local) + # If so, emit a confirm_changes tool call for the UI modal + tool_was_predictive = False 
+ logger.debug( + f"Checking predictive state: current_tool='{self.current_tool_call_name}', " + f"predict_config={list(self.predict_state_config.keys()) if self.predict_state_config else 'None'}" ) - tool_was_predictive = True - break + for state_key, config in self.predict_state_config.items(): + # Check if this tool call matches a predictive config + # We need to match against self.current_tool_call_name + if self.current_tool_call_name and config["tool"] == self.current_tool_call_name: + logger.info( + f"Tool '{self.current_tool_call_name}' matches predictive config for state key '{state_key}'" + ) + tool_was_predictive = True + break - if tool_was_predictive and self.require_confirmation: - events.extend(self._emit_confirm_changes_tool_call()) - elif tool_was_predictive: - logger.info("Skipping confirm_changes - require_confirmation is False") + if tool_was_predictive and self.require_confirmation: + # Emit confirm_changes tool call sequence + confirm_call_id = generate_event_id() - self.pending_state_updates.clear() - self.last_emitted_state = deepcopy(self.current_state) - self.current_tool_call_name = None - return events + logger.info("Emitting confirm_changes tool call for predictive update") - def _emit_confirm_changes_tool_call(self) -> list[BaseEvent]: - events: list[BaseEvent] = [] - confirm_call_id = generate_event_id() - logger.info("Emitting confirm_changes tool call for predictive update") - - self.pending_tool_calls.append( - { - "id": confirm_call_id, - "type": "function", - "function": { - "name": "confirm_changes", - "arguments": "{}", - }, - } - ) + # Track confirm_changes tool call for MessagesSnapshotEvent (so it persists after RUN_FINISHED) + self.pending_tool_calls.append( + { + "id": confirm_call_id, + "type": "function", + "function": { + "name": "confirm_changes", + "arguments": "{}", + }, + } + ) - confirm_start = ToolCallStartEvent( - tool_call_id=confirm_call_id, - tool_call_name="confirm_changes", - ) - events.append(confirm_start) 
+ # Start the confirm_changes tool call + confirm_start = ToolCallStartEvent( + tool_call_id=confirm_call_id, + tool_call_name="confirm_changes", + ) + events.append(confirm_start) - confirm_args = ToolCallArgsEvent( - tool_call_id=confirm_call_id, - delta="{}", - ) - events.append(confirm_args) + # Empty args for confirm_changes + confirm_args = ToolCallArgsEvent( + tool_call_id=confirm_call_id, + delta="{}", + ) + events.append(confirm_args) - confirm_end = ToolCallEndEvent( - tool_call_id=confirm_call_id, - ) - events.append(confirm_end) + # End the confirm_changes tool call + confirm_end = ToolCallEndEvent( + tool_call_id=confirm_call_id, + ) + events.append(confirm_end) + + # Emit MessagesSnapshotEvent so confirm_changes persists after RUN_FINISHED + # Import message adapter + from ._message_adapters import agent_framework_messages_to_agui + + # Build assistant message with pending confirm_changes tool call + assistant_message = { + "id": generate_event_id(), + "role": "assistant", + "tool_calls": self.pending_tool_calls.copy(), # Includes confirm_changes + } + + # Convert Agent Framework messages to AG-UI format (adds required 'id' field) + converted_input_messages = agent_framework_messages_to_agui(self.input_messages) + + # Build complete messages array: input messages + assistant message + any tool results + all_messages = converted_input_messages + [assistant_message] + self.tool_results.copy() + + # Emit MessagesSnapshotEvent + # Note: messages are dict[str, Any] but Pydantic will validate them as Message types + messages_snapshot_event = MessagesSnapshotEvent( + type=EventType.MESSAGES_SNAPSHOT, + messages=all_messages, # type: ignore[arg-type] + ) + logger.info( + f"Emitting MessagesSnapshotEvent for confirm_changes with {len(all_messages)} messages" + ) + events.append(messages_snapshot_event) - from ._message_adapters import agent_framework_messages_to_agui + # Set flag to stop the run after this - we're waiting for user response + 
self.should_stop_after_confirm = True + logger.info("Set flag to stop run after confirm_changes") + elif tool_was_predictive: + logger.info("Skipping confirm_changes - require_confirmation is False") - assistant_message = { - "id": generate_event_id(), - "role": "assistant", - "tool_calls": self.pending_tool_calls.copy(), - } + # Clear pending updates and reset tool name tracker + self.pending_state_updates.clear() + self.last_emitted_state.clear() + self.current_tool_call_name = None # Reset for next tool call - converted_input_messages = agent_framework_messages_to_agui(self.input_messages) - all_messages = converted_input_messages + [assistant_message] + self.tool_results.copy() + elif isinstance(content, FunctionApprovalRequestContent): + # Human in the loop - function approval request + logger.info("=== FUNCTION APPROVAL REQUEST ===") + logger.info(f" Function: {content.function_call.name}") + logger.info(f" Call ID: {content.function_call.call_id}") + + # Parse the arguments to extract state for predictive UI updates + parsed_args = content.function_call.parse_arguments() + logger.info(f" Parsed args keys: {list(parsed_args.keys()) if parsed_args else 'None'}") + + # Check if this matches our predict_state_config and emit state + if parsed_args and self.predict_state_config: + logger.info(f" Checking predict_state_config: {self.predict_state_config}") + for state_key, config in self.predict_state_config.items(): + if config["tool"] == content.function_call.name: + tool_arg_name = config["tool_argument"] + logger.info( + f" MATCHED tool '{content.function_call.name}' for state key '{state_key}', arg='{tool_arg_name}'" + ) - messages_snapshot_event = MessagesSnapshotEvent( - type=EventType.MESSAGES_SNAPSHOT, - messages=all_messages, # type: ignore[arg-type] - ) - logger.info(f"Emitting MessagesSnapshotEvent for confirm_changes with {len(all_messages)} messages") - events.append(messages_snapshot_event) + # Extract the state value + if tool_arg_name == "*": + 
state_value = parsed_args + elif tool_arg_name in parsed_args: + state_value = parsed_args[tool_arg_name] + else: + logger.warning(f" Tool argument '{tool_arg_name}' not found in parsed args") + continue + + # Update current state + self.current_state[state_key] = state_value + logger.info( + f"Emitting StateSnapshotEvent for key '{state_key}', value type: {type(state_value)}" + ) - self.should_stop_after_confirm = True - logger.info("Set flag to stop run after confirm_changes") - return events + # Emit state snapshot + state_snapshot = StateSnapshotEvent( + snapshot=self.current_state, + ) + events.append(state_snapshot) - def _handle_function_approval_request_content(self, content: FunctionApprovalRequestContent) -> list[BaseEvent]: - events: list[BaseEvent] = [] - logger.info("=== FUNCTION APPROVAL REQUEST ===") - logger.info(f" Function: {content.function_call.name}") - logger.info(f" Call ID: {content.function_call.call_id}") - - parsed_args = content.function_call.parse_arguments() - parsed_arg_keys = list(parsed_args.keys()) if parsed_args else "None" - logger.info(f" Parsed args keys: {parsed_arg_keys}") - - if parsed_args and self.predict_state_config: - logger.info( - " Checking predict_state_config keys: %s", - list(self.predict_state_config.keys()) if self.predict_state_config else "None", - ) - for state_key, config in self.predict_state_config.items(): - if config["tool"] != content.function_call.name: - continue - tool_arg_name = config["tool_argument"] - logger.info( - " MATCHED tool '%s' for state key '%s', arg='%s'", - content.function_call.name, - state_key, - tool_arg_name, + # The tool call has been streamed already (Start/Args events) + # Now we need to close it with an End event before the agent waits for approval + if content.function_call.call_id: + end_event = ToolCallEndEvent( + tool_call_id=content.function_call.call_id, + ) + logger.info( + f"Emitting ToolCallEndEvent for approval-required tool '{content.function_call.call_id}'" + ) + 
events.append(end_event) + self.tool_calls_ended.add(content.function_call.call_id) # Track that we emitted end event + + # Emit custom event for approval request + # Note: In AG-UI protocol, the frontend handles interrupts automatically + # when it sees a tool call with the configured name (via predict_state_config) + # This custom event is for additional metadata if needed + approval_event = CustomEvent( + name="function_approval_request", + value={ + "id": content.id, + "function_call": { + "call_id": content.function_call.call_id, + "name": content.function_call.name, + "arguments": content.function_call.parse_arguments(), + }, + }, ) + logger.info(f"Emitting function_approval_request custom event for '{content.function_call.name}'") + events.append(approval_event) - state_value: Any - if tool_arg_name == "*": - state_value = parsed_args - elif tool_arg_name in parsed_args: - state_value = parsed_args[tool_arg_name] - else: - logger.warning(f" Tool argument '{tool_arg_name}' not found in parsed args") - continue - - self.current_state[state_key] = state_value - logger.info("Emitting StateSnapshotEvent for key '%s', value type: %s", state_key, type(state_value)) # type: ignore - state_snapshot = StateSnapshotEvent( - snapshot=self.current_state, - ) - events.append(state_snapshot) - - if content.function_call.call_id: - end_event = ToolCallEndEvent( - tool_call_id=content.function_call.call_id, - ) - logger.info(f"Emitting ToolCallEndEvent for approval-required tool '{content.function_call.call_id}'") - events.append(end_event) - self.tool_calls_ended.add(content.function_call.call_id) - - approval_event = CustomEvent( - name="function_approval_request", - value={ - "id": content.id, - "function_call": { - "call_id": content.function_call.call_id, - "name": content.function_call.name, - "arguments": content.function_call.parse_arguments(), - }, - }, - ) - logger.info(f"Emitting function_approval_request custom event for '{content.function_call.name}'") - 
events.append(approval_event) return events def create_run_started_event(self) -> RunStartedEvent: diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_utils.py b/python/packages/ag-ui/agent_framework_ag_ui/_utils.py index 5222f21290..404e8ace68 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_utils.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_utils.py @@ -177,7 +177,13 @@ def serialize_content_result(result: Any) -> str: # noqa: ANN401 result: The result to serialize (dict, list, or other types). Returns: - Serialized string representation. + Serialized string representation: + - None: returns empty string "" + - dict: returns JSON string + - empty list: returns empty string "" + - single TextContent-like item with str text: returns plain text string + - multiple items: returns JSON array + - other types: returns str(result) """ if result is None: return "" @@ -186,13 +192,17 @@ def serialize_content_result(result: Any) -> str: # noqa: ANN401 if isinstance(result, list): if not result: # Empty list returns empty string return "" - texts: list[str] = [] + items: list[Any] = [] for item in result: - if hasattr(item, "text"): # TextContent - texts.append(item.text) + if hasattr(item, "text") and isinstance(item.text, str): # TextContent + items.append(item.text) elif hasattr(item, "model_dump"): - texts.append(json.dumps(item.model_dump(mode="json"))) + # Keep as dict, let final json.dumps handle it + items.append(item.model_dump(mode="json")) else: - texts.append(str(item)) - return texts[0] if len(texts) == 1 else json.dumps(texts) + items.append(str(item)) + # Single string item returns plain text + if len(items) == 1 and isinstance(items[0], str): + return items[0] + return json.dumps(items) return str(result) diff --git a/python/packages/ag-ui/tests/test_events_comprehensive.py b/python/packages/ag-ui/tests/test_events_comprehensive.py index a51d1f382a..fdb985f59f 100644 --- a/python/packages/ag-ui/tests/test_events_comprehensive.py +++ 
b/python/packages/ag-ui/tests/test_events_comprehensive.py @@ -688,3 +688,95 @@ async def test_state_delta_count_logging(): # State delta count should have incremented (one per unique state update) assert bridge.state_delta_count >= 1 + + +# Tests for list type tool results (MCP tool serialization) + + +async def test_tool_result_with_empty_list(): + """Test FunctionResultContent with empty list result.""" + from agent_framework_ag_ui._events import AgentFrameworkEventBridge + + bridge = AgentFrameworkEventBridge(run_id="test_run", thread_id="test_thread") + + update = AgentRunResponseUpdate(contents=[FunctionResultContent(call_id="call_123", result=[])]) + events = await bridge.from_agent_run_update(update) + + assert len(events) == 2 + assert events[0].type == "TOOL_CALL_END" + assert events[1].type == "TOOL_CALL_RESULT" + # Empty list should return empty string + assert events[1].content == "" + + +async def test_tool_result_with_single_text_content(): + """Test FunctionResultContent with single TextContent-like item (MCP tool result).""" + from agent_framework_ag_ui._events import AgentFrameworkEventBridge + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + bridge = AgentFrameworkEventBridge(run_id="test_run", thread_id="test_thread") + + update = AgentRunResponseUpdate( + contents=[FunctionResultContent(call_id="call_123", result=[MockTextContent("Hello from MCP tool!")])] + ) + events = await bridge.from_agent_run_update(update) + + assert len(events) == 2 + assert events[0].type == "TOOL_CALL_END" + assert events[1].type == "TOOL_CALL_RESULT" + # Single TextContent should return plain text, not JSON array + assert events[1].content == "Hello from MCP tool!" 
+ + +async def test_tool_result_with_multiple_text_contents(): + """Test FunctionResultContent with multiple TextContent-like items (MCP tool result).""" + from agent_framework_ag_ui._events import AgentFrameworkEventBridge + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + bridge = AgentFrameworkEventBridge(run_id="test_run", thread_id="test_thread") + + update = AgentRunResponseUpdate( + contents=[ + FunctionResultContent( + call_id="call_123", + result=[MockTextContent("First result"), MockTextContent("Second result")], + ) + ] + ) + events = await bridge.from_agent_run_update(update) + + assert len(events) == 2 + assert events[0].type == "TOOL_CALL_END" + assert events[1].type == "TOOL_CALL_RESULT" + # Multiple TextContent items should return JSON array + assert events[1].content == '["First result", "Second result"]' + + +async def test_tool_result_with_model_dump_objects(): + """Test FunctionResultContent with objects that have model_dump method.""" + from agent_framework_ag_ui._events import AgentFrameworkEventBridge + + class MockModel: + def __init__(self, value: int): + self._value = value + + def model_dump(self, mode: str = "python"): + return {"value": self._value} + + bridge = AgentFrameworkEventBridge(run_id="test_run", thread_id="test_thread") + + update = AgentRunResponseUpdate( + contents=[FunctionResultContent(call_id="call_123", result=[MockModel(1), MockModel(2)])] + ) + events = await bridge.from_agent_run_update(update) + + assert len(events) == 2 + assert events[1].type == "TOOL_CALL_RESULT" + # Should be properly serialized JSON array without double escaping + assert events[1].content == '[{"value": 1}, {"value": 2}]' diff --git a/python/packages/ag-ui/tests/test_utils.py b/python/packages/ag-ui/tests/test_utils.py index dc01479327..82155e63e4 100644 --- a/python/packages/ag-ui/tests/test_utils.py +++ b/python/packages/ag-ui/tests/test_utils.py @@ -361,8 +361,9 @@ def model_dump(self, mode: str = "python"): 
return {"type": "model", "value": 123} result = serialize_content_result([MockModel()]) - # Single item should be the JSON string of the model dump - assert result == '{"type": "model", "value": 123}' + # Single non-string item is still serialized as JSON array + # Only TextContent-like items (with str text attr) get unwrapped + assert result == '[{"type": "model", "value": 123}]' def test_serialize_content_result_multiple_model_dump_objects(): @@ -376,7 +377,8 @@ def model_dump(self, mode: str = "python"): return {"value": self._value} result = serialize_content_result([MockModel(1), MockModel(2)]) - assert result == '["{\\"value\\": 1}", "{\\"value\\": 2}"]' + # Multiple dict items should be serialized as JSON array without double-escaping + assert result == '[{"value": 1}, {"value": 2}]' def test_serialize_content_result_string_fallback(): @@ -415,3 +417,18 @@ def test_serialize_content_result_number(): """Test serializing number returns string representation.""" result = serialize_content_result(42) assert result == "42" + + +def test_serialize_content_result_text_with_non_string_text_attr(): + """Test that items with non-string text attribute are handled correctly.""" + + class BadTextContent: + def __init__(self): + self.text = 12345 # Not a string! 
+ + def __str__(self): + return "BadTextContent" + + # Should fall back to str() since text is not a string + result = serialize_content_result([BadTextContent()]) + assert result == "BadTextContent" From df894e6b57f3895f32fbd244a7bf5908bb289440 Mon Sep 17 00:00:00 2001 From: claude89757 <138977524+claude89757@users.noreply.github.com> Date: Sat, 3 Jan 2026 14:50:24 +0800 Subject: [PATCH 4/7] Simplify PR: minimal changes to fix MCP tool result serialization Addresses reviewer feedback about excessive refactoring: - Reset _events.py to original structure - Only add import and use serialize_content_result in one location - All review comments addressed in serialize_content_result(): - Added isinstance(item.text, str) check - Use model_dump(mode="json") to avoid double-serialization - Improved docstring with explicit return value documentation - Empty list returns "" instead of "[]" --- .../ag-ui/agent_framework_ag_ui/_events.py | 987 +++++++++--------- 1 file changed, 469 insertions(+), 518 deletions(-) diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_events.py b/python/packages/ag-ui/agent_framework_ag_ui/_events.py index 76fc7c33b5..17d8cda23b 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_events.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_events.py @@ -5,6 +5,7 @@ import json import logging import re +from copy import deepcopy from typing import Any from ag_ui.core import ( @@ -104,566 +105,516 @@ async def from_agent_run_update(self, update: AgentRunResponseUpdate) -> list[Ba for idx, content in enumerate(update.contents): logger.info(f" Content {idx}: type={type(content).__name__}") if isinstance(content, TextContent): - logger.info( - f" TextContent found: text_length={len(content.text)}, text_preview='{content.text[:100]}'" - ) - logger.info( - f" Flags: skip_text_content={self.skip_text_content}, should_stop_after_confirm={self.should_stop_after_confirm}" - ) + events.extend(self._handle_text_content(content)) + elif 
isinstance(content, FunctionCallContent): + events.extend(self._handle_function_call_content(content)) + elif isinstance(content, FunctionResultContent): + events.extend(self._handle_function_result_content(content)) + elif isinstance(content, FunctionApprovalRequestContent): + events.extend(self._handle_function_approval_request_content(content)) - # Skip text content if using structured outputs (it's just the JSON) - if self.skip_text_content: - logger.info(" SKIPPING TextContent: skip_text_content is True") - continue + return events - # Skip text content if we're about to emit confirm_changes - # The summary should only appear after user confirms - if self.should_stop_after_confirm: - logger.info(" SKIPPING TextContent: waiting for confirm_changes response") - # Save the summary text to show after confirmation - self.suppressed_summary += content.text - logger.info(f" Suppressed summary now has {len(self.suppressed_summary)} chars") - continue + def _handle_text_content(self, content: TextContent) -> list[BaseEvent]: + events: list[BaseEvent] = [] + logger.info(f" TextContent found: length={len(content.text)}") + logger.info( + " Flags: skip_text_content=%s, should_stop_after_confirm=%s", + self.skip_text_content, + self.should_stop_after_confirm, + ) - if not self.current_message_id: - self.current_message_id = generate_event_id() - start_event = TextMessageStartEvent( - message_id=self.current_message_id, - role="assistant", - ) - logger.info(f" EMITTING TextMessageStartEvent with message_id={self.current_message_id}") - events.append(start_event) + if self.skip_text_content: + logger.info(" SKIPPING TextContent: skip_text_content is True") + return events + + if self.should_stop_after_confirm: + logger.info(" SKIPPING TextContent: waiting for confirm_changes response") + self.suppressed_summary += content.text + logger.info(f" Suppressed summary length={len(self.suppressed_summary)}") + return events + + # Skip empty text chunks to avoid emitting + # 
TextMessageContentEvent with an empty `delta` which fails + # Pydantic validation (AG-UI requires non-empty strings). + if not content.text: + logger.info(" SKIPPING TextContent: empty chunk") + return events + + if not self.current_message_id: + self.current_message_id = generate_event_id() + start_event = TextMessageStartEvent( + message_id=self.current_message_id, + role="assistant", + ) + logger.info(f" EMITTING TextMessageStartEvent with message_id={self.current_message_id}") + events.append(start_event) + + event = TextMessageContentEvent( + message_id=self.current_message_id, + delta=content.text, + ) + self.accumulated_text_content += content.text + logger.info(f" EMITTING TextMessageContentEvent with text_len={len(content.text)}") + events.append(event) + return events - event = TextMessageContentEvent( - message_id=self.current_message_id, - delta=content.text, - ) - # Accumulate text content for final MessagesSnapshotEvent - self.accumulated_text_content += content.text - logger.info(f" EMITTING TextMessageContentEvent with delta: '{content.text}'") - events.append(event) + def _handle_function_call_content(self, content: FunctionCallContent) -> list[BaseEvent]: + events: list[BaseEvent] = [] + if content.name: + logger.debug(f"Tool call: {content.name} (call_id: {content.call_id})") + + if not content.name and not content.call_id and not self.current_tool_call_name: + args_length = len(str(content.arguments)) if content.arguments else 0 + logger.warning(f"FunctionCallContent missing name and call_id. 
args_length={args_length}") + + tool_call_id = self._coalesce_tool_call_id(content) + if content.name and tool_call_id != self.current_tool_call_id: + self.streaming_tool_args = "" + self.state_delta_count = 0 + if content.name: + self.current_tool_call_id = tool_call_id + self.current_tool_call_name = content.name + + tool_start_event = ToolCallStartEvent( + tool_call_id=tool_call_id, + tool_call_name=content.name, + parent_message_id=self.current_message_id, + ) + logger.info(f"Emitting ToolCallStartEvent with name='{content.name}', id='{tool_call_id}'") + events.append(tool_start_event) + + self.pending_tool_calls.append( + { + "id": tool_call_id, + "type": "function", + "function": { + "name": content.name, + "arguments": "", + }, + } + ) + elif tool_call_id: + self.current_tool_call_id = tool_call_id + + if content.arguments: + delta_str = content.arguments if isinstance(content.arguments, str) else json.dumps(content.arguments) + logger.info(f"Emitting ToolCallArgsEvent with delta_length={len(delta_str)}, id='{tool_call_id}'") + args_event = ToolCallArgsEvent( + tool_call_id=tool_call_id, + delta=delta_str, + ) + events.append(args_event) + + for tool_call in self.pending_tool_calls: + if tool_call["id"] == tool_call_id: + tool_call["function"]["arguments"] += delta_str + break + + events.extend(self._emit_predictive_state_deltas(delta_str)) + events.extend(self._legacy_predictive_state(content)) - elif isinstance(content, FunctionCallContent): - # Log tool calls for debugging - if content.name: - logger.debug(f"Tool call: {content.name} (call_id: {content.call_id})") - - if not content.name and not content.call_id and not self.current_tool_call_name: - args_preview = str(content.arguments)[:50] if content.arguments else "None" - logger.warning(f"FunctionCallContent missing name and call_id. 
Args: {args_preview}") - - # Get or use existing tool call ID - all chunks of same tool call share the same call_id - # Important: the first chunk might have name but no call_id yet - if content.call_id: - tool_call_id = content.call_id - elif self.current_tool_call_id: - tool_call_id = self.current_tool_call_id - else: - # Generate a new ID for this tool call - tool_call_id = ( - generate_event_id() - ) # Handle streaming tool calls - name comes in first chunk, arguments in subsequent chunks - if content.name: - # This is a new tool call or the first chunk with the name - self.current_tool_call_id = tool_call_id - self.current_tool_call_name = content.name - - tool_start_event = ToolCallStartEvent( - tool_call_id=tool_call_id, - tool_call_name=content.name, - parent_message_id=self.current_message_id, - ) - logger.info(f"Emitting ToolCallStartEvent with name='{content.name}', id='{tool_call_id}'") - events.append(tool_start_event) - - # Track tool call for MessagesSnapshotEvent - # Initialize a new tool call entry - self.pending_tool_calls.append( - { - "id": tool_call_id, - "type": "function", - "function": { - "name": content.name, - "arguments": "", # Will accumulate as we get argument chunks - }, - } - ) - else: - # Subsequent chunk without name - update our tracked ID if needed - if tool_call_id: - self.current_tool_call_id = tool_call_id - - # Emit arguments if present - if content.arguments: - # content.arguments is already a JSON string from the LLM for streaming calls - # For non-streaming it could be a dict, so we need to handle both - if isinstance(content.arguments, str): - delta_str = content.arguments - else: - # If it's a dict, convert to JSON - delta_str = json.dumps(content.arguments) - - logger.info(f"Emitting ToolCallArgsEvent with delta: {delta_str!r}..., id='{tool_call_id}'") - args_event = ToolCallArgsEvent( - tool_call_id=tool_call_id, - delta=delta_str, - ) - events.append(args_event) - - # Accumulate arguments for MessagesSnapshotEvent - 
if self.pending_tool_calls: - # Find the matching tool call and append the delta - for tool_call in self.pending_tool_calls: - if tool_call["id"] == tool_call_id: - tool_call["function"]["arguments"] += delta_str - break - - # Predictive state updates - accumulate streaming arguments and emit deltas - # Use current_tool_call_name since content.name is only present on first chunk - if self.current_tool_call_name and self.predict_state_config: - # Accumulate the argument string - if isinstance(content.arguments, str): - self.streaming_tool_args += content.arguments - else: - self.streaming_tool_args += json.dumps(content.arguments) - - logger.debug( - f"Predictive state: accumulated {len(self.streaming_tool_args)} chars for tool '{self.current_tool_call_name}'" + return events + + def _coalesce_tool_call_id(self, content: FunctionCallContent) -> str: + if content.call_id: + return content.call_id + if self.current_tool_call_id: + return self.current_tool_call_id + return generate_event_id() + + def _emit_predictive_state_deltas(self, argument_chunk: str) -> list[BaseEvent]: + events: list[BaseEvent] = [] + if not self.current_tool_call_name or not self.predict_state_config: + return events + + self.streaming_tool_args += argument_chunk + logger.debug( + "Predictive state: accumulated %s chars for tool '%s'", + len(self.streaming_tool_args), + self.current_tool_call_name, + ) + + parsed_args = None + try: + parsed_args = json.loads(self.streaming_tool_args) + except json.JSONDecodeError: + for state_key, config in self.predict_state_config.items(): + if config["tool"] != self.current_tool_call_name: + continue + tool_arg_name = config["tool_argument"] + pattern = rf'"{re.escape(tool_arg_name)}":\s*"([^"]*)' + match = re.search(pattern, self.streaming_tool_args) + + if match: + partial_value = match.group(1).replace("\\n", "\n").replace('\\"', '"').replace("\\\\", "\\") + + if state_key not in self.last_emitted_state or self.last_emitted_state[state_key] != 
partial_value: + state_delta_event = StateDeltaEvent( + delta=[ + { + "op": "replace", + "path": f"/{state_key}", + "value": partial_value, + } + ], ) - # Try to parse accumulated arguments (may be incomplete JSON) - # We use a lenient approach: try standard parsing first, then try to extract partial values - parsed_args = None - try: - parsed_args = json.loads(self.streaming_tool_args) - except json.JSONDecodeError: - # JSON is incomplete - try to extract partial string values - # For streaming "document" field, we can extract: {"document": "text... - # Look for pattern: {"field": "value (incomplete) - for state_key, config in self.predict_state_config.items(): - if config["tool"] == self.current_tool_call_name: - tool_arg_name = config["tool_argument"] - - # Try to extract partial string value for this argument - # Pattern: "argument_name": "partial text - pattern = rf'"{re.escape(tool_arg_name)}":\s*"([^"]*)' - match = re.search(pattern, self.streaming_tool_args) - - if match: - partial_value = match.group(1) - # Unescape common sequences - partial_value = ( - partial_value.replace("\\n", "\n").replace('\\"', '"').replace("\\\\", "\\") - ) - - # Emit delta if we have new content - if ( - state_key not in self.last_emitted_state - or self.last_emitted_state[state_key] != partial_value - ): - state_delta_event = StateDeltaEvent( - delta=[ - { - "op": "replace", - "path": f"/{state_key}", - "value": partial_value, - } - ], - ) - - self.state_delta_count += 1 - if self.state_delta_count % 10 == 1: - value_preview = ( - str(partial_value)[:100] + "..." 
- if len(str(partial_value)) > 100 - else str(partial_value) - ) - logger.info( - f"StateDeltaEvent #{self.state_delta_count} for '{state_key}': " - f"op=replace, path=/{state_key}, value={value_preview}" - ) - elif self.state_delta_count % 100 == 0: - logger.info(f"StateDeltaEvent #{self.state_delta_count} emitted") - - events.append(state_delta_event) - self.last_emitted_state[state_key] = partial_value - self.pending_state_updates[state_key] = partial_value - - # If we successfully parsed complete JSON, process it - if parsed_args: - # Check if this tool matches any predictive state config - for state_key, config in self.predict_state_config.items(): - if config["tool"] == self.current_tool_call_name: - tool_arg_name = config["tool_argument"] - - # Extract the state value - if tool_arg_name == "*": - state_value = parsed_args - elif tool_arg_name in parsed_args: - state_value = parsed_args[tool_arg_name] - else: - continue - - # Only emit if state has changed from last emission - if ( - state_key not in self.last_emitted_state - or self.last_emitted_state[state_key] != state_value - ): - # Emit StateDeltaEvent for real-time UI updates (JSON Patch format) - state_delta_event = StateDeltaEvent( - delta=[ - { - "op": "replace", # Use replace since field exists in schema - "path": f"/{state_key}", # JSON Pointer path with leading slash - "value": state_value, - } - ], - ) - - # Increment counter and log every 10th emission with sample data - self.state_delta_count += 1 - if self.state_delta_count % 10 == 1: # Log 1st, 11th, 21st, etc. - value_preview = ( - str(state_value)[:100] + "..." 
- if len(str(state_value)) > 100 - else str(state_value) - ) - logger.info( - f"StateDeltaEvent #{self.state_delta_count} for '{state_key}': " - f"op=replace, path=/{state_key}, value={value_preview}" - ) - elif self.state_delta_count % 100 == 0: # Also log every 100th - logger.info(f"StateDeltaEvent #{self.state_delta_count} emitted") - - events.append(state_delta_event) - - # Track what we emitted - self.last_emitted_state[state_key] = state_value - self.pending_state_updates[state_key] = state_value - - # Legacy predictive state check (for when arguments are complete) - if content.name and content.arguments: - parsed_args = content.parse_arguments() - - if parsed_args: - logger.info(f"Checking predict_state_config: {self.predict_state_config}") - for state_key, config in self.predict_state_config.items(): - logger.info(f"Checking state_key='{state_key}', config={config}") - if config["tool"] == content.name: - tool_arg_name = config["tool_argument"] - logger.info( - f"MATCHED tool '{content.name}' for state key '{state_key}', arg='{tool_arg_name}'" - ) - - # If tool_argument is "*", use all arguments as the state value - if tool_arg_name == "*": - state_value = parsed_args - logger.info(f"Using all args as state value, keys: {list(state_value.keys())}") - elif tool_arg_name in parsed_args: - state_value = parsed_args[tool_arg_name] - logger.info(f"Using specific arg '{tool_arg_name}' as state value") - else: - logger.warning(f"Tool argument '{tool_arg_name}' not found in parsed args") - continue - - # Emit predictive delta (JSON Patch format) - state_delta_event = StateDeltaEvent( - delta=[ - { - "op": "replace", # Use replace since field exists in schema - "path": f"/{state_key}", # JSON Pointer path with leading slash - "value": state_value, - } - ], - ) - logger.info( - f"Emitting StateDeltaEvent for key '{state_key}', value type: {type(state_value)}" - ) - events.append(state_delta_event) - - # Track pending update for later snapshot - 
self.pending_state_updates[state_key] = state_value - - # Note: ToolCallEndEvent is emitted when we receive FunctionResultContent, - # not here during streaming, since we don't know when the stream is complete + self.state_delta_count += 1 + if self.state_delta_count % 10 == 1: + logger.info( + "StateDeltaEvent #%s for '%s': op=replace, path=/%s, value_length=%s", + self.state_delta_count, + state_key, + state_key, + len(str(partial_value)), + ) + elif self.state_delta_count % 100 == 0: + logger.info(f"StateDeltaEvent #{self.state_delta_count} emitted") - elif isinstance(content, FunctionResultContent): - # First emit ToolCallEndEvent to close the tool call - if content.call_id: - end_event = ToolCallEndEvent( - tool_call_id=content.call_id, + events.append(state_delta_event) + self.last_emitted_state[state_key] = partial_value + self.pending_state_updates[state_key] = partial_value + + if parsed_args: + for state_key, config in self.predict_state_config.items(): + if config["tool"] != self.current_tool_call_name: + continue + tool_arg_name = config["tool_argument"] + + if tool_arg_name == "*": + state_value = parsed_args + elif tool_arg_name in parsed_args: + state_value = parsed_args[tool_arg_name] + else: + continue + + if state_key not in self.last_emitted_state or self.last_emitted_state[state_key] != state_value: + state_delta_event = StateDeltaEvent( + delta=[ + { + "op": "replace", + "path": f"/{state_key}", + "value": state_value, + } + ], ) - logger.info(f"Emitting ToolCallEndEvent for completed tool call '{content.call_id}'") - events.append(end_event) - self.tool_calls_ended.add(content.call_id) # Track that we emitted end event - # Log total StateDeltaEvent count for this tool call - if self.state_delta_count > 0: + self.state_delta_count += 1 + if self.state_delta_count % 10 == 1: logger.info( - f"Tool call '{content.call_id}' complete: emitted {self.state_delta_count} StateDeltaEvents total" + "StateDeltaEvent #%s for '%s': op=replace, path=/%s, 
value_length=%s", + self.state_delta_count, + state_key, + state_key, + len(str(state_value)), ) + elif self.state_delta_count % 100 == 0: + logger.info(f"StateDeltaEvent #{self.state_delta_count} emitted") - # Reset streaming accumulator and counter for next tool call - self.streaming_tool_args = "" - self.state_delta_count = 0 - - # Tool result - emit ToolCallResultEvent - result_message_id = generate_event_id() - result_content = serialize_content_result(content.result) + events.append(state_delta_event) + self.last_emitted_state[state_key] = state_value + self.pending_state_updates[state_key] = state_value + return events - result_event = ToolCallResultEvent( - message_id=result_message_id, - tool_call_id=content.call_id, - content=result_content, - role="tool", + def _legacy_predictive_state(self, content: FunctionCallContent) -> list[BaseEvent]: + events: list[BaseEvent] = [] + if not (content.name and content.arguments): + return events + parsed_args = content.parse_arguments() + if not parsed_args: + return events + + logger.info( + "Checking predict_state_config keys: %s", + list(self.predict_state_config.keys()) if self.predict_state_config else "None", + ) + for state_key, config in self.predict_state_config.items(): + logger.info(f"Checking state_key='{state_key}'") + if config["tool"] != content.name: + continue + tool_arg_name = config["tool_argument"] + logger.info(f"MATCHED tool '{content.name}' for state key '{state_key}', arg='{tool_arg_name}'") + + state_value: Any + if tool_arg_name == "*": + state_value = parsed_args + logger.info(f"Using all args as state value, keys: {list(state_value.keys())}") + elif tool_arg_name in parsed_args: + state_value = parsed_args[tool_arg_name] + logger.info(f"Using specific arg '{tool_arg_name}' as state value") + else: + logger.warning(f"Tool argument '{tool_arg_name}' not found in parsed args") + continue + + previous_value = self.last_emitted_state.get(state_key, object()) + if previous_value == state_value: 
+ logger.info( + "Skipping duplicate StateDeltaEvent for key '%s' - value unchanged", + state_key, ) - events.append(result_event) + continue - # Track tool result for MessagesSnapshotEvent - # AG-UI protocol expects: { role: "tool", toolCallId: ..., content: ... } - # Use camelCase for Pydantic's alias_generator=to_camel - self.tool_results.append( + state_delta_event = StateDeltaEvent( + delta=[ { - "id": result_message_id, - "role": "tool", - "toolCallId": content.call_id, - "content": result_content, + "op": "replace", + "path": f"/{state_key}", + "value": state_value, } + ], + ) + logger.info(f"Emitting StateDeltaEvent for key '{state_key}', value type: {type(state_value)}") # type: ignore + events.append(state_delta_event) + self.pending_state_updates[state_key] = state_value + self.last_emitted_state[state_key] = state_value + return events + + def _handle_function_result_content(self, content: FunctionResultContent) -> list[BaseEvent]: + events: list[BaseEvent] = [] + if content.call_id: + end_event = ToolCallEndEvent( + tool_call_id=content.call_id, + ) + logger.info(f"Emitting ToolCallEndEvent for completed tool call '{content.call_id}'") + events.append(end_event) + self.tool_calls_ended.add(content.call_id) + + if self.state_delta_count > 0: + logger.info( + "Tool call '%s' complete: emitted %s StateDeltaEvents total", + content.call_id, + self.state_delta_count, ) - # Emit MessagesSnapshotEvent with the complete conversation including tool calls and results - # This is required for CopilotKit's useCopilotAction to detect tool result - # HOWEVER: Skip this for predictive tools when require_confirmation=False, because - # the agent will generate a follow-up text message and we'll emit a complete snapshot at the end. - # Emitting here would create an incomplete snapshot that gets replaced, causing UI flicker. 
- should_emit_snapshot = self.pending_tool_calls and self.tool_results - - # Check if this is a predictive tool that will have a follow-up message - is_predictive_without_confirmation = False - if should_emit_snapshot and self.current_tool_call_name and self.predict_state_config: - for state_key, config in self.predict_state_config.items(): - if config["tool"] == self.current_tool_call_name and not self.require_confirmation: - is_predictive_without_confirmation = True - logger.info( - f"Skipping intermediate MessagesSnapshotEvent for predictive tool '{self.current_tool_call_name}' " - "- will emit complete snapshot after follow-up message" - ) - break + self.streaming_tool_args = "" + self.state_delta_count = 0 - if should_emit_snapshot and not is_predictive_without_confirmation: - # Import message adapter - from ._message_adapters import agent_framework_messages_to_agui + result_message_id = generate_event_id() + result_content = serialize_content_result(content.result) - # Build assistant message with tool_calls - assistant_message = { - "id": generate_event_id(), - "role": "assistant", - "tool_calls": self.pending_tool_calls.copy(), # Copy the accumulated tool calls - } + result_event = ToolCallResultEvent( + message_id=result_message_id, + tool_call_id=content.call_id, + content=result_content, + role="tool", + ) + events.append(result_event) + + self.tool_results.append( + { + "id": result_message_id, + "role": "tool", + "toolCallId": content.call_id, + "content": result_content, + } + ) - # Convert Agent Framework messages to AG-UI format (adds required 'id' field) - converted_input_messages = agent_framework_messages_to_agui(self.input_messages) + events.extend(self._emit_snapshot_for_tool_result()) + events.extend(self._emit_state_snapshot_and_confirmation()) - # Build complete messages array: input messages + assistant message + tool results - all_messages = converted_input_messages + [assistant_message] + self.tool_results.copy() + return events - # Emit 
MessagesSnapshotEvent using the proper event type - # Note: messages are dict[str, Any] but Pydantic will validate them as Message types - messages_snapshot_event = MessagesSnapshotEvent( - type=EventType.MESSAGES_SNAPSHOT, - messages=all_messages, # type: ignore[arg-type] - ) - logger.info(f"Emitting MessagesSnapshotEvent with {len(all_messages)} messages") - events.append(messages_snapshot_event) - - # After tool execution, emit StateSnapshotEvent if we have pending state updates - if self.pending_state_updates: - # Update the current state with pending updates - for key, value in self.pending_state_updates.items(): - self.current_state[key] = value - - # Log the state structure for debugging - logger.info(f"Emitting StateSnapshotEvent with keys: {list(self.current_state.keys())}") - if "recipe" in self.current_state: - recipe = self.current_state["recipe"] - logger.info( - f"Recipe fields: title={recipe.get('title')}, " - f"skill_level={recipe.get('skill_level')}, " - f"ingredients_count={len(recipe.get('ingredients', []))}, " - f"instructions_count={len(recipe.get('instructions', []))}" - ) + def _emit_snapshot_for_tool_result(self) -> list[BaseEvent]: + events: list[BaseEvent] = [] + should_emit_snapshot = self.pending_tool_calls and self.tool_results - # Emit complete state snapshot - state_snapshot_event = StateSnapshotEvent( - snapshot=self.current_state, + is_predictive_without_confirmation = False + if should_emit_snapshot and self.current_tool_call_name and self.predict_state_config: + for _, config in self.predict_state_config.items(): + if config["tool"] == self.current_tool_call_name and not self.require_confirmation: + is_predictive_without_confirmation = True + logger.info( + "Skipping intermediate MessagesSnapshotEvent for predictive tool '%s' - delaying until summary", + self.current_tool_call_name, ) - events.append(state_snapshot_event) - - # Check if this was a predictive state update tool (e.g., write_document_local) - # If so, emit a 
confirm_changes tool call for the UI modal - tool_was_predictive = False - logger.debug( - f"Checking predictive state: current_tool='{self.current_tool_call_name}', " - f"predict_config={list(self.predict_state_config.keys()) if self.predict_state_config else 'None'}" + break + + if should_emit_snapshot and not is_predictive_without_confirmation: + from ._message_adapters import agent_framework_messages_to_agui + + assistant_message = { + "id": generate_event_id(), + "role": "assistant", + "tool_calls": self.pending_tool_calls.copy(), + } + converted_input_messages = agent_framework_messages_to_agui(self.input_messages) + all_messages = converted_input_messages + [assistant_message] + self.tool_results.copy() + + messages_snapshot_event = MessagesSnapshotEvent( + type=EventType.MESSAGES_SNAPSHOT, + messages=all_messages, # type: ignore[arg-type] + ) + logger.info(f"Emitting MessagesSnapshotEvent with {len(all_messages)} messages") + events.append(messages_snapshot_event) + return events + + def _emit_state_snapshot_and_confirmation(self) -> list[BaseEvent]: + events: list[BaseEvent] = [] + if self.pending_state_updates: + for key, value in self.pending_state_updates.items(): + self.current_state[key] = value + + logger.info(f"Emitting StateSnapshotEvent with keys: {list(self.current_state.keys())}") + if "recipe" in self.current_state: + recipe = self.current_state["recipe"] + logger.info( + "Recipe fields: title=%s, skill_level=%s, ingredients_count=%s, instructions_count=%s", + recipe.get("title"), + recipe.get("skill_level"), + len(recipe.get("ingredients", [])), + len(recipe.get("instructions", [])), + ) + + state_snapshot_event = StateSnapshotEvent( + snapshot=self.current_state, + ) + events.append(state_snapshot_event) + + tool_was_predictive = False + logger.debug( + "Checking predictive state: current_tool='%s', predict_config=%s", + self.current_tool_call_name, + list(self.predict_state_config.keys()) if self.predict_state_config else "None", + ) + for 
state_key, config in self.predict_state_config.items(): + if self.current_tool_call_name and config["tool"] == self.current_tool_call_name: + logger.info( + "Tool '%s' matches predictive config for state key '%s'", + self.current_tool_call_name, + state_key, ) - for state_key, config in self.predict_state_config.items(): - # Check if this tool call matches a predictive config - # We need to match against self.current_tool_call_name - if self.current_tool_call_name and config["tool"] == self.current_tool_call_name: - logger.info( - f"Tool '{self.current_tool_call_name}' matches predictive config for state key '{state_key}'" - ) - tool_was_predictive = True - break + tool_was_predictive = True + break - if tool_was_predictive and self.require_confirmation: - # Emit confirm_changes tool call sequence - confirm_call_id = generate_event_id() + if tool_was_predictive and self.require_confirmation: + events.extend(self._emit_confirm_changes_tool_call()) + elif tool_was_predictive: + logger.info("Skipping confirm_changes - require_confirmation is False") - logger.info("Emitting confirm_changes tool call for predictive update") + self.pending_state_updates.clear() + self.last_emitted_state = deepcopy(self.current_state) + self.current_tool_call_name = None + return events - # Track confirm_changes tool call for MessagesSnapshotEvent (so it persists after RUN_FINISHED) - self.pending_tool_calls.append( - { - "id": confirm_call_id, - "type": "function", - "function": { - "name": "confirm_changes", - "arguments": "{}", - }, - } - ) + def _emit_confirm_changes_tool_call(self) -> list[BaseEvent]: + events: list[BaseEvent] = [] + confirm_call_id = generate_event_id() + logger.info("Emitting confirm_changes tool call for predictive update") + + self.pending_tool_calls.append( + { + "id": confirm_call_id, + "type": "function", + "function": { + "name": "confirm_changes", + "arguments": "{}", + }, + } + ) - # Start the confirm_changes tool call - confirm_start = ToolCallStartEvent( 
- tool_call_id=confirm_call_id, - tool_call_name="confirm_changes", - ) - events.append(confirm_start) + confirm_start = ToolCallStartEvent( + tool_call_id=confirm_call_id, + tool_call_name="confirm_changes", + ) + events.append(confirm_start) - # Empty args for confirm_changes - confirm_args = ToolCallArgsEvent( - tool_call_id=confirm_call_id, - delta="{}", - ) - events.append(confirm_args) + confirm_args = ToolCallArgsEvent( + tool_call_id=confirm_call_id, + delta="{}", + ) + events.append(confirm_args) - # End the confirm_changes tool call - confirm_end = ToolCallEndEvent( - tool_call_id=confirm_call_id, - ) - events.append(confirm_end) - - # Emit MessagesSnapshotEvent so confirm_changes persists after RUN_FINISHED - # Import message adapter - from ._message_adapters import agent_framework_messages_to_agui - - # Build assistant message with pending confirm_changes tool call - assistant_message = { - "id": generate_event_id(), - "role": "assistant", - "tool_calls": self.pending_tool_calls.copy(), # Includes confirm_changes - } - - # Convert Agent Framework messages to AG-UI format (adds required 'id' field) - converted_input_messages = agent_framework_messages_to_agui(self.input_messages) - - # Build complete messages array: input messages + assistant message + any tool results - all_messages = converted_input_messages + [assistant_message] + self.tool_results.copy() - - # Emit MessagesSnapshotEvent - # Note: messages are dict[str, Any] but Pydantic will validate them as Message types - messages_snapshot_event = MessagesSnapshotEvent( - type=EventType.MESSAGES_SNAPSHOT, - messages=all_messages, # type: ignore[arg-type] - ) - logger.info( - f"Emitting MessagesSnapshotEvent for confirm_changes with {len(all_messages)} messages" - ) - events.append(messages_snapshot_event) + confirm_end = ToolCallEndEvent( + tool_call_id=confirm_call_id, + ) + events.append(confirm_end) - # Set flag to stop the run after this - we're waiting for user response - 
self.should_stop_after_confirm = True - logger.info("Set flag to stop run after confirm_changes") - elif tool_was_predictive: - logger.info("Skipping confirm_changes - require_confirmation is False") + from ._message_adapters import agent_framework_messages_to_agui - # Clear pending updates and reset tool name tracker - self.pending_state_updates.clear() - self.last_emitted_state.clear() - self.current_tool_call_name = None # Reset for next tool call + assistant_message = { + "id": generate_event_id(), + "role": "assistant", + "tool_calls": self.pending_tool_calls.copy(), + } - elif isinstance(content, FunctionApprovalRequestContent): - # Human in the loop - function approval request - logger.info("=== FUNCTION APPROVAL REQUEST ===") - logger.info(f" Function: {content.function_call.name}") - logger.info(f" Call ID: {content.function_call.call_id}") - - # Parse the arguments to extract state for predictive UI updates - parsed_args = content.function_call.parse_arguments() - logger.info(f" Parsed args keys: {list(parsed_args.keys()) if parsed_args else 'None'}") - - # Check if this matches our predict_state_config and emit state - if parsed_args and self.predict_state_config: - logger.info(f" Checking predict_state_config: {self.predict_state_config}") - for state_key, config in self.predict_state_config.items(): - if config["tool"] == content.function_call.name: - tool_arg_name = config["tool_argument"] - logger.info( - f" MATCHED tool '{content.function_call.name}' for state key '{state_key}', arg='{tool_arg_name}'" - ) + converted_input_messages = agent_framework_messages_to_agui(self.input_messages) + all_messages = converted_input_messages + [assistant_message] + self.tool_results.copy() - # Extract the state value - if tool_arg_name == "*": - state_value = parsed_args - elif tool_arg_name in parsed_args: - state_value = parsed_args[tool_arg_name] - else: - logger.warning(f" Tool argument '{tool_arg_name}' not found in parsed args") - continue - - # Update 
current state - self.current_state[state_key] = state_value - logger.info( - f"Emitting StateSnapshotEvent for key '{state_key}', value type: {type(state_value)}" - ) + messages_snapshot_event = MessagesSnapshotEvent( + type=EventType.MESSAGES_SNAPSHOT, + messages=all_messages, # type: ignore[arg-type] + ) + logger.info(f"Emitting MessagesSnapshotEvent for confirm_changes with {len(all_messages)} messages") + events.append(messages_snapshot_event) - # Emit state snapshot - state_snapshot = StateSnapshotEvent( - snapshot=self.current_state, - ) - events.append(state_snapshot) + self.should_stop_after_confirm = True + logger.info("Set flag to stop run after confirm_changes") + return events - # The tool call has been streamed already (Start/Args events) - # Now we need to close it with an End event before the agent waits for approval - if content.function_call.call_id: - end_event = ToolCallEndEvent( - tool_call_id=content.function_call.call_id, - ) - logger.info( - f"Emitting ToolCallEndEvent for approval-required tool '{content.function_call.call_id}'" - ) - events.append(end_event) - self.tool_calls_ended.add(content.function_call.call_id) # Track that we emitted end event - - # Emit custom event for approval request - # Note: In AG-UI protocol, the frontend handles interrupts automatically - # when it sees a tool call with the configured name (via predict_state_config) - # This custom event is for additional metadata if needed - approval_event = CustomEvent( - name="function_approval_request", - value={ - "id": content.id, - "function_call": { - "call_id": content.function_call.call_id, - "name": content.function_call.name, - "arguments": content.function_call.parse_arguments(), - }, - }, + def _handle_function_approval_request_content(self, content: FunctionApprovalRequestContent) -> list[BaseEvent]: + events: list[BaseEvent] = [] + logger.info("=== FUNCTION APPROVAL REQUEST ===") + logger.info(f" Function: {content.function_call.name}") + logger.info(f" Call 
ID: {content.function_call.call_id}") + + parsed_args = content.function_call.parse_arguments() + parsed_arg_keys = list(parsed_args.keys()) if parsed_args else "None" + logger.info(f" Parsed args keys: {parsed_arg_keys}") + + if parsed_args and self.predict_state_config: + logger.info( + " Checking predict_state_config keys: %s", + list(self.predict_state_config.keys()) if self.predict_state_config else "None", + ) + for state_key, config in self.predict_state_config.items(): + if config["tool"] != content.function_call.name: + continue + tool_arg_name = config["tool_argument"] + logger.info( + " MATCHED tool '%s' for state key '%s', arg='%s'", + content.function_call.name, + state_key, + tool_arg_name, ) - logger.info(f"Emitting function_approval_request custom event for '{content.function_call.name}'") - events.append(approval_event) + state_value: Any + if tool_arg_name == "*": + state_value = parsed_args + elif tool_arg_name in parsed_args: + state_value = parsed_args[tool_arg_name] + else: + logger.warning(f" Tool argument '{tool_arg_name}' not found in parsed args") + continue + + self.current_state[state_key] = state_value + logger.info("Emitting StateSnapshotEvent for key '%s', value type: %s", state_key, type(state_value)) # type: ignore + state_snapshot = StateSnapshotEvent( + snapshot=self.current_state, + ) + events.append(state_snapshot) + + if content.function_call.call_id: + end_event = ToolCallEndEvent( + tool_call_id=content.function_call.call_id, + ) + logger.info(f"Emitting ToolCallEndEvent for approval-required tool '{content.function_call.call_id}'") + events.append(end_event) + self.tool_calls_ended.add(content.function_call.call_id) + + approval_event = CustomEvent( + name="function_approval_request", + value={ + "id": content.id, + "function_call": { + "call_id": content.function_call.call_id, + "name": content.function_call.name, + "arguments": content.function_call.parse_arguments(), + }, + }, + ) + logger.info(f"Emitting 
function_approval_request custom event for '{content.function_call.name}'") + events.append(approval_event) return events def create_run_started_event(self) -> RunStartedEvent: From d3621a6a7640755e78dae5d573ca28d1e1bd6e71 Mon Sep 17 00:00:00 2001 From: claude89757 <138977524+claude89757@users.noreply.github.com> Date: Tue, 6 Jan 2026 17:29:47 +0800 Subject: [PATCH 5/7] Refactor: Move MCP TextContent serialization to core prepare_function_call_results Per reviewer feedback, moved the TextContent serialization logic from ag-ui's serialize_content_result to the core package's prepare_function_call_results function. Changes: - Added handling for objects with 'text' attribute (like MCP TextContent) in _prepare_function_call_results_as_dumpable - Removed serialize_content_result from ag-ui/_utils.py - Updated _events.py and _message_adapters.py to use prepare_function_call_results from core package - Updated tests to match the core function's behavior --- .../ag-ui/agent_framework_ag_ui/_events.py | 5 +- .../_message_adapters.py | 7 +- .../ag-ui/agent_framework_ag_ui/_utils.py | 42 ------ .../ag-ui/tests/test_events_comprehensive.py | 8 +- .../ag-ui/tests/test_message_adapters.py | 10 +- python/packages/ag-ui/tests/test_utils.py | 125 ------------------ .../packages/core/agent_framework/_types.py | 3 + python/packages/core/tests/core/test_types.py | 50 +++++++ 8 files changed, 70 insertions(+), 180 deletions(-) diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_events.py b/python/packages/ag-ui/agent_framework_ag_ui/_events.py index 17d8cda23b..449b7eac87 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_events.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_events.py @@ -31,9 +31,10 @@ FunctionCallContent, FunctionResultContent, TextContent, + prepare_function_call_results, ) -from ._utils import generate_event_id, serialize_content_result +from ._utils import generate_event_id logger = logging.getLogger(__name__) @@ -391,7 +392,7 @@ def 
_handle_function_result_content(self, content: FunctionResultContent) -> lis self.state_delta_count = 0 result_message_id = generate_event_id() - result_content = serialize_content_result(content.result) + result_content = prepare_function_call_results(content.result) result_event = ToolCallResultEvent( message_id=result_message_id, diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py b/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py index f2848c69f4..b87f3b1827 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py @@ -12,6 +12,7 @@ FunctionResultContent, Role, TextContent, + prepare_function_call_results, ) # Role mapping constants @@ -189,7 +190,7 @@ def agent_framework_messages_to_agui(messages: list[ChatMessage] | list[dict[str Returns: List of AG-UI message dictionaries """ - from ._utils import generate_event_id, serialize_content_result + from ._utils import generate_event_id result: list[dict[str, Any]] = [] for msg in messages: @@ -236,8 +237,8 @@ def agent_framework_messages_to_agui(messages: list[ChatMessage] | list[dict[str elif isinstance(content, FunctionResultContent): # Tool result content - extract call_id and result tool_result_call_id = content.call_id - # Serialize result to string using shared utility - content_text = serialize_content_result(content.result) + # Serialize result to string using core utility + content_text = prepare_function_call_results(content.result) agui_msg: dict[str, Any] = { "id": msg.message_id if msg.message_id else generate_event_id(), # Always include id diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_utils.py b/python/packages/ag-ui/agent_framework_ag_ui/_utils.py index 404e8ace68..8b271988dc 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_utils.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_utils.py @@ -3,7 +3,6 @@ """Utility functions for AG-UI 
integration.""" import copy -import json import uuid from collections.abc import Callable, MutableMapping, Sequence from dataclasses import asdict, is_dataclass @@ -165,44 +164,3 @@ def convert_tools_to_agui_format( continue return results if results else None - - -def serialize_content_result(result: Any) -> str: # noqa: ANN401 - """Serialize content result to string for AG-UI. - - Handles various result types from tool execution, including dict, list - (e.g., list[TextContent] from MCP tools), and other types. - - Args: - result: The result to serialize (dict, list, or other types). - - Returns: - Serialized string representation: - - None: returns empty string "" - - dict: returns JSON string - - empty list: returns empty string "" - - single TextContent-like item with str text: returns plain text string - - multiple items: returns JSON array - - other types: returns str(result) - """ - if result is None: - return "" - if isinstance(result, dict): - return json.dumps(result) - if isinstance(result, list): - if not result: # Empty list returns empty string - return "" - items: list[Any] = [] - for item in result: - if hasattr(item, "text") and isinstance(item.text, str): # TextContent - items.append(item.text) - elif hasattr(item, "model_dump"): - # Keep as dict, let final json.dumps handle it - items.append(item.model_dump(mode="json")) - else: - items.append(str(item)) - # Single string item returns plain text - if len(items) == 1 and isinstance(items[0], str): - return items[0] - return json.dumps(items) - return str(result) diff --git a/python/packages/ag-ui/tests/test_events_comprehensive.py b/python/packages/ag-ui/tests/test_events_comprehensive.py index fdb985f59f..85b06e03d5 100644 --- a/python/packages/ag-ui/tests/test_events_comprehensive.py +++ b/python/packages/ag-ui/tests/test_events_comprehensive.py @@ -705,8 +705,8 @@ async def test_tool_result_with_empty_list(): assert len(events) == 2 assert events[0].type == "TOOL_CALL_END" assert events[1].type 
== "TOOL_CALL_RESULT" - # Empty list should return empty string - assert events[1].content == "" + # Empty list serializes as JSON empty array + assert events[1].content == "[]" async def test_tool_result_with_single_text_content(): @@ -727,8 +727,8 @@ def __init__(self, text: str): assert len(events) == 2 assert events[0].type == "TOOL_CALL_END" assert events[1].type == "TOOL_CALL_RESULT" - # Single TextContent should return plain text, not JSON array - assert events[1].content == "Hello from MCP tool!" + # TextContent text is extracted and serialized as JSON array + assert events[1].content == '["Hello from MCP tool!"]' async def test_tool_result_with_multiple_text_contents(): diff --git a/python/packages/ag-ui/tests/test_message_adapters.py b/python/packages/ag-ui/tests/test_message_adapters.py index 7351a41d5d..0f70f7dbd8 100644 --- a/python/packages/ag-ui/tests/test_message_adapters.py +++ b/python/packages/ag-ui/tests/test_message_adapters.py @@ -312,7 +312,8 @@ def test_agent_framework_to_agui_function_result_none(): assert len(messages) == 1 agui_msg = messages[0] - assert agui_msg["content"] == "" + # None serializes as JSON null + assert agui_msg["content"] == "null" def test_agent_framework_to_agui_function_result_string(): @@ -342,7 +343,8 @@ def test_agent_framework_to_agui_function_result_empty_list(): assert len(messages) == 1 agui_msg = messages[0] - assert agui_msg["content"] == "" + # Empty list serializes as JSON empty array + assert agui_msg["content"] == "[]" def test_agent_framework_to_agui_function_result_single_text_content(): @@ -362,8 +364,8 @@ def __init__(self, text: str): assert len(messages) == 1 agui_msg = messages[0] - # Single item should return plain text, not JSON - assert agui_msg["content"] == "Hello from MCP!" 
+ # TextContent text is extracted and serialized as JSON array + assert agui_msg["content"] == '["Hello from MCP!"]' def test_agent_framework_to_agui_function_result_multiple_text_contents(): diff --git a/python/packages/ag-ui/tests/test_utils.py b/python/packages/ag-ui/tests/test_utils.py index 82155e63e4..b077468b81 100644 --- a/python/packages/ag-ui/tests/test_utils.py +++ b/python/packages/ag-ui/tests/test_utils.py @@ -9,7 +9,6 @@ generate_event_id, make_json_safe, merge_state, - serialize_content_result, ) @@ -308,127 +307,3 @@ def tool2(y: str) -> str: assert len(result) == 2 assert result[0]["name"] == "tool1" assert result[1]["name"] == "tool2" - - -# Tests for serialize_content_result - - -def test_serialize_content_result_none(): - """Test serializing None returns empty string.""" - result = serialize_content_result(None) - assert result == "" - - -def test_serialize_content_result_dict(): - """Test serializing dict returns JSON string.""" - result = serialize_content_result({"key": "value", "number": 42}) - assert result == '{"key": "value", "number": 42}' - - -def test_serialize_content_result_empty_list(): - """Test serializing empty list returns empty string.""" - result = serialize_content_result([]) - assert result == "" - - -def test_serialize_content_result_single_text_content(): - """Test serializing single TextContent-like object returns plain text.""" - - class MockTextContent: - def __init__(self, text: str): - self.text = text - - result = serialize_content_result([MockTextContent("Hello, world!")]) - assert result == "Hello, world!" 
- - -def test_serialize_content_result_multiple_text_contents(): - """Test serializing multiple TextContent-like objects returns JSON array.""" - - class MockTextContent: - def __init__(self, text: str): - self.text = text - - result = serialize_content_result([MockTextContent("First"), MockTextContent("Second")]) - assert result == '["First", "Second"]' - - -def test_serialize_content_result_model_dump_object(): - """Test serializing object with model_dump method.""" - - class MockModel: - def model_dump(self, mode: str = "python"): - return {"type": "model", "value": 123} - - result = serialize_content_result([MockModel()]) - # Single non-string item is still serialized as JSON array - # Only TextContent-like items (with str text attr) get unwrapped - assert result == '[{"type": "model", "value": 123}]' - - -def test_serialize_content_result_multiple_model_dump_objects(): - """Test serializing multiple objects with model_dump method.""" - - class MockModel: - def __init__(self, value: int): - self._value = value - - def model_dump(self, mode: str = "python"): - return {"value": self._value} - - result = serialize_content_result([MockModel(1), MockModel(2)]) - # Multiple dict items should be serialized as JSON array without double-escaping - assert result == '[{"value": 1}, {"value": 2}]' - - -def test_serialize_content_result_string_fallback(): - """Test serializing objects without text or model_dump falls back to str().""" - - class PlainObject: - def __str__(self): - return "plain_object_str" - - result = serialize_content_result([PlainObject()]) - assert result == "plain_object_str" - - -def test_serialize_content_result_mixed_list(): - """Test serializing list with mixed content types.""" - - class MockTextContent: - def __init__(self, text: str): - self.text = text - - class PlainObject: - def __str__(self): - return "plain" - - result = serialize_content_result([MockTextContent("text1"), PlainObject()]) - assert result == '["text1", "plain"]' - - -def 
test_serialize_content_result_string(): - """Test serializing plain string returns the string.""" - result = serialize_content_result("just a string") - assert result == "just a string" - - -def test_serialize_content_result_number(): - """Test serializing number returns string representation.""" - result = serialize_content_result(42) - assert result == "42" - - -def test_serialize_content_result_text_with_non_string_text_attr(): - """Test that items with non-string text attribute are handled correctly.""" - - class BadTextContent: - def __init__(self): - self.text = 12345 # Not a string! - - def __str__(self): - return "BadTextContent" - - # Should fall back to str() since text is not a string - result = serialize_content_result([BadTextContent()]) - assert result == "BadTextContent" diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index f804aae052..a99440a771 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -1869,6 +1869,9 @@ def _prepare_function_call_results_as_dumpable(content: Contents | Any | list[Co return content.model_dump() if hasattr(content, "to_dict"): return content.to_dict(exclude={"raw_representation", "additional_properties"}) + # Handle objects with text attribute (e.g., MCP TextContent) + if hasattr(content, "text") and isinstance(content.text, str): + return content.text return content diff --git a/python/packages/core/tests/core/test_types.py b/python/packages/core/tests/core/test_types.py index 81242147d2..b13b5b5c4d 100644 --- a/python/packages/core/tests/core/test_types.py +++ b/python/packages/core/tests/core/test_types.py @@ -2085,3 +2085,53 @@ def test_prepare_function_call_results_nested_pydantic_model(): assert "Seattle" in json_result assert "rainy" in json_result assert "18.0" in json_result or "18" in json_result + + +# region prepare_function_call_results with MCP TextContent-like objects + + +def 
test_prepare_function_call_results_text_content_single(): + """Test that objects with text attribute (like MCP TextContent) are properly handled.""" + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + result = [MockTextContent("Hello from MCP tool!")] + json_result = prepare_function_call_results(result) + + # Should extract text and serialize as JSON array of strings + assert isinstance(json_result, str) + assert json_result == '["Hello from MCP tool!"]' + + +def test_prepare_function_call_results_text_content_multiple(): + """Test that multiple TextContent-like objects are serialized correctly.""" + + class MockTextContent: + def __init__(self, text: str): + self.text = text + + result = [MockTextContent("First result"), MockTextContent("Second result")] + json_result = prepare_function_call_results(result) + + # Should extract text from each and serialize as JSON array + assert isinstance(json_result, str) + assert json_result == '["First result", "Second result"]' + + +def test_prepare_function_call_results_text_content_with_non_string_text(): + """Test that objects with non-string text attribute are not treated as TextContent.""" + + class BadTextContent: + def __init__(self): + self.text = 12345 # Not a string! 
+ + result = [BadTextContent()] + json_result = prepare_function_call_results(result) + + # Should not extract text since it's not a string, will serialize the object + assert isinstance(json_result, str) + + +# endregion From b4531dbabb697cbffd75c740b746e0993076cf38 Mon Sep 17 00:00:00 2001 From: claude89757 <138977524+claude89757@users.noreply.github.com> Date: Tue, 6 Jan 2026 17:51:22 +0800 Subject: [PATCH 6/7] Fix failing tests for prepare_function_call_results behavior - test_tool_result_with_none: Update expected value to 'null' (JSON serialization of None) - test_tool_result_with_model_dump_objects: Use Pydantic BaseModel instead of plain class --- .../ag-ui/tests/test_events_comprehensive.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/python/packages/ag-ui/tests/test_events_comprehensive.py b/python/packages/ag-ui/tests/test_events_comprehensive.py index 85b06e03d5..8acbc525ef 100644 --- a/python/packages/ag-ui/tests/test_events_comprehensive.py +++ b/python/packages/ag-ui/tests/test_events_comprehensive.py @@ -201,7 +201,8 @@ async def test_tool_result_with_none(): assert len(events) == 2 assert events[0].type == "TOOL_CALL_END" assert events[1].type == "TOOL_CALL_RESULT" - assert events[1].content == "" + # prepare_function_call_results serializes None as JSON "null" + assert events[1].content == "null" async def test_multiple_tool_results_in_sequence(): @@ -759,20 +760,18 @@ def __init__(self, text: str): async def test_tool_result_with_model_dump_objects(): - """Test FunctionResultContent with objects that have model_dump method.""" - from agent_framework_ag_ui._events import AgentFrameworkEventBridge + """Test FunctionResultContent with Pydantic BaseModel objects.""" + from pydantic import BaseModel - class MockModel: - def __init__(self, value: int): - self._value = value + from agent_framework_ag_ui._events import AgentFrameworkEventBridge - def model_dump(self, mode: str = "python"): - return {"value": 
self._value} + class MockModel(BaseModel): + value: int bridge = AgentFrameworkEventBridge(run_id="test_run", thread_id="test_thread") update = AgentRunResponseUpdate( - contents=[FunctionResultContent(call_id="call_123", result=[MockModel(1), MockModel(2)])] + contents=[FunctionResultContent(call_id="call_123", result=[MockModel(value=1), MockModel(value=2)])] ) events = await bridge.from_agent_run_update(update) From 069607b9eb3cd79ec8ca8153c58b0bb96a1f8c20 Mon Sep 17 00:00:00 2001 From: claude89757 <138977524+claude89757@users.noreply.github.com> Date: Wed, 7 Jan 2026 02:13:01 +0800 Subject: [PATCH 7/7] Fix B903 linter error: Convert MockTextContent to dataclass The ruff linter was reporting B903 (class could be dataclass or namedtuple) for the MockTextContent test helper classes. This commit converts them to dataclasses to satisfy the linter check. --- .../ag-ui/tests/test_events_comprehensive.py | 12 ++++++++---- python/packages/ag-ui/tests/test_message_adapters.py | 10 ++++++---- python/packages/core/tests/core/test_types.py | 10 ++++++---- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/python/packages/ag-ui/tests/test_events_comprehensive.py b/python/packages/ag-ui/tests/test_events_comprehensive.py index 8acbc525ef..20b53cc18f 100644 --- a/python/packages/ag-ui/tests/test_events_comprehensive.py +++ b/python/packages/ag-ui/tests/test_events_comprehensive.py @@ -712,11 +712,13 @@ async def test_tool_result_with_empty_list(): async def test_tool_result_with_single_text_content(): """Test FunctionResultContent with single TextContent-like item (MCP tool result).""" + from dataclasses import dataclass + from agent_framework_ag_ui._events import AgentFrameworkEventBridge + @dataclass class MockTextContent: - def __init__(self, text: str): - self.text = text + text: str bridge = AgentFrameworkEventBridge(run_id="test_run", thread_id="test_thread") @@ -734,11 +736,13 @@ def __init__(self, text: str): async def 
test_tool_result_with_multiple_text_contents(): """Test FunctionResultContent with multiple TextContent-like items (MCP tool result).""" + from dataclasses import dataclass + from agent_framework_ag_ui._events import AgentFrameworkEventBridge + @dataclass class MockTextContent: - def __init__(self, text: str): - self.text = text + text: str bridge = AgentFrameworkEventBridge(run_id="test_run", thread_id="test_thread") diff --git a/python/packages/ag-ui/tests/test_message_adapters.py b/python/packages/ag-ui/tests/test_message_adapters.py index 0f70f7dbd8..51a51c9fd4 100644 --- a/python/packages/ag-ui/tests/test_message_adapters.py +++ b/python/packages/ag-ui/tests/test_message_adapters.py @@ -349,10 +349,11 @@ def test_agent_framework_to_agui_function_result_empty_list(): def test_agent_framework_to_agui_function_result_single_text_content(): """Test converting FunctionResultContent with single TextContent-like item.""" + from dataclasses import dataclass + @dataclass class MockTextContent: - def __init__(self, text: str): - self.text = text + text: str msg = ChatMessage( role=Role.TOOL, @@ -370,10 +371,11 @@ def __init__(self, text: str): def test_agent_framework_to_agui_function_result_multiple_text_contents(): """Test converting FunctionResultContent with multiple TextContent-like items.""" + from dataclasses import dataclass + @dataclass class MockTextContent: - def __init__(self, text: str): - self.text = text + text: str msg = ChatMessage( role=Role.TOOL, diff --git a/python/packages/core/tests/core/test_types.py b/python/packages/core/tests/core/test_types.py index b13b5b5c4d..cce1aea934 100644 --- a/python/packages/core/tests/core/test_types.py +++ b/python/packages/core/tests/core/test_types.py @@ -2092,10 +2092,11 @@ def test_prepare_function_call_results_nested_pydantic_model(): def test_prepare_function_call_results_text_content_single(): """Test that objects with text attribute (like MCP TextContent) are properly handled.""" + from dataclasses import 
dataclass + @dataclass class MockTextContent: - def __init__(self, text: str): - self.text = text + text: str result = [MockTextContent("Hello from MCP tool!")] json_result = prepare_function_call_results(result) @@ -2107,10 +2108,11 @@ def __init__(self, text: str): def test_prepare_function_call_results_text_content_multiple(): """Test that multiple TextContent-like objects are serialized correctly.""" + from dataclasses import dataclass + @dataclass class MockTextContent: - def __init__(self, text: str): - self.text = text + text: str result = [MockTextContent("First result"), MockTextContent("Second result")] json_result = prepare_function_call_results(result)