Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
468723b
feat(telegram): stream LLM responses in real-time via sendMessageDraft
amirmamaghani Mar 4, 2026
6ab2c82
fix(telegram): delete placeholder message when streaming delivers res…
amirmamaghani Mar 4, 2026
9792df6
refactor(streaming): remove dead code and simplify streaming wiring
amirmamaghani Mar 4, 2026
2d5f320
fix(streaming): skip streamer acquisition when SendResponse is false
amirmamaghani Mar 4, 2026
55e2466
fix(streaming): guard streamer for non-sendable messages, add streami…
amirmamaghani Mar 4, 2026
0f310f1
Merge branch 'main' into feat/telegram-streaming
amirmamaghani Mar 5, 2026
b6d7970
feat(telegram): stream LLM responses in real-time via sendMessageDraft
amirmamaghani Mar 4, 2026
2945d50
fix(telegram): delete placeholder message when streaming delivers res…
amirmamaghani Mar 4, 2026
7911142
refactor(streaming): remove dead code and simplify streaming wiring
amirmamaghani Mar 4, 2026
6b90fe4
fix(streaming): skip streamer acquisition when SendResponse is false
amirmamaghani Mar 4, 2026
1ab2072
fix(streaming): guard streamer for non-sendable messages, add streami…
amirmamaghani Mar 4, 2026
e4bf599
Merge branch 'feat/telegram-streaming' of github.com:amirmamaghani/pi…
amirmamaghani Mar 5, 2026
9938028
fix(picoclaw): add missing closing brace for StreamingProvider interface
amirmamaghani Mar 5, 2026
9f3caa3
fix: resolve golangci-lint formatting issues
amirmamaghani Mar 5, 2026
fa3c00a
fix: address code review feedback on streaming PR
amirmamaghani Mar 5, 2026
0213494
feat: make streaming throttle interval and min growth configurable
amirmamaghani Mar 5, 2026
a4b2031
merge: incorporate upstream main into feat/telegram-streaming
amirmamaghani Mar 6, 2026
a514e52
merge: sync main into feat/telegram-streaming
amirmamaghani Mar 7, 2026
c85e977
Merge branch 'main' into feat/telegram-streaming
amirmamaghani Mar 11, 2026
4669e5a
fix(telegram): use parseTelegramChatID in DeleteMessage and BeginStream
amirmamaghani Mar 11, 2026
1ab4cab
merge: resolve conflicts with main branch and remove pico-echo-server…
amirmamaghani Mar 18, 2026
ac68b6d
merge: resolve conflict with main in channel manager
amirmamaghani Mar 20, 2026
aabe47a
fix(streaming): set streamActive only after successful Finalize
amirmamaghani Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion config/config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@
"allow_from": [
"YOUR_USER_ID"
],
"reasoning_channel_id": ""
"reasoning_channel_id": "",
"streaming": {
"enabled": true
}
},
"discord": {
"enabled": false,
Expand Down
34 changes: 34 additions & 0 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,7 @@ func (al *AgentLoop) handleReasoning(
}

// runLLMIteration executes the LLM call loop with tool handling.
// Returns (finalContent, iteration, error).
func (al *AgentLoop) runLLMIteration(
ctx context.Context,
agent *AgentInstance,
Expand All @@ -820,6 +821,13 @@ func (al *AgentLoop) runLLMIteration(
iteration := 0
var finalContent string

// Check if both the provider and channel support streaming
streamProvider, providerCanStream := agent.Provider.(providers.StreamingProvider)
var streamer bus.Streamer
if providerCanStream && !opts.NoHistory && !constants.IsInternalChannel(opts.Channel) {
streamer, _ = al.bus.GetStreamer(ctx, opts.Channel, opts.ChatID)
}

for iteration < agent.MaxIterations {
iteration++

Expand Down Expand Up @@ -875,6 +883,16 @@ func (al *AgentLoop) runLLMIteration(
}

callLLM := func() (*providers.LLMResponse, error) {
// Use streaming when available (streamer obtained, provider supports it)
if streamer != nil && streamProvider != nil {
return streamProvider.ChatStream(
ctx, messages, providerToolDefs, agent.Model, llmOpts,
func(accumulated string) {
streamer.Update(ctx, accumulated)
},
)
}

if len(agent.Candidates) > 1 && al.fallback != nil {
fbResult, fbErr := al.fallback.Execute(
ctx,
Expand Down Expand Up @@ -998,15 +1016,31 @@ func (al *AgentLoop) runLLMIteration(
// Check if no tool calls - we're done
if len(response.ToolCalls) == 0 {
finalContent = response.Content

// If we were streaming, finalize the message (sends the permanent message)
if streamer != nil {
if err := streamer.Finalize(ctx, finalContent); err != nil {
logger.WarnCF("agent", "Stream finalize failed", map[string]any{
"error": err.Error(),
})
}
}

logger.InfoCF("agent", "LLM response without tool calls (direct answer)",
map[string]any{
"agent_id": agent.ID,
"iteration": iteration,
"content_chars": len(finalContent),
"streamed": streamer != nil,
})
break
}

// Tool calls detected — cancel any active stream (draft auto-expires)
if streamer != nil {
streamer.Cancel(ctx)
}

normalizedToolCalls := make([]providers.ToolCall, 0, len(response.ToolCalls))
for _, tc := range response.ToolCalls {
normalizedToolCalls = append(normalizedToolCalls, providers.NormalizeToolCall(tc))
Expand Down
40 changes: 35 additions & 5 deletions pkg/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,29 @@ var ErrBusClosed = errors.New("message bus closed")

const defaultBusBufferSize = 64

// StreamDelegate is implemented by the channel Manager to provide streaming
// capabilities to the agent loop without tight coupling.
type StreamDelegate interface {
// GetStreamer returns a Streamer for the given channel+chatID if the channel
// supports streaming. Returns nil, false if streaming is unavailable.
GetStreamer(ctx context.Context, channel, chatID string) (Streamer, bool)
}

// Streamer pushes incremental content to a streaming-capable channel.
// Defined here so the agent loop can use it without importing pkg/channels.
type Streamer interface {
Update(ctx context.Context, content string) error
Finalize(ctx context.Context, content string) error
Cancel(ctx context.Context)
}

type MessageBus struct {
inbound chan InboundMessage
outbound chan OutboundMessage
outboundMedia chan OutboundMediaMessage
done chan struct{}
closed atomic.Bool
inbound chan InboundMessage
outbound chan OutboundMessage
outboundMedia chan OutboundMediaMessage
done chan struct{}
closed atomic.Bool
streamDelegate atomic.Value // stores StreamDelegate
}

func NewMessageBus() *MessageBus {
Expand Down Expand Up @@ -114,6 +131,19 @@ func (mb *MessageBus) SubscribeOutboundMedia(ctx context.Context) (OutboundMedia
}
}

// SetStreamDelegate registers a StreamDelegate (typically the channel Manager).
func (mb *MessageBus) SetStreamDelegate(d StreamDelegate) {
mb.streamDelegate.Store(d)
}

// GetStreamer returns a Streamer for the given channel+chatID via the delegate.
func (mb *MessageBus) GetStreamer(ctx context.Context, channel, chatID string) (Streamer, bool) {
if d, ok := mb.streamDelegate.Load().(StreamDelegate); ok && d != nil {
return d.GetStreamer(ctx, channel, chatID)
}
return nil, false
}

func (mb *MessageBus) Close() {
if mb.closed.CompareAndSwap(false, true) {
close(mb.done)
Expand Down
10 changes: 7 additions & 3 deletions pkg/channels/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,20 +271,24 @@ func (c *BaseChannel) HandleMessage(

// Auto-trigger typing indicator, message reaction, and placeholder before publishing.
// Each capability is independent — all three may fire for the same message.
// Note: even when streaming is available, we still show typing + placeholder on inbound.
// If streaming actually activates, preSend will skip the placeholder edit (streamActive map)
// and the typing stop will still be called. This avoids the problem of compile-time interface
// checks incorrectly skipping indicators when streaming may not work at runtime.
if c.owner != nil && c.placeholderRecorder != nil {
// Typing — independent pipeline
// Typing
if tc, ok := c.owner.(TypingCapable); ok {
if stop, err := tc.StartTyping(ctx, chatID); err == nil {
c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop)
}
}
// Reaction — independent pipeline
// Reaction
if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" {
if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil {
c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
}
}
// Placeholder — independent pipeline
// Placeholder
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
Expand Down
23 changes: 23 additions & 0 deletions pkg/channels/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ type MessageEditor interface {
EditMessage(ctx context.Context, chatID string, messageID string, content string) error
}

// MessageDeleter — channels that can delete a message by ID.
type MessageDeleter interface {
DeleteMessage(ctx context.Context, chatID string, messageID string) error
}

// ReactionCapable — channels that can add a reaction (e.g. 👀) to an inbound message.
// ReactToMessage adds a reaction and returns an undo function to remove it.
// The undo function MUST be idempotent and safe to call multiple times.
Expand All @@ -31,6 +36,24 @@ type PlaceholderCapable interface {
SendPlaceholder(ctx context.Context, chatID string) (messageID string, err error)
}

// StreamingCapable — channels that can show partial LLM output in real-time.
// The channel SHOULD gracefully degrade if the platform rejects streaming
// (e.g. Telegram bot without forum mode). In that case, Update becomes a no-op
// and Finalize still delivers the final message.
type StreamingCapable interface {
BeginStream(ctx context.Context, chatID string) (Streamer, error)
}

// Streamer pushes incremental content to a streaming-capable channel.
type Streamer interface {
// Update sends accumulated partial content to the user.
Update(ctx context.Context, content string) error
// Finalize commits the final message. After this, the Streamer is done.
Finalize(ctx context.Context, content string) error
// Cancel aborts the stream (e.g. when tool calls are detected mid-stream).
Cancel(ctx context.Context)
}

// PlaceholderRecorder is injected into channels by Manager.
// Channels call these methods on inbound to register typing/placeholder state.
// Manager uses the registered state on outbound to stop typing and edit placeholders.
Expand Down
67 changes: 65 additions & 2 deletions pkg/channels/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Manager struct {
placeholders sync.Map // "channel:chatID" → placeholderID (string)
typingStops sync.Map // "channel:chatID" → func()
reactionUndos sync.Map // "channel:chatID" → reactionEntry
streamActive sync.Map // "channel:chatID" → true (set when streamer.Finalize sent the message)
}

type asyncTask struct {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {
}

// preSend handles typing stop, reaction undo, and placeholder editing before sending a message.
// Returns true if the message was edited into a placeholder (skip Send).
// Returns true if the message was already delivered (skip Send).
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool {
key := name + ":" + msg.ChatID

Expand All @@ -132,7 +133,22 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess
}
}

// 3. Try editing placeholder
// 3. If a stream already finalized this message, delete the placeholder and skip send
if _, loaded := m.streamActive.LoadAndDelete(key); loaded {
if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
// Prefer deleting the placeholder (cleaner UX than editing to same content)
if deleter, ok := ch.(MessageDeleter); ok {
deleter.DeleteMessage(ctx, msg.ChatID, entry.id) // best effort
} else if editor, ok := ch.(MessageEditor); ok {
editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content) // fallback
}
}
}
return true
}

// 4. Try editing placeholder
if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
if editor, ok := ch.(MessageEditor); ok {
Expand All @@ -156,13 +172,60 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.Medi
mediaStore: store,
}

// Register as streaming delegate so the agent loop can obtain streamers
messageBus.SetStreamDelegate(m)

if err := m.initChannels(); err != nil {
return nil, err
}

return m, nil
}

// GetStreamer implements bus.StreamDelegate.
// It checks if the named channel supports streaming and returns a Streamer.
func (m *Manager) GetStreamer(ctx context.Context, channelName, chatID string) (bus.Streamer, bool) {
m.mu.RLock()
ch, exists := m.channels[channelName]
m.mu.RUnlock()

if !exists {
return nil, false
}

sc, ok := ch.(StreamingCapable)
if !ok {
return nil, false
}

streamer, err := sc.BeginStream(ctx, chatID)
if err != nil {
logger.DebugCF("channels", "Streaming unavailable, falling back to placeholder", map[string]any{
"channel": channelName,
"error": err.Error(),
})
return nil, false
}

// Mark streamActive on Finalize so preSend knows to clean up the placeholder
key := channelName + ":" + chatID
return &finalizeHookStreamer{
Streamer: streamer,
onFinalize: func() { m.streamActive.Store(key, true) },
}, true
}

// finalizeHookStreamer wraps a Streamer to run a hook on Finalize.
type finalizeHookStreamer struct {
Streamer
onFinalize func()
}

func (s *finalizeHookStreamer) Finalize(ctx context.Context, content string) error {
s.onFinalize()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting streamActive to True too early may cause the message sending to not fall back to the regular path after Streamer.Finalize fails.

return s.Streamer.Finalize(ctx, content)
}

// initChannel is a helper that looks up a factory by name and creates the channel.
func (m *Manager) initChannel(name, displayName string) {
f, ok := getFactory(name)
Expand Down
Loading
Loading