Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 52 additions & 69 deletions cmd/picoclaw/internal/gateway/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/sipeed/picoclaw/cmd/picoclaw/internal"
Expand Down Expand Up @@ -43,7 +44,6 @@ import (

// Timeout constants for service operations
const (
serviceRestartTimeout = 30 * time.Second
serviceShutdownTimeout = 30 * time.Second
providerReloadTimeout = 30 * time.Second
gracefulShutdownTimeout = 15 * time.Second
Expand Down Expand Up @@ -121,7 +121,7 @@ func gatewayCmd(debug bool) error {
defer stopWatch()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

// Main event loop - wait for signals or config changes
for {
Expand Down Expand Up @@ -150,15 +150,19 @@ func setupAndStartServices(

// Setup cron tool and service
execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute
services.CronService = setupCronTool(
var err error
services.CronService, err = setupCronTool(
agentLoop,
msgBus,
cfg.WorkspacePath(),
cfg.Agents.Defaults.RestrictToWorkspace,
execTimeout,
cfg,
)
if err := services.CronService.Start(); err != nil {
if err != nil {
return nil, fmt.Errorf("error setting up cron service: %w", err)
}
if err = services.CronService.Start(); err != nil {
return nil, fmt.Errorf("error starting cron service: %w", err)
}
fmt.Println("βœ“ Cron service started")
Expand All @@ -170,26 +174,8 @@ func setupAndStartServices(
cfg.Heartbeat.Enabled,
)
services.HeartbeatService.SetBus(msgBus)
services.HeartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult {
// Use cli:direct as fallback if no valid channel
if channel == "" || chatID == "" {
channel, chatID = "cli", "direct"
}
// Use ProcessHeartbeat - no session history, each heartbeat is independent
var response string
var err error
response, err = agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
if err != nil {
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
}
if response == "HEARTBEAT_OK" {
return tools.SilentResult("Heartbeat OK")
}
// For heartbeat, always return silent - the subagent result will be
// sent to user via processSystemMessage when the async task completes
return tools.SilentResult(response)
})
if err := services.HeartbeatService.Start(); err != nil {
services.HeartbeatService.SetHandler(createHeartbeatHandler(agentLoop))
if err = services.HeartbeatService.Start(); err != nil {
return nil, fmt.Errorf("error starting heartbeat service: %w", err)
}
fmt.Println("βœ“ Heartbeat service started")
Expand All @@ -206,7 +192,6 @@ func setupAndStartServices(
}

// Create channel manager
var err error
services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore)
if err != nil {
// Stop the media store if it's a FileMediaStore with cleanup
Expand Down Expand Up @@ -238,7 +223,7 @@ func setupAndStartServices(
services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
services.ChannelManager.SetupHTTPServer(addr, services.HealthServer)

if err := services.ChannelManager.StartAll(context.Background()); err != nil {
if err = services.ChannelManager.StartAll(context.Background()); err != nil {
return nil, fmt.Errorf("error starting channels: %w", err)
}

Expand All @@ -251,7 +236,7 @@ func setupAndStartServices(
MonitorUSB: cfg.Devices.MonitorUSB,
}, stateManager)
services.DeviceService.SetBus(msgBus)
if err := services.DeviceService.Start(context.Background()); err != nil {
if err = services.DeviceService.Start(context.Background()); err != nil {
logger.ErrorCF("device", "Error starting device service", map[string]any{"error": err.Error()})
} else if cfg.Devices.Enabled {
fmt.Println("βœ“ Device event service started")
Expand Down Expand Up @@ -386,25 +371,24 @@ func restartServices(
services *gatewayServices,
msgBus *bus.MessageBus,
) error {
// Create an independent context with timeout for service restart
// This prevents cancellation from the main loop context during reload
ctx, cancel := context.WithTimeout(context.Background(), serviceRestartTimeout)
defer cancel()

// Get current config from agent loop (which has been updated if this is a reload)
cfg := al.GetConfig()

// Re-create and start cron service with new config
execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute
services.CronService = setupCronTool(
var err error
services.CronService, err = setupCronTool(
al,
msgBus,
cfg.WorkspacePath(),
cfg.Agents.Defaults.RestrictToWorkspace,
execTimeout,
cfg,
)
if err := services.CronService.Start(); err != nil {
if err != nil {
return fmt.Errorf("error restarting cron service: %w", err)
}
if err = services.CronService.Start(); err != nil {
return fmt.Errorf("error restarting cron service: %w", err)
}
fmt.Println(" βœ“ Cron service restarted")
Expand All @@ -416,31 +400,12 @@ func restartServices(
cfg.Heartbeat.Enabled,
)
services.HeartbeatService.SetBus(msgBus)
services.HeartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult {
if channel == "" || chatID == "" {
channel, chatID = "cli", "direct"
}
var response string
var err error
response, err = al.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
if err != nil {
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
}
if response == "HEARTBEAT_OK" {
return tools.SilentResult("Heartbeat OK")
}
return tools.SilentResult(response)
})
if err := services.HeartbeatService.Start(); err != nil {
services.HeartbeatService.SetHandler(createHeartbeatHandler(al))
if err = services.HeartbeatService.Start(); err != nil {
return fmt.Errorf("error restarting heartbeat service: %w", err)
}
fmt.Println(" βœ“ Heartbeat service restarted")

// Stop the old media store before creating a new one
if fms, ok := services.MediaStore.(*media.FileMediaStore); ok {
fms.Stop()
}

// Re-create media store with new config
services.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{
Enabled: cfg.Tools.MediaCleanup.Enabled,
Expand All @@ -454,13 +419,8 @@ func restartServices(
al.SetMediaStore(services.MediaStore)

// Re-create channel manager with new config
var err error
services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore)
if err != nil {
// Stop the media store if it's a FileMediaStore with cleanup
if fms, ok := services.MediaStore.(*media.FileMediaStore); ok {
fms.Stop()
}
return fmt.Errorf("error recreating channel manager: %w", err)
}
al.SetChannelManager(services.ChannelManager)
Expand All @@ -477,7 +437,8 @@ func restartServices(
services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
services.ChannelManager.SetupHTTPServer(addr, services.HealthServer)

if err := services.ChannelManager.StartAll(ctx); err != nil {
// Use background context for lifecycle to ensure services persist after restartServices returns
if err = services.ChannelManager.StartAll(context.Background()); err != nil {
return fmt.Errorf("error restarting channels: %w", err)
}
fmt.Printf(
Expand All @@ -493,7 +454,7 @@ func restartServices(
MonitorUSB: cfg.Devices.MonitorUSB,
}, stateManager)
services.DeviceService.SetBus(msgBus)
if err := services.DeviceService.Start(ctx); err != nil {
if err := services.DeviceService.Start(context.Background()); err != nil {
logger.WarnCF("device", "Failed to restart device service", map[string]any{"error": err.Error()})
} else if cfg.Devices.Enabled {
fmt.Println(" βœ“ Device event service restarted")
Expand Down Expand Up @@ -544,6 +505,10 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf
// Debounce - wait a bit to ensure file write is complete
time.Sleep(500 * time.Millisecond)

// Update last known state to prevent repeated reload attempts on failure
lastModTime = currentModTime
lastSize = currentSize

// Validate and load new config
newCfg, err := config.LoadConfig(configPath)
if err != nil {
Expand All @@ -561,10 +526,6 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf

logger.Info("βœ“ Config file validated and loaded")

// Update last known state
lastModTime = currentModTime
lastSize = currentSize

// Send new config to main loop (non-blocking)
select {
case configChan <- newCfg:
Expand Down Expand Up @@ -613,7 +574,7 @@ func setupCronTool(
restrict bool,
execTimeout time.Duration,
cfg *config.Config,
) *cron.CronService {
) (*cron.CronService, error) {
cronStorePath := filepath.Join(workspace, "cron", "jobs.json")

// Create cron service
Expand All @@ -625,7 +586,7 @@ func setupCronTool(
var err error
cronTool, err = tools.NewCronTool(cronService, agentLoop, msgBus, workspace, restrict, execTimeout, cfg)
if err != nil {
logger.Fatalf("Critical error during CronTool initialization: %v", err)
return nil, fmt.Errorf("critical error during CronTool initialization: %w", err)
}

agentLoop.RegisterTool(cronTool)
Expand All @@ -639,5 +600,27 @@ func setupCronTool(
})
}

return cronService
return cronService, nil
}

func createHeartbeatHandler(agentLoop *agent.AgentLoop) func(prompt, channel, chatID string) *tools.ToolResult {
return func(prompt, channel, chatID string) *tools.ToolResult {
// Use cli:direct as fallback if no valid channel
if channel == "" || chatID == "" {
channel, chatID = "cli", "direct"
}
// Use ProcessHeartbeat - no session history, each heartbeat is independent
var response string
var err error
response, err = agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
if err != nil {
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
}
if response == "HEARTBEAT_OK" {
return tools.SilentResult("Heartbeat OK")
}
// For heartbeat, always return silent - the subagent result will be
// sent to user via processSystemMessage when the async task completes
return tools.SilentResult(response)
}
}
3 changes: 3 additions & 0 deletions pkg/health/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"maps"
"net/http"
"os"
"sync"
"time"
)
Expand All @@ -29,6 +30,7 @@ type StatusResponse struct {
Status string `json:"status"`
Uptime string `json:"uptime"`
Checks map[string]Check `json:"checks,omitempty"`
Pid int `json:"pid"`
}

func NewServer(host string, port int) *Server {
Expand Down Expand Up @@ -112,6 +114,7 @@ func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
resp := StatusResponse{
Status: "ok",
Uptime: uptime.String(),
Pid: os.Getpid(),
}

json.NewEncoder(w).Encode(resp)
Expand Down
Loading