diff --git a/pkg/agent/context.go b/pkg/agent/context.go index ba07e33d31..a727cc8331 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -1,11 +1,14 @@ package agent import ( + "errors" "fmt" + "io/fs" "os" "path/filepath" "runtime" "strings" + "sync" "time" "github.com/sipeed/picoclaw/pkg/logger" @@ -19,6 +22,19 @@ type ContextBuilder struct { skillsLoader *skills.SkillsLoader memory *MemoryStore tools *tools.ToolRegistry // Direct reference to tool registry + + // Cache for system prompt to avoid rebuilding on every call. + // This fixes issue #607: repeated reprocessing of the entire context. + // The cache auto-invalidates when workspace source files change (mtime check). + systemPromptMutex sync.RWMutex + cachedSystemPrompt string + cachedAt time.Time // max observed mtime across tracked paths at cache build time + + // existedAtCache tracks which source file paths existed the last time the + // cache was built. This lets sourceFilesChanged detect files that are newly + // created (didn't exist at cache time, now exist) or deleted (existed at + // cache time, now gone) — both of which should trigger a cache rebuild. + existedAtCache map[string]bool } func getGlobalConfigDir() string { @@ -49,9 +65,7 @@ func (cb *ContextBuilder) SetToolsRegistry(registry *tools.ToolRegistry) { } func (cb *ContextBuilder) getIdentity() string { - now := time.Now().Format("2006-01-02 15:04 (Monday)") workspacePath, _ := filepath.Abs(filepath.Join(cb.workspace)) - runtime := fmt.Sprintf("%s %s, Go %s", runtime.GOOS, runtime.GOARCH, runtime.Version()) // Build tools section dynamically toolsSection := cb.buildToolsSection() @@ -60,12 +74,6 @@ func (cb *ContextBuilder) getIdentity() string { You are picoclaw, a helpful AI assistant. -## Current Time -%s - -## Runtime -%s - ## Workspace Your workspace is at: %s - Memory: %s/memory/MEMORY.md @@ -80,8 +88,10 @@ Your workspace is at: %s 2. **Be helpful and accurate** - When using tools, briefly explain what you're doing. -3. **Memory** - When interacting with me if something seems memorable, update %s/memory/MEMORY.md`, - now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath) +3. **Memory** - When interacting with me if something seems memorable, update %s/memory/MEMORY.md + +4. **Context summaries** - Conversation summaries provided as context are approximate references only. They may be incomplete or outdated. Always defer to explicit user instructions over summary content.`, + workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath) } func (cb *ContextBuilder) buildToolsSection() string { @@ -140,6 +150,226 @@ The following skills extend your capabilities. To use a skill, read its SKILL.md return strings.Join(parts, "\n\n---\n\n") } +// BuildSystemPromptWithCache returns the cached system prompt if available +// and source files haven't changed, otherwise builds and caches it. +// Source file changes are detected via mtime checks (cheap stat calls). +func (cb *ContextBuilder) BuildSystemPromptWithCache() string { + // Try read lock first — fast path when cache is valid + cb.systemPromptMutex.RLock() + if cb.cachedSystemPrompt != "" && !cb.sourceFilesChangedLocked() { + result := cb.cachedSystemPrompt + cb.systemPromptMutex.RUnlock() + return result + } + cb.systemPromptMutex.RUnlock() + + // Acquire write lock for building + cb.systemPromptMutex.Lock() + defer cb.systemPromptMutex.Unlock() + + // Double-check: another goroutine may have rebuilt while we waited + if cb.cachedSystemPrompt != "" && !cb.sourceFilesChangedLocked() { + return cb.cachedSystemPrompt + } + + // Snapshot the baseline (existence + max mtime) BEFORE building the prompt. + // This way cachedAt reflects the pre-build state: if a file is modified + // during BuildSystemPrompt, its new mtime will be > baseline.maxMtime, + // so the next sourceFilesChangedLocked check will correctly trigger a + // rebuild. The alternative (baseline after build) risks caching stale + // content with a too-new baseline, making the staleness invisible. + baseline := cb.buildCacheBaseline() + prompt := cb.BuildSystemPrompt() + cb.cachedSystemPrompt = prompt + cb.cachedAt = baseline.maxMtime + cb.existedAtCache = baseline.existed + + logger.DebugCF("agent", "System prompt cached", + map[string]any{ + "length": len(prompt), + }) + + return prompt +} + +// InvalidateCache clears the cached system prompt. +// Normally not needed because the cache auto-invalidates via mtime checks, +// but this is useful for tests or explicit reload commands. +func (cb *ContextBuilder) InvalidateCache() { + cb.systemPromptMutex.Lock() + defer cb.systemPromptMutex.Unlock() + + cb.cachedSystemPrompt = "" + cb.cachedAt = time.Time{} + cb.existedAtCache = nil + + logger.DebugCF("agent", "System prompt cache invalidated", nil) +} + +// sourcePaths returns the workspace source file paths tracked for cache +// invalidation (bootstrap files + memory). The skills directory is handled +// separately in sourceFilesChangedLocked because it requires both directory- +// level and recursive file-level mtime checks. +func (cb *ContextBuilder) sourcePaths() []string { + return []string{ + filepath.Join(cb.workspace, "AGENTS.md"), + filepath.Join(cb.workspace, "SOUL.md"), + filepath.Join(cb.workspace, "USER.md"), + filepath.Join(cb.workspace, "IDENTITY.md"), + filepath.Join(cb.workspace, "memory", "MEMORY.md"), + } +} + +// cacheBaseline holds the file existence snapshot and the latest observed +// mtime across all tracked paths. Used as the cache reference point. +type cacheBaseline struct { + existed map[string]bool + maxMtime time.Time +} + +// buildCacheBaseline records which tracked paths currently exist and computes +// the latest mtime across all tracked files + skills directory contents. +// Called under write lock when the cache is built. +func (cb *ContextBuilder) buildCacheBaseline() cacheBaseline { + skillsDir := filepath.Join(cb.workspace, "skills") + + // All paths whose existence we track: source files + skills dir. + allPaths := append(cb.sourcePaths(), skillsDir) + + existed := make(map[string]bool, len(allPaths)) + var maxMtime time.Time + + for _, p := range allPaths { + info, err := os.Stat(p) + existed[p] = err == nil + if err == nil && info.ModTime().After(maxMtime) { + maxMtime = info.ModTime() + } + } + + // Walk skills files to capture their mtimes too. + // Use os.Stat (not d.Info) to match the stat method used in + // fileChangedSince / skillFilesModifiedSince for consistency. + _ = filepath.WalkDir(skillsDir, func(path string, d fs.DirEntry, walkErr error) error { + if walkErr == nil && !d.IsDir() { + if info, err := os.Stat(path); err == nil && info.ModTime().After(maxMtime) { + maxMtime = info.ModTime() + } + } + return nil + }) + + // If no tracked files exist yet (empty workspace), maxMtime is zero. + // Use a very old non-zero time so that: + // 1. cachedAt.IsZero() won't trigger perpetual rebuilds. + // 2. Any real file created afterwards has mtime > cachedAt, so it + // will be detected by fileChangedSince (unlike time.Now() which + // could race with a file whose mtime <= Now). + if maxMtime.IsZero() { + maxMtime = time.Unix(1, 0) + } + + return cacheBaseline{existed: existed, maxMtime: maxMtime} +} + +// sourceFilesChangedLocked checks whether any workspace source file has been +// modified, created, or deleted since the cache was last built. +// +// IMPORTANT: The caller MUST hold at least a read lock on systemPromptMutex. +// Go's sync.RWMutex is not reentrant, so this function must NOT acquire the +// lock itself (it would deadlock when called from BuildSystemPromptWithCache +// which already holds RLock or Lock). +func (cb *ContextBuilder) sourceFilesChangedLocked() bool { + if cb.cachedAt.IsZero() { + return true + } + + // Check tracked source files (bootstrap + memory). + for _, p := range cb.sourcePaths() { + if cb.fileChangedSince(p) { + return true + } + } + + // --- Skills directory (handled separately from sourcePaths) --- + // + // 1. Creation/deletion: tracked via existedAtCache, same as bootstrap files. + skillsDir := filepath.Join(cb.workspace, "skills") + if cb.fileChangedSince(skillsDir) { + return true + } + + // 2. Structural changes (add/remove entries inside the dir) are reflected + // in the directory's own mtime, which fileChangedSince already checks. + // + // 3. Content-only edits to files inside skills/ do NOT update the parent + // directory mtime on most filesystems, so we recursively walk to check + // individual file mtimes at any nesting depth. + if skillFilesModifiedSince(skillsDir, cb.cachedAt) { + return true + } + + return false +} + +// fileChangedSince returns true if a tracked source file has been modified, +// newly created, or deleted since the cache was built. +// +// Four cases: +// - existed at cache time, exists now -> check mtime +// - existed at cache time, gone now -> changed (deleted) +// - absent at cache time, exists now -> changed (created) +// - absent at cache time, gone now -> no change +func (cb *ContextBuilder) fileChangedSince(path string) bool { + // Defensive: if existedAtCache was never initialised, treat as changed + // so the cache rebuilds rather than silently serving stale data. + if cb.existedAtCache == nil { + return true + } + + existedBefore := cb.existedAtCache[path] + info, err := os.Stat(path) + existsNow := err == nil + + if existedBefore != existsNow { + return true // file was created or deleted + } + if !existsNow { + return false // didn't exist before, doesn't exist now + } + return info.ModTime().After(cb.cachedAt) +} + +// errWalkStop is a sentinel error used to stop filepath.WalkDir early. +// Using a dedicated error (instead of fs.SkipAll) makes the early-exit +// intent explicit and avoids the nilerr linter warning that would fire +// if the callback returned nil when its err parameter is non-nil. +var errWalkStop = errors.New("walk stop") + +// skillFilesModifiedSince recursively walks the skills directory and checks +// whether any file was modified after t. This catches content-only edits at +// any nesting depth (e.g. skills/name/docs/extra.md) that don't update +// parent directory mtimes. +func skillFilesModifiedSince(skillsDir string, t time.Time) bool { + changed := false + err := filepath.WalkDir(skillsDir, func(path string, d fs.DirEntry, walkErr error) error { + if walkErr == nil && !d.IsDir() { + if info, statErr := os.Stat(path); statErr == nil && info.ModTime().After(t) { + changed = true + return errWalkStop // stop walking + } + } + return nil + }) + // errWalkStop is expected (early exit on first changed file). + // os.IsNotExist means the skills dir doesn't exist yet — not an error. + // Any other error is unexpected and worth logging. + if err != nil && !errors.Is(err, errWalkStop) && !os.IsNotExist(err) { + logger.DebugCF("agent", "skills walk error", map[string]any{"error": err.Error()}) + } + return changed +} + func (cb *ContextBuilder) LoadBootstrapFiles() string { bootstrapFiles := []string{ "AGENTS.md", @@ -159,6 +389,28 @@ func (cb *ContextBuilder) LoadBootstrapFiles() string { return sb.String() } +// buildDynamicContext returns a short dynamic context string with per-request info. +// This changes every request (time, session) so it is NOT part of the cached prompt. +// LLM-side KV cache reuse is achieved by each provider adapter's native mechanism: +// - Anthropic: per-block cache_control (ephemeral) on the static SystemParts block +// - OpenAI / Codex: prompt_cache_key for prefix-based caching +// +// See: https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching +// See: https://platform.openai.com/docs/guides/prompt-caching +func (cb *ContextBuilder) buildDynamicContext(channel, chatID string) string { + now := time.Now().Format("2006-01-02 15:04 (Monday)") + rt := fmt.Sprintf("%s %s, Go %s", runtime.GOOS, runtime.GOARCH, runtime.Version()) + + var sb strings.Builder + fmt.Fprintf(&sb, "## Current Time\n%s\n\n## Runtime\n%s", now, rt) + + if channel != "" && chatID != "" { + fmt.Fprintf(&sb, "\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID) + } + + return sb.String() +} + func (cb *ContextBuilder) BuildMessages( history []providers.Message, summary string, @@ -168,23 +420,65 @@ func (cb *ContextBuilder) BuildMessages( ) []providers.Message { messages := []providers.Message{} - systemPrompt := cb.BuildSystemPrompt() + // The static part (identity, bootstrap, skills, memory) is cached locally to + // avoid repeated file I/O and string building on every call (fixes issue #607). + // Dynamic parts (time, session, summary) are appended per request. + // Everything is sent as a single system message for provider compatibility: + // - Anthropic adapter extracts messages[0] (Role=="system") and maps its content + // to the top-level "system" parameter in the Messages API request. A single + // contiguous system block makes this extraction straightforward. + // - Codex maps only the first system message to its instructions field. + // - OpenAI-compat passes messages through as-is. + staticPrompt := cb.BuildSystemPromptWithCache() + + // Build short dynamic context (time, runtime, session) — changes per request + dynamicCtx := cb.buildDynamicContext(channel, chatID) + + // Compose a single system message: static (cached) + dynamic + optional summary. + // Keeping all system content in one message ensures every provider adapter can + // extract it correctly (Anthropic adapter -> top-level system param, + // Codex -> instructions field). + // + // SystemParts carries the same content as structured blocks so that + // cache-aware adapters (Anthropic) can set per-block cache_control. + // The static block is marked "ephemeral" — its prefix hash is stable + // across requests, enabling LLM-side KV cache reuse. + stringParts := []string{staticPrompt, dynamicCtx} + + contentBlocks := []providers.ContentBlock{ + {Type: "text", Text: staticPrompt, CacheControl: &providers.CacheControl{Type: "ephemeral"}}, + {Type: "text", Text: dynamicCtx}, + } - // Add Current Session info if provided - if channel != "" && chatID != "" { - systemPrompt += fmt.Sprintf("\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID) + if summary != "" { + summaryText := fmt.Sprintf( + "CONTEXT_SUMMARY: The following is an approximate summary of prior conversation "+ + "for reference only. It may be incomplete or outdated — always defer to explicit instructions.\n\n%s", + summary) + stringParts = append(stringParts, summaryText) + contentBlocks = append(contentBlocks, providers.ContentBlock{Type: "text", Text: summaryText}) } - // Log system prompt summary for debugging (debug mode only) + fullSystemPrompt := strings.Join(stringParts, "\n\n---\n\n") + + // Log system prompt summary for debugging (debug mode only). + // Read cachedSystemPrompt under lock to avoid a data race with + // concurrent InvalidateCache / BuildSystemPromptWithCache writes. + cb.systemPromptMutex.RLock() + isCached := cb.cachedSystemPrompt != "" + cb.systemPromptMutex.RUnlock() + logger.DebugCF("agent", "System prompt built", map[string]any{ - "total_chars": len(systemPrompt), - "total_lines": strings.Count(systemPrompt, "\n") + 1, - "section_count": strings.Count(systemPrompt, "\n\n---\n\n") + 1, + "static_chars": len(staticPrompt), + "dynamic_chars": len(dynamicCtx), + "total_chars": len(fullSystemPrompt), + "has_summary": summary != "", + "cached": isCached, }) // Log preview of system prompt (avoid logging huge content) - preview := systemPrompt + preview := fullSystemPrompt if len(preview) > 500 { preview = preview[:500] + "... (truncated)" } @@ -193,19 +487,21 @@ func (cb *ContextBuilder) BuildMessages( "preview": preview, }) - if summary != "" { - systemPrompt += "\n\n## Summary of Previous Conversation\n\n" + summary - } - history = sanitizeHistoryForProvider(history) + // Single system message containing all context — compatible with all providers. + // SystemParts enables cache-aware adapters to set per-block cache_control; + // Content is the concatenated fallback for adapters that don't read SystemParts. messages = append(messages, providers.Message{ - Role: "system", - Content: systemPrompt, + Role: "system", + Content: fullSystemPrompt, + SystemParts: contentBlocks, }) + // Add conversation history messages = append(messages, history...) + // Add current user message if strings.TrimSpace(currentMessage) != "" { messages = append(messages, providers.Message{ Role: "user", @@ -224,6 +520,14 @@ func sanitizeHistoryForProvider(history []providers.Message) []providers.Message sanitized := make([]providers.Message, 0, len(history)) for _, msg := range history { switch msg.Role { + case "system": + // Drop system messages from history. BuildMessages always + // constructs its own single system message (static + dynamic + + // summary); extra system messages would break providers that + // only accept one (Anthropic, Codex). + logger.DebugCF("agent", "Dropping system message from history", map[string]any{}) + continue + case "tool": if len(sanitized) == 0 { logger.DebugCF("agent", "Dropping orphaned leading tool message", map[string]any{}) diff --git a/pkg/agent/context_cache_test.go b/pkg/agent/context_cache_test.go new file mode 100644 index 0000000000..ba70d4c0d5 --- /dev/null +++ b/pkg/agent/context_cache_test.go @@ -0,0 +1,513 @@ +package agent + +import ( + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +// setupWorkspace creates a temporary workspace with standard directories and optional files. +// Returns the tmpDir path; caller should defer os.RemoveAll(tmpDir). +func setupWorkspace(t *testing.T, files map[string]string) string { + t.Helper() + tmpDir, err := os.MkdirTemp("", "picoclaw-test-*") + if err != nil { + t.Fatal(err) + } + os.MkdirAll(filepath.Join(tmpDir, "memory"), 0o755) + os.MkdirAll(filepath.Join(tmpDir, "skills"), 0o755) + for name, content := range files { + dir := filepath.Dir(filepath.Join(tmpDir, name)) + os.MkdirAll(dir, 0o755) + if err := os.WriteFile(filepath.Join(tmpDir, name), []byte(content), 0o644); err != nil { + t.Fatal(err) + } + } + return tmpDir +} + +// TestSingleSystemMessage verifies that BuildMessages always produces exactly one +// system message regardless of summary/history variations. +// Fix: multiple system messages break Anthropic (top-level system param) and +// Codex (only reads last system message as instructions). +func TestSingleSystemMessage(t *testing.T) { + tmpDir := setupWorkspace(t, map[string]string{ + "IDENTITY.md": "# Identity\nTest agent.", + }) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + + tests := []struct { + name string + history []providers.Message + summary string + message string + }{ + { + name: "no summary, no history", + summary: "", + message: "hello", + }, + { + name: "with summary", + summary: "Previous conversation discussed X", + message: "hello", + }, + { + name: "with history and summary", + history: []providers.Message{ + {Role: "user", Content: "hi"}, + {Role: "assistant", Content: "hello"}, + }, + summary: strings.Repeat("Long summary text. ", 50), + message: "new message", + }, + { + name: "system message in history is filtered", + history: []providers.Message{ + {Role: "system", Content: "stale system prompt from previous session"}, + {Role: "user", Content: "hi"}, + {Role: "assistant", Content: "hello"}, + }, + summary: "", + message: "new message", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msgs := cb.BuildMessages(tt.history, tt.summary, tt.message, nil, "test", "chat1") + + systemCount := 0 + for _, m := range msgs { + if m.Role == "system" { + systemCount++ + } + } + if systemCount != 1 { + t.Errorf("expected exactly 1 system message, got %d", systemCount) + } + if msgs[0].Role != "system" { + t.Errorf("first message should be system, got %s", msgs[0].Role) + } + if msgs[len(msgs)-1].Role != "user" { + t.Errorf("last message should be user, got %s", msgs[len(msgs)-1].Role) + } + + // System message must contain identity (static) and time (dynamic) + sys := msgs[0].Content + if !strings.Contains(sys, "picoclaw") { + t.Error("system message missing identity") + } + if !strings.Contains(sys, "Current Time") { + t.Error("system message missing dynamic time context") + } + + // Summary handling + if tt.summary != "" { + if !strings.Contains(sys, "CONTEXT_SUMMARY:") { + t.Error("summary present but CONTEXT_SUMMARY prefix missing") + } + if !strings.Contains(sys, tt.summary[:20]) { + t.Error("summary content not found in system message") + } + } else { + if strings.Contains(sys, "CONTEXT_SUMMARY:") { + t.Error("CONTEXT_SUMMARY should not appear without summary") + } + } + }) + } +} + +// TestMtimeAutoInvalidation verifies that the cache detects source file changes +// via mtime without requiring explicit InvalidateCache(). +// Fix: original implementation had no auto-invalidation — edits to bootstrap files, +// memory, or skills were invisible until process restart. +func TestMtimeAutoInvalidation(t *testing.T) { + tests := []struct { + name string + file string // relative path inside workspace + contentV1 string + contentV2 string + checkField string // substring to verify in rebuilt prompt + }{ + { + name: "bootstrap file change", + file: "IDENTITY.md", + contentV1: "# Original Identity", + contentV2: "# Updated Identity", + checkField: "Updated Identity", + }, + { + name: "memory file change", + file: "memory/MEMORY.md", + contentV1: "# Memory\nUser likes Go.", + contentV2: "# Memory\nUser likes Rust.", + checkField: "User likes Rust", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tmpDir := setupWorkspace(t, map[string]string{tt.file: tt.contentV1}) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + + sp1 := cb.BuildSystemPromptWithCache() + + // Overwrite file and set future mtime to ensure detection. + // Use 2s offset for filesystem mtime resolution safety (some FS + // have 1s or coarser granularity, especially in CI containers). + fullPath := filepath.Join(tmpDir, tt.file) + os.WriteFile(fullPath, []byte(tt.contentV2), 0o644) + future := time.Now().Add(2 * time.Second) + os.Chtimes(fullPath, future, future) + + // Verify sourceFilesChangedLocked detects the mtime change + cb.systemPromptMutex.RLock() + changed := cb.sourceFilesChangedLocked() + cb.systemPromptMutex.RUnlock() + if !changed { + t.Fatalf("sourceFilesChangedLocked() should detect %s change", tt.file) + } + + // Should auto-rebuild without explicit InvalidateCache() + sp2 := cb.BuildSystemPromptWithCache() + if sp1 == sp2 { + t.Errorf("cache not rebuilt after %s change", tt.file) + } + if !strings.Contains(sp2, tt.checkField) { + t.Errorf("rebuilt prompt missing expected content %q", tt.checkField) + } + }) + } + + // Skills directory mtime change + t.Run("skills dir change", func(t *testing.T) { + tmpDir := setupWorkspace(t, nil) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + _ = cb.BuildSystemPromptWithCache() // populate cache + + // Touch skills directory (simulate new skill installed) + skillsDir := filepath.Join(tmpDir, "skills") + future := time.Now().Add(2 * time.Second) + os.Chtimes(skillsDir, future, future) + + // Verify sourceFilesChangedLocked detects it (cache is rebuilt) + // We confirm by checking internal state: a second call should rebuild. + cb.systemPromptMutex.RLock() + changed := cb.sourceFilesChangedLocked() + cb.systemPromptMutex.RUnlock() + if !changed { + t.Error("sourceFilesChangedLocked() should detect skills dir mtime change") + } + }) +} + +// TestExplicitInvalidateCache verifies that InvalidateCache() forces a rebuild +// even when source files haven't changed (useful for tests and reload commands). +func TestExplicitInvalidateCache(t *testing.T) { + tmpDir := setupWorkspace(t, map[string]string{ + "IDENTITY.md": "# Test Identity", + }) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + + sp1 := cb.BuildSystemPromptWithCache() + cb.InvalidateCache() + sp2 := cb.BuildSystemPromptWithCache() + + if sp1 != sp2 { + t.Error("prompt should be identical after invalidate+rebuild when files unchanged") + } + + // Verify cachedAt was reset + cb.InvalidateCache() + cb.systemPromptMutex.RLock() + if !cb.cachedAt.IsZero() { + t.Error("cachedAt should be zero after InvalidateCache()") + } + cb.systemPromptMutex.RUnlock() +} + +// TestCacheStability verifies that the static prompt is stable across repeated calls +// when no files change (regression test for issue #607). +func TestCacheStability(t *testing.T) { + tmpDir := setupWorkspace(t, map[string]string{ + "IDENTITY.md": "# Identity\nContent", + "SOUL.md": "# Soul\nContent", + }) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + + results := make([]string, 5) + for i := range results { + results[i] = cb.BuildSystemPromptWithCache() + } + for i := 1; i < len(results); i++ { + if results[i] != results[0] { + t.Errorf("cached prompt changed between call 0 and %d", i) + } + } + + // Static prompt must NOT contain per-request data + if strings.Contains(results[0], "Current Time") { + t.Error("static cached prompt should not contain time (added dynamically)") + } +} + +// TestNewFileCreationInvalidatesCache verifies that creating a source file that +// did not exist when the cache was built triggers a cache rebuild. +// This catches the "from nothing to something" edge case that the old +// modifiedSince (return false on stat error) would miss. +func TestNewFileCreationInvalidatesCache(t *testing.T) { + tests := []struct { + name string + file string // relative path inside workspace + content string + checkField string // substring to verify in rebuilt prompt + }{ + { + name: "new bootstrap file", + file: "SOUL.md", + content: "# Soul\nBe kind and helpful.", + checkField: "Be kind and helpful", + }, + { + name: "new memory file", + file: "memory/MEMORY.md", + content: "# Memory\nUser prefers dark mode.", + checkField: "User prefers dark mode", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Start with an empty workspace (no bootstrap/memory files) + tmpDir := setupWorkspace(t, nil) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + + // Populate cache — file does not exist yet + sp1 := cb.BuildSystemPromptWithCache() + if strings.Contains(sp1, tt.checkField) { + t.Fatalf("prompt should not contain %q before file is created", tt.checkField) + } + + // Create the file after cache was built + fullPath := filepath.Join(tmpDir, tt.file) + os.MkdirAll(filepath.Dir(fullPath), 0o755) + if err := os.WriteFile(fullPath, []byte(tt.content), 0o644); err != nil { + t.Fatal(err) + } + // Set future mtime to guarantee detection + future := time.Now().Add(2 * time.Second) + os.Chtimes(fullPath, future, future) + + // Cache should auto-invalidate because file went from absent -> present + sp2 := cb.BuildSystemPromptWithCache() + if !strings.Contains(sp2, tt.checkField) { + t.Errorf("cache not invalidated on new file creation: expected %q in prompt", tt.checkField) + } + }) + } +} + +// TestSkillFileContentChange verifies that modifying a skill file's content +// (not just the directory structure) invalidates the cache. +// This is the scenario where directory mtime alone is insufficient — on most +// filesystems, editing a file inside a directory does NOT update the parent +// directory's mtime. +func TestSkillFileContentChange(t *testing.T) { + skillMD := `--- +name: test-skill +description: "A test skill" +--- +# Test Skill v1 +Original content.` + + tmpDir := setupWorkspace(t, map[string]string{ + "skills/test-skill/SKILL.md": skillMD, + }) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + + // Populate cache + sp1 := cb.BuildSystemPromptWithCache() + _ = sp1 // cache is warm + + // Modify the skill file content (without touching the skills/ directory) + updatedSkillMD := `--- +name: test-skill +description: "An updated test skill" +--- +# Test Skill v2 +Updated content.` + + skillPath := filepath.Join(tmpDir, "skills", "test-skill", "SKILL.md") + if err := os.WriteFile(skillPath, []byte(updatedSkillMD), 0o644); err != nil { + t.Fatal(err) + } + // Set future mtime on the skill file only (NOT the directory) + future := time.Now().Add(2 * time.Second) + os.Chtimes(skillPath, future, future) + + // Verify that sourceFilesChangedLocked detects the content change + cb.systemPromptMutex.RLock() + changed := cb.sourceFilesChangedLocked() + cb.systemPromptMutex.RUnlock() + if !changed { + t.Error("sourceFilesChangedLocked() should detect skill file content change") + } + + // Verify cache is actually rebuilt with new content + sp2 := cb.BuildSystemPromptWithCache() + if sp1 == sp2 && strings.Contains(sp1, "test-skill") { + // If the skill appeared in the prompt and the prompt didn't change, + // the cache was not invalidated. + t.Error("cache should be invalidated when skill file content changes") + } +} + +// TestConcurrentBuildSystemPromptWithCache verifies that multiple goroutines +// can safely call BuildSystemPromptWithCache concurrently without producing +// empty results, panics, or data races. +// Run with: go test -race ./pkg/agent/ -run TestConcurrentBuildSystemPromptWithCache +func TestConcurrentBuildSystemPromptWithCache(t *testing.T) { + tmpDir := setupWorkspace(t, map[string]string{ + "IDENTITY.md": "# Identity\nConcurrency test agent.", + "SOUL.md": "# Soul\nBe helpful.", + "memory/MEMORY.md": "# Memory\nUser prefers Go.", + "skills/demo/SKILL.md": "---\nname: demo\ndescription: \"demo skill\"\n---\n# Demo", + }) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + + const goroutines = 20 + const iterations = 50 + + var wg sync.WaitGroup + errs := make(chan string, goroutines*iterations) + + for g := 0; g < goroutines; g++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for i := 0; i < iterations; i++ { + result := cb.BuildSystemPromptWithCache() + if result == "" { + errs <- "empty prompt returned" + return + } + if !strings.Contains(result, "picoclaw") { + errs <- "prompt missing identity" + return + } + + // Also exercise BuildMessages concurrently + msgs := cb.BuildMessages(nil, "", "hello", nil, "test", "chat") + if len(msgs) < 2 { + errs <- "BuildMessages returned fewer than 2 messages" + return + } + if msgs[0].Role != "system" { + errs <- "first message not system" + return + } + + // Occasionally invalidate to exercise the write path + if i%10 == 0 { + cb.InvalidateCache() + } + } + }(g) + } + + wg.Wait() + close(errs) + + for errMsg := range errs { + t.Errorf("concurrent access error: %s", errMsg) + } +} + +// BenchmarkBuildMessagesWithCache measures caching performance. + +// TestEmptyWorkspaceBaselineDetectsNewFiles verifies that when the cache is +// built on an empty workspace (no tracked files exist), creating a file +// afterwards still triggers cache invalidation. This validates the +// time.Unix(1, 0) fallback for maxMtime: any real file's mtime is after epoch, +// so fileChangedSince correctly detects the absent -> present transition AND +// the mtime comparison succeeds even without artificially inflated Chtimes. +func TestEmptyWorkspaceBaselineDetectsNewFiles(t *testing.T) { + // Empty workspace: no bootstrap files, no memory, no skills content. + tmpDir := setupWorkspace(t, nil) + defer os.RemoveAll(tmpDir) + + cb := NewContextBuilder(tmpDir) + + // Build cache — all tracked files are absent, maxMtime falls back to epoch. + sp1 := cb.BuildSystemPromptWithCache() + + // Create a bootstrap file with natural mtime (no Chtimes manipulation). + // The file's mtime should be the current wall-clock time, which is + // strictly after time.Unix(1, 0). + soulPath := filepath.Join(tmpDir, "SOUL.md") + if err := os.WriteFile(soulPath, []byte("# Soul\nNewly created."), 0o644); err != nil { + t.Fatal(err) + } + + // Cache should detect the new file via existedAtCache (absent -> present). + cb.systemPromptMutex.RLock() + changed := cb.sourceFilesChangedLocked() + cb.systemPromptMutex.RUnlock() + if !changed { + t.Fatal("sourceFilesChangedLocked should detect newly created file on empty workspace") + } + + sp2 := cb.BuildSystemPromptWithCache() + if !strings.Contains(sp2, "Newly created") { + t.Error("rebuilt prompt should contain new file content") + } + if sp1 == sp2 { + t.Error("cache should have been invalidated after file creation") + } +} + +// BenchmarkBuildMessagesWithCache measures caching performance. +func BenchmarkBuildMessagesWithCache(b *testing.B) { + tmpDir, _ := os.MkdirTemp("", "picoclaw-bench-*") + defer os.RemoveAll(tmpDir) + + os.MkdirAll(filepath.Join(tmpDir, "memory"), 0o755) + os.MkdirAll(filepath.Join(tmpDir, "skills"), 0o755) + for _, name := range []string{"IDENTITY.md", "SOUL.md", "USER.md"} { + os.WriteFile(filepath.Join(tmpDir, name), []byte(strings.Repeat("Content.\n", 10)), 0o644) + } + + cb := NewContextBuilder(tmpDir) + history := []providers.Message{ + {Role: "user", Content: "previous message"}, + {Role: "assistant", Content: "previous response"}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = cb.BuildMessages(history, "summary", "new message", nil, "cli", "test") + } +} diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index dbc4a9b870..c40d46ef50 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -524,8 +524,9 @@ func (al *AgentLoop) runLLMIteration( fbResult, fbErr := al.fallback.Execute(ctx, agent.Candidates, func(ctx context.Context, provider, model string) (*providers.LLMResponse, error) { return agent.Provider.Chat(ctx, messages, providerToolDefs, model, map[string]any{ - "max_tokens": agent.MaxTokens, - "temperature": agent.Temperature, + "max_tokens": agent.MaxTokens, + "temperature": agent.Temperature, + "prompt_cache_key": agent.ID, }) }, ) @@ -540,8 +541,9 @@ func (al *AgentLoop) runLLMIteration( return fbResult.Response, nil } return agent.Provider.Chat(ctx, messages, providerToolDefs, agent.Model, map[string]any{ - "max_tokens": agent.MaxTokens, - "temperature": agent.Temperature, + "max_tokens": agent.MaxTokens, + "temperature": agent.Temperature, + "prompt_cache_key": agent.ID, }) } @@ -962,8 +964,9 @@ func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) { nil, agent.Model, map[string]any{ - "max_tokens": 1024, - "temperature": 0.3, + "max_tokens": 1024, + "temperature": 0.3, + "prompt_cache_key": agent.ID, }, ) if err == nil { @@ -1012,8 +1015,9 @@ func (al *AgentLoop) summarizeBatch( nil, agent.Model, map[string]any{ - "max_tokens": 1024, - "temperature": 0.3, + "max_tokens": 1024, + "temperature": 0.3, + "prompt_cache_key": agent.ID, }, ) if err != nil { diff --git a/pkg/providers/anthropic/provider.go b/pkg/providers/anthropic/provider.go index 35f6b8f623..9162174c99 100644 --- a/pkg/providers/anthropic/provider.go +++ b/pkg/providers/anthropic/provider.go @@ -113,7 +113,20 @@ func buildParams( for _, msg := range messages { switch msg.Role { case "system": - system = append(system, anthropic.TextBlockParam{Text: msg.Content}) + // Prefer structured SystemParts for per-block cache_control. + // This enables LLM-side KV cache reuse: the static block's prefix + // hash stays stable across requests while dynamic parts change freely. + if len(msg.SystemParts) > 0 { + for _, part := range msg.SystemParts { + block := anthropic.TextBlockParam{Text: part.Text} + if part.CacheControl != nil && part.CacheControl.Type == "ephemeral" { + block.CacheControl = anthropic.NewCacheControlEphemeralParam() + } + system = append(system, block) + } + } else { + system = append(system, anthropic.TextBlockParam{Text: msg.Content}) + } case "user": if msg.ToolCallID != "" { anthropicMessages = append(anthropicMessages, diff --git a/pkg/providers/codex_provider.go b/pkg/providers/codex_provider.go index ecc983642c..ae261710b6 100644 --- a/pkg/providers/codex_provider.go +++ b/pkg/providers/codex_provider.go @@ -208,6 +208,11 @@ func buildCodexParams( for _, msg := range messages { switch msg.Role { case "system": + // Use the full concatenated system prompt (static + dynamic + summary) + // as instructions. This keeps behavior consistent with Anthropic and + // OpenAI-compat adapters where the complete system context lives in + // one place. Prefix caching is handled by prompt_cache_key below, + // not by splitting content across instructions vs input messages. instructions = msg.Content case "user": if msg.ToolCallID != "" { @@ -289,6 +294,13 @@ func buildCodexParams( params.Instructions = openai.Opt(defaultCodexInstructions) } + // Prompt caching: pass a stable cache key so OpenAI can bucket requests + // and reuse prefix KV cache across calls with the same key. + // See: https://platform.openai.com/docs/guides/prompt-caching + if cacheKey, ok := options["prompt_cache_key"].(string); ok && cacheKey != "" { + params.PromptCacheKey = openai.Opt(cacheKey) + } + if len(tools) > 0 || enableWebSearch { params.Tools = translateToolsForCodex(tools, enableWebSearch) } diff --git a/pkg/providers/openai_compat/provider.go b/pkg/providers/openai_compat/provider.go index d2412ae1b3..a8d244d4a2 100644 --- a/pkg/providers/openai_compat/provider.go +++ b/pkg/providers/openai_compat/provider.go @@ -77,7 +77,7 @@ func (p *Provider) Chat( requestBody := map[string]any{ "model": model, - "messages": messages, + "messages": stripSystemParts(messages), } if len(tools) > 0 { @@ -111,6 +111,14 @@ func (p *Provider) Chat( } } + // Prompt caching: pass a stable cache key so OpenAI can bucket requests + // with the same key and reuse prefix KV cache across calls. + // The key is typically the agent ID — stable per agent, shared across requests. + // See: https://platform.openai.com/docs/guides/prompt-caching + if cacheKey, ok := options["prompt_cache_key"].(string); ok && cacheKey != "" { + requestBody["prompt_cache_key"] = cacheKey + } + jsonData, err := json.Marshal(requestBody) if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) @@ -230,6 +238,32 @@ func parseResponse(body []byte) (*LLMResponse, error) { }, nil } +// openaiMessage is the wire-format message for OpenAI-compatible APIs. +// It mirrors protocoltypes.Message but omits SystemParts, which is an +// internal field that would be unknown to third-party endpoints. +type openaiMessage struct { + Role string `json:"role"` + Content string `json:"content"` + ToolCalls []ToolCall `json:"tool_calls,omitempty"` + ToolCallID string `json:"tool_call_id,omitempty"` +} + +// stripSystemParts converts []Message to []openaiMessage, dropping the +// SystemParts field so it doesn't leak into the JSON payload sent to +// OpenAI-compatible APIs (some strict endpoints reject unknown fields). +func stripSystemParts(messages []Message) []openaiMessage { + out := make([]openaiMessage, len(messages)) + for i, m := range messages { + out[i] = openaiMessage{ + Role: m.Role, + Content: m.Content, + ToolCalls: m.ToolCalls, + ToolCallID: m.ToolCallID, + } + } + return out +} + func normalizeModel(model, apiBase string) string { idx := strings.Index(model, "/") if idx == -1 { diff --git a/pkg/providers/protocoltypes/types.go b/pkg/providers/protocoltypes/types.go index 1d0ea6edd7..33f052c5a2 100644 --- a/pkg/providers/protocoltypes/types.go +++ b/pkg/providers/protocoltypes/types.go @@ -38,12 +38,28 @@ type UsageInfo struct { TotalTokens int `json:"total_tokens"` } +// CacheControl marks a content block for LLM-side prefix caching. +// Currently only "ephemeral" is supported (used by Anthropic). +type CacheControl struct { + Type string `json:"type"` // "ephemeral" +} + +// ContentBlock represents a structured segment of a system message. +// Adapters that understand SystemParts can use these blocks to set +// per-block cache control (e.g. Anthropic's cache_control: ephemeral). +type ContentBlock struct { + Type string `json:"type"` // "text" + Text string `json:"text"` + CacheControl *CacheControl `json:"cache_control,omitempty"` +} + type Message struct { - Role string `json:"role"` - Content string `json:"content"` - ReasoningContent string `json:"reasoning_content,omitempty"` - ToolCalls []ToolCall `json:"tool_calls,omitempty"` - ToolCallID string `json:"tool_call_id,omitempty"` + Role string `json:"role"` + Content string `json:"content"` + ReasoningContent string `json:"reasoning_content,omitempty"` + SystemParts []ContentBlock `json:"system_parts,omitempty"` // structured system blocks for cache-aware adapters + ToolCalls []ToolCall `json:"tool_calls,omitempty"` + ToolCallID string `json:"tool_call_id,omitempty"` } type ToolDefinition struct { diff --git a/pkg/providers/types.go b/pkg/providers/types.go index b2dda04a5d..f0c168bc6f 100644 --- a/pkg/providers/types.go +++ b/pkg/providers/types.go @@ -17,6 +17,8 @@ type ( ToolFunctionDefinition = protocoltypes.ToolFunctionDefinition ExtraContent = protocoltypes.ExtraContent GoogleExtra = protocoltypes.GoogleExtra + ContentBlock = protocoltypes.ContentBlock + CacheControl = protocoltypes.CacheControl ) type LLMProvider interface { diff --git a/pkg/tools/registry.go b/pkg/tools/registry.go index 6ecb8ae7cc..d37a093a86 100644 --- a/pkg/tools/registry.go +++ b/pkg/tools/registry.go @@ -3,6 +3,7 @@ package tools import ( "context" "fmt" + "sort" "sync" "time" @@ -107,13 +108,27 @@ func (r *ToolRegistry) ExecuteWithContext( return result } +// sortedToolNames returns tool names in sorted order for deterministic iteration. +// This is critical for KV cache stability: non-deterministic map iteration would +// produce different system prompts and tool definitions on each call, invalidating +// the LLM's prefix cache even when no tools have changed. +func (r *ToolRegistry) sortedToolNames() []string { + names := make([]string, 0, len(r.tools)) + for name := range r.tools { + names = append(names, name) + } + sort.Strings(names) + return names +} + func (r *ToolRegistry) GetDefinitions() []map[string]any { r.mu.RLock() defer r.mu.RUnlock() - definitions := make([]map[string]any, 0, len(r.tools)) - for _, tool := range r.tools { - definitions = append(definitions, ToolToSchema(tool)) + sorted := r.sortedToolNames() + definitions := make([]map[string]any, 0, len(sorted)) + for _, name := range sorted { + definitions = append(definitions, ToolToSchema(r.tools[name])) } return definitions } @@ -124,8 +139,10 @@ func (r *ToolRegistry) ToProviderDefs() []providers.ToolDefinition { r.mu.RLock() defer r.mu.RUnlock() - definitions := make([]providers.ToolDefinition, 0, len(r.tools)) - for _, tool := range r.tools { + sorted := r.sortedToolNames() + definitions := make([]providers.ToolDefinition, 0, len(sorted)) + for _, name := range sorted { + tool := r.tools[name] schema := ToolToSchema(tool) // Safely extract nested values with type checks @@ -155,11 +172,7 @@ func (r *ToolRegistry) List() []string { r.mu.RLock() defer r.mu.RUnlock() - names := make([]string, 0, len(r.tools)) - for name := range r.tools { - names = append(names, name) - } - return names + return r.sortedToolNames() } // Count returns the number of registered tools. @@ -175,8 +188,10 @@ func (r *ToolRegistry) GetSummaries() []string { r.mu.RLock() defer r.mu.RUnlock() - summaries := make([]string, 0, len(r.tools)) - for _, tool := range r.tools { + sorted := r.sortedToolNames() + summaries := make([]string, 0, len(sorted)) + for _, name := range sorted { + tool := r.tools[name] summaries = append(summaries, fmt.Sprintf("- `%s` - %s", tool.Name(), tool.Description())) } return summaries