Skip to content

Commit e5af39a

Browse files
committed
feat: config hot-reload - fix merge conflicts
1 parent 1766292 commit e5af39a

File tree

7 files changed

+153
-112
lines changed

7 files changed

+153
-112
lines changed

llama-swap.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,22 @@ func watchConfigFileWithReload(configPath string, reloadChan chan<- *proxy.Proxy
153153
debounceTimer = time.AfterFunc(debounceDuration, func() {
154154
log.Printf("Config file modified: %s, reloading...", event.Name)
155155

156-
// Load new configuration
157-
newConfig, err := proxy.LoadConfig(configPath)
156+
// Try up to 3 times with exponential backoff
157+
var newConfig proxy.Config
158+
var err error
159+
for retries := 0; retries < 3; retries++ {
160+
// Load new configuration
161+
newConfig, err = proxy.LoadConfig(configPath)
162+
if err == nil {
163+
break
164+
}
165+
log.Printf("Error loading new config (attempt %d/3): %v", retries+1, err)
166+
if retries < 2 {
167+
time.Sleep(time.Duration(1<<retries) * time.Second)
168+
}
169+
}
158170
if err != nil {
159-
log.Printf("Error loading new config: %v", err)
171+
log.Printf("Failed to load new config after retries: %v", err)
160172
return
161173
}
162174

proxy/helpers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func getTestSimpleResponderConfigPort(expectedMessage string, port int) ModelCon
6363

6464
// Create a process configuration
6565
return ModelConfig{
66-
Cmd: fmt.Sprintf("%s --port %d --silent --respond %s", binaryPath, port, expectedMessage),
66+
Cmd: fmt.Sprintf("%s --port %d --silent --respond %s", binaryPath, port, expectedMessage), // Re-added --silent
6767
Proxy: fmt.Sprintf("http://127.0.0.1:%d", port),
6868
CheckEndpoint: "/health",
6969
}

proxy/logMonitor_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,18 @@ func TestLogMonitor(t *testing.T) {
2121
client2Messages := make([]byte, 0)
2222

2323
var wg sync.WaitGroup
24-
wg.Add(2) // One for each client
24+
wg.Add(2) // One for each client (goroutine)
2525

2626
// Write messages first
27-
logMonitor.Write([]byte("1"))
28-
logMonitor.Write([]byte("2"))
29-
logMonitor.Write([]byte("3"))
27+
if _, err := logMonitor.Write([]byte("1")); err != nil {
28+
t.Fatalf("Failed to write log message: %v", err)
29+
}
30+
if _, err := logMonitor.Write([]byte("2")); err != nil {
31+
t.Fatalf("Failed to write log message: %v", err)
32+
}
33+
if _, err := logMonitor.Write([]byte("3")); err != nil {
34+
t.Fatalf("Failed to write log message: %v", err)
35+
}
3036

3137
// Start goroutines to collect messages
3238
go func() {

proxy/process.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,9 @@ func (p *Process) stopCommand(sigtermTTL time.Duration) {
356356
select {
357357
case <-sigtermTimeout.Done():
358358
p.proxyLogger.Debugf("<%s> Process timed out waiting to stop, sending KILL signal (normal during shutdown)", p.ID)
359-
p.cmd.Process.Kill()
359+
if err := p.cmd.Process.Kill(); err != nil {
360+
p.proxyLogger.Errorf("<%s> Failed to kill process: %v", p.ID, err)
361+
}
360362
case err := <-p.cmdWaitChan:
361363
// Note: in start(), p.cmdWaitChan also has a select { ... }. That should be OK
362364
// because if we make it here then the cmd has been successfully running and made it

proxy/process_test.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,21 @@ var (
1818

1919
func init() {
2020
// flip to help with debugging tests
21-
if false {
21+
if false { // Reverted to false
2222
debugLogger.SetLogLevel(LevelDebug)
2323
} else {
24-
debugLogger.SetLogLevel(LevelError)
24+
debugLogger.SetLogLevel(LevelError) // This will now be active
2525
}
2626
}
2727

2828
func TestProcess_AutomaticallyStartsUpstream(t *testing.T) {
2929

3030
expectedMessage := "testing91931"
31-
config := getTestSimpleResponderConfig(expectedMessage)
31+
// Use a specific port for the first instance in this test
32+
config1 := getTestSimpleResponderConfigPort(expectedMessage, 12901)
3233

3334
// Create a process
34-
process := NewProcess("test-process", 5, config, debugLogger, debugLogger)
35+
process := NewProcess("test-process", 5, config1, debugLogger, debugLogger)
3536
defer process.Stop()
3637

3738
req := httptest.NewRequest("GET", "/test", nil)
@@ -48,6 +49,13 @@ func TestProcess_AutomaticallyStartsUpstream(t *testing.T) {
4849
// Stop the process
4950
process.Stop()
5051

52+
// Ensure the process is fully stopped and port is likely released
53+
time.Sleep(100 * time.Millisecond) // Small delay to help port release
54+
55+
// Use a different specific port for the second instance
56+
config2 := getTestSimpleResponderConfigPort(expectedMessage, 12902)
57+
process.config = config2 // Update the process config to use the new port
58+
5159
req = httptest.NewRequest("GET", "/", nil)
5260
w = httptest.NewRecorder()
5361

@@ -56,7 +64,7 @@ func TestProcess_AutomaticallyStartsUpstream(t *testing.T) {
5664

5765
// should have automatically started the process again
5866
if w.Code != http.StatusOK {
59-
t.Errorf("Expected status code %d, got %d", http.StatusOK, w.Code)
67+
t.Errorf("Expected status code %d, got %d (URL: %s, Port: %s)", http.StatusOK, w.Code, req.URL.Path, process.config.Proxy)
6068
}
6169
}
6270

@@ -65,7 +73,8 @@ func TestProcess_AutomaticallyStartsUpstream(t *testing.T) {
6573
func TestProcess_WaitOnMultipleStarts(t *testing.T) {
6674

6775
expectedMessage := "testing91931"
68-
config := getTestSimpleResponderConfig(expectedMessage)
76+
// Use a specific, high port for this test to reduce collision likelihood
77+
config := getTestSimpleResponderConfigPort(expectedMessage, 12903)
6978

7079
process := NewProcess("test-process", 5, config, debugLogger, debugLogger)
7180
defer process.Stop()

proxy/proxymanager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ type ProxyManager struct {
3737
processGroups map[string]*ProcessGroup
3838
}
3939

40-
// New creates a new ProxyManager with default loggers.
4140
func New(config Config) *ProxyManager {
4241
// set up loggers
4342
stdoutLogger := NewLogMonitorWriter(os.Stdout)
@@ -87,7 +86,6 @@ func New(config Config) *ProxyManager {
8786
return pm
8887
}
8988

90-
// setupGinEngine configures the Gin engine with all necessary routes and middleware
9189
func (pm *ProxyManager) setupGinEngine() {
9290
pm.ginEngine.Use(func(c *gin.Context) {
9391
// Start timer
@@ -208,6 +206,8 @@ func (pm *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
208206

209207
// StopProcesses acquires a lock and stops all running upstream processes.
210208
// This is the public method safe for concurrent calls.
209+
// Unlike Shutdown, this method only stops the processes but doesn't perform
210+
// a complete shutdown, allowing for process replacement without full termination.
211211
func (pm *ProxyManager) StopProcesses() {
212212
pm.Lock()
213213
defer pm.Unlock()

proxy/proxymanager_test.go

Lines changed: 107 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -251,41 +251,56 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
251251
}
252252

253253
func TestProxyManager_Shutdown(t *testing.T) {
254-
// Test Case 1: Startup failure due to unavailable proxy
255-
t.Run("startup failure with unavailable proxy", func(t *testing.T) {
256-
// Create configuration with invalid command that will fail immediately
257-
modelConfig := ModelConfig{
258-
Cmd: "/invalid-command", // Invalid executable path that will fail to start
259-
Proxy: "http://localhost:9991",
260-
CheckEndpoint: "/health",
261-
}
262-
263-
config := AddDefaultGroupToConfig(Config{
264-
HealthCheckTimeout: 15,
265-
Models: map[string]ModelConfig{
266-
"model1": modelConfig,
254+
// make broken model configurations
255+
model1Config := getTestSimpleResponderConfigPort("model1", 9991)
256+
model1Config.Proxy = "http://localhost:10001/"
257+
258+
model2Config := getTestSimpleResponderConfigPort("model2", 9992)
259+
model2Config.Proxy = "http://localhost:10002/"
260+
261+
model3Config := getTestSimpleResponderConfigPort("model3", 9993)
262+
model3Config.Proxy = "http://localhost:10003/"
263+
264+
config := AddDefaultGroupToConfig(Config{
265+
HealthCheckTimeout: 15,
266+
Models: map[string]ModelConfig{
267+
"model1": model1Config,
268+
"model2": model2Config,
269+
"model3": model3Config,
270+
},
271+
LogLevel: "error",
272+
Groups: map[string]GroupConfig{
273+
"test": {
274+
Swap: false,
275+
Members: []string{"model1", "model2", "model3"},
267276
},
268-
LogLevel: "error",
269-
})
277+
},
278+
})
270279

271-
proxy := New(config)
272-
defer proxy.Shutdown()
280+
proxy := New(config)
273281

274-
// Try to start the model
275-
reqBody := `{"model":"model1"}`
276-
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
277-
w := httptest.NewRecorder()
278-
proxy.ServeHTTP(w, req)
282+
// Start all the processes
283+
var wg sync.WaitGroup
284+
for _, modelName := range []string{"model1", "model2", "model3"} {
285+
wg.Add(1)
286+
go func(modelName string) {
287+
defer wg.Done()
288+
reqBody := fmt.Sprintf(`{"model":"%s"}`, modelName)
289+
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
290+
w := httptest.NewRecorder()
279291

280-
assert.Equal(t, http.StatusBadGateway, w.Code)
281-
assert.Contains(t, w.Body.String(), "unable to start process: start() failed: fork/exec /invalid-command: no such file or directory")
292+
// send a request to trigger the proxy to load ... this should hang waiting for start up
293+
proxy.ServeHTTP(w, req)
294+
assert.Equal(t, http.StatusBadGateway, w.Code)
295+
assert.Contains(t, w.Body.String(), "health check interrupted due to shutdown")
296+
}(modelName)
297+
}
282298

283-
// Verify process is tracked but in failed state
284-
processGroup := proxy.findGroupByModelName("model1")
285-
assert.NotNil(t, processGroup)
286-
process := processGroup.processes["model1"]
287-
assert.Equal(t, StateFailed, process.CurrentState())
288-
})
299+
go func() {
300+
<-time.After(time.Second)
301+
proxy.Shutdown()
302+
}()
303+
wg.Wait()
289304
}
290305

291306
func TestProxyManager_Unload(t *testing.T) {
@@ -382,8 +397,6 @@ func TestProxyManager_RunningEndpoint(t *testing.T) {
382397
})
383398
}
384399

385-
386-
387400
func TestProxyManager_AudioTranscriptionHandler(t *testing.T) {
388401
config := AddDefaultGroupToConfig(Config{
389402
HealthCheckTimeout: 15,
@@ -432,6 +445,68 @@ func TestProxyManager_AudioTranscriptionHandler(t *testing.T) {
432445
assert.Equal(t, strconv.Itoa(370+contentLength), response["h_content_length"])
433446
}
434447

448+
// Test useModelName in configuration sends overrides what is sent to upstream
449+
func TestProxyManager_UseModelName(t *testing.T) {
450+
upstreamModelName := "upstreamModel"
451+
452+
modelConfig := getTestSimpleResponderConfig(upstreamModelName)
453+
modelConfig.UseModelName = upstreamModelName
454+
455+
config := AddDefaultGroupToConfig(Config{
456+
HealthCheckTimeout: 15,
457+
Models: map[string]ModelConfig{
458+
"model1": modelConfig,
459+
},
460+
LogLevel: "error",
461+
})
462+
463+
proxy := New(config)
464+
defer proxy.StopProcesses()
465+
466+
requestedModel := "model1"
467+
468+
t.Run("useModelName over rides requested model: /v1/chat/completions", func(t *testing.T) {
469+
reqBody := fmt.Sprintf(`{"model":"%s"}`, requestedModel)
470+
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
471+
w := httptest.NewRecorder()
472+
473+
proxy.ServeHTTP(w, req)
474+
assert.Equal(t, http.StatusOK, w.Code)
475+
assert.Contains(t, w.Body.String(), upstreamModelName)
476+
})
477+
478+
t.Run("useModelName over rides requested model: /v1/audio/transcriptions", func(t *testing.T) {
479+
// Create a buffer with multipart form data
480+
var b bytes.Buffer
481+
w := multipart.NewWriter(&b)
482+
483+
// Add the model field
484+
fw, err := w.CreateFormField("model")
485+
assert.NoError(t, err)
486+
_, err = fw.Write([]byte(requestedModel))
487+
assert.NoError(t, err)
488+
489+
// Add a file field
490+
fw, err = w.CreateFormFile("file", "test.mp3")
491+
assert.NoError(t, err)
492+
_, err = fw.Write([]byte("test"))
493+
assert.NoError(t, err)
494+
w.Close()
495+
496+
// Create the request with the multipart form data
497+
req := httptest.NewRequest("POST", "/v1/audio/transcriptions", &b)
498+
req.Header.Set("Content-Type", w.FormDataContentType())
499+
rec := httptest.NewRecorder()
500+
proxy.ServeHTTP(rec, req)
501+
502+
// Verify the response
503+
assert.Equal(t, http.StatusOK, rec.Code)
504+
var response map[string]string
505+
err = json.Unmarshal(rec.Body.Bytes(), &response)
506+
assert.NoError(t, err)
507+
assert.Equal(t, upstreamModelName, response["model"])
508+
})
509+
}
435510

436511
func TestProxyManager_CORSOptionsHandler(t *testing.T) {
437512
config := AddDefaultGroupToConfig(Config{
@@ -501,69 +576,6 @@ func TestProxyManager_CORSOptionsHandler(t *testing.T) {
501576
}
502577
}
503578

504-
// Test useModelName in configuration sends overrides what is sent to upstream
505-
func TestProxyManager_UseModelName(t *testing.T) {
506-
upstreamModelName := "upstreamModel"
507-
508-
modelConfig := getTestSimpleResponderConfig(upstreamModelName)
509-
modelConfig.UseModelName = upstreamModelName
510-
511-
config := AddDefaultGroupToConfig(Config{
512-
HealthCheckTimeout: 15,
513-
Models: map[string]ModelConfig{
514-
"model1": modelConfig,
515-
},
516-
LogLevel: "error",
517-
})
518-
519-
proxy := New(config)
520-
defer proxy.StopProcesses()
521-
522-
requestedModel := "model1"
523-
524-
t.Run("useModelName over rides requested model: /v1/chat/completions", func(t *testing.T) {
525-
reqBody := fmt.Sprintf(`{"model":"%s"}`, requestedModel)
526-
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
527-
w := httptest.NewRecorder()
528-
529-
proxy.ServeHTTP(w, req)
530-
assert.Equal(t, http.StatusOK, w.Code)
531-
assert.Contains(t, w.Body.String(), upstreamModelName)
532-
})
533-
534-
t.Run("useModelName over rides requested model: /v1/audio/transcriptions", func(t *testing.T) {
535-
// Create a buffer with multipart form data
536-
var b bytes.Buffer
537-
w := multipart.NewWriter(&b)
538-
539-
// Add the model field
540-
fw, err := w.CreateFormField("model")
541-
assert.NoError(t, err)
542-
_, err = fw.Write([]byte(requestedModel))
543-
assert.NoError(t, err)
544-
545-
// Add a file field
546-
fw, err = w.CreateFormFile("file", "test.mp3")
547-
assert.NoError(t, err)
548-
_, err = fw.Write([]byte("test"))
549-
assert.NoError(t, err)
550-
w.Close()
551-
552-
// Create the request with the multipart form data
553-
req := httptest.NewRequest("POST", "/v1/audio/transcriptions", &b)
554-
req.Header.Set("Content-Type", w.FormDataContentType())
555-
rec := httptest.NewRecorder()
556-
proxy.ServeHTTP(rec, req)
557-
558-
// Verify the response
559-
assert.Equal(t, http.StatusOK, rec.Code)
560-
var response map[string]string
561-
err = json.Unmarshal(rec.Body.Bytes(), &response)
562-
assert.NoError(t, err)
563-
assert.Equal(t, upstreamModelName, response["model"])
564-
})
565-
}
566-
567579
func TestProxyManager_Upstream(t *testing.T) {
568580
config := AddDefaultGroupToConfig(Config{
569581
HealthCheckTimeout: 15,

0 commit comments

Comments
 (0)