Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions proxy/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ const (
StateShutdown ProcessState = ProcessState("shutdown")
)

// Strategies used for controlling the desired stopping behaviour
type StopStrategy int

const (
StopImmediately StopStrategy = iota
StopWaitForInflightRequest

// stop the process and reset its state to pristine
StopAndReset
)

type Process struct {
Expand Down Expand Up @@ -72,6 +76,7 @@ type Process struct {
gracefulStopTimeout time.Duration
}

// Create a new Process
func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process {
ctx, cancel := context.WithCancel(context.Background())
concurrentLimit := 10
Expand Down Expand Up @@ -201,6 +206,7 @@ func (p *Process) start() error {

// Set process state to failed
if err != nil {
p.proxyLogger.Errorf("<%s> cmd.Start() failed: %s", p.ID, err.Error())
if curState, swapErr := p.swapState(StateStarting, StateFailed); swapErr != nil {
return fmt.Errorf(
"failed to start command and state swap failed. command error: %v, current state: %v, state swap error: %v",
Expand Down
16 changes: 13 additions & 3 deletions proxy/processgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ProcessGroup struct {

// map of current processes
processes map[string]*Process
processMutex sync.Mutex
lastUsedProcess string
}

Expand Down Expand Up @@ -86,17 +87,26 @@ func (pg *ProcessGroup) StopProcesses(strategy StopStrategy) {

// stop Processes in parallel
var wg sync.WaitGroup
for _, process := range pg.processes {
for modelID, process := range pg.processes {
wg.Add(1)
go func(process *Process) {
go func(modelID string, process *Process) {
defer wg.Done()
switch strategy {
case StopImmediately:
process.StopImmediately()

// stop all processes and replace with a new prestine Process
case StopAndReset:
process.StopImmediately()
modelConfig, _, _ := pg.config.FindConfig(modelID)
pg.processMutex.Lock()
pg.proxyLogger.Debugf("<%s> Stopped and replaced with new Process", modelID)
pg.processes[modelID] = NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger)
pg.processMutex.Unlock()
default:
process.Stop()
}
}(process)
}(modelID, process)
}
wg.Wait()
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/proxymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (pm *ProxyManager) sendErrorResponse(c *gin.Context, statusCode int, messag
}

func (pm *ProxyManager) unloadAllModelsHandler(c *gin.Context) {
pm.StopProcesses(StopImmediately)
pm.StopProcesses(StopAndReset)
c.String(http.StatusOK, "OK")
}

Expand Down