Skip to content

Commit fbf940d

Browse files
committed
Merge branch 'main' into d4
2 parents 2b22481 + 26f623e commit fbf940d

File tree

9 files changed

+355
-3
lines changed

9 files changed

+355
-3
lines changed

cmd/picoclaw/internal/agent/helpers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func agentCmd(message, sessionKey, model string, debug bool) error {
5050
msgBus := bus.NewMessageBus()
5151
defer msgBus.Close()
5252
agentLoop := agent.NewAgentLoop(cfg, msgBus, provider)
53+
defer agentLoop.Close()
5354

5455
// Print agent startup info (only for interactive mode)
5556
startupInfo := agentLoop.GetStartupInfo()

cmd/picoclaw/internal/gateway/helpers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ func gatewayCmd(debug bool) error {
222222
cronService.Stop()
223223
mediaStore.Stop()
224224
agentLoop.Stop()
225+
agentLoop.Close()
225226
logger.Info("✓ Gateway stopped")
226227

227228
return nil

pkg/agent/instance.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package agent
22

33
import (
4+
"context"
45
"fmt"
56
"log"
67
"os"
@@ -9,6 +10,7 @@ import (
910
"strings"
1011

1112
"github.com/sipeed/picoclaw/pkg/config"
13+
"github.com/sipeed/picoclaw/pkg/memory"
1214
"github.com/sipeed/picoclaw/pkg/providers"
1315
"github.com/sipeed/picoclaw/pkg/routing"
1416
"github.com/sipeed/picoclaw/pkg/session"
@@ -31,7 +33,7 @@ type AgentInstance struct {
3133
SummarizeMessageThreshold int
3234
SummarizeTokenPercent int
3335
Provider providers.LLMProvider
34-
Sessions *session.SessionManager
36+
Sessions session.SessionStore
3537
ContextBuilder *ContextBuilder
3638
Tools *tools.ToolRegistry
3739
Subagents *config.SubagentsConfig
@@ -95,7 +97,7 @@ func NewAgentInstance(
9597
}
9698

9799
sessionsDir := filepath.Join(workspace, "sessions")
98-
sessionsManager := session.NewSessionManager(sessionsDir)
100+
sessions := initSessionStore(sessionsDir)
99101

100102
mcpDiscoveryActive := cfg.Tools.MCP.Enabled && cfg.Tools.MCP.Discovery.Enabled
101103
contextBuilder := NewContextBuilder(workspace).WithToolDiscovery(
@@ -226,7 +228,7 @@ func NewAgentInstance(
226228
SummarizeMessageThreshold: summarizeMessageThreshold,
227229
SummarizeTokenPercent: summarizeTokenPercent,
228230
Provider: provider,
229-
Sessions: sessionsManager,
231+
Sessions: sessions,
230232
ContextBuilder: contextBuilder,
231233
Tools: toolsRegistry,
232234
Subagents: subagents,
@@ -280,6 +282,39 @@ func compilePatterns(patterns []string) []*regexp.Regexp {
280282
return compiled
281283
}
282284

285+
// Close releases resources held by the agent's session store.
286+
func (a *AgentInstance) Close() error {
287+
if a.Sessions != nil {
288+
return a.Sessions.Close()
289+
}
290+
return nil
291+
}
292+
293+
// initSessionStore creates the session persistence backend.
294+
// It uses the JSONL store by default and auto-migrates legacy JSON sessions.
295+
// Falls back to SessionManager if the JSONL store cannot be initialized or
296+
// if migration fails (which indicates the store cannot write reliably).
297+
func initSessionStore(dir string) session.SessionStore {
298+
store, err := memory.NewJSONLStore(dir)
299+
if err != nil {
300+
log.Printf("memory: init store: %v; using json sessions", err)
301+
return session.NewSessionManager(dir)
302+
}
303+
304+
if n, merr := memory.MigrateFromJSON(context.Background(), dir, store); merr != nil {
305+
// Migration failure means the store could not write data.
306+
// Fall back to SessionManager to avoid a split state where
307+
// some sessions are in JSONL and others remain in JSON.
308+
log.Printf("memory: migration failed: %v; falling back to json sessions", merr)
309+
store.Close()
310+
return session.NewSessionManager(dir)
311+
} else if n > 0 {
312+
log.Printf("memory: migrated %d session(s) to jsonl", n)
313+
}
314+
315+
return session.NewJSONLBackend(store)
316+
}
317+
283318
func expandHome(path string) string {
284319
if path == "" {
285320
return path

pkg/agent/loop.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,11 @@ func (al *AgentLoop) Stop() {
430430
al.running.Store(false)
431431
}
432432

433+
// Close releases resources held by agent session stores. Call after Stop.
434+
func (al *AgentLoop) Close() {
435+
al.registry.Close()
436+
}
437+
433438
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
434439
registry := al.GetRegistry()
435440
for _, agentID := range registry.ListAgentIDs() {

pkg/agent/registry.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,18 @@ func (r *AgentRegistry) ForEachTool(name string, fn func(tools.Tool)) {
114114
}
115115
}
116116

117+
// Close releases resources held by all registered agents.
118+
func (r *AgentRegistry) Close() {
119+
r.mu.RLock()
120+
defer r.mu.RUnlock()
121+
for _, agent := range r.agents {
122+
if err := agent.Close(); err != nil {
123+
logger.WarnCF("agent", "Failed to close agent",
124+
map[string]any{"agent_id": agent.ID, "error": err.Error()})
125+
}
126+
}
127+
}
128+
117129
// GetDefaultAgent returns the default agent instance.
118130
func (r *AgentRegistry) GetDefaultAgent() *AgentInstance {
119131
r.mu.RLock()

pkg/session/jsonl_backend.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package session
2+
3+
import (
4+
"context"
5+
"log"
6+
7+
"github.com/sipeed/picoclaw/pkg/memory"
8+
"github.com/sipeed/picoclaw/pkg/providers"
9+
)
10+
11+
// JSONLBackend adapts a memory.Store into the SessionStore interface.
12+
// Write errors are logged rather than returned, matching the fire-and-forget
13+
// contract of SessionManager that the agent loop relies on.
14+
type JSONLBackend struct {
15+
store memory.Store
16+
}
17+
18+
// NewJSONLBackend wraps a memory.Store for use as a SessionStore.
19+
func NewJSONLBackend(store memory.Store) *JSONLBackend {
20+
return &JSONLBackend{store: store}
21+
}
22+
23+
func (b *JSONLBackend) AddMessage(sessionKey, role, content string) {
24+
if err := b.store.AddMessage(context.Background(), sessionKey, role, content); err != nil {
25+
log.Printf("session: add message: %v", err)
26+
}
27+
}
28+
29+
func (b *JSONLBackend) AddFullMessage(sessionKey string, msg providers.Message) {
30+
if err := b.store.AddFullMessage(context.Background(), sessionKey, msg); err != nil {
31+
log.Printf("session: add full message: %v", err)
32+
}
33+
}
34+
35+
func (b *JSONLBackend) GetHistory(key string) []providers.Message {
36+
msgs, err := b.store.GetHistory(context.Background(), key)
37+
if err != nil {
38+
log.Printf("session: get history: %v", err)
39+
return []providers.Message{}
40+
}
41+
return msgs
42+
}
43+
44+
func (b *JSONLBackend) GetSummary(key string) string {
45+
summary, err := b.store.GetSummary(context.Background(), key)
46+
if err != nil {
47+
log.Printf("session: get summary: %v", err)
48+
return ""
49+
}
50+
return summary
51+
}
52+
53+
func (b *JSONLBackend) SetSummary(key, summary string) {
54+
if err := b.store.SetSummary(context.Background(), key, summary); err != nil {
55+
log.Printf("session: set summary: %v", err)
56+
}
57+
}
58+
59+
func (b *JSONLBackend) SetHistory(key string, history []providers.Message) {
60+
if err := b.store.SetHistory(context.Background(), key, history); err != nil {
61+
log.Printf("session: set history: %v", err)
62+
}
63+
}
64+
65+
func (b *JSONLBackend) TruncateHistory(key string, keepLast int) {
66+
if err := b.store.TruncateHistory(context.Background(), key, keepLast); err != nil {
67+
log.Printf("session: truncate history: %v", err)
68+
}
69+
}
70+
71+
// Save persists session state. Since the JSONL store fsyncs every write
72+
// immediately, the data is already durable. Save runs compaction to reclaim
73+
// space from logically truncated messages (no-op when there are none).
74+
func (b *JSONLBackend) Save(key string) error {
75+
return b.store.Compact(context.Background(), key)
76+
}
77+
78+
// Close releases resources held by the underlying store.
79+
func (b *JSONLBackend) Close() error {
80+
return b.store.Close()
81+
}

pkg/session/jsonl_backend_test.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package session_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/sipeed/picoclaw/pkg/memory"
8+
"github.com/sipeed/picoclaw/pkg/providers"
9+
"github.com/sipeed/picoclaw/pkg/session"
10+
)
11+
12+
// Compile-time interface satisfaction checks.
13+
var (
14+
_ session.SessionStore = (*session.SessionManager)(nil)
15+
_ session.SessionStore = (*session.JSONLBackend)(nil)
16+
)
17+
18+
func newBackend(t *testing.T) *session.JSONLBackend {
19+
t.Helper()
20+
store, err := memory.NewJSONLStore(t.TempDir())
21+
if err != nil {
22+
t.Fatal(err)
23+
}
24+
t.Cleanup(func() { store.Close() })
25+
return session.NewJSONLBackend(store)
26+
}
27+
28+
func TestJSONLBackend_AddAndGetHistory(t *testing.T) {
29+
b := newBackend(t)
30+
31+
b.AddMessage("s1", "user", "hello")
32+
b.AddMessage("s1", "assistant", "hi")
33+
34+
history := b.GetHistory("s1")
35+
if len(history) != 2 {
36+
t.Fatalf("got %d messages, want 2", len(history))
37+
}
38+
if history[0].Role != "user" || history[0].Content != "hello" {
39+
t.Errorf("msg[0] = %+v", history[0])
40+
}
41+
if history[1].Role != "assistant" || history[1].Content != "hi" {
42+
t.Errorf("msg[1] = %+v", history[1])
43+
}
44+
}
45+
46+
func TestJSONLBackend_AddFullMessage(t *testing.T) {
47+
b := newBackend(t)
48+
49+
msg := providers.Message{
50+
Role: "assistant",
51+
Content: "done",
52+
ToolCalls: []providers.ToolCall{
53+
{ID: "tc1", Function: &providers.FunctionCall{Name: "read_file", Arguments: `{"path":"x"}`}},
54+
},
55+
}
56+
b.AddFullMessage("s1", msg)
57+
58+
history := b.GetHistory("s1")
59+
if len(history) != 1 {
60+
t.Fatalf("got %d, want 1", len(history))
61+
}
62+
if len(history[0].ToolCalls) != 1 || history[0].ToolCalls[0].ID != "tc1" {
63+
t.Errorf("tool calls = %+v", history[0].ToolCalls)
64+
}
65+
}
66+
67+
func TestJSONLBackend_Summary(t *testing.T) {
68+
b := newBackend(t)
69+
70+
if got := b.GetSummary("s1"); got != "" {
71+
t.Errorf("got %q, want empty", got)
72+
}
73+
74+
b.SetSummary("s1", "test summary")
75+
if got := b.GetSummary("s1"); got != "test summary" {
76+
t.Errorf("got %q, want %q", got, "test summary")
77+
}
78+
}
79+
80+
func TestJSONLBackend_TruncateAndSave(t *testing.T) {
81+
b := newBackend(t)
82+
83+
for i := 0; i < 10; i++ {
84+
b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i))
85+
}
86+
b.TruncateHistory("s1", 3)
87+
88+
history := b.GetHistory("s1")
89+
if len(history) != 3 {
90+
t.Fatalf("got %d, want 3", len(history))
91+
}
92+
if history[0].Content != "msg 7" {
93+
t.Errorf("got %q, want %q", history[0].Content, "msg 7")
94+
}
95+
96+
// Save triggers compaction.
97+
if err := b.Save("s1"); err != nil {
98+
t.Fatal(err)
99+
}
100+
101+
// Messages still accessible after compaction.
102+
history = b.GetHistory("s1")
103+
if len(history) != 3 {
104+
t.Fatalf("after save: got %d, want 3", len(history))
105+
}
106+
}
107+
108+
func TestJSONLBackend_SetHistory(t *testing.T) {
109+
b := newBackend(t)
110+
b.AddMessage("s1", "user", "old")
111+
112+
b.SetHistory("s1", []providers.Message{
113+
{Role: "user", Content: "new1"},
114+
{Role: "assistant", Content: "new2"},
115+
})
116+
117+
history := b.GetHistory("s1")
118+
if len(history) != 2 {
119+
t.Fatalf("got %d, want 2", len(history))
120+
}
121+
if history[0].Content != "new1" {
122+
t.Errorf("got %q, want %q", history[0].Content, "new1")
123+
}
124+
}
125+
126+
func TestJSONLBackend_EmptySession(t *testing.T) {
127+
b := newBackend(t)
128+
129+
history := b.GetHistory("nonexistent")
130+
if history == nil {
131+
t.Fatal("got nil, want empty slice")
132+
}
133+
if len(history) != 0 {
134+
t.Errorf("got %d, want 0", len(history))
135+
}
136+
}
137+
138+
func TestJSONLBackend_SessionIsolation(t *testing.T) {
139+
b := newBackend(t)
140+
b.AddMessage("s1", "user", "session1")
141+
b.AddMessage("s2", "user", "session2")
142+
143+
h1 := b.GetHistory("s1")
144+
h2 := b.GetHistory("s2")
145+
146+
if len(h1) != 1 || h1[0].Content != "session1" {
147+
t.Errorf("s1: %+v", h1)
148+
}
149+
if len(h2) != 1 || h2[0].Content != "session2" {
150+
t.Errorf("s2: %+v", h2)
151+
}
152+
}
153+
154+
func TestJSONLBackend_SummarizeFlow(t *testing.T) {
155+
// Simulates the real summarization flow in the agent loop:
156+
// SetSummary → TruncateHistory → Save
157+
b := newBackend(t)
158+
159+
for i := 0; i < 20; i++ {
160+
b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i))
161+
}
162+
163+
b.SetSummary("s1", "conversation about testing")
164+
b.TruncateHistory("s1", 4)
165+
if err := b.Save("s1"); err != nil {
166+
t.Fatal(err)
167+
}
168+
169+
if got := b.GetSummary("s1"); got != "conversation about testing" {
170+
t.Errorf("summary = %q", got)
171+
}
172+
history := b.GetHistory("s1")
173+
if len(history) != 4 {
174+
t.Fatalf("got %d messages, want 4", len(history))
175+
}
176+
if history[0].Content != "msg 16" {
177+
t.Errorf("first message = %q, want %q", history[0].Content, "msg 16")
178+
}
179+
}

0 commit comments

Comments
 (0)