Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
gopkg.in/yaml.v3 v3.0.1
github.com/kelindar/event v1.5.2
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kelindar/event v1.5.2 h1:qtgssZqMh/QQMCIxlbx4wU3DoMHOrJXKdiZhphJ4YbY=
github.com/kelindar/event v1.5.2/go.mod h1:UxWPQjWK8u0o9Z3ponm2mgREimM95hm26/M9z8F488Q=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
Expand Down
216 changes: 101 additions & 115 deletions llama-swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/gin-gonic/gin"
"github.com/kelindar/event"
"github.com/mostlygeek/llama-swap/proxy"
)

Expand Down Expand Up @@ -53,144 +54,129 @@ func main() {
gin.SetMode(gin.ReleaseMode)
}

proxyManager := proxy.New(config)

// Setup channels for server management
reloadChan := make(chan *proxy.ProxyManager)
exitChan := make(chan struct{})
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Create server with initial handler
srv := &http.Server{
Addr: *listenStr,
Handler: proxyManager,
Addr: *listenStr,
}

// Start server
fmt.Printf("llama-swap listening on %s\n", *listenStr)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("Fatal server error: %v\n", err)
close(exitChan)
}
}()

// Handle config reloads and signals
go func() {
currentManager := proxyManager
for {
select {
case newManager := <-reloadChan:
log.Println("Config change detected, waiting for in-flight requests to complete...")
// Stop old manager processes gracefully (this waits for in-flight requests)
currentManager.StopProcesses(proxy.StopWaitForInflightRequest)
// Now do a full shutdown to clear the process map
currentManager.Shutdown()
currentManager = newManager
srv.Handler = newManager
log.Println("Server handler updated with new config")
case sig := <-sigChan:
fmt.Printf("Received signal %v, shutting down...\n", sig)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
currentManager.Shutdown()
if err := srv.Shutdown(ctx); err != nil {
fmt.Printf("Server shutdown error: %v\n", err)
}
close(exitChan)
// Support for watching config and reloading when it changes
reloadProxyManager := func() {
if currentPM, ok := srv.Handler.(*proxy.ProxyManager); ok {
config, err = proxy.LoadConfig(*configPath)
if err != nil {
fmt.Printf("Warning, unable to reload configuration: %v\n", err)
return
}
}
}()

// Start file watcher if requested
if *watchConfig {
absConfigPath, err := filepath.Abs(*configPath)
if err != nil {
log.Printf("Error getting absolute path for config: %v. File watching disabled.", err)
fmt.Println("Configuration Changed")
currentPM.Shutdown()
srv.Handler = proxy.New(config)
fmt.Println("Configuration Reloaded")

// wait a few seconds and tell any UI to reload
time.AfterFunc(3*time.Second, func() {
event.Emit(proxy.ConfigFileChangedEvent{
ReloadingState: proxy.ReloadingStateEnd,
})
})
} else {
go watchConfigFileWithReload(absConfigPath, reloadChan)
config, err = proxy.LoadConfig(*configPath)
if err != nil {
fmt.Printf("Error, unable to load configuration: %v\n", err)
os.Exit(1)
}
srv.Handler = proxy.New(config)
}
}

// Wait for exit signal
<-exitChan
}

// watchConfigFileWithReload monitors the configuration file and sends new ProxyManager instances through reloadChan.
func watchConfigFileWithReload(configPath string, reloadChan chan<- *proxy.ProxyManager) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("Error creating file watcher: %v. File watching disabled.", err)
return
}
defer watcher.Close()

err = watcher.Add(configPath)
if err != nil {
log.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", configPath, err)
return
}

log.Printf("Watching config file for changes: %s", configPath)
// load the initial proxy manager
reloadProxyManager()
debouncedReload := debounce(time.Second, reloadProxyManager)
if *watchConfig {
defer event.On(func(e proxy.ConfigFileChangedEvent) {
if e.ReloadingState == proxy.ReloadingStateStart {
debouncedReload()
}
})()

var debounceTimer *time.Timer
debounceDuration := 2 * time.Second
fmt.Println("Watching Configuration for changes")
go func() {
absConfigPath, err := filepath.Abs(*configPath)
if err != nil {
fmt.Printf("Error getting absolute path for watching config file: %v\n", err)
return
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
fmt.Printf("Error creating file watcher: %v. File watching disabled.\n", err)
return
}

for {
select {
case event, ok := <-watcher.Events:
if !ok {
err = watcher.Add(absConfigPath)
if err != nil {
fmt.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", absConfigPath, err)
return
}
// We only care about writes/creates to the specific config file
if event.Name == configPath && (event.Has(fsnotify.Write) || event.Has(fsnotify.Create) || event.Has(fsnotify.Remove)) {
// Reset or start the debounce timer
if debounceTimer != nil {
debounceTimer.Stop()
}
debounceTimer = time.AfterFunc(debounceDuration, func() {
log.Printf("Config file modified: %s, reloading...", event.Name)

// Try up to 3 times with exponential backoff
var newConfig proxy.Config
var err error
for retries := 0; retries < 3; retries++ {
// Load new configuration
newConfig, err = proxy.LoadConfig(configPath)
if err == nil {
break
}
log.Printf("Error loading new config (attempt %d/3): %v", retries+1, err)
if retries < 2 {
time.Sleep(time.Duration(1<<retries) * time.Second)
}
}
if err != nil {
log.Printf("Failed to load new config after retries: %v", err)
return
}

// Create new ProxyManager with new config
newPM := proxy.New(newConfig)
reloadChan <- newPM
log.Println("Config reloaded successfully")
if (event.Has(fsnotify.Remove)) {
// re-add watcher
err = watcher.Add(configPath)
if err != nil {
log.Printf("Could not re-add watcher for %s: %s", configPath, err)
}
defer watcher.Close()
for {
select {
case changeEvent := <-watcher.Events:
if changeEvent.Name == absConfigPath && (changeEvent.Has(fsnotify.Write) || changeEvent.Has(fsnotify.Create) || changeEvent.Has(fsnotify.Remove)) {
event.Emit(proxy.ConfigFileChangedEvent{
ReloadingState: proxy.ReloadingStateStart,
})
}
})
}
case err, ok := <-watcher.Errors:
if !ok {
log.Println("File watcher error channel closed.")
return

case err := <-watcher.Errors:
log.Printf("File watcher error: %v", err)
}
}
log.Printf("File watcher error: %v", err)
}()
}

// shutdown on signal
go func() {
sig := <-sigChan
fmt.Printf("Received signal %v, shutting down...\n", sig)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

if pm, ok := srv.Handler.(*proxy.ProxyManager); ok {
pm.Shutdown()
} else {
fmt.Println("srv.Handler is not of type *proxy.ProxyManager")
}

if err := srv.Shutdown(ctx); err != nil {
fmt.Printf("Server shutdown error: %v\n", err)
}
close(exitChan)
}()

// Start server
fmt.Printf("llama-swap listening on %s\n", *listenStr)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Fatal server error: %v\n", err)
}
}()

// Wait for exit signal
<-exitChan
}

func debounce(interval time.Duration, f func()) func() {
var timer *time.Timer
return func() {
if timer != nil {
timer.Stop()
}
timer = time.AfterFunc(interval, f)
}
}
49 changes: 49 additions & 0 deletions proxy/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package proxy

// package level registry of the different event types

const ProcessStateChangeEventID = 0x01
const ChatCompletionStatsEventID = 0x02
const ConfigFileChangedEventID = 0x03
const LogDataEventID = 0x04

type ProcessStateChangeEvent struct {
ProcessName string
NewState ProcessState
OldState ProcessState
}

func (e ProcessStateChangeEvent) Type() uint32 {
return ProcessStateChangeEventID
}

type ChatCompletionStats struct {
TokensGenerated int
}

func (e ChatCompletionStats) Type() uint32 {
return ChatCompletionStatsEventID
}

type ReloadingState int

const (
ReloadingStateStart ReloadingState = iota
ReloadingStateEnd
)

type ConfigFileChangedEvent struct {
ReloadingState ReloadingState
}

func (e ConfigFileChangedEvent) Type() uint32 {
return ConfigFileChangedEventID
}

type LogDataEvent struct {
Data []byte
}

func (e LogDataEvent) Type() uint32 {
return LogDataEventID
}
45 changes: 14 additions & 31 deletions proxy/logMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package proxy

import (
"container/ring"
"context"
"fmt"
"io"
"os"
"sync"

"github.com/kelindar/event"
)

type LogLevel int
Expand All @@ -18,7 +21,7 @@ const (
)

type LogMonitor struct {
clients map[chan []byte]bool
eventbus *event.Dispatcher
mu sync.RWMutex
buffer *ring.Ring
bufferMu sync.RWMutex
Expand All @@ -37,11 +40,11 @@ func NewLogMonitor() *LogMonitor {

func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
return &LogMonitor{
clients: make(map[chan []byte]bool),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
stdout: stdout,
level: LevelInfo,
prefix: "",
eventbus: event.NewDispatcher(),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
stdout: stdout,
level: LevelInfo,
prefix: "",
}
}

Expand Down Expand Up @@ -81,34 +84,14 @@ func (w *LogMonitor) GetHistory() []byte {
return history
}

func (w *LogMonitor) Subscribe() chan []byte {
w.mu.Lock()
defer w.mu.Unlock()

ch := make(chan []byte, 100)
w.clients[ch] = true
return ch
}

func (w *LogMonitor) Unsubscribe(ch chan []byte) {
w.mu.Lock()
defer w.mu.Unlock()

delete(w.clients, ch)
close(ch)
func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc {
return event.Subscribe(w.eventbus, func(e LogDataEvent) {
callback(e.Data)
})
}

func (w *LogMonitor) broadcast(msg []byte) {
w.mu.RLock()
defer w.mu.RUnlock()

for client := range w.clients {
select {
case client <- msg:
default:
// If client buffer is full, skip
}
}
event.Publish(w.eventbus, LogDataEvent{Data: msg})
}

func (w *LogMonitor) SetPrefix(prefix string) {
Expand Down
Loading