Skip to content

Commit aefdc0d

Browse files
authored
fix: Streamable HTTP client ping response handling (#578)
* fix: add ping response capability for clients https://modelcontextprotocol.io/specification/2025-03-26/basic/utilities/ping * fix: add testcase for ping response capability of client * fix: streamable http test tighten JSON-RPC response (ping ACK) detection
1 parent 7c8a082 commit aefdc0d

File tree

2 files changed

+70
-11
lines changed

2 files changed

+70
-11
lines changed

client/transport/streamable_http.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -406,13 +406,6 @@ func (c *StreamableHTTP) handleSSEResponse(ctx context.Context, reader io.ReadCl
406406
// Create a channel for this specific request
407407
responseChan := make(chan *JSONRPCResponse, 1)
408408

409-
// Add timeout context for request processing if not already set
410-
if deadline, ok := ctx.Deadline(); !ok || time.Until(deadline) > 30*time.Second {
411-
var cancel context.CancelFunc
412-
ctx, cancel = context.WithTimeout(ctx, 30*time.Second)
413-
defer cancel()
414-
}
415-
416409
ctx, cancel := context.WithCancel(ctx)
417410
defer cancel()
418411

@@ -601,8 +594,7 @@ func (c *StreamableHTTP) IsOAuthEnabled() bool {
601594
func (c *StreamableHTTP) listenForever(ctx context.Context) {
602595
c.logger.Infof("listening to server forever")
603596
for {
604-
// Add timeout for individual connection attempts
605-
connectCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
597+
connectCtx, cancel := context.WithCancel(ctx)
606598
err := c.createGETConnectionToServer(connectCtx)
607599
cancel()
608600

@@ -754,7 +746,7 @@ func (c *StreamableHTTP) sendResponseToServer(ctx context.Context, response *JSO
754746
ctx, cancel := c.contextAwareOfClientClose(ctx)
755747
defer cancel()
756748

757-
resp, err := c.sendHTTP(ctx, http.MethodPost, bytes.NewReader(responseBody), "application/json")
749+
resp, err := c.sendHTTP(ctx, http.MethodPost, bytes.NewReader(responseBody), "application/json, text/event-stream")
758750
if err != nil {
759751
c.logger.Errorf("failed to send response to server: %v", err)
760752
return

client/transport/streamable_http_test.go

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,23 @@ func startMockStreamableWithGETSupport(getSupport bool) (string, func(), chan bo
554554
return
555555
}
556556

557+
// Handle client JSON-RPC responses (e.g., ping replies)
558+
if request["jsonrpc"] == "2.0" && request["id"] != nil && request["method"] == nil {
559+
if _, hasResult := request["result"]; hasResult {
560+
w.Header().Set("Content-Type", "application/json")
561+
w.WriteHeader(http.StatusAccepted)
562+
if err := json.NewEncoder(w).Encode(map[string]any{
563+
"jsonrpc": "2.0",
564+
"id": request["id"],
565+
"result": "response received",
566+
}); err != nil {
567+
http.Error(w, "Failed to encode response acknowledgment", http.StatusInternalServerError)
568+
return
569+
}
570+
return
571+
}
572+
}
573+
557574
method := request["method"]
558575
if method == "initialize" {
559576
// Generate a new session ID
@@ -627,7 +644,31 @@ func startMockStreamableWithGETSupport(getSupport bool) (string, func(), chan bo
627644
fmt.Fprintf(w, "event: message\ndata: %s\n\n", notificationData)
628645
flusher.Flush()
629646
sendNotification()
630-
return
647+
}
648+
649+
// Keep the connection open, send periodic pings
650+
pingTicker := time.NewTicker(3 * time.Second)
651+
defer pingTicker.Stop()
652+
653+
for {
654+
select {
655+
case <-disconnectCh:
656+
// Force disconnect
657+
return
658+
case <-r.Context().Done():
659+
// Client disconnected
660+
return
661+
case <-pingTicker.C:
662+
// Send ping message according to MCP specification
663+
pingMessage := map[string]any{
664+
"jsonrpc": "2.0",
665+
"id": fmt.Sprintf("ping-%d", time.Now().UnixNano()),
666+
"method": "ping",
667+
}
668+
pingData, _ := json.Marshal(pingMessage)
669+
fmt.Fprintf(w, "event: message\ndata: %s\n\n", pingData)
670+
flusher.Flush()
671+
}
631672
}
632673
} else {
633674
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
@@ -667,6 +708,23 @@ func TestContinuousListening(t *testing.T) {
667708
notificationReceived <- struct{}{}
668709
})
669710

711+
// Setup ping handler
712+
pingReceived := make(chan struct{}, 10)
713+
714+
// Setup request handler for ping requests
715+
trans.SetRequestHandler(func(ctx context.Context, request JSONRPCRequest) (*JSONRPCResponse, error) {
716+
if request.Method == "ping" {
717+
pingReceived <- struct{}{}
718+
// Return proper ping response according to MCP specification
719+
return &JSONRPCResponse{
720+
JSONRPC: "2.0",
721+
ID: request.ID,
722+
Result: json.RawMessage("{}"),
723+
}, nil
724+
}
725+
return nil, fmt.Errorf("unsupported request method: %s", request.Method)
726+
})
727+
670728
// Start the transport - this will launch listenForever in a goroutine
671729
if err := trans.Start(context.Background()); err != nil {
672730
t.Fatal(err)
@@ -714,6 +772,15 @@ func TestContinuousListening(t *testing.T) {
714772
return
715773
}
716774
}
775+
776+
// Wait for at least one ping to be received (should happen within 3 seconds)
777+
select {
778+
case <-pingReceived:
779+
t.Log("Received ping message successfully")
780+
time.Sleep(10 * time.Millisecond) // Allow time for response
781+
case <-time.After(5 * time.Second):
782+
t.Errorf("Expected to receive ping message within 5 seconds, but didn't")
783+
}
717784
}
718785

719786
func TestContinuousListeningMethodNotAllowed(t *testing.T) {

0 commit comments

Comments
 (0)