Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/fastmcp/server/auth/oauth_proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,7 @@ async def load_access_token(self, token: str) -> AccessToken | None: # type: ig
# 1. Verify FastMCP JWT signature and claims
payload = self.jwt_issuer.verify_token(token)
jti = payload["jti"]
upstream_claims = payload.get("upstream_claims")

# 2. Look up upstream token via JTI mapping
jti_mapping = await self._jti_mapping_store.get(key=jti)
Expand Down Expand Up @@ -1693,6 +1694,17 @@ async def load_access_token(self, token: str) -> AccessToken | None: # type: ig
}
)

# Propagate upstream claims from the verified FastMCP JWT into the
# final AccessToken object. This allows subclasses to access custom
# identity data extracted during the initial authorization flow.
# We perform a model copy to avoid mutating a potentially cached
# reference shared across concurrent requests.
if validated and upstream_claims:
validated = validated.model_copy(deep=True)
if validated.claims is None:
validated.claims = {}
validated.claims["upstream_claims"] = upstream_claims
Comment on lines +1702 to +1706
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Copy claims before injecting upstream_claims

load_access_token mutates validated.claims in place, but some verifiers (notably IntrospectionTokenVerifier) return cached AccessToken instances by reference (src/fastmcp/server/auth/providers/introspection.py:197-200,292). In that setup, this write can leak/overwrite claim state across requests sharing the same upstream token object; e.g., a request with upstream_claims can persist data that is then returned for a later token where upstream_claims is absent (the branch is skipped, so stale data remains). Copying the token/claims before mutation avoids cross-request contamination.

Useful? React with 👍 / 👎.

Comment on lines +1697 to +1706
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load_access_token mutates validated.claims in-place when injecting upstream_claims. If the configured _token_validator caches and reuses AccessToken instances (e.g., IntrospectionTokenVerifier caches results), this can leak/retain upstream_claims across requests/tokens. Prefer returning a copied AccessToken with merged claims (e.g., via model_copy/new dict) to avoid mutating shared/cached objects and to ensure the claims dict isn’t shared (shallow copies can still share nested dicts).

Copilot uses AI. Check for mistakes.

logger.debug(
"Token swap successful for JTI=%s (upstream validated)", jti[:8]
)
Expand Down
82 changes: 78 additions & 4 deletions tests/server/auth/oauth_proxy/test_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
from key_value.aio.stores.memory import MemoryStore
from mcp.server.auth.handlers.token import TokenErrorResponse
from mcp.server.auth.handlers.token import TokenHandler as SDKTokenHandler
from mcp.server.auth.provider import AccessToken, AuthorizationCode
from mcp.server.auth.provider import AuthorizationCode
from mcp.shared.auth import OAuthClientInformationFull
from pydantic import AnyUrl

from fastmcp.server.auth.auth import RefreshToken, TokenHandler, TokenVerifier
from fastmcp.server.auth.auth import (
AccessToken,
RefreshToken,
TokenHandler,
TokenVerifier,
)
from fastmcp.server.auth.oauth_proxy import OAuthProxy
from fastmcp.server.auth.oauth_proxy.models import (
DEFAULT_ACCESS_TOKEN_EXPIRY_NO_REFRESH_SECONDS,
Expand Down Expand Up @@ -581,14 +586,22 @@ def mock_verifier(self):
verifier = Mock(spec=TokenVerifier)
verifier.required_scopes = ["read"]

# Cache tokens to test mutation of returned objects
cache: dict[str, AccessToken] = {}

async def verify(token: str) -> AccessToken | None:
if token.startswith("refreshed-"):
return AccessToken(
if token in cache:
return cache[token]

if token.startswith("refreshed-") or token.startswith("valid-"):
t = AccessToken(
token=token,
client_id="test-client",
scopes=["read"],
expires_at=int(time.time() + 3600),
)
cache[token] = t
return t
return None

verifier.verify_token = AsyncMock(side_effect=verify)
Expand Down Expand Up @@ -660,6 +673,44 @@ async def _setup_expired_session(
)
return fastmcp_jwt

async def _setup_session_with_claims(self, proxy, *, upstream_claims=None):
"""Set up a proxy JWT pointing at a valid upstream token, with optional upstream_claims."""
upstream_token_id = "upstream-tok-id"
access_jti = "test-claims-jti"

upstream_token_set = UpstreamTokenSet(
upstream_token_id=upstream_token_id,
access_token="valid-upstream-access",
refresh_token=None,
refresh_token_expires_at=None,
expires_at=time.time() + 3600,
token_type="Bearer",
scope="read",
client_id="test-client",
created_at=time.time(),
)
await proxy._upstream_token_store.put(
key=upstream_token_id,
value=upstream_token_set,
ttl=3600,
)
await proxy._jti_mapping_store.put(
key=access_jti,
value=JTIMapping(
jti=access_jti,
upstream_token_id=upstream_token_id,
created_at=time.time(),
),
ttl=3600,
)
return proxy.jwt_issuer.issue_access_token(
client_id="test-client",
scopes=["read"],
jti=access_jti,
expires_in=3600,
upstream_claims=upstream_claims,
)

async def test_transparent_refresh_on_expired_upstream(self, proxy):
"""load_access_token refreshes upstream token when validation fails."""
fastmcp_jwt = await self._setup_expired_session(proxy)
Expand Down Expand Up @@ -855,3 +906,26 @@ async def mock_get(key: str) -> UpstreamTokenSet | None:

assert result is not None
assert result.token == "refreshed-upstream-access"

async def test_upstream_claims_propagated(self, proxy):
jwt = await self._setup_session_with_claims(
proxy, upstream_claims={"sub": "user-123"}
)
result = await proxy.load_access_token(jwt)
assert result is not None
assert result.claims["upstream_claims"] == {"sub": "user-123"}

async def test_upstream_claims_not_mutated_on_cached_token(
self, proxy, mock_verifier
):
jwt = await self._setup_session_with_claims(
proxy, upstream_claims={"sub": "user-123"}
)
result = await proxy.load_access_token(jwt)
assert result is not None
assert result.claims["upstream_claims"] == {"sub": "user-123"}
# Original verifier result must not be mutated
for call in list(mock_verifier.verify_token.call_args_list):
returned = await mock_verifier.verify_token(call.args[0])
if returned:
assert "upstream_claims" not in returned.claims