Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions proxy/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ const (
StateShutdown ProcessState = ProcessState("shutdown")
)

// Strategies used for controlling the desired stopping behaviour
type StopStrategy int

const (
StopImmediately StopStrategy = iota
StopWaitForInflightRequest

// stop the process and reset its state to pristine
StopAndReset
)

type Process struct {
Expand Down Expand Up @@ -72,6 +76,7 @@ type Process struct {
gracefulStopTimeout time.Duration
}

// Create a new Process
func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process {
ctx, cancel := context.WithCancel(context.Background())
concurrentLimit := 10
Expand Down Expand Up @@ -201,6 +206,7 @@ func (p *Process) start() error {

// Set process state to failed
if err != nil {
p.proxyLogger.Errorf("<%s> cmd.Start() failed: %s", p.ID, err.Error())
if curState, swapErr := p.swapState(StateStarting, StateFailed); swapErr != nil {
return fmt.Errorf(
"failed to start command and state swap failed. command error: %v, current state: %v, state swap error: %v",
Expand Down
16 changes: 13 additions & 3 deletions proxy/processgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ProcessGroup struct {

// map of current processes
processes map[string]*Process
processMutex sync.Mutex
lastUsedProcess string
}

Expand Down Expand Up @@ -86,17 +87,26 @@ func (pg *ProcessGroup) StopProcesses(strategy StopStrategy) {

// stop Processes in parallel
var wg sync.WaitGroup
for _, process := range pg.processes {
for modelID, process := range pg.processes {
wg.Add(1)
go func(process *Process) {
go func(modelID string, process *Process) {
defer wg.Done()
switch strategy {
case StopImmediately:
process.StopImmediately()

// stop all processes and replace with a new prestine Process
case StopAndReset:
process.StopImmediately()
modelConfig, actualModelID, _ := pg.config.FindConfig(modelID)
pg.processMutex.Lock()
pg.proxyLogger.Debugf("<%s> Stopped and replaced with new Process", modelID)
pg.processes[actualModelID] = NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger)
pg.processMutex.Unlock()
default:
process.Stop()
}
}(process)
}(modelID, process)
}
wg.Wait()
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/proxymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (pm *ProxyManager) sendErrorResponse(c *gin.Context, statusCode int, messag
}

func (pm *ProxyManager) unloadAllModelsHandler(c *gin.Context) {
pm.StopProcesses(StopImmediately)
pm.StopProcesses(StopAndReset)
c.String(http.StatusOK, "OK")
}

Expand Down