-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Ensure FastMCP handles nested SSE and SHTTP apps properly in ASGI frameworks #390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,171 @@ | ||
| --- | ||
| title: Integrating FastMCP in ASGI Applications | ||
| sidebarTitle: ASGI Integration | ||
| description: Integrate FastMCP servers into existing Starlette, FastAPI, or other ASGI applications | ||
| icon: plug | ||
| --- | ||
|
|
||
| While FastMCP provides standalone server capabilities, you can also integrate your FastMCP server into existing web applications. This approach is useful for: | ||
|
|
||
| - Adding MCP functionality to an existing website or API | ||
| - Mounting MCP servers under specific URL paths | ||
| - Combining multiple services in a single application | ||
| - Leveraging existing authentication and middleware | ||
|
|
||
| Please note that all FastMCP servers have a `run()` method that can be used to start the server. This guide focuses on integration with broader ASGI frameworks. | ||
|
|
||
| ## ASGI Server | ||
|
|
||
|
|
||
| FastMCP servers can be created as [Starlette](https://www.starlette.io/) ASGI apps for straightforward hosting or integration into existing applications. | ||
|
|
||
| The first step is to obtain a Starlette application instance from your FastMCP server using either the `streamable_http_app()` (preferred) or `sse_app()` (legacy) methods: | ||
|
|
||
| ```python | ||
| from fastmcp import FastMCP | ||
|
|
||
| mcp = FastMCP("MyServer") | ||
|
|
||
| @mcp.tool() | ||
| def hello(name: str) -> str: | ||
| return f"Hello, {name}!" | ||
|
|
||
| # Get a Starlette app instance for the preferred transport | ||
| http_app = mcp.streamable_http_app() # For Streamable HTTP transport | ||
| sse_app = mcp.sse_app() # For SSE transport | ||
| ``` | ||
|
|
||
| Both methods return a Starlette application that can be integrated with other ASGI-compatible web frameworks. | ||
|
|
||
| The MCP server's endpoint is mounted at the root path `/mcp` for Streamable HTTP transport, and `/sse` for SSE transport, though you can change these paths by passing a `path` argument to the `streamable_http_app()` or `sse_app()` methods: | ||
|
|
||
| ```python | ||
| http_app = mcp.streamable_http_app(path="/custom-mcp-path") | ||
| sse_app = mcp.sse_app(path="/custom-sse-path") | ||
| ``` | ||
|
|
||
| ### Running the Server | ||
|
|
||
| To run the FastMCP server, you can use the `uvicorn` ASGI server: | ||
|
|
||
| ```python | ||
| import uvicorn | ||
|
|
||
| # (define the app here) | ||
|
|
||
| if __name__ == "__main__": | ||
| uvicorn.run(http_app, host="0.0.0.0", port=8000) | ||
| ``` | ||
|
|
||
| Or, from the command line: | ||
|
|
||
| ```bash | ||
| uvicorn path.to.your.app:http_app --host 0.0.0.0 --port 8000 | ||
| ``` | ||
|
|
||
|
|
||
|
|
||
| ## Starlette Integration | ||
|
|
||
| You can mount your FastMCP server in another Starlette application using the `Mount` class. | ||
|
|
||
| ```python | ||
| from fastmcp import FastMCP | ||
| from starlette.applications import Starlette | ||
| from starlette.routing import Mount | ||
|
|
||
| # Create your FastMCP server as well as any tools, resources, etc. | ||
| mcp = FastMCP("MyServer") | ||
|
|
||
| # Create the ASGI app | ||
| mcp_app = mcp.streamable_http_app(path='/mcp') | ||
|
|
||
| # Create a Starlette app and mount the MCP server | ||
| app = Starlette( | ||
| routes=[ | ||
| Mount("/mcp-server", app=mcp_app), | ||
| # Add other routes as needed | ||
| ], | ||
| lifespan=mcp_app.router.lifespan_context, | ||
| ) | ||
| ``` | ||
|
|
||
| The MCP endpoint will be available at `/mcp-server/mcp` of the resulting Starlette app. | ||
|
|
||
| <Warning> | ||
| For Streamable HTTP transport, you **must** pass the lifespan context from the FastMCP app to the resulting Starlette app, as nested lifespans are not recognized. Otherwise, the FastMCP server's session manager will not be properly initialized. | ||
| </Warning> | ||
|
|
||
| ### Nested Mounts | ||
|
|
||
|
|
||
| You can create complex routing structures by nesting mounts: | ||
|
|
||
| ```python | ||
| from fastmcp import FastMCP | ||
| from starlette.applications import Starlette | ||
| from starlette.routing import Mount | ||
|
|
||
| # Create your FastMCP server as well as any tools, resources, etc. | ||
| mcp = FastMCP("MyServer") | ||
|
|
||
| # Create the ASGI app | ||
| mcp_app = mcp.streamable_http_app(path='/mcp') | ||
|
|
||
| # Create nested application structure | ||
| inner_app = Starlette(routes=[Mount("/inner", app=mcp_app)]) | ||
| app = Starlette( | ||
| routes=[Mount("/outer", app=inner_app)], | ||
| lifespan=mcp_app.router.lifespan_context, | ||
| ) | ||
| ``` | ||
|
|
||
| In this setup, the MCP server is accessible at the `/outer/inner/mcp` path of the resulting Starlette app. | ||
|
|
||
| <Warning> | ||
| For Streamable HTTP transport, you **must** pass the lifespan context from the FastMCP app to the *outer* Starlette app, as nested lifespans are not recognized. Otherwise, the FastMCP server's session manager will not be properly initialized. | ||
| </Warning> | ||
| ## FastAPI Integration | ||
|
|
||
| FastAPI is built on Starlette, so you can mount your FastMCP server in a similar way: | ||
|
|
||
| ```python | ||
| from fastmcp import FastMCP | ||
| from fastapi import FastAPI | ||
| from starlette.routing import Mount | ||
|
|
||
| # Create your FastMCP server as well as any tools, resources, etc. | ||
| mcp = FastMCP("MyServer") | ||
|
|
||
| # Create the ASGI app | ||
| mcp_app = mcp.streamable_http_app(path='/mcp') | ||
|
|
||
| # Create a FastAPI app and mount the MCP server | ||
| app = FastAPI(lifespan=mcp_app.router.lifespan_context) | ||
| app.mount("/mcp-server", mcp_app) | ||
| ``` | ||
|
|
||
| The MCP endpoint will be available at `/mcp-server/mcp` of the resulting FastAPI app. | ||
|
|
||
| <Warning> | ||
| For Streamable HTTP transport, you **must** pass the lifespan context from the FastMCP app to the resulting FastAPI app, as nested lifespans are not recognized. Otherwise, the FastMCP server's session manager will not be properly initialized. | ||
| </Warning> | ||
|
|
||
|
|
||
| ## Custom Routes | ||
|
|
||
| In addition to adding your FastMCP server to an existing ASGI app, you can also add custom web routes to your FastMCP server, which will be exposed alongside the MCP endpoint. To do so, use the `@custom_route` decorator. Note that this is less flexible than using a full ASGI framework, but can be useful for adding simple endpoints like health checks to your standalone server. | ||
|
|
||
| ```python | ||
| from fastmcp import FastMCP | ||
| from starlette.requests import Request | ||
| from starlette.responses import JSONResponse | ||
|
|
||
| mcp = FastMCP("MyServer") | ||
|
|
||
| @mcp.custom_route("/health", methods=["GET"]) | ||
| async def health_check(request: Request) -> JSONResponse: | ||
| return JSONResponse({"status": "healthy"}) | ||
| ``` | ||
|
|
||
| These routes will be included in the FastMCP app when mounted in your web application. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Patched low-level objects. When possisble, we prefer the official SDK, but we patch bugs here if necessary. | ||
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| import logging | ||
| from contextlib import asynccontextmanager | ||
| from typing import Any | ||
| from urllib.parse import quote | ||
| from uuid import uuid4 | ||
|
|
||
| import anyio | ||
| from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream | ||
| from mcp.server.sse import SseServerTransport as LowLevelSSEServerTransport | ||
| from mcp.shared.message import SessionMessage | ||
| from sse_starlette import EventSourceResponse | ||
| from starlette.types import Receive, Scope, Send | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class SseServerTransport(LowLevelSSEServerTransport): | ||
| """ | ||
| Patched SSE server transport | ||
| """ | ||
|
|
||
| @asynccontextmanager | ||
| async def connect_sse(self, scope: Scope, receive: Receive, send: Send): | ||
| """ | ||
| See https://github.com/modelcontextprotocol/python-sdk/pull/659/ | ||
| """ | ||
| if scope["type"] != "http": | ||
| logger.error("connect_sse received non-HTTP request") | ||
| raise ValueError("connect_sse can only handle HTTP requests") | ||
|
|
||
| logger.debug("Setting up SSE connection") | ||
| read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] | ||
| read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] | ||
|
|
||
| write_stream: MemoryObjectSendStream[SessionMessage] | ||
| write_stream_reader: MemoryObjectReceiveStream[SessionMessage] | ||
|
|
||
| read_stream_writer, read_stream = anyio.create_memory_object_stream(0) | ||
| write_stream, write_stream_reader = anyio.create_memory_object_stream(0) | ||
|
|
||
| session_id = uuid4() | ||
| self._read_stream_writers[session_id] = read_stream_writer | ||
| logger.debug(f"Created new session with ID: {session_id}") | ||
|
|
||
| # Determine the full path for the message endpoint to be sent to the client. | ||
| # scope['root_path'] is the prefix where the current Starlette app | ||
| # instance is mounted. | ||
| # e.g., "" if top-level, or "/api_prefix" if mounted under "/api_prefix". | ||
| root_path = scope.get("root_path", "") | ||
|
|
||
| # self._endpoint is the path *within* this app, e.g., "/messages". | ||
| # Concatenating them gives the full absolute path from the server root. | ||
| # e.g., "" + "/messages" -> "/messages" | ||
| # e.g., "/api_prefix" + "/messages" -> "/api_prefix/messages" | ||
| full_message_path_for_client = root_path.rstrip("/") + self._endpoint | ||
|
|
||
| # This is the URI (path + query) the client will use to POST messages. | ||
| client_post_uri_data = ( | ||
| f"{quote(full_message_path_for_client)}?session_id={session_id.hex}" | ||
| ) | ||
|
|
||
| sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[ | ||
| dict[str, Any] | ||
| ](0) | ||
|
|
||
| async def sse_writer(): | ||
| logger.debug("Starting SSE writer") | ||
| async with sse_stream_writer, write_stream_reader: | ||
| await sse_stream_writer.send( | ||
| {"event": "endpoint", "data": client_post_uri_data} | ||
| ) | ||
| logger.debug(f"Sent endpoint event: {client_post_uri_data}") | ||
|
|
||
| async for session_message in write_stream_reader: | ||
| logger.debug(f"Sending message via SSE: {session_message}") | ||
| await sse_stream_writer.send( | ||
| { | ||
| "event": "message", | ||
| "data": session_message.message.model_dump_json( | ||
| by_alias=True, exclude_none=True | ||
| ), | ||
| } | ||
| ) | ||
|
|
||
| async with anyio.create_task_group() as tg: | ||
|
|
||
| async def response_wrapper(scope: Scope, receive: Receive, send: Send): | ||
| """ | ||
| The EventSourceResponse returning signals a client close / disconnect. | ||
| In this case we close our side of the streams to signal the client that | ||
| the connection has been closed. | ||
| """ | ||
| await EventSourceResponse( | ||
| content=sse_stream_reader, data_sender_callable=sse_writer | ||
| )(scope, receive, send) | ||
| await read_stream_writer.aclose() | ||
| await write_stream_reader.aclose() | ||
| logging.debug(f"Client session disconnected {session_id}") | ||
|
|
||
| logger.debug("Starting SSE response task") | ||
| tg.start_soon(response_wrapper, scope, receive, send) | ||
|
|
||
| logger.debug("Yielding read and write streams") | ||
| yield (read_stream, write_stream) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.