Skip to content

Commit b4f957f

Browse files
zxxf18zhaoxin
andauthored
mcp: add DisableListening option to StreamableClientTransport (#729)
mcp: add DisableStandaloneSSE option to control standalone SSE stream Currently, StreamableClientTransport always establishes a standalone SSE connection after initialization to receive server-initiated messages. While this is useful for receiving notifications like ToolListChangedNotification, it causes problems in several real-world scenarios: 1. **Unnecessary resource usage**: Many use cases (for me), especially in scientific computing scenarios, only require simple request-response communication. Clients don't need server-initiated notifications, making the persistent SSE connection wasteful. 2. **Server compatibility issues**: Some third-party MCP servers don't properly handle GET requests for SSE streams. When the client automatically tries to establish the SSE connection, it fails or hangs, blocking the entire connection process. This is particularly problematic when using third-party servers that cannot be modified. This issue is similar to what was discussed in issue #634. 3. **Lack of user control**: The MCP specification states that the standalone SSE stream is optional ("The client MAY issue an HTTP GET"), but the current SDK implementation doesn't provide a way to opt out. This implementation is also inconsistent with other MCP SDKs like github.com/mark3labs/mcp-go, which provides getListeningEnabled control (defaults to false, requires explicit enablement). Fixes #728 Co-authored-by: zhaoxin <[email protected]>
1 parent a225d4d commit b4f957f

File tree

2 files changed

+158
-13
lines changed

2 files changed

+158
-13
lines changed

mcp/streamable.go

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,6 +1389,26 @@ type StreamableClientTransport struct {
13891389
// It defaults to 5. To disable retries, use a negative number.
13901390
MaxRetries int
13911391

1392+
// DisableStandaloneSSE controls whether the client establishes a standalone SSE stream
1393+
// for receiving server-initiated messages.
1394+
//
1395+
// When false (the default), after initialization the client sends an HTTP GET request
1396+
// to establish a persistent server-sent events (SSE) connection. This allows the server
1397+
// to send messages to the client at any time, such as ToolListChangedNotification or
1398+
// other server-initiated requests and notifications. The connection persists for the
1399+
// lifetime of the session and automatically reconnects if interrupted.
1400+
//
1401+
// When true, the client does not establish the standalone SSE stream. The client will
1402+
// only receive responses to its own POST requests. Server-initiated messages will not
1403+
// be received.
1404+
//
1405+
// According to the MCP specification, the standalone SSE stream is optional.
1406+
// Setting DisableStandaloneSSE to true is useful when:
1407+
// - You only need request-response communication and don't need server-initiated notifications
1408+
// - The server doesn't properly handle GET requests for SSE streams
1409+
// - You want to avoid maintaining a persistent connection
1410+
DisableStandaloneSSE bool
1411+
13921412
// TODO(rfindley): propose exporting these.
13931413
// If strict is set, the transport is in 'strict mode', where any violation
13941414
// of the MCP spec causes a failure.
@@ -1453,16 +1473,17 @@ func (t *StreamableClientTransport) Connect(ctx context.Context) (Connection, er
14531473
// middleware), yet only cancel the standalone stream when the connection is closed.
14541474
connCtx, cancel := context.WithCancel(xcontext.Detach(ctx))
14551475
conn := &streamableClientConn{
1456-
url: t.Endpoint,
1457-
client: client,
1458-
incoming: make(chan jsonrpc.Message, 10),
1459-
done: make(chan struct{}),
1460-
maxRetries: maxRetries,
1461-
strict: t.strict,
1462-
logger: ensureLogger(t.logger), // must be non-nil for safe logging
1463-
ctx: connCtx,
1464-
cancel: cancel,
1465-
failed: make(chan struct{}),
1476+
url: t.Endpoint,
1477+
client: client,
1478+
incoming: make(chan jsonrpc.Message, 10),
1479+
done: make(chan struct{}),
1480+
maxRetries: maxRetries,
1481+
strict: t.strict,
1482+
logger: ensureLogger(t.logger), // must be non-nil for safe logging
1483+
ctx: connCtx,
1484+
cancel: cancel,
1485+
failed: make(chan struct{}),
1486+
disableStandaloneSSE: t.DisableStandaloneSSE,
14661487
}
14671488
return conn, nil
14681489
}
@@ -1477,6 +1498,10 @@ type streamableClientConn struct {
14771498
strict bool // from [StreamableClientTransport.strict]
14781499
logger *slog.Logger // from [StreamableClientTransport.logger]
14791500

1501+
// disableStandaloneSSE controls whether to disable the standalone SSE stream
1502+
// for receiving server-to-client notifications when no request is in flight.
1503+
disableStandaloneSSE bool // from [StreamableClientTransport.DisableStandaloneSSE]
1504+
14801505
// Guard calls to Close, as it may be called multiple times.
14811506
closeOnce sync.Once
14821507
closeErr error
@@ -1518,7 +1543,7 @@ func (c *streamableClientConn) sessionUpdated(state clientSessionState) {
15181543
c.mu.Unlock()
15191544

15201545
// Start the standalone SSE stream as soon as we have the initialized
1521-
// result.
1546+
// result, if continuous listening is enabled.
15221547
//
15231548
// § 2.2: The client MAY issue an HTTP GET to the MCP endpoint. This can be
15241549
// used to open an SSE stream, allowing the server to communicate to the
@@ -1528,9 +1553,11 @@ func (c *streamableClientConn) sessionUpdated(state clientSessionState) {
15281553
// initialized, we don't know whether the server requires a sessionID.
15291554
//
15301555
// § 2.5: A server using the Streamable HTTP transport MAY assign a session
1531-
// ID at initialization time, by including it in an Mcp-Session-Id header
1556+
// ID at initialization time, by including it in a Mcp-Session-Id header
15321557
// on the HTTP response containing the InitializeResult.
1533-
c.connectStandaloneSSE()
1558+
if !c.disableStandaloneSSE {
1559+
c.connectStandaloneSSE()
1560+
}
15341561
}
15351562

15361563
func (c *streamableClientConn) connectStandaloneSSE() {

mcp/streamable_client_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,3 +696,121 @@ func TestStreamableClientTransientErrors(t *testing.T) {
696696
})
697697
}
698698
}
699+
700+
func TestStreamableClientDisableStandaloneSSE(t *testing.T) {
701+
ctx := context.Background()
702+
703+
tests := []struct {
704+
name string
705+
disableStandaloneSSE bool
706+
expectGETRequest bool
707+
}{
708+
{
709+
name: "default behavior (standalone SSE enabled)",
710+
disableStandaloneSSE: false,
711+
expectGETRequest: true,
712+
},
713+
{
714+
name: "standalone SSE disabled",
715+
disableStandaloneSSE: true,
716+
expectGETRequest: false,
717+
},
718+
}
719+
720+
for _, test := range tests {
721+
t.Run(test.name, func(t *testing.T) {
722+
getRequestKey := streamableRequestKey{"GET", "123", "", ""}
723+
724+
fake := &fakeStreamableServer{
725+
t: t,
726+
responses: fakeResponses{
727+
{"POST", "", methodInitialize, ""}: {
728+
header: header{
729+
"Content-Type": "application/json",
730+
sessionIDHeader: "123",
731+
},
732+
body: jsonBody(t, initResp),
733+
},
734+
{"POST", "123", notificationInitialized, ""}: {
735+
status: http.StatusAccepted,
736+
wantProtocolVersion: latestProtocolVersion,
737+
},
738+
getRequestKey: {
739+
header: header{
740+
"Content-Type": "text/event-stream",
741+
},
742+
wantProtocolVersion: latestProtocolVersion,
743+
optional: !test.expectGETRequest,
744+
},
745+
{"DELETE", "123", "", ""}: {
746+
optional: true,
747+
},
748+
},
749+
}
750+
751+
httpServer := httptest.NewServer(fake)
752+
defer httpServer.Close()
753+
754+
transport := &StreamableClientTransport{
755+
Endpoint: httpServer.URL,
756+
DisableStandaloneSSE: test.disableStandaloneSSE,
757+
}
758+
client := NewClient(testImpl, nil)
759+
session, err := client.Connect(ctx, transport, nil)
760+
if err != nil {
761+
t.Fatalf("client.Connect() failed: %v", err)
762+
}
763+
764+
// Give some time for the standalone SSE connection to be established (if enabled)
765+
time.Sleep(100 * time.Millisecond)
766+
767+
// Verify the connection state
768+
streamableConn, ok := session.mcpConn.(*streamableClientConn)
769+
if !ok {
770+
t.Fatalf("Expected *streamableClientConn, got %T", session.mcpConn)
771+
}
772+
773+
if got, want := streamableConn.disableStandaloneSSE, test.disableStandaloneSSE; got != want {
774+
t.Errorf("disableStandaloneSSE field: got %v, want %v", got, want)
775+
}
776+
777+
// Clean up
778+
if err := session.Close(); err != nil {
779+
t.Errorf("closing session: %v", err)
780+
}
781+
782+
// Check if GET request was received
783+
fake.calledMu.Lock()
784+
getRequestReceived := false
785+
if fake.called != nil {
786+
getRequestReceived = fake.called[getRequestKey]
787+
}
788+
fake.calledMu.Unlock()
789+
790+
if got, want := getRequestReceived, test.expectGETRequest; got != want {
791+
t.Errorf("GET request received: got %v, want %v", got, want)
792+
}
793+
794+
// If we expected a GET request, verify it was actually received
795+
if test.expectGETRequest {
796+
if missing := fake.missingRequests(); len(missing) > 0 {
797+
// Filter out optional requests
798+
var requiredMissing []streamableRequestKey
799+
for _, key := range missing {
800+
if resp, ok := fake.responses[key]; ok && !resp.optional {
801+
requiredMissing = append(requiredMissing, key)
802+
}
803+
}
804+
if len(requiredMissing) > 0 {
805+
t.Errorf("did not receive expected requests: %v", requiredMissing)
806+
}
807+
}
808+
} else {
809+
// If we didn't expect a GET request, verify it wasn't sent
810+
if getRequestReceived {
811+
t.Error("GET request was sent unexpectedly when DisableStandaloneSSE is true")
812+
}
813+
}
814+
})
815+
}
816+
}

0 commit comments

Comments
 (0)