Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/epp/flowcontrol/framework/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package framework defines the core plugin interfaces for extending the `controller.FlowController`.
// Package framework defines the core plugin interfaces for extending the Flow Control layer.
//
// It establishes the contracts that custom logic, such as queueing disciplines and dispatching policies, must adhere
// to. By building on these interfaces, the Flow Control system can be extended and customized without modifying the
// to. By building on these interfaces, the Flow Control layer can be extended and customized without modifying the
// core controller logic.
//
// The primary contracts are:
// - `SafeQueue`: An interface for concurrent-safe queue implementations.
// - `IntraFlowDispatchPolicy`: An interface for policies that decide which item to select from within a single flow's
// queue.
// - `ItemComparator`: An interface vended by policies to make their internal item-ordering logic explicit and
// available to other components.
// - SafeQueue: An interface for concurrent-safe queue implementations.
// - FairnessPolicy: The interface for policies that govern the competition between flows.
// - OrderingPolicy: The interface for policies that decide the strict sequence of service within a flow.
// - IntraFlowDispatchPolicy: (Deprecated) Legacy interface for intra-flow ordering. Replaced by OrderingPolicy.
// - ItemComparator: (Deprecated) Legacy interface for exposing ordering logic. Replaced by OrderingPolicy.
//
// These components are linked by `QueueCapability`, which allows policies to declare their queue requirements (e.g.,
// These components are linked by QueueCapability, which allows policies to declare their queue requirements (e.g.,
// FIFO or priority-based ordering).
package framework
40 changes: 40 additions & 0 deletions pkg/epp/flowcontrol/framework/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package framework
import (
"context"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
)

const (
// FairnessPolicyExtensionPoint identifies the plugin type responsible for managing contention between Flows.
FairnessPolicyExtensionPoint = "FairnessPolicy"

// OrderingPolicyExtensionPoint identifies the plugin type responsible for sorting requests within a Flow.
OrderingPolicyExtensionPoint = "OrderingPolicy"
)

// FairnessPolicy governs the distribution of dispatch opportunities among competing Flows within the same Priority
Expand Down Expand Up @@ -77,3 +81,39 @@ type FairnessPolicy interface {
// nothing is eligible.
Pick(ctx context.Context, flowGroup PriorityBandAccessor) (flow FlowQueueAccessor, err error)
}

// OrderingPolicy governs the strict sequence of service within a single Flow.
//
// In simple terms, this policy answers the question: "Which request in this specific queue should be processed next?"
//
// While "Fairness" governs the competition between flows, "Ordering" dictates the internal discipline of a single
// flow. This allows different flows to have different internal service objectives (e.g., FCFS vs. EDF).
//
// Architecture (Flyweight Pattern):
// Ordering policies are Singletons. A single instance handles the logic for all queues in a Priority Band.
// The policy is purely functional.
//
// - Logic: Defined here as a Comparator-centric interface.
// - State: Ordering policies are generally stateless, operating on the intrinsic properties of the items.
//
// Conformance: Implementations MUST ensure all methods are goroutine-safe.
type OrderingPolicy interface {
plugin.Plugin

// Less reports whether item 'a' should be dispatched before item 'b'.
// This makes the policy act as a sort.Interface for the queue.
//
// Invariants:
// - Returning true means 'a' has higher priority than 'b'.
// - If the queue supports CapabilityPriorityConfigurable, this function determines the heap order.
Less(a, b types.QueueItemAccessor) bool

// RequiredQueueCapabilities returns the set of capabilities that a SafeQueue MUST support to effectively apply this
// policy.
//
// For example:
// - "fcfs-ordering-policy" coupled with CapabilityFIFO is O(1).
// - "edf-ordering-policy" (Earliest Deadline First) REQUIRES CapabilityPriorityConfigurable (Heap) to function
// correctly.
RequiredQueueCapabilities() []QueueCapability
}
56 changes: 56 additions & 0 deletions pkg/epp/flowcontrol/framework/plugins/intraflow/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package intraflow provides the standard implementations of the OrderingPolicy interface.
//
// # Context: The 3-Tier Dispatch Hierarchy
//
// The Flow Control system manages traffic using a strict three-tier decision hierarchy.
// This package implements Tier 3.
//
// 1. Priority (Band Selection):
// The system first strictly selects the highest-priority Band that has pending work.
//
// 2. Fairness (Flow Selection):
// Once a Band is selected, the FairnessPolicy (interflow) determines which Flow within that band gets the next
// dispatch opportunity.
//
// 3. Ordering (Item Selection) - [THIS PACKAGE]:
// Once a Flow is selected, the OrderingPolicy determines which Request from that specific flow's queue is
// dispatched. This governs the "internal discipline" of the flow (e.g., whether to serve oldest requests first or
// most urgent).
//
// # Architecture: The Flyweight Pattern
//
// Ordering Policies are Singletons. A single instance handles the ordering logic for all queues in a Priority Band.
// To support this efficiently, the policy follows the Flyweight pattern:
//
// 1. The Plugin Instance (e.g., FCFS) is a Singleton. It defines the Logic (Less).
// 2. The Logic (Less) acts as a pure function (or comparator) that operates on the queue's items.
//
// # Standard Implementations
//
// This package includes the following core strategies:
//
// - FCFS ("First-Come, First-Served") ("fcfs-ordering-policy"): Orders requests by their logical arrival time.
// This is the default and most "intuitive" ordering.
//
// - EDF ("Earliest Deadline First") ("edf-ordering-policy"): Orders requests by their absolute deadline
// (EnqueueTime + TTL).
// This maximizes the number of requests served before their deadlines expire.
//
// TODO: Rename directory and package to "ordering".
package intraflow
86 changes: 56 additions & 30 deletions pkg/epp/flowcontrol/framework/plugins/intraflow/edf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,31 @@ limitations under the License.
package intraflow

import (
"encoding/json"
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
)

// EDFPolicyName is the name of the Earliest Deadline First (EDF) intra-flow dispatch policy.
// EDFOrderingPolicyType represents an ordering policy that implements a Earliest Deadline First (EDF) strategy.
//
// This policy implements a deadline-urgency scheduling strategy by selecting the request with the earliest absolute
// deadline, computed as `EnqueueTime() + EffectiveTTL()`. Requests without a valid TTL (i.e., EffectiveTTL <= 0) are
// treated as having no deadline and are scheduled after all time-bound requests, using FCFS as a tie-breaker for fairness.
const EDFPolicyName = "EDF"
// It selects the request with the earliest absolute deadline, computed as `EnqueueTime() + EffectiveTTL()`.
// Requests without a valid TTL (i.e., EffectiveTTL <= 0) are treated as having no deadline and are scheduled after all
// time-bound requests, using FCFS as a tie-breaker for fairness.
const EDFOrderingPolicyType = "edf-ordering-policy"

func init() {
MustRegisterPolicy(RegisteredPolicyName(EDFPolicyName),
// TODO(kubernetes-sigs/gateway-api-inference-extension#1405): Remove once migration to EPP plugin model is complete.
MustRegisterPolicy(RegisteredPolicyName(EDFOrderingPolicyType),
func() (framework.IntraFlowDispatchPolicy, error) {
return newEDFPolicy(), nil
})

plugin.Register(EDFOrderingPolicyType, func(string, json.RawMessage, plugin.Handle) (plugin.Plugin, error) {
return newEDFPolicy(), nil
})
}

// EDFPolicy implements an intra-flow dispatch policy based on the Earliest Deadline First (EDF) scheduling algorithm.
Expand All @@ -46,14 +53,14 @@ type EDFPolicy struct {

var _ framework.IntraFlowDispatchPolicy = &EDFPolicy{}

func newEDFPolicy() framework.IntraFlowDispatchPolicy {
return &EDFPolicy{
comparator: &edfComparator{},
}
func newEDFPolicy() *EDFPolicy {
p := &EDFPolicy{}
p.comparator = &edfComparator{policy: p}
return p
}

func (p *EDFPolicy) Name() string {
return EDFPolicyName
return EDFOrderingPolicyType
}

// RequiredQueueCapabilities returns the queue capabilities required by this policy.
Expand All @@ -62,6 +69,15 @@ func (p *EDFPolicy) RequiredQueueCapabilities() []framework.QueueCapability {
return []framework.QueueCapability{framework.CapabilityPriorityConfigurable}
}

// TypedName returns the type and name tuple of this plugin instance.
func (p *EDFPolicy) TypedName() plugin.TypedName {
return plugin.TypedName{
Type: EDFOrderingPolicyType,
Name: EDFOrderingPolicyType,
}
}

// TODO(kubernetes-sigs/gateway-api-inference-extension#1405): Remove once migration to EPP plugin model is complete.
func (p *EDFPolicy) Comparator() framework.ItemComparator {
return p.comparator
}
Expand Down Expand Up @@ -92,32 +108,42 @@ func calculateDeadline(item types.QueueItemAccessor) time.Time {
return item.EnqueueTime().Add(ttl)
}

type edfComparator struct{}
type edfComparator struct {
policy framework.OrderingPolicy
}

// TODO(kubernetes-sigs/gateway-api-inference-extension#1405): Remove once migration to EPP plugin model is complete.
func (d *edfComparator) Func() framework.ItemComparatorFunc {
return func(a, b types.QueueItemAccessor) bool {
if a == nil && b == nil {
return false
}
if a == nil { // Treat nil as lowest priority
return false
}
if b == nil { // Treat non-nil 'a' as higher priority than nil 'b'
return true
}
deadlineA := calculateDeadline(a)
deadlineB := calculateDeadline(b)

if !deadlineA.Equal(deadlineB) {
return deadlineA.Before(deadlineB) // earlier deadline = higher priority
}

// Same deadline: FCFS (earlier enqueue time = higher priority)
return a.EnqueueTime().Before(b.EnqueueTime())
return d.policy.Less(a, b)
}
}

// Less returns true if item 'a' should be dispatched before item 'b'.
// EDF orders by deadline (earliest first), using FCFS as a tie-breaker.
func (p *EDFPolicy) Less(a, b types.QueueItemAccessor) bool {
if a == nil && b == nil {
return false
}
if a == nil { // Treat nil as lowest priority
return false
}
if b == nil { // Treat non-nil 'a' as higher priority than nil 'b'
return true
}
deadlineA := calculateDeadline(a)
deadlineB := calculateDeadline(b)

if !deadlineA.Equal(deadlineB) {
return deadlineA.Before(deadlineB) // earlier deadline = higher priority
}

// Same deadline: FCFS (earlier enqueue time = higher priority)
return a.EnqueueTime().Before(b.EnqueueTime())
}

// ScoreType indicates this policy uses EDF-based scoring.
// TODO(kubernetes-sigs/gateway-api-inference-extension#1405): Remove once migration to EPP plugin model is complete.
func (d *edfComparator) ScoreType() string {
return string(framework.EDFPriorityScoreType)
}
8 changes: 5 additions & 3 deletions pkg/epp/flowcontrol/framework/plugins/intraflow/edf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
func TestEDFPolicy_Name(t *testing.T) {
t.Parallel()
policy := newEDFPolicy()
assert.Equal(t, EDFPolicyName, policy.Name())
assert.Equal(t, EDFOrderingPolicyType, policy.Name())
}

func TestEDFPolicy_RequiredQueueCapabilities(t *testing.T) {
Expand Down Expand Up @@ -60,7 +60,8 @@ func TestEDFPolicy_SelectItem(t *testing.T) {

func TestEDFComparator_Func(t *testing.T) {
t.Parallel()
comparator := &edfComparator{}
policy := newEDFPolicy()
comparator := policy.Comparator()
compareFunc := comparator.Func()
require.NotNil(t, compareFunc)

Expand Down Expand Up @@ -124,7 +125,8 @@ func TestEDFComparator_Func(t *testing.T) {

func TestEDFComparator_ScoreType(t *testing.T) {
t.Parallel()
comparator := &edfComparator{}
policy := newEDFPolicy()
comparator := policy.Comparator()
assert.Equal(t, string(framework.EDFPriorityScoreType), comparator.ScoreType())
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/epp/flowcontrol/framework/plugins/intraflow/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package dispatch provides the factory and registration mechanism for all `framework.IntraFlowDispatchPolicy`
// implementations.
// It allows new policies to be added to the system and instantiated by name.
package intraflow

import (
Expand Down
Loading