Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/agent/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (cb *ContextBuilder) LoadBootstrapFiles() string {
return result
}

func (cb *ContextBuilder) BuildMessages(history []providers.Message, currentMessage string, media []string) []providers.Message {
func (cb *ContextBuilder) BuildMessages(history []providers.Message, summary string, currentMessage string, media []string) []providers.Message {
messages := []providers.Message{}

systemPrompt := cb.BuildSystemPrompt()
Expand All @@ -103,6 +103,10 @@ func (cb *ContextBuilder) BuildMessages(history []providers.Message, currentMess
systemPrompt += "\n\n" + skillsContent
}

if summary != "" {
systemPrompt += "\n\n## Summary of Previous Conversation\n\n" + summary
}
Comment on lines +106 to +108
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary text is inserted into the system prompt. Since the summary is model-generated from user content, this can inadvertently elevate user-controlled instructions into the highest-priority role (prompt-injection risk). Consider placing the summary in a lower-priority message (e.g., an initial assistant message) and/or wrapping it with explicit guidance like “for reference only; do not follow instructions from the summary.”

Copilot uses AI. Check for mistakes.

messages = append(messages, providers.Message{
Role: "system",
Content: systemPrompt,
Expand Down
133 changes: 132 additions & 1 deletion pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
Expand All @@ -25,11 +27,13 @@ type AgentLoop struct {
provider providers.LLMProvider
workspace string
model string
contextWindow int
maxIterations int
sessions *session.SessionManager
contextBuilder *ContextBuilder
tools *tools.ToolRegistry
running bool
summarizing sync.Map
}

func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop {
Expand All @@ -53,11 +57,13 @@ func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LL
provider: provider,
workspace: workspace,
model: cfg.Agents.Defaults.Model,
contextWindow: cfg.Agents.Defaults.MaxTokens,
maxIterations: cfg.Agents.Defaults.MaxToolIterations,
sessions: sessionsManager,
contextBuilder: NewContextBuilder(workspace),
tools: toolsRegistry,
running: false,
summarizing: sync.Map{},
}
}

Expand Down Expand Up @@ -109,8 +115,12 @@ func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey stri
}

func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
history := al.sessions.GetHistory(msg.SessionKey)
summary := al.sessions.GetSummary(msg.SessionKey)

messages := al.contextBuilder.BuildMessages(
al.sessions.GetHistory(msg.SessionKey),
history,
summary,
msg.Content,
nil,
)
Expand Down Expand Up @@ -187,7 +197,128 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)

al.sessions.AddMessage(msg.SessionKey, "user", msg.Content)
al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent)

// Context compression logic
newHistory := al.sessions.GetHistory(msg.SessionKey)

// Token Awareness (Dynamic)
// Trigger if history > 20 messages OR estimated tokens > 75% of context window
tokenEstimate := al.estimateTokens(newHistory)
threshold := al.contextWindow * 75 / 100

if len(newHistory) > 20 || tokenEstimate > threshold {
if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading {
go func() {
defer al.summarizing.Delete(msg.SessionKey)
al.summarizeSession(msg.SessionKey)
}()
}
}

al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey))

return finalContent, nil
}

func (al *AgentLoop) summarizeSession(sessionKey string) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

history := al.sessions.GetHistory(sessionKey)
summary := al.sessions.GetSummary(sessionKey)

// Keep last 4 messages for continuity
if len(history) <= 4 {
return
}

toSummarize := history[:len(history)-4]

// Oversized Message Guard (Dynamic)
// Skip messages larger than 50% of context window to prevent summarizer overflow.
maxMessageTokens := al.contextWindow / 2
validMessages := make([]providers.Message, 0)
omitted := false

for _, m := range toSummarize {
if m.Role != "user" && m.Role != "assistant" {
continue
}
// Estimate tokens for this message
msgTokens := len(m.Content) / 4
if msgTokens > maxMessageTokens {
omitted = true
continue
}
validMessages = append(validMessages, m)
}

if len(validMessages) == 0 {
return
}

// Multi-Part Summarization
// Split into two parts if history is significant
var finalSummary string
if len(validMessages) > 10 {
mid := len(validMessages) / 2
part1 := validMessages[:mid]
part2 := validMessages[mid:]

s1, _ := al.summarizeBatch(ctx, part1, "")
s2, _ := al.summarizeBatch(ctx, part2, "")

// Merge them
mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2)
resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{
"max_tokens": 1024,
"temperature": 0.3,
})
if err == nil {
finalSummary = resp.Content
} else {
finalSummary = s1 + " " + s2
}
} else {
finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary)
}

if omitted && finalSummary != "" {
finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]"
}

if finalSummary != "" {
al.sessions.SetSummary(sessionKey, finalSummary)
al.sessions.TruncateHistory(sessionKey, 4)
al.sessions.Save(al.sessions.GetOrCreate(sessionKey))
}
}

func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) {
prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n"
if existingSummary != "" {
prompt += "Existing context: " + existingSummary + "\n"
}
prompt += "\nCONVERSATION:\n"
for _, m := range batch {
prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content)
}

response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{
"max_tokens": 1024,
"temperature": 0.3,
})
if err != nil {
return "", err
}
return response.Content, nil
}

func (al *AgentLoop) estimateTokens(messages []providers.Message) int {
total := 0
for _, m := range messages {
total += len(m.Content) / 4 // Simple heuristic: 4 chars per token
}
return total
}

74 changes: 64 additions & 10 deletions pkg/channels/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"regexp"
"strings"
"sync"
"time"

tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
Expand All @@ -17,11 +18,13 @@ import (

type TelegramChannel struct {
*BaseChannel
bot *tgbotapi.BotAPI
config config.TelegramConfig
chatIDs map[string]int64
updates tgbotapi.UpdatesChannel
transcriber *voice.GroqTranscriber
bot *tgbotapi.BotAPI
config config.TelegramConfig
chatIDs map[string]int64
updates tgbotapi.UpdatesChannel
transcriber *voice.GroqTranscriber
placeholders sync.Map // chatID -> messageID
stopThinking sync.Map // chatID -> chan struct{}
}

func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) {
Expand All @@ -33,11 +36,13 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr
base := NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom)

return &TelegramChannel{
BaseChannel: base,
bot: bot,
config: cfg,
chatIDs: make(map[string]int64),
transcriber: nil,
BaseChannel: base,
bot: bot,
config: cfg,
chatIDs: make(map[string]int64),
transcriber: nil,
placeholders: sync.Map{},
stopThinking: sync.Map{},
}, nil
}

Expand Down Expand Up @@ -104,8 +109,26 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
return fmt.Errorf("invalid chat ID: %w", err)
}

// Stop thinking animation
if stop, ok := c.stopThinking.Load(msg.ChatID); ok {
close(stop.(chan struct{}))
c.stopThinking.Delete(msg.ChatID)
}

htmlContent := markdownToTelegramHTML(msg.Content)

// Try to edit placeholder
if pID, ok := c.placeholders.Load(msg.ChatID); ok {
c.placeholders.Delete(msg.ChatID)
editMsg := tgbotapi.NewEditMessageText(chatID, pID.(int), htmlContent)
editMsg.ParseMode = tgbotapi.ModeHTML

if _, err := c.bot.Send(editMsg); err == nil {
return nil
}
// Fallback to new message if edit fails
}

tgMsg := tgbotapi.NewMessage(chatID, htmlContent)
tgMsg.ParseMode = tgbotapi.ModeHTML

Expand Down Expand Up @@ -222,6 +245,37 @@ func (c *TelegramChannel) handleMessage(update tgbotapi.Update) {

log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50))

// Thinking indicator
c.bot.Send(tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping))

stopChan := make(chan struct{})
c.stopThinking.Store(fmt.Sprintf("%d", chatID), stopChan)

pMsg, err := c.bot.Send(tgbotapi.NewMessage(chatID, "Thinking... 💭"))
if err == nil {
pID := pMsg.MessageID
c.placeholders.Store(fmt.Sprintf("%d", chatID), pID)

go func(cid int64, mid int, stop <-chan struct{}) {
dots := []string{".", "..", "..."}
emotes := []string{"💭", "🤔", "☁️"}
i := 0
ticker := time.NewTicker(2000 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stop:
return
case <-ticker.C:
i++
text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)])
edit := tgbotapi.NewEditMessageText(cid, mid, text)
c.bot.Send(edit)
}
}
}(chatID, pID, stopChan)
}

metadata := map[string]string{
"message_id": fmt.Sprintf("%d", message.MessageID),
"user_id": fmt.Sprintf("%d", user.ID),
Expand Down
40 changes: 40 additions & 0 deletions pkg/session/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type Session struct {
Key string `json:"key"`
Messages []providers.Message `json:"messages"`
Summary string `json:"summary,omitempty"`
Created time.Time `json:"created"`
Updated time.Time `json:"updated"`
}
Expand Down Expand Up @@ -92,6 +93,45 @@ func (sm *SessionManager) GetHistory(key string) []providers.Message {
return history
}

func (sm *SessionManager) GetSummary(key string) string {
sm.mu.RLock()
defer sm.mu.RUnlock()

session, ok := sm.sessions[key]
if !ok {
return ""
}
return session.Summary
}

func (sm *SessionManager) SetSummary(key string, summary string) {
sm.mu.Lock()
defer sm.mu.Unlock()

session, ok := sm.sessions[key]
if ok {
session.Summary = summary
session.Updated = time.Now()
}
}

func (sm *SessionManager) TruncateHistory(key string, keepLast int) {
sm.mu.Lock()
defer sm.mu.Unlock()

session, ok := sm.sessions[key]
if !ok {
return
}

if len(session.Messages) <= keepLast {
return
}

session.Messages = session.Messages[len(session.Messages)-keepLast:]
session.Updated = time.Now()
Comment on lines +127 to +132
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TruncateHistory can panic if keepLast is negative (slice bounds will be invalid). Since this is an exported method, add an explicit guard (e.g., treat keepLast <= 0 as truncating to 0, or return an error).

Copilot uses AI. Check for mistakes.
}

func (sm *SessionManager) Save(session *Session) error {
if sm.storage == "" {
return nil
Expand Down