diff --git a/config/config.example.json b/config/config.example.json index 49658b9f2b..3754c6814c 100644 --- a/config/config.example.json +++ b/config/config.example.json @@ -477,6 +477,9 @@ "enabled": false, "monitor_usb": true }, + "voice": { + "echo_transcription": false + }, "gateway": { "host": "127.0.0.1", "port": 18790 diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 235d42fcc0..bee8d91a7f 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -467,9 +467,10 @@ var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`) // transcribeAudioInMessage resolves audio media refs, transcribes them, and // replaces audio annotations in msg.Content with the transcribed text. -func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) bus.InboundMessage { +// Returns the (possibly modified) message and true if audio was transcribed. +func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) (bus.InboundMessage, bool) { if al.transcriber == nil || al.mediaStore == nil || len(msg.Media) == 0 { - return msg + return msg, false } // Transcribe each audio media ref in order. @@ -493,9 +494,11 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou } if len(transcriptions) == 0 { - return msg + return msg, false } + al.sendTranscriptionFeedback(ctx, msg.Channel, msg.ChatID, msg.MessageID, transcriptions) + // Replace audio annotations sequentially with transcriptions. idx := 0 newContent := audioAnnotationRe.ReplaceAllStringFunc(msg.Content, func(match string) string { @@ -513,7 +516,48 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou } msg.Content = newContent - return msg + return msg, true +} + +// sendTranscriptionFeedback sends feedback to the user with the result of +// audio transcription if the option is enabled. It uses Manager.SendMessage +// which executes synchronously (rate limiting, splitting, retry) so that +// ordering with the subsequent placeholder is guaranteed. +func (al *AgentLoop) sendTranscriptionFeedback( + ctx context.Context, + channel, chatID, messageID string, + validTexts []string, +) { + if !al.cfg.Voice.EchoTranscription { + return + } + if al.channelManager == nil { + return + } + + var nonEmpty []string + for _, t := range validTexts { + if t != "" { + nonEmpty = append(nonEmpty, t) + } + } + + var feedbackMsg string + if len(nonEmpty) > 0 { + feedbackMsg = "Transcript: " + strings.Join(nonEmpty, "\n") + } else { + feedbackMsg = "No voice detected in the audio" + } + + err := al.channelManager.SendMessage(ctx, bus.OutboundMessage{ + Channel: channel, + ChatID: chatID, + Content: feedbackMsg, + ReplyToMessageID: messageID, + }) + if err != nil { + logger.WarnCF("voice", "Failed to send transcription feedback", map[string]any{"error": err.Error()}) + } } // inferMediaType determines the media type ("image", "audio", "video", "file") @@ -627,7 +671,14 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) }, ) - msg = al.transcribeAudioInMessage(ctx, msg) + var hadAudio bool + msg, hadAudio = al.transcribeAudioInMessage(ctx, msg) + + // For audio messages the placeholder was deferred by the channel. + // Now that transcription (and optional feedback) is done, send it. + if hadAudio && al.channelManager != nil { + al.channelManager.SendPlaceholder(ctx, msg.Channel, msg.ChatID) + } // Route system messages to processSystemMessage if msg.Channel == "system" { diff --git a/pkg/bus/types.go b/pkg/bus/types.go index 7ad8f04179..12da3f1dd0 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -30,9 +30,10 @@ type InboundMessage struct { } type OutboundMessage struct { - Channel string `json:"channel"` - ChatID string `json:"chat_id"` - Content string `json:"content"` + Channel string `json:"channel"` + ChatID string `json:"chat_id"` + Content string `json:"content"` + ReplyToMessageID string `json:"reply_to_message_id,omitempty"` } // MediaPart describes a single media attachment to send. diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 063a66523d..edb5b6f08a 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/binary" "encoding/hex" + "regexp" "strconv" "strings" "sync/atomic" @@ -32,6 +33,9 @@ func init() { uniqueIDPrefix = hex.EncodeToString(b[:]) } +// audioAnnotationRe matches audio/voice annotations injected by channels (e.g. [voice], [audio: file.ogg]). +var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`) + // uniqueID generates a process-unique ID using a random prefix and an atomic counter. // This ID is intended for internal correlation (e.g. media scope keys) and is NOT // cryptographically secure — it must not be used in contexts where unpredictability matters. @@ -284,10 +288,15 @@ func (c *BaseChannel) HandleMessage( c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo) } } - // Placeholder — independent pipeline - if pc, ok := c.owner.(PlaceholderCapable); ok { - if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" { - c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID) + // Placeholder — independent pipeline. + // Skip when the message contains audio: the agent will send the + // placeholder after transcription completes, so the user sees + // "Thinking…" only once the voice has been processed. + if !audioAnnotationRe.MatchString(content) { + if pc, ok := c.owner.(PlaceholderCapable); ok { + if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" { + c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID) + } } } } diff --git a/pkg/channels/discord/discord.go b/pkg/channels/discord/discord.go index c3bcbff8de..fbfcad1513 100644 --- a/pkg/channels/discord/discord.go +++ b/pkg/channels/discord/discord.go @@ -134,7 +134,7 @@ func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro return nil } - return c.sendChunk(ctx, channelID, msg.Content) + return c.sendChunk(ctx, channelID, msg.Content, msg.ReplyToMessageID) } // SendMedia implements the channels.MediaSender interface. @@ -259,14 +259,29 @@ func (c *DiscordChannel) SendPlaceholder(ctx context.Context, chatID string) (st return msg.ID, nil } -func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error { +func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, replyToID string) error { // Use the passed ctx for timeout control sendCtx, cancel := context.WithTimeout(ctx, sendTimeout) defer cancel() done := make(chan error, 1) go func() { - _, err := c.session.ChannelMessageSend(channelID, content) + var err error + + // If we have an ID, we send the message as "Reply" + if replyToID != "" { + _, err = c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{ + Content: content, + Reference: &discordgo.MessageReference{ + MessageID: replyToID, + ChannelID: channelID, + }, + }) + } else { + // Otherwise, we send a normal message + _, err = c.session.ChannelMessageSend(channelID, content) + } + done <- err }() diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 1a24bb980f..472895a7a1 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -102,6 +102,27 @@ func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) { m.placeholders.Store(key, placeholderEntry{id: placeholderID, createdAt: time.Now()}) } +// SendPlaceholder sends a "Thinking…" placeholder for the given channel/chatID +// and records it for later editing. Returns true if a placeholder was sent. +func (m *Manager) SendPlaceholder(ctx context.Context, channel, chatID string) bool { + m.mu.RLock() + ch, ok := m.channels[channel] + m.mu.RUnlock() + if !ok { + return false + } + pc, ok := ch.(PlaceholderCapable) + if !ok { + return false + } + phID, err := pc.SendPlaceholder(ctx, chatID) + if err != nil || phID == "" { + return false + } + m.RecordPlaceholder(channel, chatID, phID) + return true +} + // RecordTypingStop registers a typing stop function for later invocation. // Implements PlaceholderRecorder. func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) { @@ -813,6 +834,39 @@ func (m *Manager) UnregisterChannel(name string) { delete(m.channels, name) } +// SendMessage sends an outbound message synchronously through the channel +// worker's rate limiter and retry logic. It blocks until the message is +// delivered (or all retries are exhausted), which preserves ordering when +// a subsequent operation depends on the message having been sent. +func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error { + m.mu.RLock() + _, exists := m.channels[msg.Channel] + w, wExists := m.workers[msg.Channel] + m.mu.RUnlock() + + if !exists { + return fmt.Errorf("channel %s not found", msg.Channel) + } + if !wExists || w == nil { + return fmt.Errorf("channel %s has no active worker", msg.Channel) + } + + maxLen := 0 + if mlp, ok := w.ch.(MessageLengthProvider); ok { + maxLen = mlp.MaxMessageLength() + } + if maxLen > 0 && len([]rune(msg.Content)) > maxLen { + for _, chunk := range SplitMessage(msg.Content, maxLen) { + chunkMsg := msg + chunkMsg.Content = chunk + m.sendWithRetry(ctx, msg.Channel, w, chunkMsg) + } + } else { + m.sendWithRetry(ctx, msg.Channel, w, msg) + } + return nil +} + func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error { m.mu.RLock() _, exists := m.channels[channelName] diff --git a/pkg/channels/manager_test.go b/pkg/channels/manager_test.go index f09ecfe2fc..1f3a628c29 100644 --- a/pkg/channels/manager_test.go +++ b/pkg/channels/manager_test.go @@ -17,16 +17,32 @@ import ( // mockChannel is a test double that delegates Send to a configurable function. type mockChannel struct { BaseChannel - sendFn func(ctx context.Context, msg bus.OutboundMessage) error + sendFn func(ctx context.Context, msg bus.OutboundMessage) error + sentMessages []bus.OutboundMessage + placeholdersSent int + editedMessages int + lastPlaceholderID string } func (m *mockChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { + m.sentMessages = append(m.sentMessages, msg) return m.sendFn(ctx, msg) } func (m *mockChannel) Start(ctx context.Context) error { return nil } func (m *mockChannel) Stop(ctx context.Context) error { return nil } +func (m *mockChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) { + m.placeholdersSent++ + m.lastPlaceholderID = "mock-ph-123" + return m.lastPlaceholderID, nil +} + +func (m *mockChannel) EditMessage(ctx context.Context, chatID, messageID, content string) error { + m.editedMessages++ + return nil +} + // newTestManager creates a minimal Manager suitable for unit tests. func newTestManager() *Manager { return &Manager{ @@ -860,3 +876,286 @@ func TestBuildMediaScope_WithMessageID(t *testing.T) { t.Fatalf("expected %s, got %s", expected, scope) } } + +func TestManager_PlaceholderConsumedByResponse(t *testing.T) { + mgr := &Manager{ + channels: make(map[string]Channel), + workers: make(map[string]*channelWorker), + placeholders: sync.Map{}, + } + + mockCh := &mockChannel{ + sendFn: func(ctx context.Context, msg bus.OutboundMessage) error { + return nil + }, + } + worker := newChannelWorker("mock", mockCh) + mgr.channels["mock"] = mockCh + mgr.workers["mock"] = worker + + ctx := context.Background() + key := "mock:chat-1" + + // Simulate a placeholder recorded by base.go HandleMessage + mgr.RecordPlaceholder("mock", "chat-1", "ph-123") + + if _, ok := mgr.placeholders.Load(key); !ok { + t.Fatal("expected placeholder to be recorded") + } + + // Transcription feedback arrives first — it should consume the placeholder + // and be delivered via EditMessage, not Send. + msgTranscript := bus.OutboundMessage{ + Channel: "mock", + ChatID: "chat-1", + Content: "Transcript: hello", + } + mgr.sendWithRetry(ctx, "mock", worker, msgTranscript) + + if mockCh.editedMessages != 1 { + t.Errorf("expected 1 edited message (placeholder consumed by transcript), got %d", mockCh.editedMessages) + } + if len(mockCh.sentMessages) != 0 { + t.Errorf("expected 0 normal messages (transcript used edit), got %d", len(mockCh.sentMessages)) + } + + // Placeholder should be gone now + if _, ok := mgr.placeholders.Load(key); ok { + t.Error("expected placeholder to be removed after being consumed") + } + + // Final LLM response arrives — no placeholder left, so it goes through Send + msgFinal := bus.OutboundMessage{ + Channel: "mock", + ChatID: "chat-1", + Content: "Final Answer", + } + mgr.sendWithRetry(ctx, "mock", worker, msgFinal) + + if len(mockCh.sentMessages) != 1 { + t.Errorf("expected 1 normal message sent, got %d", len(mockCh.sentMessages)) + } +} + +func TestSendMessage_Synchronous(t *testing.T) { + m := newTestManager() + + var received []bus.OutboundMessage + ch := &mockChannel{ + sendFn: func(_ context.Context, msg bus.OutboundMessage) error { + received = append(received, msg) + return nil + }, + } + + w := &channelWorker{ + ch: ch, + limiter: rate.NewLimiter(rate.Inf, 1), + } + m.channels["test"] = ch + m.workers["test"] = w + + msg := bus.OutboundMessage{ + Channel: "test", + ChatID: "123", + Content: "hello world", + ReplyToMessageID: "msg-456", + } + + err := m.SendMessage(context.Background(), msg) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + // SendMessage is synchronous — message should already be delivered + if len(received) != 1 { + t.Fatalf("expected 1 message sent, got %d", len(received)) + } + if received[0].ReplyToMessageID != "msg-456" { + t.Fatalf("expected ReplyToMessageID msg-456, got %s", received[0].ReplyToMessageID) + } + if received[0].Content != "hello world" { + t.Fatalf("expected content 'hello world', got %s", received[0].Content) + } +} + +func TestSendMessage_UnknownChannel(t *testing.T) { + m := newTestManager() + + msg := bus.OutboundMessage{ + Channel: "nonexistent", + ChatID: "123", + Content: "hello", + } + + err := m.SendMessage(context.Background(), msg) + if err == nil { + t.Fatal("expected error for unknown channel") + } +} + +func TestSendMessage_NoWorker(t *testing.T) { + m := newTestManager() + + ch := &mockChannel{ + sendFn: func(_ context.Context, _ bus.OutboundMessage) error { return nil }, + } + m.channels["test"] = ch + // No worker registered + + msg := bus.OutboundMessage{ + Channel: "test", + ChatID: "123", + Content: "hello", + } + + err := m.SendMessage(context.Background(), msg) + if err == nil { + t.Fatal("expected error when no worker exists") + } +} + +func TestSendMessage_WithRetry(t *testing.T) { + m := newTestManager() + + var callCount int + ch := &mockChannel{ + sendFn: func(_ context.Context, _ bus.OutboundMessage) error { + callCount++ + if callCount == 1 { + return fmt.Errorf("transient: %w", ErrTemporary) + } + return nil + }, + } + + w := &channelWorker{ + ch: ch, + limiter: rate.NewLimiter(rate.Inf, 1), + } + m.channels["test"] = ch + m.workers["test"] = w + + msg := bus.OutboundMessage{ + Channel: "test", + ChatID: "123", + Content: "retry me", + } + + err := m.SendMessage(context.Background(), msg) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if callCount != 2 { + t.Fatalf("expected 2 Send calls (1 failure + 1 success), got %d", callCount) + } +} + +func TestSendMessage_WithSplitting(t *testing.T) { + m := newTestManager() + + var received []string + ch := &mockChannelWithLength{ + mockChannel: mockChannel{ + sendFn: func(_ context.Context, msg bus.OutboundMessage) error { + received = append(received, msg.Content) + return nil + }, + }, + maxLen: 5, + } + + w := &channelWorker{ + ch: ch, + limiter: rate.NewLimiter(rate.Inf, 1), + } + m.channels["test"] = ch + m.workers["test"] = w + + msg := bus.OutboundMessage{ + Channel: "test", + ChatID: "123", + Content: "hello world", + } + + err := m.SendMessage(context.Background(), msg) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if len(received) < 2 { + t.Fatalf("expected message to be split into at least 2 chunks, got %d", len(received)) + } +} + +func TestSendMessage_PreservesOrdering(t *testing.T) { + m := newTestManager() + + var order []string + ch := &mockChannel{ + sendFn: func(_ context.Context, msg bus.OutboundMessage) error { + order = append(order, msg.Content) + return nil + }, + } + + w := &channelWorker{ + ch: ch, + limiter: rate.NewLimiter(rate.Inf, 1), + } + m.channels["test"] = ch + m.workers["test"] = w + + // Send two messages sequentially — they must arrive in order + _ = m.SendMessage(context.Background(), bus.OutboundMessage{ + Channel: "test", ChatID: "1", Content: "first", + }) + _ = m.SendMessage(context.Background(), bus.OutboundMessage{ + Channel: "test", ChatID: "1", Content: "second", + }) + + if len(order) != 2 { + t.Fatalf("expected 2 messages, got %d", len(order)) + } + if order[0] != "first" || order[1] != "second" { + t.Fatalf("expected [first, second], got %v", order) + } +} + +func TestManager_SendPlaceholder(t *testing.T) { + mgr := &Manager{ + channels: make(map[string]Channel), + workers: make(map[string]*channelWorker), + placeholders: sync.Map{}, + } + + mockCh := &mockChannel{ + sendFn: func(ctx context.Context, msg bus.OutboundMessage) error { + return nil + }, + } + mgr.channels["mock"] = mockCh + + ctx := context.Background() + + // SendPlaceholder should send a placeholder and record it + ok := mgr.SendPlaceholder(ctx, "mock", "chat-1") + if !ok { + t.Fatal("expected SendPlaceholder to succeed") + } + if mockCh.placeholdersSent != 1 { + t.Errorf("expected 1 placeholder sent, got %d", mockCh.placeholdersSent) + } + + key := "mock:chat-1" + if _, loaded := mgr.placeholders.Load(key); !loaded { + t.Error("expected placeholder to be recorded in manager") + } + + // SendPlaceholder on unknown channel should return false + ok = mgr.SendPlaceholder(ctx, "unknown", "chat-1") + if ok { + t.Error("expected SendPlaceholder to fail for unknown channel") + } +} diff --git a/pkg/channels/slack/slack.go b/pkg/channels/slack/slack.go index 024b1b0237..3ee8496213 100644 --- a/pkg/channels/slack/slack.go +++ b/pkg/channels/slack/slack.go @@ -122,7 +122,11 @@ func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error slack.MsgOptionText(msg.Content, false), } - if threadTS != "" { + if msg.ReplyToMessageID != "" && threadTS == "" { + // Answer to the message by creating a Thread under it + opts = append(opts, slack.MsgOptionTS(msg.ReplyToMessageID)) + } else if threadTS != "" { + // If we are already in a thread, continue in the thread opts = append(opts, slack.MsgOptionTS(threadTS)) } diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index b04beeb6ed..4a8d34a9fe 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -180,6 +180,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err // The Manager already splits messages to ≤4000 chars (WithMaxMessageLength), // so msg.Content is guaranteed to be within that limit. We still need to // check if HTML expansion pushes it beyond Telegram's 4096-char API limit. + replyToID := msg.ReplyToMessageID queue := []string{msg.Content} for len(queue) > 0 { chunk := queue[0] @@ -200,9 +201,11 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err continue } - if err := c.sendHTMLChunk(ctx, chatID, threadID, htmlContent, chunk); err != nil { + if err := c.sendHTMLChunk(ctx, chatID, threadID, htmlContent, chunk, replyToID); err != nil { return err } + // Only the first chunk should be a reply; subsequent chunks are normal messages. + replyToID = "" } return nil @@ -211,12 +214,20 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err // sendHTMLChunk sends a single HTML message, falling back to the original // markdown as plain text on parse failure so users never see raw HTML tags. func (c *TelegramChannel) sendHTMLChunk( - ctx context.Context, chatID int64, threadID int, htmlContent, mdFallback string, + ctx context.Context, chatID int64, threadID int, htmlContent, mdFallback string, replyToID string, ) error { tgMsg := tu.Message(tu.ID(chatID), htmlContent) tgMsg.ParseMode = telego.ModeHTML tgMsg.MessageThreadID = threadID + if replyToID != "" { + if mid, parseErr := strconv.Atoi(replyToID); parseErr == nil { + tgMsg.ReplyParameters = &telego.ReplyParameters{ + MessageID: mid, + } + } + } + if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil { logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]any{ "error": err.Error(), diff --git a/pkg/config/config.go b/pkg/config/config.go index 13d5a7306a..7a806c1e1d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -59,6 +59,7 @@ type Config struct { Tools ToolsConfig `json:"tools"` Heartbeat HeartbeatConfig `json:"heartbeat"` Devices DevicesConfig `json:"devices"` + Voice VoiceConfig `json:"voice"` // BuildInfo contains build-time version information BuildInfo BuildInfo `json:"build_info,omitempty"` } @@ -472,6 +473,10 @@ type DevicesConfig struct { MonitorUSB bool `json:"monitor_usb" env:"PICOCLAW_DEVICES_MONITOR_USB"` } +type VoiceConfig struct { + EchoTranscription bool `json:"echo_transcription" env:"PICOCLAW_VOICE_ECHO_TRANSCRIPTION"` +} + type ProvidersConfig struct { Anthropic ProviderConfig `json:"anthropic"` OpenAI OpenAIProviderConfig `json:"openai"` diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 5bb3bd1d69..3b1bb1aefc 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -510,6 +510,9 @@ func DefaultConfig() *Config { Enabled: false, MonitorUSB: true, }, + Voice: VoiceConfig{ + EchoTranscription: false, + }, BuildInfo: BuildInfo{ Version: Version, GitCommit: GitCommit,