Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/examples/server/ssePollingExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Key features:
* - Configures `retryInterval` to tell clients how long to wait before reconnecting
* - Uses `eventStore` to persist events for replay after reconnection
* - Calls `closeSSEStream()` to gracefully disconnect clients mid-operation
* - Uses `extra.closeSSEStream()` callback to gracefully disconnect clients mid-operation
*
* Run with: npx tsx src/examples/server/ssePollingExample.ts
* Test with: curl or the MCP Inspector
Expand All @@ -31,9 +31,6 @@ const server = new McpServer(
}
);

// Track active transports by session ID for closeSSEStream access
const transports = new Map<string, StreamableHTTPServerTransport>();

// Register a long-running tool that demonstrates server-initiated disconnect
server.tool(
'long-task',
Expand Down Expand Up @@ -65,11 +62,11 @@ server.tool(
await sleep(1000);

// Server decides to disconnect the client to free resources
// Client will reconnect via GET with Last-Event-ID after retryInterval
const transport = transports.get(extra.sessionId!);
if (transport) {
// Client will reconnect via GET with Last-Event-ID after the transport's retryInterval
// Use extra.closeSSEStream callback - available when eventStore is configured
if (extra.closeSSEStream) {
console.log(`[${extra.sessionId}] Closing SSE stream to trigger client polling...`);
transport.closeSSEStream(extra.requestId);
extra.closeSSEStream();
}

// Continue processing while client is disconnected
Expand Down Expand Up @@ -112,6 +109,9 @@ app.use(cors());
// Create event store for resumability
const eventStore = new InMemoryEventStore();

// Track transports by session ID for session reuse
const transports = new Map<string, StreamableHTTPServerTransport>();

// Handle all MCP requests - use express.json() only for this route
app.all('/mcp', express.json(), async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
Expand All @@ -123,7 +123,7 @@ app.all('/mcp', express.json(), async (req: Request, res: Response) => {
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore,
retryInterval: 2000, // Client should reconnect after 2 seconds
retryInterval: 2000, // Default retry interval for priming events
onsessioninitialized: id => {
console.log(`[${id}] Session initialized`);
transports.set(id, transport!);
Expand Down
114 changes: 114 additions & 0 deletions src/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,120 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
// Clean up - resolve the tool promise
toolResolve!();
});

it('should provide closeSSEStream callback in extra when eventStore is configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Track whether closeSSEStream callback was provided
let receivedCloseSSEStream: (() => void) | undefined;

// Register a tool that captures the extra.closeSSEStream callback
mcpServer.tool('test-callback-tool', 'Test tool', {}, async (_args, extra) => {
receivedCloseSSEStream = extra.closeSSEStream;
return { content: [{ type: 'text', text: 'Done' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Call the tool
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 200,
method: 'tools/call',
params: { name: 'test-callback-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

// Read all events to completion
const reader = postResponse.body?.getReader();
while (true) {
const { done } = await reader!.read();
if (done) break;
}

// Verify closeSSEStream callback was provided
expect(receivedCloseSSEStream).toBeDefined();
expect(typeof receivedCloseSSEStream).toBe('function');
});

it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID()
// No eventStore
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Track whether closeSSEStream callback was provided
let receivedCloseSSEStream: (() => void) | undefined;

// Register a tool that captures the extra.closeSSEStream callback
mcpServer.tool('test-no-callback-tool', 'Test tool', {}, async (_args, extra) => {
receivedCloseSSEStream = extra.closeSSEStream;
return { content: [{ type: 'text', text: 'Done' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Call the tool
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 201,
method: 'tools/call',
params: { name: 'test-no-callback-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

// Read all events to completion
const reader = postResponse.body?.getReader();
while (true) {
const { done } = await reader!.read();
if (done) break;
}

// Verify closeSSEStream callback was NOT provided
expect(receivedCloseSSEStream).toBeUndefined();
});
});

// Test onsessionclosed callback
Expand Down
10 changes: 9 additions & 1 deletion src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,15 @@ export class StreamableHTTPServerTransport implements Transport {

// handle each message
for (const message of messages) {
this.onmessage?.(message, { authInfo, requestInfo });
// Build closeSSEStream callback for requests when eventStore is configured
let closeSSEStream: (() => void) | undefined;
if (isJSONRPCRequest(message) && this._eventStore) {
closeSSEStream = () => {
this.closeSSEStream(message.id);
};
}

this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream });
}
// The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses
// This will be handled by the send() method when responses are ready
Expand Down
10 changes: 9 additions & 1 deletion src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ export type RequestHandlerExtra<SendRequestT extends Request, SendNotificationT
* This is used by certain transports to correctly associate related messages.
*/
sendRequest: <U extends AnySchema>(request: SendRequestT, resultSchema: U, options?: TaskRequestOptions) => Promise<SchemaOutput<U>>;

/**
* Closes the SSE stream for this request, triggering client reconnection.
* Only available when using StreamableHTTPServerTransport with eventStore configured.
* Use this to implement polling behavior during long-running operations.
*/
closeSSEStream?: () => void;
};

/**
Expand Down Expand Up @@ -728,7 +735,8 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
requestInfo: extra?.requestInfo,
taskId: relatedTaskId,
taskStore: taskStore,
taskRequestedTtl: taskCreationParams?.ttl
taskRequestedTtl: taskCreationParams?.ttl,
closeSSEStream: extra?.closeSSEStream
};

// Starting with Promise.resolve() puts any synchronous errors into the monad as well.
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2137,6 +2137,12 @@ export interface MessageExtraInfo {
* The authentication information.
*/
authInfo?: AuthInfo;

/**
* Callback to close the SSE stream for this request, triggering client reconnection.
* Only available when using StreamableHTTPServerTransport with eventStore configured.
*/
closeSSEStream?: () => void;
}

/* JSON-RPC types */
Expand Down
Loading