Skip to content

Commit 3dbdb9b

Browse files
committed
refactor: rename PodLocator to EndpointCandidates
Rename PodLocator interface and related types to EndpointCandidates to better reflect that the component resolves endpoint candidates rather than pods specifically. Update all implementations, mocks, tests, and documentation accordingly.
1 parent 2a1fca3 commit 3dbdb9b

18 files changed

Lines changed: 169 additions & 154 deletions

File tree

cmd/epp/runner/runner.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -329,10 +329,10 @@ func (r *Runner) setup(ctx context.Context, cfg *rest.Config, opts *runserver.Op
329329

330330
// --- Admission Control Initialization ---
331331
var admissionController requestcontrol.AdmissionController
332-
var locator contracts.PodLocator
333-
locator = requestcontrol.NewDatastorePodLocator(ds, requestcontrol.WithDisableEndpointSubsetFilter(opts.DisableEndpointSubsetFilter))
332+
var endpointCandidates contracts.EndpointCandidates
333+
endpointCandidates = requestcontrol.NewDatastoreEndpointCandidates(ds, requestcontrol.WithDisableEndpointSubsetFilter(opts.DisableEndpointSubsetFilter))
334334
if r.featureGates[flowcontrol.FeatureGate] {
335-
locator = requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
335+
endpointCandidates = requestcontrol.NewCachedEndpointCandidates(ctx, endpointCandidates, time.Millisecond*50)
336336
setupLog.Info("Initializing experimental Flow Control layer")
337337
registry, err := fcregistry.NewFlowRegistry(eppConfig.FlowControlConfig.Registry, setupLog)
338338
if err != nil {
@@ -343,7 +343,7 @@ func (r *Runner) setup(ctx context.Context, cfg *rest.Config, opts *runserver.Op
343343
opts.PoolName,
344344
eppConfig.FlowControlConfig.Controller,
345345
registry, eppConfig.SaturationDetector,
346-
locator, eppConfig.FlowControlConfig.UsageLimitPolicy,
346+
endpointCandidates, eppConfig.FlowControlConfig.UsageLimitPolicy,
347347
)
348348
if err != nil {
349349
return nil, nil, fmt.Errorf("failed to initialize Flow Controller: %w", err)
@@ -352,12 +352,11 @@ func (r *Runner) setup(ctx context.Context, cfg *rest.Config, opts *runserver.Op
352352
admissionController = requestcontrol.NewFlowControlAdmissionController(fc, opts.PoolName)
353353
} else {
354354
setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
355-
admissionController = requestcontrol.NewLegacyAdmissionController(eppConfig.SaturationDetector, locator)
355+
admissionController = requestcontrol.NewLegacyAdmissionController(eppConfig.SaturationDetector, endpointCandidates)
356356
}
357357

358-
director := requestcontrol.NewDirectorWithConfig(ds, scheduler, admissionController, r.parser, locator, r.requestControlConfig)
358+
director := requestcontrol.NewDirectorWithConfig(ds, scheduler, admissionController, r.parser, endpointCandidates, r.requestControlConfig)
359359

360-
// --- Setup ExtProc Server Runner ---
361360
serverRunner := &runserver.ExtProcServerRunner{
362361
GrpcPort: opts.GRPCPort,
363362
GKNN: *gknn,

pkg/epp/flowcontrol/benchmark/benchmark.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func setupBenchmarkHarness(
276276
}
277277
}
278278

279-
fc, err := controller.NewFlowController(ctx, "benchmark", cfg, reg, detector, &mocks.MockPodLocator{}, usagelimits.DefaultPolicy())
279+
fc, err := controller.NewFlowController(ctx, "benchmark", cfg, reg, detector, &mocks.MockEndpointCandidates{}, usagelimits.DefaultPolicy())
280280
if err != nil {
281281
b.Fatalf("Failed to init FlowController: %v", err)
282282
}

pkg/epp/flowcontrol/contracts/dependencies.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ import (
2222
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
2323
)
2424

25-
// PodLocator defines the contract for a component that resolves the set of candidate pods for a request based on its
26-
// metadata (e.g., subsetting).
25+
// EndpointCandidates defines the contract for a component that resolves the set of candidate endpoints for a request
26+
// based on its metadata (e.g., subsetting).
2727
//
28-
// This interface allows the Flow Controller to fetch a fresh list of pods dynamically during the dispatch cycle,
29-
// enabling support for "Scale-from-Zero" scenarios where pods may not exist when the request is first enqueued.
30-
type PodLocator interface {
31-
// Locate returns a list of pod metrics that match the criteria defined in the request metadata.
28+
// This interface allows the Flow Controller to fetch a fresh list of candidates dynamically during the dispatch cycle,
29+
// enabling support for "Scale-from-Zero" scenarios where endpoints may not exist when the request is first enqueued.
30+
type EndpointCandidates interface {
31+
// Locate returns a list of endpoint candidate metrics that match the criteria defined in the request metadata.
3232
Locate(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint
3333
}

pkg/epp/flowcontrol/contracts/mocks/mocks.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,25 +108,38 @@ var _ contracts.RegistryShard = &MockRegistryShard{}
108108

109109
// --- Dependency Mocks ---
110110

111-
// MockPodLocator provides a mock implementation of the contracts.PodLocator interface.
112-
// It allows tests to control the exact set of pods returned for a given request.
113-
type MockPodLocator struct {
111+
// MockSaturationDetector is a simple "stub-style" mock for testing.
112+
type MockSaturationDetector struct {
113+
SaturationFunc func(ctx context.Context, candidatePods []fwkdl.Endpoint) float64
114+
}
115+
116+
func (m *MockSaturationDetector) Saturation(ctx context.Context, candidatePods []fwkdl.Endpoint) float64 {
117+
if m.SaturationFunc != nil {
118+
return m.SaturationFunc(ctx, candidatePods)
119+
}
120+
return 0.0
121+
}
122+
123+
// MockEndpointCandidates provides a mock implementation of the contracts.EndpointCandidates interface.
124+
// It allows tests to control the exact set of endpoint candidates returned for a given request.
125+
type MockEndpointCandidates struct {
114126
// LocateFunc allows injecting custom logic.
115127
LocateFunc func(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint
116-
// Pods is a static return value used if LocateFunc is nil.
117-
Pods []fwkdl.Endpoint
128+
// Candidates is a static return value used if LocateFunc is nil.
129+
Candidates []fwkdl.Endpoint
118130
}
119131

120-
func (m *MockPodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint {
132+
func (m *MockEndpointCandidates) Locate(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint {
121133
if m.LocateFunc != nil {
122134
return m.LocateFunc(ctx, requestMetadata)
123135
}
124136
// Return copy to be safe
125-
if m.Pods == nil {
137+
if m.Candidates == nil {
126138
return nil
127139
}
128-
result := make([]fwkdl.Endpoint, len(m.Pods))
129-
copy(result, m.Pods)
140+
141+
result := make([]fwkdl.Endpoint, len(m.Candidates))
142+
copy(result, m.Candidates)
130143
return result
131144
}
132145

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type shardProcessorFactory func(
6363
ctx context.Context,
6464
shard contracts.RegistryShard,
6565
saturationDetector flowcontrol.SaturationDetector,
66-
podLocator contracts.PodLocator,
66+
endpointCandidates contracts.EndpointCandidates,
6767
usageLimitPolicy flowcontrol.UsageLimitPolicy,
6868
clock clock.WithTicker,
6969
cleanupSweepInterval time.Duration,
@@ -100,7 +100,7 @@ type FlowController struct {
100100
config *Config
101101
registry registryClient
102102
saturationDetector flowcontrol.SaturationDetector
103-
podLocator contracts.PodLocator
103+
endpointCandidates contracts.EndpointCandidates
104104
usageLimitPolicy flowcontrol.UsageLimitPolicy
105105
clock clock.WithTicker
106106
logger logr.Logger
@@ -134,15 +134,15 @@ func NewFlowController(
134134
config *Config,
135135
registry contracts.FlowRegistry,
136136
sd flowcontrol.SaturationDetector,
137-
podLocator contracts.PodLocator,
137+
endpointCandidates contracts.EndpointCandidates,
138138
usageLimitPolicy flowcontrol.UsageLimitPolicy,
139139
opts ...flowControllerOption,
140140
) (*FlowController, error) {
141141
fc := &FlowController{
142142
config: config,
143143
registry: registry,
144144
saturationDetector: sd,
145-
podLocator: podLocator,
145+
endpointCandidates: endpointCandidates,
146146
usageLimitPolicy: usageLimitPolicy,
147147
clock: clock.RealClock{},
148148
logger: log.FromContext(ctx).WithName("flow-controller"),
@@ -153,7 +153,7 @@ func NewFlowController(
153153
ctx context.Context,
154154
shard contracts.RegistryShard,
155155
saturationDetector flowcontrol.SaturationDetector,
156-
podLocator contracts.PodLocator,
156+
endpointCandidates contracts.EndpointCandidates,
157157
usageLimitPolicy flowcontrol.UsageLimitPolicy,
158158
clock clock.WithTicker,
159159
cleanupSweepInterval time.Duration,
@@ -165,12 +165,13 @@ func NewFlowController(
165165
poolName,
166166
shard,
167167
saturationDetector,
168-
podLocator,
168+
endpointCandidates,
169169
usageLimitPolicy,
170170
clock,
171171
cleanupSweepInterval,
172172
enqueueChannelBufferSize,
173-
logger)
173+
logger,
174+
)
174175
}
175176

176177
for _, opt := range opts {
@@ -492,7 +493,7 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
492493
processorCtx,
493494
shard,
494495
fc.saturationDetector,
495-
fc.podLocator,
496+
fc.endpointCandidates,
496497
fc.usageLimitPolicy,
497498
fc.clock,
498499
fc.config.ExpiryCleanupInterval,

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func newUnitHarness(
126126
}
127127

128128
mockDetector := &mockSaturationDetector{}
129-
mockPodLocator := &mocks.MockPodLocator{}
129+
mockEndpointCandidates := &mocks.MockEndpointCandidates{}
130130

131131
mockProcessorFactory := &mockShardProcessorFactory{
132132
processors: make(map[string]*mockShardProcessor),
@@ -144,7 +144,7 @@ func newUnitHarness(
144144
withClock(harnessOpts.clock),
145145
withShardProcessorFactory(mockProcessorFactory.new),
146146
}
147-
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockPodLocator, usageLimitPolicy, fcOpts...)
147+
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockEndpointCandidates, usageLimitPolicy, fcOpts...)
148148
require.NoError(t, err, "failed to create FlowController for unit test harness")
149149

150150
h := &testHarness{
@@ -167,7 +167,7 @@ func newUnitHarness(
167167
func newIntegrationHarness(t *testing.T, ctx context.Context, cfg *Config, registry *mockRegistryClient) *testHarness {
168168
t.Helper()
169169
mockDetector := &mockSaturationDetector{}
170-
mockPodLocator := &mocks.MockPodLocator{}
170+
mockEndpointCandidates := &mocks.MockEndpointCandidates{}
171171
usageLimitPolicy := usagelimits.DefaultPolicy()
172172

173173
// Align FakeClock with system time. See explanation in newUnitHarness.
@@ -180,7 +180,7 @@ func newIntegrationHarness(t *testing.T, ctx context.Context, cfg *Config, regis
180180
withRegistryClient(registry),
181181
withClock(mockClock),
182182
}
183-
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockPodLocator, usageLimitPolicy, opts...)
183+
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockEndpointCandidates, opts...)
184184
require.NoError(t, err, "failed to create FlowController for integration test harness")
185185

186186
h := &testHarness{
@@ -290,7 +290,7 @@ func (f *mockShardProcessorFactory) new(
290290
_ context.Context, // The factory does not use the lifecycle context; it's passed to the processor's Run method later.
291291
shard contracts.RegistryShard,
292292
_ flowcontrol.SaturationDetector,
293-
_ contracts.PodLocator,
293+
_ contracts.EndpointCandidates,
294294
_ flowcontrol.UsageLimitPolicy,
295295
_ clock.WithTicker,
296296
_ time.Duration,
@@ -1155,7 +1155,7 @@ func TestFlowController_WorkerManagement(t *testing.T) {
11551155
ctx context.Context, // The context created by getOrStartWorker for the potential new processor.
11561156
shard contracts.RegistryShard,
11571157
_ flowcontrol.SaturationDetector,
1158-
_ contracts.PodLocator,
1158+
_ contracts.EndpointCandidates,
11591159
_ flowcontrol.UsageLimitPolicy,
11601160
_ clock.WithTicker,
11611161
_ time.Duration,

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type ShardProcessor struct {
6868
poolName string
6969
shard contracts.RegistryShard
7070
saturationDetector flowcontrol.SaturationDetector
71-
podLocator contracts.PodLocator
71+
endpointCandidates contracts.EndpointCandidates
7272
usageLimitPolicy flowcontrol.UsageLimitPolicy
7373
clock clock.WithTicker
7474
cleanupSweepInterval time.Duration
@@ -92,7 +92,7 @@ func NewShardProcessor(
9292
poolName string,
9393
shard contracts.RegistryShard,
9494
saturationDetector flowcontrol.SaturationDetector,
95-
podLocator contracts.PodLocator,
95+
endpointCandidates contracts.EndpointCandidates,
9696
usageLimitPolicy flowcontrol.UsageLimitPolicy,
9797
clock clock.WithTicker,
9898
cleanupSweepInterval time.Duration,
@@ -103,7 +103,7 @@ func NewShardProcessor(
103103
shard: shard,
104104
poolName: poolName,
105105
saturationDetector: saturationDetector,
106-
podLocator: podLocator,
106+
endpointCandidates: endpointCandidates,
107107
usageLimitPolicy: usageLimitPolicy,
108108
clock: clock,
109109
cleanupSweepInterval: cleanupSweepInterval,
@@ -317,7 +317,7 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
317317
metrics.RecordFlowControlDispatchCycleDuration(time.Since(dispatchCycleStart))
318318
}()
319319

320-
pool := sp.podLocator.Locate(ctx, nil)
320+
pool := sp.endpointCandidates.Locate(ctx, nil)
321321
saturation := sp.saturationDetector.Saturation(ctx, pool)
322322

323323
// Record pool saturation metric

pkg/epp/flowcontrol/controller/internal/processor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type testHarness struct {
8888
clock *testclock.FakeClock
8989
logger logr.Logger
9090
saturationDetector *mockSaturationDetector
91-
podLocator *mocks.MockPodLocator
91+
endpointCandidates *mocks.MockEndpointCandidates
9292

9393
// --- Centralized Mock State ---
9494
// The harness's mutex protects the single source of truth for all mock state.
@@ -109,7 +109,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
109109
clock: testclock.NewFakeClock(time.Now()),
110110
logger: logr.Discard(),
111111
saturationDetector: &mockSaturationDetector{},
112-
podLocator: &mocks.MockPodLocator{Pods: []fwkdl.Endpoint{&metrics.FakePodMetrics{}}},
112+
endpointCandidates: &mocks.MockEndpointCandidates{Candidates: []fwkdl.Endpoint{&metrics.FakePodMetrics{}}},
113113
startSignal: make(chan struct{}),
114114
queues: make(map[flowcontrol.FlowKey]*mocks.MockManagedQueue),
115115
priorityFlows: make(map[int][]flowcontrol.FlowKey),
@@ -137,7 +137,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
137137
"test-pool",
138138
h,
139139
h.saturationDetector,
140-
h.podLocator,
140+
h.endpointCandidates,
141141
usagelimits.DefaultPolicy(),
142142
h.clock,
143143
expiryCleanupInterval,

pkg/epp/framework/interface/flowcontrol/request.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type FlowControlRequest interface {
5757
ID() string
5858

5959
// GetMetadata returns the opaque metadata associated with the request (e.g., header-derived context, subset filters).
60-
// This data is passed transparently to components like the contracts.PodLocator to resolve resources (candidate pods)
60+
// This data is passed transparently to components like contracts.EndpointCandidates to resolve resources (endpoint candidates)
6161
// lazily during the dispatch cycle.
6262
GetMetadata() map[string]any
6363

pkg/epp/framework/plugins/requesthandling/parsers/openai/openai.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"encoding/json"
2222
"errors"
23+
"fmt"
2324
"strings"
2425

2526
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
@@ -90,7 +91,7 @@ func (p *OpenAIParser) WithName(name string) *OpenAIParser {
9091
func (p *OpenAIParser) ParseRequest(ctx context.Context, body []byte, headers map[string]string) (*scheduling.LLMRequestBody, error) {
9192
bodyMap := make(map[string]any)
9293
if err := json.Unmarshal(body, &bodyMap); err != nil {
93-
return nil, errors.New("error unmarshaling request bodyMap")
94+
return nil, fmt.Errorf("error unmarshaling request bodyMap: %w", err)
9495
}
9596
extractedBody, err := extractRequestBody(body, headers)
9697
if err != nil {

0 commit comments

Comments
 (0)