Skip to content

Commit 037beb7

Browse files
Mohamedma96cyclinder
authored andcommitted
[Refactor] use the framework types directly instead of the aliased name (#2603)
* use fwkdl.Endpoint instead of metrics.PodMetrics outside of backend/metrics Signed-off-by: mohamedmahameed <mohamed.mahameed@ibm.com> * Remove PodMetrics type alias, renames and code enhancements Signed-off-by: mohamedmahameed <mohamed.mahameed@ibm.com> * fix detecetor_test.go to use new functions names Signed-off-by: mohamedmahameed <mohamed.mahameed@ibm.com> --------- Signed-off-by: mohamedmahameed <mohamed.mahameed@ibm.com> Signed-off-by: Cyclinder Kuo <qifeng.guo@daocloud.io>
1 parent c86059d commit 037beb7

14 files changed

Lines changed: 144 additions & 142 deletions

File tree

cmd/epp/runner/runner.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,10 @@ func (r *Runner) setup(ctx context.Context, cfg *rest.Config, opts *runserver.Op
331331

332332
// --- Admission Control Initialization ---
333333
var admissionController requestcontrol.AdmissionController
334-
var locator contracts.PodLocator
335-
locator = requestcontrol.NewDatastorePodLocator(ds, requestcontrol.WithDisableEndpointSubsetFilter(opts.DisableEndpointSubsetFilter))
334+
var endpointCandidates contracts.EndpointCandidates
335+
endpointCandidates = requestcontrol.NewDatastoreEndpointCandidates(ds, requestcontrol.WithDisableEndpointSubsetFilter(opts.DisableEndpointSubsetFilter))
336336
if r.featureGates[flowcontrol.FeatureGate] {
337-
locator = requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
337+
endpointCandidates = requestcontrol.NewCachedEndpointCandidates(ctx, endpointCandidates, time.Millisecond*50)
338338
setupLog.Info("Initializing experimental Flow Control layer")
339339
registry, err := fcregistry.NewFlowRegistry(eppConfig.FlowControlConfig.Registry, setupLog)
340340
if err != nil {
@@ -345,7 +345,7 @@ func (r *Runner) setup(ctx context.Context, cfg *rest.Config, opts *runserver.Op
345345
opts.PoolName,
346346
eppConfig.FlowControlConfig.Controller,
347347
registry, saturationDetector,
348-
locator,
348+
endpointCandidates,
349349
)
350350
if err != nil {
351351
return nil, nil, fmt.Errorf("failed to initialize Flow Controller: %w", err)
@@ -354,10 +354,10 @@ func (r *Runner) setup(ctx context.Context, cfg *rest.Config, opts *runserver.Op
354354
admissionController = requestcontrol.NewFlowControlAdmissionController(fc, opts.PoolName)
355355
} else {
356356
setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
357-
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector, locator)
357+
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector, endpointCandidates)
358358
}
359359

360-
director := requestcontrol.NewDirectorWithConfig(ds, scheduler, admissionController, r.parser, locator, r.requestControlConfig)
360+
director := requestcontrol.NewDirectorWithConfig(ds, scheduler, admissionController, r.parser, endpointCandidates, r.requestControlConfig)
361361

362362
// --- Setup ExtProc Server Runner ---
363363
serverRunner := &runserver.ExtProcServerRunner{

pkg/epp/flowcontrol/contracts/dependencies.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ import (
2222
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
2323
)
2424

25-
// PodLocator defines the contract for a component that resolves the set of candidate pods for a request based on its
26-
// metadata (e.g., subsetting).
25+
// EndpointCandidates defines the contract for a component that resolves the set of candidate endpoints for a request
26+
// based on its metadata (e.g., subsetting).
2727
//
28-
// This interface allows the Flow Controller to fetch a fresh list of pods dynamically during the dispatch cycle,
29-
// enabling support for "Scale-from-Zero" scenarios where pods may not exist when the request is first enqueued.
30-
type PodLocator interface {
31-
// Locate returns a list of pod metrics that match the criteria defined in the request metadata.
28+
// This interface allows the Flow Controller to fetch a fresh list of candidates dynamically during the dispatch cycle,
29+
// enabling support for "Scale-from-Zero" scenarios where endpoints may not exist when the request is first enqueued.
30+
type EndpointCandidates interface {
31+
// Locate returns a list of endpoint candidate metrics that match the criteria defined in the request metadata.
3232
Locate(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint
3333
}
3434

pkg/epp/flowcontrol/contracts/mocks/mocks.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,25 +120,26 @@ func (m *MockSaturationDetector) Saturation(ctx context.Context, candidatePods [
120120
return 0.0
121121
}
122122

123-
// MockPodLocator provides a mock implementation of the contracts.PodLocator interface.
124-
// It allows tests to control the exact set of pods returned for a given request.
125-
type MockPodLocator struct {
123+
// MockEndpointCandidates provides a mock implementation of the contracts.EndpointCandidates interface.
124+
// It allows tests to control the exact set of endpoint candidates returned for a given request.
125+
type MockEndpointCandidates struct {
126126
// LocateFunc allows injecting custom logic.
127127
LocateFunc func(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint
128-
// Pods is a static return value used if LocateFunc is nil.
129-
Pods []fwkdl.Endpoint
128+
// Candidates is a static return value used if LocateFunc is nil.
129+
Candidates []fwkdl.Endpoint
130130
}
131131

132-
func (m *MockPodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint {
132+
func (m *MockEndpointCandidates) Locate(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint {
133133
if m.LocateFunc != nil {
134134
return m.LocateFunc(ctx, requestMetadata)
135135
}
136136
// Return copy to be safe
137-
if m.Pods == nil {
137+
if m.Candidates == nil {
138138
return nil
139139
}
140-
result := make([]fwkdl.Endpoint, len(m.Pods))
141-
copy(result, m.Pods)
140+
141+
result := make([]fwkdl.Endpoint, len(m.Candidates))
142+
copy(result, m.Candidates)
142143
return result
143144
}
144145

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type shardProcessorFactory func(
6363
ctx context.Context,
6464
shard contracts.RegistryShard,
6565
saturationDetector contracts.SaturationDetector,
66-
podLocator contracts.PodLocator,
66+
endpointCandidates contracts.EndpointCandidates,
6767
clock clock.WithTicker,
6868
cleanupSweepInterval time.Duration,
6969
enqueueChannelBufferSize int,
@@ -99,7 +99,7 @@ type FlowController struct {
9999
config *Config
100100
registry registryClient
101101
saturationDetector contracts.SaturationDetector
102-
podLocator contracts.PodLocator
102+
endpointCandidates contracts.EndpointCandidates
103103
clock clock.WithTicker
104104
logger logr.Logger
105105
shardProcessorFactory shardProcessorFactory
@@ -132,14 +132,14 @@ func NewFlowController(
132132
config *Config,
133133
registry contracts.FlowRegistry,
134134
sd contracts.SaturationDetector,
135-
podLocator contracts.PodLocator,
135+
endpointCandidates contracts.EndpointCandidates,
136136
opts ...flowControllerOption,
137137
) (*FlowController, error) {
138138
fc := &FlowController{
139139
config: config,
140140
registry: registry,
141141
saturationDetector: sd,
142-
podLocator: podLocator,
142+
endpointCandidates: endpointCandidates,
143143
clock: clock.RealClock{},
144144
logger: log.FromContext(ctx).WithName("flow-controller"),
145145
parentCtx: ctx,
@@ -149,7 +149,7 @@ func NewFlowController(
149149
ctx context.Context,
150150
shard contracts.RegistryShard,
151151
saturationDetector contracts.SaturationDetector,
152-
podLocator contracts.PodLocator,
152+
endpointCandidates contracts.EndpointCandidates,
153153
clock clock.WithTicker,
154154
cleanupSweepInterval time.Duration,
155155
enqueueChannelBufferSize int,
@@ -160,7 +160,7 @@ func NewFlowController(
160160
poolName,
161161
shard,
162162
saturationDetector,
163-
podLocator,
163+
endpointCandidates,
164164
clock,
165165
cleanupSweepInterval,
166166
enqueueChannelBufferSize,
@@ -486,7 +486,7 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
486486
processorCtx,
487487
shard,
488488
fc.saturationDetector,
489-
fc.podLocator,
489+
fc.endpointCandidates,
490490
fc.clock,
491491
fc.config.ExpiryCleanupInterval,
492492
fc.config.EnqueueChannelBufferSize,

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func newUnitHarness(
116116
}
117117

118118
mockDetector := &mocks.MockSaturationDetector{}
119-
mockPodLocator := &mocks.MockPodLocator{}
119+
mockEndpointCandidates := &mocks.MockEndpointCandidates{}
120120

121121
mockProcessorFactory := &mockShardProcessorFactory{
122122
processors: make(map[string]*mockShardProcessor),
@@ -132,7 +132,7 @@ func newUnitHarness(
132132
withClock(harnessOpts.clock),
133133
withShardProcessorFactory(mockProcessorFactory.new),
134134
}
135-
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockPodLocator, fcOpts...)
135+
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockEndpointCandidates, fcOpts...)
136136
require.NoError(t, err, "failed to create FlowController for unit test harness")
137137

138138
h := &testHarness{
@@ -155,7 +155,7 @@ func newUnitHarness(
155155
func newIntegrationHarness(t *testing.T, ctx context.Context, cfg *Config, registry *mockRegistryClient) *testHarness {
156156
t.Helper()
157157
mockDetector := &mocks.MockSaturationDetector{}
158-
mockPodLocator := &mocks.MockPodLocator{}
158+
mockEndpointCandidates := &mocks.MockEndpointCandidates{}
159159

160160
// Align FakeClock with system time. See explanation in newUnitHarness.
161161
mockClock := testclock.NewFakeClock(time.Now())
@@ -167,7 +167,7 @@ func newIntegrationHarness(t *testing.T, ctx context.Context, cfg *Config, regis
167167
withRegistryClient(registry),
168168
withClock(mockClock),
169169
}
170-
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockPodLocator, opts...)
170+
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockEndpointCandidates, opts...)
171171
require.NoError(t, err, "failed to create FlowController for integration test harness")
172172

173173
h := &testHarness{
@@ -277,7 +277,7 @@ func (f *mockShardProcessorFactory) new(
277277
_ context.Context, // The factory does not use the lifecycle context; it's passed to the processor's Run method later.
278278
shard contracts.RegistryShard,
279279
_ contracts.SaturationDetector,
280-
_ contracts.PodLocator,
280+
_ contracts.EndpointCandidates,
281281
_ clock.WithTicker,
282282
_ time.Duration,
283283
_ int,
@@ -1141,7 +1141,7 @@ func TestFlowController_WorkerManagement(t *testing.T) {
11411141
ctx context.Context, // The context created by getOrStartWorker for the potential new processor.
11421142
shard contracts.RegistryShard,
11431143
_ contracts.SaturationDetector,
1144-
_ contracts.PodLocator,
1144+
_ contracts.EndpointCandidates,
11451145
_ clock.WithTicker,
11461146
_ time.Duration,
11471147
_ int,

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type ShardProcessor struct {
6868
poolName string
6969
shard contracts.RegistryShard
7070
saturationDetector contracts.SaturationDetector
71-
podLocator contracts.PodLocator
71+
endpointCandidates contracts.EndpointCandidates
7272
clock clock.WithTicker
7373
cleanupSweepInterval time.Duration
7474
logger logr.Logger
@@ -91,7 +91,7 @@ func NewShardProcessor(
9191
poolName string,
9292
shard contracts.RegistryShard,
9393
saturationDetector contracts.SaturationDetector,
94-
podLocator contracts.PodLocator,
94+
endpointCandidates contracts.EndpointCandidates,
9595
clock clock.WithTicker,
9696
cleanupSweepInterval time.Duration,
9797
enqueueChannelBufferSize int,
@@ -101,7 +101,7 @@ func NewShardProcessor(
101101
shard: shard,
102102
poolName: poolName,
103103
saturationDetector: saturationDetector,
104-
podLocator: podLocator,
104+
endpointCandidates: endpointCandidates,
105105
clock: clock,
106106
cleanupSweepInterval: cleanupSweepInterval,
107107
logger: logger,
@@ -314,7 +314,7 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
314314
metrics.RecordFlowControlDispatchCycleDuration(time.Since(dispatchCycleStart))
315315
}()
316316

317-
pool := sp.podLocator.Locate(ctx, nil)
317+
pool := sp.endpointCandidates.Locate(ctx, nil)
318318
saturation := sp.saturationDetector.Saturation(ctx, pool)
319319

320320
// Record pool saturation metric

pkg/epp/flowcontrol/controller/internal/processor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type testHarness struct {
7575
clock *testclock.FakeClock
7676
logger logr.Logger
7777
saturationDetector *mocks.MockSaturationDetector
78-
podLocator *mocks.MockPodLocator
78+
endpointCandidates *mocks.MockEndpointCandidates
7979

8080
// --- Centralized Mock State ---
8181
// The harness's mutex protects the single source of truth for all mock state.
@@ -96,7 +96,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
9696
clock: testclock.NewFakeClock(time.Now()),
9797
logger: logr.Discard(),
9898
saturationDetector: &mocks.MockSaturationDetector{},
99-
podLocator: &mocks.MockPodLocator{Pods: []fwkdl.Endpoint{&metrics.FakePodMetrics{}}},
99+
endpointCandidates: &mocks.MockEndpointCandidates{Candidates: []fwkdl.Endpoint{&metrics.FakePodMetrics{}}},
100100
startSignal: make(chan struct{}),
101101
queues: make(map[flowcontrol.FlowKey]*mocks.MockManagedQueue),
102102
priorityFlows: make(map[int][]flowcontrol.FlowKey),
@@ -124,7 +124,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
124124
"test-pool",
125125
h,
126126
h.saturationDetector,
127-
h.podLocator,
127+
h.endpointCandidates,
128128
h.clock,
129129
expiryCleanupInterval,
130130
100,

pkg/epp/framework/interface/flowcontrol/request.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type FlowControlRequest interface {
5757
ID() string
5858

5959
// GetMetadata returns the opaque metadata associated with the request (e.g., header-derived context, subset filters).
60-
// This data is passed transparently to components like the contracts.PodLocator to resolve resources (candidate pods)
60+
// This data is passed transparently to components like contracts.EndpointCandidates to resolve resources (endpoint candidates)
6161
// lazily during the dispatch cycle.
6262
GetMetadata() map[string]any
6363

pkg/epp/requestcontrol/admission.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ type flowController interface {
6565
func rejectIfSheddableAndSaturated(
6666
ctx context.Context,
6767
sd contracts.SaturationDetector,
68-
locator contracts.PodLocator,
68+
endpointCandidates contracts.EndpointCandidates,
6969
reqCtx *handlers.RequestContext,
7070
priority int,
7171
logger logr.Logger,
7272
) error {
7373
if requtil.IsSheddable(priority) {
74-
if sd.Saturation(ctx, locator.Locate(ctx, reqCtx.Request.Metadata)) >= 1.0 {
74+
if sd.Saturation(ctx, endpointCandidates.Locate(ctx, reqCtx.Request.Metadata)) >= 1.0 {
7575
logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable",
7676
"requestID", reqCtx.SchedulingRequest.RequestId)
7777
return errcommon.Error{
@@ -90,17 +90,17 @@ func rejectIfSheddableAndSaturated(
9090
// saturated. Non-sheddable requests always bypass the saturation check.
9191
type LegacyAdmissionController struct {
9292
saturationDetector contracts.SaturationDetector
93-
podLocator contracts.PodLocator
93+
endpointCandidates contracts.EndpointCandidates
9494
}
9595

9696
// NewLegacyAdmissionController creates a new LegacyAdmissionController.
9797
func NewLegacyAdmissionController(
9898
sd contracts.SaturationDetector,
99-
pl contracts.PodLocator,
99+
endpointCandidates contracts.EndpointCandidates,
100100
) *LegacyAdmissionController {
101101
return &LegacyAdmissionController{
102102
saturationDetector: sd,
103-
podLocator: pl,
103+
endpointCandidates: endpointCandidates,
104104
}
105105
}
106106

@@ -117,7 +117,7 @@ func (lac *LegacyAdmissionController) Admit(
117117
if err := rejectIfSheddableAndSaturated(
118118
ctx,
119119
lac.saturationDetector,
120-
lac.podLocator,
120+
lac.endpointCandidates,
121121
reqCtx, priority,
122122
logger,
123123
); err != nil {

pkg/epp/requestcontrol/admission_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ func TestLegacyAdmissionController_Admit(t *testing.T) {
119119
return 0.0
120120
},
121121
}
122-
locator := &mocks.MockPodLocator{Pods: tc.locatorPods}
123-
ac := NewLegacyAdmissionController(mockDetector, locator)
122+
endpointCandidates := &mocks.MockEndpointCandidates{Candidates: tc.locatorPods}
123+
ac := NewLegacyAdmissionController(mockDetector, endpointCandidates)
124124

125125
err := ac.Admit(ctx, reqCtx, tc.priority)
126126

0 commit comments

Comments
 (0)