Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/metrics"
schedulingframework "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
)

const (
Expand Down Expand Up @@ -55,6 +56,7 @@ var (
modelLabels = []string{"model_name", "target_model_name"}
modelTypeLabels = []string{"model_name", "target_model_name", "type"}
poolLabels = []string{"name"}
enpointLabels = []string{"pod_name", "namespace", "port"}
Comment thread
lionelvillard marked this conversation as resolved.
Outdated

// --- Common Buckets ---

Expand Down Expand Up @@ -320,7 +322,7 @@ var (
Name: "scheduler_attempts_total",
Help: metricsutil.HelpMsgWithStability("Total number of scheduling attempts.", compbasemetrics.ALPHA),
},
[]string{"status"}, // "success", "failure"
append([]string{"status", "target_model_name"}, enpointLabels...),
)

pluginProcessingLatencies = prometheus.NewHistogramVec(
Expand Down Expand Up @@ -770,13 +772,28 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
schedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
}

// RecordSchedulerAttempt records a scheduling attempt with status.
func RecordSchedulerAttempt(err error) {
// RecordSchedulerAttempt records a scheduling attempt with status and endpoint information.
func RecordSchedulerAttempt(err error, targetModelName string, result *schedulingframework.SchedulingResult) {
if err != nil {
schedulerAttemptsTotal.WithLabelValues(SchedulerStatusFailure).Inc()
} else {
schedulerAttemptsTotal.WithLabelValues(SchedulerStatusSuccess).Inc()
schedulerAttemptsTotal.WithLabelValues(SchedulerStatusFailure, "", "", "", "").Inc()
Comment thread
lionelvillard marked this conversation as resolved.
Outdated
return
}

if result != nil {
// Collect endpoint information for successful scheduling attempts
primaryResults := result.ProfileResults[result.PrimaryProfileName]

// prepareRequest (in director.go) selects the first endpoint. Do the same here.
if len(primaryResults.TargetEndpoints) > 0 {
metadata := primaryResults.TargetEndpoints[0].GetMetadata()
if metadata != nil {
schedulerAttemptsTotal.WithLabelValues(SchedulerStatusSuccess, targetModelName, metadata.PodName, metadata.NamespacedName.Namespace, metadata.Port).Inc()
return
Comment thread
lionelvillard marked this conversation as resolved.
Outdated
}
}
}

schedulerAttemptsTotal.WithLabelValues(SchedulerStatusSuccess, targetModelName, "", "", "").Inc()
}

const (
Expand Down
165 changes: 128 additions & 37 deletions pkg/epp/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/component-base/metrics/testutil"
"sigs.k8s.io/controller-runtime/pkg/metrics"

errcommon "sigs.k8s.io/gateway-api-inference-extension/pkg/common/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
schedulingframework "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
)

const (
Expand Down Expand Up @@ -796,46 +799,134 @@ func TestFlowControlEnqueueDurationMetric(t *testing.T) {

func TestSchedulerAttemptsTotal(t *testing.T) {

scenarios := []struct {
name string
successCount int
failureCount int
}{
{
name: "mixed success and failure attempts",
successCount: 10,
failureCount: 5,
},
compareMetrics := func(t *testing.T, goldenFile string) {
t.Helper()
wantMetrics, err := os.Open(goldenFile)
if err != nil {
t.Fatal(err)
}
defer func() {
if err = wantMetrics.Close(); err != nil {
t.Error(err)
}
}()
if err := testutil.GatherAndCompare(
metrics.Registry,
wantMetrics,
"inference_extension_scheduler_attempts_total",
); err != nil {
t.Errorf("metric comparison failed: %v", err)
}
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
Reset()
for i := 0; i < scenario.successCount; i++ {
RecordSchedulerAttempt(nil)
}
for i := 0; i < scenario.failureCount; i++ {
RecordSchedulerAttempt(errors.New("simulated scheduling failure"))
}
t.Run("success with endpoint metadata", func(t *testing.T) {
Reset()
result := &schedulingframework.SchedulingResult{
PrimaryProfileName: "primary",
ProfileResults: map[string]*schedulingframework.ProfileRunResult{
"primary": {
TargetEndpoints: []schedulingframework.Endpoint{
schedulingframework.NewEndpoint(
&fwkdl.EndpointMetadata{
NamespacedName: k8stypes.NamespacedName{Name: "pod-1", Namespace: "ns-1"},
PodName: "pod-1",
Port: "8080",
},
nil, nil,
),
},
},
},
}
RecordSchedulerAttempt(nil, "modelA", result)
RecordSchedulerAttempt(nil, "modelA", result)
compareMetrics(t, "testdata/scheduler_attempts_with_result_metrics")
})

wantMetrics, err := os.Open("testdata/scheduler_attempts_total_metrics")
defer func() {
if err = wantMetrics.Close(); err != nil {
t.Error(err)
}
}()
if err != nil {
t.Fatal(err)
}
if err := testutil.GatherAndCompare(
metrics.Registry,
wantMetrics,
"inference_extension_scheduler_attempts_total",
); err != nil {
t.Errorf("metric comparison failed: %v", err)
}
})
}
t.Run("success with multiple endpoints uses first", func(t *testing.T) {
Reset()
result := &schedulingframework.SchedulingResult{
PrimaryProfileName: "primary",
ProfileResults: map[string]*schedulingframework.ProfileRunResult{
"primary": {
TargetEndpoints: []schedulingframework.Endpoint{
schedulingframework.NewEndpoint(
&fwkdl.EndpointMetadata{
NamespacedName: k8stypes.NamespacedName{Name: "pod-1", Namespace: "ns-1"},
PodName: "pod-1",
Port: "8080",
},
nil, nil,
),
schedulingframework.NewEndpoint(
&fwkdl.EndpointMetadata{
NamespacedName: k8stypes.NamespacedName{Name: "pod-2", Namespace: "ns-2"},
PodName: "pod-2",
Port: "9090",
},
nil, nil,
),
},
},
},
}
RecordSchedulerAttempt(nil, "modelA", result)
RecordSchedulerAttempt(nil, "modelB", result)
compareMetrics(t, "testdata/scheduler_attempts_multiple_endpoints_metrics")
})

t.Run("success with different models and endpoints", func(t *testing.T) {
Reset()
resultA := &schedulingframework.SchedulingResult{
PrimaryProfileName: "primary",
ProfileResults: map[string]*schedulingframework.ProfileRunResult{
"primary": {
TargetEndpoints: []schedulingframework.Endpoint{
schedulingframework.NewEndpoint(
&fwkdl.EndpointMetadata{
NamespacedName: k8stypes.NamespacedName{Name: "pod-1", Namespace: "ns-1"},
PodName: "pod-1",
Port: "8080",
},
nil, nil,
),
},
},
},
}
resultB := &schedulingframework.SchedulingResult{
PrimaryProfileName: "primary",
ProfileResults: map[string]*schedulingframework.ProfileRunResult{
"primary": {
TargetEndpoints: []schedulingframework.Endpoint{
schedulingframework.NewEndpoint(
&fwkdl.EndpointMetadata{
NamespacedName: k8stypes.NamespacedName{Name: "pod-2", Namespace: "ns-2"},
PodName: "pod-2",
Port: "9090",
},
nil, nil,
),
},
},
},
}
RecordSchedulerAttempt(nil, "modelA", resultA)
RecordSchedulerAttempt(nil, "modelA", resultA)
RecordSchedulerAttempt(nil, "modelB", resultB)
compareMetrics(t, "testdata/scheduler_attempts_different_models_metrics")
})

t.Run("mixed success and failure attempts", func(t *testing.T) {
Reset()
for i := 0; i < 10; i++ {
RecordSchedulerAttempt(nil, "modelA", nil)
}
for i := 0; i < 5; i++ {
RecordSchedulerAttempt(errors.New("simulated scheduling failure"), "modelA", nil)
}
compareMetrics(t, "testdata/scheduler_attempts_total_metrics")
})
}

func TestPrefixCacheMetrics(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# HELP inference_extension_scheduler_attempts_total [ALPHA] Total number of scheduling attempts.
# TYPE inference_extension_scheduler_attempts_total counter
inference_extension_scheduler_attempts_total{namespace="ns-1",pod_name="pod-1",port="8080",status="success",target_model_name="modelA"} 2
inference_extension_scheduler_attempts_total{namespace="ns-2",pod_name="pod-2",port="9090",status="success",target_model_name="modelB"} 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# HELP inference_extension_scheduler_attempts_total [ALPHA] Total number of scheduling attempts.
# TYPE inference_extension_scheduler_attempts_total counter
inference_extension_scheduler_attempts_total{namespace="ns-1",pod_name="pod-1",port="8080",status="success",target_model_name="modelA"} 1
inference_extension_scheduler_attempts_total{namespace="ns-1",pod_name="pod-1",port="8080",status="success",target_model_name="modelB"} 1
4 changes: 2 additions & 2 deletions pkg/epp/metrics/testdata/scheduler_attempts_total_metrics
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# HELP inference_extension_scheduler_attempts_total [ALPHA] Total number of scheduling attempts.
# TYPE inference_extension_scheduler_attempts_total counter
inference_extension_scheduler_attempts_total{status="failure"} 5
inference_extension_scheduler_attempts_total{status="success"} 10
inference_extension_scheduler_attempts_total{namespace="",pod_name="",port="",status="failure",target_model_name=""} 5
inference_extension_scheduler_attempts_total{namespace="",pod_name="",port="",status="success",target_model_name="modelA"} 10
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# HELP inference_extension_scheduler_attempts_total [ALPHA] Total number of scheduling attempts.
# TYPE inference_extension_scheduler_attempts_total counter
inference_extension_scheduler_attempts_total{namespace="ns-1",pod_name="pod-1",port="8080",status="success",target_model_name="modelA"} 2
2 changes: 1 addition & 1 deletion pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Scheduler) Schedule(ctx context.Context, request *framework.LLMRequest,
scheduleStart := time.Now()
defer func() {
metrics.RecordSchedulerE2ELatency(time.Since(scheduleStart))
metrics.RecordSchedulerAttempt(err)
metrics.RecordSchedulerAttempt(err, request.TargetModel, result)
}()

profileRunResults := map[string]*framework.ProfileRunResult{}
Expand Down
2 changes: 2 additions & 0 deletions site-src/guides/metrics-and-observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ This guide describes the current state of exposed metrics and how to scrape them
| inference_pool_per_pod_queue_size | Gauge | The total number of queue for each model server pod under the inference pool | `model_server_pod`=&lt;model-server-pod-name&gt; <br> `name`=&lt;inference-pool-name&gt; | ALPHA |
| inference_pool_ready_pods | Gauge | The number of ready pods for an inference server pool. | `name`=&lt;inference-pool-name&gt; | ALPHA |
| inference_extension_info | Gauge | The general information of the current build. | `commit`=&lt;hash-of-the-build&gt; <br> `build_ref`=&lt;ref-to-the-build&gt; | ALPHA |
| inference_extension_scheduler_attempts_total | Counter | Total number of scheduling attempts. | `status`=&lt;success\|failure&gt; <br> `target_model_name`=&lt;target-model-name&gt; <br> `pod_name`=&lt;pod-name&gt; <br> `namespace`=&lt;namespace&gt; <br> `port`=&lt;port&gt; | ALPHA |
Comment thread
lionelvillard marked this conversation as resolved.


### Dynamic LoRA Adapter Sidecar

Expand Down
Loading