Skip to content
This repository was archived by the owner on Sep 18, 2025. It is now read-only.

Commit fde04bb

Browse files
Merge pull request #22 from adamdottv/adam/retries
fix(anthropic): better 429/529 handling
2 parents 124bd57 + 4385fb3 commit fde04bb

10 files changed

Lines changed: 307 additions & 152 deletions

File tree

cmd/root.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@ func setupSubscriptions(app *app.App) (chan tea.Msg, func()) {
107107
wg.Done()
108108
}()
109109
}
110+
{
111+
sub := app.Status.Subscribe(ctx)
112+
wg.Add(1)
113+
go func() {
114+
for ev := range sub {
115+
ch <- ev
116+
}
117+
wg.Done()
118+
}()
119+
}
110120
return ch, func() {
111121
cancel()
112122
wg.Wait()

internal/app/services.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import (
1111
"github.com/kujtimiihoxha/termai/internal/lsp/watcher"
1212
"github.com/kujtimiihoxha/termai/internal/message"
1313
"github.com/kujtimiihoxha/termai/internal/permission"
14+
"github.com/kujtimiihoxha/termai/internal/pubsub"
1415
"github.com/kujtimiihoxha/termai/internal/session"
16+
"github.com/kujtimiihoxha/termai/internal/tui/util"
1517
)
1618

1719
type App struct {
@@ -25,6 +27,7 @@ type App struct {
2527

2628
Logger logging.Interface
2729

30+
Status *pubsub.Broker[util.InfoMsg]
2831
ceanups []func()
2932
}
3033

@@ -43,6 +46,7 @@ func New(ctx context.Context, conn *sql.DB) *App {
4346
Messages: messages,
4447
Permissions: permission.NewPermissionService(),
4548
Logger: log,
49+
Status: pubsub.NewBroker[util.InfoMsg](),
4650
LSPClients: make(map[string]*lsp.Client),
4751
}
4852

internal/llm/agent/agent.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"github.com/kujtimiihoxha/termai/internal/llm/provider"
1616
"github.com/kujtimiihoxha/termai/internal/llm/tools"
1717
"github.com/kujtimiihoxha/termai/internal/message"
18+
"github.com/kujtimiihoxha/termai/internal/pubsub"
19+
"github.com/kujtimiihoxha/termai/internal/tui/util"
1820
)
1921

2022
type Agent interface {
@@ -92,9 +94,24 @@ func (c *agent) processEvent(
9294
assistantMsg.AppendContent(event.Content)
9395
return c.Messages.Update(*assistantMsg)
9496
case provider.EventError:
97+
// TODO: remove when realease
9598
log.Println("error", event.Error)
99+
c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{
100+
Type: util.InfoTypeError,
101+
Msg: event.Error.Error(),
102+
})
96103
return event.Error
97-
104+
case provider.EventWarning:
105+
c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{
106+
Type: util.InfoTypeWarn,
107+
Msg: event.Info,
108+
})
109+
return nil
110+
case provider.EventInfo:
111+
c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{
112+
Type: util.InfoTypeInfo,
113+
Msg: event.Info,
114+
})
98115
case provider.EventComplete:
99116
assistantMsg.SetToolCalls(event.Response.ToolCalls)
100117
assistantMsg.AddFinish(event.Response.FinishReason)

internal/llm/provider/anthropic.go

Lines changed: 164 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"fmt"
78
"strings"
9+
"time"
810

911
"github.com/anthropics/anthropic-sdk-go"
1012
"github.com/anthropics/anthropic-sdk-go/option"
@@ -68,21 +70,24 @@ func (a *anthropicProvider) SendMessages(ctx context.Context, messages []message
6870
anthropicMessages := a.convertToAnthropicMessages(messages)
6971
anthropicTools := a.convertToAnthropicTools(tools)
7072

71-
response, err := a.client.Messages.New(ctx, anthropic.MessageNewParams{
72-
Model: anthropic.Model(a.model.APIModel),
73-
MaxTokens: a.maxTokens,
74-
Temperature: anthropic.Float(0),
75-
Messages: anthropicMessages,
76-
Tools: anthropicTools,
77-
System: []anthropic.TextBlockParam{
78-
{
79-
Text: a.systemMessage,
80-
CacheControl: anthropic.CacheControlEphemeralParam{
81-
Type: "ephemeral",
73+
response, err := a.client.Messages.New(
74+
ctx,
75+
anthropic.MessageNewParams{
76+
Model: anthropic.Model(a.model.APIModel),
77+
MaxTokens: a.maxTokens,
78+
Temperature: anthropic.Float(0),
79+
Messages: anthropicMessages,
80+
Tools: anthropicTools,
81+
System: []anthropic.TextBlockParam{
82+
{
83+
Text: a.systemMessage,
84+
CacheControl: anthropic.CacheControlEphemeralParam{
85+
Type: "ephemeral",
86+
},
8287
},
8388
},
8489
},
85-
})
90+
)
8691
if err != nil {
8792
return nil, err
8893
}
@@ -121,83 +126,171 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
121126
temperature = anthropic.Float(1)
122127
}
123128

124-
stream := a.client.Messages.NewStreaming(ctx, anthropic.MessageNewParams{
125-
Model: anthropic.Model(a.model.APIModel),
126-
MaxTokens: a.maxTokens,
127-
Temperature: temperature,
128-
Messages: anthropicMessages,
129-
Tools: anthropicTools,
130-
Thinking: thinkingParam,
131-
System: []anthropic.TextBlockParam{
132-
{
133-
Text: a.systemMessage,
134-
CacheControl: anthropic.CacheControlEphemeralParam{
135-
Type: "ephemeral",
136-
},
137-
},
138-
},
139-
})
140-
141129
eventChan := make(chan ProviderEvent)
142130

143131
go func() {
144132
defer close(eventChan)
145133

146-
accumulatedMessage := anthropic.Message{}
134+
const maxRetries = 8
135+
attempts := 0
147136

148-
for stream.Next() {
149-
event := stream.Current()
150-
err := accumulatedMessage.Accumulate(event)
151-
if err != nil {
152-
eventChan <- ProviderEvent{Type: EventError, Error: err}
153-
return
137+
for {
138+
// If this isn't the first attempt, we're retrying
139+
if attempts > 0 {
140+
if attempts > maxRetries {
141+
eventChan <- ProviderEvent{
142+
Type: EventError,
143+
Error: errors.New("maximum retry attempts reached for rate limit (429)"),
144+
}
145+
return
146+
}
147+
148+
// Inform user we're retrying with attempt number
149+
eventChan <- ProviderEvent{
150+
Type: EventWarning,
151+
Info: fmt.Sprintf("[Retrying due to rate limit... attempt %d of %d]", attempts, maxRetries),
152+
}
153+
154+
// Calculate backoff with exponential backoff and jitter
155+
backoffMs := 2000 * (1 << (attempts - 1)) // 2s, 4s, 8s, 16s, 32s
156+
jitterMs := int(float64(backoffMs) * 0.2)
157+
totalBackoffMs := backoffMs + jitterMs
158+
159+
// Sleep with backoff, respecting context cancellation
160+
select {
161+
case <-ctx.Done():
162+
eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()}
163+
return
164+
case <-time.After(time.Duration(totalBackoffMs) * time.Millisecond):
165+
// Continue with retry
166+
}
154167
}
155168

156-
switch event := event.AsAny().(type) {
157-
case anthropic.ContentBlockStartEvent:
158-
eventChan <- ProviderEvent{Type: EventContentStart}
169+
attempts++
170+
171+
// Create new streaming request
172+
stream := a.client.Messages.NewStreaming(
173+
ctx,
174+
anthropic.MessageNewParams{
175+
Model: anthropic.Model(a.model.APIModel),
176+
MaxTokens: a.maxTokens,
177+
Temperature: temperature,
178+
Messages: anthropicMessages,
179+
Tools: anthropicTools,
180+
Thinking: thinkingParam,
181+
System: []anthropic.TextBlockParam{
182+
{
183+
Text: a.systemMessage,
184+
CacheControl: anthropic.CacheControlEphemeralParam{
185+
Type: "ephemeral",
186+
},
187+
},
188+
},
189+
},
190+
)
159191

160-
case anthropic.ContentBlockDeltaEvent:
161-
if event.Delta.Type == "thinking_delta" && event.Delta.Thinking != "" {
162-
eventChan <- ProviderEvent{
163-
Type: EventThinkingDelta,
164-
Thinking: event.Delta.Thinking,
192+
// Process stream events
193+
accumulatedMessage := anthropic.Message{}
194+
streamSuccess := false
195+
196+
// Process the stream until completion or error
197+
for stream.Next() {
198+
event := stream.Current()
199+
err := accumulatedMessage.Accumulate(event)
200+
if err != nil {
201+
eventChan <- ProviderEvent{Type: EventError, Error: err}
202+
return // Don't retry on accumulation errors
203+
}
204+
205+
switch event := event.AsAny().(type) {
206+
case anthropic.ContentBlockStartEvent:
207+
eventChan <- ProviderEvent{Type: EventContentStart}
208+
209+
case anthropic.ContentBlockDeltaEvent:
210+
if event.Delta.Type == "thinking_delta" && event.Delta.Thinking != "" {
211+
eventChan <- ProviderEvent{
212+
Type: EventThinkingDelta,
213+
Thinking: event.Delta.Thinking,
214+
}
215+
} else if event.Delta.Type == "text_delta" && event.Delta.Text != "" {
216+
eventChan <- ProviderEvent{
217+
Type: EventContentDelta,
218+
Content: event.Delta.Text,
219+
}
165220
}
166-
} else if event.Delta.Type == "text_delta" && event.Delta.Text != "" {
167-
eventChan <- ProviderEvent{
168-
Type: EventContentDelta,
169-
Content: event.Delta.Text,
221+
222+
case anthropic.ContentBlockStopEvent:
223+
eventChan <- ProviderEvent{Type: EventContentStop}
224+
225+
case anthropic.MessageStopEvent:
226+
streamSuccess = true
227+
content := ""
228+
for _, block := range accumulatedMessage.Content {
229+
if text, ok := block.AsAny().(anthropic.TextBlock); ok {
230+
content += text.Text
231+
}
170232
}
171-
}
172233

173-
case anthropic.ContentBlockStopEvent:
174-
eventChan <- ProviderEvent{Type: EventContentStop}
234+
toolCalls := a.extractToolCalls(accumulatedMessage.Content)
235+
tokenUsage := a.extractTokenUsage(accumulatedMessage.Usage)
175236

176-
case anthropic.MessageStopEvent:
177-
content := ""
178-
for _, block := range accumulatedMessage.Content {
179-
if text, ok := block.AsAny().(anthropic.TextBlock); ok {
180-
content += text.Text
237+
eventChan <- ProviderEvent{
238+
Type: EventComplete,
239+
Response: &ProviderResponse{
240+
Content: content,
241+
ToolCalls: toolCalls,
242+
Usage: tokenUsage,
243+
FinishReason: string(accumulatedMessage.StopReason),
244+
},
181245
}
182246
}
247+
}
183248

184-
toolCalls := a.extractToolCalls(accumulatedMessage.Content)
185-
tokenUsage := a.extractTokenUsage(accumulatedMessage.Usage)
249+
// If the stream completed successfully, we're done
250+
if streamSuccess {
251+
return
252+
}
186253

187-
eventChan <- ProviderEvent{
188-
Type: EventComplete,
189-
Response: &ProviderResponse{
190-
Content: content,
191-
ToolCalls: toolCalls,
192-
Usage: tokenUsage,
193-
FinishReason: string(accumulatedMessage.StopReason),
194-
},
254+
// Check for stream errors
255+
err := stream.Err()
256+
if err != nil {
257+
var apierr *anthropic.Error
258+
if errors.As(err, &apierr) {
259+
if apierr.StatusCode == 429 || apierr.StatusCode == 529 {
260+
// Check for Retry-After header
261+
if retryAfterValues := apierr.Response.Header.Values("Retry-After"); len(retryAfterValues) > 0 {
262+
// Parse the retry after value (seconds)
263+
var retryAfterSec int
264+
if _, err := fmt.Sscanf(retryAfterValues[0], "%d", &retryAfterSec); err == nil {
265+
retryMs := retryAfterSec * 1000
266+
267+
// Inform user of retry with specific wait time
268+
eventChan <- ProviderEvent{
269+
Type: EventWarning,
270+
Info: fmt.Sprintf("[Rate limited: waiting %d seconds as specified by API]", retryAfterSec),
271+
}
272+
273+
// Sleep respecting context cancellation
274+
select {
275+
case <-ctx.Done():
276+
eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()}
277+
return
278+
case <-time.After(time.Duration(retryMs) * time.Millisecond):
279+
// Continue with retry after specified delay
280+
continue
281+
}
282+
}
283+
}
284+
285+
// Fall back to exponential backoff if Retry-After parsing failed
286+
continue
287+
}
195288
}
196-
}
197-
}
198289

199-
if stream.Err() != nil {
200-
eventChan <- ProviderEvent{Type: EventError, Error: stream.Err()}
290+
// For non-rate limit errors, report and exit
291+
eventChan <- ProviderEvent{Type: EventError, Error: err}
292+
return
293+
}
201294
}
202295
}()
203296

@@ -311,3 +404,4 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag
311404

312405
return anthropicMessages
313406
}
407+

internal/llm/provider/provider.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const (
1717
EventContentStop EventType = "content_stop"
1818
EventComplete EventType = "complete"
1919
EventError EventType = "error"
20+
EventWarning EventType = "warning"
21+
EventInfo EventType = "info"
2022
)
2123

2224
type TokenUsage struct {
@@ -40,6 +42,9 @@ type ProviderEvent struct {
4042
ToolCall *message.ToolCall
4143
Error error
4244
Response *ProviderResponse
45+
46+
// Used for giving users info on e.x retry
47+
Info string
4348
}
4449

4550
type Provider interface {

0 commit comments

Comments
 (0)