@@ -31,7 +31,6 @@ type SSE struct {
3131 notifyMu sync.RWMutex
3232 endpointChan chan struct {}
3333 headers map [string ]string
34- sseReadTimeout time.Duration
3534
3635 started atomic.Bool
3736 closed atomic.Bool
@@ -46,12 +45,6 @@ func WithHeaders(headers map[string]string) ClientOption {
4645 }
4746}
4847
49- func WithSSEReadTimeout (timeout time.Duration ) ClientOption {
50- return func (sc * SSE ) {
51- sc .sseReadTimeout = timeout
52- }
53- }
54-
5548// NewSSE creates a new SSE-based MCP client with the given base URL.
5649// Returns an error if the URL is invalid.
5750func NewSSE (baseURL string , options ... ClientOption ) (* SSE , error ) {
@@ -61,12 +54,11 @@ func NewSSE(baseURL string, options ...ClientOption) (*SSE, error) {
6154 }
6255
6356 smc := & SSE {
64- baseURL : parsedURL ,
65- httpClient : & http.Client {},
66- responses : make (map [int64 ]chan * JSONRPCResponse ),
67- endpointChan : make (chan struct {}),
68- sseReadTimeout : 30 * time .Second ,
69- headers : make (map [string ]string ),
57+ baseURL : parsedURL ,
58+ httpClient : & http.Client {},
59+ responses : make (map [int64 ]chan * JSONRPCResponse ),
60+ endpointChan : make (chan struct {}),
61+ headers : make (map [string ]string ),
7062 }
7163
7264 for _ , opt := range options {
@@ -139,46 +131,40 @@ func (c *SSE) readSSE(reader io.ReadCloser) {
139131 br := bufio .NewReader (reader )
140132 var event , data string
141133
142- ctx , cancel := context .WithTimeout (context .Background (), c .sseReadTimeout )
143- defer cancel ()
144-
145134 for {
146- select {
147- case <- ctx .Done ():
148- return
149- default :
150- line , err := br .ReadString ('\n' )
151- if err != nil {
152- if err == io .EOF {
153- // Process any pending event before exit
154- if event != "" && data != "" {
155- c .handleSSEEvent (event , data )
156- }
157- break
158- }
159- if ! c .closed .Load () {
160- fmt .Printf ("SSE stream error: %v\n " , err )
161- }
162- return
163- }
164-
165- // Remove only newline markers
166- line = strings .TrimRight (line , "\r \n " )
167- if line == "" {
168- // Empty line means end of event
135+ // when close or start's ctx cancel, the reader will be closed
136+ // and the for loop will break.
137+ line , err := br .ReadString ('\n' )
138+ if err != nil {
139+ if err == io .EOF {
140+ // Process any pending event before exit
169141 if event != "" && data != "" {
170142 c .handleSSEEvent (event , data )
171- event = ""
172- data = ""
173143 }
174- continue
144+ break
145+ }
146+ if ! c .closed .Load () {
147+ fmt .Printf ("SSE stream error: %v\n " , err )
175148 }
149+ return
150+ }
176151
177- if strings .HasPrefix (line , "event:" ) {
178- event = strings .TrimSpace (strings .TrimPrefix (line , "event:" ))
179- } else if strings .HasPrefix (line , "data:" ) {
180- data = strings .TrimSpace (strings .TrimPrefix (line , "data:" ))
152+ // Remove only newline markers
153+ line = strings .TrimRight (line , "\r \n " )
154+ if line == "" {
155+ // Empty line means end of event
156+ if event != "" && data != "" {
157+ c .handleSSEEvent (event , data )
158+ event = ""
159+ data = ""
181160 }
161+ continue
162+ }
163+
164+ if strings .HasPrefix (line , "event:" ) {
165+ event = strings .TrimSpace (strings .TrimPrefix (line , "event:" ))
166+ } else if strings .HasPrefix (line , "data:" ) {
167+ data = strings .TrimSpace (strings .TrimPrefix (line , "data:" ))
182168 }
183169 }
184170}
0 commit comments