Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions instructor/providers/anthropic/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,19 @@ def reask_anthropic_tools(
kwargs = kwargs.copy()
from anthropic.types import Message

assert isinstance(response, Message), "Response must be a Anthropic Message"
# Handle Stream objects which are not Message instances
# This happens when streaming mode is used with retries
if not isinstance(response, Message):
kwargs["messages"].append(
{
"role": "user",
"content": (
f"Validation Error found:\n{exception}\n"
"Recall the function correctly, fix the errors"
),
}
)
return kwargs

assistant_content = []
tool_use_id = None
Expand Down Expand Up @@ -197,7 +209,19 @@ def reask_anthropic_json(
kwargs = kwargs.copy()
from anthropic.types import Message

assert isinstance(response, Message), "Response must be a Anthropic Message"
# Handle Stream objects which are not Message instances
# This happens when streaming mode is used with retries
if not isinstance(response, Message):
kwargs["messages"].append(
{
"role": "user",
"content": (
f"Validation Errors found:\n{exception}\n"
"Recall the function correctly, fix the errors"
),
}
)
return kwargs

# Filter for text blocks to handle ThinkingBlock and other non-text content
text_blocks = [c for c in response.content if c.type == "text"]
Expand Down
61 changes: 61 additions & 0 deletions instructor/providers/openai/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
from ...processing.schema import generate_openai_schema


def _is_stream_response(response: Any) -> bool:
"""Check if response is a Stream object rather than a ChatCompletion.

Stream objects don't have 'choices' attribute and can't be used
for detailed reask messages that reference the response content.
"""
return response is None or not hasattr(response, "choices")


def reask_tools(
kwargs: dict[str, Any],
response: Any,
Expand All @@ -32,6 +41,21 @@ def reask_tools(
- Adds: "messages" (tool response messages indicating validation errors)
"""
kwargs = kwargs.copy()

# Handle Stream objects which don't have choices attribute
# This happens when streaming mode is used with retries
if _is_stream_response(response):
kwargs["messages"].append(
{
"role": "user",
"content": (
f"Validation Error found:\n{exception}\n"
"Recall the function correctly, fix the errors"
),
}
)
return kwargs

reask_msgs = [dump_message(response.choices[0].message)]
for tool_call in response.choices[0].message.tool_calls:
reask_msgs.append(
Expand Down Expand Up @@ -62,6 +86,19 @@ def reask_responses_tools(
"""
kwargs = kwargs.copy()

# Handle Stream objects which don't have output attribute
if response is None or not hasattr(response, "output"):
kwargs["messages"].append(
{
"role": "user",
"content": (
f"Validation Error found:\n{exception}\n"
"Recall the function correctly, fix the errors"
),
}
)
return kwargs

reask_messages = []
for tool_call in response.output:
reask_messages.append(
Expand Down Expand Up @@ -90,6 +127,17 @@ def reask_md_json(
- Adds: "messages" (user message requesting JSON correction)
"""
kwargs = kwargs.copy()

# Handle Stream objects which don't have choices attribute
if _is_stream_response(response):
kwargs["messages"].append(
{
"role": "user",
"content": f"Correct your JSON ONLY RESPONSE, based on the following errors:\n{exception}",
}
)
return kwargs

reask_msgs = [dump_message(response.choices[0].message)]

reask_msgs.append(
Expand All @@ -115,6 +163,19 @@ def reask_default(
- Adds: "messages" (user message requesting function correction)
"""
kwargs = kwargs.copy()

# Handle Stream objects which don't have choices attribute
if _is_stream_response(response):
kwargs["messages"].append(
{
"role": "user",
"content": (
f"Recall the function correctly, fix the errors, exceptions found\n{exception}"
),
}
)
return kwargs

reask_msgs = [dump_message(response.choices[0].message)]

reask_msgs.append(
Expand Down
177 changes: 177 additions & 0 deletions tests/test_streaming_reask_bug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
"""Test for streaming reask bug fix.

Bug: When using streaming mode with max_retries > 1, if validation fails,
the reask handlers crash with "'Stream' object has no attribute 'choices'"
because they expect a ChatCompletion but receive a Stream object.

GitHub Issue: https://github.com/jxnl/instructor/issues/1991
"""

import importlib.util
from unittest.mock import MagicMock

import pytest
from pydantic import BaseModel, ValidationError, field_validator

from instructor.mode import Mode
from instructor.processing.response import handle_reask_kwargs


class MockStream:
    """Minimal stand-in for ``openai.Stream``.

    Iterable but empty, and — crucially — has no ``choices`` attribute,
    which is exactly what triggered the original crash in the reask
    handlers.
    """

    def __iter__(self):
        # An exhausted iterator: yields nothing, like a consumed stream.
        return iter(())

    def __next__(self):
        raise StopIteration


def create_mock_validation_error():
    """Create a real Pydantic ValidationError for testing.

    Returns:
        ValidationError: the error produced by validating a name without
        a space ("John") against a model that requires one.

    Raises:
        RuntimeError: if the model unexpectedly validates, so the helper
        can never silently return ``None`` to a caller expecting an error.
    """

    # Named with a leading underscore (not Test*) to make clear it is a
    # throwaway model, not something pytest should ever try to collect.
    class _SpacedNameModel(BaseModel):
        name: str

        @field_validator("name")
        @classmethod
        def must_have_space(cls, v):
            if " " not in v:
                raise ValueError("must contain space")
            return v

    try:
        _SpacedNameModel(name="John")
    except ValidationError as e:
        return e
    # Previously this path fell through and returned None implicitly.
    raise RuntimeError("expected ValidationError was not raised")


class TestStreamingReaskBug:
    """Unit tests for the streaming reask bug fix.

    Each test feeds a non-ChatCompletion response (a Stream stand-in or
    ``None``) through ``handle_reask_kwargs`` and verifies the handler
    degrades gracefully instead of raising ``AttributeError``.
    """

    def test_reask_tools_with_stream_object_does_not_crash(self):
        """Mode.TOOLS reask must tolerate a Stream response.

        Before the fix this raised:
            "'Stream' object has no attribute 'choices'"
        """
        request_kwargs = {
            "messages": [{"role": "user", "content": "test"}],
            "tools": [{"type": "function", "function": {"name": "test"}}],
        }

        updated = handle_reask_kwargs(
            kwargs=request_kwargs,
            mode=Mode.TOOLS,
            response=MockStream(),
            exception=create_mock_validation_error(),
        )

        assert "messages" in updated
        # Handler must append an error message after the original one.
        assert len(updated["messages"]) > 1

    def test_reask_anthropic_tools_with_stream_object(self):
        """Mode.ANTHROPIC_TOOLS reask must tolerate a Stream response."""
        updated = handle_reask_kwargs(
            kwargs={"messages": [{"role": "user", "content": "test"}]},
            mode=Mode.ANTHROPIC_TOOLS,
            response=MockStream(),
            exception=create_mock_validation_error(),
        )

        assert "messages" in updated

    def test_reask_with_none_response(self):
        """A missing (None) response must also be handled gracefully."""
        updated = handle_reask_kwargs(
            kwargs={"messages": [{"role": "user", "content": "test"}]},
            mode=Mode.TOOLS,
            response=None,
            exception=create_mock_validation_error(),
        )

        assert "messages" in updated

    def test_reask_md_json_with_stream_object(self):
        """Mode.MD_JSON reask must tolerate a Stream response."""
        updated = handle_reask_kwargs(
            kwargs={"messages": [{"role": "user", "content": "test"}]},
            mode=Mode.MD_JSON,
            response=MockStream(),
            exception=create_mock_validation_error(),
        )

        assert "messages" in updated


@pytest.mark.skipif(
    importlib.util.find_spec("openai") is None,
    reason="openai not installed",
)
class TestStreamingReaskIntegration:
    """Integration tests that hit the real OpenAI API.

    Bug fix: the original decorator called ``pytest.importorskip`` inside
    the ``skipif`` condition.  ``importorskip`` raises ``Skipped`` at
    collection time, which skipped the ENTIRE module (including the pure
    unit tests above) whenever openai was absent — and when openai was
    present, ``not <module>`` was always False, so ``skipif`` itself never
    fired.  ``importlib.util.find_spec`` checks availability without
    importing anything and limits the skip to this class only.
    """

    @pytest.fixture
    def client(self):
        """Build an instructor-patched OpenAI client, or skip without a key."""
        import os

        if not os.getenv("OPENAI_API_KEY"):
            pytest.skip("OPENAI_API_KEY not set")

        import instructor
        from openai import OpenAI

        return instructor.from_openai(OpenAI())

    def test_streaming_with_retries_and_failing_validator(self, client):
        """Streaming with retries must not crash on validation failure."""

        class StrictUser(BaseModel):
            name: str
            age: int

            @field_validator("name")
            @classmethod
            def name_must_have_space(cls, v: str) -> str:
                if v and " " not in v:
                    raise ValueError("Name must have first and last name")
                return v

        # Exhausting retries is the expected outcome here (the prompt
        # forces a failing name), so InstructorRetryException is correct;
        # an AttributeError instead would mean the Stream-handling fix
        # regressed.
        from instructor.core.exceptions import InstructorRetryException

        with pytest.raises(InstructorRetryException):
            list(
                client.chat.completions.create_partial(
                    model="gpt-4o-mini",
                    max_retries=2,
                    messages=[
                        {
                            "role": "user",
                            "content": "Extract: John is 25. Return name='John' (no last name).",
                        }
                    ],
                    response_model=StrictUser,
                )
            )
Loading