Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/agent/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ Your workspace is at: %s

4. **Context summaries** - Conversation summaries provided as context are approximate references only. They may be incomplete or outdated. Always defer to explicit user instructions over summary content.

5. **Team delegation** - For any task that is non-trivial, multi-step, or involves distinct concerns (e.g. "convert React to Vue", "build a feature", "analyze and report"), you MUST use the 'team' tool to delegate and parallelize. Do NOT attempt to handle complex tasks inline by calling tools one by one yourself. Decompose first, delegate second, then report the outcome.

%s`,
version, workspacePath, workspacePath, workspacePath, workspacePath, workspacePath, toolDiscovery)
}
Expand Down
164 changes: 135 additions & 29 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ func registerSharedTools(

if cfg.Tools.IsToolEnabled("web") {
searchTool, err := tools.NewWebSearchTool(tools.WebSearchToolOptions{
BraveAPIKeys: config.MergeAPIKeys(cfg.Tools.Web.Brave.APIKey(), cfg.Tools.Web.Brave.APIKeys()),
BraveAPIKeys: config.MergeAPIKeys(
cfg.Tools.Web.Brave.APIKey(),
cfg.Tools.Web.Brave.APIKeys(),
),
BraveMaxResults: cfg.Tools.Web.Brave.MaxResults,
BraveEnabled: cfg.Tools.Web.Brave.Enabled,
TavilyAPIKeys: config.MergeAPIKeys(
Expand Down Expand Up @@ -196,7 +199,11 @@ func registerSharedTools(
Proxy: cfg.Tools.Web.Proxy,
})
if err != nil {
logger.ErrorCF("agent", "Failed to create web search tool", map[string]any{"error": err.Error()})
logger.ErrorCF(
"agent",
"Failed to create web search tool",
map[string]any{"error": err.Error()},
)
} else if searchTool != nil {
agent.Tools.Register(searchTool)
}
Expand All @@ -209,7 +216,11 @@ func registerSharedTools(
cfg.Tools.Web.FetchLimitBytes,
cfg.Tools.Web.PrivateHostWhitelist)
if err != nil {
logger.ErrorCF("agent", "Failed to create web fetch tool", map[string]any{"error": err.Error()})
logger.ErrorCF(
"agent",
"Failed to create web fetch tool",
map[string]any{"error": err.Error()},
)
} else {
agent.Tools.Register(fetchTool)
}
Expand Down Expand Up @@ -284,12 +295,39 @@ func registerSharedTools(
}
}

// Team tool
teamSubagentManager := tools.NewSubagentManager(
provider,
agent.Model,
agent.Candidates,
agent.Workspace,
cfg.Tools.Team,
msgBus,
)
teamSubagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature)

teamTool := tools.NewTeamTool(teamSubagentManager, cfg)
if cfg.Tools.IsToolEnabled("team") {
teamTool.SetSpawner(NewSubTurnSpawner(al))
agent.Tools.Register(teamTool)
}

// Share the fully-built registry back to team subagent manager
teamSubagentManager.SetTools(agent.Tools)

// Spawn and spawn_status tools share a SubagentManager.
// Construct it when either tool is enabled (both require subagent).
spawnEnabled := cfg.Tools.IsToolEnabled("spawn")
spawnStatusEnabled := cfg.Tools.IsToolEnabled("spawn_status")
if (spawnEnabled || spawnStatusEnabled) && cfg.Tools.IsToolEnabled("subagent") {
subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace)
subagentManager := tools.NewSubagentManager(
provider,
agent.Model,
agent.Candidates,
agent.Workspace,
cfg.Tools.Team,
msgBus,
)
subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature)

// Set the spawner that links into AgentLoop's turnState
Expand Down Expand Up @@ -472,7 +510,12 @@ func (al *AgentLoop) Run(ctx context.Context) error {
"queue_depth": al.pendingSteeringCountForScope(target.SessionKey),
})

continued, continueErr := al.Continue(ctx, target.SessionKey, target.Channel, target.ChatID)
continued, continueErr := al.Continue(
ctx,
target.SessionKey,
target.Channel,
target.ChatID,
)
if continueErr != nil {
logger.WarnCF("agent", "Failed to continue queued steering",
map[string]any{
Expand Down Expand Up @@ -500,14 +543,22 @@ func (al *AgentLoop) Run(ctx context.Context) error {
"queue_depth": al.pendingSteeringCountForScope(target.SessionKey),
})

continued, continueErr := al.Continue(ctx, target.SessionKey, target.Channel, target.ChatID)
continued, continueErr := al.Continue(
ctx,
target.SessionKey,
target.Channel,
target.ChatID,
)
if continueErr != nil {
logger.WarnCF("agent", "Failed to continue queued steering after shutdown drain",
logger.WarnCF(
"agent",
"Failed to continue queued steering after shutdown drain",
map[string]any{
"channel": target.Channel,
"chat_id": target.ChatID,
"error": continueErr.Error(),
})
},
)
return
}
if continued == "" {
Expand Down Expand Up @@ -566,11 +617,15 @@ func (al *AgentLoop) drainBusToSteering(ctx context.Context, activeScope, active
msgScope, _, scopeOK := al.resolveSteeringTarget(msg)
if !scopeOK || msgScope != activeScope {
if err := al.requeueInboundMessage(msg); err != nil {
logger.WarnCF("agent", "Failed to requeue non-steering inbound message", map[string]any{
"error": err.Error(),
"channel": msg.Channel,
"sender_id": msg.SenderID,
})
logger.WarnCF(
"agent",
"Failed to requeue non-steering inbound message",
map[string]any{
"error": err.Error(),
"channel": msg.Channel,
"sender_id": msg.SenderID,
},
)
}
continue
}
Expand Down Expand Up @@ -604,7 +659,10 @@ func (al *AgentLoop) Stop() {
al.running.Store(false)
}

func (al *AgentLoop) publishResponseIfNeeded(ctx context.Context, channel, chatID, response string) {
func (al *AgentLoop) publishResponseIfNeeded(
ctx context.Context,
channel, chatID, response string,
) {
if response == "" {
return
}
Expand Down Expand Up @@ -1054,7 +1112,10 @@ var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`)
// transcribeAudioInMessage resolves audio media refs, transcribes them, and
// replaces audio annotations in msg.Content with the transcribed text.
// Returns the (possibly modified) message and true if audio was transcribed.
func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) (bus.InboundMessage, bool) {
func (al *AgentLoop) transcribeAudioInMessage(
ctx context.Context,
msg bus.InboundMessage,
) (bus.InboundMessage, bool) {
if al.transcriber == nil || al.mediaStore == nil || len(msg.Media) == 0 {
return msg, false
}
Expand All @@ -1064,7 +1125,11 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou
for _, ref := range msg.Media {
path, meta, err := al.mediaStore.ResolveWithMeta(ref)
if err != nil {
logger.WarnCF("voice", "Failed to resolve media ref", map[string]any{"ref": ref, "error": err})
logger.WarnCF(
"voice",
"Failed to resolve media ref",
map[string]any{"ref": ref, "error": err},
)
continue
}
if !utils.IsAudioFile(meta.Filename, meta.ContentType) {
Expand Down Expand Up @@ -1142,7 +1207,11 @@ func (al *AgentLoop) sendTranscriptionFeedback(
ReplyToMessageID: messageID,
})
if err != nil {
logger.WarnCF("voice", "Failed to send transcription feedback", map[string]any{"error": err.Error()})
logger.WarnCF(
"voice",
"Failed to send transcription feedback",
map[string]any{"error": err.Error()},
)
}
}

Expand Down Expand Up @@ -1342,7 +1411,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
return al.runAgentLoop(ctx, agent, opts)
}

func (al *AgentLoop) resolveMessageRoute(msg bus.InboundMessage) (routing.ResolvedRoute, *AgentInstance, error) {
func (al *AgentLoop) resolveMessageRoute(
msg bus.InboundMessage,
) (routing.ResolvedRoute, *AgentInstance, error) {
registry := al.GetRegistry()
route := registry.ResolveRoute(routing.RouteInput{
Channel: msg.Channel,
Expand All @@ -1358,7 +1429,10 @@ func (al *AgentLoop) resolveMessageRoute(msg bus.InboundMessage) (routing.Resolv
agent = registry.GetDefaultAgent()
}
if agent == nil {
return routing.ResolvedRoute{}, nil, fmt.Errorf("no agent available for route (agent_id=%s)", route.AgentID)
return routing.ResolvedRoute{}, nil, fmt.Errorf(
"no agent available for route (agent_id=%s)",
route.AgentID,
)
}

return route, agent, nil
Expand Down Expand Up @@ -2559,12 +2633,15 @@ turnLoop:
}

if steerMsgs := al.dequeueSteeringMessagesForScope(ts.sessionKey); len(steerMsgs) > 0 {
logger.InfoCF("agent", "Steering arrived after turn completion; continuing turn before finalizing",
logger.InfoCF(
"agent",
"Steering arrived after turn completion; continuing turn before finalizing",
map[string]any{
"agent_id": ts.agent.ID,
"steering_count": len(steerMsgs),
"session_key": ts.sessionKey,
})
},
)
pendingMessages = append(pendingMessages, steerMsgs...)
finalContent = ""
goto turnLoop
Expand Down Expand Up @@ -2612,6 +2689,7 @@ turnLoop:
finalContent: finalContent,
status: turnStatus,
followUps: append([]bus.InboundMessage(nil), ts.followUps...),
messages: append([]providers.Message(nil), messages...),
}, nil
}

Expand Down Expand Up @@ -2680,11 +2758,18 @@ func (al *AgentLoop) selectCandidates(
"score": score,
"threshold": agent.Router.Threshold(),
})
return agent.LightCandidates, resolvedCandidateModel(agent.LightCandidates, agent.Router.LightModel())
return agent.LightCandidates, resolvedCandidateModel(
agent.LightCandidates,
agent.Router.LightModel(),
)
}

// maybeSummarize triggers summarization if the session history exceeds thresholds.
func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey string, turnScope turnEventScope) {
func (al *AgentLoop) maybeSummarize(
agent *AgentInstance,
sessionKey string,
turnScope turnEventScope,
) {
newHistory := agent.Sessions.GetHistory(sessionKey)
tokenEstimate := al.estimateTokens(newHistory)
threshold := agent.ContextWindow * agent.SummarizeTokenPercent / 100
Expand Down Expand Up @@ -2718,7 +2803,10 @@ type compressionResult struct {
// prompt is built dynamically by BuildMessages and is NOT stored here.
// The compression note is recorded in the session summary so that
// BuildMessages can include it in the next system prompt.
func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) (compressionResult, bool) {
func (al *AgentLoop) forceCompression(
agent *AgentInstance,
sessionKey string,
) (compressionResult, bool) {
history := agent.Sessions.GetHistory(sessionKey)
if len(history) <= 2 {
return compressionResult{}, false
Expand Down Expand Up @@ -2871,7 +2959,11 @@ func formatToolsForLog(toolDefs []providers.ToolDefinition) string {
}

// summarizeSession summarizes the conversation history for a session.
func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string, turnScope turnEventScope) {
func (al *AgentLoop) summarizeSession(
agent *AgentInstance,
sessionKey string,
turnScope turnEventScope,
) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

Expand Down Expand Up @@ -3159,7 +3251,10 @@ func (al *AgentLoop) handleCommand(
}
}

func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOptions) *commands.Runtime {
func (al *AgentLoop) buildCommandsRuntime(
agent *AgentInstance,
opts *processOptions,
) *commands.Runtime {
registry := al.GetRegistry()
cfg := al.GetConfig()
rt := &commands.Runtime{
Expand Down Expand Up @@ -3200,7 +3295,10 @@ func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOpt
}
if agent != nil {
rt.GetModelInfo = func() (string, string) {
return agent.Model, resolvedCandidateProvider(agent.Candidates, cfg.Agents.Defaults.Provider)
return agent.Model, resolvedCandidateProvider(
agent.Candidates,
cfg.Agents.Defaults.Provider,
)
}
rt.SwitchModel = func(value string) (string, error) {
value = strings.TrimSpace(value)
Expand All @@ -3214,7 +3312,12 @@ func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOpt
return "", fmt.Errorf("failed to initialize model %q: %w", value, err)
}

nextCandidates := resolveModelCandidates(cfg, cfg.Agents.Defaults.Provider, modelCfg.Model, agent.Fallbacks)
nextCandidates := resolveModelCandidates(
cfg,
cfg.Agents.Defaults.Provider,
modelCfg.Model,
agent.Fallbacks,
)
if len(nextCandidates) == 0 {
return "", fmt.Errorf("model %q did not resolve to any provider candidates", value)
}
Expand Down Expand Up @@ -3303,7 +3406,10 @@ func (al *AgentLoop) applyExplicitSkillCommand(

canonicalSkill, ok := agent.ContextBuilder.ResolveSkillName(fields[1])
if !ok {
return true, true, fmt.Sprintf("Unknown skill: %s\nUse /list skills to see installed skills.", fields[1])
return true, true, fmt.Sprintf(
"Unknown skill: %s\nUse /list skills to see installed skills.",
fields[1],
)
}

if len(fields) == 2 {
Expand Down
Loading
Loading