diff --git a/cmd/epp/main.go b/cmd/epp/main.go index e1cd50154..fa63f0bce 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -38,7 +38,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/metrics/filters" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/vllm" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" @@ -92,6 +91,17 @@ var ( "certPath", "", "The path to the certificate for secure serving. The certificate and private key files "+ "are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+ "then a self-signed certificate is used.") + // metric flags + totalQueuedRequestsMetric = flag.String("totalQueuedRequestsMetric", + "vllm:num_requests_waiting", + "Prometheus metric for the number of queued requests.") + kvCacheUsagePercentageMetric = flag.String("kvCacheUsagePercentageMetric", + "vllm:gpu_cache_usage_perc", + "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).") + // LoRA metrics + loraInfoMetric = flag.String("loraInfoMetric", + "vllm:lora_requests_info", + "Prometheus metric for the LoRA info metrics (must be in vLLM label format).") setupLog = ctrl.Log.WithName("setup") ) @@ -143,9 +153,21 @@ func run() error { ctx := ctrl.SetupSignalHandler() - pmf := backendmetrics.NewPodMetricsFactory(&vllm.PodMetricsClientImpl{}, *refreshMetricsInterval) + // Set up mapper for metric scraping. + mapping, err := backendmetrics.NewMetricMapping( + *totalQueuedRequestsMetric, + *kvCacheUsagePercentageMetric, + *loraInfoMetric, + ) + if err != nil { + setupLog.Error(err, "Failed to create metric mapping from flags.") + return err + } + + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval) // Setup runner. datastore := datastore.NewDatastore(ctx, pmf) + serverRunner := &runserver.ExtProcServerRunner{ GrpcPort: *grpcPort, DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace, diff --git a/pkg/epp/backend/metrics/metrics.go b/pkg/epp/backend/metrics/metrics.go new file mode 100644 index 000000000..be732e78e --- /dev/null +++ b/pkg/epp/backend/metrics/metrics.go @@ -0,0 +1,245 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "fmt" + "net/http" + "strconv" + "strings" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "go.uber.org/multierr" +) + +const ( + // LoRA metrics based on protocol + LoraInfoRunningAdaptersMetricName = "running_lora_adapters" + LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters" + LoraInfoMaxAdaptersMetricName = "max_lora" +) + +type PodMetricsClientImpl struct { + MetricMapping *MetricMapping +} + +// FetchMetrics fetches metrics from a given pod. +func (p *PodMetricsClientImpl) FetchMetrics( + ctx context.Context, + pod *Pod, + existing *Metrics, + port int32, +) (*Metrics, error) { + + // Currently the metrics endpoint is hard-coded, which works with vLLM. + // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config. + url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics" + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod.NamespacedName, err) + } + defer func() { + _ = resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code from %s: %v", pod.NamespacedName, resp.StatusCode) + } + + parser := expfmt.TextParser{} + metricFamilies, err := parser.TextToMetricFamilies(resp.Body) + if err != nil { + return nil, err + } + return p.promToPodMetrics(metricFamilies, existing) +} + +// promToPodMetrics updates internal pod metrics with scraped Prometheus metrics. +func (p *PodMetricsClientImpl) promToPodMetrics( + metricFamilies map[string]*dto.MetricFamily, + existing *Metrics, +) (*Metrics, error) { + var errs error + updated := existing.Clone() + + if p.MetricMapping.TotalQueuedRequests != nil { + queued, err := p.getMetric(metricFamilies, *p.MetricMapping.TotalQueuedRequests) + if err == nil { + updated.WaitingQueueSize = int(queued.GetGauge().GetValue()) + } else { + errs = multierr.Append(errs, err) + } + } + + if p.MetricMapping.KVCacheUtilization != nil { + usage, err := p.getMetric(metricFamilies, *p.MetricMapping.KVCacheUtilization) + if err == nil { + updated.KVCacheUsagePercent = usage.GetGauge().GetValue() + } else { + errs = multierr.Append(errs, err) + } + } + + // Handle LoRA metrics (only if all LoRA MetricSpecs are present) + if p.MetricMapping.LoraRequestInfo != nil { + loraMetrics, err := p.getLatestLoraMetric(metricFamilies) + errs = multierr.Append(errs, err) + + if loraMetrics != nil { + updated.ActiveModels = make(map[string]int) + for _, label := range loraMetrics.GetLabel() { + if label.GetName() == LoraInfoRunningAdaptersMetricName { + if label.GetValue() != "" { + adapterList := strings.Split(label.GetValue(), ",") + for _, adapter := range adapterList { + updated.ActiveModels[adapter] = 0 + } + } + } + if label.GetName() == LoraInfoWaitingAdaptersMetricName { + if label.GetValue() != "" { + adapterList := strings.Split(label.GetValue(), ",") + for _, adapter := range adapterList { + updated.ActiveModels[adapter] = 0 + } + } + } + if label.GetName() == LoraInfoMaxAdaptersMetricName { + if label.GetValue() != "" { + updated.MaxActiveModels, err = strconv.Atoi(label.GetValue()) + if err != nil { + errs = multierr.Append(errs, err) + } + } + } + } + } + } + + return updated, errs +} + +// getLatestLoraMetric gets latest lora metric series in gauge metric family `vllm:lora_requests_info` +// reason its specially fetched is because each label key value pair permutation generates new series +// and only most recent is useful. The value of each series is the creation timestamp so we can +// retrieve the latest by sorting the value. +func (p *PodMetricsClientImpl) getLatestLoraMetric(metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, error) { + if p.MetricMapping.LoraRequestInfo == nil { + return nil, nil // No LoRA metrics configured + } + + loraRequests, ok := metricFamilies[p.MetricMapping.LoraRequestInfo.MetricName] + if !ok { + return nil, fmt.Errorf("metric family %q not found", p.MetricMapping.LoraRequestInfo.MetricName) + } + + var latest *dto.Metric + var latestTs float64 // Use float64, as Gauge.Value is float64 + + // Iterate over all metrics in the family. + for _, m := range loraRequests.GetMetric() { + running := "" + waiting := "" + // Check if the metric has the expected LoRA labels. + for _, lp := range m.GetLabel() { + switch lp.GetName() { + case LoraInfoRunningAdaptersMetricName: + running = lp.GetValue() + case LoraInfoWaitingAdaptersMetricName: + waiting = lp.GetValue() + } + } + // Ignore metrics with both labels empty. + if running == "" && waiting == "" { + continue + } + + // Select the metric with the *largest Gauge Value* (which represents the timestamp). + if m.GetGauge().GetValue() > latestTs { + latestTs = m.GetGauge().GetValue() + latest = m + } + } + if latest == nil { + return nil, nil + } + + return latest, nil // Convert nanoseconds to time.Time +} + +// getMetric retrieves a specific metric based on MetricSpec. +func (p *PodMetricsClientImpl) getMetric(metricFamilies map[string]*dto.MetricFamily, spec MetricSpec) (*dto.Metric, error) { + mf, ok := metricFamilies[spec.MetricName] + if !ok { + return nil, fmt.Errorf("metric family %q not found", spec.MetricName) + } + + if len(mf.GetMetric()) == 0 { + return nil, fmt.Errorf("no metrics available for %q", spec.MetricName) + } + + return getLatestMetric(mf, &spec) +} + +// getLabeledMetric gets the latest metric with matching labels. +func getLatestMetric(mf *dto.MetricFamily, spec *MetricSpec) (*dto.Metric, error) { + var latestMetric *dto.Metric + var latestTimestamp int64 = -1 // Initialize to -1 so any timestamp is greater + + for _, m := range mf.GetMetric() { + if spec.Labels == nil || labelsMatch(m.GetLabel(), spec.Labels) { + if m.GetTimestampMs() > latestTimestamp { + latestTimestamp = m.GetTimestampMs() + latestMetric = m + } + } + } + + if latestMetric != nil { + return latestMetric, nil + } + + return nil, fmt.Errorf("no matching metric found for %q with labels %+v", spec.MetricName, spec.Labels) +} + +// labelsMatch checks if a metric's labels contain all the labels in the spec. +func labelsMatch(metricLabels []*dto.LabelPair, specLabels map[string]string) bool { + if len(specLabels) == 0 { + return true // No specific labels required + } + + for specName, specValue := range specLabels { + found := false + for _, label := range metricLabels { + if label.GetName() == specName && label.GetValue() == specValue { + found = true + break + } + } + if !found { + return false // A required label is missing + } + } + return true // All required labels are present +} diff --git a/pkg/epp/backend/metrics/metrics_spec.go b/pkg/epp/backend/metrics/metrics_spec.go new file mode 100644 index 000000000..ce0c075dd --- /dev/null +++ b/pkg/epp/backend/metrics/metrics_spec.go @@ -0,0 +1,113 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "fmt" + "strings" +) + +// MetricSpec represents a single metric's specification. +type MetricSpec struct { + MetricName string + Labels map[string]string // Label name -> Label value +} + +// MetricMapping holds named MetricSpecs. +type MetricMapping struct { + TotalQueuedRequests *MetricSpec + KVCacheUtilization *MetricSpec + LoraRequestInfo *MetricSpec +} + +// stringToMetricSpec converts a string to a MetricSpec. +// Example inputs: +// +// "metric_name" +// "metric_name{label1=value1}" +// "metric_name{label1=value1,label2=value2}" +func stringToMetricSpec(specStr string) (*MetricSpec, error) { + specStr = strings.TrimSpace(specStr) + metricName := specStr + labels := make(map[string]string) + + // Check for labels enclosed in curly braces + start := strings.Index(specStr, "{") + end := strings.Index(specStr, "}") + + if start != -1 || end != -1 { // If *either* brace is present... + if start == -1 || end == -1 || end <= start+1 { // ...check that *both* are present and correctly placed. + return nil, fmt.Errorf("invalid metric spec string: %q, missing or malformed label block", specStr) + } + + metricName = strings.TrimSpace(specStr[:start]) + labelStr := specStr[start+1 : end] + + // Split into individual label pairs + labelPairs := strings.Split(labelStr, ",") + for _, pair := range labelPairs { + pair = strings.TrimSpace(pair) + parts := strings.Split(pair, "=") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid label pair: %q in metric spec: %q", pair, specStr) + } + labelName := strings.TrimSpace(parts[0]) + labelValue := strings.TrimSpace(parts[1]) + if labelName == "" || labelValue == "" { + return nil, fmt.Errorf("empty label name or value in pair: %q in metric spec: %q", pair, specStr) + } + labels[labelName] = labelValue + } + // Check for extra characters after labels + if end != len(specStr)-1 { + return nil, fmt.Errorf("invalid characters after label section in: %q", specStr) + } + + } + + if metricName == "" { // Metric name cannot be empty + return nil, fmt.Errorf("empty metric name in spec: %q", specStr) + } + + return &MetricSpec{ + MetricName: metricName, + Labels: labels, + }, nil +} + +// NewMetricMapping creates a MetricMapping from string values. +func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr string) (*MetricMapping, error) { + queuedSpec, err := stringToMetricSpec(queuedStr) + if err != nil { + return nil, fmt.Errorf("error parsing WaitingRequests: %w", err) + } + kvUsageSpec, err := stringToMetricSpec(kvUsageStr) + if err != nil { + return nil, fmt.Errorf("error parsing KVCacheUsage: %w", err) + } + loraReqInfoSpec, err := stringToMetricSpec(loraReqInfoStr) + if err != nil { + return nil, fmt.Errorf("error parsing loraReqInfoStr: %w", err) + } + mapping := &MetricMapping{ + TotalQueuedRequests: queuedSpec, + KVCacheUtilization: kvUsageSpec, + LoraRequestInfo: loraReqInfoSpec, + } + + return mapping, nil +} diff --git a/pkg/epp/backend/metrics/metrics_spec_test.go b/pkg/epp/backend/metrics/metrics_spec_test.go new file mode 100644 index 000000000..828042065 --- /dev/null +++ b/pkg/epp/backend/metrics/metrics_spec_test.go @@ -0,0 +1,173 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "reflect" + "testing" +) + +func TestStringToMetricSpec(t *testing.T) { + tests := []struct { + name string + input string + want *MetricSpec + wantErr bool + }{ + { + name: "empty string", + input: "", + want: nil, + wantErr: true, + }, + { + name: "no labels", + input: "my_metric", + want: &MetricSpec{ + MetricName: "my_metric", + Labels: map[string]string{}, + }, + wantErr: false, + }, + { + name: "one label", + input: "my_metric{label1=value1}", + want: &MetricSpec{ + MetricName: "my_metric", + Labels: map[string]string{ + "label1": "value1", + }, + }, + wantErr: false, + }, + { + name: "multiple labels", + input: "my_metric{label1=value1,label2=value2}", + want: &MetricSpec{ + MetricName: "my_metric", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + }, + }, + wantErr: false, + }, + { + name: "extra whitespace", + input: " my_metric { label1 = value1 , label2 = value2 } ", + want: &MetricSpec{ + MetricName: "my_metric", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + }, + }, + wantErr: false, + }, + { + name: "missing closing brace", + input: "my_metric{label1=value1", + want: nil, + wantErr: true, + }, + { + name: "missing opening brace", + input: "my_metriclabel1=value1}", + want: nil, // Corrected expected value + wantErr: true, + }, + { + name: "invalid label pair", + input: "my_metric{label1}", + want: nil, + wantErr: true, + }, + { + name: "empty label name", + input: "my_metric{=value1}", + want: nil, + wantErr: true, + }, + { + name: "empty label value", + input: "my_metric{label1=}", + want: nil, + wantErr: true, + }, + { + name: "empty label name and value with spaces", + input: "my_metric{ = }", + want: nil, + wantErr: true, + }, + { + name: "characters after closing brace", + input: "my_metric{label=val}extra", + want: nil, + wantErr: true, + }, + { + name: "empty metric name", + input: "{label=val}", + want: nil, + wantErr: true, + }, + { + name: "no labels and just metric name with space", + input: "my_metric ", + want: &MetricSpec{ + MetricName: "my_metric", + Labels: map[string]string{}, + }, + wantErr: false, + }, + { + name: "no labels and just metric name with space before and after", + input: " my_metric ", + want: &MetricSpec{ + MetricName: "my_metric", + Labels: map[string]string{}, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := stringToMetricSpec(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("stringToMetricSpec() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.wantErr { + if got != nil { // handles if we got a nil spec and didn't expect an error + t.Errorf("stringToMetricSpec() = %v, want %v", got, tt.want) + return + } + } else { + if got == nil { + t.Fatalf("stringToMetricSpec() = got nil but wanted %v", tt.want) + } + if !reflect.DeepEqual(got.MetricName, tt.want.MetricName) { + t.Errorf("stringToMetricSpec() got MetricName = %v, want %v", got.MetricName, tt.want.MetricName) + } + if !reflect.DeepEqual(got.Labels, tt.want.Labels) { + t.Errorf("stringToMetricSpec() got Labels = %v, want %v", got.Labels, tt.want.Labels) + } + } + }) + } +} diff --git a/pkg/epp/backend/metrics/metrics_test.go b/pkg/epp/backend/metrics/metrics_test.go new file mode 100644 index 000000000..d0396bf74 --- /dev/null +++ b/pkg/epp/backend/metrics/metrics_test.go @@ -0,0 +1,505 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "errors" + "reflect" + "strconv" + "strings" + "testing" + + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "go.uber.org/multierr" + "google.golang.org/protobuf/proto" + "k8s.io/apimachinery/pkg/types" + + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +// --- Test Helpers --- + +func makeMetric(labels map[string]string, value float64, timestampMs int64) *dto.Metric { + labelPairs := []*dto.LabelPair{} + for k, v := range labels { + labelPairs = append(labelPairs, &dto.LabelPair{Name: proto.String(k), Value: proto.String(v)}) + } + return &dto.Metric{ + Label: labelPairs, + Gauge: &dto.Gauge{Value: &value}, + TimestampMs: ×tampMs, + } +} + +func makeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { + return &dto.MetricFamily{ + Name: &name, + Type: dto.MetricType_GAUGE.Enum(), + Metric: metrics, + } +} + +// --- Tests --- + +func TestGetMetric(t *testing.T) { + + metricFamilies := map[string]*dto.MetricFamily{ + "metric1": makeMetricFamily("metric1", + makeMetric(map[string]string{"label1": "value1"}, 1.0, 1000), + makeMetric(map[string]string{"label1": "value2"}, 2.0, 2000), + ), + "metric2": makeMetricFamily("metric2", + makeMetric(map[string]string{"labelA": "A1", "labelB": "B1"}, 3.0, 1500), + makeMetric(map[string]string{"labelA": "A2", "labelB": "B2"}, 4.0, 2500), + ), + "metric3": makeMetricFamily("metric3", + makeMetric(map[string]string{}, 5.0, 3000), + makeMetric(map[string]string{}, 6.0, 1000), + ), + } + + tests := []struct { + name string + spec MetricSpec + wantGaugeValue float64 + wantError bool + }{ + { + name: "get labeled metric, exists", + spec: MetricSpec{ + MetricName: "metric1", + Labels: map[string]string{"label1": "value1"}, + }, + wantGaugeValue: 1.0, + wantError: false, + }, + { + name: "get labeled metric, wrong value", + spec: MetricSpec{ + MetricName: "metric1", + Labels: map[string]string{"label1": "value3"}, + }, + wantGaugeValue: -1, // Expect an error, not a specific value + wantError: true, + }, + { + name: "get labeled metric, missing label", + spec: MetricSpec{ + MetricName: "metric1", + Labels: map[string]string{"label2": "value2"}, + }, + wantGaugeValue: -1, + wantError: true, + }, + { + name: "get labeled metric, extra label present", + spec: MetricSpec{ + MetricName: "metric2", + Labels: map[string]string{"labelA": "A1"}, + }, + wantGaugeValue: 3.0, + wantError: false, + }, + { + name: "get unlabeled metric, exists", + spec: MetricSpec{ + MetricName: "metric3", + Labels: nil, // Explicitly nil + }, + wantGaugeValue: 5.0, // latest metric, which occurs first in our test data + wantError: false, + }, + { + name: "get unlabeled metric, metric family not found", + spec: MetricSpec{ + MetricName: "metric4", + Labels: nil, + }, + wantGaugeValue: -1, + wantError: true, + }, + { + name: "get labeled metric, metric family not found", + spec: MetricSpec{ + MetricName: "metric4", + Labels: map[string]string{"label1": "value1"}, + }, + wantGaugeValue: -1, + wantError: true, + }, + { + name: "get metric, no metrics available", + spec: MetricSpec{ + MetricName: "empty_metric", + }, + wantGaugeValue: -1, + wantError: true, + }, + { + name: "get latest metric", + spec: MetricSpec{ + MetricName: "metric3", + Labels: map[string]string{}, // Empty map, not nil + }, + wantGaugeValue: 5.0, + wantError: false, + }, + } + + p := &PodMetricsClientImpl{} // No need for MetricMapping here + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + gotMetric, err := p.getMetric(metricFamilies, tt.spec) + + if tt.wantError { + if err == nil { + t.Errorf("getMetric() expected error, got nil") + } + } else { + if err != nil { + t.Fatalf("getMetric() unexpected error: %v", err) + } + if gotMetric.GetGauge().GetValue() != tt.wantGaugeValue { + t.Errorf("getMetric() got value %v, want %v", gotMetric.GetGauge().GetValue(), tt.wantGaugeValue) + } + } + }) + } +} + +func TestLabelsMatch(t *testing.T) { + tests := []struct { + name string + metricLabels []*dto.LabelPair + specLabels map[string]string + want bool + }{ + { + name: "empty spec labels, should match", + metricLabels: []*dto.LabelPair{{Name: proto.String("a"), Value: proto.String("b")}}, + specLabels: map[string]string{}, + want: true, + }, + { + name: "nil spec labels, should match", + metricLabels: []*dto.LabelPair{{Name: proto.String("a"), Value: proto.String("b")}}, + specLabels: nil, + want: true, + }, + { + name: "exact match", + metricLabels: []*dto.LabelPair{{Name: proto.String("a"), Value: proto.String("b")}}, + specLabels: map[string]string{"a": "b"}, + want: true, + }, + { + name: "extra labels in metric", + metricLabels: []*dto.LabelPair{{Name: proto.String("a"), Value: proto.String("b")}, {Name: proto.String("c"), Value: proto.String("d")}}, + specLabels: map[string]string{"a": "b"}, + want: true, + }, + { + name: "missing label in metric", + metricLabels: []*dto.LabelPair{{Name: proto.String("a"), Value: proto.String("b")}}, + specLabels: map[string]string{"a": "b", "c": "d"}, + want: false, + }, + { + name: "value mismatch", + metricLabels: []*dto.LabelPair{{Name: proto.String("a"), Value: proto.String("b")}}, + specLabels: map[string]string{"a": "c"}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := labelsMatch(tt.metricLabels, tt.specLabels); got != tt.want { + t.Errorf("labelsMatch() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetLatestLoraMetric(t *testing.T) { + + testCases := []struct { + name string + metricFamilies map[string]*dto.MetricFamily + expectedAdapters map[string]int + expectedMax int + expectedErr error + mapping *MetricMapping + }{ + { + name: "no lora metrics", + metricFamilies: map[string]*dto.MetricFamily{ + "some_other_metric": makeMetricFamily("some_other_metric", + makeMetric(nil, 1.0, 1000), + ), + }, + expectedAdapters: nil, + expectedMax: 0, + expectedErr: errors.New("metric family \"vllm:lora_requests_info\" not found"), // Expect an error because the family is missing + mapping: &MetricMapping{ + LoraRequestInfo: &MetricSpec{MetricName: "vllm:lora_requests_info"}, + }, + }, + { + name: "basic lora metrics", + metricFamilies: map[string]*dto.MetricFamily{ + "vllm:lora_requests_info": makeMetricFamily("vllm:lora_requests_info", + makeMetric(map[string]string{"running_lora_adapters": "lora1", "max_lora": "2"}, 3000.0, 1000), // Newer + makeMetric(map[string]string{"running_lora_adapters": "lora2,lora3", "max_lora": "4"}, 1000.0, 1000), // Older + + ), + }, + expectedAdapters: map[string]int{"lora1": 0}, + expectedMax: 2, + expectedErr: nil, + mapping: &MetricMapping{ + LoraRequestInfo: &MetricSpec{MetricName: "vllm:lora_requests_info"}, + }, + }, + { + name: "no matching lora metrics", + metricFamilies: map[string]*dto.MetricFamily{ + "vllm:lora_requests_info": makeMetricFamily("vllm:lora_requests_info", + makeMetric(map[string]string{"other_label": "value"}, 5.0, 3000), + ), + }, + expectedAdapters: nil, + expectedMax: 0, + expectedErr: nil, // Expect *no* error; just no adapters found + mapping: &MetricMapping{ + LoraRequestInfo: &MetricSpec{MetricName: "vllm:lora_requests_info"}, + }, + }, + { + name: "no lora metrics if not in MetricMapping", + metricFamilies: map[string]*dto.MetricFamily{ + "vllm:lora_requests_info": makeMetricFamily("vllm:lora_requests_info", + makeMetric(map[string]string{"running_lora_adapters": "lora1", "max_lora": "2"}, 5.0, 3000), + makeMetric(map[string]string{"running_lora_adapters": "lora2,lora3", "max_lora": "4"}, 6.0, 1000), + ), + }, + expectedAdapters: nil, + expectedMax: 0, + expectedErr: nil, + mapping: &MetricMapping{ // No LoRA metrics defined + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + p := &PodMetricsClientImpl{MetricMapping: tc.mapping} + loraMetric, err := p.getLatestLoraMetric(tc.metricFamilies) + + if tc.expectedErr != nil { + if err == nil || err.Error() != tc.expectedErr.Error() { + t.Errorf("getLatestLoraMetric() error = %v, wantErr %v", err, tc.expectedErr) + } + return // Stop here if an error was expected + } else if err != nil { + t.Fatalf("getLatestLoraMetric() unexpected error: %v", err) + } + + if tc.mapping.LoraRequestInfo == nil { + if loraMetric != nil { + t.Errorf("getLatestLoraMetric() expected nil metric, got %v", loraMetric) + } + return // Stop if no Lora metrics are expected. + } + + if tc.expectedAdapters == nil && loraMetric == nil { + return // Both nil, as expected + } + + if tc.expectedAdapters != nil && loraMetric != nil { // proceed with checks + + adaptersFound := make(map[string]int) + maxLora := 0 + for _, label := range loraMetric.GetLabel() { + if label.GetName() == "running_lora_adapters" && label.GetValue() != "" { + for _, adapter := range strings.Split(label.GetValue(), ",") { + adaptersFound[adapter] = 0 + } + } + if label.GetName() == "waiting_lora_adapters" && label.GetValue() != "" { + for _, adapter := range strings.Split(label.GetValue(), ",") { + adaptersFound[adapter] = 0 // Overwrite if already present + } + } + if label.GetName() == "max_lora" { + var converr error // define err in this scope. + maxLora, converr = strconv.Atoi(label.GetValue()) + if converr != nil && tc.expectedErr == nil { // only report if we don't expect any other errors + t.Errorf("getLatestLoraMetric() could not parse max_lora: %v", converr) + } + } + } + + if !reflect.DeepEqual(adaptersFound, tc.expectedAdapters) { + t.Errorf("getLatestLoraMetric() adapters = %v, want %v", adaptersFound, tc.expectedAdapters) + } + if maxLora != tc.expectedMax { + t.Errorf("getLatestLoraMetric() maxLora = %v, want %v", maxLora, tc.expectedMax) + } + } else { // one is nil and the other is not + t.Errorf("getLatestLoraMetric(): one of expectedAdapters/loraMetric is nil and the other is not, expected %v, got %v", tc.expectedAdapters, loraMetric) + } + }) + } +} + +func TestPromToPodMetrics(t *testing.T) { + tests := []struct { + name string + metricFamilies map[string]*dto.MetricFamily + mapping *MetricMapping + existingMetrics *Metrics + expectedMetrics *Metrics + expectedErr error // Count of expected errors + }{ + { + name: "vllm metrics", + metricFamilies: map[string]*dto.MetricFamily{ + "vllm_waiting": makeMetricFamily("vllm_waiting", + makeMetric(nil, 5.0, 1000), + makeMetric(nil, 7.0, 2000), // Newer + ), + "vllm_usage": makeMetricFamily("vllm_usage", + makeMetric(nil, 0.8, 2000), + makeMetric(nil, 0.7, 500), + ), + "vllm:lora_requests_info": makeMetricFamily("vllm:lora_requests_info", + makeMetric(map[string]string{"running_lora_adapters": "lora1,lora2", "waiting_lora_adapters": "lora3", "max_lora": "3"}, 3000.0, 1000), + ), + }, + mapping: &MetricMapping{ + TotalQueuedRequests: &MetricSpec{MetricName: "vllm_waiting"}, + KVCacheUtilization: &MetricSpec{MetricName: "vllm_usage"}, + LoraRequestInfo: &MetricSpec{MetricName: "vllm:lora_requests_info"}, + }, + existingMetrics: &Metrics{}, + expectedMetrics: &Metrics{ + WaitingQueueSize: 7, + KVCacheUsagePercent: 0.8, + ActiveModels: map[string]int{"lora1": 0, "lora2": 0, "lora3": 0}, + MaxActiveModels: 3, + }, + }, + { + name: "missing metrics", + metricFamilies: map[string]*dto.MetricFamily{}, // No metrics + mapping: &MetricMapping{ + TotalQueuedRequests: &MetricSpec{MetricName: "vllm_waiting"}, + KVCacheUtilization: &MetricSpec{MetricName: "vllm_usage"}, + LoraRequestInfo: &MetricSpec{MetricName: "vllm:lora_requests_info"}, + }, + existingMetrics: &Metrics{ActiveModels: map[string]int{}}, + expectedMetrics: &Metrics{ActiveModels: map[string]int{}}, + expectedErr: multierr.Combine(errors.New("metric family \"vllm_waiting\" not found"), errors.New("metric family \"vllm_usage\" not found"), errors.New("metric family \"vllm:lora_requests_info\" not found")), + }, + { + name: "partial metrics available + LoRA", + metricFamilies: map[string]*dto.MetricFamily{ + "vllm_usage": makeMetricFamily("vllm_usage", + makeMetric(nil, 0.8, 2000), // Only usage is present + ), + "vllm:lora_requests_info": makeMetricFamily("vllm:lora_requests_info", + makeMetric(map[string]string{"running_lora_adapters": "lora1,lora2", "waiting_lora_adapters": "lora3", "max_lora": "3"}, 3000.0, 1000), + ), + }, + mapping: &MetricMapping{ + TotalQueuedRequests: &MetricSpec{MetricName: "vllm_waiting"}, // Not Present + KVCacheUtilization: &MetricSpec{MetricName: "vllm_usage"}, + LoraRequestInfo: &MetricSpec{MetricName: "vllm:lora_requests_info"}, + }, + existingMetrics: &Metrics{}, + expectedMetrics: &Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.8, + ActiveModels: map[string]int{"lora1": 0, "lora2": 0, "lora3": 0}, + MaxActiveModels: 3, + }, + expectedErr: errors.New("metric family \"vllm_waiting\" not found"), + }, + { + name: "invalid max lora", + metricFamilies: map[string]*dto.MetricFamily{ + "vllm:lora_requests_info": makeMetricFamily("vllm:lora_requests_info", + makeMetric(map[string]string{"running_lora_adapters": "lora1", "max_lora": "invalid"}, 3000.0, 1000), + ), + }, + mapping: &MetricMapping{ + LoraRequestInfo: &MetricSpec{MetricName: "vllm:lora_requests_info"}, + }, + existingMetrics: &Metrics{}, + expectedMetrics: &Metrics{ + ActiveModels: map[string]int{"lora1": 0}, + MaxActiveModels: 0, // Should still default to 0. + + }, + expectedErr: errors.New("strconv.Atoi: parsing \"invalid\": invalid syntax"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + p := &PodMetricsClientImpl{MetricMapping: tc.mapping} + updated, err := p.promToPodMetrics(tc.metricFamilies, tc.existingMetrics) + if tc.expectedErr != nil { + assert.Error(t, err) + assert.EqualError(t, err, tc.expectedErr.Error()) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedMetrics, updated) + } + }) + } +} + +// TestFetchMetrics is a basic integration test. It assumes +// there's no server running on the specified port. +func TestFetchMetrics(t *testing.T) { + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + pod := &Pod{ + Address: "127.0.0.1", + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "pod", + }, + } + existing := &Metrics{} + p := &PodMetricsClientImpl{} // No MetricMapping needed for this basic test + + _, err := p.FetchMetrics(ctx, pod, existing, 9999) // Use a port that's unlikely to be in use. + if err == nil { + t.Errorf("FetchMetrics() expected error, got nil") + } + // Check for a specific error message (fragile, but OK for this example) + expectedSubstr := "connection refused" + if err != nil && !strings.Contains(err.Error(), expectedSubstr) { + t.Errorf("FetchMetrics() error = %v, want error containing %q", err, expectedSubstr) + } +} diff --git a/pkg/epp/backend/metrics/pod_metrics.go b/pkg/epp/backend/metrics/pod_metrics.go index b954a98ce..01db14bec 100644 --- a/pkg/epp/backend/metrics/pod_metrics.go +++ b/pkg/epp/backend/metrics/pod_metrics.go @@ -115,6 +115,7 @@ func (pm *podMetrics) refreshMetrics() error { defer cancel() updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics(), pool.Spec.TargetPortNumber) if err != nil { + pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err) // As refresher is running in the background, it's possible that the pod is deleted but // the refresh goroutine doesn't read the done channel yet. In this case, we just return nil. // The refresher will be stopped after this interval. diff --git a/pkg/epp/backend/vllm/metrics.go b/pkg/epp/backend/vllm/metrics.go deleted file mode 100644 index 8d2dd7154..000000000 --- a/pkg/epp/backend/vllm/metrics.go +++ /dev/null @@ -1,237 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package vllm provides vllm specific pod metrics implementation. -package vllm - -import ( - "context" - "fmt" - "net/http" - "strconv" - "strings" - "time" - - "github.com/go-logr/logr" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "go.uber.org/multierr" - "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" -) - -// Metric names used in the vLLM metrics implementation. -// Refer to the protocol doc for more details: -// https://github.com/kubernetes-sigs/gateway-api-inference-extension/tree/main/docs/proposals/003-model-server-protocol -const ( - LoraRequestInfoMetricName = "vllm:lora_requests_info" - LoraRequestInfoRunningAdaptersMetricName = "running_lora_adapters" - LoraRequestInfoWaitingAdaptersMetricName = "waiting_lora_adapters" - LoraRequestInfoMaxAdaptersMetricName = "max_lora" - // TODO: Replace these with the num_tokens_running/waiting below once we add those to the fork. - RunningQueueSizeMetricName = "vllm:num_requests_running" - WaitingQueueSizeMetricName = "vllm:num_requests_waiting" - /* TODO: Uncomment this once the following are added to the fork. - RunningQueueSizeMetricName = "vllm:num_tokens_running" - WaitingQueueSizeMetricName = "vllm:num_tokens_waiting" - */ - KVCacheUsagePercentMetricName = "vllm:gpu_cache_usage_perc" -) - -type PodMetricsClientImpl struct{} - -// FetchMetrics fetches metrics from a given pod. -func (p *PodMetricsClientImpl) FetchMetrics( - ctx context.Context, - pod *metrics.Pod, - existing *metrics.Metrics, - port int32, -) (*metrics.Metrics, error) { - logger := log.FromContext(ctx).V(logutil.TRACE) - - // Currently the metrics endpoint is hard-coded, which works with vLLM. - // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config. - url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics" - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - logger.Error(err, "Failed create HTTP request", "method", http.MethodGet, "url", url) - return nil, fmt.Errorf("failed to create request: %v", err) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - logger.Error(err, "Failed to fetch metrics", "pod", pod.NamespacedName) - return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod.NamespacedName, err) - } - defer func() { - _ = resp.Body.Close() - }() - - if resp.StatusCode != http.StatusOK { - logger.Error(nil, "Unexpected status code returned", "pod", pod.NamespacedName, "statusCode", resp.StatusCode) - return nil, fmt.Errorf("unexpected status code from %s: %v", pod.NamespacedName, resp.StatusCode) - } - - parser := expfmt.TextParser{} - metricFamilies, err := parser.TextToMetricFamilies(resp.Body) - if err != nil { - return nil, err - } - return promToPodMetrics(logger, metricFamilies, existing) -} - -// promToPodMetrics updates internal pod metrics with scraped prometheus metrics. -// A combined error is returned if errors occur in one or more metric processing. -// it returns a new PodMetrics pointer which can be used to atomically update the pod metrics map. -func promToPodMetrics( - logger logr.Logger, - metricFamilies map[string]*dto.MetricFamily, - existing *metrics.Metrics, -) (*metrics.Metrics, error) { - var errs error - updated := existing.Clone() - runningQueueSize, err := getLatestMetric(logger, metricFamilies, RunningQueueSizeMetricName) - errs = multierr.Append(errs, err) - if err == nil { - updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue()) - } - waitingQueueSize, err := getLatestMetric(logger, metricFamilies, WaitingQueueSizeMetricName) - errs = multierr.Append(errs, err) - if err == nil { - updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue()) - } - cachePercent, err := getLatestMetric(logger, metricFamilies, KVCacheUsagePercentMetricName) - errs = multierr.Append(errs, err) - if err == nil { - updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue() - } - - loraMetrics, _, err := getLatestLoraMetric(logger, metricFamilies) - errs = multierr.Append(errs, err) - /* TODO: uncomment once this is available in vllm. - kvCap, _, err := getGaugeLatestValue(metricFamilies, KvCacheMaxTokenCapacityMetricName) - errs = multierr.Append(errs, err) - if err != nil { - updated.KvCacheMaxTokenCapacity = int(kvCap) - } - */ - - if loraMetrics != nil { - updated.ActiveModels = make(map[string]int) - for _, label := range loraMetrics.GetLabel() { - if label.GetName() == LoraRequestInfoRunningAdaptersMetricName { - if label.GetValue() != "" { - adapterList := strings.Split(label.GetValue(), ",") - for _, adapter := range adapterList { - updated.ActiveModels[adapter] = 0 - } - } - } - if label.GetName() == LoraRequestInfoWaitingAdaptersMetricName { - if label.GetValue() != "" { - adapterList := strings.Split(label.GetValue(), ",") - for _, adapter := range adapterList { - updated.ActiveModels[adapter] = 0 - } - } - } - if label.GetName() == LoraRequestInfoMaxAdaptersMetricName { - if label.GetValue() != "" { - updated.MaxActiveModels, err = strconv.Atoi(label.GetValue()) - if err != nil { - errs = multierr.Append(errs, err) - } - } - } - } - - } - - return updated, errs -} - -// getLatestLoraMetric gets latest lora metric series in gauge metric family `vllm:lora_requests_info` -// reason its specially fetched is because each label key value pair permutation generates new series -// and only most recent is useful. The value of each series is the creation timestamp so we can -// retrieve the latest by sorting the value. -func getLatestLoraMetric(logger logr.Logger, metricFamilies map[string]*dto.MetricFamily) (*dto.Metric, time.Time, error) { - loraRequests, ok := metricFamilies[LoraRequestInfoMetricName] - if !ok { - logger.V(logutil.TRACE).Error(nil, "Metric family not found", "name", LoraRequestInfoMetricName) - return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName) - } - - var latest *dto.Metric - var latestTs float64 - - // Iterate over all metrics in the family. - for _, m := range loraRequests.GetMetric() { - var running, waiting string - // Read the label values for running and waiting adapters. - for _, lp := range m.GetLabel() { - switch lp.GetName() { - case LoraRequestInfoRunningAdaptersMetricName: - running = lp.GetValue() - case LoraRequestInfoWaitingAdaptersMetricName: - waiting = lp.GetValue() - } - } - - // Ignore metrics with both labels empty. This happens when there are no running or waiting requests on - // the server, in this case it is best to use the last set of active adapters. - if running == "" && waiting == "" { - continue - } - - // Select the metric with the latest creation timestamp. - if m.GetGauge().GetValue() > latestTs { - latestTs = m.GetGauge().GetValue() - latest = m - } - } - - if latest == nil { - logger.V(logutil.TRACE).Info("Metric value Empty", "value", latest, "metric", LoraRequestInfoMetricName) - return nil, time.Time{}, nil - } - - // Convert the gauge value (creation timestamp) to time.Time. - return latest, time.Unix(0, int64(latestTs*1000)), nil -} - -// getLatestMetric gets the latest metric of a family. This should be used to get the latest Gauge metric. -// Since vllm doesn't set the timestamp in metric, this metric essentially gets the first metric. -func getLatestMetric(logger logr.Logger, metricFamilies map[string]*dto.MetricFamily, metricName string) (*dto.Metric, error) { - mf, ok := metricFamilies[metricName] - if !ok { - logger.V(logutil.TRACE).Error(nil, "Metric family not found", "name", metricName) - return nil, fmt.Errorf("metric family %q not found", metricName) - } - if len(mf.GetMetric()) == 0 { - return nil, fmt.Errorf("no metrics available for %q", metricName) - } - var latestTs int64 - var latest *dto.Metric - for _, m := range mf.GetMetric() { - if m.GetTimestampMs() >= latestTs { - latestTs = m.GetTimestampMs() - latest = m - } - } - logger.V(logutil.TRACE).Info("Metric value selected", "value", latest, "metric", metricName) - return latest, nil -} diff --git a/pkg/epp/backend/vllm/metrics_test.go b/pkg/epp/backend/vllm/metrics_test.go deleted file mode 100644 index 5555bd260..000000000 --- a/pkg/epp/backend/vllm/metrics_test.go +++ /dev/null @@ -1,250 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package vllm - -import ( - "errors" - "testing" - - dto "github.com/prometheus/client_model/go" - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" -) - -func TestPromToPodMetrics(t *testing.T) { - logger := logutil.NewTestLogger() - - testCases := []struct { - name string - metricFamilies map[string]*dto.MetricFamily - initialMetrics *metrics.Metrics - expectedMetrics *metrics.Metrics - expectedErr error - }{ - { - name: "all metrics available", - metricFamilies: map[string]*dto.MetricFamily{ - RunningQueueSizeMetricName: { - Metric: []*dto.Metric{ - { - Gauge: &dto.Gauge{ - Value: proto.Float64(10), - }, - TimestampMs: proto.Int64(100), - }, - { - Gauge: &dto.Gauge{ - Value: proto.Float64(15), - }, - TimestampMs: proto.Int64(200), // This is the latest - }, - }, - }, - WaitingQueueSizeMetricName: { - Metric: []*dto.Metric{ - { - Gauge: &dto.Gauge{ - Value: proto.Float64(20), - }, - TimestampMs: proto.Int64(100), - }, - { - Gauge: &dto.Gauge{ - Value: proto.Float64(25), - }, - TimestampMs: proto.Int64(200), // This is the latest - }, - }, - }, - KVCacheUsagePercentMetricName: { - Metric: []*dto.Metric{ - { - Gauge: &dto.Gauge{ - Value: proto.Float64(0.8), - }, - TimestampMs: proto.Int64(100), - }, - { - Gauge: &dto.Gauge{ - Value: proto.Float64(0.9), - }, - TimestampMs: proto.Int64(200), // This is the latest - }, - }, - }, - LoraRequestInfoMetricName: { - Metric: []*dto.Metric{ - { - Label: []*dto.LabelPair{ - { - Name: proto.String(LoraRequestInfoRunningAdaptersMetricName), - Value: proto.String("lora3,lora4"), - }, - { - Name: proto.String(LoraRequestInfoMaxAdaptersMetricName), - Value: proto.String("2"), - }, - }, - Gauge: &dto.Gauge{ - Value: proto.Float64(100), - }, - }, - { - Label: []*dto.LabelPair{ - { - Name: proto.String(LoraRequestInfoRunningAdaptersMetricName), - Value: proto.String("lora2"), - }, - { - Name: proto.String(LoraRequestInfoMaxAdaptersMetricName), - Value: proto.String("2"), - }, - }, - Gauge: &dto.Gauge{ - Value: proto.Float64(90), - }, - }, - }, - }, - }, - expectedMetrics: &metrics.Metrics{ - RunningQueueSize: 15, - WaitingQueueSize: 25, - KVCacheUsagePercent: 0.9, - ActiveModels: map[string]int{ - "lora3": 0, - "lora4": 0, - }, - MaxActiveModels: 2, - }, - initialMetrics: &metrics.Metrics{}, - expectedErr: nil, - }, - { - name: "invalid max lora", - metricFamilies: map[string]*dto.MetricFamily{ - RunningQueueSizeMetricName: { - Metric: []*dto.Metric{ - { - Gauge: &dto.Gauge{ - Value: proto.Float64(10), - }, - TimestampMs: proto.Int64(100), - }, - { - Gauge: &dto.Gauge{ - Value: proto.Float64(15), - }, - TimestampMs: proto.Int64(200), // This is the latest - }, - }, - }, - WaitingQueueSizeMetricName: { - Metric: []*dto.Metric{ - { - Gauge: &dto.Gauge{ - Value: proto.Float64(20), - }, - TimestampMs: proto.Int64(100), - }, - { - Gauge: &dto.Gauge{ - Value: proto.Float64(25), - }, - TimestampMs: proto.Int64(200), // This is the latest - }, - }, - }, - KVCacheUsagePercentMetricName: { - Metric: []*dto.Metric{ - { - Gauge: &dto.Gauge{ - Value: proto.Float64(0.8), - }, - TimestampMs: proto.Int64(100), - }, - { - Gauge: &dto.Gauge{ - Value: proto.Float64(0.9), - }, - TimestampMs: proto.Int64(200), // This is the latest - }, - }, - }, - LoraRequestInfoMetricName: { - Metric: []*dto.Metric{ - { - Label: []*dto.LabelPair{ - { - Name: proto.String(LoraRequestInfoRunningAdaptersMetricName), - Value: proto.String("lora3,lora4"), - }, - { - Name: proto.String(LoraRequestInfoMaxAdaptersMetricName), - Value: proto.String("2a"), - }, - }, - Gauge: &dto.Gauge{ - Value: proto.Float64(100), - }, - }, - { - Label: []*dto.LabelPair{ - { - Name: proto.String(LoraRequestInfoRunningAdaptersMetricName), - Value: proto.String("lora2"), - }, - { - Name: proto.String(LoraRequestInfoMaxAdaptersMetricName), - Value: proto.String("2"), - }, - }, - Gauge: &dto.Gauge{ - Value: proto.Float64(90), - }, - }, - }, - }, - }, - expectedMetrics: &metrics.Metrics{ - RunningQueueSize: 15, - WaitingQueueSize: 25, - KVCacheUsagePercent: 0.9, - ActiveModels: map[string]int{ - "lora3": 0, - "lora4": 0, - }, - MaxActiveModels: 0, - }, - initialMetrics: &metrics.Metrics{}, - expectedErr: errors.New("strconv.Atoi: parsing '2a': invalid syntax"), - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - updated, err := promToPodMetrics(logger, tc.metricFamilies, tc.initialMetrics) - if tc.expectedErr != nil { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, tc.expectedMetrics, updated) - } - }) - } -}