-
Notifications
You must be signed in to change notification settings - Fork 23
MCP distributed sessions #5105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
MCP distributed sessions #5105
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -91,24 +91,54 @@ defmodule Sanbase.MCP.StreamableHTTPPlug do | |||||||||||||||||||||||||||
| @moduledoc "Wrapper plug to expose Sanbase.MCP.Server via forward" | ||||||||||||||||||||||||||||
| @behaviour Plug | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import Plug.Conn, only: [get_req_header: 2, put_req_header: 3] | ||||||||||||||||||||||||||||
| import Plug.Conn, | ||||||||||||||||||||||||||||
| only: [get_req_header: 2, put_req_header: 3, put_resp_content_type: 2, send_resp: 3] | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @server Sanbase.MCP.Server | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @impl Plug | ||||||||||||||||||||||||||||
| def init(opts), do: opts | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @impl Plug | ||||||||||||||||||||||||||||
| def call(%Plug.Conn{method: "DELETE"} = conn, _opts) do | ||||||||||||||||||||||||||||
| # Handle DELETE ourselves to support cross-pod session termination. | ||||||||||||||||||||||||||||
| # Anubis uses DynamicSupervisor.terminate_child which only works on the | ||||||||||||||||||||||||||||
| # local node. We stop the session process directly, which works across | ||||||||||||||||||||||||||||
| # distributed Erlang nodes. | ||||||||||||||||||||||||||||
| handle_delete(conn) | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def call(conn, _opts) do | ||||||||||||||||||||||||||||
| conn = normalize_post_accept_header(conn) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Anubis.Server.Transport.StreamableHTTP.Plug.call( | ||||||||||||||||||||||||||||
| conn, | ||||||||||||||||||||||||||||
| Anubis.Server.Transport.StreamableHTTP.Plug.init( | ||||||||||||||||||||||||||||
| server: Sanbase.MCP.Server, | ||||||||||||||||||||||||||||
| server: @server, | ||||||||||||||||||||||||||||
| request_timeout: 150_000 | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| defp handle_delete(conn) do | ||||||||||||||||||||||||||||
| registry_name = Anubis.Server.Registry.registry_name(@server) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| case get_req_header(conn, "mcp-session-id") do | ||||||||||||||||||||||||||||
| [session_id] when session_id != "" -> | ||||||||||||||||||||||||||||
| case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do | ||||||||||||||||||||||||||||
| {:ok, pid} -> GenServer.stop(pid, :normal) | ||||||||||||||||||||||||||||
| _ -> :ok | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
|
Comment on lines
+128
to
+131
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
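When the session PID is on another node, `GenServer.stop/3` can exit (for example if the process has already terminated or the node becomes unreachable), which would crash the request instead of returning a response. 🛡️ Proposed defensive handling: [session_id] when session_id != "" ->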
case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do
- {:ok, pid} -> GenServer.stop(pid, :normal)
+ {:ok, pid} ->
+ try do
+ GenServer.stop(pid, :normal, 5_000)
+ catch
+ :exit, _ -> :ok
+ end
_ -> :ok
end📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| conn |> put_resp_content_type("application/json") |> send_resp(200, "{}") | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| _ -> | ||||||||||||||||||||||||||||
| conn | ||||||||||||||||||||||||||||
| |> put_resp_content_type("application/json") | ||||||||||||||||||||||||||||
| |> send_resp(400, ~s({"error":"Session ID required"})) | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
|
Comment on lines
+123
to
+140
Contributor
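There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Missing `halt/1` after `send_resp/3`. After calling `send_resp/3`, the connection should be halted with `halt/1` so that no later plug in the pipeline tries to process it again. 🐛 Proposed fix: + import Plug.Conn,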
+ only: [get_req_header: 2, put_req_header: 3, put_resp_content_type: 2, send_resp: 3, halt: 1]
defp handle_delete(conn) do
registry_name = Anubis.Server.Registry.registry_name(@server)
case get_req_header(conn, "mcp-session-id") do
[session_id] when session_id != "" ->
case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do
{:ok, pid} -> GenServer.stop(pid, :normal)
_ -> :ok
end
- conn |> put_resp_content_type("application/json") |> send_resp(200, "{}")
+ conn |> put_resp_content_type("application/json") |> send_resp(200, "{}") |> halt()
_ ->
conn
|> put_resp_content_type("application/json")
|> send_resp(400, ~s({"error":"Session ID required"}))
+ |> halt()
end
end🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # When both JSON and SSE are advertised on POST, Anubis can choose SSE | ||||||||||||||||||||||||||||
| # response mode and keep the response stream open, causing client timeouts. | ||||||||||||||||||||||||||||
| # Force JSON responses for POST requests; SSE remains available via GET. | ||||||||||||||||||||||||||||
|
|
@@ -134,24 +164,50 @@ defmodule Sanbase.MCP.StreamableHTTPDevPlug do | |||||||||||||||||||||||||||
| @moduledoc "Wrapper plug to expose Sanbase.MCP.DevServer via forward" | ||||||||||||||||||||||||||||
| @behaviour Plug | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import Plug.Conn, only: [get_req_header: 2, put_req_header: 3] | ||||||||||||||||||||||||||||
| import Plug.Conn, | ||||||||||||||||||||||||||||
| only: [get_req_header: 2, put_req_header: 3, put_resp_content_type: 2, send_resp: 3] | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @server Sanbase.MCP.DevServer | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @impl Plug | ||||||||||||||||||||||||||||
| def init(opts), do: opts | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @impl Plug | ||||||||||||||||||||||||||||
| def call(%Plug.Conn{method: "DELETE"} = conn, _opts) do | ||||||||||||||||||||||||||||
| handle_delete(conn) | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def call(conn, _opts) do | ||||||||||||||||||||||||||||
| conn = normalize_post_accept_header(conn) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Anubis.Server.Transport.StreamableHTTP.Plug.call( | ||||||||||||||||||||||||||||
| conn, | ||||||||||||||||||||||||||||
| Anubis.Server.Transport.StreamableHTTP.Plug.init( | ||||||||||||||||||||||||||||
| server: Sanbase.MCP.DevServer, | ||||||||||||||||||||||||||||
| server: @server, | ||||||||||||||||||||||||||||
| request_timeout: 150_000 | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| defp handle_delete(conn) do | ||||||||||||||||||||||||||||
| registry_name = Anubis.Server.Registry.registry_name(@server) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| case get_req_header(conn, "mcp-session-id") do | ||||||||||||||||||||||||||||
| [session_id] when session_id != "" -> | ||||||||||||||||||||||||||||
| case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do | ||||||||||||||||||||||||||||
| {:ok, pid} -> GenServer.stop(pid, :normal) | ||||||||||||||||||||||||||||
| _ -> :ok | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| conn |> put_resp_content_type("application/json") |> send_resp(200, "{}") | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| _ -> | ||||||||||||||||||||||||||||
| conn | ||||||||||||||||||||||||||||
| |> put_resp_content_type("application/json") | ||||||||||||||||||||||||||||
| |> send_resp(400, ~s({"error":"Session ID required"})) | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||
|
Comment on lines
+192
to
+209
Contributor
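There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same issues apply: missing `halt/1` after `send_resp/3`, and an unguarded `GenServer.stop/2` call. The `handle_delete/1` function in `StreamableHTTPDevPlug` needs the same fixes as the one in `StreamableHTTPPlug`. 🤖 Prompt for AI Agents |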
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| defp normalize_post_accept_header(%Plug.Conn{method: "POST"} = conn) do | ||||||||||||||||||||||||||||
| case get_req_header(conn, "accept") do | ||||||||||||||||||||||||||||
| [accept | _] -> | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| defmodule Sanbase.MCP.Registry.Pg do | ||
| @moduledoc """ | ||
| Distributed MCP session registry backed by Erlang's `:pg` process groups. | ||
|
|
||
| Uses `:pg` to register session PIDs in named groups that are automatically | ||
| synchronized across all connected BEAM nodes (via libcluster). Any pod can | ||
| look up a session PID regardless of which pod owns it, and the caller can | ||
| then make a remote `GenServer.call` to that PID through distributed Erlang. | ||
|
|
||
| The `:pg` scope (`:sanbase_mcp_sessions`) must be started in the application | ||
| supervision tree before any MCP servers — see `Sanbase.Application.Mcp`. | ||
|
|
||
| Automatic cleanup: `:pg` removes a PID from all its groups when the process | ||
| exits, so no explicit cleanup is needed on session timeout or pod shutdown. | ||
| """ | ||
|
|
||
| @behaviour Anubis.Server.Registry | ||
|
|
||
| @scope :sanbase_mcp_sessions | ||
|
|
||
| @doc "Returns the `:pg` scope atom used by this registry." | ||
| def scope, do: @scope | ||
|
|
||
| # The :pg scope is started in Sanbase.Application.Mcp, not here. | ||
| @impl Anubis.Server.Registry | ||
| def child_spec(_opts), do: :ignore | ||
|
|
||
| @impl Anubis.Server.Registry | ||
| def register_session(name, session_id, pid) do | ||
| :pg.join(@scope, {name, session_id}, pid) | ||
| end | ||
|
|
||
| @impl Anubis.Server.Registry | ||
| def lookup_session(name, session_id) do | ||
| case :pg.get_members(@scope, {name, session_id}) do | ||
| [pid | _] -> {:ok, pid} | ||
| [] -> {:error, :not_found} | ||
| end | ||
| end | ||
|
|
||
| @impl Anubis.Server.Registry | ||
| def unregister_session(name, session_id) do | ||
| for pid <- :pg.get_members(@scope, {name, session_id}) do | ||
| :pg.leave(@scope, {name, session_id}, pid) | ||
| end | ||
|
|
||
| :ok | ||
| end | ||
| end |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| defmodule Sanbase.MCP.Registry.PgTest do | ||
| use ExUnit.Case, async: true | ||
|
|
||
| alias Sanbase.MCP.Registry.Pg | ||
|
|
||
| @registry_name :"test.registry.#{System.unique_integer([:positive])}" | ||
|
|
||
| setup do | ||
| # Start a dedicated :pg scope for test isolation | ||
| scope = :"test_pg_#{System.unique_integer([:positive])}" | ||
|
|
||
| start_supervised!(%{ | ||
| id: {:pg, scope}, | ||
| start: {:pg, :start_link, [scope]} | ||
| }) | ||
|
|
||
| # Temporarily override the scope for testing by using the real scope | ||
| # Since the module uses a compile-time constant, we test via the real scope | ||
| # that is started in the application supervision tree. | ||
| # For unit tests, we test the behaviour contract directly. | ||
| {:ok, scope: scope} | ||
| end | ||
|
Comment on lines
+6
to
+22
Contributor
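There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Test setup creates an isolated scope but never uses it. The setup creates a unique `:pg` scope, yet `Sanbase.MCP.Registry.Pg` always uses its compile-time `@scope` (`:sanbase_mcp_sessions`), so the tests exercise the scope started by the application rather than the isolated one.
Consider either: removing the unused scope setup, or making the scope configurable so the tests can run against the isolated scope.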
🤖 Prompt for AI Agents |
||
|
|
||
| describe "child_spec/1" do | ||
| test "returns :ignore since :pg scope is started externally" do | ||
| assert :ignore = Pg.child_spec([]) | ||
| end | ||
| end | ||
|
|
||
| describe "register_session/3 and lookup_session/2" do | ||
| test "registers and looks up a session" do | ||
| assert {:error, :not_found} = Pg.lookup_session(@registry_name, "sess-1") | ||
|
|
||
| assert :ok = Pg.register_session(@registry_name, "sess-1", self()) | ||
| assert {:ok, pid} = Pg.lookup_session(@registry_name, "sess-1") | ||
| assert pid == self() | ||
| end | ||
|
|
||
| test "returns :not_found for unknown session" do | ||
| assert {:error, :not_found} = Pg.lookup_session(@registry_name, "nonexistent") | ||
| end | ||
| end | ||
|
|
||
| describe "unregister_session/2" do | ||
| test "removes a session" do | ||
| :ok = Pg.register_session(@registry_name, "sess-2", self()) | ||
| assert {:ok, _} = Pg.lookup_session(@registry_name, "sess-2") | ||
|
|
||
| :ok = Pg.unregister_session(@registry_name, "sess-2") | ||
| assert {:error, :not_found} = Pg.lookup_session(@registry_name, "sess-2") | ||
| end | ||
|
|
||
| test "is a no-op for unknown session" do | ||
| assert :ok = Pg.unregister_session(@registry_name, "nonexistent") | ||
| end | ||
| end | ||
|
|
||
| describe "automatic cleanup on process exit" do | ||
| test "session is removed when the owning process exits" do | ||
| {pid, ref} = | ||
| spawn_monitor(fn -> | ||
| receive do | ||
| :stop -> :ok | ||
| end | ||
| end) | ||
|
|
||
| :ok = Pg.register_session(@registry_name, "sess-3", pid) | ||
| assert {:ok, ^pid} = Pg.lookup_session(@registry_name, "sess-3") | ||
|
|
||
| send(pid, :stop) | ||
| assert_receive {:DOWN, ^ref, :process, ^pid, :normal} | ||
|
|
||
| # :pg cleanup is async — give it a moment | ||
| Process.sleep(50) | ||
| assert {:error, :not_found} = Pg.lookup_session(@registry_name, "sess-3") | ||
| end | ||
| end | ||
| end | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: santiment/sanbase2
Length of output: 1005
🏁 Script executed:
Repository: santiment/sanbase2
Length of output: 1587
🏁 Script executed:
Repository: santiment/sanbase2
Length of output: 2561
🏁 Script executed:
Repository: santiment/sanbase2
Length of output: 835
Remove nil values from the children list before passing to the supervisor.
The `start_in/2` function returns `nil` when the environment doesn't match (e.g., in `:test`). The `mcp_children` list will then contain `nil` as its first element. Although `Enum.uniq()` is called on line 142 of `lib/sanbase/application/application.ex`, it only removes duplicate entries — not `nil` values. This will cause the supervisor to fail with an invalid child spec error in test environments.
Filter out `nil` entries from `mcp_children` in `lib/sanbase/application/mcp.ex` before returning, or filter the concatenated children list in `lib/sanbase/application/application.ex` using `Enum.reject(&is_nil/1)` before passing to the supervisor.
🤖 Prompt for AI Agents