Skip to content

Conversation

@FH-30
Copy link

@FH-30 FH-30 commented Jun 9, 2025

Description

Implemented for client-side streamable-http transport

  • option to attempt resumption up till max retry attempts on failure to get final JSON-RPC Response when server sends back an SSE response stream

Fixes #<issue_number> (if applicable)

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • MCP spec compatibility implementation
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring (no functional changes)
  • Performance improvement
  • Tests only (no functional changes)
  • Other (please describe):

Checklist

  • My code follows the code style of this project
  • I have performed a self-review of my own code
  • I have added tests that prove my fix is effective or that my feature works
  • I have updated the documentation accordingly

MCP Spec Compliance

  • This PR implements a feature defined in the MCP specification
  • Link to relevant spec section: Link text
  • Implementation follows the specification exactly

Additional Information

Will add the tests in a later commit

Summary by CodeRabbit

  • New Features

    • Added support for automatic resumption of interrupted Server-Sent Events (SSE) connections, with configurable retry attempts to ensure seamless streaming and minimize event loss.
  • Style

    • Removed unnecessary blank lines in test files for improved code readability.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jun 9, 2025

Walkthrough

This update introduces support for resuming interrupted Server-Sent Events (SSE) streams in the StreamableHTTP client by tracking the last received event ID and retrying failed connections. It adds a new resumption option, related methods, and updates SSE event handling to propagate event IDs. Minor whitespace cleanups are also included in test files.

Changes

Files Change Summary
client/transport/streamable_http.go Added SSE resumption support: new option, event ID tracking, retry logic, new methods, updated SSE event parsing and handler signatures.
client/transport/sse_test.go, client/transport/streamable_http_test.go Removed extraneous blank lines in SSE-related test cases.

Possibly related PRs

Suggested labels

type: enhancement, area: mcp spec

Suggested reviewers

  • rwjblue-glean

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (1.64.8)

Error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2
Failed executing command with error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
client/transport/streamable_http.go (2)

334-340: Improve retry loop readability and error message specificity

The retry logic is correct but could be more idiomatic and the error message more specific.

-		for range c.maxRetryCount {
-			resp, err, canRetry := c.performSSEResumption(ctx, lastEventId, resumptionCallback)
-			if err == nil || lastEventId == "" || !canRetry {
-				return resp, err
-			}
-		}
-		return nil, fmt.Errorf("failed to retrieve response after attempting resumption %d times", c.maxRetryCount)
+		for i := 0; i < c.maxRetryCount; i++ {
+			resp, err, canRetry := c.performSSEResumption(ctx, lastEventId, resumptionCallback)
+			if err == nil || lastEventId == "" || !canRetry {
+				return resp, err
+			}
+		}
+		return nil, fmt.Errorf("failed to resume SSE stream after %d retry attempts", c.maxRetryCount)

57-437: Consider adding tests for the resumption functionality

The resumption implementation is well-designed and follows the MCP specification. However, I notice that tests for this new functionality are not included in this PR. As mentioned in the PR objectives, tests are planned for a future commit.

Would you like me to help generate comprehensive test cases for the resumption functionality, including:

  • Successful resumption scenarios
  • Retry exhaustion cases
  • Error handling during resumption
  • Null byte filtering in event IDs
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8c7a09d and 21760bc.

📒 Files selected for processing (3)
  • client/transport/sse_test.go (2 hunks)
  • client/transport/streamable_http.go (10 hunks)
  • client/transport/streamable_http_test.go (2 hunks)
🔇 Additional comments (6)
client/transport/streamable_http_test.go (1)

420-420: LGTM - Whitespace cleanup

Removing trailing blank lines improves code consistency.

Also applies to: 440-440

client/transport/sse_test.go (1)

411-411: LGTM - Consistent whitespace cleanup

Removing trailing blank lines to maintain consistency with the changes in streamable_http_test.go.

Also applies to: 413-413, 452-452

client/transport/streamable_http.go (4)

57-75: Well-implemented resumption option

The WithResumption function properly validates input and follows the established pattern for configuration options. Good documentation referencing the MCP specification.


90-96: Appropriate struct fields for resumption support

The new fields resumptionEnabled and maxRetryCount are well-placed and properly support the resumption feature.


346-437: Excellent implementation of SSE resumption logic

The performSSEResumption method is comprehensive and handles all edge cases properly:

  • Correct use of GET request with Last-Event-Id header
  • Appropriate error handling with meaningful canRetry flag
  • Consistent OAuth and session management

559-566: Good security consideration for event ID handling

Excellent implementation that properly filters null bytes to prevent HTTP header injection vulnerabilities. The comment clearly explains the security rationale.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
client/transport/streamable_http.go (1)

372-374: Validate empty lastEventId parameter handling.

The check for empty lastEventId is good, but this condition should theoretically never occur based on the calling logic in SendRequest. Consider adding a more descriptive error message or debug logging.

-	if lastEventId == "" {
-		return nil, fmt.Errorf("sse resumption request requires a last event id"), false
-	}
+	if lastEventId == "" {
+		return nil, fmt.Errorf("sse resumption request requires a last event id (this should not happen)"), false
+	}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 21760bc and da980e8.

📒 Files selected for processing (1)
  • client/transport/streamable_http.go (10 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
client/transport/streamable_http.go (3)
client/transport/interface.go (2)
  • HTTPHeaderFunc (13-13)
  • JSONRPCResponse (41-50)
mcp/types.go (1)
  • JSONRPCResponse (324-328)
client/oauth.go (1)
  • OAuthAuthorizationRequiredError (61-61)
🔇 Additional comments (9)
client/transport/streamable_http.go (9)

57-74: Well-implemented resumption configuration option.

The function properly validates the retry count and provides clear documentation referencing the MCP specification. The parameter validation ensuring maxRetryCount >= 1 prevents invalid configurations.


90-95: Struct fields properly integrated for resumption support.

The new fields resumptionEnabled and maxRetryCount are appropriately added to support the resumption functionality without breaking existing code.


181-182: Header constants follow existing patterns.

The new headerKeyLastEventID constant is properly defined following the same naming convention as the existing headerKeySessionID.


346-439: Comprehensive resumption method with proper error handling.

The performSSEResumption method is well-implemented with appropriate OAuth handling, session management, and error classification via the canRetry boolean return value.


443-447: Method signature change is appropriate for resumption support.

The addition of the resumptionCallback parameter maintains backward compatibility since this is an internal method and enables the resumption functionality.


470-472: Event ID tracking correctly integrated.

The resumption callback is properly invoked when an event ID is present, enabling the retry mechanism to track progress through the SSE stream.


561-567: Excellent security practice with null byte filtering.

The null byte filtering prevents HTTP header injection attacks when the event ID is sent back in the Last-Event-Id header. This is a crucial security consideration.


507-571: Enhanced SSE parsing with proper event ID handling.

The modifications to readSSE correctly parse the id: field from SSE streams and properly reset the ID after each event. The implementation follows the SSE specification accurately.


323-340:

✅ Verification successful

Verify retry loop termination conditions.

The resumption logic looks correct, but ensure the loop termination is robust. The current implementation could potentially retry on non-recoverable errors.

Consider adding exponential backoff or different retry strategies:

-		for range c.maxRetryCount {
+		for attempt := 1; attempt <= c.maxRetryCount; attempt++ {
+			if attempt > 1 {
+				// Add exponential backoff delay
+				time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
+			}
 			resp, err, canRetry := c.performSSEResumption(ctx, lastEventId, resumptionCallback)

🏁 Script executed:

#!/bin/bash
# Search for similar retry patterns in the codebase to ensure consistency
rg -A 5 -B 5 "for.*range.*maxRetry" --type go

Length of output: 781


🏁 Script executed:

#!/bin/bash
# Locate the declaration of maxRetryCount in client/transport/streamable_http.go
rg -n "maxRetryCount" client/transport/streamable_http.go

Length of output: 385


Retry loop termination verified—no infinite retries
The loop runs exactly c.maxRetryCount times and exits early on success (err == nil), missing IDs (lastEventId == ""), or non-recoverable errors (canRetry == false). No change is strictly required.

Optional improvement: switch to an index-based loop to add exponential backoff:

-   for range c.maxRetryCount {
+   for attempt := 1; attempt <= c.maxRetryCount; attempt++ {
+       if attempt > 1 {
+           time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
+       }
        resp, err, canRetry := c.performSSEResumption(ctx, lastEventId, resumptionCallback)
        if err == nil || lastEventId == "" || !canRetry {
            return resp, err
        }
    }

@ezynda3 ezynda3 added the type: enhancement New feature or enhancement request label Sep 19, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type: enhancement New feature or enhancement request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants