@@ -4,7 +4,10 @@ import (
44 "context"
55 "encoding/json"
66 "errors"
7+ "fmt"
8+ "log"
79 "strings"
10+ "time"
811
912 "github.com/anthropics/anthropic-sdk-go"
1013 "github.com/anthropics/anthropic-sdk-go/option"
@@ -125,87 +128,145 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
125128 temperature = anthropic .Float (1 )
126129 }
127130
128- stream := a .client .Messages .NewStreaming (
129- ctx ,
130- anthropic.MessageNewParams {
131- Model : anthropic .Model (a .model .APIModel ),
132- MaxTokens : a .maxTokens ,
133- Temperature : temperature ,
134- Messages : anthropicMessages ,
135- Tools : anthropicTools ,
136- Thinking : thinkingParam ,
137- System : []anthropic.TextBlockParam {
138- {
139- Text : a .systemMessage ,
140- CacheControl : anthropic.CacheControlEphemeralParam {
141- Type : "ephemeral" ,
142- },
143- },
144- },
145- },
146- option .WithMaxRetries (8 ),
147- )
148-
149131 eventChan := make (chan ProviderEvent )
150132
151133 go func () {
152134 defer close (eventChan )
153135
154- accumulatedMessage := anthropic.Message {}
136+ const maxRetries = 8
137+ attempts := 0
155138
156- for stream .Next () {
157- event := stream .Current ()
158- err := accumulatedMessage .Accumulate (event )
159- if err != nil {
160- eventChan <- ProviderEvent {Type : EventError , Error : err }
161- return
139+ for {
140+ // If this isn't the first attempt, we're retrying
141+ if attempts > 0 {
142+ if attempts > maxRetries {
143+ eventChan <- ProviderEvent {
144+ Type : EventError ,
145+ Error : errors .New ("maximum retry attempts reached for rate limit (429)" ),
146+ }
147+ return
148+ }
149+
150+ // Inform user we're retrying with attempt number
151+ eventChan <- ProviderEvent {
152+ Type : EventContentDelta ,
153+ Content : fmt .Sprintf ("\n \n [Retrying due to rate limit... attempt %d of %d]\n \n " , attempts , maxRetries ),
154+ }
155+
156+ // Calculate backoff with exponential backoff and jitter
157+ backoffMs := 2000 * (1 << (attempts - 1 )) // 2s, 4s, 8s, 16s, 32s
158+ jitterMs := int (float64 (backoffMs ) * 0.2 )
159+ totalBackoffMs := backoffMs + jitterMs
160+
161+ // Sleep with backoff, respecting context cancellation
162+ select {
163+ case <- ctx .Done ():
164+ eventChan <- ProviderEvent {Type : EventError , Error : ctx .Err ()}
165+ return
166+ case <- time .After (time .Duration (totalBackoffMs ) * time .Millisecond ):
167+ // Continue with retry
168+ }
162169 }
163170
164- switch event := event .AsAny ().(type ) {
165- case anthropic.ContentBlockStartEvent :
166- eventChan <- ProviderEvent {Type : EventContentStart }
171+ attempts ++
172+
173+ // Create new streaming request
174+ stream := a .client .Messages .NewStreaming (
175+ ctx ,
176+ anthropic.MessageNewParams {
177+ Model : anthropic .Model (a .model .APIModel ),
178+ MaxTokens : a .maxTokens ,
179+ Temperature : temperature ,
180+ Messages : anthropicMessages ,
181+ Tools : anthropicTools ,
182+ Thinking : thinkingParam ,
183+ System : []anthropic.TextBlockParam {
184+ {
185+ Text : a .systemMessage ,
186+ CacheControl : anthropic.CacheControlEphemeralParam {
187+ Type : "ephemeral" ,
188+ },
189+ },
190+ },
191+ },
192+ )
167193
168- case anthropic.ContentBlockDeltaEvent :
169- if event .Delta .Type == "thinking_delta" && event .Delta .Thinking != "" {
170- eventChan <- ProviderEvent {
171- Type : EventThinkingDelta ,
172- Thinking : event .Delta .Thinking ,
194+ // Process stream events
195+ accumulatedMessage := anthropic.Message {}
196+ streamSuccess := false
197+
198+ // Process the stream until completion or error
199+ for stream .Next () {
200+ event := stream .Current ()
201+ err := accumulatedMessage .Accumulate (event )
202+ if err != nil {
203+ eventChan <- ProviderEvent {Type : EventError , Error : err }
204+ return // Don't retry on accumulation errors
205+ }
206+
207+ switch event := event .AsAny ().(type ) {
208+ case anthropic.ContentBlockStartEvent :
209+ eventChan <- ProviderEvent {Type : EventContentStart }
210+
211+ case anthropic.ContentBlockDeltaEvent :
212+ if event .Delta .Type == "thinking_delta" && event .Delta .Thinking != "" {
213+ eventChan <- ProviderEvent {
214+ Type : EventThinkingDelta ,
215+ Thinking : event .Delta .Thinking ,
216+ }
217+ } else if event .Delta .Type == "text_delta" && event .Delta .Text != "" {
218+ eventChan <- ProviderEvent {
219+ Type : EventContentDelta ,
220+ Content : event .Delta .Text ,
221+ }
173222 }
174- } else if event .Delta .Type == "text_delta" && event .Delta .Text != "" {
175- eventChan <- ProviderEvent {
176- Type : EventContentDelta ,
177- Content : event .Delta .Text ,
223+
224+ case anthropic.ContentBlockStopEvent :
225+ eventChan <- ProviderEvent {Type : EventContentStop }
226+
227+ case anthropic.MessageStopEvent :
228+ streamSuccess = true
229+ content := ""
230+ for _ , block := range accumulatedMessage .Content {
231+ if text , ok := block .AsAny ().(anthropic.TextBlock ); ok {
232+ content += text .Text
233+ }
178234 }
179- }
180235
181- case anthropic. ContentBlockStopEvent :
182- eventChan <- ProviderEvent { Type : EventContentStop }
236+ toolCalls := a . extractToolCalls ( accumulatedMessage . Content )
237+ tokenUsage := a . extractTokenUsage ( accumulatedMessage . Usage )
183238
184- case anthropic.MessageStopEvent :
185- content := ""
186- for _ , block := range accumulatedMessage .Content {
187- if text , ok := block .AsAny ().(anthropic.TextBlock ); ok {
188- content += text .Text
239+ eventChan <- ProviderEvent {
240+ Type : EventComplete ,
241+ Response : & ProviderResponse {
242+ Content : content ,
243+ ToolCalls : toolCalls ,
244+ Usage : tokenUsage ,
245+ FinishReason : string (accumulatedMessage .StopReason ),
246+ },
189247 }
190248 }
249+ }
191250
192- toolCalls := a .extractToolCalls (accumulatedMessage .Content )
193- tokenUsage := a .extractTokenUsage (accumulatedMessage .Usage )
251+ // If the stream completed successfully, we're done
252+ if streamSuccess {
253+ return
254+ }
194255
195- eventChan <- ProviderEvent {
196- Type : EventComplete ,
197- Response : & ProviderResponse {
198- Content : content ,
199- ToolCalls : toolCalls ,
200- Usage : tokenUsage ,
201- FinishReason : string ( accumulatedMessage . StopReason ),
202- },
256+ // Check for stream errors
257+ err := stream . Err ()
258+ if err != nil {
259+ log . Println ( "error" , err )
260+
261+ var apierr * anthropic. Error
262+ if errors . As ( err , & apierr ) && apierr . StatusCode == 429 {
263+ continue
203264 }
204- }
205- }
206265
207- if stream .Err () != nil {
208- eventChan <- ProviderEvent {Type : EventError , Error : stream .Err ()}
266+ // For non-rate limit errors, report and exit
267+ eventChan <- ProviderEvent {Type : EventError , Error : err }
268+ return
269+ }
209270 }
210271 }()
211272
@@ -319,3 +380,4 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag
319380
320381 return anthropicMessages
321382}
383+
0 commit comments