Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/marvin/engine/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,14 @@ def _process_pydantic_event(
tools_map: dict[str, Callable[..., Any]],
end_turn_tools_map: dict[str, EndTurn],
) -> Event | None:
def _get_snapshot(index: int) -> ModelResponsePart:
return parts_manager.get_parts()[index]
def _get_snapshot(index: int) -> ModelResponsePart | None:
# Use the internal _parts list directly since event.index refers to that.
# get_parts() filters out ToolCallPartDelta objects which causes index mismatch.
# Note: _parts can contain both ModelResponsePart and ToolCallPartDelta
if index < len(parts_manager._parts):
return parts_manager._parts[index]
# This should never happen with valid events from pydantic-ai
return None

# Handle Part Start Events
if isinstance(event, PartStartEvent):
Expand Down Expand Up @@ -227,7 +233,7 @@ def _get_snapshot(index: int) -> ModelResponsePart:
tool_call_id=event.part.tool_call_id,
),
snapshot=snapshot,
tool_call_id=snapshot.tool_call_id,
tool_call_id=snapshot.tool_call_id if snapshot else None,
tool=tools_map.get(event.part.tool_name),
)
# Handle Part Delta Events
Expand Down
54 changes: 54 additions & 0 deletions tests/basic/engine/test_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Tests for the streaming module."""

from pydantic_ai._parts_manager import ModelResponsePartsManager
from pydantic_ai.messages import PartDeltaEvent, ToolCallPartDelta

from marvin import Agent
from marvin.engine.streaming import _process_pydantic_event


def test_get_snapshot_with_incomplete_tool_call():
"""Test that _get_snapshot handles ToolCallPartDelta correctly.

This tests the fix for issue #1207 where accessing get_parts()[index]
would raise IndexError when the part at that index is a ToolCallPartDelta
that gets filtered out by get_parts().
"""
# Setup
parts_manager = ModelResponsePartsManager()
actor = Agent(name="test")
tools_map = {}
end_turn_tools_map = {}

# Create an incomplete tool call (ToolCallPartDelta with no tool name)
# This simulates what happens when streaming starts a tool call
parts_manager.handle_tool_call_delta(
vendor_part_id=0,
tool_name=None, # No tool name yet - creates ToolCallPartDelta
args="",
tool_call_id=None,
)

# Verify the setup - we should have a ToolCallPartDelta in _parts
# but get_parts() should return empty
assert len(parts_manager._parts) == 1
assert len(parts_manager.get_parts()) == 0

# Create a PartDeltaEvent that references index 0
event = PartDeltaEvent(index=0, delta=ToolCallPartDelta(args_delta="{"))

# Process the event - this should NOT raise IndexError
# Before the fix, this would fail with "list index out of range"
result = _process_pydantic_event(
event=event,
actor=actor,
parts_manager=parts_manager,
tools_map=tools_map,
end_turn_tools_map=end_turn_tools_map,
)

# The result should be a ToolCallDeltaEvent with the snapshot
assert result is not None
assert hasattr(result, "snapshot")
# The snapshot should be the ToolCallPartDelta from _parts[0]
assert result.snapshot == parts_manager._parts[0]