Skip to content
1 change: 1 addition & 0 deletions cmd/picoclaw/internal/agent/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func agentCmd(message, sessionKey, model string, debug bool) error {
msgBus := bus.NewMessageBus()
defer msgBus.Close()
agentLoop := agent.NewAgentLoop(cfg, msgBus, provider)
defer agentLoop.Close()

// Print agent startup info (only for interactive mode)
startupInfo := agentLoop.GetStartupInfo()
Expand Down
1 change: 1 addition & 0 deletions cmd/picoclaw/internal/gateway/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func gatewayCmd(debug bool) error {
cronService.Stop()
mediaStore.Stop()
agentLoop.Stop()
agentLoop.Close()
fmt.Println("✓ Gateway stopped")

return nil
Expand Down
41 changes: 38 additions & 3 deletions pkg/agent/instance.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"context"
"fmt"
"log"
"os"
Expand All @@ -9,6 +10,7 @@ import (
"strings"

"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/memory"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/routing"
"github.com/sipeed/picoclaw/pkg/session"
Expand All @@ -31,7 +33,7 @@ type AgentInstance struct {
SummarizeMessageThreshold int
SummarizeTokenPercent int
Provider providers.LLMProvider
Sessions *session.SessionManager
Sessions session.SessionStore
ContextBuilder *ContextBuilder
Tools *tools.ToolRegistry
Subagents *config.SubagentsConfig
Expand Down Expand Up @@ -94,7 +96,7 @@ func NewAgentInstance(
}

sessionsDir := filepath.Join(workspace, "sessions")
sessionsManager := session.NewSessionManager(sessionsDir)
sessions := initSessionStore(sessionsDir)

contextBuilder := NewContextBuilder(workspace)

Expand Down Expand Up @@ -221,7 +223,7 @@ func NewAgentInstance(
SummarizeMessageThreshold: summarizeMessageThreshold,
SummarizeTokenPercent: summarizeTokenPercent,
Provider: provider,
Sessions: sessionsManager,
Sessions: sessions,
ContextBuilder: contextBuilder,
Tools: toolsRegistry,
Subagents: subagents,
Expand Down Expand Up @@ -275,6 +277,39 @@ func compilePatterns(patterns []string) []*regexp.Regexp {
return compiled
}

// Close releases resources held by the agent's session store.
func (a *AgentInstance) Close() error {
if a.Sessions != nil {
return a.Sessions.Close()
}
return nil
}

// initSessionStore creates the session persistence backend.
// It uses the JSONL store by default and auto-migrates legacy JSON sessions.
// Falls back to SessionManager if the JSONL store cannot be initialized or
// if migration fails (which indicates the store cannot write reliably).
func initSessionStore(dir string) session.SessionStore {
store, err := memory.NewJSONLStore(dir)
if err != nil {
log.Printf("memory: init store: %v; using json sessions", err)
return session.NewSessionManager(dir)
}

if n, merr := memory.MigrateFromJSON(context.Background(), dir, store); merr != nil {
// Migration failure means the store could not write data.
// Fall back to SessionManager to avoid a split state where
// some sessions are in JSONL and others remain in JSON.
log.Printf("memory: migration failed: %v; falling back to json sessions", merr)
store.Close()
return session.NewSessionManager(dir)
} else if n > 0 {
log.Printf("memory: migrated %d session(s) to jsonl", n)
}

return session.NewJSONLBackend(store)
}

func expandHome(path string) string {
if path == "" {
return path
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ func (al *AgentLoop) Stop() {
al.running.Store(false)
}

// Close releases resources held by agent session stores. Call after Stop.
func (al *AgentLoop) Close() {
al.registry.Close()
}

func (al *AgentLoop) RegisterTool(tool tools.Tool) {
for _, agentID := range al.registry.ListAgentIDs() {
if agent, ok := al.registry.GetAgent(agentID); ok {
Expand Down
12 changes: 12 additions & 0 deletions pkg/agent/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ func (r *AgentRegistry) ForEachTool(name string, fn func(tools.Tool)) {
}
}

// Close releases resources held by all registered agents.
func (r *AgentRegistry) Close() {
r.mu.RLock()
defer r.mu.RUnlock()
for _, agent := range r.agents {
if err := agent.Close(); err != nil {
logger.WarnCF("agent", "Failed to close agent",
map[string]any{"agent_id": agent.ID, "error": err.Error()})
}
}
}

// GetDefaultAgent returns the default agent instance.
func (r *AgentRegistry) GetDefaultAgent() *AgentInstance {
r.mu.RLock()
Expand Down
81 changes: 81 additions & 0 deletions pkg/session/jsonl_backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package session

import (
"context"
"log"

"github.com/sipeed/picoclaw/pkg/memory"
"github.com/sipeed/picoclaw/pkg/providers"
)

// JSONLBackend adapts a memory.Store into the SessionStore interface.
// Write errors are logged rather than returned, matching the fire-and-forget
// contract of SessionManager that the agent loop relies on.
type JSONLBackend struct {
store memory.Store
}

// NewJSONLBackend wraps a memory.Store for use as a SessionStore.
func NewJSONLBackend(store memory.Store) *JSONLBackend {
return &JSONLBackend{store: store}
}

func (b *JSONLBackend) AddMessage(sessionKey, role, content string) {
if err := b.store.AddMessage(context.Background(), sessionKey, role, content); err != nil {
log.Printf("session: add message: %v", err)
}
}

func (b *JSONLBackend) AddFullMessage(sessionKey string, msg providers.Message) {
if err := b.store.AddFullMessage(context.Background(), sessionKey, msg); err != nil {
log.Printf("session: add full message: %v", err)
}
}

func (b *JSONLBackend) GetHistory(key string) []providers.Message {
msgs, err := b.store.GetHistory(context.Background(), key)
if err != nil {
log.Printf("session: get history: %v", err)
return []providers.Message{}
}
return msgs
}

func (b *JSONLBackend) GetSummary(key string) string {
summary, err := b.store.GetSummary(context.Background(), key)
if err != nil {
log.Printf("session: get summary: %v", err)
return ""
}
return summary
}

func (b *JSONLBackend) SetSummary(key, summary string) {
if err := b.store.SetSummary(context.Background(), key, summary); err != nil {
log.Printf("session: set summary: %v", err)
}
}

func (b *JSONLBackend) SetHistory(key string, history []providers.Message) {
if err := b.store.SetHistory(context.Background(), key, history); err != nil {
log.Printf("session: set history: %v", err)
}
}

func (b *JSONLBackend) TruncateHistory(key string, keepLast int) {
if err := b.store.TruncateHistory(context.Background(), key, keepLast); err != nil {
log.Printf("session: truncate history: %v", err)
}
}

// Save persists session state. Since the JSONL store fsyncs every write
// immediately, the data is already durable. Save runs compaction to reclaim
// space from logically truncated messages (no-op when there are none).
func (b *JSONLBackend) Save(key string) error {
return b.store.Compact(context.Background(), key)
}

// Close releases resources held by the underlying store.
func (b *JSONLBackend) Close() error {
return b.store.Close()
}
179 changes: 179 additions & 0 deletions pkg/session/jsonl_backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package session_test

import (
"fmt"
"testing"

"github.com/sipeed/picoclaw/pkg/memory"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/session"
)

// Compile-time interface satisfaction checks.
var (
_ session.SessionStore = (*session.SessionManager)(nil)
_ session.SessionStore = (*session.JSONLBackend)(nil)
)

func newBackend(t *testing.T) *session.JSONLBackend {
t.Helper()
store, err := memory.NewJSONLStore(t.TempDir())
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { store.Close() })
return session.NewJSONLBackend(store)
}

func TestJSONLBackend_AddAndGetHistory(t *testing.T) {
b := newBackend(t)

b.AddMessage("s1", "user", "hello")
b.AddMessage("s1", "assistant", "hi")

history := b.GetHistory("s1")
if len(history) != 2 {
t.Fatalf("got %d messages, want 2", len(history))
}
if history[0].Role != "user" || history[0].Content != "hello" {
t.Errorf("msg[0] = %+v", history[0])
}
if history[1].Role != "assistant" || history[1].Content != "hi" {
t.Errorf("msg[1] = %+v", history[1])
}
}

func TestJSONLBackend_AddFullMessage(t *testing.T) {
b := newBackend(t)

msg := providers.Message{
Role: "assistant",
Content: "done",
ToolCalls: []providers.ToolCall{
{ID: "tc1", Function: &providers.FunctionCall{Name: "read_file", Arguments: `{"path":"x"}`}},
},
}
b.AddFullMessage("s1", msg)

history := b.GetHistory("s1")
if len(history) != 1 {
t.Fatalf("got %d, want 1", len(history))
}
if len(history[0].ToolCalls) != 1 || history[0].ToolCalls[0].ID != "tc1" {
t.Errorf("tool calls = %+v", history[0].ToolCalls)
}
}

func TestJSONLBackend_Summary(t *testing.T) {
b := newBackend(t)

if got := b.GetSummary("s1"); got != "" {
t.Errorf("got %q, want empty", got)
}

b.SetSummary("s1", "test summary")
if got := b.GetSummary("s1"); got != "test summary" {
t.Errorf("got %q, want %q", got, "test summary")
}
}

func TestJSONLBackend_TruncateAndSave(t *testing.T) {
b := newBackend(t)

for i := 0; i < 10; i++ {
b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i))
}
b.TruncateHistory("s1", 3)

history := b.GetHistory("s1")
if len(history) != 3 {
t.Fatalf("got %d, want 3", len(history))
}
if history[0].Content != "msg 7" {
t.Errorf("got %q, want %q", history[0].Content, "msg 7")
}

// Save triggers compaction.
if err := b.Save("s1"); err != nil {
t.Fatal(err)
}

// Messages still accessible after compaction.
history = b.GetHistory("s1")
if len(history) != 3 {
t.Fatalf("after save: got %d, want 3", len(history))
}
}

func TestJSONLBackend_SetHistory(t *testing.T) {
b := newBackend(t)
b.AddMessage("s1", "user", "old")

b.SetHistory("s1", []providers.Message{
{Role: "user", Content: "new1"},
{Role: "assistant", Content: "new2"},
})

history := b.GetHistory("s1")
if len(history) != 2 {
t.Fatalf("got %d, want 2", len(history))
}
if history[0].Content != "new1" {
t.Errorf("got %q, want %q", history[0].Content, "new1")
}
}

func TestJSONLBackend_EmptySession(t *testing.T) {
b := newBackend(t)

history := b.GetHistory("nonexistent")
if history == nil {
t.Fatal("got nil, want empty slice")
}
if len(history) != 0 {
t.Errorf("got %d, want 0", len(history))
}
}

func TestJSONLBackend_SessionIsolation(t *testing.T) {
b := newBackend(t)
b.AddMessage("s1", "user", "session1")
b.AddMessage("s2", "user", "session2")

h1 := b.GetHistory("s1")
h2 := b.GetHistory("s2")

if len(h1) != 1 || h1[0].Content != "session1" {
t.Errorf("s1: %+v", h1)
}
if len(h2) != 1 || h2[0].Content != "session2" {
t.Errorf("s2: %+v", h2)
}
}

func TestJSONLBackend_SummarizeFlow(t *testing.T) {
// Simulates the real summarization flow in the agent loop:
// SetSummary → TruncateHistory → Save
b := newBackend(t)

for i := 0; i < 20; i++ {
b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i))
}

b.SetSummary("s1", "conversation about testing")
b.TruncateHistory("s1", 4)
if err := b.Save("s1"); err != nil {
t.Fatal(err)
}

if got := b.GetSummary("s1"); got != "conversation about testing" {
t.Errorf("summary = %q", got)
}
history := b.GetHistory("s1")
if len(history) != 4 {
t.Fatalf("got %d messages, want 4", len(history))
}
if history[0].Content != "msg 16" {
t.Errorf("first message = %q, want %q", history[0].Content, "msg 16")
}
}
Loading
Loading