Skip to content

Commit 89c5405

Browse files
authored
Fix memory leak by preventing stale SSE unregister dropping active heandlers (#109)
## Problem After adding Anubis 0.17.1 I noticed that memory usage of my Phoenix application slowly climbed over the hours until it ran out of memory and died. It's a pre-production app with little usage and when that happened there were just logs to connect to the MCP server. ## Solution After adding some tracking, GPT-5.3-Codex diagnosed this as follows: - In streamable HTTP transport, SSE handlers are keyed by session_id. - Reconnects on the same `session_id` can overwrite map entries, and old stream processes can become orphaned (no longer in `sse_handlers`). - Orphaned stream loops sit in receive forever. - `expiry_timers` map in Anubis server is not cleaned on `:session_expired` path. With this fix: - adding pid-aware `unregister_sse_handler/3` so only the expected handler can remove itself - replacing existing handlers safely on register (demonitor old ref, close old handler, monitor/store the new handler) - updating plug on-close and stale-handler cleanup paths to pass the handler pid ## Rationale I've tested this and deployed it to my app, and memory usage has been stable the last few hours. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes * **Bug Fixes** * Enhanced Server-Sent Events (SSE) handler registration logic to properly manage concurrent handler instances for the same session, preventing race conditions and ensuring consistent handler state. * **Tests** * Added test case to verify SSE handler lifecycle behavior when multiple handlers are registered for the same session. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 7df9c0d commit 89c5405

3 files changed

Lines changed: 72 additions & 8 deletions

File tree

lib/anubis/server/transport/streamable_http.ex

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,9 @@ defmodule Anubis.Server.Transport.StreamableHTTP do
192192
@doc """
193193
Unregisters the SSE handler for a session. Called when the SSE connection closes.
194194
"""
195-
@spec unregister_sse_handler(GenServer.server(), String.t()) :: :ok
196-
def unregister_sse_handler(transport, session_id) do
197-
GenServer.cast(transport, {:unregister_sse_handler, session_id})
195+
@spec unregister_sse_handler(GenServer.server(), String.t(), pid() | nil) :: :ok
196+
def unregister_sse_handler(transport, session_id, expected_pid \\ nil) do
197+
GenServer.cast(transport, {:unregister_sse_handler, session_id, expected_pid})
198198
end
199199

200200
@doc """
@@ -251,9 +251,23 @@ defmodule Anubis.Server.Transport.StreamableHTTP do
251251

252252
@impl GenServer
253253
def handle_call({:register_sse_handler, session_id, pid}, _from, state) do
254-
ref = Process.monitor(pid)
254+
sse_handlers =
255+
case Map.get(state.sse_handlers, session_id) do
256+
{^pid, old_ref} ->
257+
Process.demonitor(old_ref, [:flush])
258+
state.sse_handlers
259+
260+
{old_pid, old_ref} ->
261+
Process.demonitor(old_ref, [:flush])
262+
send(old_pid, :close_sse)
263+
state.sse_handlers
264+
265+
nil ->
266+
state.sse_handlers
267+
end
255268

256-
sse_handlers = Map.put(state.sse_handlers, session_id, {pid, ref})
269+
ref = Process.monitor(pid)
270+
sse_handlers = Map.put(sse_handlers, session_id, {pid, ref})
257271

258272
Logging.transport_event("sse_handler_registered", %{
259273
session_id: session_id,
@@ -299,8 +313,16 @@ defmodule Anubis.Server.Transport.StreamableHTTP do
299313

300314
@impl GenServer
301315
def handle_cast({:unregister_sse_handler, session_id}, state) do
316+
handle_cast({:unregister_sse_handler, session_id, nil}, state)
317+
end
318+
319+
@impl GenServer
320+
def handle_cast({:unregister_sse_handler, session_id, expected_pid}, state) do
302321
sse_handlers =
303322
case Map.get(state.sse_handlers, session_id) do
323+
{pid, _ref} when is_pid(expected_pid) and pid != expected_pid ->
324+
state.sse_handlers
325+
304326
{_pid, ref} ->
305327
Process.demonitor(ref, [:flush])
306328
Map.delete(state.sse_handlers, session_id)

lib/anubis/server/transport/streamable_http/plug.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ if Code.ensure_loaded?(Plug) do
275275
|> send_resp(202, "{}")
276276

277277
handler_pid ->
278-
StreamableHTTP.unregister_sse_handler(transport, session_id)
278+
StreamableHTTP.unregister_sse_handler(transport, session_id, handler_pid)
279279
establish_sse_for_request(conn, response, session_id, opts)
280280

281281
true ->
@@ -288,6 +288,7 @@ if Code.ensure_loaded?(Plug) do
288288

289289
case StreamableHTTP.register_sse_handler(transport, session_id) do
290290
:ok ->
291+
handler_pid = self()
291292
self_pid = self()
292293
Task.start(fn -> send(self_pid, {:sse_message, response}) end)
293294

@@ -296,7 +297,7 @@ if Code.ensure_loaded?(Plug) do
296297
|> Streaming.prepare_connection()
297298
|> Streaming.start(transport, session_id,
298299
on_close: fn ->
299-
StreamableHTTP.unregister_sse_handler(transport, session_id)
300+
StreamableHTTP.unregister_sse_handler(transport, session_id, handler_pid)
300301
end
301302
)
302303

@@ -525,13 +526,14 @@ if Code.ensure_loaded?(Plug) do
525526

526527
defp start_sse_streaming(conn, params) do
527528
%{transport: transport, session_id: session_id, session_header: session_header} = params
529+
handler_pid = self()
528530

529531
conn
530532
|> put_resp_header(session_header, session_id)
531533
|> Streaming.prepare_connection()
532534
|> Streaming.start(transport, session_id,
533535
on_close: fn ->
534-
StreamableHTTP.unregister_sse_handler(transport, session_id)
536+
StreamableHTTP.unregister_sse_handler(transport, session_id, handler_pid)
535537
end
536538
)
537539
end

test/anubis/server/transport/streamable_http_test.exs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,46 @@ defmodule Anubis.Server.Transport.StreamableHTTPTest do
4747
refute StreamableHTTP.get_sse_handler(transport, session_id)
4848
end
4949

50+
test "stale unregister cannot remove a newer handler", %{transport: transport} do
51+
session_id = "test-session-race"
52+
test_pid = self()
53+
54+
old_handler =
55+
spawn(fn ->
56+
:ok = StreamableHTTP.register_sse_handler(transport, session_id)
57+
send(test_pid, {:registered, self()})
58+
59+
receive do
60+
:stop -> :ok
61+
end
62+
end)
63+
64+
assert_receive {:registered, ^old_handler}
65+
66+
new_handler =
67+
spawn(fn ->
68+
:ok = StreamableHTTP.register_sse_handler(transport, session_id)
69+
send(test_pid, {:registered, self()})
70+
71+
receive do
72+
:stop -> :ok
73+
end
74+
end)
75+
76+
assert_receive {:registered, ^new_handler}
77+
assert ^new_handler = StreamableHTTP.get_sse_handler(transport, session_id)
78+
79+
# Simulate delayed close from old SSE connection.
80+
assert :ok = StreamableHTTP.unregister_sse_handler(transport, session_id, old_handler)
81+
assert ^new_handler = StreamableHTTP.get_sse_handler(transport, session_id)
82+
83+
assert :ok = StreamableHTTP.unregister_sse_handler(transport, session_id, new_handler)
84+
refute StreamableHTTP.get_sse_handler(transport, session_id)
85+
86+
send(old_handler, :stop)
87+
send(new_handler, :stop)
88+
end
89+
5090
test "routes messages to sessions", %{transport: transport} do
5191
session_id = "test-session-789"
5292

0 commit comments

Comments
 (0)