Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
Expand Down Expand Up @@ -227,6 +228,8 @@ func run() error {

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)

director := requestcontrol.NewDirector(datastore, scheduler, saturationDetector) // can call "director.WithPostResponsePlugins" to add post response plugins
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I prefer adding WithPostResponsePlugins as an "Option" object and add that as an optinal argument to the NewDirector(). This is more discoverable and remove the need of this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't really need this comment :)
this comment was added just until we add the WithPostResponse usage in main.go.

I implemented it this way to keep it consistent with how scheduler plugins are defined.
In general, both patterns are commonly used in go and more specifically in Kubernetes, but personally I prefer using the With... approach which reads clearer to me and also allows adding it only when used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option pattern is optional as well, you only use when you need it. But it's more discoverable in the function signature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option pattern is optional as well, you only use when you need it.

right. I was not trying to say otherwise :).

was just making the point that both are common patterns that are widely used in the community, and personally I prefer the With.. approach, which is also aligned with what was done in Scheduler. so it keeps the plugins setup consistent across the layers.


// --- Setup ExtProc Server Runner ---
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
Expand All @@ -237,7 +240,7 @@ func run() error {
SecureServing: *secureServing,
CertPath: *certPath,
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
Scheduler: scheduler,
Director: director,
SaturationDetector: saturationDetector,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/epp/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
Expand Down Expand Up @@ -79,7 +80,7 @@ type StreamingServer struct {
// Specifically, there are fields related to the ext-proc protocol, and then fields related to the lifecycle of the request.
// We should split these apart as this monolithic object exposes too much data to too many layers.
type RequestContext struct {
TargetPod string
TargetPod *backend.Pod
TargetEndpoint string
Model string
ResolvedTargetModel string
Expand All @@ -93,6 +94,8 @@ type RequestContext struct {
RequestRunning bool
Request *Request

SchedulingRequest *schedulingtypes.LLMRequest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many duplicated fields in LLMRequest and RequestContext. Initially the LLMRequest was scoped to the scheduling package only.

Can we move LLMRequest out of scheduling package now it has wider scope? And consolidate duplicated fields such as the ResolvedTargetModel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is in conflict with some of the comments on #845 where the discussion went to the direction that scheduler shouldn't rely on structs outside of the scheduling package.

yes, I agree there are duplicate fields.
we should probably converge such that those fields are kept in scheduling request only and removed from RequestContext.

if it's not a hard issue from you PoV, I suggest to defer it to a follow up PR since this hasn't change in this PR (this was the situation also before this PR).
I like to keep PRs tightly scoped (the scope of this PR is just the move of PostResponse out of scheduler).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add the scheduling pkg type as a parameter here so that we don't duplicate the parameters.


RequestState StreamRequestState
modelServerStreaming bool

Expand Down
19 changes: 19 additions & 0 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,18 @@ var (
[]string{"plugin_type", "plugin_name"},
)

RequestControlPluginProcessingLatencies = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: InferenceExtension,
Name: "request_control_plugin_duration_seconds",
Help: metricsutil.HelpMsgWithStability("RequestControl plugin processing latency distribution in seconds for each plugin type and plugin name.", compbasemetrics.ALPHA),
Buckets: []float64{
0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
},
},
[]string{"plugin_type", "plugin_name"},
)

// Prefix indexer Metrics
PrefixCacheSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -263,6 +275,7 @@ func Register(customCollectors ...prometheus.Collector) {
metrics.Registry.MustRegister(inferencePoolReadyPods)
metrics.Registry.MustRegister(SchedulerPluginProcessingLatencies)
metrics.Registry.MustRegister(SchedulerE2ELatency)
metrics.Registry.MustRegister(RequestControlPluginProcessingLatencies)
metrics.Registry.MustRegister(InferenceExtensionInfo)
metrics.Registry.MustRegister(PrefixCacheSize)
metrics.Registry.MustRegister(PrefixCacheHitRatio)
Expand All @@ -289,6 +302,7 @@ func Reset() {
inferencePoolReadyPods.Reset()
SchedulerPluginProcessingLatencies.Reset()
SchedulerE2ELatency.Reset()
RequestControlPluginProcessingLatencies.Reset()
InferenceExtensionInfo.Reset()
PrefixCacheSize.Reset()
PrefixCacheHitRatio.Reset()
Expand Down Expand Up @@ -400,6 +414,11 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
}

// RecordRequestControlPluginProcessingLatency records the processing latency for a request-control plugin.
func RecordRequestControlPluginProcessingLatency(pluginType, pluginName string, duration time.Duration) {
RequestControlPluginProcessingLatencies.WithLabelValues(pluginType, pluginName).Observe(duration.Seconds())
}

// RecordPrefixCacheSize records the size of the prefix indexer in megabytes.
func RecordPrefixCacheSize(size int64) {
PrefixCacheSize.WithLabelValues().Set(float64(size))
Expand Down
24 changes: 24 additions & 0 deletions pkg/epp/plugins/plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugins

// Plugin defines the interface for a plugin.
// This interface should be embedded in all plugins across the code.
type Plugin interface {
// Name returns the name of the plugin.
Name() string
}
48 changes: 32 additions & 16 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
"fmt"
"math/rand"
"strconv"
"time"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
Expand All @@ -39,24 +41,32 @@ import (
// Scheduler defines the interface required by the Director for scheduling.
type Scheduler interface {
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result map[string]*schedulingtypes.Result, err error)
OnResponse(ctx context.Context, resp *schedulingtypes.LLMResponse, targetPodName string)
}

// SaturationDetector provides a signal indicating whether the backends are considered saturated.
type SaturationDetector interface {
IsSaturated(ctx context.Context) bool
}

// NewDirector creates a new Director instance with all dependencies.
// postResponsePlugins remains nil as this is an optional field that can be set using the "WithPostResponsePlugins" function.
func NewDirector(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector) *Director {
return &Director{datastore: datastore, scheduler: scheduler, saturationDetector: saturationDetector}
}

// Director orchestrates the request handling flow, including scheduling.
type Director struct {
datastore datastore.Datastore
scheduler Scheduler
saturationDetector SaturationDetector
datastore datastore.Datastore
scheduler Scheduler
saturationDetector SaturationDetector
postResponsePlugins []PostResponsePlugin
}

// NewDirector creates a new Director instance with all dependencies.
func NewDirector(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector) *Director {
return &Director{datastore, scheduler, saturationDetector}
// WithPostResponsePlugins sets the given plugins as the PostResponse plugins.
// If the Director has PostResponse plugins already, this call replaces the existing plugins with the given ones.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding to the list might be more appropriate?

Copy link
Contributor Author

@nirrozenbaum nirrozenbaum Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally left it out of the list, since this is an optional field.
I would like to avoid creation of empty slice (or using nil) when caller doesn't need any PostResponsePlugins.
If I add it as arg the code will look like:

director := requestcontrol.NewDirector(datastore, scheduler, detector, []PostResponsePlugin{})

OR

director := requestcontrol.NewDirector(datastore, scheduler, detector, nil)

on the other hand, since this field is optional, it is possible to initialize detector with or without it like this -
without:

director := requestcontrol.NewDirector(datastore, scheduler, detector)

with:

director := requestcontrol.NewDirector(datastore, scheduler, detector).
    WithPostResponsePlugins(plugin1, plugin2, ...)

the latter gets also the same feeling of the Scheduler plugins.

func (d *Director) WithPostResponsePlugins(plugins ...PostResponsePlugin) *Director {
d.postResponsePlugins = plugins
return d
}

// HandleRequest orchestrates the request lifecycle:
Expand Down Expand Up @@ -104,7 +114,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
}

// Prepare LLMRequest (needed for both saturation detection and Scheduler)
llmReq := &schedulingtypes.LLMRequest{
reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{
TargetModel: reqCtx.ResolvedTargetModel,
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
Critical: requestCriticality == v1alpha2.Critical,
Expand All @@ -113,7 +123,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
}
logger = logger.WithValues(
"model", reqCtx.Model,
"resolvedTargetModel", llmReq.TargetModel,
"resolvedTargetModel", reqCtx.ResolvedTargetModel,
"criticality", requestCriticality,
)
ctx = log.IntoContext(ctx, logger)
Expand All @@ -126,7 +136,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
}

// --- 3. Dispatch (Calls Scheduler) ---
results, dispatchErr := d.Dispatch(ctx, llmReq)
results, dispatchErr := d.Dispatch(ctx, reqCtx.SchedulingRequest)
if dispatchErr != nil {
return reqCtx, dispatchErr
}
Expand Down Expand Up @@ -193,22 +203,19 @@ func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestCon
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)

reqCtx.TargetPod = targetPod.NamespacedName.String()
reqCtx.TargetPod = targetPod
reqCtx.TargetEndpoint = endpoint

return reqCtx, nil
}

func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
logger := log.FromContext(ctx)

llmResp := &schedulingtypes.LLMResponse{
response := &Response{
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
Headers: reqCtx.Response.Headers,
}
logger.V(logutil.DEBUG).Info("LLM response assembled", "response", llmResp)

d.scheduler.OnResponse(ctx, llmResp, reqCtx.TargetPod)
d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)

return reqCtx, nil
}
Expand Down Expand Up @@ -253,3 +260,12 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
}
return ""
}

func (d *Director) runPostResponsePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
for _, plugin := range d.postResponsePlugins {
log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Name())
before := time.Now()
plugin.PostResponse(ctx, request, response, targetPod)
metrics.RecordRequestControlPluginProcessingLatency(PostResponsePluginType, plugin.Name(), time.Since(before))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metric is tracking post response only, why is it called request_control_plugin_duration_seconds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR moves PostResponse plugin from Scheduler to Requestcontrol layer. it is the first plugin in this layer out of the ones that appear in the northstar doc.

more plugins in request control are expected to be added.

}
}
Loading