Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions tests/v1/kv_connector/nixl_integration/toy_proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,28 @@ async def lifespan(app: FastAPI):

# Create prefill clients
for i, (host, port) in enumerate(global_args.prefiller_instances):

prefiller_base_url = f'http://{host}:{port}/v1'

client = httpx.AsyncClient(timeout=None, base_url=prefiller_base_url)

app.state.prefill_clients.append({
'client':
httpx.AsyncClient(timeout=None, base_url=prefiller_base_url),
'host':
host,
'port':
port,
'id':
i
'client': client,
'host': host,
'port': port,
'id': i
})

# Create decode clients
for i, (host, port) in enumerate(global_args.decoder_instances):
decoder_base_url = f'http://{host}:{port}/v1'

client = httpx.AsyncClient(timeout=None, base_url=decoder_base_url)
app.state.decode_clients.append({
'client':
httpx.AsyncClient(timeout=None, base_url=decoder_base_url),
'host':
host,
'port':
port,
'id':
i
'client': client,
'host': host,
'port': port,
'id': i
})

# Initialize round-robin iterators
Expand Down
5 changes: 5 additions & 0 deletions vllm/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import json
import os
import textwrap
import warnings
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import field, fields, is_dataclass, replace
from functools import cached_property, lru_cache
Expand Down Expand Up @@ -663,6 +665,9 @@ def __post_init__(self):
if self.kv_transfer_config is not None:
# Hybrid KV cache manager is not compatible with KV transfer.
self.scheduler_config.disable_hybrid_kv_cache_manager = True
if self.kv_transfer_config.is_kv_transfer_instance:
self.parallel_config.kv_conn_endpoint_metadata = (
defaultdict(dict))
if self.kv_events_config is not None:
# Hybrid KV cache manager is not compatible with KV events.
self.scheduler_config.disable_hybrid_kv_cache_manager = True
Expand Down
8 changes: 8 additions & 0 deletions vllm/config/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
from ray.runtime_env import RuntimeEnv
from ray.util.placement_group import PlacementGroup

from vllm.distributed.kv_transfer.kv_connector.v1.base import (
KVConnectorHandshakeMetadata)
from vllm.executor.executor_base import ExecutorBase
else:
RuntimeEnv = Any
PlacementGroup = Any
KVConnectorHandshakeMetadata = Any
ExecutorBase = Any

logger = init_logger(__name__)
Expand Down Expand Up @@ -212,6 +215,11 @@ class is dynamically inherited by the worker class. This is used to inject
should only be set by API server scale-out.
"""

kv_conn_endpoint_metadata: Optional[dict[int, dict[
int, KVConnectorHandshakeMetadata]]] = None
""" Metadata for KV transfer handshake between prefill and decode engine
processes."""

@property
def world_size_across_dp(self) -> int:
"""world_size_across_dp is TPxPPxDP, it is the size of the world
Expand Down
20 changes: 20 additions & 0 deletions vllm/distributed/kv_transfer/kv_connector/v1/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ class KVConnectorRole(enum.Enum):
WORKER = 1


class KVConnectorHandshakeMetadata(ABC):  # noqa: B024
    """
    Metadata used for the out-of-band connector handshake between
    prefill/decode (P/D) workers. This needs to be serializable.

    The base class itself declares no fields and no abstract methods.
    """


class KVConnectorMetadata(ABC): # noqa: B024
"""
Abstract Metadata used to communicate between the
Expand Down Expand Up @@ -243,6 +251,18 @@ def get_kv_connector_stats(self) -> Optional["KVConnectorStats"]:
"""
return None

def get_handshake_metadata(self) -> Optional[KVConnectorHandshakeMetadata]:
    """
    Return the handshake metadata of this KVConnector, if it has any.

    P/D workers use this metadata for the out-of-band connector
    handshake. This implementation has nothing to provide.

    Returns:
        Optional[KVConnectorHandshakeMetadata]: the handshake metadata,
        or None when no handshake metadata is available (always the
        case here).
    """
    return None

# ==============================
# Scheduler-side methods
# ==============================
Expand Down
Loading