Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ jobs:
echo "task: $(task --version)"
uv run python --version

- name: Configure git auth for private deps
run: git config --global url."https://${{ secrets.GH_USER }}:${{ secrets.GH_PAT }}@github.com/TalonT-Org/".insteadOf "https://github.com/TalonT-Org/"

- name: Install dependencies
run: uv sync --locked --extra dev

Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dev = [
"ruff>=0.15.0",
"import-linter>=2.1",
"packaging>=25.0",
"api-simulator",
]

[project.urls]
Expand Down Expand Up @@ -223,3 +224,6 @@ forbidden_modules = [
"autoskillit.server",
"autoskillit.cli",
]

[tool.uv.sources]
api-simulator = { git = "https://github.com/TalonT-Org/api-simulator", branch = "main" }
28 changes: 22 additions & 6 deletions src/autoskillit/execution/quota.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

_log = get_logger(__name__)

_DEFAULT_BASE_URL: str = "https://api.anthropic.com"


@dataclass
class QuotaStatus:
Expand Down Expand Up @@ -75,12 +77,17 @@ def _write_cache(cache_path: str, status: QuotaStatus) -> None:
_log.warning("quota cache write failed", path=cache_path, error=str(exc))


async def _fetch_quota(credentials_path: str) -> QuotaStatus:
async def _fetch_quota(
credentials_path: str,
*,
base_url: str = _DEFAULT_BASE_URL,
_httpx_timeout: float = 10,
) -> QuotaStatus:
"""Fetch 5-hour utilization from Anthropic quota API."""
token = _read_credentials(credentials_path)
async with httpx.AsyncClient(timeout=10) as client:
async with httpx.AsyncClient(timeout=_httpx_timeout) as client:
resp = await client.get(
"https://api.anthropic.com/api/oauth/usage",
f"{base_url}/api/oauth/usage",
headers={
"Authorization": f"Bearer {token}",
"anthropic-beta": "oauth-2025-04-20",
Expand All @@ -99,7 +106,12 @@ async def _fetch_quota(credentials_path: str) -> QuotaStatus:
)


async def check_and_sleep_if_needed(config: Any) -> dict:
async def check_and_sleep_if_needed(
config: Any,
*,
base_url: str = _DEFAULT_BASE_URL,
_httpx_timeout: float = 10,
) -> dict:
"""Check quota utilization. Returns metadata indicating whether a sleep is needed.

Does NOT sleep. The caller is responsible for sleeping (e.g. via run_cmd).
Expand All @@ -124,7 +136,9 @@ async def check_and_sleep_if_needed(config: Any) -> dict:
try:
status = _read_cache(config.cache_path, config.cache_max_age)
if status is None:
status = await _fetch_quota(config.credentials_path)
status = await _fetch_quota(
config.credentials_path, base_url=base_url, _httpx_timeout=_httpx_timeout
)
_write_cache(config.cache_path, status)

if status.utilization < config.threshold:
Expand All @@ -151,7 +165,9 @@ async def check_and_sleep_if_needed(config: Any) -> dict:
}

# Re-fetch for accurate resets_at before returning sleep metadata
status = await _fetch_quota(config.credentials_path)
status = await _fetch_quota(
config.credentials_path, base_url=base_url, _httpx_timeout=_httpx_timeout
)
_write_cache(config.cache_path, status)

if status.resets_at is None:
Expand Down
8 changes: 4 additions & 4 deletions tests/execution/test_quota.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async def test_below_threshold_returns_should_sleep_false(self, monkeypatch, tmp
cache_path=str(tmp_path / "cache.json"),
)

async def mock_fetch(path):
async def mock_fetch(path, **kwargs):
return QuotaStatus(utilization=50.0, resets_at=resets_at)

monkeypatch.setattr("autoskillit.execution.quota._fetch_quota", mock_fetch)
Expand Down Expand Up @@ -236,7 +236,7 @@ async def test_network_error_returns_error_dict(self, monkeypatch, tmp_path):
json.dumps({"claudeAiOauth": {"accessToken": "tok", "expiresAt": expires_ms}})
)

async def mock_fetch(path):
async def mock_fetch(path, **kwargs):
raise OSError("network down")

monkeypatch.setattr("autoskillit.execution.quota._fetch_quota", mock_fetch)
Expand Down Expand Up @@ -301,7 +301,7 @@ async def test_above_threshold_resets_at_none_first_fetch_blocks(self, monkeypat
cache_path=str(tmp_path / "cache.json"),
)

async def mock_fetch(path):
async def mock_fetch(path, **kwargs):
return QuotaStatus(utilization=90.0, resets_at=None)

monkeypatch.setattr("autoskillit.execution.quota._fetch_quota", mock_fetch)
Expand Down Expand Up @@ -377,7 +377,7 @@ async def test_fallback_sleep_uses_at_least_buffer_seconds(self, monkeypatch, tm
cache_path=str(tmp_path / "cache.json"),
)

async def mock_fetch(path):
async def mock_fetch(path, **kwargs):
return QuotaStatus(utilization=90.0, resets_at=None)

monkeypatch.setattr("autoskillit.execution.quota._fetch_quota", mock_fetch)
Expand Down
161 changes: 161 additions & 0 deletions tests/execution/test_quota_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""End-to-end HTTP tests for quota guard using api-simulator mock_http_server.

These tests exercise the real httpx client path — no monkeypatching of _fetch_quota.
They complement the unit tests in test_quota.py which mock at the function level.
"""

from __future__ import annotations

import json
import time
from datetime import UTC, datetime, timedelta
from types import SimpleNamespace

import pytest

from autoskillit.execution.quota import check_and_sleep_if_needed

pytestmark = pytest.mark.anyio

QUOTA_ENDPOINT = "/api/oauth/usage"


@pytest.fixture()
def credentials(tmp_path):
"""Write a valid .credentials.json and return its path as a string."""
creds_file = tmp_path / ".credentials.json"
creds_file.write_text(
json.dumps(
{
"claudeAiOauth": {
"accessToken": "test-token-abc123",
"expiresAt": (time.time() + 3600) * 1000,
}
}
)
)
return str(creds_file)


@pytest.fixture()
def quota_config(credentials, tmp_path):
"""Minimal config namespace for check_and_sleep_if_needed."""
return SimpleNamespace(
enabled=True,
credentials_path=credentials,
cache_path=str(tmp_path / "quota_cache.json"),
cache_max_age=120,
threshold=80,
buffer_seconds=60,
)


@pytest.fixture(autouse=True)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[warning] tests: _reset_mock is autouse=True and calls mock_http_server.reset() per-test. If mock_http_server is session-scoped (common for server fixtures), route registrations from parallel xdist workers sharing the same server can cross-contaminate. Verify that mock_http_server is function-scoped before relying on this reset pattern.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigated — this is intentional. pytest-xdist with -n 4 creates a separate OS process per worker; session-scoped fixtures are initialized once per worker process, not globally. Each worker gets its own mock_http_server instance on a different port (confirmed via api_simulator/plugin.py). Tests within a worker run sequentially, so the autouse _reset_mock cleanly isolates state. No cross-worker contamination is possible in this architecture.

def _reset_mock(mock_http_server):
"""Reset mock_http_server before each test to clear routes and recordings."""
mock_http_server.reset()


async def test_normal_utilization_returns_status_and_sends_correct_headers(
mock_http_server, quota_config
):
mock_http_server.register(
"GET",
QUOTA_ENDPOINT,
json={
"five_hour": {
"utilization": 50.0,
"resets_at": "2026-04-05T00:00:00+00:00",
}
},
)

result = await check_and_sleep_if_needed(quota_config, base_url=mock_http_server.url)

assert result["should_sleep"] is False
assert result["utilization"] == 50.0
assert result["resets_at"] == "2026-04-05T00:00:00+00:00"

requests = mock_http_server.get_requests("GET", QUOTA_ENDPOINT)
assert len(requests) == 1
assert requests[0].headers["authorization"] == "Bearer test-token-abc123"
assert requests[0].headers["anthropic-beta"] == "oauth-2025-04-20"


async def test_above_threshold_triggers_double_fetch(mock_http_server, quota_config):
resets_at = (datetime.now(UTC) + timedelta(hours=2)).isoformat()
mock_http_server.register_sequence(
"GET",
QUOTA_ENDPOINT,
responses=[
{"json": {"five_hour": {"utilization": 95.0, "resets_at": resets_at}}},
{"json": {"five_hour": {"utilization": 95.0, "resets_at": resets_at}}},
],
)

result = await check_and_sleep_if_needed(quota_config, base_url=mock_http_server.url)

assert result["should_sleep"] is True
assert result["sleep_seconds"] > 0
assert mock_http_server.request_count("GET", QUOTA_ENDPOINT) == 2


async def test_resets_at_null_blocks_with_fallback(mock_http_server, quota_config):
mock_http_server.register(
"GET",
QUOTA_ENDPOINT,
json={"five_hour": {"utilization": 95.0, "resets_at": None}},
)

result = await check_and_sleep_if_needed(quota_config, base_url=mock_http_server.url)

assert result["should_sleep"] is True
assert result["sleep_seconds"] >= 60
assert result["reason"] == "unknown_reset"
assert mock_http_server.request_count("GET", QUOTA_ENDPOINT) == 1


async def test_http_429_fails_open(mock_http_server, quota_config):
mock_http_server.register("GET", QUOTA_ENDPOINT, status=429)

result = await check_and_sleep_if_needed(quota_config, base_url=mock_http_server.url)

assert result["should_sleep"] is False
assert "error" in result


async def test_http_503_fails_open(mock_http_server, quota_config):
mock_http_server.register("GET", QUOTA_ENDPOINT, status=503)

result = await check_and_sleep_if_needed(quota_config, base_url=mock_http_server.url)

assert result["should_sleep"] is False
assert "error" in result


async def test_network_timeout_fails_open(mock_http_server, quota_config):
mock_http_server.register("GET", QUOTA_ENDPOINT, json={}, delay_seconds=0.5)

result = await check_and_sleep_if_needed(
quota_config, base_url=mock_http_server.url, _httpx_timeout=0.1
)

assert result["should_sleep"] is False
assert "error" in result


async def test_z_suffix_resets_at_parsed_correctly(mock_http_server, quota_config):
mock_http_server.register(
"GET",
QUOTA_ENDPOINT,
json={
"five_hour": {
"utilization": 50.0,
"resets_at": "2026-04-05T00:00:00Z",
}
},
)

result = await check_and_sleep_if_needed(quota_config, base_url=mock_http_server.url)

assert result["resets_at"] == "2026-04-05T00:00:00+00:00"
7 changes: 7 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading