Merged
40 changes: 25 additions & 15 deletions docs/proposals/0845-scheduler-architecture-proposal/README.md
@@ -14,13 +14,22 @@ The Scheduling Subsystem is a framework used to implement scheduling algorithms.
- The entry & exit points should be defined by the framework, acting as the API surface of the system
- Multiple scheduling 'profiles' should be able to be run for a single request.
- They can be conditionally dependent on previous runs, or in parallel
- Plugin state is managed by the plugin itself
- State management
- State per request: This is managed by what we are calling CycleState and its lifecycle is tied to the request.
Cycle state is created internally by the Scheduler per request and a pointer to it is passed as an argument.
- State managed by the plugin struct itself: The lifecycle of this state is tied to the plugin, and since plugins will be instantiated once,
it is a state that plugins can use across requests (like prefix-cache index).
- State managed by the data layer: each endpoint will be associated with state (currently metrics) that a data layer plugin can add to it.
A data layer plugin could be one that scrapes v1/models from the endpoint for example.
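The three state scopes above can be sketched in Go. This is an illustrative sketch, not the proposal's API: the `CycleState` methods and the `prefixCachePlugin` index are assumed names used only to show which state lives where.

```go
package main

import "fmt"

// CycleState is a per-request scratch space created by the Scheduler;
// its lifecycle is tied to the request.
type CycleState struct {
	storage map[string]any
}

func NewCycleState() *CycleState {
	return &CycleState{storage: map[string]any{}}
}

func (c *CycleState) Write(key string, value any) { c.storage[key] = value }

func (c *CycleState) Read(key string) (any, bool) {
	v, ok := c.storage[key]
	return v, ok
}

// prefixCachePlugin sketches plugin-scoped state: because plugins are
// instantiated once, the index outlives any single request (like the
// prefix-cache index mentioned above).
type prefixCachePlugin struct {
	index map[string][]string // prefix hash -> endpoints that may hold it
}

func main() {
	state := NewCycleState() // created per request by the Scheduler
	state.Write("prompt.hash", "abc123")
	if v, ok := state.Read("prompt.hash"); ok {
		fmt.Println("cycle state carries:", v)
	}

	p := &prefixCachePlugin{index: map[string][]string{}}
	p.index["abc123"] = append(p.index["abc123"], "pod-1") // survives the request
	fmt.Println("plugin state entries:", len(p.index))
}
```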

## Definitions
- **Scheduling Framework** - The system created to allow for a pluggable scheduling algorithm.
- **Scheduling Profile** - A named, specific set of Filter(s), Scorer(s), & Picker used to select endpoints.
- **Scheduler** - An extensible implementation of a scheduling algorithm. Including logic to select Scheduling Profiles, the Scheduling Profiles themselves, & logic to interpret the result.
- **Scheduling Cycle** - A single run of a Scheduler through the Scheduling Framework.
- **Scheduler Profile** - A named, specific set of Filter(s), Scorer(s), & Picker used to select endpoints.
- **Scheduler Profile Run** - A one-time run of the Scheduler Profile's filters, scorers, and picker for a given request.
- **Scheduler** - An extensible implementation of a scheduling algorithm. Including logic to select Scheduler Profiles iteratively,
the Scheduler Profiles themselves, & logic to interpret the result.
- **Scheduling Cycle** - A single run of a Scheduler through the Scheduling Framework. A scheduling cycle includes one or more Scheduler Profile runs.
- **Plugin** - Implementation of framework-defined interface(s) to add or extend logic across the framework.

## Proposal
@@ -33,23 +42,24 @@ The Scheduling System can loosely be divided into 3 sections:
- A *configuration API* to define the Scheduler, Profile(s), & the plugins used within those profiles

A sketch of the System, with extension points is here:
<img src="./images/scheduler_subsystem.svg" alt="Scheduling Algorithm" width="1000" />
<img src="./images/scheduler_cycle.png" alt="Scheduling Algorithm" width="1000" />

Describing the interface extension points & flow is the simplest way to convey the intent of what the framework should enable:

### PreSchedule
### ProfileSelect (or ProfilePick)

PreSchedule is the entry point into the scheduling cycle (called by the framework). PreSchedule, selects profiles conditionally based on:
ProfileSelect is the entry point into the scheduling cycle (called by the framework).
ProfileSelect selects profiles conditionally based on:

- Request data
- Results
- Results of previously executed SchedulerProfiles
- Cycle State

PreSchedule will be continuously called so long as profiles are returned; multiple profiles may be returned in a single call. Only a single PreSchedule function may be defined per scheduler.
ProfileSelect will be continuously called so long as profiles are returned; multiple profiles may be returned in a single call. Only a single ProfileSelect function may be defined per scheduler.
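The "continuously called so long as profiles are returned" contract can be sketched as a driving loop. This is an assumption-laden sketch, not the framework's code: `MultiProfilePlugin`, `runOnce`, and the `onePass` selection policy are illustrative stand-ins.

```go
package main

import "fmt"

// Illustrative types; the proposal's real signatures also carry CycleState.
type Endpoint struct{ Name string }
type SchedulerProfile struct{ Name string }

type MultiProfilePlugin interface {
	PickProfiles(request map[string]any, profiles map[string]*SchedulerProfile,
		results map[string][]*Endpoint) map[string]*SchedulerProfile
}

// runOnce is a stand-in for a full Filter/Score/Pick profile run.
func runOnce(p *SchedulerProfile) []*Endpoint {
	return []*Endpoint{{Name: "pod-for-" + p.Name}}
}

// schedule keeps calling PickProfiles until it returns no profiles,
// feeding each iteration the results accumulated so far.
func schedule(mp MultiProfilePlugin, profiles map[string]*SchedulerProfile,
	request map[string]any) map[string][]*Endpoint {
	results := map[string][]*Endpoint{}
	for {
		picked := mp.PickProfiles(request, profiles, results)
		if len(picked) == 0 {
			return results
		}
		for name, p := range picked {
			results[name] = runOnce(p)
		}
	}
}

// onePass selects every profile that has not run yet, so the loop
// terminates after a single round of profile runs.
type onePass struct{}

func (onePass) PickProfiles(_ map[string]any, profiles map[string]*SchedulerProfile,
	results map[string][]*Endpoint) map[string]*SchedulerProfile {
	out := map[string]*SchedulerProfile{}
	for name, p := range profiles {
		if _, done := results[name]; !done {
			out[name] = p
		}
	}
	return out
}

func main() {
	profiles := map[string]*SchedulerProfile{
		"prefill": {Name: "prefill"},
		"decode":  {Name: "decode"},
	}
	results := schedule(onePass{}, profiles, map[string]any{})
	fmt.Println("profiles run:", len(results)) // profiles run: 2
}
```

A conditional policy (e.g. run "decode" only after "prefill" produced results) would inspect `results` inside `PickProfiles` instead of selecting everything at once.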

### Profile Cycle
### Scheduler Profile Run

The profile cycle consists of 3 defined functions `Filter`, `Score`, & `Pick`
The SchedulerProfile run consists of 3 defined phases: `Filter`, `Score`, & `Pick`

*Profile Constraints*
- A profile can have any number of `Filter` plugins registered (including zero)
Expand All @@ -61,16 +71,16 @@ The profile cycle consists of 3 defined functions `Filter`, `Score`, & `Pick`
Filter runs before any scoring, and removes endpoints that are not fit for selection. The framework will return an error to the client if the endpoints are filtered to zero.

#### Score
Score applies a score to each remaining endpoint provided. Scorers SHOULD keep their score values in a normalized range: [0-1]. Any weighting should be added at the SchedulingProfile configuration level.
Score applies a score to each remaining endpoint provided. Scorers SHOULD keep their score values in a normalized range: [0-1]. Any weighting should be added at the SchedulerProfile configuration level.
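A minimal sketch of how normalized [0-1] scores and profile-level weights could combine; the `combineScores` helper and its shape are assumptions for illustration, not the proposal's API.

```go
package main

import "fmt"

// combineScores applies per-scorer weights to normalized [0,1] scores
// and sums them per endpoint. The proposal attaches weights at the
// SchedulerProfile configuration level, not inside the scorers.
func combineScores(perScorer map[string]map[string]float64, weights map[string]int) map[string]float64 {
	total := map[string]float64{}
	for scorer, scores := range perScorer {
		w := float64(weights[scorer])
		for endpoint, s := range scores {
			total[endpoint] += w * s // s is expected to already be in [0,1]
		}
	}
	return total
}

func main() {
	perScorer := map[string]map[string]float64{
		"prefix-cache": {"pod-a": 1.0, "pod-b": 0.0},
		"queue-depth":  {"pod-a": 0.25, "pod-b": 0.75},
	}
	weights := map[string]int{"prefix-cache": 3, "queue-depth": 1}
	fmt.Println(combineScores(perScorer, weights)["pod-a"]) // 3*1.0 + 1*0.25 = 3.25
}
```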

#### Pick
Picker selects the endpoint(s) from the provided list of scored endpoints. Picker MUST return one endpoint at minimum.
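A minimal Picker under these constraints might simply take the top-scored endpoint. `pickHighest` is a hypothetical example, not a named plugin from the proposal.

```go
package main

import (
	"fmt"
	"sort"
)

type ScoredEndpoint struct {
	Name  string
	Score float64
}

// pickHighest sorts by score descending and returns the single best
// endpoint. A real Picker MUST return at least one endpoint; an empty
// candidate list would already be a framework error (filtered to zero).
func pickHighest(endpoints []ScoredEndpoint) []ScoredEndpoint {
	sorted := append([]ScoredEndpoint(nil), endpoints...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Score > sorted[j].Score })
	return sorted[:1]
}

func main() {
	picked := pickHighest([]ScoredEndpoint{{"pod-a", 0.4}, {"pod-b", 0.9}, {"pod-c", 0.1}})
	fmt.Println(picked[0].Name) // pod-b
}
```

Other policies (weighted-random over the top-k, for instance) fit the same signature.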


### PostSchedule
Contributor:

I would like a PreRequest extension point (similar to the PostSchedule but I think calling it PreRequest is more accurate). The use cases are:

  1. Prefix cache affinity. Once a server is picked for a request, update the prefix indexes from the request prompt.
  2. LoRA affinity or some other form of session affinity. Update the server(s) a lora adapter is scheduled so the following requests can be scheduled on the same server.

Why not just use PostResponse? The issue is latency. Suppose you have 2 requests for the same LoRA adapter at t1, and t2. Request1 was sent to server1 and response1 was back at t3 (taking longer than t2-t1). Then for Request2 you lose the affinity.

What about request failures? Request failures will lead to an inaccurate recording of the affinity. However, the impact is negligible so long as the failure rate is low (which should generally be expected).

Contributor Author:

I'll split my answer into two parts:

  1. your request sounds to me like you're looking for an extension point in the director rather than in scheduler (in requestcontrol layer). the scheduler is responsible for selecting the endpoint(s). once a selection was made the caller can do with this information anything like what you requested (director is the caller).
  2. the behavior you're describing sounds incorrect to me. the fact that the scheduler selected an endpoint is not enough to conclude that the next requests should be directed to the same server (in both examples you provided). the server might be non-responsive and the scheduler might have picked it according to metrics that were collected before its unavailability. this may lead to many subsequent requests being sent to the same server that is non-responsive. If the server did NOT receive the request or did NOT handle the request successfully, we shouldn't assume it has the relevant cache.

Contributor:

I'm not sure the LoRA use case is worth supporting. It assumes that LoRA loading is zero-cost and happens on the fly, in response to requests. Unsure if this is always the case.
If not, then the first request choosing some Pod meant that it already had the LoRA available and loaded, which means this is still the case when the second request comes. Hope I understood the use case described correctly...
Similarly for session affinity, do you see multiple same requests being sent in parallel without receiving a response? Might not be so likely for a chat like scenario but maybe there are others where it makes sense.

Regarding prefix, I understand that currently would benefit from both callbacks (optimistic prefix based on prompt once selection of target is done, and then validating and adding the response data in PostResponse). This is an implementation choice with tradeoffs (e.g., failure vs latency).

  • this optimizes for the case of independent requests having the same exact prefix. How common is that?
  • the PostResponse hook is called multiple times, first when the headers are received and then again for every body segment (one or more, depending if buffered). Headers are received soon after Prefill is done (typically much shorter than Decode processing). In that case, I'm not sure the window for receiving (the rare?) same prefix independent requests makes a good argument for adding another plugin point inside the scheduler.

Contributor:

your request sounds to me like you're looking for an extension point in the director rather than in scheduler (in requestcontrol layer). the scheduler is responsible for selecting the endpoint(s). once a selection was made the caller can do with this information anything like what you requested (director is the caller).

Director sounds like a good place to have that PreRequest extension point given it also hosts the PostResponse.

the server might be non-responsive and scheduler might picked it according to metrics that were collected before its unavailability.

This issue should be solved by health checks. If an endpoint is down it should be removed from eligible endpoints to pick from. This is a general issue, isn't it? Regardless of the "affinity" behavior, if a failed endpoint exists in the data layer, scheduler will pick it and receive an error anyway.

Contributor:

this optimizes for the case of independent requests having the same exact prefix. How common is that?

Depends on the use case, for example, independent requests could share the same system prompt or RAG documents.

Contributor:

It seems to me that we might be optimizing for a relatively narrow time window

This is optimizing the head of line blocking problem, which is quite important. Imagine you have a new system prompt and gets many concurrent requests containing that new shared system prompt. Without this optimization, you can spread them across many servers, losing the benefit of prefix affinity. Same applies for LoRA adapter.

Currently we implement LoRA affinity by refreshing the loaded LoRA adapters via metrics every 50ms. At higher QPS, we see LoRA adapters spread (losing affinity). So an even tighter metrics freshness is required.

You are right that if the first request is very fast to respond, then this is not a big problem. However we cannot control that, as it's not uncommon to see inference requests take seconds or even longer to finish.

Contributor:

Should we follow up with adding a PreRequest extension point in the director? @nirrozenbaum @elevran

Contributor Author:

@liu-cong I suggest to keep this proposal scoped to the scheduler and I'll open a new proposal that deals with more general pluggability soon, with section for each layer (and then we can add PreRequest as a plugin in requestcontrol layer). will tag all participants on this PR in the new one as well.
as @ahg-g wrote in the other comment, this PR seems to be very close to complete and merge, so maybe it's better to open a general one rather than continue iterating on this one.

Contributor:

That makes sense.

Do we have an up-to-date picture of the extension points and their names? Did we settle on names? Just want to ensure we are in agreement about where in the scheduling flow the extension points are run.

Contributor Author:

yes. I’ll push soon (either later today or tomorrow) the most up to date names including an updated diagram

PostSchedule receives the output of the result(s) of the scheduling cycle(s) and makes sense of the data to be consumed by the calling system.
### ProcessProfilesResults
ProcessProfilesResults receives the output of the result(s) of the scheduler profile(s) and makes sense of the data to be consumed by the calling system.

### PostResponse
### PostResponse (Out of Scheduler and mentioned here for completeness only)
PostResponse is a special case extension that can optionally be implemented by a plugin that needs to augment its state based on response or request data. This should only be implemented for plugins that need to update state outside of the scheduling cycle. PostResponse is ran at the time of processing a response.

## ConfigurationAPI
Expand Down
@@ -22,81 +22,116 @@ import (
scheduling "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

// READER NOTE: Currently CycleState is assumed to have appropriate request data rather than making a new object.

// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
}

type Endpoint struct {
State EndpointState
Score float64
}

type EndpointState struct {
// storage is per Scheduling Cycle, and so has no thread-safe concerns.
storage map[string]any //nolint:unused
// TODO should think if the above is true or should we use sync map for thread safety.
Contributor:

this will be the case unless the extension interfaces are changed to evaluate one pod, and the framework launches those evaluations in parallel.

Contributor Author (@nirrozenbaum, Jun 3, 2025):

how about running profiles in parallel? running different scorers in parallel?
if we eventually have support for python scorers, for example, we might want to parallelize those scorers.

I think this is very low priority atm, so maybe it's not worth discussing it at this point.

storage map[string]any
}

type SchedulingResult struct {
results map[string][]Endpoint //nolint:unused
// ScoredEndpoint encapsulates Endpoint with its Score.
// The lifecycle of an endpoint is typically different than a lifecycle of a request.
// This is intended to be used only internally by Scheduler logic and/or scheduler plugins within the lifecycle of the request.
// When returning the selected Endpoint(s) out of the Scheduler, an Endpoint is returned without the score.
type ScoredEndpoint struct {
Endpoint
Score float64
}

// Scheduler is the implementation of a... scheduler.
// The scheduler object is created at startup using the provided configuration.
type Scheduler interface {
// PreSchedule selects scheduling profiles through the implemented
// logic, and returns:
// - profiles - A subset of the registered scheduling profiles to be ran
PreSchedule(request map[string]any, data scheduling.CycleState, results map[string][]Endpoint) map[string]SchedulingProfile
type Scheduler struct {
// map from profile name to its set of plugins.
profiles map[string]*SchedulerProfile
// exactly one MultiProfilePlugin instance is required.
multiProfilePlugin MultiProfilePlugin

// PostSchedule receives the output of the result(s) of the scheduling cycle(s)
// and makes sense of the data to be consumed by the calling system.
// For example: suppose you have 2 profiles ShadowBoxing Profile & Production Profile.
// PostSchedule would know to simply log the result of ShadowBoxing
// profile, and do nothing else with it.
PostSchedule(profileResults map[string][]Endpoint) SchedulingResult
// should have a Schedule function
// Schedule(ctx context.Context, request map[string]any) (map[string][]*Endpoint, error)
}

// SchedulingProfile is used to describe a profile that will
// SchedulerConfig is the struct that maps to the configuration file that should be further discussed.
// the configuration file should include the multi profile plugin as well as the profiles with their plugins.
// TODO should update the configuration file example.yaml to discuss its structure.
type SchedulerConfig struct {
multiProfilePlugin MultiProfilePlugin
profiles map[string]*SchedulerProfile
}

// SchedulerProfile is used to describe a profile that will
// run for a given scheduling cycle.
type SchedulingProfile struct {
// Name of the profile.
Name string
// Filters lists all Filter plugins associated with this Profile. Filters
// are optional.
Filters []Filter
// Scorers lists all Score plugins associated with this Profile. Scorers
// are optional.
Scorers map[Scorer]int
type SchedulerProfile struct {
// Filters lists all Filter plugins associated with this Profile.
// Filters are optional.
filters []Filter
// Scorers lists all Score plugins associated with this Profile.
// Scorers are optional.
scorers []*WeightedScorer
// Picker returns the function that picks the endpoint(s). Picker is required.
Picker Picker
picker Picker

// should also include a function Run() that receives a request and runs through the flow of filters, scorers, picker
// and returns the result.

// func (p *SchedulerProfile) Run(ctx context.Context, request map[string]any, cycleState *types.CycleState, endpoints []Endpoint) ([]Endpoint, error)
}
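The `Run` signature sketched in the comment above could be fleshed out roughly as follows. This is a hedged sketch using stand-in types (`profile`, `weightedScorer`, function-typed plugins) rather than the interfaces proposed in this file; the request and CycleState arguments are omitted for brevity.

```go
package main

import "fmt"

// Minimal stand-ins for the proposal's types; all names and signatures
// here are assumptions used only to sketch the Filter -> Score -> Pick flow.
type Endpoint struct{ Name string }

type ScoredEndpoint struct {
	Endpoint
	Score float64
}

type filterFunc func([]*Endpoint) []*Endpoint
type scorerFunc func([]*Endpoint) map[string]float64
type pickerFunc func([]*ScoredEndpoint) []*ScoredEndpoint

type weightedScorer struct {
	fn     scorerFunc
	weight int
}

type profile struct {
	filters []filterFunc
	scorers []weightedScorer
	picker  pickerFunc
}

// Run chains the three phases: filters narrow the candidate set,
// weighted scorers accumulate a per-endpoint score, and the picker
// makes the final selection from the scored endpoints.
func (p *profile) Run(endpoints []*Endpoint) ([]*ScoredEndpoint, error) {
	for _, f := range p.filters {
		endpoints = f(endpoints)
	}
	if len(endpoints) == 0 {
		// mirrors the framework erroring when endpoints are filtered to zero
		return nil, fmt.Errorf("all endpoints filtered out")
	}
	totals := map[string]float64{}
	for _, ws := range p.scorers {
		for name, score := range ws.fn(endpoints) {
			totals[name] += float64(ws.weight) * score
		}
	}
	scored := make([]*ScoredEndpoint, 0, len(endpoints))
	for _, e := range endpoints {
		scored = append(scored, &ScoredEndpoint{Endpoint: *e, Score: totals[e.Name]})
	}
	return p.picker(scored), nil
}

func main() {
	p := &profile{
		filters: []filterFunc{func(es []*Endpoint) []*Endpoint {
			kept := []*Endpoint{}
			for _, e := range es {
				if e.Name != "pod-drain" { // drop a hypothetical draining endpoint
					kept = append(kept, e)
				}
			}
			return kept
		}},
		scorers: []weightedScorer{{fn: func(es []*Endpoint) map[string]float64 {
			return map[string]float64{"pod-a": 0.25, "pod-b": 1.0}
		}, weight: 2}},
		picker: func(se []*ScoredEndpoint) []*ScoredEndpoint {
			best := se[0]
			for _, s := range se[1:] {
				if s.Score > best.Score {
					best = s
				}
			}
			return []*ScoredEndpoint{best}
		},
	}
	picked, err := p.Run([]*Endpoint{{"pod-a"}, {"pod-b"}, {"pod-drain"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(picked[0].Name) // pod-b
}
```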

// Filter runs before any scoring, and remove endpoints that are not fit for
// selection. The framework will return an error to the client if the endpoints
// are filtered to zero.
// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
}

// MultiProfilePlugin defines the interface for handling multi SchedulerProfile instances.
type MultiProfilePlugin interface {
Plugin
// PickProfiles picks the SchedulingProfile objects to run from a list of candidate profiles,
// while taking into consideration the request properties
// and the previously executed SchedulerProfile runs along with their results.
// returns:
// - profiles - A subset of the registered scheduling profiles to be run in the next iteration
PickProfiles(request map[string]any, profiles map[string]*SchedulerProfile, executionResults map[string][]*ScoredEndpoint) map[string]*SchedulerProfile

// ProcessProfileResults handles the outcome of each selected profile.
// It may aggregate results, log test profile outputs, or apply custom logic.
// For example: suppose you have 2 profiles ShadowBoxing Profile & Production Profile.
// ProcessProfileResults would know to simply log the result of ShadowBoxing
// profile, and do nothing else with it.
ProcessProfileResults(request map[string]any, profileResults map[string][]*ScoredEndpoint) map[string][]*Endpoint
}
Contributor Author (@nirrozenbaum, Jun 5, 2025):

a question I was thinking about-
since we agreed that Scheduler IS aware of http protocol (Request struct represents it), what are your thoughts about having ProcessResults get all profiles results but return only one?

in llm-d for example - since scheduler is aware of http, we can set the prefill header at this point and return only decode selection.
and in general I think it makes sense to return one Result, i.e., results of one profile (return []*Endpoint and optionally an array of backups)
@elevran @ahg-g

Contributor:

I was thinking that this will be done by the PostSchedule plugin. Here is my thinking of how we will compose with the default behavior:

This maintains the separation of concern and composes with the non p/d case nicely.

Contributor Author (@nirrozenbaum, Jun 6, 2025):

I followed everything except for the CycleState part, in which I probably didn't understand your intention.
in current Proposal ProcessResults signature is:

ProcessProfileResults(request *Request, profileResults map[string][]*ScoredEndpoint) map[string][]*Endpoint

looking at the return value here - we get back a map from profile-name -> its results (set of endpoints).

let's assume we have PostSchedule in director (or as @liu-cong named it, PreRequest, which I agree is more descriptive/accurate, plus having PreRequest and PostResponse sounds to me like a better terminology than PostSchedule and PostResponse).
using this plugin we should be able to set the headers of destination (decode) + prefill.

can you explain what you meant in the CycleState part?

ProcessResults returns the endpoints for the decoder (and so the DestinationEndpoint plugin will set them as per the epp <-> proxy protocol), while the prefill endpoint is stored in CycleState.

CycleState scope is within the scheduler, so it's not returning back to requestcontrol layer.

Contributor (@ahg-g, Jun 6, 2025):

looking at the return value here - we get back a map from profile-name -> its results (set of endpoints).

Oh, I thought we return a single final list that in the normal case a PreRequest plugin uses to set the destination endpoints.

CycleState scope is within the scheduler, so it's not returning back to requestcontrol layer.

can you explain what you meant in the CycleState part?

I assumed CycleState is scoped across all extension points executed for a request. If we do that, then what I was proposing is to use CycleState as the communication channel between the P/D specific ProcessResults and PreRequest plugins to communicate the list of prefill servers to be set in the headers. Again, this was assuming that ProcessResults returned a list of endpoints, not a map.

Another approach is to continue to have ProcessResult return a map, but the decode endpoints should have a key that the default PreRequest plugin understands for both the default and p/d case (in the p/d case, ProcessResult can set the key for the decode profile endpoints to whatever value is expected by the default PreRequest plugin). And the additional PreRequest plugin operates on the prefill endpoints to set them as a header.

I am trying to avoid having the scheduling layer implement parts of the epp <-> proxy protocol.

In summary, we will have two PreRequest plugins: The first ships with IGE and implements the destination endpoint protocol, from the map[string][]*Endpoint it uses a well known key to lookup the list of endpoints it will use. The second ships with llm-d and implements the vllm protocol for setting the prefill endpoints as a header.

Contributor:

I prefer the second approach, but I still think we should scope CycleState to be across all extension points executed on a request. We can define CycleState in the common pkg.

Contributor Author (@nirrozenbaum, Jun 6, 2025):

I’m also ok with cycle state per request, although I’d prefer avoiding it. I just think we need to pay special attention that the field is not abused, cause it becomes extremely easy to communicate information in a generic cycle state rather than well defined interfaces. I would have a cycle state per layer during the request lifecycle to verify the interfaces between the layers are well defined.

Contributor:

and if we agree on this I think the proposal is ready for finalization, as we converged on everything

Yes, we have a few followups as well related to converting this into a generic extensibility proposal with subsections for various layers, defining a common pkg for types and interfaces, grouping the plugin implementations under one directory.

Collaborator:

we should scope CycleState to be across all extension points executed on a request

Just calling this out. But if we do this. We are stating that the EPP architecture is no longer made up of independent subsystems that could be broken out into their own lib, and instead tying them all together.

I don't particularly love that. The hope was that the director/request control would be the mortar where we would handle the grey area. But not willing to die on this hill. More just calling it out.

Contributor (@ahg-g, Jun 9, 2025):

Discussed offline with Kellen, I think we need to strike a balance between our goal of maintaining a reusable scheduling library and a design that works well across the layers we define in EPP.

My recommendation is to define common types, like CycleState and Request that all layers depend on. This doesn't prevent the scheduling library from being reusable in a different context.

I don't think we want to prevent plugins from implementing extensions across subsystems if it makes sense to do so. Each plugin should declare its dependencies, and if someone wants to use the scheduling library in a different context, then they can disable the plugins that do so if they are incompatible with their system, or implement in their system the behavior expected by those other extensions (e.g., adding state to CycleState that is expected by the scheduling parts of the plugin)

Contributor Author:

I agree with Kellen (at least with his initial comment) that it would be good NOT to have one CycleState per request, as it may cause some issues of bypassing interfaces that we define between the layers.
as I stated in previous comments:

I just think we need to pay special attention that the field is not abused, cause it becomes extremely easy to communicate information in a generic cycle state rather than well defined interfaces. I would have a cycle state per layer during the request lifecycle to verify the interfaces between the layers are well defined.

I agree that we need to balance as @ahg-g mentioned.
let's discuss this more as we make progress with more extension points in more layers.


// Filter runs before any scoring, and removes endpoints that are not fit for selection.
// The framework will return an error to the client if the endpoints are filtered to zero.
type Filter interface {
Plugin
Filter(ctx context.Context, state scheduling.CycleState, endpoints []Endpoint) []Endpoint
Filter(ctx context.Context, request map[string]any, state *scheduling.CycleState, endpoints []*Endpoint) []*Endpoint
}

// Scorer applies a score to each remaining endpoint provided. Scorers SHOULD
// keep their score values in a normalized range: [0-1]. Any weighting should
// be added at the SchedulingProfile configuration level.
// Scorer applies a score to each remaining endpoint provided.
// Scorers SHOULD keep their score values in a normalized range: [0-1].
// Any weighting should be added at the SchedulerProfile configuration level.
type Scorer interface {
Plugin
Score(ctx context.Context, state scheduling.CycleState, endpoints []Endpoint) []Endpoint
Score(ctx context.Context, request map[string]any, state *scheduling.CycleState, endpoints []*Endpoint) []*ScoredEndpoint
}

// WeightedScorer is a struct that encapsulates a scorer with its weight.
// We need this struct in order to be able to keep scorers in profile as a slice instead of a map.
// This is very useful for having a generic AddPlugin function that registers a plugin to all its extension points.
// Using a map is much less convenient for this purpose.
type WeightedScorer struct {
Scorer
weight int
}

// Picker selects the endpoint(s) from the provided list of scored endpoints.
// Picker MUST return one endpoint at minimum.
type Picker interface {
Plugin
Pick(ctx context.Context, state scheduling.CycleState, endpoints []Endpoint) []Endpoint
Pick(ctx context.Context, state *scheduling.CycleState, endpoints []*ScoredEndpoint) []*ScoredEndpoint
}

// PostResponse is NOT part of the scheduler subsystem but is specified here for completeness only.
type PostResponse interface {
Plugin
PostResponse(ctx context.Context, request map[string]any, response map[string]any)
Expand Down