diff --git a/cmd/picoclaw/internal/gateway/helpers.go b/cmd/picoclaw/internal/gateway/helpers.go index 3562f03ef..85e93bcf9 100644 --- a/cmd/picoclaw/internal/gateway/helpers.go +++ b/cmd/picoclaw/internal/gateway/helpers.go @@ -7,6 +7,7 @@ import ( "os/signal" "path/filepath" "sync" + "syscall" "time" "github.com/sipeed/picoclaw/cmd/picoclaw/internal" @@ -43,7 +44,6 @@ import ( // Timeout constants for service operations const ( - serviceRestartTimeout = 30 * time.Second serviceShutdownTimeout = 30 * time.Second providerReloadTimeout = 30 * time.Second gracefulShutdownTimeout = 15 * time.Second @@ -121,7 +121,7 @@ func gatewayCmd(debug bool) error { defer stopWatch() sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Main event loop - wait for signals or config changes for { @@ -150,7 +150,8 @@ func setupAndStartServices( // Setup cron tool and service execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute - services.CronService = setupCronTool( + var err error + services.CronService, err = setupCronTool( agentLoop, msgBus, cfg.WorkspacePath(), @@ -158,7 +159,10 @@ func setupAndStartServices( execTimeout, cfg, ) - if err := services.CronService.Start(); err != nil { + if err != nil { + return nil, fmt.Errorf("error setting up cron service: %w", err) + } + if err = services.CronService.Start(); err != nil { return nil, fmt.Errorf("error starting cron service: %w", err) } fmt.Println("✓ Cron service started") @@ -170,26 +174,8 @@ func setupAndStartServices( cfg.Heartbeat.Enabled, ) services.HeartbeatService.SetBus(msgBus) - services.HeartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult { - // Use cli:direct as fallback if no valid channel - if channel == "" || chatID == "" { - channel, chatID = "cli", "direct" - } - // Use ProcessHeartbeat - no session history, each heartbeat is independent - var response string - var err error - response, err = agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID) - if err != nil { - return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err)) - } - if response == "HEARTBEAT_OK" { - return tools.SilentResult("Heartbeat OK") - } - // For heartbeat, always return silent - the subagent result will be - // sent to user via processSystemMessage when the async task completes - return tools.SilentResult(response) - }) - if err := services.HeartbeatService.Start(); err != nil { + services.HeartbeatService.SetHandler(createHeartbeatHandler(agentLoop)) + if err = services.HeartbeatService.Start(); err != nil { return nil, fmt.Errorf("error starting heartbeat service: %w", err) } fmt.Println("✓ Heartbeat service started") @@ -206,7 +192,6 @@ func setupAndStartServices( } // Create channel manager - var err error services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore) if err != nil { // Stop the media store if it's a FileMediaStore with cleanup @@ -238,7 +223,7 @@ func setupAndStartServices( services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) services.ChannelManager.SetupHTTPServer(addr, services.HealthServer) - if err := services.ChannelManager.StartAll(context.Background()); err != nil { + if err = services.ChannelManager.StartAll(context.Background()); err != nil { return nil, fmt.Errorf("error starting channels: %w", err) } @@ -251,7 +236,7 @@ func setupAndStartServices( MonitorUSB: cfg.Devices.MonitorUSB, }, stateManager) services.DeviceService.SetBus(msgBus) - if err := services.DeviceService.Start(context.Background()); err != nil { + if err = services.DeviceService.Start(context.Background()); err != nil { logger.ErrorCF("device", "Error starting device service", map[string]any{"error": err.Error()}) } else if cfg.Devices.Enabled { fmt.Println("✓ Device event service started") @@ -386,17 +371,13 @@ func restartServices( services *gatewayServices, msgBus *bus.MessageBus, ) error { - // Create an independent context with timeout for service restart - // This prevents cancellation from the main loop context during reload - ctx, cancel := context.WithTimeout(context.Background(), serviceRestartTimeout) - defer cancel() - // Get current config from agent loop (which has been updated if this is a reload) cfg := al.GetConfig() // Re-create and start cron service with new config execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute - services.CronService = setupCronTool( + var err error + services.CronService, err = setupCronTool( al, msgBus, cfg.WorkspacePath(), @@ -404,7 +385,10 @@ func restartServices( execTimeout, cfg, ) - if err := services.CronService.Start(); err != nil { + if err != nil { + return fmt.Errorf("error restarting cron service: %w", err) + } + if err = services.CronService.Start(); err != nil { return fmt.Errorf("error restarting cron service: %w", err) } fmt.Println(" ✓ Cron service restarted") @@ -416,31 +400,12 @@ func restartServices( cfg.Heartbeat.Enabled, ) services.HeartbeatService.SetBus(msgBus) - services.HeartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult { - if channel == "" || chatID == "" { - channel, chatID = "cli", "direct" - } - var response string - var err error - response, err = al.ProcessHeartbeat(context.Background(), prompt, channel, chatID) - if err != nil { - return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err)) - } - if response == "HEARTBEAT_OK" { - return tools.SilentResult("Heartbeat OK") - } - return tools.SilentResult(response) - }) - if err := services.HeartbeatService.Start(); err != nil { + services.HeartbeatService.SetHandler(createHeartbeatHandler(al)) + if err = services.HeartbeatService.Start(); err != nil { return fmt.Errorf("error restarting heartbeat service: %w", err) } fmt.Println(" ✓ Heartbeat service restarted") - // Stop the old media store before creating a new one - if fms, ok := services.MediaStore.(*media.FileMediaStore); ok { - fms.Stop() - } - // Re-create media store with new config services.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{ Enabled: cfg.Tools.MediaCleanup.Enabled, @@ -454,13 +419,8 @@ func restartServices( al.SetMediaStore(services.MediaStore) // Re-create channel manager with new config - var err error services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore) if err != nil { - // Stop the media store if it's a FileMediaStore with cleanup - if fms, ok := services.MediaStore.(*media.FileMediaStore); ok { - fms.Stop() - } return fmt.Errorf("error recreating channel manager: %w", err) } al.SetChannelManager(services.ChannelManager) @@ -477,7 +437,8 @@ func restartServices( services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) services.ChannelManager.SetupHTTPServer(addr, services.HealthServer) - if err := services.ChannelManager.StartAll(ctx); err != nil { + // Use background context for lifecycle to ensure services persist after restartServices returns + if err = services.ChannelManager.StartAll(context.Background()); err != nil { return fmt.Errorf("error restarting channels: %w", err) } fmt.Printf( @@ -493,7 +454,7 @@ func restartServices( MonitorUSB: cfg.Devices.MonitorUSB, }, stateManager) services.DeviceService.SetBus(msgBus) - if err := services.DeviceService.Start(ctx); err != nil { + if err := services.DeviceService.Start(context.Background()); err != nil { logger.WarnCF("device", "Failed to restart device service", map[string]any{"error": err.Error()}) } else if cfg.Devices.Enabled { fmt.Println(" ✓ Device event service restarted") @@ -544,6 +505,10 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf // Debounce - wait a bit to ensure file write is complete time.Sleep(500 * time.Millisecond) + // Update last known state to prevent repeated reload attempts on failure + lastModTime = currentModTime + lastSize = currentSize + // Validate and load new config newCfg, err := config.LoadConfig(configPath) if err != nil { @@ -561,10 +526,6 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf logger.Info("✓ Config file validated and loaded") - // Update last known state - lastModTime = currentModTime - lastSize = currentSize - // Send new config to main loop (non-blocking) select { case configChan <- newCfg: @@ -613,7 +574,7 @@ func setupCronTool( restrict bool, execTimeout time.Duration, cfg *config.Config, -) *cron.CronService { +) (*cron.CronService, error) { cronStorePath := filepath.Join(workspace, "cron", "jobs.json") // Create cron service @@ -625,7 +586,7 @@ func setupCronTool( var err error cronTool, err = tools.NewCronTool(cronService, agentLoop, msgBus, workspace, restrict, execTimeout, cfg) if err != nil { - logger.Fatalf("Critical error during CronTool initialization: %v", err) + return nil, fmt.Errorf("critical error during CronTool initialization: %w", err) } agentLoop.RegisterTool(cronTool) @@ -639,5 +600,27 @@ func setupCronTool( }) } - return cronService + return cronService, nil +} + +func createHeartbeatHandler(agentLoop *agent.AgentLoop) func(prompt, channel, chatID string) *tools.ToolResult { + return func(prompt, channel, chatID string) *tools.ToolResult { + // Use cli:direct as fallback if no valid channel + if channel == "" || chatID == "" { + channel, chatID = "cli", "direct" + } + // Use ProcessHeartbeat - no session history, each heartbeat is independent + var response string + var err error + response, err = agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID) + if err != nil { + return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err)) + } + if response == "HEARTBEAT_OK" { + return tools.SilentResult("Heartbeat OK") + } + // For heartbeat, always return silent - the subagent result will be + // sent to user via processSystemMessage when the async task completes + return tools.SilentResult(response) + } } diff --git a/pkg/health/server.go b/pkg/health/server.go index 5609ebdf6..b9ee9f496 100644 --- a/pkg/health/server.go +++ b/pkg/health/server.go @@ -6,6 +6,7 @@ import ( "fmt" "maps" "net/http" + "os" "sync" "time" ) @@ -29,6 +30,7 @@ type StatusResponse struct { Status string `json:"status"` Uptime string `json:"uptime"` Checks map[string]Check `json:"checks,omitempty"` + Pid int `json:"pid"` } func NewServer(host string, port int) *Server { @@ -112,6 +114,7 @@ func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { resp := StatusResponse{ Status: "ok", Uptime: uptime.String(), + Pid: os.Getpid(), } json.NewEncoder(w).Encode(resp)