Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion vllm/v1/core/sched/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from vllm.v1.core.sched.utils import check_stop, remove_all
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput, EngineCoreOutputs
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.metrics.stats import SchedulerStats
from vllm.v1.metrics.stats import PrefixCacheStats, SchedulerStats
from vllm.v1.outputs import DraftTokenIds, KVConnectorOutput, ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.spec_decode.metrics import SpecDecodingStats
Expand Down Expand Up @@ -84,6 +84,7 @@ def __init__(
# will have a corresponding KVConnector with Role=WORKER.
# KV Connector pushes/pull of remote KVs for P/D and offloading.
self.connector = None
self.connector_prefix_cache_stats: PrefixCacheStats | None = None
if self.vllm_config.kv_transfer_config is not None:
assert len(self.kv_cache_config.kv_cache_groups) == 1, (
"Multiple KV cache groups are not currently supported "
Expand All @@ -95,6 +96,8 @@ def __init__(
self.connector = KVConnectorFactory.create_connector(
config=self.vllm_config, role=KVConnectorRole.SCHEDULER
)
if self.log_stats:
self.connector_prefix_cache_stats = PrefixCacheStats()

self.kv_event_publisher = EventPublisherFactory.create(
self.kv_events_config,
Expand Down Expand Up @@ -525,6 +528,9 @@ def schedule(self) -> SchedulerOutput:
new_computed_blocks + new_blocks,
num_external_computed_tokens,
)
self._update_connector_prefix_cache_stats(
request.num_tokens, num_external_computed_tokens
)

# Request was already popped from self.waiting
# unless it was re-added above due to new_blocks being None.
Expand Down Expand Up @@ -1246,11 +1252,13 @@ def make_stats(
return None
prefix_cache_stats = self.kv_cache_manager.make_prefix_cache_stats()
assert prefix_cache_stats is not None
connector_prefix_cache_stats = self._make_connector_prefix_cache_stats()
return SchedulerStats(
num_running_reqs=len(self.running),
num_waiting_reqs=len(self.waiting),
kv_cache_usage=self.kv_cache_manager.usage,
prefix_cache_stats=prefix_cache_stats,
connector_prefix_cache_stats=connector_prefix_cache_stats,
spec_decoding_stats=spec_decoding_stats,
num_corrupted_reqs=sum(req.is_output_corrupted for req in self.running),
kv_connector_stats=kv_connector_stats.data if kv_connector_stats else None,
Expand Down Expand Up @@ -1281,6 +1289,22 @@ def shutdown(self) -> None:
# KV Connector Related Methods
########################################################################

def _update_connector_prefix_cache_stats(
self, request_num_tokens: int, num_external_tokens: int
) -> None:
if self.connector_prefix_cache_stats is None:
return
self.connector_prefix_cache_stats.requests += 1
self.connector_prefix_cache_stats.queries += request_num_tokens
self.connector_prefix_cache_stats.hits += num_external_tokens

def _make_connector_prefix_cache_stats(self) -> PrefixCacheStats | None:
    """Return the connector prefix-cache stats accumulated since the last call.

    Swaps in a fresh ``PrefixCacheStats`` so counters restart each reporting
    interval. Returns None when connector stats collection is disabled.
    """
    if (stats := self.connector_prefix_cache_stats) is None:
        return None
    # Hand back the finished interval and start accumulating a new one.
    self.connector_prefix_cache_stats = PrefixCacheStats()
    return stats

def get_kv_connector(self) -> KVConnectorBase_V1 | None:
    """Return the scheduler-side KV connector, or None if none was configured."""
    return self.connector

Expand Down
43 changes: 43 additions & 0 deletions vllm/v1/metrics/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
# Caching metrics. This cannot be reset.
# TODO: Make the interval configurable.
self.prefix_caching_metrics = CachingMetrics()
self.connector_prefix_caching_metrics = CachingMetrics()
self.mm_caching_metrics = CachingMetrics()

self.spec_decoding_logging = SpecDecodingLogging()
Expand Down Expand Up @@ -122,6 +123,11 @@ def record(
if scheduler_stats is not None:
self.prefix_caching_metrics.observe(scheduler_stats.prefix_cache_stats)

if scheduler_stats.connector_prefix_cache_stats is not None:
self.connector_prefix_caching_metrics.observe(
scheduler_stats.connector_prefix_cache_stats
)

if scheduler_stats.spec_decoding_stats is not None:
self.spec_decoding_logging.observe(scheduler_stats.spec_decoding_stats)
if kv_connector_stats := scheduler_stats.kv_connector_stats:
Expand Down Expand Up @@ -174,6 +180,9 @@ def log(self):
self.last_scheduler_stats.kv_cache_usage * 100,
self.prefix_caching_metrics.hit_rate * 100,
]
if not self.connector_prefix_caching_metrics.empty:
log_parts.append("KV Connector prefix cache hit rate: %.1f%%")
log_args.append(self.connector_prefix_caching_metrics.hit_rate * 100)
if not self.mm_caching_metrics.empty:
log_parts.append("MM cache hit rate: %.1f%%")
log_args.append(self.mm_caching_metrics.hit_rate * 100)
Expand Down Expand Up @@ -439,6 +448,32 @@ def __init__(
counter_prefix_cache_hits, engine_indexes, model_name
)

#
# KV connector cache
#
counter_connector_prefix_cache_queries = self._counter_cls(
name="vllm:connector_prefix_cache_queries",
documentation=(
"KV connector prefix cache queries, "
"in terms of number of queried tokens."
),
labelnames=labelnames,
)
self.counter_connector_prefix_cache_queries = make_per_engine(
counter_connector_prefix_cache_queries, engine_indexes, model_name
)

counter_connector_prefix_cache_hits = self._counter_cls(
name="vllm:connector_prefix_cache_hits",
documentation=(
"KV connector prefix cache hits, in terms of number of cached tokens."
),
labelnames=labelnames,
)
self.counter_connector_prefix_cache_hits = make_per_engine(
counter_connector_prefix_cache_hits, engine_indexes, model_name
)

#
# Multi-modal cache
#
Expand Down Expand Up @@ -865,6 +900,14 @@ def record(
scheduler_stats.prefix_cache_stats.hits
)

if scheduler_stats.connector_prefix_cache_stats is not None:
self.counter_connector_prefix_cache_queries[engine_idx].inc(
scheduler_stats.connector_prefix_cache_stats.queries
)
self.counter_connector_prefix_cache_hits[engine_idx].inc(
scheduler_stats.connector_prefix_cache_stats.hits
)

if scheduler_stats.spec_decoding_stats is not None:
self.spec_decoding_prom.observe(
scheduler_stats.spec_decoding_stats, engine_idx
Expand Down
1 change: 1 addition & 0 deletions vllm/v1/metrics/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class SchedulerStats:
kv_cache_usage: float = 0.0

prefix_cache_stats: PrefixCacheStats = field(default_factory=PrefixCacheStats)
connector_prefix_cache_stats: PrefixCacheStats | None = None

spec_decoding_stats: SpecDecodingStats | None = None
kv_connector_stats: dict[str, Any] | None = None
Expand Down