@@ -135,40 +135,9 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
135135 attempts := 0
136136
137137 for {
138- // If this isn't the first attempt, we're retrying
139- if attempts > 0 {
140- if attempts > maxRetries {
141- eventChan <- ProviderEvent {
142- Type : EventError ,
143- Error : errors .New ("maximum retry attempts reached for rate limit (429)" ),
144- }
145- return
146- }
147-
148- // Inform user we're retrying with attempt number
149- eventChan <- ProviderEvent {
150- Type : EventWarning ,
151- Info : fmt .Sprintf ("[Retrying due to rate limit... attempt %d of %d]" , attempts , maxRetries ),
152- }
153-
154- // Calculate backoff with exponential backoff and jitter
155- backoffMs := 2000 * (1 << (attempts - 1 )) // 2s, 4s, 8s, 16s, 32s
156- jitterMs := int (float64 (backoffMs ) * 0.2 )
157- totalBackoffMs := backoffMs + jitterMs
158-
159- // Sleep with backoff, respecting context cancellation
160- select {
161- case <- ctx .Done ():
162- eventChan <- ProviderEvent {Type : EventError , Error : ctx .Err ()}
163- return
164- case <- time .After (time .Duration (totalBackoffMs ) * time .Millisecond ):
165- // Continue with retry
166- }
167- }
168138
169139 attempts ++
170140
171- // Create new streaming request
172141 stream := a .client .Messages .NewStreaming (
173142 ctx ,
174143 anthropic.MessageNewParams {
@@ -189,11 +158,8 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
189158 },
190159 )
191160
192- // Process stream events
193161 accumulatedMessage := anthropic.Message {}
194- streamSuccess := false
195162
196- // Process the stream until completion or error
197163 for stream .Next () {
198164 event := stream .Current ()
199165 err := accumulatedMessage .Accumulate (event )
@@ -223,7 +189,6 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
223189 eventChan <- ProviderEvent {Type : EventContentStop }
224190
225191 case anthropic.MessageStopEvent :
226- streamSuccess = true
227192 content := ""
228193 for _ , block := range accumulatedMessage .Content {
229194 if text , ok := block .AsAny ().(anthropic.TextBlock ); ok {
@@ -246,51 +211,59 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
246211 }
247212 }
248213
249- // If the stream completed successfully, we're done
250- if streamSuccess {
214+ err := stream . Err ()
215+ if err == nil {
251216 return
252217 }
253218
254- // Check for stream errors
255- err := stream .Err ()
256- if err != nil {
257- var apierr * anthropic.Error
258- if errors .As (err , & apierr ) {
259- if apierr .StatusCode == 429 || apierr .StatusCode == 529 {
260- // Check for Retry-After header
261- if retryAfterValues := apierr .Response .Header .Values ("Retry-After" ); len (retryAfterValues ) > 0 {
262- // Parse the retry after value (seconds)
263- var retryAfterSec int
264- if _ , err := fmt .Sscanf (retryAfterValues [0 ], "%d" , & retryAfterSec ); err == nil {
265- retryMs := retryAfterSec * 1000
266-
267- // Inform user of retry with specific wait time
268- eventChan <- ProviderEvent {
269- Type : EventWarning ,
270- Info : fmt .Sprintf ("[Rate limited: waiting %d seconds as specified by API]" , retryAfterSec ),
271- }
272-
273- // Sleep respecting context cancellation
274- select {
275- case <- ctx .Done ():
276- eventChan <- ProviderEvent {Type : EventError , Error : ctx .Err ()}
277- return
278- case <- time .After (time .Duration (retryMs ) * time .Millisecond ):
279- // Continue with retry after specified delay
280- continue
281- }
282- }
283- }
219+ var apierr * anthropic.Error
220+ if ! errors .As (err , & apierr ) {
221+ eventChan <- ProviderEvent {Type : EventError , Error : err }
222+ return
223+ }
284224
285- // Fall back to exponential backoff if Retry-After parsing failed
286- continue
225+ if apierr .StatusCode != 429 && apierr .StatusCode != 529 {
226+ eventChan <- ProviderEvent {Type : EventError , Error : err }
227+ return
228+ }
229+
230+ if attempts > maxRetries {
231+ eventChan <- ProviderEvent {
232+ Type : EventError ,
233+ Error : errors .New ("maximum retry attempts reached for rate limit (429)" ),
234+ }
235+ return
236+ }
237+
238+ retryMs := 0
239+ retryAfterValues := apierr .Response .Header .Values ("Retry-After" )
240+ if len (retryAfterValues ) > 0 {
241+ var retryAfterSec int
242+ if _ , err := fmt .Sscanf (retryAfterValues [0 ], "%d" , & retryAfterSec ); err == nil {
243+ retryMs = retryAfterSec * 1000
244+ eventChan <- ProviderEvent {
245+ Type : EventWarning ,
246+ Info : fmt .Sprintf ("[Rate limited: waiting %d seconds as specified by API]" , retryAfterSec ),
287247 }
288248 }
249+ } else {
250+ eventChan <- ProviderEvent {
251+ Type : EventWarning ,
252+ Info : fmt .Sprintf ("[Retrying due to rate limit... attempt %d of %d]" , attempts , maxRetries ),
253+ }
289254
290- // For non-rate limit errors, report and exit
291- eventChan <- ProviderEvent {Type : EventError , Error : err }
255+ backoffMs := 2000 * (1 << (attempts - 1 ))
256+ jitterMs := int (float64 (backoffMs ) * 0.2 )
257+ retryMs = backoffMs + jitterMs
258+ }
259+ select {
260+ case <- ctx .Done ():
261+ eventChan <- ProviderEvent {Type : EventError , Error : ctx .Err ()}
292262 return
263+ case <- time .After (time .Duration (retryMs ) * time .Millisecond ):
264+ continue
293265 }
266+
294267 }
295268 }()
296269
@@ -388,7 +361,6 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag
388361 blocks = append (blocks , anthropic .ContentBlockParamOfRequestToolUseBlock (toolCall .ID , inputMap , toolCall .Name ))
389362 }
390363
391- // Skip empty assistant messages completely
392364 if len (blocks ) > 0 {
393365 anthropicMessages = append (anthropicMessages , anthropic .NewAssistantMessage (blocks ... ))
394366 }
@@ -404,4 +376,3 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag
404376
405377 return anthropicMessages
406378}
407-
0 commit comments