diff --git a/src/examples/server/ssePollingExample.ts b/src/examples/server/ssePollingExample.ts index 8bb8cfbc9..ea1d752f0 100644 --- a/src/examples/server/ssePollingExample.ts +++ b/src/examples/server/ssePollingExample.ts @@ -7,7 +7,7 @@ * Key features: * - Configures `retryInterval` to tell clients how long to wait before reconnecting * - Uses `eventStore` to persist events for replay after reconnection - * - Calls `closeSSEStream()` to gracefully disconnect clients mid-operation + * - Uses `extra.closeSSEStream()` callback to gracefully disconnect clients mid-operation * * Run with: npx tsx src/examples/server/ssePollingExample.ts * Test with: curl or the MCP Inspector @@ -31,9 +31,6 @@ const server = new McpServer( } ); -// Track active transports by session ID for closeSSEStream access -const transports = new Map(); - // Register a long-running tool that demonstrates server-initiated disconnect server.tool( 'long-task', @@ -65,11 +62,11 @@ server.tool( await sleep(1000); // Server decides to disconnect the client to free resources - // Client will reconnect via GET with Last-Event-ID after retryInterval - const transport = transports.get(extra.sessionId!); - if (transport) { + // Client will reconnect via GET with Last-Event-ID after the transport's retryInterval + // Use extra.closeSSEStream callback - available when eventStore is configured + if (extra.closeSSEStream) { console.log(`[${extra.sessionId}] Closing SSE stream to trigger client polling...`); - transport.closeSSEStream(extra.requestId); + extra.closeSSEStream(); } // Continue processing while client is disconnected @@ -112,6 +109,9 @@ app.use(cors()); // Create event store for resumability const eventStore = new InMemoryEventStore(); +// Track transports by session ID for session reuse +const transports = new Map(); + // Handle all MCP requests - use express.json() only for this route app.all('/mcp', express.json(), async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; @@ -123,7 +123,7 @@ app.all('/mcp', express.json(), async (req: Request, res: Response) => { transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), eventStore, - retryInterval: 2000, // Client should reconnect after 2 seconds + retryInterval: 2000, // Default retry interval for priming events onsessioninitialized: id => { console.log(`[${id}] Session initialized`); transports.set(id, transport!); diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 39c2e5805..0e1acd6be 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1802,6 +1802,120 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { // Clean up - resolve the tool promise toolResolve!(); }); + + it('should provide closeSSEStream callback in extra when eventStore is configured', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track whether closeSSEStream callback was provided + let receivedCloseSSEStream: (() => void) | undefined; + + // Register a tool that captures the extra.closeSSEStream callback + mcpServer.tool('test-callback-tool', 'Test tool', {}, async (_args, extra) => { + receivedCloseSSEStream = extra.closeSSEStream; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Call the tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 200, + method: 'tools/call', + params: { name: 'test-callback-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Read all events to completion + const reader = postResponse.body?.getReader(); + while (true) { + const { done } = await reader!.read(); + if (done) break; + } + + // Verify closeSSEStream callback was provided + expect(receivedCloseSSEStream).toBeDefined(); + expect(typeof receivedCloseSSEStream).toBe('function'); + }); + + it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID() + // No eventStore + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track whether closeSSEStream callback was provided + let receivedCloseSSEStream: (() => void) | undefined; + + // Register a tool that captures the extra.closeSSEStream callback + mcpServer.tool('test-no-callback-tool', 'Test tool', {}, async (_args, extra) => { + receivedCloseSSEStream = extra.closeSSEStream; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Call the tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 201, + method: 'tools/call', + params: { name: 'test-no-callback-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Read all events to completion + const reader = postResponse.body?.getReader(); + while (true) { + const { done } = await reader!.read(); + if (done) break; + } + + // Verify closeSSEStream callback was NOT provided + expect(receivedCloseSSEStream).toBeUndefined(); + }); }); // Test onsessionclosed callback diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 4514e619c..bdd8e516c 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -649,7 +649,15 @@ export class StreamableHTTPServerTransport implements Transport { // handle each message for (const message of messages) { - this.onmessage?.(message, { authInfo, requestInfo }); + // Build closeSSEStream callback for requests when eventStore is configured + let closeSSEStream: (() => void) | undefined; + if (isJSONRPCRequest(message) && this._eventStore) { + closeSSEStream = () => { + this.closeSSEStream(message.id); + }; + } + + this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream }); } // The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses // This will be handled by the send() method when responses are ready diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index 61312926e..ce25e45fb 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -283,6 +283,13 @@ export type RequestHandlerExtra(request: SendRequestT, resultSchema: U, options?: TaskRequestOptions) => Promise>; + + /** + * Closes the SSE stream for this request, triggering client reconnection. + * Only available when using StreamableHTTPServerTransport with eventStore configured. + * Use this to implement polling behavior during long-running operations. + */ + closeSSEStream?: () => void; }; /** @@ -728,7 +735,8 @@ export abstract class Protocol void; } /* JSON-RPC types */