diff --git a/conformance/testing-epp/scheduler_test.go b/conformance/testing-epp/scheduler_test.go index 4901e0380..b1450e282 100644 --- a/conformance/testing-epp/scheduler_test.go +++ b/conformance/testing-epp/scheduler_test.go @@ -31,7 +31,7 @@ import ( func TestSchedule(t *testing.T) { tests := []struct { name string - input []backendmetrics.PodMetrics + input []types.Pod req *types.LLMRequest wantRes *types.SchedulingResult err bool @@ -47,7 +47,7 @@ func TestSchedule(t *testing.T) { }, { name: "req header not set", - input: []backendmetrics.PodMetrics{ + input: []types.Pod{ &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "random-endpoint"}}, }, req: &types.LLMRequest{ @@ -59,7 +59,7 @@ func TestSchedule(t *testing.T) { }, { name: "no pods address from the candidate pods matches req header address", - input: []backendmetrics.PodMetrics{ + input: []types.Pod{ &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}}, }, req: &types.LLMRequest{ @@ -71,7 +71,7 @@ func TestSchedule(t *testing.T) { }, { name: "one pod address from the candidate pods matches req header address", - input: []backendmetrics.PodMetrics{ + input: []types.Pod{ &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}}, &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "matched-endpoint"}}, }, @@ -100,7 +100,7 @@ func TestSchedule(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { scheduler := NewReqHeaderBasedScheduler() - got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input)) + got, err := scheduler.Schedule(context.Background(), test.req, test.input) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index d4a0c0b63..5676ea871 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -195,13 +195,13 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any) if !found { - return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll()) + return d.toSchedulerPodMetrics(d.datastore.PodGetAll()) } // Check if endpoint key is present in the subset map and ensure there is at least one value endpointSubsetList, found := subsetMap[subsetHintKey].([]any) if !found { - return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll()) + return d.toSchedulerPodMetrics(d.datastore.PodGetAll()) } else if len(endpointSubsetList) == 0 { loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") return []schedulingtypes.Pod{} @@ -227,7 +227,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList)) - return schedulingtypes.ToSchedulerPodMetrics(podFitleredList) + return d.toSchedulerPodMetrics(podFitleredList) } // prepareRequest populates the RequestContext and calls the registered PreRequest plugins @@ -257,6 +257,15 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC return reqCtx, nil } +func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []schedulingtypes.Pod { + pm := make([]schedulingtypes.Pod, len(pods)) + for i, pod := range pods { + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()} + } + + return pm +} + func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { response := &Response{ RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], diff --git a/pkg/epp/scheduling/framework/scheduler_profile_test.go b/pkg/epp/scheduling/framework/scheduler_profile_test.go index a2adccaf6..1b1119fd8 100644 --- a/pkg/epp/scheduling/framework/scheduler_profile_test.go +++ b/pkg/epp/scheduling/framework/scheduler_profile_test.go @@ -25,7 +25,6 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" - backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) @@ -45,7 +44,7 @@ func TestSchedulePlugins(t *testing.T) { tests := []struct { name string profile *SchedulerProfile - input []backendmetrics.PodMetrics + input []types.Pod wantTargetPod k8stypes.NamespacedName targetPodScore float64 // Number of expected pods to score (after filter) @@ -59,10 +58,10 @@ func TestSchedulePlugins(t *testing.T) { WithScorers(NewWeightedScorer(tp1, 1), NewWeightedScorer(tp2, 1)). WithPicker(pickerPlugin). WithPostCyclePlugins(tp1, tp2), - input: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, }, wantTargetPod: k8stypes.NamespacedName{Name: "pod1"}, targetPodScore: 1.1, @@ -76,10 +75,10 @@ func TestSchedulePlugins(t *testing.T) { WithScorers(NewWeightedScorer(tp1, 60), NewWeightedScorer(tp2, 40)). WithPicker(pickerPlugin). WithPostCyclePlugins(tp1, tp2), - input: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, }, wantTargetPod: k8stypes.NamespacedName{Name: "pod1"}, targetPodScore: 50, @@ -93,10 +92,10 @@ func TestSchedulePlugins(t *testing.T) { WithScorers(NewWeightedScorer(tp1, 1), NewWeightedScorer(tp2, 1)). WithPicker(pickerPlugin). WithPostCyclePlugins(tp1, tp2), - input: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, - &backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}, + &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}}, }, numPodsToScore: 0, err: true, // no available pods to server after filter all @@ -123,7 +122,7 @@ func TestSchedulePlugins(t *testing.T) { RequestId: uuid.NewString(), } // Run profile cycle - got, err := test.profile.Run(context.Background(), request, types.NewCycleState(), types.ToSchedulerPodMetrics(test.input)) + got, err := test.profile.Run(context.Background(), request, types.NewCycleState(), test.input) // Validate error state if test.err != (err != nil) { @@ -136,7 +135,7 @@ func TestSchedulePlugins(t *testing.T) { // Validate output wantPod := &types.PodMetrics{ - Pod: &backend.Pod{NamespacedName: test.wantTargetPod, Labels: make(map[string]string)}, + Pod: &backend.Pod{NamespacedName: test.wantTargetPod}, } wantRes := &types.ProfileRunResult{ TargetPod: wantPod, diff --git a/pkg/epp/scheduling/scheduler_test.go b/pkg/epp/scheduling/scheduler_test.go index 720a6b4f5..830cc4e31 100644 --- a/pkg/epp/scheduling/scheduler_test.go +++ b/pkg/epp/scheduling/scheduler_test.go @@ -33,7 +33,7 @@ func TestSchedule(t *testing.T) { tests := []struct { name string req *types.LLMRequest - input []backendmetrics.PodMetrics + input []types.Pod wantRes *types.SchedulingResult err bool }{ @@ -43,7 +43,7 @@ func TestSchedule(t *testing.T) { TargetModel: "any-model", RequestId: uuid.NewString(), }, - input: []backendmetrics.PodMetrics{}, + input: []types.Pod{}, wantRes: nil, err: true, }, @@ -55,10 +55,10 @@ func TestSchedule(t *testing.T) { }, // pod2 will be picked because it has relatively low queue size, with the requested // model being active, and has low KV cache. - input: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{ + input: []types.Pod{ + &types.PodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, - Metrics: &backendmetrics.MetricsState{ + MetricsState: &backendmetrics.MetricsState{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -68,9 +68,9 @@ func TestSchedule(t *testing.T) { }, }, }, - &backendmetrics.FakePodMetrics{ + &types.PodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, - Metrics: &backendmetrics.MetricsState{ + MetricsState: &backendmetrics.MetricsState{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.1, MaxActiveModels: 2, @@ -80,9 +80,9 @@ func TestSchedule(t *testing.T) { }, }, }, - &backendmetrics.FakePodMetrics{ + &types.PodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}, - Metrics: &backendmetrics.MetricsState{ + MetricsState: &backendmetrics.MetricsState{ WaitingQueueSize: 10, KVCacheUsagePercent: 0.2, MaxActiveModels: 2, @@ -97,7 +97,7 @@ func TestSchedule(t *testing.T) { "default": { TargetPod: &types.ScoredPod{ Pod: &types.PodMetrics{ - Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}, Labels: make(map[string]string)}, + Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: &backendmetrics.MetricsState{ WaitingQueueSize: 3, KVCacheUsagePercent: 0.1, @@ -106,7 +106,6 @@ func TestSchedule(t *testing.T) { "foo": 1, "critical": 1, }, - WaitingModels: map[string]int{}, }, }, }, @@ -120,7 +119,7 @@ func TestSchedule(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { scheduler := NewScheduler() - got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input)) + got, err := scheduler.Schedule(context.Background(), test.req, test.input) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) } diff --git a/pkg/epp/scheduling/types/types.go b/pkg/epp/scheduling/types/types.go index 451384751..10ac2c824 100644 --- a/pkg/epp/scheduling/types/types.go +++ b/pkg/epp/scheduling/types/types.go @@ -70,14 +70,6 @@ type PodMetrics struct { *backendmetrics.MetricsState } -func ToSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []Pod { - pm := make([]Pod, 0, len(pods)) - for _, pod := range pods { - pm = append(pm, &PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()}) - } - return pm -} - // ProfileRunResult captures the profile run result. type ProfileRunResult struct { TargetPod Pod