Description
Problem
When using the Streamable HTTP transport with SSE responses, there's a race condition that can cause tool responses to be lost, leaving clients hanging indefinitely.
Scenario
- Client POSTs a tools/call request with Accept: text/event-stream, application/json
- Server processes the request successfully (e.g., creates a todo item)
- Server checks if SSE handler exists for the session → returns true (pid is in the map)
- Server calls send(handler_pid, {:sse_message, response}) (async, always "succeeds")
- Server returns 202 Accepted to the client
- SSE handler process tries to Plug.Conn.chunk() the message
- Connection is already closed → {:error, :closed}
- Error logged as sse_send_failed
- Client received 202 but never receives the actual tool response via SSE
- Client hangs waiting for a response that will never arrive
Root Cause
The issue is in lib/anubis/server/transport/streamable_http/plug.ex at route_sse_response/3:
    defp route_sse_response(conn, response, params) do
      %{transport: transport, session_id: session_id} = params

      if handler_pid = StreamableHTTP.get_sse_handler(transport, session_id) do
        send(handler_pid, {:sse_message, response}) # ← async, fire-and-forget

        conn
        |> put_resp_content_type("application/json")
        |> send_resp(202, "{}") # ← 202 sent regardless of whether SSE delivery succeeds
      else
        establish_sse_for_request(conn, params)
      end
    end

The problem is that send/2 in Elixir is asynchronous and never fails: it just puts the message in the process mailbox. The actual failure happens later, when Plug.Conn.chunk/2 is called in the SSE streaming loop, but by then the HTTP response (202) has already been sent.
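The fire-and-forget behavior of send/2 can be reproduced without Plug or Anubis. The following is a minimal sketch using only the Elixir standard library; the module name SendDemo is made up for illustration. It sends to a process that has already exited and shows that the caller gets no error.

```elixir
defmodule SendDemo do
  # Returns whatever send/2 returns when the target process is already dead.
  def send_to_dead_process do
    pid = spawn(fn -> :ok end)
    ref = Process.monitor(pid)

    # Wait for the :DOWN message so we know the process has exited.
    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
    end

    # No error is raised; send/2 simply returns the message it was given.
    send(pid, {:sse_message, "payload"})
  end
end
```

Calling SendDemo.send_to_dead_process() returns {:sse_message, "payload"}, so the return value of send/2 says nothing about whether the message was delivered or handled.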
Observed Behavior
[info] Sent 202 in 70ms
[error] MCP transport event: sse_send_failed
[error] MCP transport details: %{reason: :closed, session_id: "session_GIqPnCT2pNNEWUo-92g="}
The tool action completes server-side (e.g., todo is created), but the client never learns about it.
Suggested Solutions
Option 1: Synchronous SSE confirmation (recommended)
Change route_sse_response to use a synchronous call to confirm the message was actually sent:
    defp route_sse_response(conn, response, params) do
      %{transport: transport, session_id: session_id} = params

      case StreamableHTTP.try_send_sse(transport, session_id, response) do
        :ok ->
          # SSE send confirmed successful
          conn
          |> put_resp_content_type("application/json")
          |> send_resp(202, "{}")

        {:error, :connection_closed} ->
          # SSE connection dead, fall back to inline response
          conn
          |> put_resp_content_type("application/json")
          |> maybe_add_session_header(params.session_header, params.session_id)
          |> send_resp(200, response)

        {:error, :no_sse_handler} ->
          establish_sse_for_request(conn, params)
      end
    end

This would require the SSE handler to confirm delivery before the plug returns 202.
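One way the confirmation step could work is an acknowledged send guarded by a process monitor and a timeout. The sketch below is an assumption about the shape of such a helper, not Anubis code: the module SSEConfirmSketch, the {:sse_ack, ref, result} message, and the write_fun argument (standing in for the call to Plug.Conn.chunk/2) are all hypothetical. It also takes the handler pid directly rather than a transport and session id.

```elixir
defmodule SSEConfirmSketch do
  # Caller side: send the message, then block until the handler reports the
  # result of its write, the handler dies, or the timeout expires.
  def try_send_sse(handler_pid, message, timeout \\ 1_000)

  def try_send_sse(nil, _message, _timeout), do: {:error, :no_sse_handler}

  def try_send_sse(handler_pid, message, timeout) when is_pid(handler_pid) do
    ref = Process.monitor(handler_pid)
    send(handler_pid, {:sse_message, {self(), ref}, message})

    receive do
      {:sse_ack, ^ref, :ok} ->
        Process.demonitor(ref, [:flush])
        :ok

      {:sse_ack, ^ref, {:error, _reason}} ->
        Process.demonitor(ref, [:flush])
        {:error, :connection_closed}

      # The handler exited (or was already dead) before acknowledging.
      {:DOWN, ^ref, :process, _pid, _reason} ->
        {:error, :connection_closed}
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        {:error, :connection_closed}
    end
  end

  # Handler side: stand-in for the SSE streaming loop. write_fun plays the
  # role of the chunk write and returns :ok or {:error, reason}.
  def handler_loop(write_fun) do
    receive do
      {:sse_message, {from, ref}, message} ->
        result = write_fun.(message)
        send(from, {:sse_ack, ref, result})
        if result == :ok, do: handler_loop(write_fun), else: :ok
    end
  end
end
```

With this shape, the plug only answers 202 after the write has been acknowledged. The trade-off is that the request process blocks for up to the timeout, and an acknowledgement arriving after the timeout would be left in the caller's mailbox.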
Option 2: Configuration to disable SSE responses
Add a transport option to always return responses inline:
    # In config
    config :anubis_mcp, :streamable_http,
      inline_responses: true # Always return responses in HTTP body, never via SSE

Option 3: Lower the log level for expected disconnections
At minimum, the sse_send_failed event with reason: :closed should probably be logged at :warning or :debug level rather than :error, since client disconnections are a normal operational condition:
    # In lib/anubis/sse/streaming.ex
    {:error, :closed} ->
      Logging.transport_event(
        "sse_send_failed",
        %{session_id: session_id, reason: :closed},
        level: :debug # or :warning, not :error
      )

    {:error, reason} ->
      Logging.transport_event(
        "sse_send_failed",
        %{session_id: session_id, reason: reason},
        level: :error
      )

Workaround
We've implemented a workaround by adding a plug that strips text/event-stream from the Accept header, forcing all responses to be inline:
    defmodule MyApp.Plugs.MCPForceInline do
      import Plug.Conn

      def init(opts), do: opts

      def call(conn, _opts) do
        case get_req_header(conn, "accept") do
          [accept_header] ->
            # Drop text/event-stream, then clean up the commas it leaves behind.
            new_header =
              accept_header
              |> String.replace("text/event-stream", "")
              |> String.replace(~r/,\s*,/, ",")
              |> String.replace(~r/^,\s*/, "")
              |> String.replace(~r/,\s*$/, "")
              |> String.trim()

            # Fall back to JSON if text/event-stream was the only accepted type.
            new_header = if new_header == "", do: "application/json", else: new_header
            put_req_header(conn, "accept", new_header)

          _ ->
            conn
        end
      end
    end

Environment
- Anubis MCP version: 0.17.0
- Elixir: 1.18.4
- Client: Claude Code (Anthropic's CLI)
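
For reference, the Accept header rewriting used in the workaround plug can be checked in isolation. The sketch below copies the same String.replace pipeline into a pure function; the module and function names (AcceptRewriteSketch, strip_event_stream/1) are made up for illustration and are not part of the plug.

```elixir
defmodule AcceptRewriteSketch do
  # Same steps as the workaround plug, minus the Plug.Conn plumbing.
  def strip_event_stream(accept_header) do
    stripped =
      accept_header
      |> String.replace("text/event-stream", "")
      |> String.replace(~r/,\s*,/, ",")
      |> String.replace(~r/^,\s*/, "")
      |> String.replace(~r/,\s*$/, "")
      |> String.trim()

    # Fall back to JSON if nothing else was accepted.
    if stripped == "", do: "application/json", else: stripped
  end
end
```

For example, AcceptRewriteSketch.strip_event_stream("text/event-stream, application/json") returns "application/json", and a header containing only text/event-stream also falls back to "application/json".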