Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (r *Runner) Run(ctx context.Context) error {
// --- Admission Control Initialization ---
var admissionController requestcontrol.AdmissionController
var locator contracts.PodLocator
locator = requestcontrol.NewDatastorePodLocator(ds)
locator = requestcontrol.NewDatastorePodLocator(ds, requestcontrol.WithDisableEndpointSubsetFilter(opts.DisableEndpointSubsetFilter))
if r.featureGates[flowcontrol.FeatureGate] {
locator = requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
setupLog.Info("Initializing experimental Flow Control layer")
Expand Down
31 changes: 30 additions & 1 deletion pkg/epp/requestcontrol/locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,41 @@ const (

// --- DatastorePodLocator (The Delegate) ---

// PodLocatorConfig groups the tunable settings of a DatastorePodLocator.
// Its zero value leaves endpoint subset filtering enabled.
type PodLocatorConfig struct {
	// DisableEndpointSubsetFilter, when true, makes Locate bypass subset
	// filtering and return all pods.
	DisableEndpointSubsetFilter bool
}

// LocatorOption is a functional option that mutates a PodLocatorConfig.
// NewDatastorePodLocator applies the options it receives in argument order.
type LocatorOption func(*PodLocatorConfig)

// WithDisableEndpointSubsetFilter returns a LocatorOption that stores disable
// in PodLocatorConfig.DisableEndpointSubsetFilter. Passing true switches
// subset filtering off; passing false leaves it on.
func WithDisableEndpointSubsetFilter(disable bool) LocatorOption {
	apply := func(cfg *PodLocatorConfig) {
		cfg.DisableEndpointSubsetFilter = disable
	}
	return apply
}

// DatastorePodLocator resolves candidate pods by querying the EPP Datastore.
// It is the contracts.PodLocator implementation that centralizes how request
// metadata (specifically Envoy subset filters) is turned into a pod list.
type DatastorePodLocator struct {
	// datastore is the pod source queried by Locate.
	datastore Datastore
	// config holds the settings resolved by NewDatastorePodLocator.
	config PodLocatorConfig
}

// Compile-time check that *DatastorePodLocator satisfies contracts.PodLocator.
var _ contracts.PodLocator = (*DatastorePodLocator)(nil)

// NewDatastorePodLocator creates a new DatastorePodLocator.
func NewDatastorePodLocator(ds Datastore) *DatastorePodLocator {
func NewDatastorePodLocator(ds Datastore, opts ...LocatorOption) *DatastorePodLocator {
cfg := PodLocatorConfig{
DisableEndpointSubsetFilter: false,
}
for _, opt := range opts {
opt(&cfg)
}
return &DatastorePodLocator{
datastore: ds,
config: cfg,
}
}

Expand All @@ -75,6 +98,12 @@ func NewDatastorePodLocator(ds Datastore) *DatastorePodLocator {
func (d *DatastorePodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
loggerTrace := log.FromContext(ctx).V(logutil.TRACE)

// If the user explicitly disabled subset filtering, return the default pool (all pods).
if d.config.DisableEndpointSubsetFilter {
loggerTrace.Info("endpoint subset filtering is explicitly disabled, returning all pods")
return d.datastore.PodList(datastore.AllPodsPredicate)
}

// Check if the subset filter namespace exists in metadata.
// If not, we assume the request targets the default pool (all pods).
if requestMetadata == nil {
Expand Down
31 changes: 30 additions & 1 deletion pkg/epp/requestcontrol/locator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func TestDatastorePodLocator_Locate(t *testing.T) {

allPods := []backendmetrics.PodMetrics{podA, podB, podC}
mockDS := &mockDatastore{pods: allPods}
locator := NewDatastorePodLocator(mockDS)

tests := []struct {
name string
opts []LocatorOption
metadata map[string]any
expectedPodIPs []string
}{
Expand Down Expand Up @@ -105,11 +105,40 @@ func TestDatastorePodLocator_Locate(t *testing.T) {
}),
expectedPodIPs: []string{"10.0.0.1"},
},
{
name: "Subset filter with match (filter disabled)",
opts: []LocatorOption{
WithDisableEndpointSubsetFilter(true),
},
metadata: makeMetadataWithSubset([]any{
"10.0.0.1:8080",
}),
expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
},
{
name: "Subset filter is present but list is empty (filter disabled)",
opts: []LocatorOption{
WithDisableEndpointSubsetFilter(true),
},
metadata: makeMetadataWithSubset([]any{}),
expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
},
{
name: "Subset filter with no matches (filter disabled)",
opts: []LocatorOption{
WithDisableEndpointSubsetFilter(true),
},
metadata: makeMetadataWithSubset([]any{
"192.168.1.1:8080",
}),
expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
locator := NewDatastorePodLocator(mockDS, tc.opts...)
result := locator.Locate(context.Background(), tc.metadata)

var gotIPs []string
Expand Down
8 changes: 6 additions & 2 deletions pkg/epp/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type Options struct {
//
// Endpoints (in lieu of using an InferencePool for service discovery).
//
EndpointSelector string // Selector to filter model server pods on, only 'key=value' pairs are supported. (TODO: k8s.Selector, pflag.StringSlice?)
EndpointTargetPorts []int // Target ports of model server pods.
EndpointSelector string // Selector to filter model server pods on, only 'key=value' pairs are supported. (TODO: k8s.Selector, pflag.StringSlice?)
EndpointTargetPorts []int // Target ports of model server pods.
DisableEndpointSubsetFilter bool // Disables respecting x-gateway-destination-endpoint-subset in EPP.
//
// MSP metrics scraping.
//
Expand Down Expand Up @@ -100,6 +101,7 @@ func NewOptions() *Options {
GRPCPort: DefaultGrpcPort,
PoolGroup: "inference.networking.k8s.io",
EndpointTargetPorts: []int{},
DisableEndpointSubsetFilter: false,
ModelServerMetricsScheme: "http",
ModelServerMetricsPath: "/metrics",
ModelServerMetricsHTTPSInsecure: true,
Expand Down Expand Up @@ -141,6 +143,8 @@ func (opts *Options) AddFlags(fs *pflag.FlagSet) {
"Format: a comma-separated list of key=value pairs without whitespace (e.g., 'app=vllm-llama3-8b-instruct,env=prod').")
fs.IntSliceVar(&opts.EndpointTargetPorts, "endpoint-target-ports", opts.EndpointTargetPorts, "Target ports of model server pods. "+
"Format: a comma-separated list of numbers without whitespace (e.g., '3000,3001,3002').")
fs.BoolVar(&opts.DisableEndpointSubsetFilter, "disable-endpoint-subset-filter", opts.DisableEndpointSubsetFilter,
"Disables respecting the x-gateway-destination-endpoint-subset metadata for dispatching requests in EPP.")
fs.StringVar(&opts.ModelServerMetricsScheme, "model-server-metrics-scheme", opts.ModelServerMetricsScheme,
"Protocol scheme used in scraping metrics from endpoints.")
fs.StringVar(&opts.ModelServerMetricsPath, "model-server-metrics-path", opts.ModelServerMetricsPath,
Expand Down
Loading