Skip to content
3 changes: 3 additions & 0 deletions coffeeAGNTCY/coffee_agents/lungo/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ COMPOSE_PROFILES=farms,logistics,recruiter
# Recommended temperature setting for OpenAI models
OPENAI_TEMPERATURE=0.7

# When true, agents fail at startup if the configured LLM does not support streaming (based on LiteLLM metadata).
ENSURE_STREAMING_LLM=false

# === Agntcy TBAC Settings (Docker Compose) ===
# Set these variables to enable Agntcy Identity Auth (TBAC).
IDENTITY_AUTH_ENABLED="false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@

from agents.supervisors.auction.graph.graph import ExchangeGraph
from agents.supervisors.auction.graph import shared
from config.config import DEFAULT_MESSAGE_TRANSPORT
from config.config import DEFAULT_MESSAGE_TRANSPORT, LLM_MODEL
from config.logging_config import setup_logging
from pathlib import Path
from common.version import get_version_info
from common.streaming_capability import require_streaming_capability
from agents.supervisors.auction.api import create_apps_router

setup_logging()
Expand All @@ -41,6 +42,7 @@

app.include_router(create_apps_router())

require_streaming_capability("auction_supervisor", LLM_MODEL)
exchange_graph = ExchangeGraph()

class PromptRequest(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
from agents.supervisors.logistics.graph.graph import LogisticGraph
from agents.supervisors.logistics.graph import shared
from agents.logistics.shipper.card import AGENT_CARD # assuming similar structure
from config.config import DEFAULT_MESSAGE_TRANSPORT, TRANSPORT_SERVER_ENDPOINT
from config.config import DEFAULT_MESSAGE_TRANSPORT, LLM_MODEL, TRANSPORT_SERVER_ENDPOINT
from config.logging_config import setup_logging
from common.streaming_capability import require_streaming_capability
from pathlib import Path

setup_logging()
Expand All @@ -40,6 +41,7 @@
allow_headers=["*"],
)

require_streaming_capability("logistics_supervisor", LLM_MODEL)
logistic_graph = LogisticGraph()

class PromptRequest(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from google.genai import types

from config.config import LLM_MODEL
from common.streaming_capability import require_streaming_capability

from agents.supervisors.recruiter.dynamic_workflow_agent import (
DynamicWorkflowAgent,
Expand Down Expand Up @@ -251,6 +252,7 @@ async def send_to_agent(
# Root Supervisor Agent
# ---------------------------------------------------------------------------

require_streaming_capability("recruiter_supervisor", LLM_MODEL)
root_agent = Agent(
name="recruiter_supervisor",
model=LiteLlm(model=LLM_MODEL),
Expand Down
60 changes: 60 additions & 0 deletions coffeeAGNTCY/coffee_agents/lungo/common/streaming_capability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright AGNTCY Contributors (https://github.com/agntcy)
# SPDX-License-Identifier: Apache-2.0

import logging

import litellm

from config.config import ENSURE_STREAMING_LLM

logger = logging.getLogger(__name__)


class StreamingNotSupportedError(Exception):
    """Raised when the given LLM does not support streaming but the agent requires it."""

    def __init__(self, agent_name: str, model: str, message: str):
        # Initialize the base Exception first so str(self) is the message,
        # then keep the structured fields for programmatic access.
        super().__init__(message)
        self.agent_name = agent_name  # agent that performed the check (may be "")
        self.model = model            # LiteLLM model identifier that failed the check
        self.message = message        # human-readable explanation


def get_llm_streaming_capability(model: str) -> tuple[bool, BaseException | None]:
    """Return (True, None) if the given LLM supports native streaming; (False, None) if metadata says no or key missing; (False, e) if an error occurred, with e the original exception."""
    try:
        info = litellm.get_model_info(model=model)
    except (litellm.NotFoundError, litellm.BadRequestError, litellm.APIConnectionError, litellm.APIError, litellm.Timeout) as exc:
        # Known LiteLLM failure modes: report the cause but never raise.
        logger.debug("Could not get streaming capability for model %s: %s", model, exc)
        return (False, exc)
    except Exception as exc:
        # Anything else is unexpected but still treated as "capability unknown".
        logger.debug("Unexpected error getting model info for %s: %s", model, exc)
        return (False, exc)
    # Only an explicit True in the metadata counts as streaming-capable.
    return (info.get("supports_native_streaming") is True, None)


def require_streaming_capability(agent_name: str, model: str) -> None:
    """Enforce the ENSURE_STREAMING_LLM startup check for one agent.

    No-op when ENSURE_STREAMING_LLM is false. Otherwise queries LiteLLM
    metadata via get_llm_streaming_capability(); when the model does not
    support native streaming, logs an error and raises
    StreamingNotSupportedError, chained to the metadata-lookup exception
    when one occurred.

    Args:
        agent_name: Label used in log lines and stored on the raised error.
            May be empty, in which case the "[name]" log prefix is omitted.
        model: LiteLLM model identifier (e.g. "openai/gpt-4o").

    Raises:
        StreamingNotSupportedError: streaming is required but the model is
            not streaming-capable (or its capability could not be determined).
    """
    if not ENSURE_STREAMING_LLM:
        return
    supported, err = get_llm_streaming_capability(model)
    if supported:
        logger.info("[%s] Streaming capability check passed.", agent_name or "agent")
        return
    # Plain string: the original was an f-string with no placeholders.
    msg = (
        "Configured model does not support streaming. "
        "Set LLM_MODEL to a streaming-capable model (e.g. openai/gpt-4o)."
    )
    # Compose one log call instead of four near-identical branches: optional
    # "[agent]" prefix, optional "(cause: ...)" suffix when the capability
    # lookup itself failed. Output bytes match the original branches exactly.
    fmt = "[%s] %s Model: %s" if agent_name else "%s Model: %s"
    args = [agent_name, msg, model] if agent_name else [msg, model]
    if err is not None:
        fmt += " (cause: %s)"
        args.append(err)
    logger.error(fmt, *args)
    # "from err" with err=None leaves __cause__ unset, which is equivalent to
    # the original plain raise here — no exception is active in this path.
    raise StreamingNotSupportedError(agent_name=agent_name, model=model, message=msg) from err
1 change: 1 addition & 0 deletions coffeeAGNTCY/coffee_agents/lungo/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
LOGGING_LEVEL = os.getenv("LOGGING_LEVEL", "INFO").upper()

ENABLE_HTTP = os.getenv("ENABLE_HTTP", "true").lower() in ("true", "1", "yes")
ENSURE_STREAMING_LLM = os.getenv("ENSURE_STREAMING_LLM", "false").strip().lower() in ("true", "1", "yes")

# This is for demo purposes only. In production, use secure methods to manage API keys.
IDENTITY_API_KEY = os.getenv("IDENTITY_API_KEY", "487>t:7:Ke5N[kZ[dOmDg2]0RQx))6k}bjARRN+afG3806h(4j6j[}]F5O)f[6PD")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright AGNTCY Contributors (https://github.com/agntcy)
# SPDX-License-Identifier: Apache-2.0

"""Unit tests for common.streaming_capability. No app-sdk; asserts on return values and exception type/attributes only.
require_streaming_capability tests that exercise the check use ENSURE_STREAMING_LLM=true."""
import importlib.util
import sys
from pathlib import Path

# Load streaming_capability.py directly from its file path under a private
# module name — presumably so the test does not import the `common` package
# itself (and whatever package-level imports that would trigger).
_root = Path(__file__).resolve().parents[3]  # assumes tests live 3 levels below the project root — TODO confirm
_cap_path = _root / "common" / "streaming_capability.py"
_STREAMING_MODULE = "_streaming_capability_under_test"
_spec = importlib.util.spec_from_file_location(_STREAMING_MODULE, _cap_path)
_mod = importlib.util.module_from_spec(_spec)
# Register in sys.modules BEFORE exec so patch(f"{_STREAMING_MODULE}...")
# targets used throughout the tests can resolve the module by name.
sys.modules[_STREAMING_MODULE] = _mod
_spec.loader.exec_module(_mod)

_TEST_MODEL = "openai/gpt-4o-mini"
# Rebind the names under test at module level for direct use in the tests.
StreamingNotSupportedError = _mod.StreamingNotSupportedError
get_llm_streaming_capability = _mod.get_llm_streaming_capability
require_streaming_capability = _mod.require_streaming_capability

# NOTE(review): these imports could sit at the top of the file; kept here in
# the original's order (after the dynamic load) to avoid changing behavior.
from unittest.mock import patch

import litellm
import pytest


class TestGetLlmStreamingCapability:
    """get_llm_streaming_capability: metadata interpretation and error capture."""

    @pytest.mark.parametrize(
        "model_info,expected_ok",
        [
            ({"supports_native_streaming": True}, True),
            ({"supports_native_streaming": False}, False),
            ({}, False),
            ({"supports_native_streaming": None}, False),
        ],
        ids=["supports_true", "supports_false", "key_missing", "key_none"],
    )
    def test_get_llm_streaming_capability_metadata(self, model_info, expected_ok):
        with patch(f"{_STREAMING_MODULE}.litellm.get_model_info", return_value=model_info) as fake_info:
            supported, error = get_llm_streaming_capability(_TEST_MODEL)
        # Only an explicit True in the metadata counts as streaming-capable.
        assert supported == expected_ok
        assert error is None
        fake_info.assert_called_once_with(model=_TEST_MODEL)

    @pytest.mark.parametrize(
        "side_effect,expected_args,expected_type",
        [
            (Exception("unknown model"), ("unknown model",), None),
            (litellm.NotFoundError("model not found", _TEST_MODEL, "openai"), None, litellm.NotFoundError),
        ],
        ids=["generic_exception", "litellm_not_found"],
    )
    def test_get_llm_streaming_capability_on_error(self, side_effect, expected_args, expected_type):
        with patch(f"{_STREAMING_MODULE}.litellm.get_model_info", side_effect=side_effect) as fake_info:
            supported, error = get_llm_streaming_capability(_TEST_MODEL)
        # Lookup failures never raise: they surface as (False, original_exception).
        assert supported is False
        assert error is not None
        if expected_args is not None:
            assert error.args == expected_args
        if expected_type is not None:
            assert isinstance(error, expected_type)
        fake_info.assert_called_once_with(model=_TEST_MODEL)


class TestRequireStreamingCapability:
    """require_streaming_capability under the ENSURE_STREAMING_LLM flag."""

    def test_early_return_when_flag_disabled(self):
        """When ENSURE_STREAMING_LLM is not true, the check is skipped and litellm is never consulted."""
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", False), \
                patch(f"{_STREAMING_MODULE}.litellm.get_model_info") as fake_info:
            require_streaming_capability("test_agent", _TEST_MODEL)
        fake_info.assert_not_called()

    def test_does_not_raise_when_capable(self):
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", True), \
                patch(
                    f"{_STREAMING_MODULE}.litellm.get_model_info",
                    return_value={"supports_native_streaming": True},
                ) as fake_info:
            require_streaming_capability("test_agent", _TEST_MODEL)
        fake_info.assert_called_once_with(model=_TEST_MODEL)

    def test_raises_streaming_not_supported_error_when_not_capable(self):
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", True), \
                patch(
                    f"{_STREAMING_MODULE}.litellm.get_model_info",
                    return_value={"supports_native_streaming": False},
                ) as fake_info:
            with pytest.raises(StreamingNotSupportedError) as excinfo:
                require_streaming_capability("test_agent", _TEST_MODEL)
        raised = excinfo.value
        assert raised.agent_name == "test_agent"
        assert raised.model == _TEST_MODEL
        assert len(raised.message) > 0
        # Metadata said "no" (no lookup error), so nothing is chained.
        assert raised.__cause__ is None
        fake_info.assert_called_once_with(model=_TEST_MODEL)

    def test_raises_when_get_model_info_raises(self):
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", True), \
                patch(f"{_STREAMING_MODULE}.litellm.get_model_info", side_effect=Exception("unknown")) as fake_info:
            with pytest.raises(StreamingNotSupportedError) as excinfo:
                require_streaming_capability("other", _TEST_MODEL)
        # The original lookup failure must be chained as the cause.
        assert excinfo.value.__cause__ is not None
        assert "unknown" in str(excinfo.value.__cause__)
        fake_info.assert_called_once_with(model=_TEST_MODEL)

    def test_optional_agent_name_stored_on_exception(self):
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", True), \
                patch(
                    f"{_STREAMING_MODULE}.litellm.get_model_info",
                    return_value={"supports_native_streaming": False},
                ) as fake_info:
            with pytest.raises(StreamingNotSupportedError) as excinfo:
                require_streaming_capability("", _TEST_MODEL)
        # An empty agent name is allowed and preserved on the exception.
        assert excinfo.value.agent_name == ""
        fake_info.assert_called_once_with(model=_TEST_MODEL)


class TestStreamingNotSupportedError:
    """Sanity checks on the exception's stored attributes and string form."""

    def test_exception_attributes(self):
        exc = StreamingNotSupportedError(agent_name="a", model="m", message="msg")
        assert (exc.agent_name, exc.model, exc.message) == ("a", "m", "msg")
        # str() of the exception is exactly the message passed in.
        assert str(exc) == "msg"

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright AGNTCY Contributors (https://github.com/agntcy)
# SPDX-License-Identifier: Apache-2.0

"""Integration-style: with ENSURE_STREAMING_LLM=true, supervisor startup fails when configured LLM does not support streaming."""
import importlib
import sys
from pathlib import Path
from unittest.mock import patch

import pytest

_root = str(Path(__file__).resolve().parents[3])


def _ensure_root_on_path():
    """Move the project root to the front of sys.path so its packages win import resolution."""
    # Remove a pre-existing entry first so the insert below always lands at
    # position 0 instead of leaving a duplicate further down the path.
    if _root in sys.path:
        sys.path.remove(_root)
    sys.path.insert(0, _root)


_ensure_root_on_path()


def _purge_modules(prefixes):
to_delete = [
m
for m in list(sys.modules)
if any(m == p or m.startswith(p + ".") for p in prefixes)
]
for m in to_delete:
sys.modules.pop(m, None)


@pytest.mark.parametrize(
    "import_module,expected_agent_name",
    [
        ("agents.supervisors.auction.main", "auction_supervisor"),
        ("agents.supervisors.logistics.main", "logistics_supervisor"),
        ("agents.supervisors.recruiter.agent", "recruiter_supervisor"),
    ],
    ids=["auction", "logistics", "recruiter"],
)
def test_supervisor_raises_when_llm_does_not_support_streaming(monkeypatch, import_module, expected_agent_name):
    """With ENSURE_STREAMING_LLM=true, supervisor startup fails when get_model_info says no streaming."""
    _ensure_root_on_path()
    # Env vars must be in place before config.config is (re)imported below:
    # ENSURE_STREAMING_LLM is read via os.getenv at module import time.
    monkeypatch.setenv("LLM_MODEL", "openai/gpt-4o-mini")
    monkeypatch.setenv("ENSURE_STREAMING_LLM", "true")
    with patch("litellm.get_model_info", return_value={"supports_native_streaming": False}) as mock_get_model_info:
        prefix = import_module.rsplit(".", 1)[0]
        # Drop cached copies so the supervisor module — and the config/common
        # modules it evaluates at import time — re-execute under the patch.
        _purge_modules([prefix, "config.config", "common"])
        try:
            importlib.import_module(import_module)
        except Exception as e:
            # Compared by class name rather than isinstance: the purge above
            # can cause a fresh import to create a distinct exception class
            # object, so an isinstance check could fail spuriously.
            assert type(e).__name__ == "StreamingNotSupportedError", f"Expected StreamingNotSupportedError, got {type(e)}"
            assert e.agent_name == expected_agent_name
            mock_get_model_info.assert_called_once_with(model="openai/gpt-4o-mini")
            return
        pytest.fail("Expected StreamingNotSupportedError")

Loading