MCP distributed sessions#5105
Conversation
📝 WalkthroughWalkthroughThis PR introduces distributed MCP session management using Erlang Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant HTTPPlug as HTTP Plug
participant Registry as Registry.Pg
participant PG as :pg Scope
participant Session as Session GenServer
Client->>HTTPPlug: DELETE /session with mcp-session-id header
HTTPPlug->>Registry: lookup_session(name, session_id)
Registry->>PG: :pg.get_members({name, session_id})
PG-->>Registry: [pid]
Registry-->>HTTPPlug: {:ok, pid}
HTTPPlug->>Session: GenServer.stop(pid, :normal)
Session-->>HTTPPlug: stopped
HTTPPlug-->>Client: 200 with {}
sequenceDiagram
participant Server as MCP Server
participant Registry as Registry.Pg
participant PG as :pg Scope
participant Supervisor as App Supervisor
Supervisor->>PG: start_link(:sanbase_mcp_sessions)
PG-->>Supervisor: {:ok, pid}
Supervisor->>Registry: start_child(Registry.Pg)
Registry-->>Supervisor: :ignore (external supervision)
Server->>Registry: register_session(name, session_id, pid)
Registry->>PG: :pg.join(:sanbase_mcp_sessions, {name, session_id}, pid)
PG-->>Registry: :ok
Registry-->>Server: :ok
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Nitpick comments (3)
test/sanbase/mcp/registry_pg_test.exs (1)
73-75:Process.sleep/1for async cleanup may cause flaky tests.Using a fixed 50ms sleep to wait for
:pgcleanup is timing-dependent and may fail under load. Consider polling with a small timeout or using a helper that retries the assertion.♻️ More robust assertion approach
- # :pg cleanup is async — give it a moment - Process.sleep(50) - assert {:error, :not_found} = Pg.lookup_session(`@registry_name`, "sess-3") + # :pg cleanup is async — poll until cleaned up or timeout + assert_eventually(fn -> + {:error, :not_found} == Pg.lookup_session(`@registry_name`, "sess-3") + end, timeout: 500, interval: 10)You could add an
assert_eventuallyhelper or useSanbase.TestHelpers.try_few_times/2if available.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/sanbase/mcp/registry_pg_test.exs` around lines 73 - 75, Replace the brittle fixed sleep before asserting registry cleanup by polling/retrying the lookup instead: remove Process.sleep(50) and use a retry helper (e.g. Sanbase.TestHelpers.try_few_times/2 or an assert_eventually helper) to repeatedly call Pg.lookup_session(`@registry_name`, "sess-3") until it returns {:error, :not_found} or a short timeout elapses, ensuring the test waits deterministically for :pg async cleanup.lib/sanbase/mcp/http_plugs.ex (1)
123-140: Consider extracting sharedhandle_delete/1logic.The
handle_delete/1function is duplicated nearly verbatim betweenStreamableHTTPPlugandStreamableHTTPDevPlug. Extract it to a shared private module or helper to reduce duplication and ensure consistent behavior.Also applies to: 192-209
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/sanbase/mcp/http_plugs.ex` around lines 123 - 140, The handle_delete/1 logic in StreamableHTTPPlug and StreamableHTTPDevPlug is duplicated; extract it into a single shared helper (e.g., a private function in a new module like Sanbase.MCP.HTTPPlugs.Helper.delete_session/2 or a private function in a common module) and have both plugs delegate to it; the helper should accept the conn and the server or registry_name (compute registry_name = Anubis.Server.Registry.registry_name(`@server`) in the caller or pass `@server` into the helper), perform the same get_req_header("mcp-session-id") check, call Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) and GenServer.stop(pid, :normal) on {:ok, pid}, and return the conn after calling put_resp_content_type("application/json") and send_resp with the same 200/"{}" or 400/{"error":"Session ID required"} responses so behavior remains identical.lib/sanbase/mcp/registry/pg.ex (1)
21-22: Consider adding@specannotations to public functions.Per coding guidelines, public functions should have typespecs. While the callbacks inherit types from the behaviour, adding explicit specs improves documentation and enables Dialyzer checks.
📝 Suggested typespecs
`@doc` "Returns the `:pg` scope atom used by this registry." + `@spec` scope() :: atom() def scope, do: `@scope` # The :pg scope is started in Sanbase.Application.Mcp, not here. `@impl` Anubis.Server.Registry + `@spec` child_spec(keyword()) :: :ignore def child_spec(_opts), do: :ignore `@impl` Anubis.Server.Registry + `@spec` register_session(atom(), String.t(), pid()) :: :ok def register_session(name, session_id, pid) do :pg.join(`@scope`, {name, session_id}, pid) end `@impl` Anubis.Server.Registry + `@spec` lookup_session(atom(), String.t()) :: {:ok, pid()} | {:error, :not_found} def lookup_session(name, session_id) do case :pg.get_members(`@scope`, {name, session_id}) do [pid | _] -> {:ok, pid} [] -> {:error, :not_found} end end `@impl` Anubis.Server.Registry + `@spec` unregister_session(atom(), String.t()) :: :ok def unregister_session(name, session_id) doAlso applies to: 28-31, 33-38, 41-48
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/sanbase/mcp/registry/pg.ex` around lines 21 - 22, Add `@spec` annotations for all public functions in this module (including scope/0 and the other public functions around lines 28-48) so Dialyzer and docs have concrete types; for example annotate scope/0 as returning an atom, annotate any via/1 or name/1 functions to return the {:via, module(), term()} tuple or appropriate name type, and annotate register/2/unregister/1 (or similarly named register functions) with their argument and return types using the behaviour's types where applicable; ensure each public function has a matching `@spec` that references the behaviour types when available and uses concrete types (atom(), pid(), term(), {:ok, term()} | {:error, term()} etc.) to match existing callbacks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@lib/sanbase/application/mcp.ex`:
- Around line 11-21: The children list built in mcp_children includes nil
entries because start_in/2 returns nil in non-matching envs; update the
mcp_children construction in lib/sanbase/application/mcp.ex to filter out nils
(e.g., pipe the list through Enum.reject(&is_nil/1) or Enum.filter(& &1)) before
returning it so the supervisor never receives nil child specs (alternatively,
add Enum.reject(&is_nil/1) to the concatenated children list in
lib/sanbase/application/application.ex before the Enum.uniq() call on line
~142).
In `@lib/sanbase/mcp/http_plugs.ex`:
- Around line 192-209: In handle_delete, protect the GenServer.stop call and
ensure the Plug pipeline is halted: when lookup_session returns {:ok, pid} wrap
GenServer.stop(pid, :normal) in a safe rescue/catch (e.g., try do
GenServer.stop(pid, :normal) rescue _ -> :ok end) to avoid exceptions from
remote nodes, and after each send_resp(200, "{}") and send_resp(400, ...) call
invoke halt() on the conn (e.g., conn |> send_resp(...) |> halt()) so the
request processing stops; reference the function handle_delete and the
GenServer.stop(pid, :normal) call to locate the changes.
- Around line 128-131: The GenServer.stop call in the
Sanbase.MCP.Registry.Pg.lookup_session branch can block or raise for remote
PIDs; change the call to a defensive version by using GenServer.stop(pid,
:normal, <small_timeout_ms>) and wrap it in a try/rescue (or try/catch) to catch
exits/timeouts so it won’t crash the caller; update the code around
Sanbase.MCP.Registry.Pg.lookup_session and GenServer.stop to log or ignore
errors on stop and return :ok on failure.
- Around line 123-140: The handle_delete/1 function sends responses but doesn't
halt the Plug connection; update both branches that call send_resp in
handle_delete to immediately halt the conn afterwards (e.g. thread the conn
through send_resp then |> halt()), ensuring the returned value is the halted
conn so downstream plugs won't run; modify the code paths around
Sanbase.MCP.Registry.Pg.lookup_session/2 and the error branch to pipe the conn
into send_resp and then |> halt().
In `@test/sanbase/mcp/registry_pg_test.exs`:
- Around line 6-22: The test starts a unique :pg scope (setup returns scope) but
Sanbase.MCP.Registry.Pg uses a compile-time `@scope` :sanbase_mcp_sessions so
tests still hit the production scope; fix by making the Pg module's scope
configurable and using the test scope: change the module to read the scope from
a runtime source (e.g. Application.get_env(:sanbase, :mcp_pg_scope,
:sanbase_mcp_sessions)) or add an API to pass a scope into the public functions
(e.g. Registry.Pg.lookup/2, Registry.Pg.register/2) and update tests to call
those functions with the setup-provided scope variable; alternatively, if you
prefer not to change Pg, remove the unused setup that starts a per-test scope
and mark the test module async: false and document the dependency on the app
supervision tree.
---
Nitpick comments:
In `@lib/sanbase/mcp/http_plugs.ex`:
- Around line 123-140: The handle_delete/1 logic in StreamableHTTPPlug and
StreamableHTTPDevPlug is duplicated; extract it into a single shared helper
(e.g., a private function in a new module like
Sanbase.MCP.HTTPPlugs.Helper.delete_session/2 or a private function in a common
module) and have both plugs delegate to it; the helper should accept the conn
and the server or registry_name (compute registry_name =
Anubis.Server.Registry.registry_name(`@server`) in the caller or pass `@server` into
the helper), perform the same get_req_header("mcp-session-id") check, call
Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) and
GenServer.stop(pid, :normal) on {:ok, pid}, and return the conn after calling
put_resp_content_type("application/json") and send_resp with the same 200/"{}"
or 400/{"error":"Session ID required"} responses so behavior remains identical.
In `@lib/sanbase/mcp/registry/pg.ex`:
- Around line 21-22: Add `@spec` annotations for all public functions in this
module (including scope/0 and the other public functions around lines 28-48) so
Dialyzer and docs have concrete types; for example annotate scope/0 as returning
an atom, annotate any via/1 or name/1 functions to return the {:via, module(),
term()} tuple or appropriate name type, and annotate register/2/unregister/1 (or
similarly named register functions) with their argument and return types using
the behaviour's types where applicable; ensure each public function has a
matching `@spec` that references the behaviour types when available and uses
concrete types (atom(), pid(), term(), {:ok, term()} | {:error, term()} etc.) to
match existing callbacks.
In `@test/sanbase/mcp/registry_pg_test.exs`:
- Around line 73-75: Replace the brittle fixed sleep before asserting registry
cleanup by polling/retrying the lookup instead: remove Process.sleep(50) and use
a retry helper (e.g. Sanbase.TestHelpers.try_few_times/2 or an assert_eventually
helper) to repeatedly call Pg.lookup_session(`@registry_name`, "sess-3") until it
returns {:error, :not_found} or a short timeout elapses, ensuring the test waits
deterministically for :pg async cleanup.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9185f4cc-94fd-4e93-bbd8-4cd6e4782e41
📒 Files selected for processing (6)
lib/sanbase/application/mcp.exlib/sanbase/mcp/http_plugs.exlib/sanbase/mcp/registry/pg.expriv/repo/structure.sqltest/sanbase/mcp/mcp_auth_test.exstest/sanbase/mcp/registry_pg_test.exs
| children = [ | ||
| # Join the libcluster Postgres topology so MCP pods form a BEAM cluster. | ||
| # This enables distributed :pg session lookup across pods. | ||
| start_in( | ||
| {Cluster.Supervisor, | ||
| [ | ||
| Application.get_env(:libcluster, :topologies), | ||
| [name: Sanbase.ClusterSupervisor] | ||
| ]}, | ||
| [:dev, :prod] | ||
| ), |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check how start_in/2 is implemented and how children are processed
ast-grep --pattern 'def start_in($_, $_) do
$$$
end'
# Also check how children list is consumed - is there filtering?
rg -n "children" lib/sanbase/application/mcp.ex -A 10Repository: santiment/sanbase2
Length of output: 1005
🏁 Script executed:
# Check if there's any filtering logic in the children list construction
cat -n lib/sanbase/application/mcp.ex | head -50
# Check if the children list is modified before being returned
rg -n "Supervisor\|nil" lib/sanbase/application/mcp.ex
# Look for any List.flatten or similar filtering operations
rg -n "flatten\|compact\|filter" lib/sanbase/application/mcp.exRepository: santiment/sanbase2
Length of output: 1587
🏁 Script executed:
# Check how this MCP module is used in the main Application
rg -n "Sanbase.Application.Mcp" lib/ --type elixir -B 2 -A 2
# Check if there's a supervision tree setup or filtering
rg -n "children.*Mcp\|Mcp.*children" lib/ --type elixir -B 2 -A 2
# Check Elixir version requirement
grep -i "elixir" mix.exs | head -5Repository: santiment/sanbase2
Length of output: 2561
🏁 Script executed:
# Check the context where mcp_children is used
sed -n '130,150p' lib/sanbase/application/application.exRepository: santiment/sanbase2
Length of output: 835
Remove nil values from the children list before passing to the supervisor.
The start_in/2 function returns nil when the environment doesn't match (e.g., in :test). The mcp_children list will then contain nil as its first element. Although Enum.uniq() is called on line 142 of lib/sanbase/application/application.ex, it only removes duplicate entries—not nil values. This will cause the supervisor to fail with an invalid child spec error in test environments.
Filter out nil entries from mcp_children in lib/sanbase/application/mcp.ex before returning, or filter the concatenated children list in lib/sanbase/application/application.ex using Enum.reject(&is_nil/1) before passing to the supervisor.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@lib/sanbase/application/mcp.ex` around lines 11 - 21, The children list built
in mcp_children includes nil entries because start_in/2 returns nil in
non-matching envs; update the mcp_children construction in
lib/sanbase/application/mcp.ex to filter out nils (e.g., pipe the list through
Enum.reject(&is_nil/1) or Enum.filter(& &1)) before returning it so the
supervisor never receives nil child specs (alternatively, add
Enum.reject(&is_nil/1) to the concatenated children list in
lib/sanbase/application/application.ex before the Enum.uniq() call on line
~142).
| defp handle_delete(conn) do | ||
| registry_name = Anubis.Server.Registry.registry_name(@server) | ||
|
|
||
| case get_req_header(conn, "mcp-session-id") do | ||
| [session_id] when session_id != "" -> | ||
| case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do | ||
| {:ok, pid} -> GenServer.stop(pid, :normal) | ||
| _ -> :ok | ||
| end | ||
|
|
||
| conn |> put_resp_content_type("application/json") |> send_resp(200, "{}") | ||
|
|
||
| _ -> | ||
| conn | ||
| |> put_resp_content_type("application/json") | ||
| |> send_resp(400, ~s({"error":"Session ID required"})) | ||
| end | ||
| end |
There was a problem hiding this comment.
Missing halt() after sending response.
After calling send_resp/3, the connection should be halted to prevent further plug processing. This is standard Plug practice and prevents potential issues with downstream plugs.
🐛 Proposed fix
+ import Plug.Conn,
+ only: [get_req_header: 2, put_req_header: 3, put_resp_content_type: 2, send_resp: 3, halt: 1]
defp handle_delete(conn) do
registry_name = Anubis.Server.Registry.registry_name(`@server`)
case get_req_header(conn, "mcp-session-id") do
[session_id] when session_id != "" ->
case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do
{:ok, pid} -> GenServer.stop(pid, :normal)
_ -> :ok
end
- conn |> put_resp_content_type("application/json") |> send_resp(200, "{}")
+ conn |> put_resp_content_type("application/json") |> send_resp(200, "{}") |> halt()
_ ->
conn
|> put_resp_content_type("application/json")
|> send_resp(400, ~s({"error":"Session ID required"}))
+ |> halt()
end
end🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@lib/sanbase/mcp/http_plugs.ex` around lines 123 - 140, The handle_delete/1
function sends responses but doesn't halt the Plug connection; update both
branches that call send_resp in handle_delete to immediately halt the conn
afterwards (e.g. thread the conn through send_resp then |> halt()), ensuring the
returned value is the halted conn so downstream plugs won't run; modify the code
paths around Sanbase.MCP.Registry.Pg.lookup_session/2 and the error branch to
pipe the conn into send_resp and then |> halt().
| case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do | ||
| {:ok, pid} -> GenServer.stop(pid, :normal) | ||
| _ -> :ok | ||
| end |
There was a problem hiding this comment.
GenServer.stop/2 on a remote PID may raise or timeout.
When the session PID is on another node, GenServer.stop(pid, :normal) performs a synchronous call with default 5-second timeout. If the remote node is unreachable or the process is already dead, this could raise an exception or hang. Consider wrapping in a try/catch or using GenServer.stop/3 with an explicit timeout.
🛡️ Proposed defensive handling
[session_id] when session_id != "" ->
case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do
- {:ok, pid} -> GenServer.stop(pid, :normal)
+ {:ok, pid} ->
+ try do
+ GenServer.stop(pid, :normal, 5_000)
+ catch
+ :exit, _ -> :ok
+ end
_ -> :ok
end📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do | |
| {:ok, pid} -> GenServer.stop(pid, :normal) | |
| _ -> :ok | |
| end | |
| case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do | |
| {:ok, pid} -> | |
| try do | |
| GenServer.stop(pid, :normal, 5_000) | |
| catch | |
| :exit, _ -> :ok | |
| end | |
| _ -> :ok | |
| end |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@lib/sanbase/mcp/http_plugs.ex` around lines 128 - 131, The GenServer.stop
call in the Sanbase.MCP.Registry.Pg.lookup_session branch can block or raise for
remote PIDs; change the call to a defensive version by using GenServer.stop(pid,
:normal, <small_timeout_ms>) and wrap it in a try/rescue (or try/catch) to catch
exits/timeouts so it won’t crash the caller; update the code around
Sanbase.MCP.Registry.Pg.lookup_session and GenServer.stop to log or ignore
errors on stop and return :ok on failure.
| defp handle_delete(conn) do | ||
| registry_name = Anubis.Server.Registry.registry_name(@server) | ||
|
|
||
| case get_req_header(conn, "mcp-session-id") do | ||
| [session_id] when session_id != "" -> | ||
| case Sanbase.MCP.Registry.Pg.lookup_session(registry_name, session_id) do | ||
| {:ok, pid} -> GenServer.stop(pid, :normal) | ||
| _ -> :ok | ||
| end | ||
|
|
||
| conn |> put_resp_content_type("application/json") |> send_resp(200, "{}") | ||
|
|
||
| _ -> | ||
| conn | ||
| |> put_resp_content_type("application/json") | ||
| |> send_resp(400, ~s({"error":"Session ID required"})) | ||
| end | ||
| end |
There was a problem hiding this comment.
Same issues apply: missing halt() and unprotected GenServer.stop/2.
The StreamableHTTPDevPlug.handle_delete/1 has the same issues as the main plug - missing halt() after send_resp/3 and potential exception from GenServer.stop/2 on remote nodes. Apply the same fixes here.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@lib/sanbase/mcp/http_plugs.ex` around lines 192 - 209, In handle_delete,
protect the GenServer.stop call and ensure the Plug pipeline is halted: when
lookup_session returns {:ok, pid} wrap GenServer.stop(pid, :normal) in a safe
rescue/catch (e.g., try do GenServer.stop(pid, :normal) rescue _ -> :ok end) to
avoid exceptions from remote nodes, and after each send_resp(200, "{}") and
send_resp(400, ...) call invoke halt() on the conn (e.g., conn |> send_resp(...)
|> halt()) so the request processing stops; reference the function handle_delete
and the GenServer.stop(pid, :normal) call to locate the changes.
| @registry_name :"test.registry.#{System.unique_integer([:positive])}" | ||
|
|
||
| setup do | ||
| # Start a dedicated :pg scope for test isolation | ||
| scope = :"test_pg_#{System.unique_integer([:positive])}" | ||
|
|
||
| start_supervised!(%{ | ||
| id: {:pg, scope}, | ||
| start: {:pg, :start_link, [scope]} | ||
| }) | ||
|
|
||
| # Temporarily override the scope for testing by using the real scope | ||
| # Since the module uses a compile-time constant, we test via the real scope | ||
| # that is started in the application supervision tree. | ||
| # For unit tests, we test the behaviour contract directly. | ||
| {:ok, scope: scope} | ||
| end |
There was a problem hiding this comment.
Test setup creates isolated scope but Pg module uses hardcoded scope.
The setup creates a unique :pg scope (line 10), but Sanbase.MCP.Registry.Pg uses a compile-time constant @scope :sanbase_mcp_sessions. The tests will actually use the production scope, not the test-isolated one. This means:
- Tests require the production
:pgscope to be started (likely via application supervision tree) - The
scopevariable returned from setup is never used - Tests with
async: truemay interfere with each other or other tests using the same scope
Consider either:
- Making the scope configurable in
Pgmodule for test isolation - Removing the unused test setup scope and documenting that tests require the app's
:pgscope - Using
async: falseif test isolation cannot be achieved
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@test/sanbase/mcp/registry_pg_test.exs` around lines 6 - 22, The test starts a
unique :pg scope (setup returns scope) but Sanbase.MCP.Registry.Pg uses a
compile-time `@scope` :sanbase_mcp_sessions so tests still hit the production
scope; fix by making the Pg module's scope configurable and using the test
scope: change the module to read the scope from a runtime source (e.g.
Application.get_env(:sanbase, :mcp_pg_scope, :sanbase_mcp_sessions)) or add an
API to pass a scope into the public functions (e.g. Registry.Pg.lookup/2,
Registry.Pg.register/2) and update tests to call those functions with the
setup-provided scope variable; alternatively, if you prefer not to change Pg,
remove the unused setup that starts a per-test scope and mark the test module
async: false and document the dependency on the app supervision tree.
Changes
Ticket
Checklist:
Summary by CodeRabbit
New Features
Chores