diff --git a/cmd/picoclaw/internal/agent/helpers.go b/cmd/picoclaw/internal/agent/helpers.go index f754abc652..a995945d2b 100644 --- a/cmd/picoclaw/internal/agent/helpers.go +++ b/cmd/picoclaw/internal/agent/helpers.go @@ -50,6 +50,7 @@ func agentCmd(message, sessionKey, model string, debug bool) error { msgBus := bus.NewMessageBus() defer msgBus.Close() agentLoop := agent.NewAgentLoop(cfg, msgBus, provider) + defer agentLoop.Close() // Print agent startup info (only for interactive mode) startupInfo := agentLoop.GetStartupInfo() diff --git a/cmd/picoclaw/internal/gateway/helpers.go b/cmd/picoclaw/internal/gateway/helpers.go index 4f93b858a3..fed3d5ffbe 100644 --- a/cmd/picoclaw/internal/gateway/helpers.go +++ b/cmd/picoclaw/internal/gateway/helpers.go @@ -214,6 +214,7 @@ func gatewayCmd(debug bool) error { cronService.Stop() mediaStore.Stop() agentLoop.Stop() + agentLoop.Close() fmt.Println("✓ Gateway stopped") return nil diff --git a/pkg/agent/instance.go b/pkg/agent/instance.go index 97cf0fa059..709b792034 100644 --- a/pkg/agent/instance.go +++ b/pkg/agent/instance.go @@ -1,6 +1,7 @@ package agent import ( + "context" "fmt" "log" "os" @@ -9,6 +10,7 @@ import ( "strings" "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/memory" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/routing" "github.com/sipeed/picoclaw/pkg/session" @@ -31,7 +33,7 @@ type AgentInstance struct { SummarizeMessageThreshold int SummarizeTokenPercent int Provider providers.LLMProvider - Sessions *session.SessionManager + Sessions session.SessionStore ContextBuilder *ContextBuilder Tools *tools.ToolRegistry Subagents *config.SubagentsConfig @@ -94,7 +96,7 @@ func NewAgentInstance( } sessionsDir := filepath.Join(workspace, "sessions") - sessionsManager := session.NewSessionManager(sessionsDir) + sessions := initSessionStore(sessionsDir) contextBuilder := NewContextBuilder(workspace) @@ -221,7 +223,7 @@ func NewAgentInstance( SummarizeMessageThreshold: summarizeMessageThreshold, SummarizeTokenPercent: summarizeTokenPercent, Provider: provider, - Sessions: sessionsManager, + Sessions: sessions, ContextBuilder: contextBuilder, Tools: toolsRegistry, Subagents: subagents, @@ -275,6 +277,39 @@ func compilePatterns(patterns []string) []*regexp.Regexp { return compiled } +// Close releases resources held by the agent's session store. +func (a *AgentInstance) Close() error { + if a.Sessions != nil { + return a.Sessions.Close() + } + return nil +} + +// initSessionStore creates the session persistence backend. +// It uses the JSONL store by default and auto-migrates legacy JSON sessions. +// Falls back to SessionManager if the JSONL store cannot be initialized or +// if migration fails (which indicates the store cannot write reliably). +func initSessionStore(dir string) session.SessionStore { + store, err := memory.NewJSONLStore(dir) + if err != nil { + log.Printf("memory: init store: %v; using json sessions", err) + return session.NewSessionManager(dir) + } + + if n, merr := memory.MigrateFromJSON(context.Background(), dir, store); merr != nil { + // Migration failure means the store could not write data. + // Fall back to SessionManager to avoid a split state where + // some sessions are in JSONL and others remain in JSON. + log.Printf("memory: migration failed: %v; falling back to json sessions", merr) + store.Close() + return session.NewSessionManager(dir) + } else if n > 0 { + log.Printf("memory: migrated %d session(s) to jsonl", n) + } + + return session.NewJSONLBackend(store) +} + func expandHome(path string) string { if path == "" { return path diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 3d13071c01..fb7806f2cc 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -380,6 +380,11 @@ func (al *AgentLoop) Stop() { al.running.Store(false) } +// Close releases resources held by agent session stores. Call after Stop. +func (al *AgentLoop) Close() { + al.registry.Close() +} + func (al *AgentLoop) RegisterTool(tool tools.Tool) { for _, agentID := range al.registry.ListAgentIDs() { if agent, ok := al.registry.GetAgent(agentID); ok { diff --git a/pkg/agent/registry.go b/pkg/agent/registry.go index 0e7973dc3e..58b7ce4404 100644 --- a/pkg/agent/registry.go +++ b/pkg/agent/registry.go @@ -114,6 +114,18 @@ func (r *AgentRegistry) ForEachTool(name string, fn func(tools.Tool)) { } } +// Close releases resources held by all registered agents. +func (r *AgentRegistry) Close() { + r.mu.RLock() + defer r.mu.RUnlock() + for _, agent := range r.agents { + if err := agent.Close(); err != nil { + logger.WarnCF("agent", "Failed to close agent", + map[string]any{"agent_id": agent.ID, "error": err.Error()}) + } + } +} + // GetDefaultAgent returns the default agent instance. func (r *AgentRegistry) GetDefaultAgent() *AgentInstance { r.mu.RLock() diff --git a/pkg/session/jsonl_backend.go b/pkg/session/jsonl_backend.go new file mode 100644 index 0000000000..7f470de153 --- /dev/null +++ b/pkg/session/jsonl_backend.go @@ -0,0 +1,81 @@ +package session + +import ( + "context" + "log" + + "github.com/sipeed/picoclaw/pkg/memory" + "github.com/sipeed/picoclaw/pkg/providers" +) + +// JSONLBackend adapts a memory.Store into the SessionStore interface. +// Write errors are logged rather than returned, matching the fire-and-forget +// contract of SessionManager that the agent loop relies on. +type JSONLBackend struct { + store memory.Store +} + +// NewJSONLBackend wraps a memory.Store for use as a SessionStore. +func NewJSONLBackend(store memory.Store) *JSONLBackend { + return &JSONLBackend{store: store} +} + +func (b *JSONLBackend) AddMessage(sessionKey, role, content string) { + if err := b.store.AddMessage(context.Background(), sessionKey, role, content); err != nil { + log.Printf("session: add message: %v", err) + } +} + +func (b *JSONLBackend) AddFullMessage(sessionKey string, msg providers.Message) { + if err := b.store.AddFullMessage(context.Background(), sessionKey, msg); err != nil { + log.Printf("session: add full message: %v", err) + } +} + +func (b *JSONLBackend) GetHistory(key string) []providers.Message { + msgs, err := b.store.GetHistory(context.Background(), key) + if err != nil { + log.Printf("session: get history: %v", err) + return []providers.Message{} + } + return msgs +} + +func (b *JSONLBackend) GetSummary(key string) string { + summary, err := b.store.GetSummary(context.Background(), key) + if err != nil { + log.Printf("session: get summary: %v", err) + return "" + } + return summary +} + +func (b *JSONLBackend) SetSummary(key, summary string) { + if err := b.store.SetSummary(context.Background(), key, summary); err != nil { + log.Printf("session: set summary: %v", err) + } +} + +func (b *JSONLBackend) SetHistory(key string, history []providers.Message) { + if err := b.store.SetHistory(context.Background(), key, history); err != nil { + log.Printf("session: set history: %v", err) + } +} + +func (b *JSONLBackend) TruncateHistory(key string, keepLast int) { + if err := b.store.TruncateHistory(context.Background(), key, keepLast); err != nil { + log.Printf("session: truncate history: %v", err) + } +} + +// Save persists session state. Since the JSONL store fsyncs every write +// immediately, the data is already durable. Save runs compaction to reclaim +// space from logically truncated messages (no-op when there are none). +func (b *JSONLBackend) Save(key string) error { + return b.store.Compact(context.Background(), key) +} + +// Close releases resources held by the underlying store. +func (b *JSONLBackend) Close() error { + return b.store.Close() +} diff --git a/pkg/session/jsonl_backend_test.go b/pkg/session/jsonl_backend_test.go new file mode 100644 index 0000000000..40fa019cba --- /dev/null +++ b/pkg/session/jsonl_backend_test.go @@ -0,0 +1,179 @@ +package session_test + +import ( + "fmt" + "testing" + + "github.com/sipeed/picoclaw/pkg/memory" + "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/session" +) + +// Compile-time interface satisfaction checks. +var ( + _ session.SessionStore = (*session.SessionManager)(nil) + _ session.SessionStore = (*session.JSONLBackend)(nil) +) + +func newBackend(t *testing.T) *session.JSONLBackend { + t.Helper() + store, err := memory.NewJSONLStore(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { store.Close() }) + return session.NewJSONLBackend(store) +} + +func TestJSONLBackend_AddAndGetHistory(t *testing.T) { + b := newBackend(t) + + b.AddMessage("s1", "user", "hello") + b.AddMessage("s1", "assistant", "hi") + + history := b.GetHistory("s1") + if len(history) != 2 { + t.Fatalf("got %d messages, want 2", len(history)) + } + if history[0].Role != "user" || history[0].Content != "hello" { + t.Errorf("msg[0] = %+v", history[0]) + } + if history[1].Role != "assistant" || history[1].Content != "hi" { + t.Errorf("msg[1] = %+v", history[1]) + } +} + +func TestJSONLBackend_AddFullMessage(t *testing.T) { + b := newBackend(t) + + msg := providers.Message{ + Role: "assistant", + Content: "done", + ToolCalls: []providers.ToolCall{ + {ID: "tc1", Function: &providers.FunctionCall{Name: "read_file", Arguments: `{"path":"x"}`}}, + }, + } + b.AddFullMessage("s1", msg) + + history := b.GetHistory("s1") + if len(history) != 1 { + t.Fatalf("got %d, want 1", len(history)) + } + if len(history[0].ToolCalls) != 1 || history[0].ToolCalls[0].ID != "tc1" { + t.Errorf("tool calls = %+v", history[0].ToolCalls) + } +} + +func TestJSONLBackend_Summary(t *testing.T) { + b := newBackend(t) + + if got := b.GetSummary("s1"); got != "" { + t.Errorf("got %q, want empty", got) + } + + b.SetSummary("s1", "test summary") + if got := b.GetSummary("s1"); got != "test summary" { + t.Errorf("got %q, want %q", got, "test summary") + } +} + +func TestJSONLBackend_TruncateAndSave(t *testing.T) { + b := newBackend(t) + + for i := 0; i < 10; i++ { + b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i)) + } + b.TruncateHistory("s1", 3) + + history := b.GetHistory("s1") + if len(history) != 3 { + t.Fatalf("got %d, want 3", len(history)) + } + if history[0].Content != "msg 7" { + t.Errorf("got %q, want %q", history[0].Content, "msg 7") + } + + // Save triggers compaction. + if err := b.Save("s1"); err != nil { + t.Fatal(err) + } + + // Messages still accessible after compaction. + history = b.GetHistory("s1") + if len(history) != 3 { + t.Fatalf("after save: got %d, want 3", len(history)) + } +} + +func TestJSONLBackend_SetHistory(t *testing.T) { + b := newBackend(t) + b.AddMessage("s1", "user", "old") + + b.SetHistory("s1", []providers.Message{ + {Role: "user", Content: "new1"}, + {Role: "assistant", Content: "new2"}, + }) + + history := b.GetHistory("s1") + if len(history) != 2 { + t.Fatalf("got %d, want 2", len(history)) + } + if history[0].Content != "new1" { + t.Errorf("got %q, want %q", history[0].Content, "new1") + } +} + +func TestJSONLBackend_EmptySession(t *testing.T) { + b := newBackend(t) + + history := b.GetHistory("nonexistent") + if history == nil { + t.Fatal("got nil, want empty slice") + } + if len(history) != 0 { + t.Errorf("got %d, want 0", len(history)) + } +} + +func TestJSONLBackend_SessionIsolation(t *testing.T) { + b := newBackend(t) + b.AddMessage("s1", "user", "session1") + b.AddMessage("s2", "user", "session2") + + h1 := b.GetHistory("s1") + h2 := b.GetHistory("s2") + + if len(h1) != 1 || h1[0].Content != "session1" { + t.Errorf("s1: %+v", h1) + } + if len(h2) != 1 || h2[0].Content != "session2" { + t.Errorf("s2: %+v", h2) + } +} + +func TestJSONLBackend_SummarizeFlow(t *testing.T) { + // Simulates the real summarization flow in the agent loop: + // SetSummary → TruncateHistory → Save + b := newBackend(t) + + for i := 0; i < 20; i++ { + b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i)) + } + + b.SetSummary("s1", "conversation about testing") + b.TruncateHistory("s1", 4) + if err := b.Save("s1"); err != nil { + t.Fatal(err) + } + + if got := b.GetSummary("s1"); got != "conversation about testing" { + t.Errorf("summary = %q", got) + } + history := b.GetHistory("s1") + if len(history) != 4 { + t.Fatalf("got %d messages, want 4", len(history)) + } + if history[0].Content != "msg 16" { + t.Errorf("first message = %q, want %q", history[0].Content, "msg 16") + } +} diff --git a/pkg/session/manager.go b/pkg/session/manager.go index 08f0b0ad21..07f981df17 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -265,6 +265,12 @@ func (sm *SessionManager) loadSessions() error { return nil } +// Close is a no-op for the in-memory SessionManager; it satisfies the +// SessionStore interface so callers can release resources uniformly. +func (sm *SessionManager) Close() error { + return nil +} + // SetHistory updates the messages of a session. func (sm *SessionManager) SetHistory(key string, history []providers.Message) { sm.mu.Lock() diff --git a/pkg/session/session_store.go b/pkg/session/session_store.go new file mode 100644 index 0000000000..1d1a2f9675 --- /dev/null +++ b/pkg/session/session_store.go @@ -0,0 +1,32 @@ +package session + +import "github.com/sipeed/picoclaw/pkg/providers" + +// SessionStore defines the persistence operations used by the agent loop. +// Both SessionManager (legacy JSON backend) and JSONLBackend satisfy this +// interface, allowing the storage layer to be swapped without touching the +// agent loop code. +// +// Write methods (Add*, Set*, Truncate*) are fire-and-forget: they do not +// return errors. Implementations should log failures internally. This +// matches the original SessionManager contract that the agent loop relies on. +type SessionStore interface { + // AddMessage appends a simple role/content message to the session. + AddMessage(sessionKey, role, content string) + // AddFullMessage appends a complete message including tool calls. + AddFullMessage(sessionKey string, msg providers.Message) + // GetHistory returns the full message history for the session. + GetHistory(key string) []providers.Message + // GetSummary returns the conversation summary, or "" if none. + GetSummary(key string) string + // SetSummary replaces the conversation summary. + SetSummary(key, summary string) + // SetHistory replaces the full message history. + SetHistory(key string, history []providers.Message) + // TruncateHistory keeps only the last keepLast messages. + TruncateHistory(key string, keepLast int) + // Save persists any pending state to durable storage. + Save(key string) error + // Close releases resources held by the store. + Close() error +}