Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions src/fastmcp/server/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@
class StreamableHTTPASGIApp:
"""ASGI application wrapper for Streamable HTTP server transport."""

def __init__(self, session_manager):
def __init__(self, session_manager: StreamableHTTPSessionManager | None):
self.session_manager = session_manager

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
try:
if self.session_manager is None:
raise RuntimeError(
"Task group is not initialized. Make sure to use run()."
)
await self.session_manager.handle_request(scope, receive, send)
except RuntimeError as e:
if str(e) == "Task group is not initialized. Make sure to use run().":
Expand Down Expand Up @@ -296,17 +300,8 @@ def create_streamable_http_app(
server_routes: list[BaseRoute] = []
server_middleware: list[Middleware] = []

# Create session manager using the provided event store
session_manager = StreamableHTTPSessionManager(
app=server._mcp_server,
event_store=event_store,
retry_interval=retry_interval,
json_response=json_response,
stateless=stateless_http,
)

# Create the ASGI app wrapper
streamable_http_app = StreamableHTTPASGIApp(session_manager)
# Create the ASGI app wrapper (session manager is set each lifespan cycle)
streamable_http_app = StreamableHTTPASGIApp(None)

# Add StreamableHTTP routes with or without auth
if auth:
Expand Down Expand Up @@ -364,7 +359,17 @@ def create_streamable_http_app(
# Create a lifespan manager to start and stop the session manager
@asynccontextmanager
async def lifespan(app: Starlette) -> AsyncGenerator[None, None]:
async with server._lifespan_manager(), session_manager.run():
streamable_http_app.session_manager = StreamableHTTPSessionManager(
app=server._mcp_server,
event_store=event_store,
retry_interval=retry_interval,
json_response=json_response,
stateless=stateless_http,
)
async with (
server._lifespan_manager(),
streamable_http_app.session_manager.run(),
):
yield

# Create and return the app with lifespan
Expand Down
57 changes: 57 additions & 0 deletions tests/server/test_fastapi_testclient_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.testclient import TestClient

from fastmcp import FastMCP


def test_fastapi_testclient_multiple_runs():
"""Test that TestClient can be used multiple times with a mounted FastMCP app.

This verifies that the StreamableHTTPSessionManager is correctly recreated
for each lifespan cycle.
"""
mcp = FastMCP("test")
mcp_app = mcp.http_app(path="/mcp")

@mcp.tool
def add(a: int, b: int) -> int:
return a + b

@asynccontextmanager
async def combined_lifespan(app: FastAPI):
# Trigger the sub-app's lifespan
async with mcp_app.router.lifespan_context(mcp_app):
yield

app = FastAPI(lifespan=combined_lifespan)
app.mount("/analytics", mcp_app)

# First test run
with TestClient(app) as client:
# We use analytics prefix since it's mounted there
client.get("/analytics/mcp") # Would raise RuntimeError before fix

# Second test run - this would fail before the fix
with TestClient(app) as client:
client.get("/analytics/mcp")


def test_fastapi_testclient_nested_lifespan():
"""Test that TestClient works with custom combined lifespans and multiple iterations."""
mcp = FastMCP("test")
mcp_app = mcp.http_app(path="/mcp")

@asynccontextmanager
async def combined_lifespan(app: FastAPI):
async with mcp_app.router.lifespan_context(mcp_app):
yield

app = FastAPI(lifespan=combined_lifespan)
app.mount("/analytics", mcp_app)

# Multiple runs with custom lifespan
for _ in range(3):
with TestClient(app) as client:
client.get("/analytics/mcp")