56 changes: 37 additions & 19 deletions latencypredictor/prediction_server.py
@@ -89,6 +89,7 @@ class PredictSettings:
     DOWNLOAD_RETRIES: int = int(os.getenv("DOWNLOAD_RETRIES", "3"))
 
     ENSEMBLE_MODE: bool = os.getenv("LATENCY_ENSEMBLE_MODE", "true").lower() == "true"
+    ENABLE_TOKEN_IN_FLIGHT_FEATURES: bool = os.getenv("LATENCY_ENABLE_TOKEN_IN_FLIGHT_FEATURES", "true").lower() == "true"
 
     # Gated ensemble model paths (each wraps noqueue + queued sub-models)
     LOCAL_TTFT_GATED_MODEL_PATH: str = os.getenv("LOCAL_TTFT_GATED_MODEL_PATH", "/local_models/ttft_gated.joblib")
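A note on how this gate parses (a minimal standalone sketch, not part of the patch): os.getenv returns the raw string, so only the exact text "true" (case-insensitively) enables the features; common truthy spellings such as "1" silently disable them:

import os

# Hypothetical value set by an operator; "1" is a natural but wrong spelling here.
os.environ["LATENCY_ENABLE_TOKEN_IN_FLIGHT_FEATURES"] = "1"
enabled = os.getenv("LATENCY_ENABLE_TOKEN_IN_FLIGHT_FEATURES", "true").lower() == "true"
print(enabled)  # False -- anything other than "true" turns the features off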
@@ -361,6 +362,14 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)
         # Binary feature: gives the tree a clean signal to partition idle vs queued
         df['is_queued'] = (df['num_request_waiting'] > 0).astype(int)
 
+        _tif = []
+        if settings.ENABLE_TOKEN_IN_FLIGHT_FEATURES:
+            if 'prefill_tokens_in_flight' not in df.columns:
+                df['prefill_tokens_in_flight'] = 0
+            if 'decode_tokens_in_flight' not in df.columns:
+                df['decode_tokens_in_flight'] = 0
+            _tif = ['prefill_tokens_in_flight', 'decode_tokens_in_flight']
+
         if model_type == "ttft":
             df['effective_input_tokens'] = (1 - df['prefix_cache_score']) * df['input_token_length']
             df['prefill_score_bucket'] = (
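As a side note on the defaulting just added (a sketch assuming a caller that omits the new fields): assigning a scalar to an absent pandas column broadcasts it to every row, so old-style frames still come out with a complete set of columns:

import pandas as pd

df = pd.DataFrame({'num_request_waiting': [0, 3]})  # hypothetical old-style input, no in-flight counts
if 'prefill_tokens_in_flight' not in df.columns:
    df['prefill_tokens_in_flight'] = 0  # scalar broadcasts to a column of zeros
print(df['prefill_tokens_in_flight'].tolist())  # [0, 0]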
@@ -372,22 +381,21 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)
                 df['prefill_score_bucket'], categories=[0, 1, 2, 3], ordered=True
             )
 
-            feature_cols = [
-                'is_queued',
-                'kv_cache_percentage', 'input_token_length',
-                'num_request_waiting', 'num_request_running',
-                'prefix_cache_score', 'effective_input_tokens',
-                'prefill_score_bucket', 'pod_type_cat',
-            ]
+            feature_cols = (
+                ['is_queued', 'kv_cache_percentage', 'input_token_length',
+                 'num_request_waiting', 'num_request_running']
+                + _tif
+                + ['prefix_cache_score', 'effective_input_tokens', 'prefill_score_bucket', 'pod_type_cat']
+            )
             return df[feature_cols]
 
         else: # tpot
-            feature_cols = [
-                'is_queued',
-                'kv_cache_percentage', 'input_token_length',
-                'num_request_waiting', 'num_request_running',
-                'num_tokens_generated', 'pod_type_cat',
-            ]
+            feature_cols = (
+                ['is_queued', 'kv_cache_percentage', 'input_token_length',
+                 'num_request_waiting', 'num_request_running']
+                + _tif
+                + ['num_tokens_generated', 'pod_type_cat']
+            )
             return df[feature_cols]
 
     def _prepare_features_for_ensemble(self, df: pd.DataFrame, model_type: str, queue_regime: str) -> pd.DataFrame:
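To make the resulting layouts concrete (an illustration, not code from the patch), the TTFT frame with the flag enabled is ordered as below; since _tif collapses to [] when the flag is off, training and serving must run with the same setting or the column sets will disagree:

# Assumed TTFT feature order with ENABLE_TOKEN_IN_FLIGHT_FEATURES on:
ttft_cols = (
    ['is_queued', 'kv_cache_percentage', 'input_token_length',
     'num_request_waiting', 'num_request_running']
    + ['prefill_tokens_in_flight', 'decode_tokens_in_flight']  # _tif; [] when the flag is off
    + ['prefix_cache_score', 'effective_input_tokens', 'prefill_score_bucket', 'pod_type_cat']
)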
@@ -733,12 +741,15 @@ def predict_batch_fast(self, reqs: list) -> Tuple[np.ndarray, np.ndarray]:
 
         # Extract each feature column as a numpy array in one pass.
         # np.fromiter with count= pre-allocates and avoids intermediate lists.
-        kv = np.fromiter((r.kv_cache_percentage for r in reqs), dtype=np.float64, count=n)
-        itl = np.fromiter((r.input_token_length for r in reqs), dtype=np.float64, count=n)
-        nrw = np.fromiter((r.num_request_waiting for r in reqs), dtype=np.float64, count=n)
-        nrr = np.fromiter((r.num_request_running for r in reqs), dtype=np.float64, count=n)
-        ntg = np.fromiter((r.num_tokens_generated for r in reqs), dtype=np.float64, count=n)
-        pcs = np.fromiter((r.prefix_cache_score for r in reqs), dtype=np.float64, count=n)
+        kv  = np.fromiter((r.kv_cache_percentage for r in reqs), dtype=np.float64, count=n)
+        itl = np.fromiter((r.input_token_length for r in reqs), dtype=np.float64, count=n)
+        nrw = np.fromiter((r.num_request_waiting for r in reqs), dtype=np.float64, count=n)
+        nrr = np.fromiter((r.num_request_running for r in reqs), dtype=np.float64, count=n)
+        ntg = np.fromiter((r.num_tokens_generated for r in reqs), dtype=np.float64, count=n)
+        pcs = np.fromiter((r.prefix_cache_score for r in reqs), dtype=np.float64, count=n)
+        if settings.ENABLE_TOKEN_IN_FLIGHT_FEATURES:
+            pti = np.fromiter((r.prefill_tokens_in_flight for r in reqs), dtype=np.float64, count=n)
+            dti = np.fromiter((r.decode_tokens_in_flight for r in reqs), dtype=np.float64, count=n)
 
         # Snapshot model references under lock (no inference here).
         with self.lock:
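A quick standalone illustration of the np.fromiter pattern the comment describes (made-up request stand-ins): passing count= lets NumPy size the output array once up front instead of growing it while the generator is consumed:

import numpy as np

reqs = [{'kv_cache_percentage': 0.1}, {'kv_cache_percentage': 0.7}]  # stand-ins for request objects
n = len(reqs)
kv = np.fromiter((r['kv_cache_percentage'] for r in reqs), dtype=np.float64, count=n)
print(kv)  # [0.1 0.7] -- one pre-sized allocation, no intermediate list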
@@ -771,6 +782,11 @@ def predict_batch_fast(self, reqs: list) -> Tuple[np.ndarray, np.ndarray]:
             'num_request_running': nrr,
             'num_tokens_generated': ntg,
         })
+        if settings.ENABLE_TOKEN_IN_FLIGHT_FEATURES:
+            df_ttft_raw['prefill_tokens_in_flight'] = pti
+            df_ttft_raw['decode_tokens_in_flight'] = dti
+            df_tpot_raw['prefill_tokens_in_flight'] = pti
+            df_tpot_raw['decode_tokens_in_flight'] = dti
 
         # Conditionally add pod_type column (only if any request uses it).
         if any(r.pod_type for r in reqs):
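For the assignments just above (a sketch with invented values): a NumPy array assigned as a DataFrame column is length-checked against the frame's index, so the count=n arrays always line up row-for-row with the other features; a mismatched length would raise ValueError rather than silently misalign:

import numpy as np
import pandas as pd

df_tpot_raw = pd.DataFrame({'num_tokens_generated': np.array([5.0, 9.0])})
pti = np.array([12.0, 0.0])  # hypothetical in-flight counts, same length as the frame
df_tpot_raw['prefill_tokens_in_flight'] = pti  # positional alignment, length-checked
print(df_tpot_raw.shape)  # (2, 2)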
@@ -846,6 +862,8 @@ class PredictionRequest(BaseModel):
     num_tokens_generated: int = Field(..., ge=0)
     prefix_cache_score: float = Field(..., ge=0.0, le=1.0)
     pod_type: Optional[str] = Field(default="", description="Pod type: 'prefill', 'decode', or '' for monolithic")
+    prefill_tokens_in_flight: int = Field(default=0, ge=0)
+    decode_tokens_in_flight: int = Field(default=0, ge=0)
 
 
 class PredictionResponse(BaseModel):
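An example request body exercising the new fields (values invented; the non-new fields are inferred from their use in predict_batch_fast): because both counters default to 0, existing clients that omit them continue to validate unchanged:

# Hypothetical JSON payload for the prediction endpoint:
body = {
    "kv_cache_percentage": 0.42,
    "input_token_length": 512,
    "num_request_waiting": 2,
    "num_request_running": 1,
    "num_tokens_generated": 0,
    "prefix_cache_score": 0.8,
    "pod_type": "prefill",
    "prefill_tokens_in_flight": 1024,  # new; defaults to 0 if omitted
    "decode_tokens_in_flight": 0,      # new; defaults to 0 if omitted
}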