Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,8 @@ func must[T any](t T, err error) T {

// TODO: This is hardcoded for POC only. This needs to be hooked up to our text-based config story.
var flowControlConfig = flowcontrol.Config{
Controller: fccontroller.Config{}, // Use all defaults.
Registry: must(fcregistry.NewConfig(
fcregistry.WithPriorityBand(
must(fcregistry.NewPriorityBandConfig(0, "Default")),
),
)),
Controller: fccontroller.Config{}, // Use all defaults.
Registry: must(fcregistry.NewConfig()), // Use all defaults.
}

var (
Expand Down
50 changes: 42 additions & 8 deletions pkg/epp/flowcontrol/registry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,20 @@ type Config struct {

// PriorityBands defines the set of priority band templates managed by the `FlowRegistry`.
// It is a map keyed by Priority level, providing O(1) access and ensuring priority uniqueness by definition.
// Required: At least one `PriorityBandConfig` must be provided for a functional registry.
PriorityBands map[int]*PriorityBandConfig

// DefaultPriorityBand serves as a template for dynamically provisioning priority bands when a request arrives with a
// priority level that was not explicitly configured.
// If nil, it is automatically populated with system defaults during NewConfig.
DefaultPriorityBand *PriorityBandConfig

// InitialShardCount specifies the number of parallel shards to create when the registry is initialized.
// This value must be greater than zero.
// Optional: Defaults to `defaultInitialShardCount` (1).
InitialShardCount int

// FlowGCTimeout defines the interval at which the registry scans for and garbage collects idle flows. A flow is
// collected if it has been observed to be Idle for at least one full scan interval.
// FlowGCTimeout defines the interval at which the registry scans for and garbage collects idle flows.
// A flow is collected if it has been observed to be Idle for at least one full scan interval.
// Optional: Defaults to `defaultFlowGCTimeout` (5 minutes).
FlowGCTimeout time.Duration

Expand Down Expand Up @@ -223,6 +227,14 @@ func WithPriorityBand(band *PriorityBandConfig) ConfigOption {
}
}

// WithDefaultPriorityBand sets the template configuration used for dynamically provisioning priority bands.
func WithDefaultPriorityBand(band *PriorityBandConfig) ConfigOption {
return func(b *configBuilder) error {
b.config.DefaultPriorityBand = band
return nil
}
}

// withCapabilityChecker overrides the compatibility checker used during validation.
// It is intended for use only in internal unit tests.
// test-only
Expand Down Expand Up @@ -304,8 +316,19 @@ func NewConfig(opts ...ConfigOption) (*Config, error) {
}
}

// Apply defaults to all bands.
// This covers the case where a user passed a raw struct literal via WithPriorityBand.
// Initialize DefaultPriorityBand if missing.
// This ensures we always have a template for dynamic provisioning.
if builder.config.DefaultPriorityBand == nil {
builder.config.DefaultPriorityBand = &PriorityBandConfig{}
}

// Apply defaults to the template.
builder.config.DefaultPriorityBand.applyDefaults()
if builder.config.DefaultPriorityBand.PriorityName == "" {
builder.config.DefaultPriorityBand.PriorityName = "Dynamic-Default"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be the infObj object name.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be much better for observability. The JIT path in this PR makes this tricky. If we manage to move to the controller path, this becomes much easier to populate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question though... How do we handle N InferenceObjectives mapped to the same priority level? Right now, priority bands are unique per int. It would not be simple (and perhaps not desirable) to break that.

I guess we could make the name a composite of the objectives mapped to it `"InfObjA,InfObjB,..."? The name itself then becomes dynamic data and is no longer immutable (though I don't foresee any issues with this).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point on multiple infObjs mapping to the same priority. I think the name has no value and it is rather confusing since naturally one will expect the infObj name to be logged. So I recommend removing it.

}

// Apply defaults to all explicitly configured bands.
for _, band := range builder.config.PriorityBands {
band.applyDefaults()
}
Expand All @@ -317,7 +340,7 @@ func NewConfig(opts ...ConfigOption) (*Config, error) {
}

// NewPriorityBandConfig creates a new band configuration with the required fields.
// It applies system defaults first, then applies any provided options to override those defaults
// It applies system defaults first, then applies any provided options to override those defaults.
func NewPriorityBandConfig(priority int, name string, opts ...PriorityBandConfigOption) (*PriorityBandConfig, error) {
pb := &PriorityBandConfig{
Priority: priority,
Expand Down Expand Up @@ -387,10 +410,15 @@ func (c *Config) validate(checker capabilityChecker) error {
return errors.New("eventChannelBufferSize must be greater than 0")
}

if len(c.PriorityBands) == 0 {
return errors.New("at least one priority band must be defined")
// Validate the dynamic template.
// We use a dummy priority since the template itself doesn't have a fixed priority.
templateValidationCopy := *c.DefaultPriorityBand
templateValidationCopy.Priority = 0
if err := templateValidationCopy.validate(checker); err != nil {
return fmt.Errorf("invalid DefaultPriorityBand configuration: %w", err)
}

// Validate statically configured bands.
names := make(map[string]struct{}, len(c.PriorityBands))
for _, band := range c.PriorityBands {
if _, exists := names[band.PriorityName]; exists {
Expand Down Expand Up @@ -462,6 +490,12 @@ func (c *Config) Clone() *Config {
}

clone := *c

if c.DefaultPriorityBand != nil {
val := *c.DefaultPriorityBand
clone.DefaultPriorityBand = &val
}

if c.PriorityBands != nil {
clone.PriorityBands = make(map[int]*PriorityBandConfig, len(c.PriorityBands))
for prio, band := range c.PriorityBands {
Expand Down
55 changes: 49 additions & 6 deletions pkg/epp/flowcontrol/registry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,40 @@ func TestNewConfig(t *testing.T) {
assert.Equal(t, defaultIntraFlowDispatchPolicy, band.IntraFlowDispatchPolicy)
},
},

// --- Validation Errors (Global) ---
{
name: "ShouldError_WhenNoPriorityBandsDefined",
opts: []ConfigOption{WithInitialShardCount(1)},
expectErr: true,
expectedErrIs: nil, // Generic error expected.
name: "ShouldSucceed_WhenNoPriorityBandsDefined_WithDynamicDefaults",
opts: []ConfigOption{
// No WithPriorityBand options provided.
// This relies entirely on dynamic provisioning.
},
assertion: func(t *testing.T, cfg *Config) {
assert.Empty(t, cfg.PriorityBands, "PriorityBands map should be empty")
require.NotNil(t, cfg.DefaultPriorityBand, "DefaultPriorityBand template must be initialized")
assert.Equal(t, "Dynamic-Default", cfg.DefaultPriorityBand.PriorityName)
assert.Equal(t, defaultQueue, cfg.DefaultPriorityBand.Queue)
},
},
{
name: "ShouldRespectCustomDefaultPriorityBand",
opts: []ConfigOption{
WithDefaultPriorityBand(&PriorityBandConfig{
PriorityName: "My-Custom-Template",
Queue: "CustomQueue",
}),
withCapabilityChecker(&mockCapabilityChecker{
checkCompatibilityFunc: func(_ intra.RegisteredPolicyName, _ queue.RegisteredQueueName) error { return nil },
}),
},
assertion: func(t *testing.T, cfg *Config) {
require.NotNil(t, cfg.DefaultPriorityBand)
assert.Equal(t, "My-Custom-Template", cfg.DefaultPriorityBand.PriorityName)
assert.Equal(t, queue.RegisteredQueueName("CustomQueue"), cfg.DefaultPriorityBand.Queue)
// Assert other defaults were applied to the template.
assert.Equal(t, defaultIntraFlowDispatchPolicy, cfg.DefaultPriorityBand.IntraFlowDispatchPolicy)
},
},

// --- Validation Errors (Global) ---
{
name: "ShouldError_WhenInitialShardCountIsInvalid",
opts: []ConfigOption{WithInitialShardCount(0)}, // Option itself should return error.
Expand Down Expand Up @@ -334,4 +360,21 @@ func TestConfig_Clone(t *testing.T) {
assert.Equal(t, uint64(99999), clone.PriorityBands[1].MaxBytes)
assert.Equal(t, "Modified", clone.PriorityBands[1].PriorityName)
})

t.Run("ShouldDeepCopyDefaultPriorityBand", func(t *testing.T) {
t.Parallel()
original, err := NewConfig()
require.NoError(t, err)

clone := original.Clone()

require.NotSame(t, original.DefaultPriorityBand, clone.DefaultPriorityBand,
"Clone should have a distinct pointer for DefaultPriorityBand")
assert.Equal(t, original.DefaultPriorityBand.PriorityName, clone.DefaultPriorityBand.PriorityName)

// Modify Clone.
clone.DefaultPriorityBand.PriorityName = "Hacked"
assert.Equal(t, "Dynamic-Default", original.DefaultPriorityBand.PriorityName,
"Modifying clone template should not affect original")
})
}
89 changes: 65 additions & 24 deletions pkg/epp/flowcontrol/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,14 @@ type FlowRegistry struct {
flowStates sync.Map // stores `types.FlowKey` -> *flowState

// Globally aggregated statistics, updated atomically via lock-free propagation.
totalByteSize atomic.Int64
totalLen atomic.Int64
perPriorityBandStats map[int]*bandStats // Keyed by priority.
totalByteSize atomic.Int64
totalLen atomic.Int64

// perPriorityBandStats tracks aggregated stats per priority.
// Key: int (priority), Value: *bandStats
// We use sync.Map here to allow for lock-free reads on the hot path (Stats) while allowing dynamic provisioning to
// add new keys safely.
perPriorityBandStats sync.Map

// --- Administrative state (protected by `mu`) ---

Expand Down Expand Up @@ -152,11 +157,10 @@ func withClock(clk clock.WithTickerAndDelayedExecution) RegistryOption {
func NewFlowRegistry(config *Config, logger logr.Logger, opts ...RegistryOption) (*FlowRegistry, error) {
cfg := config.Clone()
fr := &FlowRegistry{
config: cfg,
logger: logger.WithName("flow-registry"),
activeShards: []*registryShard{},
drainingShards: make(map[string]*registryShard),
perPriorityBandStats: make(map[int]*bandStats, len(cfg.PriorityBands)),
config: cfg,
logger: logger.WithName("flow-registry"),
activeShards: []*registryShard{},
drainingShards: make(map[string]*registryShard),
}

for _, opt := range opts {
Expand All @@ -167,7 +171,7 @@ func NewFlowRegistry(config *Config, logger logr.Logger, opts ...RegistryOption)
}

for prio := range cfg.PriorityBands {
fr.perPriorityBandStats[prio] = &bandStats{}
fr.perPriorityBandStats.Store(prio, &bandStats{})
}

if err := fr.updateShardCount(cfg.InitialShardCount); err != nil {
Expand Down Expand Up @@ -252,9 +256,18 @@ func (fr *FlowRegistry) WithConnection(key types.FlowKey, fn func(conn contracts

// prepareNewFlow creates a new `flowState` and synchronizes its queues and policies onto all existing shards.
func (fr *FlowRegistry) prepareNewFlow(key types.FlowKey) (*flowState, error) {
// Get a stable snapshot of the shard topology.
// An RLock is sufficient because while the list of shards must be stable, the internal state of each shard is
// protected by its own lock.
fr.mu.RLock()
_, exists := fr.config.PriorityBands[key.Priority]
fr.mu.RUnlock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fine grained management of locking is error prone, do we really need it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current JIT implementation, yes. In the controller implementation you suggested, probably not depending on how we handle the race condition between reconciliation and request serving.


// If the band was missing, we must acquire the Write Lock to create it.
if !exists {
if err := fr.ensurePriorityBand(key.Priority); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting that priority bands are created/deleted as a trigger of InfObj reconcile events. When a new infObj is created we create a band, when deleted we delete the band. doing it in the request processing path is an anti-pattern.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Driving this from the InferenceObjective controller is cleaner; however, it introduces specific synchronization and lifecycle challenges that the JIT approach handled implicitly. We need to decide how to handle these scenarios:

Consistency:

  1. Operator applies InferenceObjective { Priority: 10 }.
  2. Client immediately sends curl matching that objective.
  3. The request hits the EPP before the Reconciler loop has updated the FlowRegistry.
  4. The FlowRegistry sees "Priority 10", checks its map, sees nothing.

With the current JIT behavior: it works. With the InferenceObjective reconciliation path it fails (503 Service Unavailable or similar) or MUST fall back to a default priority band (e.g., "Priority 0"). I am not yet sure just how narrow we can make this race condition. Perhaps, we can make these reconciliation steps fully atomic, but I am not so certain that this will be simple or feasible.

No InferenceObjective:

An InferenceObjective is not required. Does "Priority 0" exist if no InferenceObjective defines it?
Traffic without an objective defaults to "Priority 0," so we need to always provision a "Default/0" band on startup, regardless of CRDs, to catch fall-through traffic. This places us right back at a hybrid of static + controller-managed config.

Lifecycle Management:

If we listen to InferenceObjective events, we must also handle Delete. When deleted, we must drain and remove the associated priority band. What if flows are currently active (have buffered requests) in that band? We need a graceful drain mechanism before destroying the sync.Map entry.

This isn't strictly an issue with the CRD reconciliation approach; my current PR only adds bands (which is likely okay for now). The state management issue just becomes more apparent under the CRD model though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consistency:

This scenario can also happen when we look up the InfObj, it may not have been created by the time the request got to the epp, so it is not a new problem.

Perhaps, we can make these reconciliation steps fully atomic, but I am not so certain that this will be simple or feasible.

We only need to be eventually consistent, but when creating a new infObj should trigger a state update in the epp that includes the availability of the infObj itself as well as all associated state like the Band, this state update in the epp can happen atomically.

An InferenceObjective is not required. Does "Priority 0" exist if no InferenceObjective defines it?
Traffic without an objective defaults to "Priority 0," so we need to always provision a "Default/0" band on startup, regardless of CRDs, to catch fall-through traffic. This places us right back at a hybrid of static + controller-managed config.

The default priority is a special case for which we create a band for it on startup.

All that being said, I am ok proceeding with the current approach you have, but please open an issue to track this. I am sure we will revisit it when we introduce the flow CRD

return nil, err
}
}

// Now we know the band exists (or we errored). Re-acquire Read Lock to safely read the topology and build components.
fr.mu.RLock()
defer fr.mu.RUnlock()

Expand All @@ -272,6 +285,34 @@ func (fr *FlowRegistry) prepareNewFlow(key types.FlowKey) (*flowState, error) {
return &flowState{key: key}, nil
}

// ensurePriorityBand safely provisions a new priority band.
func (fr *FlowRegistry) ensurePriorityBand(priority int) error {
fr.mu.Lock()
defer fr.mu.Unlock()

// Double-Check: Someone might have created it while we swapped locks in prepareNewFlow.
if _, ok := fr.config.PriorityBands[priority]; ok {
return nil
}

fr.logger.Info("Dynamically provisioning new priority band", "priority", priority)

newBand := *fr.config.DefaultPriorityBand
newBand.Priority = priority
newBand.PriorityName = fmt.Sprintf("Dynamic-%d", priority)
fr.config.PriorityBands[priority] = &newBand

fr.perPriorityBandStats.LoadOrStore(priority, &bandStats{})

fr.repartitionShardConfigsLocked()

for _, shard := range fr.activeShards {
shard.addPriorityBand(priority)
}

return nil
}

// --- `contracts.FlowRegistryObserver` Implementation ---

// Stats returns globally aggregated statistics for the entire `FlowRegistry`.
Expand All @@ -288,16 +329,19 @@ func (fr *FlowRegistry) Stats() contracts.AggregateStats {
PerPriorityBandStats: make(map[int]contracts.PriorityBandStats, len(fr.config.PriorityBands)),
}

for p, s := range fr.perPriorityBandStats {
bandCfg := fr.config.PriorityBands[p]
stats.PerPriorityBandStats[p] = contracts.PriorityBandStats{
Priority: p,
fr.perPriorityBandStats.Range(func(key, value any) bool {
priority := key.(int)
bandStats := value.(*bandStats)
bandCfg := fr.config.PriorityBands[priority]
stats.PerPriorityBandStats[priority] = contracts.PriorityBandStats{
Priority: priority,
PriorityName: bandCfg.PriorityName,
CapacityBytes: bandCfg.MaxBytes,
ByteSize: uint64(s.byteSize.Load()),
Len: uint64(s.len.Load()),
ByteSize: uint64(bandStats.byteSize.Load()),
Len: uint64(bandStats.len.Load()),
}
}
return true
})
return stats
}

Expand Down Expand Up @@ -585,11 +629,8 @@ func (fr *FlowRegistry) updateAllShardsCacheLocked() {

// propagateStatsDelta is the top-level, lock-free aggregator for all statistics.
func (fr *FlowRegistry) propagateStatsDelta(priority int, lenDelta, byteSizeDelta int64) {
stats, ok := fr.perPriorityBandStats[priority]
if !ok {
panic(fmt.Sprintf("invariant violation: priority band (%d) stats missing during propagation", priority))
}

val, _ := fr.perPriorityBandStats.Load(priority)
stats := val.(*bandStats)
stats.len.Add(lenDelta)
stats.byteSize.Add(byteSizeDelta)
fr.totalLen.Add(lenDelta)
Expand Down
Loading