5 changes: 5 additions & 0 deletions cookbook/models/openai/chat/audio_output_stream.py
@@ -3,6 +3,7 @@
 from typing import Iterator
 
 from agno.agent import Agent, RunOutputEvent  # noqa
+from agno.db.in_memory import InMemoryDb
 from agno.models.openai import OpenAIChat
 
 # Audio Configuration
@@ -20,6 +21,7 @@
             "format": "pcm16",
         },  # Only pcm16 is supported with streaming
     ),
+    db=InMemoryDb(),
 )
 output_stream: Iterator[RunOutputEvent] = agent.run(
     "Tell me a 10 second story", stream=True
@@ -48,3 +50,6 @@
             print(f"Error decoding audio: {e}")
 print()
 print(f"Saved audio to {filename}")
+
+print("Metrics:")
+print(agent.get_last_run_output().metrics)
24 changes: 24 additions & 0 deletions cookbook/models/openai/chat/basic_stream_metrics.py
@@ -0,0 +1,24 @@
+from typing import Iterator  # noqa
+from agno.agent import Agent, RunOutputEvent  # noqa
+from agno.models.openai import OpenAIChat
+from agno.db.in_memory import InMemoryDb
+
+agent = Agent(model=OpenAIChat(id="gpt-4o"), db=InMemoryDb(), markdown=True)
+
+# Get the response in a variable
+# run_response: Iterator[RunOutputEvent] = agent.run("Share a 2 sentence horror story", stream=True)
+# for chunk in run_response:
+#     print(chunk.content)
+
+# Print the response in the terminal
+agent.print_response("Share a 2 sentence horror story", stream=True)
+
+run_output = agent.get_last_run_output()
+print("Metrics:")
+print(run_output.metrics)
+
+print("Message Metrics:")
+for message in run_output.messages:
+    if message.role == "assistant":
+        print(message.role)
+        print(message.metrics)
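
The same metrics read-back works for async streaming. A minimal sketch under the same assumptions as the script above (the model id and prompt are illustrative; `arun(..., stream=True)` and `get_last_run_output()` are exercised the same way in the async integration test further down):

import asyncio

from agno.agent import Agent
from agno.db.in_memory import InMemoryDb
from agno.models.openai import OpenAIChat

agent = Agent(model=OpenAIChat(id="gpt-4o"), db=InMemoryDb(), markdown=True)

async def main() -> None:
    # Consume the async stream, then read metrics back from the stored run
    async for chunk in agent.arun("Share a 2 sentence horror story", stream=True):
        print(chunk.content, end="")
    print()
    print("Metrics:")
    print(agent.get_last_run_output().metrics)

asyncio.run(main())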
98 changes: 52 additions & 46 deletions libs/agno/agno/models/base.py
@@ -53,6 +53,8 @@ class MessageData:
     response_video: Optional[Video] = None
     response_file: Optional[File] = None
 
+    response_metrics: Optional[Metrics] = None
+
     # Data from the provider that we might need on subsequent messages
     response_provider_data: Optional[Dict[str, Any]] = None
 
@@ -759,7 +761,6 @@ def _populate_assistant_message(
         Returns:
             Message: The populated assistant message
         """
-        # Add role to assistant message
         if provider_response.role is not None:
             assistant_message.role = provider_response.role
 
@@ -837,14 +838,14 @@ def process_response_stream(
             tool_choice=tool_choice or self._tool_choice,
             run_response=run_response,
         ):
-            yield from self._populate_stream_data_and_assistant_message(
+            for model_response_delta in self._populate_stream_data(
                 stream_data=stream_data,
-                assistant_message=assistant_message,
                 model_response_delta=response_delta,
-            )
+            ):
+                yield model_response_delta
 
-        # Add final metrics to assistant message
-        self._populate_assistant_message(assistant_message=assistant_message, provider_response=response_delta)
+        # Populate assistant message from stream data after the stream ends
+        self._populate_assistant_message_from_stream_data(assistant_message=assistant_message, stream_data=stream_data)
 
     def response_stream(
         self,
@@ -908,22 +909,6 @@ def response_stream(
                     streaming_responses.append(response)
                 yield response
 
-            # Populate assistant message from stream data
-            if stream_data.response_content:
-                assistant_message.content = stream_data.response_content
-            if stream_data.response_reasoning_content:
-                assistant_message.reasoning_content = stream_data.response_reasoning_content
-            if stream_data.response_redacted_reasoning_content:
-                assistant_message.redacted_reasoning_content = stream_data.response_redacted_reasoning_content
-            if stream_data.response_provider_data:
-                assistant_message.provider_data = stream_data.response_provider_data
-            if stream_data.response_citations:
-                assistant_message.citations = stream_data.response_citations
-            if stream_data.response_audio:
-                assistant_message.audio_output = stream_data.response_audio
-            if stream_data.response_tool_calls and len(stream_data.response_tool_calls) > 0:
-                assistant_message.tool_calls = self.parse_tool_calls(stream_data.response_tool_calls)
-
         else:
             self._process_model_response(
                 messages=messages,
@@ -1035,15 +1020,14 @@ async def aprocess_response_stream(
             tool_choice=tool_choice or self._tool_choice,
             run_response=run_response,
         ):  # type: ignore
-            for model_response in self._populate_stream_data_and_assistant_message(
+            for model_response_delta in self._populate_stream_data(
                 stream_data=stream_data,
-                assistant_message=assistant_message,
                 model_response_delta=response_delta,
             ):
-                yield model_response
+                yield model_response_delta
 
-        # Populate the assistant message
-        self._populate_assistant_message(assistant_message=assistant_message, provider_response=model_response)
+        # Populate assistant message from stream data after the stream ends
+        self._populate_assistant_message_from_stream_data(assistant_message=assistant_message, stream_data=stream_data)
 
     async def aresponse_stream(
         self,
@@ -1107,20 +1091,6 @@ async def aresponse_stream(
                     streaming_responses.append(model_response)
                 yield model_response
 
-            # Populate assistant message from stream data
-            if stream_data.response_content:
-                assistant_message.content = stream_data.response_content
-            if stream_data.response_reasoning_content:
-                assistant_message.reasoning_content = stream_data.response_reasoning_content
-            if stream_data.response_redacted_reasoning_content:
-                assistant_message.redacted_reasoning_content = stream_data.response_redacted_reasoning_content
-            if stream_data.response_provider_data:
-                assistant_message.provider_data = stream_data.response_provider_data
-            if stream_data.response_audio:
-                assistant_message.audio_output = stream_data.response_audio
-            if stream_data.response_tool_calls and len(stream_data.response_tool_calls) > 0:
-                assistant_message.tool_calls = self.parse_tool_calls(stream_data.response_tool_calls)
-
         else:
             await self._aprocess_model_response(
                 messages=messages,
@@ -1212,15 +1182,51 @@ async def aresponse_stream(
         if self.cache_response and cache_key and streaming_responses:
             self._save_streaming_responses_to_cache(cache_key, streaming_responses)
 
-    def _populate_stream_data_and_assistant_message(
-        self, stream_data: MessageData, assistant_message: Message, model_response_delta: ModelResponse
+    def _populate_assistant_message_from_stream_data(
+        self, assistant_message: Message, stream_data: MessageData
+    ) -> None:
+        """
+        Populate an assistant message with the stream data.
+        """
+        if stream_data.response_role is not None:
+            assistant_message.role = stream_data.response_role
+        if stream_data.response_metrics is not None:
+            assistant_message.metrics = stream_data.response_metrics
+        if stream_data.response_content:
+            assistant_message.content = stream_data.response_content
+        if stream_data.response_reasoning_content:
+            assistant_message.reasoning_content = stream_data.response_reasoning_content
+        if stream_data.response_redacted_reasoning_content:
+            assistant_message.redacted_reasoning_content = stream_data.response_redacted_reasoning_content
+        if stream_data.response_provider_data:
+            assistant_message.provider_data = stream_data.response_provider_data
+        if stream_data.response_citations:
+            assistant_message.citations = stream_data.response_citations
+        if stream_data.response_audio:
+            assistant_message.audio_output = stream_data.response_audio
+        if stream_data.response_image:
+            assistant_message.image_output = stream_data.response_image
+        if stream_data.response_video:
+            assistant_message.video_output = stream_data.response_video
+        if stream_data.response_file:
+            assistant_message.file_output = stream_data.response_file
+        if stream_data.response_tool_calls and len(stream_data.response_tool_calls) > 0:
+            assistant_message.tool_calls = self.parse_tool_calls(stream_data.response_tool_calls)
+
+    def _populate_stream_data(
+        self, stream_data: MessageData, model_response_delta: ModelResponse
     ) -> Iterator[ModelResponse]:
         """Update the stream data and assistant message with the model response."""
-        # Add role to assistant message
-        if model_response_delta.role is not None:
-            assistant_message.role = model_response_delta.role
-
         should_yield = False
+        if model_response_delta.role is not None:
+            stream_data.response_role = model_response_delta.role  # type: ignore
+
+        if model_response_delta.response_usage is not None:
+            if stream_data.response_metrics is None:
+                stream_data.response_metrics = Metrics()
+            stream_data.response_metrics += model_response_delta.response_usage
+
         # Update stream_data content
         if model_response_delta.content is not None:
             stream_data.response_content += model_response_delta.content
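
The shape of this refactor: while the stream is open, `_populate_stream_data` only accumulates deltas into `MessageData` (content, role, and now metrics via the new `response_metrics` field), and `_populate_assistant_message_from_stream_data` then writes the assistant `Message` exactly once, after the stream ends. A self-contained sketch of that accumulate-then-finalize pattern; the dataclasses here are simplified stand-ins, not agno's actual types:

from dataclasses import dataclass
from typing import Iterator, Optional

@dataclass
class Usage:
    input_tokens: int = 0
    output_tokens: int = 0

    def __iadd__(self, other: "Usage") -> "Usage":
        self.input_tokens += other.input_tokens
        self.output_tokens += other.output_tokens
        return self

@dataclass
class Delta:
    content: Optional[str] = None
    usage: Optional[Usage] = None

@dataclass
class StreamData:
    content: str = ""
    metrics: Optional[Usage] = None

@dataclass
class AssistantMessage:
    content: str = ""
    metrics: Optional[Usage] = None

def process_stream(deltas: Iterator[Delta], message: AssistantMessage) -> Iterator[Delta]:
    stream_data = StreamData()
    for delta in deltas:
        # Accumulate each delta into stream_data (cf. _populate_stream_data)
        if delta.content is not None:
            stream_data.content += delta.content
        if delta.usage is not None:
            if stream_data.metrics is None:
                stream_data.metrics = Usage()
            stream_data.metrics += delta.usage
        yield delta
    # Populate the message once, after the stream is exhausted
    # (cf. _populate_assistant_message_from_stream_data)
    message.content = stream_data.content
    if stream_data.metrics is not None:
        message.metrics = stream_data.metrics

message = AssistantMessage()
deltas = [Delta("Hello"), Delta(" world", Usage(input_tokens=5, output_tokens=2))]
for _ in process_stream(iter(deltas), message):
    pass
print(message.content, message.metrics)

Finalizing once at the end is what lets a streamed run carry the same per-message metrics as the non-streamed path, which the cookbook scripts and tests in this PR rely on.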
59 changes: 1 addition & 58 deletions libs/agno/agno/models/openai/responses.py
@@ -8,7 +8,7 @@
 
 from agno.exceptions import ModelProviderError
 from agno.media import File
-from agno.models.base import MessageData, Model
+from agno.models.base import Model
 from agno.models.message import Citations, Message, UrlCitation
 from agno.models.metrics import Metrics
 from agno.models.response import ModelResponse
@@ -810,63 +810,6 @@ def format_function_call_results(
             _fc_message.tool_call_id = tool_call_ids[_fc_message_index]
             messages.append(_fc_message)
 
-    def process_response_stream(
-        self,
-        messages: List[Message],
-        assistant_message: Message,
-        stream_data: MessageData,
-        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
-        tools: Optional[List[Dict[str, Any]]] = None,
-        tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
-        run_response: Optional[RunOutput] = None,
-    ) -> Iterator[ModelResponse]:
-        """Process the synchronous response stream."""
-        for model_response_delta in self.invoke_stream(
-            messages=messages,
-            assistant_message=assistant_message,
-            tools=tools,
-            response_format=response_format,
-            tool_choice=tool_choice,
-            run_response=run_response,
-        ):
-            yield from self._populate_stream_data_and_assistant_message(
-                stream_data=stream_data,
-                assistant_message=assistant_message,
-                model_response_delta=model_response_delta,
-            )
-
-        # Add final metrics to assistant message
-        self._populate_assistant_message(assistant_message=assistant_message, provider_response=model_response_delta)
-
-    async def aprocess_response_stream(
-        self,
-        messages: List[Message],
-        assistant_message: Message,
-        stream_data: MessageData,
-        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
-        tools: Optional[List[Dict[str, Any]]] = None,
-        tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
-        run_response: Optional[RunOutput] = None,
-    ) -> AsyncIterator[ModelResponse]:
-        """Process the asynchronous response stream."""
-        async for model_response_delta in self.ainvoke_stream(
-            messages=messages,
-            assistant_message=assistant_message,
-            tools=tools,
-            response_format=response_format,
-            tool_choice=tool_choice,
-            run_response=run_response,
-        ):
-            for model_response in self._populate_stream_data_and_assistant_message(
-                stream_data=stream_data,
-                assistant_message=assistant_message,
-                model_response_delta=model_response_delta,
-            ):
-                yield model_response
-
-        # Add final metrics to assistant message
-        self._populate_assistant_message(assistant_message=assistant_message, provider_response=model_response_delta)
-
     def _parse_provider_response(self, response: Response, **kwargs) -> ModelResponse:
         """
         Parse the OpenAI response into a ModelResponse.
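
These two overrides duplicated what the base class's `process_response_stream` / `aprocess_response_stream` now do, including the metrics hand-off, so deleting them simply defers to the shared implementation: the subclass keeps only its provider-specific pieces. A minimal sketch of that template-method shape, with illustrative names (only `invoke_stream` and `process_response_stream` correspond to the real methods):

from typing import Iterator, List

class BaseStreamingModel:
    # Shared pipeline: drive the provider stream, accumulate, finalize
    def process_response_stream(self, messages: List[str]) -> Iterator[str]:
        for delta in self.invoke_stream(messages):
            yield delta  # accumulation/finalization elided for brevity

    def invoke_stream(self, messages: List[str]) -> Iterator[str]:
        raise NotImplementedError  # provider-specific transport

class ResponsesLikeModel(BaseStreamingModel):
    # No process_response_stream override needed after the refactor
    def invoke_stream(self, messages: List[str]) -> Iterator[str]:
        yield from ("streamed ", "chunks")

for chunk in ResponsesLikeModel().process_response_stream(["hi"]):
    print(chunk, end="")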
6 changes: 3 additions & 3 deletions libs/agno/agno/run/base.py
@@ -110,7 +110,7 @@ def to_dict(self) -> Dict[str, Any]:
 
         if hasattr(self, "tools") and self.tools is not None:
             from agno.models.response import ToolExecution
-
+
             _dict["tools"] = []
             for tool in self.tools:
                 if isinstance(tool, ToolExecution):
@@ -120,7 +120,7 @@
 
         if hasattr(self, "tool") and self.tool is not None:
             from agno.models.response import ToolExecution
-
+
             if isinstance(self.tool, ToolExecution):
                 _dict["tool"] = self.tool.to_dict()
             else:
@@ -155,7 +155,7 @@ def from_dict(cls, data: Dict[str, Any]):
         tool = data.pop("tool", None)
         if tool:
             from agno.models.response import ToolExecution
-
+
             data["tool"] = ToolExecution.from_dict(tool)
 
         images = data.pop("images", None)
32 changes: 28 additions & 4 deletions libs/agno/tests/integration/models/openai/chat/test_basic.py
@@ -40,13 +40,25 @@ def test_basic(openai_model):
     _assert_metrics(response)
 
 
-def test_basic_stream(openai_model):
-    agent = Agent(model=openai_model, markdown=True, telemetry=False)
+def test_basic_stream(openai_model, shared_db):
+    agent = Agent(model=openai_model, db=shared_db, markdown=True, telemetry=False)
 
     run_stream = agent.run("Say 'hi'", stream=True)
     for chunk in run_stream:
         assert chunk.content is not None
 
+    run_output = agent.get_last_run_output()
+
+    assert run_output.content is not None
+    assert run_output.messages is not None
+    assert len(run_output.messages) == 3
+    assert [m.role for m in run_output.messages] == ["system", "user", "assistant"]
+    assert run_output.messages[2].content is not None
+    assert run_output.messages[2].role == "assistant"
+    assert run_output.messages[2].metrics.input_tokens is not None
+    assert run_output.messages[2].metrics.output_tokens is not None
+    assert run_output.messages[2].metrics.total_tokens is not None
+
 
 @pytest.mark.asyncio
 async def test_async_basic(openai_model):
@@ -62,12 +74,24 @@ async def test_async_basic(openai_model):
 
 
 @pytest.mark.asyncio
-async def test_async_basic_stream(openai_model):
-    agent = Agent(model=openai_model, markdown=True, telemetry=False)
+async def test_async_basic_stream(openai_model, shared_db):
+    agent = Agent(model=openai_model, db=shared_db, markdown=True, telemetry=False)
 
     async for response in agent.arun("Share a 2 sentence horror story", stream=True):
         assert response.content is not None
 
+    run_output = agent.get_last_run_output()
+
+    assert run_output.content is not None
+    assert run_output.messages is not None
+    assert len(run_output.messages) == 3
+    assert [m.role for m in run_output.messages] == ["system", "user", "assistant"]
+    assert run_output.messages[2].content is not None
+    assert run_output.messages[2].role == "assistant"
+    assert run_output.messages[2].metrics.input_tokens is not None
+    assert run_output.messages[2].metrics.output_tokens is not None
+    assert run_output.messages[2].metrics.total_tokens is not None
+
 
 def test_exception_handling():
     agent = Agent(model=OpenAIChat(id="gpt-100"), markdown=True, telemetry=False)