Skip to content

Commit 16da4c3

Browse files
committed
Refactor Process stop logic to use Cancel context
1 parent 1e16c83 commit 16da4c3

File tree

2 files changed

+111
-82
lines changed

2 files changed

+111
-82
lines changed

misc/process-cmd-test/main.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78
"os/exec"
@@ -39,7 +40,7 @@ func main() {
3940
//cmd := exec.CommandContext(ctx, "sleep", "1")
4041
cmd := exec.CommandContext(ctx,
4142
"../../build/simple-responder_darwin_arm64",
42-
"-ignore-sig-term", /* so it doesn't exit on receiving SIGTERM, test cmd.WaitTimeout */
43+
//"-ignore-sig-term", /* so it doesn't exit on receiving SIGTERM, test cmd.WaitTimeout */
4344
)
4445
cmd.Stdin = os.Stdin
4546
cmd.Stdout = os.Stdout
@@ -50,7 +51,18 @@ func main() {
5051
cmd.Cancel = func() error {
5152
fmt.Println("✔︎ Cancel() called, sending SIGTERM")
5253
cmd.Process.Signal(syscall.SIGTERM)
53-
return nil
54+
55+
//return nil
56+
57+
// this error is returned by cmd.Wait(), and can be used to
58+
// single an error when the process couldn't be normally terminated
59+
// but since a SIGTERM is sent, it's probably ok to return a nil
60+
// as WaitDelay timing out will override the any error set here.
61+
//
62+
// test by enabling/disabling -ignore-sig-term on the process
63+
// with -ignore-sig-term enabled, cmd.Wait() will have "signal: killed"
64+
// without it, it will show the "new error from cancel"
65+
return errors.New("error from cmd.Cancel()") // sets error returned by cmd.Wait()
5466
}
5567

5668
if err := cmd.Start(); err != nil {

proxy/process.go

Lines changed: 97 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ type Process struct {
4343
config ModelConfig
4444
cmd *exec.Cmd
4545

46-
// for p.cmd.Wait() select { ... }
47-
cmdWaitChan chan error
46+
// PR #155 called to cancel the upstream process
47+
cancelUpstream context.CancelFunc
4848

4949
processLogger *LogMonitor
5050
proxyLogger *LogMonitor
@@ -65,11 +65,16 @@ type Process struct {
6565
// for managing concurrency limits
6666
concurrencyLimitSemaphore chan struct{}
6767

68-
// stop timeout waiting for graceful shutdown
69-
gracefulStopTimeout time.Duration
70-
7168
// track that this happened
7269
upstreamWasStoppedWithKill bool
70+
71+
// !!!
72+
// These properties are to be removed when migration over exec.CommandContext is complete
73+
74+
// for p.cmd.Wait() select { ... }
75+
cmdWaitChan chan error
76+
// stop timeout waiting for graceful shutdown
77+
gracefulStopTimeout time.Duration
7378
}
7479

7580
func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process {
@@ -82,7 +87,7 @@ func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLo
8287
ID: ID,
8388
config: config,
8489
cmd: nil,
85-
cmdWaitChan: make(chan error, 1),
90+
cancelUpstream: nil,
8691
processLogger: processLogger,
8792
proxyLogger: proxyLogger,
8893
healthCheckTimeout: healthCheckTimeout,
@@ -92,9 +97,12 @@ func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLo
9297
// concurrency limit
9398
concurrencyLimitSemaphore: make(chan struct{}, concurrentLimit),
9499

95-
// stop timeout
96-
gracefulStopTimeout: 10 * time.Second,
97100
upstreamWasStoppedWithKill: false,
101+
102+
// To be removed when migration over exec.CommandContext is complete
103+
// stop timeout
104+
gracefulStopTimeout: 10 * time.Second,
105+
cmdWaitChan: make(chan error, 1),
98106
}
99107
}
100108

@@ -190,12 +198,16 @@ func (p *Process) start() error {
190198

191199
p.waitStarting.Add(1)
192200
defer p.waitStarting.Done()
193-
194-
p.cmd = exec.Command(args[0], args[1:]...)
201+
cmdContext, ctxCancelUpstream := context.WithCancel(context.Background())
202+
p.cmd = exec.CommandContext(cmdContext, args[0], args[1:]...)
195203
p.cmd.Stdout = p.processLogger
196204
p.cmd.Stderr = p.processLogger
197205
p.cmd.Env = p.config.Env
198206

207+
p.cmd.Cancel = p.cmdStopUpstreamProcess
208+
p.cmd.WaitDelay = p.gracefulStopTimeout
209+
p.cancelUpstream = ctxCancelUpstream
210+
199211
err = p.cmd.Start()
200212

201213
// Set process state to failed
@@ -358,12 +370,7 @@ func (p *Process) StopImmediately() {
358370
}
359371
}
360372

361-
// stop the process with a graceful exit timeout
362-
p.stopCommand(p.gracefulStopTimeout)
363-
364-
if curState, err := p.swapState(StateStopping, StateStopped); err != nil {
365-
p.proxyLogger.Infof("<%s> Stop() StateStopping -> StateStopped err: %v, current state: %v", p.ID, err, curState)
366-
}
373+
p.stopCommand()
367374
}
368375

369376
// Shutdown is called when llama-swap is shutting down. It will give a little bit
@@ -375,90 +382,49 @@ func (p *Process) Shutdown() {
375382
return
376383
}
377384

378-
p.stopCommand(p.gracefulStopTimeout)
379-
385+
p.stopCommand()
380386
// just force it to this state since there is no recovery from shutdown
381387
p.state = StateShutdown
382388
}
383389

384390
// stopCommand will send a SIGTERM to the process and wait for it to exit.
385391
// If it does not exit within 5 seconds, it will send a SIGKILL.
386-
func (p *Process) stopCommand(sigtermTTL time.Duration) {
392+
func (p *Process) stopCommand() {
387393
stopStartTime := time.Now()
388394
defer func() {
389395
p.proxyLogger.Debugf("<%s> stopCommand took %v", p.ID, time.Since(stopStartTime))
390396
}()
391397

392-
sigtermTimeout, cancelTimeout := context.WithTimeout(context.Background(), sigtermTTL)
393-
defer cancelTimeout()
394-
395-
if p.cmd == nil || p.cmd.Process == nil {
396-
p.proxyLogger.Debugf("<%s> cmd or cmd.Process is nil (normal during config reload)", p.ID)
398+
if p.cancelUpstream == nil {
399+
p.proxyLogger.Errorf("<%s> stopCommand has a nil p.cancelUpstream()", p.ID)
397400
return
398401
}
399402

400-
// if err := p.terminateProcess(); err != nil {
401-
// p.proxyLogger.Debugf("<%s> Process already terminated: %v (normal during shutdown)", p.ID, err)
402-
// }
403-
// the default cmdStop to taskkill /f /t /pid ${PID}
404-
if runtime.GOOS == "windows" && strings.TrimSpace(p.config.CmdStop) == "" {
405-
p.config.CmdStop = "taskkill /f /t /pid ${PID}"
406-
}
407-
408-
if p.config.CmdStop != "" {
409-
// replace ${PID} with the pid of the process
410-
stopArgs, err := SanitizeCommand(strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", p.cmd.Process.Pid)))
411-
if err != nil {
412-
p.proxyLogger.Errorf("<%s> Failed to sanitize stop command: %v", p.ID, err)
413-
return
414-
}
415-
416-
p.proxyLogger.Debugf("<%s> Executing stop command: %s", p.ID, strings.Join(stopArgs, " "))
417-
418-
stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...)
419-
stopCmd.Stdout = p.processLogger
420-
stopCmd.Stderr = p.processLogger
421-
stopCmd.Env = p.config.Env
422-
423-
if err := stopCmd.Run(); err != nil {
424-
p.proxyLogger.Errorf("<%s> Failed to exec stop command: %v", p.ID, err)
425-
return
426-
}
427-
} else {
428-
if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
429-
p.proxyLogger.Errorf("<%s> Failed to send SIGTERM to process: %v", p.ID, err)
430-
return
431-
}
432-
}
403+
p.cancelUpstream()
404+
err := <-p.cmdWaitChan
433405

434-
select {
435-
case <-sigtermTimeout.Done():
436-
p.proxyLogger.Debugf("<%s> Process timed out waiting to stop, sending KILL signal (normal during shutdown)", p.ID)
437-
p.upstreamWasStoppedWithKill = true
438-
if err := p.cmd.Process.Kill(); err != nil {
439-
p.proxyLogger.Errorf("<%s> Failed to kill process: %v", p.ID, err)
440-
}
441-
case err := <-p.cmdWaitChan:
442-
// Note: in start(), p.cmdWaitChan also has a select { ... }. That should be OK
443-
// because if we make it here then the cmd has been successfully running and made it
444-
// through the health check. There is a possibility that the cmd crashed after the health check
445-
// succeeded but that's not a case llama-swap is handling for now.
446-
if err != nil {
447-
if errno, ok := err.(syscall.Errno); ok {
448-
p.proxyLogger.Errorf("<%s> errno >> %v", p.ID, errno)
449-
} else if exitError, ok := err.(*exec.ExitError); ok {
450-
if strings.Contains(exitError.String(), "signal: terminated") {
451-
p.proxyLogger.Debugf("<%s> Process stopped OK", p.ID)
452-
} else if strings.Contains(exitError.String(), "signal: interrupt") {
453-
p.proxyLogger.Debugf("<%s> Process interrupted OK", p.ID)
454-
} else {
455-
p.proxyLogger.Warnf("<%s> ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode())
456-
}
406+
// Note: in start(), p.cmdWaitChan also has a select { ... }. That should be OK
407+
// because if we make it here then the cmd has been successfully running and made it
408+
// through the health check. There is a possibility that the cmd crashed after the health check
409+
// succeeded but that's not a case llama-swap is handling for now.
410+
if err != nil {
411+
if errno, ok := err.(syscall.Errno); ok {
412+
p.proxyLogger.Errorf("<%s> errno >> %v", p.ID, errno)
413+
} else if exitError, ok := err.(*exec.ExitError); ok {
414+
if strings.Contains(exitError.String(), "signal: terminated") {
415+
p.proxyLogger.Debugf("<%s> Process stopped OK", p.ID)
416+
} else if strings.Contains(exitError.String(), "signal: interrupt") {
417+
p.proxyLogger.Debugf("<%s> Process interrupted OK", p.ID)
457418
} else {
419+
p.proxyLogger.Warnf("<%s> ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode())
420+
}
421+
} else {
422+
if err.Error() != "context canceled" /* this is normal */ {
458423
p.proxyLogger.Errorf("<%s> Process exited >> %v", p.ID, err)
459424
}
460425
}
461426
}
427+
462428
}
463429

464430
func (p *Process) checkHealthEndpoint(healthURL string) error {
@@ -587,5 +553,56 @@ func (p *Process) waitForCmd() {
587553
return
588554
}
589555

556+
// handle state clean up and changes
557+
if p.CurrentState() == StateStopping {
558+
if curState, err := p.swapState(StateStopping, StateStopped); err != nil {
559+
p.proxyLogger.Infof("<%s> Stop() StateStopping -> StateStopped err: %v, current state: %v", p.ID, err, curState)
560+
}
561+
}
562+
590563
p.cmdWaitChan <- exitErr
591564
}
565+
566+
// cmdStopUpstreamProcess attemps to stop the upstream process gracefully
567+
func (p *Process) cmdStopUpstreamProcess() error {
568+
p.processLogger.Debugf("<%s> cmdStopUpstreamProcess() initiating graceful stop of upstream process", p.ID)
569+
570+
// this should never happen ...
571+
if p.cmd == nil || p.cmd.Process == nil {
572+
p.proxyLogger.Debugf("<%s> cmd or cmd.Process is nil (normal during config reload)", p.ID)
573+
return fmt.Errorf("<%s> process is nil or cmd is nil, skipping graceful stop", p.ID)
574+
}
575+
576+
// the default cmdStop to taskkill /f /t /pid ${PID}
577+
if runtime.GOOS == "windows" && strings.TrimSpace(p.config.CmdStop) == "" {
578+
p.config.CmdStop = "taskkill /f /t /pid ${PID}"
579+
}
580+
581+
if p.config.CmdStop != "" {
582+
// replace ${PID} with the pid of the process
583+
stopArgs, err := SanitizeCommand(strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", p.cmd.Process.Pid)))
584+
if err != nil {
585+
p.proxyLogger.Errorf("<%s> Failed to sanitize stop command: %v", p.ID, err)
586+
return err
587+
}
588+
589+
p.proxyLogger.Debugf("<%s> Executing stop command: %s", p.ID, strings.Join(stopArgs, " "))
590+
591+
stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...)
592+
stopCmd.Stdout = p.processLogger
593+
stopCmd.Stderr = p.processLogger
594+
stopCmd.Env = p.config.Env
595+
596+
if err := stopCmd.Run(); err != nil {
597+
p.proxyLogger.Errorf("<%s> Failed to exec stop command: %v", p.ID, err)
598+
return err
599+
}
600+
} else {
601+
if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
602+
p.proxyLogger.Errorf("<%s> Failed to send SIGTERM to process: %v", p.ID, err)
603+
return err
604+
}
605+
}
606+
607+
return nil
608+
}

0 commit comments

Comments
 (0)