diff --git a/pkg/agent/context.go b/pkg/agent/context.go index 5a84c45e22..18563fc7d9 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -458,7 +458,7 @@ func (cb *ContextBuilder) LoadBootstrapFiles() string { // // See: https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching // See: https://platform.openai.com/docs/guides/prompt-caching -func (cb *ContextBuilder) buildDynamicContext(channel, chatID string) string { +func (cb *ContextBuilder) buildDynamicContext(channel, chatID, chatName string) string { now := time.Now().Format("2006-01-02 15:04 (Monday)") rt := fmt.Sprintf("%s %s, Go %s", runtime.GOOS, runtime.GOARCH, runtime.Version()) @@ -466,7 +466,11 @@ func (cb *ContextBuilder) buildDynamicContext(channel, chatID string) string { fmt.Fprintf(&sb, "## Current Time\n%s\n\n## Runtime\n%s", now, rt) if channel != "" && chatID != "" { - fmt.Fprintf(&sb, "\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID) + if chatName != "" { + fmt.Fprintf(&sb, "\n\n## Current Session\nChannel: %s\nChat ID: %s (%s)", channel, chatID, chatName) + } else { + fmt.Fprintf(&sb, "\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID) + } } return sb.String() @@ -477,7 +481,7 @@ func (cb *ContextBuilder) BuildMessages( summary string, currentMessage string, media []string, - channel, chatID string, + channel, chatID, chatName string, ) []providers.Message { messages := []providers.Message{} @@ -493,7 +497,7 @@ func (cb *ContextBuilder) BuildMessages( staticPrompt := cb.BuildSystemPromptWithCache() // Build short dynamic context (time, runtime, session) — changes per request - dynamicCtx := cb.buildDynamicContext(channel, chatID) + dynamicCtx := cb.buildDynamicContext(channel, chatID, chatName) // Compose a single system message: static (cached) + dynamic + optional summary. // Keeping all system content in one message ensures every provider adapter can diff --git a/pkg/agent/context_cache_test.go b/pkg/agent/context_cache_test.go index 707510820d..4f9b8cb05f 100644 --- a/pkg/agent/context_cache_test.go +++ b/pkg/agent/context_cache_test.go @@ -82,7 +82,7 @@ func TestSingleSystemMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - msgs := cb.BuildMessages(tt.history, tt.summary, tt.message, nil, "test", "chat1") + msgs := cb.BuildMessages(tt.history, tt.summary, tt.message, nil, "test", "chat1", "") systemCount := 0 for _, m := range msgs { @@ -576,7 +576,7 @@ func TestConcurrentBuildSystemPromptWithCache(t *testing.T) { } // Also exercise BuildMessages concurrently - msgs := cb.BuildMessages(nil, "", "hello", nil, "test", "chat") + msgs := cb.BuildMessages(nil, "", "hello", nil, "test", "chat", "") if len(msgs) < 2 { errs <- "BuildMessages returned fewer than 2 messages" return @@ -664,6 +664,22 @@ func BenchmarkBuildMessagesWithCache(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _ = cb.BuildMessages(history, "summary", "new message", nil, "cli", "test") + _ = cb.BuildMessages(history, "summary", "new message", nil, "cli", "test", "") + } +} + +func TestBuildMessages_IncludesChatNameInDynamicContext(t *testing.T) { + tmpDir := setupWorkspace(t, map[string]string{ + "IDENTITY.md": "# Identity\nTest agent.", + }) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + msgs := cb.BuildMessages(nil, "", "hello", nil, "discord", "1234567890", "#general") + if len(msgs) == 0 { + t.Fatal("expected messages") + } + if !strings.Contains(msgs[0].Content, "Chat ID: 1234567890 (#general)") { + t.Fatalf("dynamic context missing chat name: %q", msgs[0].Content) } } diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index f20a56b9c4..d04db78352 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -58,6 +58,7 @@ type processOptions struct { SessionKey string // Session identifier for history/context Channel string // Target channel for tool execution ChatID string // Target chat ID for tool execution + ChatName string // Optional human-readable chat/channel name UserMessage string // User message content (may include prefix) Media []string // media:// refs from inbound message DefaultResponse string // Response when LLM returns empty @@ -70,6 +71,7 @@ const ( defaultResponse = "I've completed processing but have no response to give. Increase `max_tool_iterations` in config.json." sessionKeyAgentPrefix = "agent:" metadataKeyAccountID = "account_id" + metadataKeyChatName = "chat_name" metadataKeyGuildID = "guild_id" metadataKeyTeamID = "team_id" metadataKeyParentPeerKind = "parent_peer_kind" @@ -735,6 +737,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) SessionKey: sessionKey, Channel: msg.Channel, ChatID: msg.ChatID, + ChatName: inboundMetadata(msg, metadataKeyChatName), UserMessage: msg.Content, Media: msg.Media, DefaultResponse: defaultResponse, @@ -879,6 +882,7 @@ func (al *AgentLoop) runAgentLoop( opts.Media, opts.Channel, opts.ChatID, + opts.ChatName, ) // Resolve media:// refs: images→base64 data URLs, non-images→local paths in content @@ -1150,7 +1154,7 @@ func (al *AgentLoop) runLLMIteration( newSummary := agent.Sessions.GetSummary(opts.SessionKey) messages = agent.ContextBuilder.BuildMessages( newHistory, newSummary, "", - nil, opts.Channel, opts.ChatID, + nil, opts.Channel, opts.ChatID, opts.ChatName, ) continue } diff --git a/pkg/channels/discord/discord.go b/pkg/channels/discord/discord.go index 83a04907c9..1a129300c1 100644 --- a/pkg/channels/discord/discord.go +++ b/pkg/channels/discord/discord.go @@ -459,10 +459,28 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag "channel_id": m.ChannelID, "is_dm": fmt.Sprintf("%t", m.GuildID == ""), } + if channelName := c.resolveChannelName(m.ChannelID); channelName != "" { + metadata["chat_name"] = channelName + } c.HandleMessage(c.ctx, peer, m.ID, senderID, m.ChannelID, content, mediaPaths, metadata, sender) } +func (c *DiscordChannel) resolveChannelName(channelID string) string { + if c == nil || c.session == nil || strings.TrimSpace(channelID) == "" { + return "" + } + if c.session.State != nil { + if ch, err := c.session.State.Channel(channelID); err == nil && ch != nil && strings.TrimSpace(ch.Name) != "" { + return strings.TrimSpace(ch.Name) + } + } + if ch, err := c.session.Channel(channelID); err == nil && ch != nil && strings.TrimSpace(ch.Name) != "" { + return strings.TrimSpace(ch.Name) + } + return "" +} + // startTyping starts a continuous typing indicator loop for the given chatID. // It stops any existing typing loop for that chatID before starting a new one. func (c *DiscordChannel) startTyping(chatID string) { diff --git a/pkg/channels/discord/discord_test.go b/pkg/channels/discord/discord_test.go index 0cd5328f40..6fd4a18831 100644 --- a/pkg/channels/discord/discord_test.go +++ b/pkg/channels/discord/discord_test.go @@ -89,3 +89,26 @@ func TestApplyDiscordProxy_InvalidProxyURL(t *testing.T) { t.Fatal("applyDiscordProxy() expected error for invalid proxy URL, got nil") } } + +func TestResolveChannelName_FromState(t *testing.T) { + session, err := discordgo.New("Bot test-token") + if err != nil { + t.Fatalf("discordgo.New() error: %v", err) + } + session.State = discordgo.NewState() + if err := session.State.GuildAdd(&discordgo.Guild{ID: "guild-1"}); err != nil { + t.Fatalf("GuildAdd() error: %v", err) + } + if err := session.State.ChannelAdd(&discordgo.Channel{ + ID: "123", + GuildID: "guild-1", + Name: "general", + }); err != nil { + t.Fatalf("ChannelAdd() error: %v", err) + } + + ch := &DiscordChannel{session: session} + if got := ch.resolveChannelName("123"); got != "general" { + t.Fatalf("resolveChannelName() = %q, want %q", got, "general") + } +} diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index 34ee46b7bc..ad76a4b735 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -9,6 +9,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/mymmrac/telego" @@ -41,7 +42,8 @@ var ( type TelegramChannel struct { *channels.BaseChannel bot *telego.Bot - bh *th.BotHandler + bh telegramBotHandler + bhMu sync.Mutex config *config.Config chatIDs map[string]int64 ctx context.Context @@ -49,6 +51,14 @@ type TelegramChannel struct { registerFunc func(context.Context, []commands.Definition) error commandRegCancel context.CancelFunc + startPollingFunc func(context.Context) (<-chan telego.Update, error) + newHandlerFunc func(<-chan telego.Update) (telegramBotHandler, error) + sleepFunc func(context.Context, time.Duration) bool +} + +type telegramBotHandler interface { + Start() error + StopWithContext(context.Context) error } func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) { @@ -107,24 +117,18 @@ func (c *TelegramChannel) Start(ctx context.Context) error { c.ctx, c.cancel = context.WithCancel(ctx) - updates, err := c.bot.UpdatesViaLongPolling(c.ctx, &telego.GetUpdatesParams{ - Timeout: 30, - }) + updates, err := c.startLongPolling(c.ctx) if err != nil { c.cancel() return fmt.Errorf("failed to start long polling: %w", err) } - bh, err := th.NewBotHandler(c.bot, updates) + bh, err := c.newBotHandler(updates) if err != nil { c.cancel() return fmt.Errorf("failed to create bot handler: %w", err) } - c.bh = bh - - bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { - return c.handleMessage(ctx, &message) - }, th.AnyMessage()) + c.setBotHandler(bh) c.SetRunning(true) logger.InfoCF("telegram", "Telegram bot connected", map[string]any{ @@ -133,13 +137,7 @@ func (c *TelegramChannel) Start(ctx context.Context) error { c.startCommandRegistration(c.ctx, commands.BuiltinDefinitions()) - go func() { - if err = bh.Start(); err != nil { - logger.ErrorCF("telegram", "Bot handler failed", map[string]any{ - "error": err.Error(), - }) - } - }() + go c.runPollingLoop(c.ctx, bh) return nil } @@ -149,8 +147,8 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { c.SetRunning(false) // Stop the bot handler - if c.bh != nil { - _ = c.bh.StopWithContext(ctx) + if bh := c.currentBotHandler(); bh != nil { + _ = bh.StopWithContext(ctx) } // Cancel our context (stops long polling) @@ -164,6 +162,118 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { return nil } +func (c *TelegramChannel) startLongPolling(ctx context.Context) (<-chan telego.Update, error) { + if c.startPollingFunc != nil { + return c.startPollingFunc(ctx) + } + return c.bot.UpdatesViaLongPolling(ctx, &telego.GetUpdatesParams{ + Timeout: 30, + }) +} + +func (c *TelegramChannel) newBotHandler(updates <-chan telego.Update) (telegramBotHandler, error) { + if c.newHandlerFunc != nil { + return c.newHandlerFunc(updates) + } + + bh, err := th.NewBotHandler(c.bot, updates) + if err != nil { + return nil, err + } + bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { + return c.handleMessage(ctx, &message) + }, th.AnyMessage()) + return bh, nil +} + +func (c *TelegramChannel) setBotHandler(bh telegramBotHandler) { + c.bhMu.Lock() + defer c.bhMu.Unlock() + c.bh = bh +} + +func (c *TelegramChannel) currentBotHandler() telegramBotHandler { + c.bhMu.Lock() + defer c.bhMu.Unlock() + return c.bh +} + +func (c *TelegramChannel) sleep(ctx context.Context, delay time.Duration) bool { + if c.sleepFunc != nil { + return c.sleepFunc(ctx, delay) + } + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-ctx.Done(): + return false + case <-timer.C: + return true + } +} + +func (c *TelegramChannel) runPollingLoop(ctx context.Context, bh telegramBotHandler) { + backoff := time.Second + for { + if err := bh.Start(); err != nil && ctx.Err() == nil { + logger.ErrorCF("telegram", "Bot handler failed", map[string]any{ + "error": err.Error(), + }) + } + if ctx.Err() != nil { + return + } + + logger.WarnC("telegram", "Updates channel closed, restarting long polling") + if !c.sleep(ctx, backoff) { + return + } + + for { + updates, err := c.startLongPolling(ctx) + if err != nil { + logger.ErrorCF("telegram", "Failed to restart long polling", map[string]any{ + "error": err.Error(), + "retry_after": backoff.String(), + }) + if backoff < 30*time.Second { + backoff *= 2 + if backoff > 30*time.Second { + backoff = 30 * time.Second + } + } + if !c.sleep(ctx, backoff) { + return + } + continue + } + + nextHandler, err := c.newBotHandler(updates) + if err != nil { + logger.ErrorCF("telegram", "Failed to recreate bot handler", map[string]any{ + "error": err.Error(), + "retry_after": backoff.String(), + }) + if backoff < 30*time.Second { + backoff *= 2 + if backoff > 30*time.Second { + backoff = 30 * time.Second + } + } + if !c.sleep(ctx, backoff) { + return + } + continue + } + + c.setBotHandler(nextHandler) + bh = nextHandler + backoff = time.Second + break + } + } +} + func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return channels.ErrNotRunning diff --git a/pkg/channels/telegram/telegram_test.go b/pkg/channels/telegram/telegram_test.go index c2186d0a30..93d8b13480 100644 --- a/pkg/channels/telegram/telegram_test.go +++ b/pkg/channels/telegram/telegram_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "io" "strings" "testing" "time" @@ -460,3 +461,82 @@ func TestHandleMessage_ReplyThread_NonForum_NoIsolation(t *testing.T) { assert.Empty(t, inbound.Metadata["parent_peer_kind"]) assert.Empty(t, inbound.Metadata["parent_peer_id"]) } + +type fakeTelegramBotHandler struct { + start func() error + stop func(context.Context) error +} + +func (f *fakeTelegramBotHandler) Start() error { + if f.start != nil { + return f.start() + } + return nil +} + +func (f *fakeTelegramBotHandler) StopWithContext(ctx context.Context) error { + if f.stop != nil { + return f.stop(ctx) + } + return nil +} + +func TestRunPollingLoop_RestartsAfterHandlerStops(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + started := make(chan string, 2) + first := &fakeTelegramBotHandler{ + start: func() error { + started <- "first" + return nil + }, + } + second := &fakeTelegramBotHandler{ + start: func() error { + started <- "second" + <-ctx.Done() + return io.EOF + }, + } + + ch := &TelegramChannel{ + BaseChannel: channels.NewBaseChannel("telegram", nil, nil, nil), + sleepFunc: func(ctx context.Context, _ time.Duration) bool { + return ctx.Err() == nil + }, + } + + startPollingCalls := 0 + ch.startPollingFunc = func(context.Context) (<-chan telego.Update, error) { + startPollingCalls++ + return make(chan telego.Update), nil + } + ch.newHandlerFunc = func(<-chan telego.Update) (telegramBotHandler, error) { + return second, nil + } + + done := make(chan struct{}) + go func() { + ch.runPollingLoop(ctx, first) + close(done) + }() + + if got := <-started; got != "first" { + t.Fatalf("first start = %q, want %q", got, "first") + } + if got := <-started; got != "second" { + t.Fatalf("second start = %q, want %q", got, "second") + } + if startPollingCalls != 1 { + t.Fatalf("startPollingCalls = %d, want %d", startPollingCalls, 1) + } + + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("runPollingLoop did not stop after context cancellation") + } +} diff --git a/pkg/providers/error_classifier.go b/pkg/providers/error_classifier.go index fd9bf1e81f..a0c0881af3 100644 --- a/pkg/providers/error_classifier.go +++ b/pkg/providers/error_classifier.go @@ -48,6 +48,12 @@ var ( substr("timed out"), substr("deadline exceeded"), substr("context deadline exceeded"), + substr("failed to send request"), + substr("connection reset by peer"), + substr("connection refused"), + substr("no route to host"), + substr("unexpected eof"), + substr("tls handshake timeout"), } billingPatterns = []errorPattern{ diff --git a/pkg/providers/error_classifier_test.go b/pkg/providers/error_classifier_test.go index 67d9af62b2..098e5ec08b 100644 --- a/pkg/providers/error_classifier_test.go +++ b/pkg/providers/error_classifier_test.go @@ -154,6 +154,28 @@ func TestClassifyError_TimeoutPatterns(t *testing.T) { } } +func TestClassifyError_TransportSendPatterns(t *testing.T) { + patterns := []string{ + `failed to send request: Post "https://openrouter.ai/api/v1/chat/completions": read tcp 127.0.0.1:1->127.0.0.1:2: read: connection reset by peer`, + `failed to send request: dial tcp 127.0.0.1:443: connect: connection refused`, + `failed to send request: dial tcp 127.0.0.1:443: connect: no route to host`, + `failed to send request: Post "https://openrouter.ai/api/v1/chat/completions": unexpected EOF`, + `failed to send request: tls handshake timeout`, + } + + for _, msg := range patterns { + err := errors.New(msg) + result := ClassifyError(err, "openrouter", "stepfun/step-3.5-flash") + if result == nil { + t.Errorf("pattern %q: expected non-nil", msg) + continue + } + if result.Reason != FailoverTimeout { + t.Errorf("pattern %q: reason = %q, want timeout", msg, result.Reason) + } + } +} + func TestClassifyError_AuthPatterns(t *testing.T) { patterns := []string{ "invalid api key",