Merged

Changes from 23 commits (96 commits total)
f669140
feat: add activity page and handler
g2mt Jul 18, 2025
f26779b
feat: add metrics logging and UI display
g2mt Jul 18, 2025
822e2cf
feat: add metrics parser and model-specific metrics endpoint
g2mt Jul 18, 2025
198646d
run go fmt
g2mt Jul 18, 2025
e9edd96
refactor: remove GetLatestMetrics and update apiGetMetrics to collect…
g2mt Jul 18, 2025
0d74cf0
refactor: remove ParseLogData from MetricsParser and refactor Process…
g2mt Jul 18, 2025
fc3ca90
refactor: update metrics parsing and API to use input/output tokens
g2mt Jul 18, 2025
67fd770
Remove setInterval
g2mt Jul 18, 2025
8b49999
Rename table column
g2mt Jul 18, 2025
d3f0147
Remove colors, hide token count if zero
g2mt Jul 18, 2025
b274f25
use - for empty token count column
g2mt Jul 18, 2025
75b2cdf
Add metricsMaxInMemory to config
g2mt Jul 18, 2025
4a26d32
Fix whitespace
g2mt Jul 18, 2025
ffd8dae
Remove getSummary
g2mt Jul 18, 2025
bacad51
refactor: remove model-specific metrics parsing and API endpoints
g2mt Jul 18, 2025
b3f5d2b
update tests
g2mt Jul 18, 2025
7ee133b
Run fetchMetrics on mount
g2mt Jul 18, 2025
e9a4156
refactor: update metrics parser to simplify method signatures
g2mt Jul 18, 2025
9f22155
Rename addMetric
g2mt Jul 18, 2025
fea861d
feat: add config-based metrics parser initialization
g2mt Jul 18, 2025
99b3eb2
remove newline
g2mt Jul 18, 2025
39908a1
feat: add metrics persistence to file
g2mt Jul 18, 2025
2fee6b8
document metricsLogPath in example config
g2mt Jul 18, 2025
4d30155
Add MetricsMaxInMemory to windows test
g2mt Jul 18, 2025
749ace4
Check if pm.metricsParser is nil in apiGetMetrics
g2mt Jul 19, 2025
fd7f626
feat: add metricsUseServerResponse config and update proxy logic
g2mt Jul 19, 2025
91b7efe
chore: add metricsUseServerResponse config option
g2mt Jul 19, 2025
9749b69
feat: add useServerResponse to MetricsParser
g2mt Jul 19, 2025
158a202
fix
g2mt Jul 19, 2025
f5b60a0
correct comment
g2mt Jul 19, 2025
6a84eab
Merge remote-tracking branch 'fork/activity-page' into activity-page
g2mt Jul 19, 2025
55efb27
refactor: update generation speed calculation
g2mt Jul 19, 2025
0e79f64
refactor: unify stdout and stderr handling in process.go
g2mt Jul 19, 2025
52436fd
feat: add streaming response handling in proxyOAIHandler
g2mt Jul 19, 2025
4f0ee68
feat: add log event subscription management
g2mt Jul 19, 2025
09e1e95
remove import
g2mt Jul 19, 2025
f473788
Use bufio.NewScanner to parse stdout lines
g2mt Jul 19, 2025
6d7bca3
use custom response recorder
g2mt Jul 19, 2025
f33222d
add bufio import
g2mt Jul 19, 2025
5cfbb84
refactor metrics debug logging
g2mt Jul 19, 2025
cdbc196
Remove metricsLogPath
g2mt Jul 19, 2025
c8be9ad
Merge branch 'activity-page-remove-httptest' into activity-page
g2mt Jul 19, 2025
8746468
Move responserecorder to another file
g2mt Jul 19, 2025
f574b6c
Merge branch 'activity-page-stream' into activity-page
g2mt Jul 19, 2025
e516610
move StreamingResponseRecorder to separate file
g2mt Jul 19, 2025
037e7d9
Rename responserecorder.go, remove NewResponseRecorder
g2mt Jul 19, 2025
3616243
Add Activity streaming
g2mt Jul 19, 2025
9508b7c
add fmt
g2mt Jul 19, 2025
814b533
Remove first fetch
g2mt Jul 19, 2025
b6b8046
fix missing !
g2mt Jul 19, 2025
f36cda1
Rename to StreamingResponseRecorder
g2mt Jul 19, 2025
0c44cfd
Refactor response recorder functions into a single middleware
g2mt Jul 19, 2025
2aa1c38
Rename metrics parser to MetricsMonitor
g2mt Jul 19, 2025
724d270
Rename to metricsDataCancel
g2mt Jul 19, 2025
cc8a1bb
Rename ResponseMiddleware to MetricsMiddleware, process HTTP request …
g2mt Jul 19, 2025
55516c6
Move log parsing to SubscribeToProcessLogs
g2mt Jul 19, 2025
9286fd9
Remove unused log
g2mt Jul 19, 2025
0c92922
Extract metrics recording into method
g2mt Jul 19, 2025
7a9a413
Add comment to OutputTokens
g2mt Jul 19, 2025
df3fbdb
Rename generationSpeed
g2mt Jul 19, 2025
990e8bd
Add comments for regexes
g2mt Jul 19, 2025
0e250a4
Add an ID for token metrics
g2mt Jul 19, 2025
252b451
Refactor into GetMetricsJSONByLines
g2mt Jul 19, 2025
1f5f850
cleanup
g2mt Jul 19, 2025
c80557c
add back toLocaleString
g2mt Jul 19, 2025
7c875e6
Merge branch 'mostlygeek:main' into activity-page
g2mt Jul 19, 2025
a2c6451
revert change
g2mt Jul 19, 2025
7a07a09
Remove GetMetricsJSONByLines
g2mt Jul 19, 2025
1d260f9
Remove apiStreamMetrics and move streaming into /api/events
g2mt Jul 20, 2025
1c6543c
Fix switch case variable declaration scope issue.
g2mt Jul 20, 2025
2e37d38
Fix loading state logic to handle empty metrics.
g2mt Jul 20, 2025
af4b4dc
Remove loading state from Activity
g2mt Jul 20, 2025
464d91c
Remove subtitle
g2mt Jul 20, 2025
ef97c68
fix style
g2mt Jul 20, 2025
6fd771b
remove debug logger
g2mt Jul 20, 2025
5ce9abd
Remove metricsMonitor event bus
g2mt Jul 20, 2025
6ed6dc8
remove nil checks
g2mt Jul 20, 2025
a0d1161
refactor MetricsMiddleware
g2mt Jul 20, 2025
e836ebc
Clean up ResponseWriter
g2mt Jul 20, 2025
93af520
refactor metrics middleware
g2mt Jul 20, 2025
741ca5a
add comment
g2mt Jul 20, 2025
8f2d75b
Remove flush
g2mt Jul 20, 2025
ce27be5
remove mm from irrelevant endpoints
g2mt Jul 22, 2025
b5ad4d9
move requested model parsing to ls-requested-model key
g2mt Jul 22, 2025
0813157
Remove import
g2mt Jul 22, 2025
2de9250
rm
g2mt Jul 22, 2025
82522b6
Add MiddlewareWritesMetrics tests
g2mt Jul 22, 2025
bab8b79
Rename metrics parser
g2mt Jul 22, 2025
f4288bb
record modelName for metrics in proxyOAIHandler
g2mt Jul 22, 2025
cd9dc5b
Add streaming to simple-responder.go
g2mt Jul 22, 2025
3a94a96
hide stream behind url query
g2mt Jul 22, 2025
19ca3a8
wrong test
g2mt Jul 22, 2025
4b0e94f
Convert to interface{}
g2mt Jul 22, 2025
a245674
fix test
g2mt Jul 22, 2025
05509e6
Get realModelName in middleware
g2mt Jul 22, 2025
242da36
add startTime
g2mt Jul 22, 2025
config.example.yaml (13 additions, 1 deletion)
@@ -15,6 +15,18 @@ healthCheckTimeout: 500
# - Valid log levels: debug, info, warn, error
logLevel: info

# metricsMaxInMemory: maximum number of metrics to keep in memory
# - optional, default: 1000
# - controls how many metrics are stored in memory before older ones are discarded
# - useful for limiting memory usage when processing large volumes of metrics
metricsMaxInMemory: 1000

# metricsLogPath: sets the path to the metrics log file
# - optional, default: empty string
# - if set, metrics will be logged to the specified file
# - useful for persistent logging of metrics
metricsLogPath: /path/to/metrics.log

# startPort: sets the starting port number for the automatic ${PORT} macro.
# - optional, default: 5800
# - the ${PORT} macro can be used in model.cmd and model.proxy settings
@@ -200,4 +212,4 @@ groups:
members:
- "forever-modelA"
- "forever-modelB"
- "forever-modelc"
- "forever-modelc"
docker/config.example.yaml (1 addition)
@@ -1,5 +1,6 @@
healthCheckTimeout: 300
logRequests: true
metricsMaxInMemory: 1000

models:
"qwen2.5":
proxy/config.go (4 additions)
@@ -142,6 +142,8 @@ type Config struct {
HealthCheckTimeout int `yaml:"healthCheckTimeout"`
LogRequests bool `yaml:"logRequests"`
LogLevel string `yaml:"logLevel"`
MetricsLogPath string `yaml:"metricsLogPath"`
MetricsMaxInMemory int `yaml:"metricsMaxInMemory"`
Models map[string]ModelConfig `yaml:"models"` /* key is model ID */
Profiles map[string][]string `yaml:"profiles"`
Groups map[string]GroupConfig `yaml:"groups"` /* key is group ID */
@@ -194,6 +196,8 @@ func LoadConfigFromReader(r io.Reader) (Config, error) {
HealthCheckTimeout: 120,
StartPort: 5800,
LogLevel: "info",
MetricsLogPath: "",
MetricsMaxInMemory: 1000,
}
err = yaml.Unmarshal(data, &config)
if err != nil {
proxy/config_posix_test.go (1 addition)
@@ -196,6 +196,7 @@ groups:
},
},
HealthCheckTimeout: 15,
MetricsMaxInMemory: 1000,
Profiles: map[string][]string{
"test": {"model1", "model2"},
},
proxy/metrics_parser.go (new file, 195 additions)
@@ -0,0 +1,195 @@
package proxy

import (
    "bufio"
    "encoding/json"
    "os"
    "regexp"
    "strconv"
    "sync"
    "time"
)

// TokenMetrics represents parsed token statistics from llama-server logs
type TokenMetrics struct {
    Timestamp       time.Time `json:"timestamp"`
    Model           string    `json:"model"`
    InputTokens     int       `json:"input_tokens"`
    OutputTokens    int       `json:"output_tokens"`
    TokensPerSecond float64   `json:"tokens_per_second"`
    DurationMs      int       `json:"duration_ms"`
}

// MetricsParser parses llama-server output for token statistics
type MetricsParser struct {
    mu              sync.RWMutex
    metrics         []TokenMetrics
    maxMetrics      int
    logPath         string
    promptEvalRegex *regexp.Regexp
    evalRegex       *regexp.Regexp
    proxyLogger     *LogMonitor
}

// NewMetricsParser creates a new metrics parser
func NewMetricsParser(config *Config, proxyLogger *LogMonitor) *MetricsParser {
    maxMetrics := config.MetricsMaxInMemory
    if maxMetrics <= 0 {
        maxMetrics = 1000 // Default fallback
    }

    mp := &MetricsParser{
        maxMetrics:      maxMetrics,
        logPath:         config.MetricsLogPath,
        promptEvalRegex: regexp.MustCompile(`prompt eval time\s*=\s*(\d+(?:\.\d+)?)\s*ms\s*/\s*(\d+)\s*tokens\s*\(\s*(\d+(?:\.\d+)?)\s*ms per token,\s*(\d+(?:\.\d+)?)\s*tokens per second\s*\)`),
        evalRegex:       regexp.MustCompile(`eval time\s*=\s*(\d+(?:\.\d+)?)\s*ms\s*/\s*(\d+)\s*tokens\s*\(\s*(\d+(?:\.\d+)?)\s*ms per token,\s*(\d+(?:\.\d+)?)\s*tokens per second\s*\)`),
        proxyLogger:     proxyLogger,
    }

    // Load existing metrics from file if path is provided
    if config.MetricsLogPath != "" {
        _ = mp.LoadMetrics() // Only warn, don't error as requested
    }

    return mp
}

// LoadMetrics loads metrics from the JSONL file
func (mp *MetricsParser) LoadMetrics() error {
    if mp.logPath == "" {
        return nil
    }

    file, err := os.Open(mp.logPath)
    if err != nil {
        if os.IsNotExist(err) {
            // File doesn't exist yet, which is fine
            return nil
        }
        if mp.proxyLogger != nil {
            mp.proxyLogger.Warnf("Failed to open metrics log file for reading: %v", err)
        }
        return err
    }
    defer file.Close()

    mp.mu.Lock()
    defer mp.mu.Unlock()

    // Use bufio.Scanner to read line by line
    scanner := bufio.NewScanner(file)
    lineNum := 0
    for scanner.Scan() {
        lineNum++
        line := scanner.Text()
        if len(line) == 0 {
            continue
        }

        var metric TokenMetrics
        if err := json.Unmarshal([]byte(line), &metric); err != nil {
            if mp.proxyLogger != nil {
                mp.proxyLogger.Warnf("Skipping malformed metrics line %d: %v", lineNum, err)
            }
            continue
        }
        mp.metrics = append(mp.metrics, metric)
    }

    // Keep only the most recent metrics if we loaded too many
    if len(mp.metrics) > mp.maxMetrics {
        mp.metrics = mp.metrics[len(mp.metrics)-mp.maxMetrics:]
    }

    if err := scanner.Err(); err != nil && mp.proxyLogger != nil {
        mp.proxyLogger.Warnf("Error reading metrics log file: %v", err)
    }

    return scanner.Err()
}

// addMetrics adds a new metric to the collection and appends to file if configured
func (mp *MetricsParser) addMetrics(metric TokenMetrics) {
    mp.mu.Lock()
    defer mp.mu.Unlock()

    mp.metrics = append(mp.metrics, metric)
    if len(mp.metrics) > mp.maxMetrics {
        mp.metrics = mp.metrics[len(mp.metrics)-mp.maxMetrics:]
    }

    // Append to JSONL file if path is configured
    if mp.logPath != "" {
        mp.appendToFile(metric)
    }
}

// appendToFile appends a single metric to the JSONL file
func (mp *MetricsParser) appendToFile(metric TokenMetrics) {
    file, err := os.OpenFile(mp.logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    if err != nil {
        if mp.proxyLogger != nil {
            mp.proxyLogger.Warnf("Failed to open metrics log file for appending: %v", err)
        }
        return
    }
    defer file.Close()

    jsonData, err := json.Marshal(metric)
    if err != nil {
        if mp.proxyLogger != nil {
            mp.proxyLogger.Warnf("Failed to marshal metrics data: %v", err)
        }
        return
    }

    // Append newline and write
    jsonData = append(jsonData, '\n')
    if _, err := file.Write(jsonData); err != nil {
        if mp.proxyLogger != nil {
            mp.proxyLogger.Warnf("Failed to write metrics to log file: %v", err)
        }
    }
}

// ParseLogLine parses a single log line for token metrics
func (mp *MetricsParser) ParseLogLine(line string, modelName string) {
    if matches := mp.promptEvalRegex.FindStringSubmatch(line); matches != nil {
        // Check for prompt evaluation metrics (input tokens)
        durationMs, _ := strconv.ParseFloat(matches[1], 64)
        tokens, _ := strconv.Atoi(matches[2])
        tokensPerSecond, _ := strconv.ParseFloat(matches[4], 64)

        metrics := TokenMetrics{
            Timestamp:       time.Now(),
            Model:           modelName,
            InputTokens:     tokens,
            OutputTokens:    0,
            TokensPerSecond: tokensPerSecond,
            DurationMs:      int(durationMs),
        }
        mp.addMetrics(metrics)
    } else if matches := mp.evalRegex.FindStringSubmatch(line); matches != nil {
        // Check for evaluation metrics (output tokens)
        durationMs, _ := strconv.ParseFloat(matches[1], 64)
        tokens, _ := strconv.Atoi(matches[2])
        tokensPerSecond, _ := strconv.ParseFloat(matches[4], 64)

        metrics := TokenMetrics{
            Timestamp:       time.Now(),
            Model:           modelName,
            InputTokens:     0,
            OutputTokens:    tokens,
            TokensPerSecond: tokensPerSecond,
            DurationMs:      int(durationMs),
        }
        mp.addMetrics(metrics)
    }
}

// GetMetricsJSON returns metrics as JSON
func (mp *MetricsParser) GetMetricsJSON() ([]byte, error) {
    mp.mu.RLock()
    defer mp.mu.RUnlock()
    return json.Marshal(mp.metrics)
}
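For context, promptEvalRegex and evalRegex target the per-request timing lines that llama-server prints; lines of roughly this shape would match (values illustrative):

prompt eval time =     104.50 ms /    25 tokens (    4.18 ms per token,   239.23 tokens per second)
       eval time =    1250.00 ms /   200 tokens (    6.25 ms per token,   160.00 tokens per second)

Because ParseLogLine tries promptEvalRegex first and the branches are exclusive, the "prompt eval time" line (which the broader evalRegex would also match) is recorded as InputTokens, while the plain "eval time" line is recorded as OutputTokens.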
proxy/process.go (39 additions, 3 deletions)
@@ -1,6 +1,7 @@
package proxy

import (
"bufio"
"context"
"errors"
"fmt"
@@ -49,6 +50,7 @@ type Process struct {

processLogger *LogMonitor
proxyLogger *LogMonitor
metricsParser *MetricsParser

healthCheckTimeout int
healthCheckLoopInterval time.Duration
@@ -73,7 +75,7 @@ type Process struct {
failedStartCount int
}

func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process {
func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor, metricsParser *MetricsParser) *Process {
concurrentLimit := 10
if config.ConcurrencyLimit > 0 {
concurrentLimit = config.ConcurrencyLimit
@@ -86,6 +88,7 @@ func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLo
cancelUpstream: nil,
processLogger: processLogger,
proxyLogger: proxyLogger,
metricsParser: metricsParser,
healthCheckTimeout: healthCheckTimeout,
healthCheckLoopInterval: 5 * time.Second, /* default, can not be set by user - used for testing */
state: StateStopped,
@@ -193,15 +196,23 @@ func (p *Process) start() error {
defer p.waitStarting.Done()
cmdContext, ctxCancelUpstream := context.WithCancel(context.Background())

// Create a pipe to capture stdout and feed both LogMonitor and MetricsParser
stdoutReader, stdoutWriter := io.Pipe()
stderrReader, stderrWriter := io.Pipe()

p.cmd = exec.CommandContext(cmdContext, args[0], args[1:]...)
p.cmd.Stdout = p.processLogger
p.cmd.Stderr = p.processLogger
p.cmd.Stdout = stdoutWriter
p.cmd.Stderr = stderrWriter
p.cmd.Env = append(p.cmd.Environ(), p.config.Env...)
p.cmd.Cancel = p.cmdStopUpstreamProcess
p.cmd.WaitDelay = p.gracefulStopTimeout
p.cancelUpstream = ctxCancelUpstream
p.cmdWaitChan = make(chan struct{})

// Start goroutines to process stdout and stderr
go p.processOutput(stdoutReader, "stdout")
go p.processOutput(stderrReader, "stderr")

p.failedStartCount++ // this will be reset to zero when the process has successfully started

p.proxyLogger.Debugf("<%s> Executing start command: %s, env: %s", p.ID, strings.Join(args, " "), strings.Join(p.config.Env, ", "))
@@ -512,6 +523,31 @@ func (p *Process) waitForCmd() {
close(p.cmdWaitChan)
}

// processOutput reads from a pipe and sends data to both LogMonitor and MetricsParser
func (p *Process) processOutput(reader *io.PipeReader, streamType string) {
defer reader.Close()

scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}

// Send to LogMonitor for logging
p.processLogger.Write([]byte(line + "\n"))

// Send to MetricsParser for parsing
if p.metricsParser != nil {
p.metricsParser.ParseLogLine(line, p.ID)
}
}

if err := scanner.Err(); err != nil && err != io.EOF {
p.proxyLogger.Errorf("<%s> Error reading %s: %v", p.ID, streamType, err)
}
}

// cmdStopUpstreamProcess attempts to stop the upstream process gracefully
func (p *Process) cmdStopUpstreamProcess() error {
p.processLogger.Debugf("<%s> cmdStopUpstreamProcess() initiating graceful stop of upstream process", p.ID)
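For orientation, a minimal sketch (not code from this PR) of how the new metricsParser parameter is threaded through, assuming a loaded Config plus existing proxyLogger and processLogger LogMonitor values:

    // One shared parser, built from config, is passed to each Process so that
    // processOutput() can feed upstream log lines into ParseLogLine().
    metricsParser := NewMetricsParser(&config, proxyLogger)

    modelID := "qwen2.5" // hypothetical model ID taken from the example config
    process := NewProcess(modelID, config.HealthCheckTimeout, config.Models[modelID],
        processLogger, proxyLogger, metricsParser)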