generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 195
Redesign EPP Metrics Pipeline to be Model Server Agnostic #461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
k8s-ci-robot
merged 19 commits into
kubernetes-sigs:main
from
BenjaminBraunDev:metrics_refactor
Mar 14, 2025
Merged
Changes from 15 commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
214905d
start adding metrics changes for trion support
BenjaminBraunDev 6125054
Refactor metrics to work with any prometheus metric naming convention…
BenjaminBraunDev 71e00ad
Finalize metric refactor and testing.
BenjaminBraunDev dd2825f
Set streaming env var to false in triton ext_proc.yaml
BenjaminBraunDev aa2ee06
Update titon server deployment to pull frozen repo branch instead of …
BenjaminBraunDev d4c083e
Remove model server specific metric files and tests and point EPP ima…
BenjaminBraunDev df3f3e3
Remove commented prints and old comments.
BenjaminBraunDev 558132e
Remove triton support for now, make metrics mapping 1-to-1 with load …
BenjaminBraunDev 5838459
moved files for cleaner diff
BenjaminBraunDev 1c367a6
re-add todos and rename kv flag to reflect percentage usage.
BenjaminBraunDev 3356bd3
Fix nits, move logging channel for backend/metrics.go from default to…
BenjaminBraunDev 371fd58
Rebase into metric agnostic redesign.
BenjaminBraunDev 97fd0de
Merge getLatestMetric and getLabeledMetric.
BenjaminBraunDev 27b34e9
Remove unused datastore types.
BenjaminBraunDev 4b84744
Fix lint.
BenjaminBraunDev 66e0376
Remove log and fix nits.
BenjaminBraunDev 9f4859b
Move ext_proc and inferencemodel yaml files back, fix nits and remove…
BenjaminBraunDev c082e86
Remove the rest of logging from metrics.go and tests.
BenjaminBraunDev 81ee1e6
Add trace log to podmetrics and small warning fix to metrics_spec_test.
BenjaminBraunDev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
File renamed without changes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,272 @@ | ||
| /* | ||
| Copyright 2025 The Kubernetes Authors. | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package metrics | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "net/http" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/go-logr/logr" | ||
| dto "github.com/prometheus/client_model/go" | ||
| "github.com/prometheus/common/expfmt" | ||
| "go.uber.org/multierr" | ||
| "sigs.k8s.io/controller-runtime/pkg/log" | ||
| logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" | ||
| ) | ||
|
|
||
| const ( | ||
| // LoRA metrics based on protocol | ||
| LoraRequestInfoRunningAdaptersMetricName = "running_lora_adapters" | ||
| LoraRequestInfoWaitingAdaptersMetricName = "waiting_lora_adapters" | ||
| LoraRequestInfoMaxAdaptersMetricName = "max_lora" | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) | ||
|
|
||
| type PodMetricsClientImpl struct { | ||
| MetricMapping *MetricMapping | ||
| } | ||
|
|
||
| // FetchMetrics fetches metrics from a given pod. | ||
| func (p *PodMetricsClientImpl) FetchMetrics( | ||
| ctx context.Context, | ||
| pod *Pod, | ||
| existing *Metrics, | ||
| port int32, | ||
| ) (*Metrics, error) { | ||
| logger := log.FromContext(ctx) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| loggerDefault := logger.V(logutil.DEFAULT) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // Currently the metrics endpoint is hard-coded, which works with vLLM. | ||
| // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config. | ||
| url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics" | ||
|
|
||
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) | ||
| if err != nil { | ||
| loggerDefault.Error(err, "Failed create HTTP request", "method", http.MethodGet, "url", url) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return nil, fmt.Errorf("failed to create request: %v", err) | ||
| } | ||
| resp, err := http.DefaultClient.Do(req) | ||
| if err != nil { | ||
| loggerDefault.Error(err, "Failed to fetch metrics", "pod", pod.NamespacedName) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod.NamespacedName, err) | ||
| } | ||
| defer func() { | ||
| _ = resp.Body.Close() | ||
| }() | ||
|
|
||
| if resp.StatusCode != http.StatusOK { | ||
| loggerDefault.Error(nil, "Unexpected status code returned", "pod", pod.NamespacedName, "statusCode", resp.StatusCode) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return nil, fmt.Errorf("unexpected status code from %s: %v", pod.NamespacedName, resp.StatusCode) | ||
| } | ||
|
|
||
| parser := expfmt.TextParser{} | ||
| metricFamilies, err := parser.TextToMetricFamilies(resp.Body) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return p.promToPodMetrics(logger, metricFamilies, existing) | ||
| } | ||
|
|
||
| // promToPodMetrics updates internal pod metrics with scraped Prometheus metrics. | ||
| func (p *PodMetricsClientImpl) promToPodMetrics( | ||
| logger logr.Logger, | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| metricFamilies map[string]*dto.MetricFamily, | ||
| existing *Metrics, | ||
| ) (*Metrics, error) { | ||
| var errs error | ||
| updated := existing.Clone() | ||
|
|
||
| if p.MetricMapping.TotalQueuedRequests != nil { | ||
| queued, err := p.getMetric(logger, metricFamilies, *p.MetricMapping.TotalQueuedRequests) | ||
| if err == nil { | ||
| updated.WaitingQueueSize = int(queued.GetGauge().GetValue()) | ||
| } else { | ||
| errs = multierr.Append(errs, err) | ||
| } | ||
| } | ||
|
|
||
| if p.MetricMapping.KVCacheUtilization != nil { | ||
| usage, err := p.getMetric(logger, metricFamilies, *p.MetricMapping.KVCacheUtilization) | ||
| if err == nil { | ||
| updated.KVCacheUsagePercent = usage.GetGauge().GetValue() | ||
| } else { | ||
| errs = multierr.Append(errs, err) | ||
| } | ||
| } | ||
|
|
||
| // Handle LoRA metrics (only if all LoRA MetricSpecs are present) | ||
| if p.MetricMapping.LoraRequestInfo != nil { | ||
| loraMetrics, _, err := p.getLatestLoraMetric(logger, metricFamilies) | ||
| errs = multierr.Append(errs, err) | ||
|
|
||
| if loraMetrics != nil { | ||
| updated.ActiveModels = make(map[string]int) | ||
| for _, label := range loraMetrics.GetLabel() { | ||
| if label.GetName() == LoraRequestInfoRunningAdaptersMetricName { | ||
| if label.GetValue() != "" { | ||
| adapterList := strings.Split(label.GetValue(), ",") | ||
| for _, adapter := range adapterList { | ||
| updated.ActiveModels[adapter] = 0 | ||
| } | ||
| } | ||
| } | ||
| if label.GetName() == LoraRequestInfoWaitingAdaptersMetricName { | ||
| if label.GetValue() != "" { | ||
| adapterList := strings.Split(label.GetValue(), ",") | ||
| for _, adapter := range adapterList { | ||
| updated.ActiveModels[adapter] = 0 | ||
| } | ||
| } | ||
| } | ||
| if label.GetName() == LoraRequestInfoMaxAdaptersMetricName { | ||
| if label.GetValue() != "" { | ||
| updated.MaxActiveModels, err = strconv.Atoi(label.GetValue()) | ||
| if err != nil { | ||
| errs = multierr.Append(errs, err) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return updated, errs | ||
| } | ||
|
|
||
| // getLatestLoraMetric gets latest lora metric series in gauge metric family `vllm:lora_requests_info` | ||
| // reason its specially fetched is because each label key value pair permutation generates new series | ||
| // and only most recent is useful. The value of each series is the creation timestamp so we can | ||
| // retrieve the latest by sorting the value. | ||
| func (p *PodMetricsClientImpl) getLatestLoraMetric(logger logr.Logger, metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) { | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if p.MetricMapping.LoraRequestInfo == nil { | ||
| return nil, time.Time{}, nil // No LoRA metrics configured | ||
| } | ||
|
|
||
| loraRequests, ok := metricFamilies[p.MetricMapping.LoraRequestInfo.MetricName] | ||
| if !ok { | ||
| logger.V(logutil.TRACE).Error(nil, "Metric family not found", "name", p.MetricMapping.LoraRequestInfo.MetricName) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return nil, time.Time{}, fmt.Errorf("metric family %q not found", p.MetricMapping.LoraRequestInfo.MetricName) | ||
| } | ||
|
|
||
| var latest *dto.Metric | ||
| var latestTs float64 // Use float64, as Gauge.Value is float64 | ||
|
|
||
| // Iterate over all metrics in the family. | ||
| for _, m := range loraRequests.GetMetric() { | ||
| running := "" | ||
| waiting := "" | ||
| // Check if the metric has the expected LoRA labels. This is important! | ||
| hasRequiredLabels := false | ||
| for _, lp := range m.GetLabel() { | ||
| switch lp.GetName() { | ||
| case LoraRequestInfoRunningAdaptersMetricName: | ||
| running = lp.GetValue() | ||
| hasRequiredLabels = true | ||
| case LoraRequestInfoWaitingAdaptersMetricName: | ||
| waiting = lp.GetValue() | ||
| hasRequiredLabels = true | ||
| } | ||
| } | ||
| // Skip if it does not have the lora labels | ||
| if !hasRequiredLabels { | ||
| continue | ||
| } | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // Ignore metrics with both labels empty. | ||
| if running == "" && waiting == "" { | ||
| continue | ||
| } | ||
|
|
||
| // Select the metric with the *largest Gauge Value* (which represents the timestamp). | ||
| if m.GetGauge().GetValue() > latestTs { | ||
| latestTs = m.GetGauge().GetValue() | ||
| latest = m | ||
| } | ||
| } | ||
| if latest == nil { | ||
| logger.V(logutil.TRACE).Info("Metric value Empty", "value", latest, "metric", p.MetricMapping.LoraRequestInfo.MetricName) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return nil, time.Time{}, nil | ||
| } | ||
|
|
||
| // Convert the gauge value (creation timestamp) to time.Time. | ||
| return latest, time.Unix(0, int64(latestTs*1e9)), nil // Convert nanoseconds to time.Time | ||
| } | ||
|
|
||
| // getMetric retrieves a specific metric based on MetricSpec. | ||
| func (p *PodMetricsClientImpl) getMetric(logger logr.Logger, metricFamilies map[string]*dto.MetricFamily, spec MetricSpec) (*dto.Metric, error) { | ||
| mf, ok := metricFamilies[spec.MetricName] | ||
| if !ok { | ||
| logger.V(logutil.TRACE).Error(nil, "Metric family not found", "name", spec.MetricName) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return nil, fmt.Errorf("metric family %q not found", spec.MetricName) | ||
| } | ||
|
|
||
| if len(mf.GetMetric()) == 0 { | ||
| return nil, fmt.Errorf("no metrics available for %q", spec.MetricName) | ||
| } | ||
|
|
||
| return getLatestMetric(logger, mf, &spec) | ||
| } | ||
|
|
||
| // getLabeledMetric gets the latest metric with matching labels. | ||
| func getLatestMetric(logger logr.Logger, mf *dto.MetricFamily, spec *MetricSpec) (*dto.Metric, error) { | ||
| var latestMetric *dto.Metric | ||
| var latestTimestamp int64 = -1 // Initialize to -1 so any timestamp is greater | ||
|
|
||
| var labels map[string]string = nil | ||
| if spec.Labels != nil { | ||
| labels = spec.Labels | ||
| } | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| for _, m := range mf.GetMetric() { | ||
| if labels == nil || labelsMatch(m.GetLabel(), spec.Labels) { | ||
| if m.GetTimestampMs() > latestTimestamp { | ||
| latestTimestamp = m.GetTimestampMs() | ||
| latestMetric = m | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if latestMetric != nil { | ||
| logger.V(logutil.TRACE).Info("Labeled metric found", "value", latestMetric, "name", spec.MetricName) | ||
BenjaminBraunDev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return latestMetric, nil | ||
| } | ||
|
|
||
| return nil, fmt.Errorf("no matching metric found for %q with labels %+v", spec.MetricName, labels) | ||
| } | ||
|
|
||
| // labelsMatch checks if a metric's labels contain all the labels in the spec. | ||
| func labelsMatch(metricLabels []*dto.LabelPair, specLabels map[string]string) bool { | ||
| if len(specLabels) == 0 { | ||
| return true // No specific labels required | ||
| } | ||
|
|
||
| for specName, specValue := range specLabels { | ||
| found := false | ||
| for _, label := range metricLabels { | ||
| if label.GetName() == specName && label.GetValue() == specValue { | ||
| found = true | ||
| break | ||
| } | ||
| } | ||
| if !found { | ||
| return false // A required label is missing | ||
| } | ||
| } | ||
| return true // All required labels are present | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.