From 821f4cb82d399e3a4606dff12fc2a6c641dd6d58 Mon Sep 17 00:00:00 2001 From: Nir Rozenbaum Date: Wed, 2 Jul 2025 15:50:09 +0300 Subject: [PATCH 1/2] move the conversion from pod metrics to scheduler pod representation one level up Signed-off-by: Nir Rozenbaum --- conformance/testing-epp/scheduler_test.go | 10 +++--- pkg/epp/requestcontrol/director.go | 14 +++++++-- .../framework/scheduler_profile_test.go | 31 +++++++++---------- pkg/epp/scheduling/scheduler_test.go | 23 +++++++------- pkg/epp/scheduling/types/types.go | 8 ----- 5 files changed, 42 insertions(+), 44 deletions(-) diff --git a/conformance/testing-epp/scheduler_test.go b/conformance/testing-epp/scheduler_test.go index 4901e03808..b1450e2820 100644 --- a/conformance/testing-epp/scheduler_test.go +++ b/conformance/testing-epp/scheduler_test.go @@ -31,7 +31,7 @@ import ( func TestSchedule(t *testing.T) { tests := []struct { name string - input []backendmetrics.PodMetrics + input []types.Pod req *types.LLMRequest wantRes *types.SchedulingResult err bool @@ -47,7 +47,7 @@ func TestSchedule(t *testing.T) { }, { name: "req header not set", - input: []backendmetrics.PodMetrics{ + input: []types.Pod{ &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "random-endpoint"}}, }, req: &types.LLMRequest{ @@ -59,7 +59,7 @@ func TestSchedule(t *testing.T) { }, { name: "no pods address from the candidate pods matches req header address", - input: []backendmetrics.PodMetrics{ + input: []types.Pod{ &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}}, }, req: &types.LLMRequest{ @@ -71,7 +71,7 @@ func TestSchedule(t *testing.T) { }, { name: "one pod address from the candidate pods matches req header address", - input: []backendmetrics.PodMetrics{ + input: []types.Pod{ &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}}, &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "matched-endpoint"}}, }, @@ -100,7 +100,7 @@ func TestSchedule(t *testing.T) { for 
_, test := range tests { t.Run(test.name, func(t *testing.T) { scheduler := NewReqHeaderBasedScheduler() - got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input)) + got, err := scheduler.Schedule(context.Background(), test.req, test.input) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index d4a0c0b63f..f96be7f99c 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -195,13 +195,13 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any) if !found { - return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll()) + return d.toSchedulerPodMetrics(d.datastore.PodGetAll()) } // Check if endpoint key is present in the subset map and ensure there is at least one value endpointSubsetList, found := subsetMap[subsetHintKey].([]any) if !found { - return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll()) + return d.toSchedulerPodMetrics(d.datastore.PodGetAll()) } else if len(endpointSubsetList) == 0 { loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") return []schedulingtypes.Pod{} @@ -227,7 +227,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList)) - return schedulingtypes.ToSchedulerPodMetrics(podFitleredList) + return d.toSchedulerPodMetrics(podFitleredList) } // prepareRequest populates the RequestContext and calls the registered PreRequest plugins @@ -257,6 +257,14 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC return reqCtx, nil } +func (d *Director) toSchedulerPodMetrics(pods 
[]backendmetrics.PodMetrics) []schedulingtypes.Pod { + pm := make([]schedulingtypes.Pod, 0, len(pods)) + for _, pod := range pods { + pm = append(pm, &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()}) + } + return pm +} + func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { response := &Response{ RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], diff --git a/pkg/epp/scheduling/framework/scheduler_profile_test.go b/pkg/epp/scheduling/framework/scheduler_profile_test.go index a2adccaf61..1b1119fd80 100644 --- a/pkg/epp/scheduling/framework/scheduler_profile_test.go +++ b/pkg/epp/scheduling/framework/scheduler_profile_test.go @@ -25,7 +25,6 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" - backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) @@ -45,7 +44,7 @@ func TestSchedulePlugins(t *testing.T) { tests := []struct { name string profile *SchedulerProfile - input []backendmetrics.PodMetrics + input []types.Pod wantTargetPod k8stypes.NamespacedName targetPodScore float64 // Number of expected pods to score (after filter) @@ -59,10 +58,10 @@ func TestSchedulePlugins(t *testing.T) { WithScorers(NewWeightedScorer(tp1, 1), NewWeightedScorer(tp2, 1)). WithPicker(pickerPlugin). 
WithPostCyclePlugins(tp1, tp2), - input: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, }, wantTargetPod: k8stypes.NamespacedName{Name: "pod1"}, targetPodScore: 1.1, @@ -76,10 +75,10 @@ func TestSchedulePlugins(t *testing.T) { WithScorers(NewWeightedScorer(tp1, 60), NewWeightedScorer(tp2, 40)). WithPicker(pickerPlugin). WithPostCyclePlugins(tp1, tp2), - input: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, }, wantTargetPod: k8stypes.NamespacedName{Name: "pod1"}, targetPodScore: 50, @@ -93,10 +92,10 @@ func TestSchedulePlugins(t *testing.T) { WithScorers(NewWeightedScorer(tp1, 1), NewWeightedScorer(tp2, 1)). WithPicker(pickerPlugin). 
WithPostCyclePlugins(tp1, tp2), - input: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, }, numPodsToScore: 0, err: true, // no available pods to server after filter all @@ -123,7 +122,7 @@ func TestSchedulePlugins(t *testing.T) { RequestId: uuid.NewString(), } // Run profile cycle - got, err := test.profile.Run(context.Background(), request, types.NewCycleState(), types.ToSchedulerPodMetrics(test.input)) + got, err := test.profile.Run(context.Background(), request, types.NewCycleState(), test.input) // Validate error state if test.err != (err != nil) { @@ -136,7 +135,7 @@ func TestSchedulePlugins(t *testing.T) { // Validate output wantPod := &types.PodMetrics{ - Pod: &backend.Pod{NamespacedName: test.wantTargetPod, Labels: make(map[string]string)}, + Pod: &backend.Pod{NamespacedName: test.wantTargetPod}, } wantRes := &types.ProfileRunResult{ TargetPod: wantPod, diff --git a/pkg/epp/scheduling/scheduler_test.go b/pkg/epp/scheduling/scheduler_test.go index 720a6b4f55..830cc4e310 100644 --- a/pkg/epp/scheduling/scheduler_test.go +++ b/pkg/epp/scheduling/scheduler_test.go @@ -33,7 +33,7 @@ func TestSchedule(t *testing.T) { tests := []struct { name string req *types.LLMRequest - input []backendmetrics.PodMetrics + input []types.Pod wantRes *types.SchedulingResult err bool }{ @@ -43,7 +43,7 @@ func TestSchedule(t *testing.T) { TargetModel: "any-model", RequestId: 
uuid.NewString(), }, - input: []backendmetrics.PodMetrics{}, + input: []types.Pod{}, wantRes: nil, err: true, }, @@ -55,10 +55,10 @@ func TestSchedule(t *testing.T) { }, // pod2 will be picked because it has relatively low queue size, with the requested // model being active, and has low KV cache. - input: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{ + input: []types.Pod{ + &types.PodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, - Metrics: &backendmetrics.MetricsState{ + MetricsState: &backendmetrics.MetricsState{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -68,9 +68,9 @@ func TestSchedule(t *testing.T) { }, }, }, - &backendmetrics.FakePodMetrics{ + &types.PodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, - Metrics: &backendmetrics.MetricsState{ + MetricsState: &backendmetrics.MetricsState{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.1, MaxActiveModels: 2, @@ -80,9 +80,9 @@ func TestSchedule(t *testing.T) { }, }, }, - &backendmetrics.FakePodMetrics{ + &types.PodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}, - Metrics: &backendmetrics.MetricsState{ + MetricsState: &backendmetrics.MetricsState{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -97,7 +97,7 @@ func TestSchedule(t *testing.T) { "default": { TargetPod: &types.ScoredPod{ Pod: &types.PodMetrics{ - Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}, Labels: make(map[string]string)}, + Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: &backendmetrics.MetricsState{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.1, @@ -106,7 +106,6 @@ func TestSchedule(t *testing.T) { "foo": 1, "critical": 1, }, - WaitingModels: map[string]int{}, }, }, }, @@ -120,7 +119,7 @@ func TestSchedule(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { scheduler := NewScheduler() 
- got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input)) + got, err := scheduler.Schedule(context.Background(), test.req, test.input) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) } diff --git a/pkg/epp/scheduling/types/types.go b/pkg/epp/scheduling/types/types.go index 451384751f..10ac2c824a 100644 --- a/pkg/epp/scheduling/types/types.go +++ b/pkg/epp/scheduling/types/types.go @@ -70,14 +70,6 @@ type PodMetrics struct { *backendmetrics.MetricsState } -func ToSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []Pod { - pm := make([]Pod, 0, len(pods)) - for _, pod := range pods { - pm = append(pm, &PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()}) - } - return pm -} - // ProfileRunResult captures the profile run result. type ProfileRunResult struct { TargetPod Pod From 1d7a5a6caffbcf3a91f6d6f038b3ddfad26ca947 Mon Sep 17 00:00:00 2001 From: Nir Rozenbaum Date: Thu, 3 Jul 2025 11:13:49 +0300 Subject: [PATCH 2/2] minor change in helper func Signed-off-by: Nir Rozenbaum --- pkg/epp/requestcontrol/director.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index f96be7f99c..5676ea871b 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -258,10 +258,11 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC } func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []schedulingtypes.Pod { - pm := make([]schedulingtypes.Pod, 0, len(pods)) - for _, pod := range pods { - pm = append(pm, &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()}) + pm := make([]schedulingtypes.Pod, len(pods)) + for i, pod := range pods { + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()} } + return pm 
}