Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@
"enabled": false,
"monitor_usb": true
},
"voice": {
"echo_transcription": false
},
"gateway": {
"host": "127.0.0.1",
"port": 18790
Expand Down
61 changes: 56 additions & 5 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,10 @@ var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`)

// transcribeAudioInMessage resolves audio media refs, transcribes them, and
// replaces audio annotations in msg.Content with the transcribed text.
func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) bus.InboundMessage {
// Returns the (possibly modified) message and true if audio was transcribed.
func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) (bus.InboundMessage, bool) {
if al.transcriber == nil || al.mediaStore == nil || len(msg.Media) == 0 {
return msg
return msg, false
}

// Transcribe each audio media ref in order.
Expand All @@ -493,9 +494,11 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou
}

if len(transcriptions) == 0 {
return msg
return msg, false
}

al.sendTranscriptionFeedback(ctx, msg.Channel, msg.ChatID, msg.MessageID, transcriptions)

// Replace audio annotations sequentially with transcriptions.
idx := 0
newContent := audioAnnotationRe.ReplaceAllStringFunc(msg.Content, func(match string) string {
Expand All @@ -513,7 +516,48 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou
}

msg.Content = newContent
return msg
return msg, true
}

// sendTranscriptionFeedback echoes the outcome of audio transcription back to
// the chat the audio came from, as a reply to messageID.
//
// It is a no-op unless cfg.Voice.EchoTranscription is set and a channel
// manager is available. Empty entries in validTexts are dropped; the
// remaining texts are joined with newlines behind a "Transcript: " prefix.
// When nothing remains, a "no voice detected" notice is sent instead.
//
// The message goes through channelManager.SendMessage, and this function
// returns only after that call does, so the feedback is handed off before the
// caller continues. A send error is logged as a warning and not propagated.
func (al *AgentLoop) sendTranscriptionFeedback(
	ctx context.Context,
	channel, chatID, messageID string,
	validTexts []string,
) {
	// Feature switched off, or no way to reach the channel: nothing to do.
	if !al.cfg.Voice.EchoTranscription || al.channelManager == nil {
		return
	}

	// Keep only the transcriptions that actually contain text.
	spoken := make([]string, 0, len(validTexts))
	for _, text := range validTexts {
		if text == "" {
			continue
		}
		spoken = append(spoken, text)
	}

	// Default to the "nothing heard" notice and override it when there is text.
	reply := "No voice detected in the audio"
	if len(spoken) != 0 {
		reply = "Transcript: " + strings.Join(spoken, "\n")
	}

	outbound := bus.OutboundMessage{
		Channel:          channel,
		ChatID:           chatID,
		Content:          reply,
		ReplyToMessageID: messageID,
	}
	if sendErr := al.channelManager.SendMessage(ctx, outbound); sendErr != nil {
		logger.WarnCF("voice", "Failed to send transcription feedback", map[string]any{"error": sendErr.Error()})
	}
}

// inferMediaType determines the media type ("image", "audio", "video", "file")
Expand Down Expand Up @@ -627,7 +671,14 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
},
)

msg = al.transcribeAudioInMessage(ctx, msg)
var hadAudio bool
msg, hadAudio = al.transcribeAudioInMessage(ctx, msg)

// For audio messages the placeholder was deferred by the channel.
// Now that transcription (and optional feedback) is done, send it.
if hadAudio && al.channelManager != nil {
al.channelManager.SendPlaceholder(ctx, msg.Channel, msg.ChatID)
}

// Route system messages to processSystemMessage
if msg.Channel == "system" {
Expand Down
7 changes: 4 additions & 3 deletions pkg/bus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type InboundMessage struct {
}

type OutboundMessage struct {
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
ReplyToMessageID string `json:"reply_to_message_id,omitempty"`
}

// MediaPart describes a single media attachment to send.
Expand Down
17 changes: 13 additions & 4 deletions pkg/channels/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"encoding/binary"
"encoding/hex"
"regexp"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -32,6 +33,9 @@ func init() {
uniqueIDPrefix = hex.EncodeToString(b[:])
}

// audioAnnotationRe matches audio/voice annotations injected by channels (e.g. [voice], [audio: file.ogg]).
var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`)

// uniqueID generates a process-unique ID using a random prefix and an atomic counter.
// This ID is intended for internal correlation (e.g. media scope keys) and is NOT
// cryptographically secure — it must not be used in contexts where unpredictability matters.
Expand Down Expand Up @@ -284,10 +288,15 @@ func (c *BaseChannel) HandleMessage(
c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
}
}
// Placeholder — independent pipeline
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
// Placeholder — independent pipeline.
// Skip when the message contains audio: the agent will send the
// placeholder after transcription completes, so the user sees
// "Thinking…" only once the voice has been processed.
if !audioAnnotationRe.MatchString(content) {
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
}
}
}
}
Expand Down
21 changes: 18 additions & 3 deletions pkg/channels/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
return nil
}

return c.sendChunk(ctx, channelID, msg.Content)
return c.sendChunk(ctx, channelID, msg.Content, msg.ReplyToMessageID)
}

// SendMedia implements the channels.MediaSender interface.
Expand Down Expand Up @@ -259,14 +259,29 @@ func (c *DiscordChannel) SendPlaceholder(ctx context.Context, chatID string) (st
return msg.ID, nil
}

func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error {
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, replyToID string) error {
// Use the passed ctx for timeout control
sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
defer cancel()

done := make(chan error, 1)
go func() {
_, err := c.session.ChannelMessageSend(channelID, content)
var err error

// If we have an ID, we send the message as "Reply"
if replyToID != "" {
_, err = c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{
Content: content,
Reference: &discordgo.MessageReference{
MessageID: replyToID,
ChannelID: channelID,
},
})
} else {
// Otherwise, we send a normal message
_, err = c.session.ChannelMessageSend(channelID, content)
}

done <- err
}()

Expand Down
54 changes: 54 additions & 0 deletions pkg/channels/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,27 @@ func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) {
m.placeholders.Store(key, placeholderEntry{id: placeholderID, createdAt: time.Now()})
}

// SendPlaceholder asks the named channel to post its placeholder message in
// chatID and, on success, stores the returned placeholder ID through
// RecordPlaceholder.
//
// It reports true only when a placeholder was sent and recorded. It reports
// false when the channel is not registered, does not implement
// PlaceholderCapable, returns an error, or returns an empty placeholder ID.
func (m *Manager) SendPlaceholder(ctx context.Context, channel, chatID string) bool {
	// Hold the read lock only for the map lookup, not for the network call.
	m.mu.RLock()
	target, registered := m.channels[channel]
	m.mu.RUnlock()

	// A comma-ok assertion on a missing (nil) entry simply yields false, so
	// both "unknown channel" and "no placeholder support" are handled here.
	sender, capable := target.(PlaceholderCapable)
	if !registered || !capable {
		return false
	}

	placeholderID, sendErr := sender.SendPlaceholder(ctx, chatID)
	sent := sendErr == nil && placeholderID != ""
	if sent {
		m.RecordPlaceholder(channel, chatID, placeholderID)
	}
	return sent
}

// RecordTypingStop registers a typing stop function for later invocation.
// Implements PlaceholderRecorder.
func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
Expand Down Expand Up @@ -813,6 +834,39 @@ func (m *Manager) UnregisterChannel(name string) {
delete(m.channels, name)
}

// SendMessage sends an outbound message synchronously on the calling
// goroutine, handing it to sendWithRetry with the worker registered for
// msg.Channel. Because it does not return until every chunk has been handed
// off, a caller can rely on ordering relative to whatever it does next.
//
// When the worker's channel implements MessageLengthProvider and the content
// exceeds MaxMessageLength (counted in runes), the content is split with
// SplitMessage and the chunks are sent in order. Each chunk is a copy of msg
// with only Content replaced, so fields such as ReplyToMessageID are carried
// on every chunk.
//
// It returns an error when the channel is unknown, when the channel has no
// active worker, or when ctx is already done before a chunk is handed to
// sendWithRetry. The outcome of sendWithRetry itself is not inspected here, so
// a nil return means every chunk was attempted, not that delivery was
// confirmed.
func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error {
	m.mu.RLock()
	_, exists := m.channels[msg.Channel]
	w, wExists := m.workers[msg.Channel]
	m.mu.RUnlock()

	if !exists {
		return fmt.Errorf("channel %s not found", msg.Channel)
	}
	if !wExists || w == nil {
		return fmt.Errorf("channel %s has no active worker", msg.Channel)
	}

	// By default the whole content is a single chunk; split only when the
	// channel advertises a positive limit that the content exceeds.
	chunks := []string{msg.Content}
	if mlp, ok := w.ch.(MessageLengthProvider); ok {
		if maxLen := mlp.MaxMessageLength(); maxLen > 0 && len([]rune(msg.Content)) > maxLen {
			chunks = SplitMessage(msg.Content, maxLen)
		}
	}

	for _, chunk := range chunks {
		// Do not report success for a send that was never attempted because
		// the caller's context was already cancelled or timed out.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("sending to channel %s: %w", msg.Channel, err)
		}
		chunkMsg := msg
		chunkMsg.Content = chunk
		m.sendWithRetry(ctx, msg.Channel, w, chunkMsg)
	}
	return nil
}

func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
m.mu.RLock()
_, exists := m.channels[channelName]
Expand Down
Loading
Loading