-
Notifications
You must be signed in to change notification settings - Fork 38
Implement fail-fast in Lungo project in case the configured LLM doesn't support it #383
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
delthazor
wants to merge
9
commits into
main
Choose a base branch
from
fix-247-fail-fast-streaming
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
306723c
Implement fail-fast in Lungo project in case the configured LLM doesn…
delthazor f552fcc
Change test import for the common package
delthazor 6abd4d0
Change checker to use model name param
delthazor 64ad7cc
Get streaming capability should return exceptions
delthazor bd79f75
Use parameterized tests to reduce bloat
delthazor d32ef14
In tests, also check the number of calls are as expected
delthazor 8f9dadc
Use more local set of test env var
delthazor ad4670f
Fix specific config import to general
delthazor 467f1bb
Do a proper restore of purged modules
delthazor File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
60 changes: 60 additions & 0 deletions
60
coffeeAGNTCY/coffee_agents/lungo/common/streaming_capability.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| # Copyright AGNTCY Contributors (https://github.com/agntcy) | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import logging | ||
|
|
||
| import litellm | ||
|
|
||
| from config.config import ENSURE_STREAMING_LLM | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
class StreamingNotSupportedError(Exception):
    """Raised when the given LLM does not support streaming but the agent requires it."""

    def __init__(self, agent_name: str, model: str, message: str):
        super().__init__(message)
        # Keep the offending context on the exception so callers can log or
        # report which agent/model combination failed the check.
        self.agent_name = agent_name
        self.model = model
        self.message = message
|
|
||
def get_llm_streaming_capability(model: str) -> tuple[bool, BaseException | None]:
    """Look up whether *model* supports native streaming via LiteLLM metadata.

    Args:
        model: LiteLLM model identifier (e.g. "openai/gpt-4o").

    Returns:
        (True, None)  -- metadata explicitly reports native streaming support.
        (False, None) -- metadata reports no support or omits the key.
        (False, e)    -- the metadata lookup raised; e is the original exception.
    """
    try:
        model_info = litellm.get_model_info(model=model)
        # Only an explicit True counts; False, None, or a missing key all mean
        # "not capable" as far as the fail-fast check is concerned.
        return (model_info.get("supports_native_streaming") is True, None)
    except Exception as e:
        # One handler replaces the previous two: the specific litellm errors
        # (NotFoundError, BadRequestError, APIConnectionError, APIError,
        # Timeout) and the generic fallback were handled identically, so the
        # split branches were pure duplication.
        logger.debug("Could not get streaming capability for model %s: %s", model, e)
        return (False, e)
|
|
||
|
|
||
def require_streaming_capability(agent_name: str, model: str) -> None:
    """Fail fast when the configured LLM cannot stream.

    No-op unless ENSURE_STREAMING_LLM is enabled. Otherwise checks the model's
    streaming capability and, on failure, logs an error and raises
    StreamingNotSupportedError (chained to the underlying lookup error when
    there is one).

    Args:
        agent_name: Name used to prefix log lines; may be empty.
        model: LiteLLM model identifier to check.

    Raises:
        StreamingNotSupportedError: If the check is enabled and the model does
            not support native streaming, or its capability could not be
            determined.
    """
    if not ENSURE_STREAMING_LLM:
        return
    supported, err = get_llm_streaming_capability(model)
    if supported:
        logger.info("[%s] Streaming capability check passed.", agent_name or "agent")
        return
    # No f-prefix needed: the message contains no interpolated values.
    msg = (
        "Configured model does not support streaming. "
        "Set LLM_MODEL to a streaming-capable model (e.g. openai/gpt-4o)."
    )
    # An optional "[agent] " prefix replaces the four near-duplicate logging
    # branches of the original; the emitted log text is unchanged.
    prefix = f"[{agent_name}] " if agent_name else ""
    if err is not None:
        logger.error("%s%s Model: %s (cause: %s)", prefix, msg, model, err)
        # Chain the lookup error so callers see the root cause in __cause__.
        raise StreamingNotSupportedError(agent_name=agent_name, model=model, message=msg) from err
    logger.error("%s%s Model: %s", prefix, msg, model)
    raise StreamingNotSupportedError(agent_name=agent_name, model=model, message=msg)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
117 changes: 117 additions & 0 deletions
117
coffeeAGNTCY/coffee_agents/lungo/tests/unit/common/test_streaming_capability.py
pregnor marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| # Copyright AGNTCY Contributors (https://github.com/agntcy) | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
"""Unit tests for common.streaming_capability.

No app-sdk; asserts on return values and exception type/attributes only.
require_streaming_capability tests that exercise the check use
ENSURE_STREAMING_LLM=true.
"""
# All imports hoisted above the loader code (PEP 8 / E402): previously
# unittest.mock, litellm and pytest were imported after executable statements.
import importlib.util
import sys
from pathlib import Path
from unittest.mock import patch

import litellm
import pytest

# Load the module under test directly from its file path so the tests do not
# depend on how (or whether) the project root is importable as a package.
_root = Path(__file__).resolve().parents[3]
_cap_path = _root / "common" / "streaming_capability.py"
_STREAMING_MODULE = "_streaming_capability_under_test"
_spec = importlib.util.spec_from_file_location(_STREAMING_MODULE, _cap_path)
_mod = importlib.util.module_from_spec(_spec)
# Register before exec_module so lookups by name resolve to this instance.
sys.modules[_STREAMING_MODULE] = _mod
_spec.loader.exec_module(_mod)

_TEST_MODEL = "openai/gpt-4o-mini"
StreamingNotSupportedError = _mod.StreamingNotSupportedError
get_llm_streaming_capability = _mod.get_llm_streaming_capability
require_streaming_capability = _mod.require_streaming_capability
|
||
class TestGetLlmStreamingCapability:
    """get_llm_streaming_capability against mocked LiteLLM metadata."""

    @pytest.mark.parametrize(
        "model_info,expected_ok",
        [
            ({"supports_native_streaming": True}, True),
            ({"supports_native_streaming": False}, False),
            ({}, False),
            ({"supports_native_streaming": None}, False),
        ],
        ids=["supports_true", "supports_false", "key_missing", "key_none"],
    )
    def test_get_llm_streaming_capability_metadata(self, model_info, expected_ok):
        """Only an explicit supports_native_streaming=True counts as capable."""
        with patch(f"{_STREAMING_MODULE}.litellm.get_model_info", return_value=model_info) as info_mock:
            ok, err = get_llm_streaming_capability(_TEST_MODEL)
        assert ok == expected_ok
        assert err is None
        info_mock.assert_called_once_with(model=_TEST_MODEL)

    @pytest.mark.parametrize(
        "side_effect,expected_args,expected_type",
        [
            (Exception("unknown model"), ("unknown model",), None),
            (litellm.NotFoundError("model not found", _TEST_MODEL, "openai"), None, litellm.NotFoundError),
        ],
        ids=["generic_exception", "litellm_not_found"],
    )
    def test_get_llm_streaming_capability_on_error(self, side_effect, expected_args, expected_type):
        """Lookup failures yield (False, original_exception)."""
        with patch(f"{_STREAMING_MODULE}.litellm.get_model_info", side_effect=side_effect) as info_mock:
            ok, err = get_llm_streaming_capability(_TEST_MODEL)
        assert ok is False
        assert err is not None
        if expected_args is not None:
            assert err.args == expected_args
        if expected_type is not None:
            assert isinstance(err, expected_type)
        info_mock.assert_called_once_with(model=_TEST_MODEL)
|
|
||
|
|
||
class TestRequireStreamingCapability:
    """Behavior of require_streaming_capability under the fail-fast flag."""

    def test_early_return_when_flag_disabled(self):
        """When ENSURE_STREAMING_LLM is not true, require_streaming_capability returns without raising and does not call get_model_info."""
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", False), \
             patch(f"{_STREAMING_MODULE}.litellm.get_model_info") as info_mock:
            require_streaming_capability("test_agent", _TEST_MODEL)
        info_mock.assert_not_called()

    def test_does_not_raise_when_capable(self):
        """A streaming-capable model passes the check without raising."""
        capable = {"supports_native_streaming": True}
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", True), \
             patch(f"{_STREAMING_MODULE}.litellm.get_model_info", return_value=capable) as info_mock:
            require_streaming_capability("test_agent", _TEST_MODEL)
        info_mock.assert_called_once_with(model=_TEST_MODEL)

    def test_raises_streaming_not_supported_error_when_not_capable(self):
        """A non-streaming model raises with agent/model context and no cause."""
        not_capable = {"supports_native_streaming": False}
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", True), \
             patch(f"{_STREAMING_MODULE}.litellm.get_model_info", return_value=not_capable) as info_mock:
            with pytest.raises(StreamingNotSupportedError) as exc_info:
                require_streaming_capability("test_agent", _TEST_MODEL)
        raised = exc_info.value
        assert raised.agent_name == "test_agent"
        assert raised.model == _TEST_MODEL
        assert len(raised.message) > 0
        assert raised.__cause__ is None
        info_mock.assert_called_once_with(model=_TEST_MODEL)

    def test_raises_when_get_model_info_raises(self):
        """Lookup failures also fail fast, chaining the original exception."""
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", True), \
             patch(f"{_STREAMING_MODULE}.litellm.get_model_info", side_effect=Exception("unknown")) as info_mock:
            with pytest.raises(StreamingNotSupportedError) as exc_info:
                require_streaming_capability("other", _TEST_MODEL)
        assert exc_info.value.__cause__ is not None
        assert "unknown" in str(exc_info.value.__cause__)
        info_mock.assert_called_once_with(model=_TEST_MODEL)

    def test_optional_agent_name_stored_on_exception(self):
        """An empty agent name is preserved verbatim on the exception."""
        not_capable = {"supports_native_streaming": False}
        with patch(f"{_STREAMING_MODULE}.ENSURE_STREAMING_LLM", True), \
             patch(f"{_STREAMING_MODULE}.litellm.get_model_info", return_value=not_capable) as info_mock:
            with pytest.raises(StreamingNotSupportedError) as exc_info:
                require_streaming_capability("", _TEST_MODEL)
        assert exc_info.value.agent_name == ""
        info_mock.assert_called_once_with(model=_TEST_MODEL)
|
|
||
|
|
||
class TestStreamingNotSupportedError:
    """Constructor wiring of StreamingNotSupportedError."""

    def test_exception_attributes(self):
        """All constructor arguments are stored and str() echoes the message."""
        exc = StreamingNotSupportedError(agent_name="a", model="m", message="msg")
        assert (exc.agent_name, exc.model, exc.message) == ("a", "m", "msg")
        assert str(exc) == "msg"
|
|
||
69 changes: 69 additions & 0 deletions
69
coffeeAGNTCY/coffee_agents/lungo/tests/unit/common/test_streaming_fail_fast.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| # Copyright AGNTCY Contributors (https://github.com/agntcy) | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| """Integration-style: with ENSURE_STREAMING_LLM=true, supervisor startup fails when configured LLM does not support streaming.""" | ||
| import importlib | ||
| import sys | ||
| from pathlib import Path | ||
| from unittest.mock import patch | ||
|
|
||
| import pytest | ||
|
|
||
| _root = str(Path(__file__).resolve().parents[3]) | ||
|
|
||
|
|
||
def _ensure_root_on_path():
    """Put the lungo project root at the front of sys.path (exactly once)."""
    # Remove any existing occurrence first so the insert does not duplicate it.
    if _root in sys.path:
        sys.path.remove(_root)
    sys.path.insert(0, _root)


_ensure_root_on_path()
|
|
||
|
|
||
def _purge_modules(prefixes):
    """Evict every cached module named by, or nested under, one of *prefixes*."""

    def _matches(name):
        # Exact match or a dotted submodule of the prefix.
        return any(name == prefix or name.startswith(prefix + ".") for prefix in prefixes)

    # Snapshot the keys first: sys.modules must not change size while iterating.
    for name in [m for m in sys.modules if _matches(m)]:
        sys.modules.pop(name, None)
|
|
||
|
|
||
@pytest.mark.parametrize(
    "import_module,expected_agent_name",
    [
        ("agents.supervisors.auction.main", "auction_supervisor"),
        ("agents.supervisors.logistics.main", "logistics_supervisor"),
        ("agents.supervisors.recruiter.agent", "recruiter_supervisor"),
    ],
    ids=["auction", "logistics", "recruiter"],
)
def test_supervisor_raises_when_llm_does_not_support_streaming(monkeypatch, import_module, expected_agent_name):
    """With ENSURE_STREAMING_LLM=true, supervisor startup fails when get_model_info says no streaming."""
    _ensure_root_on_path()
    import config.config as config_mod
    # Snapshot config values so the finally block can restore them explicitly.
    # NOTE(review): monkeypatch already undoes its own setattr calls at
    # teardown, so the finally restore below is presumably redundant — confirm.
    saved_ensure = getattr(config_mod, "ENSURE_STREAMING_LLM", False)
    saved_llm_model = getattr(config_mod, "LLM_MODEL", "")

    try:
        # Patch at the litellm module level so any code path reaching
        # litellm.get_model_info during supervisor import sees "no streaming".
        with patch("litellm.get_model_info", return_value={"supports_native_streaming": False}) as mock_get_model_info:
            prefix = import_module.rsplit(".", 1)[0]
            # Evict cached supervisor-package and "common" modules so the
            # import below re-executes module-level startup under the patched
            # config. NOTE(review): purged modules are not restored afterwards
            # — verify later tests in the session don't rely on the originals.
            _purge_modules([prefix, "common"])
            monkeypatch.setattr(config_mod, "ENSURE_STREAMING_LLM", True)
            monkeypatch.setattr(config_mod, "LLM_MODEL", "openai/gpt-4o-mini")
            try:
                # Importing the supervisor module triggers its startup check.
                importlib.import_module(import_module)
            except Exception as e:
                # Matched by class name rather than isinstance — presumably
                # because the purge can make the supervisor load a distinct
                # copy of the exception class; confirm against the loader.
                assert type(e).__name__ == "StreamingNotSupportedError", f"Expected StreamingNotSupportedError, got {type(e)}"
                assert e.agent_name == expected_agent_name
                mock_get_model_info.assert_called_once_with(model="openai/gpt-4o-mini")
                return
            # Reaching here means the import succeeded, i.e. no fail-fast.
            pytest.fail("Expected StreamingNotSupportedError")
    finally:
        monkeypatch.setattr(config_mod, "ENSURE_STREAMING_LLM", saved_ensure)
        monkeypatch.setattr(config_mod, "LLM_MODEL", saved_llm_model)
|
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.