Skip to content

Commit be4a33c

Browse files
authored
refactor gateway/helpers and add server.pid to health (#1646)
1 parent 79b0568 commit be4a33c

File tree

2 files changed

+55
-69
lines changed

2 files changed

+55
-69
lines changed

cmd/picoclaw/internal/gateway/helpers.go

Lines changed: 52 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os/signal"
88
"path/filepath"
99
"sync"
10+
"syscall"
1011
"time"
1112

1213
"github.com/sipeed/picoclaw/cmd/picoclaw/internal"
@@ -43,7 +44,6 @@ import (
4344

4445
// Timeout constants for service operations
4546
const (
46-
serviceRestartTimeout = 30 * time.Second
4747
serviceShutdownTimeout = 30 * time.Second
4848
providerReloadTimeout = 30 * time.Second
4949
gracefulShutdownTimeout = 15 * time.Second
@@ -121,7 +121,7 @@ func gatewayCmd(debug bool) error {
121121
defer stopWatch()
122122

123123
sigChan := make(chan os.Signal, 1)
124-
signal.Notify(sigChan, os.Interrupt)
124+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
125125

126126
// Main event loop - wait for signals or config changes
127127
for {
@@ -150,15 +150,19 @@ func setupAndStartServices(
150150

151151
// Setup cron tool and service
152152
execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute
153-
services.CronService = setupCronTool(
153+
var err error
154+
services.CronService, err = setupCronTool(
154155
agentLoop,
155156
msgBus,
156157
cfg.WorkspacePath(),
157158
cfg.Agents.Defaults.RestrictToWorkspace,
158159
execTimeout,
159160
cfg,
160161
)
161-
if err := services.CronService.Start(); err != nil {
162+
if err != nil {
163+
return nil, fmt.Errorf("error setting up cron service: %w", err)
164+
}
165+
if err = services.CronService.Start(); err != nil {
162166
return nil, fmt.Errorf("error starting cron service: %w", err)
163167
}
164168
fmt.Println("✓ Cron service started")
@@ -170,26 +174,8 @@ func setupAndStartServices(
170174
cfg.Heartbeat.Enabled,
171175
)
172176
services.HeartbeatService.SetBus(msgBus)
173-
services.HeartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult {
174-
// Use cli:direct as fallback if no valid channel
175-
if channel == "" || chatID == "" {
176-
channel, chatID = "cli", "direct"
177-
}
178-
// Use ProcessHeartbeat - no session history, each heartbeat is independent
179-
var response string
180-
var err error
181-
response, err = agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
182-
if err != nil {
183-
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
184-
}
185-
if response == "HEARTBEAT_OK" {
186-
return tools.SilentResult("Heartbeat OK")
187-
}
188-
// For heartbeat, always return silent - the subagent result will be
189-
// sent to user via processSystemMessage when the async task completes
190-
return tools.SilentResult(response)
191-
})
192-
if err := services.HeartbeatService.Start(); err != nil {
177+
services.HeartbeatService.SetHandler(createHeartbeatHandler(agentLoop))
178+
if err = services.HeartbeatService.Start(); err != nil {
193179
return nil, fmt.Errorf("error starting heartbeat service: %w", err)
194180
}
195181
fmt.Println("✓ Heartbeat service started")
@@ -206,7 +192,6 @@ func setupAndStartServices(
206192
}
207193

208194
// Create channel manager
209-
var err error
210195
services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore)
211196
if err != nil {
212197
// Stop the media store if it's a FileMediaStore with cleanup
@@ -238,7 +223,7 @@ func setupAndStartServices(
238223
services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
239224
services.ChannelManager.SetupHTTPServer(addr, services.HealthServer)
240225

241-
if err := services.ChannelManager.StartAll(context.Background()); err != nil {
226+
if err = services.ChannelManager.StartAll(context.Background()); err != nil {
242227
return nil, fmt.Errorf("error starting channels: %w", err)
243228
}
244229

@@ -251,7 +236,7 @@ func setupAndStartServices(
251236
MonitorUSB: cfg.Devices.MonitorUSB,
252237
}, stateManager)
253238
services.DeviceService.SetBus(msgBus)
254-
if err := services.DeviceService.Start(context.Background()); err != nil {
239+
if err = services.DeviceService.Start(context.Background()); err != nil {
255240
logger.ErrorCF("device", "Error starting device service", map[string]any{"error": err.Error()})
256241
} else if cfg.Devices.Enabled {
257242
fmt.Println("✓ Device event service started")
@@ -386,25 +371,24 @@ func restartServices(
386371
services *gatewayServices,
387372
msgBus *bus.MessageBus,
388373
) error {
389-
// Create an independent context with timeout for service restart
390-
// This prevents cancellation from the main loop context during reload
391-
ctx, cancel := context.WithTimeout(context.Background(), serviceRestartTimeout)
392-
defer cancel()
393-
394374
// Get current config from agent loop (which has been updated if this is a reload)
395375
cfg := al.GetConfig()
396376

397377
// Re-create and start cron service with new config
398378
execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute
399-
services.CronService = setupCronTool(
379+
var err error
380+
services.CronService, err = setupCronTool(
400381
al,
401382
msgBus,
402383
cfg.WorkspacePath(),
403384
cfg.Agents.Defaults.RestrictToWorkspace,
404385
execTimeout,
405386
cfg,
406387
)
407-
if err := services.CronService.Start(); err != nil {
388+
if err != nil {
389+
return fmt.Errorf("error restarting cron service: %w", err)
390+
}
391+
if err = services.CronService.Start(); err != nil {
408392
return fmt.Errorf("error restarting cron service: %w", err)
409393
}
410394
fmt.Println(" ✓ Cron service restarted")
@@ -416,31 +400,12 @@ func restartServices(
416400
cfg.Heartbeat.Enabled,
417401
)
418402
services.HeartbeatService.SetBus(msgBus)
419-
services.HeartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult {
420-
if channel == "" || chatID == "" {
421-
channel, chatID = "cli", "direct"
422-
}
423-
var response string
424-
var err error
425-
response, err = al.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
426-
if err != nil {
427-
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
428-
}
429-
if response == "HEARTBEAT_OK" {
430-
return tools.SilentResult("Heartbeat OK")
431-
}
432-
return tools.SilentResult(response)
433-
})
434-
if err := services.HeartbeatService.Start(); err != nil {
403+
services.HeartbeatService.SetHandler(createHeartbeatHandler(al))
404+
if err = services.HeartbeatService.Start(); err != nil {
435405
return fmt.Errorf("error restarting heartbeat service: %w", err)
436406
}
437407
fmt.Println(" ✓ Heartbeat service restarted")
438408

439-
// Stop the old media store before creating a new one
440-
if fms, ok := services.MediaStore.(*media.FileMediaStore); ok {
441-
fms.Stop()
442-
}
443-
444409
// Re-create media store with new config
445410
services.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{
446411
Enabled: cfg.Tools.MediaCleanup.Enabled,
@@ -454,13 +419,8 @@ func restartServices(
454419
al.SetMediaStore(services.MediaStore)
455420

456421
// Re-create channel manager with new config
457-
var err error
458422
services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore)
459423
if err != nil {
460-
// Stop the media store if it's a FileMediaStore with cleanup
461-
if fms, ok := services.MediaStore.(*media.FileMediaStore); ok {
462-
fms.Stop()
463-
}
464424
return fmt.Errorf("error recreating channel manager: %w", err)
465425
}
466426
al.SetChannelManager(services.ChannelManager)
@@ -477,7 +437,8 @@ func restartServices(
477437
services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
478438
services.ChannelManager.SetupHTTPServer(addr, services.HealthServer)
479439

480-
if err := services.ChannelManager.StartAll(ctx); err != nil {
440+
// Use background context for lifecycle to ensure services persist after restartServices returns
441+
if err = services.ChannelManager.StartAll(context.Background()); err != nil {
481442
return fmt.Errorf("error restarting channels: %w", err)
482443
}
483444
fmt.Printf(
@@ -493,7 +454,7 @@ func restartServices(
493454
MonitorUSB: cfg.Devices.MonitorUSB,
494455
}, stateManager)
495456
services.DeviceService.SetBus(msgBus)
496-
if err := services.DeviceService.Start(ctx); err != nil {
457+
if err := services.DeviceService.Start(context.Background()); err != nil {
497458
logger.WarnCF("device", "Failed to restart device service", map[string]any{"error": err.Error()})
498459
} else if cfg.Devices.Enabled {
499460
fmt.Println(" ✓ Device event service restarted")
@@ -544,6 +505,10 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf
544505
// Debounce - wait a bit to ensure file write is complete
545506
time.Sleep(500 * time.Millisecond)
546507

508+
// Update last known state to prevent repeated reload attempts on failure
509+
lastModTime = currentModTime
510+
lastSize = currentSize
511+
547512
// Validate and load new config
548513
newCfg, err := config.LoadConfig(configPath)
549514
if err != nil {
@@ -561,10 +526,6 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf
561526

562527
logger.Info("✓ Config file validated and loaded")
563528

564-
// Update last known state
565-
lastModTime = currentModTime
566-
lastSize = currentSize
567-
568529
// Send new config to main loop (non-blocking)
569530
select {
570531
case configChan <- newCfg:
@@ -613,7 +574,7 @@ func setupCronTool(
613574
restrict bool,
614575
execTimeout time.Duration,
615576
cfg *config.Config,
616-
) *cron.CronService {
577+
) (*cron.CronService, error) {
617578
cronStorePath := filepath.Join(workspace, "cron", "jobs.json")
618579

619580
// Create cron service
@@ -625,7 +586,7 @@ func setupCronTool(
625586
var err error
626587
cronTool, err = tools.NewCronTool(cronService, agentLoop, msgBus, workspace, restrict, execTimeout, cfg)
627588
if err != nil {
628-
logger.Fatalf("Critical error during CronTool initialization: %v", err)
589+
return nil, fmt.Errorf("critical error during CronTool initialization: %w", err)
629590
}
630591

631592
agentLoop.RegisterTool(cronTool)
@@ -639,5 +600,27 @@ func setupCronTool(
639600
})
640601
}
641602

642-
return cronService
603+
return cronService, nil
604+
}
605+
606+
// createHeartbeatHandler builds the callback installed on the heartbeat
// service via SetHandler. The returned closure forwards each heartbeat prompt
// to agentLoop.ProcessHeartbeat using context.Background().
//
// Behavior of the returned handler:
//   - If channel or chatID is empty, both are replaced with "cli" / "direct".
//   - If ProcessHeartbeat returns an error, the handler returns
//     tools.ErrorResult with a "Heartbeat error: ..." message.
//   - If the response is exactly "HEARTBEAT_OK", the handler returns
//     tools.SilentResult("Heartbeat OK").
//   - Otherwise the raw response is wrapped in tools.SilentResult.
func createHeartbeatHandler(agentLoop *agent.AgentLoop) func(prompt, channel, chatID string) *tools.ToolResult {
607+
return func(prompt, channel, chatID string) *tools.ToolResult {
608+
// Use cli:direct as fallback if no valid channel
609+
if channel == "" || chatID == "" {
610+
channel, chatID = "cli", "direct"
611+
}
612+
// Use ProcessHeartbeat - no session history, each heartbeat is independent
613+
var response string
614+
var err error
615+
response, err = agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
616+
if err != nil {
617+
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
618+
}
619+
if response == "HEARTBEAT_OK" {
620+
return tools.SilentResult("Heartbeat OK")
621+
}
622+
// For heartbeat, always return silent - the subagent result will be
623+
// sent to user via processSystemMessage when the async task completes
624+
return tools.SilentResult(response)
625+
}
643626
}

pkg/health/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"maps"
88
"net/http"
9+
"os"
910
"sync"
1011
"time"
1112
)
@@ -29,6 +30,7 @@ type StatusResponse struct {
2930
Status string `json:"status"`
3031
Uptime string `json:"uptime"`
3132
Checks map[string]Check `json:"checks,omitempty"`
33+
Pid int `json:"pid"`
3234
}
3335

3436
func NewServer(host string, port int) *Server {
@@ -112,6 +114,7 @@ func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
112114
resp := StatusResponse{
113115
Status: "ok",
114116
Uptime: uptime.String(),
117+
Pid: os.Getpid(),
115118
}
116119

117120
json.NewEncoder(w).Encode(resp)

0 commit comments

Comments
 (0)