
[RFC]: OmniConnector for Multimodal Full Disaggregation (Encode/Prefill/Decode/Generator) #62

@natureofnature

Description


🚀 The feature, motivation and pitch

RFC: OmniConnector for Multimodal Full Disaggregation (Encode/Prefill/Decode/Generator)

Status

  • Proposal

  • Target repos: vllm-omni (primary), optional hooks in vllm

  • References

    • vLLM EPD components (EC/KV transfer, proxies, connectors)
    • vllm-omni staged pipeline and OmniGPUModelRunner

Please refer to the design doc for details.


Motivation


vLLM supports disaggregated prefill (P+D) and encoder (E+P+D) deployments using EC/KV connectors and proxies. vllm-omni extends this to multimodal and non-autoregressive stages, but:

  • Stages are mostly co-located / in-process.
  • There is no unified, typed connector abstraction for intermediate artifacts beyond EC/KV (hidden states, latents, audio/image tensors, final artifacts).

We want an OmniConnector abstraction that:

  • Supports full E–P–D–G separation across **same process / same node / cross-node**.
  • Transports typed intermediate payloads with clear, minimal semantics.
  • Reuses vLLM EC/KV connectors where possible without regressing performance.
  • Allows store-based (D2H2D) and D2D transports to coexist under one API.
  • Keeps control plane (HTTP/OpenAI API, scheduling, metadata) separate from data plane (tensor transport).

Initial focus: Mooncake-based store-backed connector (host-based D2H2D).
Later: D2D connectors via proxy-mediated ZMQ/UCX/NCCL/HCCL, without changing stage logic.


Goals

  • Configurable, per-connection connectors for the stage graph encode → prefill → decode → generator (EPDG).
  • Unified OmniPayload and OmniConnector API across modalities.
  • Adapter implementations for existing EC/KV connectors.
  • Capability model (D2D vs D2H2D, zero-copy, persistence, multi-consumer).
  • Request identity scheme compatible with encoder fanout and KV semantics.
  • Standardized observability and health reporting.

Terminology

  • Stage graph: a DAG of stages (encode, prefill, decode, generator), plus optional modality-specific stages.
  • Control plane: HTTP/gRPC/OpenAI API, request routing, scheduling, policy (x-request-id, kv_transfer_params, etc.).
  • Data plane: tensor/bytes transport via connectors (EC/KV/Omni).
  • Payload types:
    encode_embeddings, kv_cache, hidden_states, latents, audio, image, final_artifact.
  • Store-like backend: a semi-persistent, KV-like store (e.g., Mooncake).
  • Stream-like backend: single-consumer, non-persistent transport (e.g., D2D channel).
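The payload types above could be expressed as a small enum; a minimal sketch (the `PayloadType` class is illustrative, not part of the current codebase):

```python
from enum import Enum

class PayloadType(str, Enum):
    """Typed intermediate artifacts carried between stages (illustrative)."""
    ENCODE_EMBEDDINGS = "encode_embeddings"
    KV_CACHE = "kv_cache"
    HIDDEN_STATES = "hidden_states"
    LATENTS = "latents"
    AUDIO = "audio"
    IMAGE = "image"
    FINAL_ARTIFACT = "final_artifact"
```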

High-Level Design

Stage Graph and Connections

  • Logical stages: encode, prefill, decode, generator.

  • Connections carry typed payloads:

    • encode → prefill: encode_embeddings (EC-like).
    • prefill → decode: kv_cache (KV path).
    • decode → generator: latents/hidden states, list[int], dict, etc.
    • generator → output: final_artifact (image/audio/video/etc. bytes).

We configure connections: each connection is a logical data link from_stage → to_stage, plus its supported payload_types and a connector implementation.


Control Plane vs Data Plane

  • Control plane: Must not carry large tensors.

    • Generates request_id.
    • Carries metadata via HTTP headers/body (e.g., x-request-id, EC/KV options).
    • Drives scheduling and routing decisions.
    • Carries data location and data length information when needed.
    • For D2D connectors, implementations may perform a lightweight control-plane handshake with the proxy/router for dynamic target selection and channel setup.
  • Data plane:

    • OmniConnector transports large tensors/bytes using D2H2D or D2D methods, depending on the configured backend.
    • Multiple backends are supported: Mooncake (host store), EC/KV connectors, D2D transports (e.g., NCCL/UCX/ZMQ), Nixl connectors, and other data systems.

Request Identity

  • A parent request ID is generated at ingress (proxy/API).

  • OmniConnector keys payloads by:

    (from_stage, to_stage, request_id, payload_type, payload_key?)
    
    • payload_key: optional, for sub-components such as layer index, sequence step, chunk id.
  • For EC/KV:

    • Existing vLLM identifiers (engine id, rank, layer key) remain the source of truth.
    • EC/KV connectors can be wrapped by OmniConnector adapters that map internal IDs to this logical key scheme.

The exact mapping between Omni request_id and HTTP x-request-id or vLLM seq_group_id is an implementation detail, but must be consistent per deployment.
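Under these assumptions, building the logical key could be as simple as the following sketch (the `make_payload_key` helper is hypothetical):

```python
from typing import Optional

def make_payload_key(
    from_stage: str,
    to_stage: str,
    request_id: str,
    payload_type: str,
    payload_key: Optional[str] = None,
) -> tuple:
    """Build (from_stage, to_stage, request_id, payload_type, payload_key?)."""
    key = (from_stage, to_stage, request_id, payload_type)
    # payload_key is only appended for sub-components (layer index, chunk id, ...).
    return key + (payload_key,) if payload_key is not None else key

# Example: chunked encoder output keyed by chunk id.
k = make_payload_key("encode", "prefill", "req-42", "encode_embeddings", "chunk-0")
```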


OmniConnector Abstraction

The core abstraction is designed to be lightweight and aligned with the vllm_omni implementation.

OmniConnectorBase API

from abc import ABC, abstractmethod
from typing import Any, Optional

class OmniConnectorBase(ABC):
    """Base class for all OmniConnectors."""

    @abstractmethod
    def put(self, from_stage: str, to_stage: str, request_id: str, data: Any) -> tuple[bool, int]:
        """
        Submit a payload for the logical connection (from_stage → to_stage).
        
        Args:
            from_stage: Source stage identifier.
            to_stage: Destination stage identifier.
            request_id: Unique request identifier.
            data: The payload object (e.g., torch.Tensor, List[Tensor], dict).
                  The connector handles serialization/transport.
        
        Returns:
            tuple: (success: bool, serialized_size: int)
        """
        pass

    @abstractmethod
    def get(self, from_stage: str, to_stage: str, request_id: str) -> Optional[Any]:
        """
        Consume a payload for the logical connection.
        
        Args:
            from_stage: Source stage identifier.
            to_stage: Destination stage identifier.
            request_id: Unique request identifier.
        
        Returns:
            The payload object if found, None otherwise (or raises timeout depending on impl).
        """
        pass

    @abstractmethod
    def cleanup(self, request_id: str) -> None:
        """
        Clean up resources for a request (e.g. remove keys from store).
        """
        pass

    @abstractmethod
    def health(self) -> dict[str, Any]:
        """
        Return health status and metrics.
        """
        pass
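For illustration, a toy in-memory implementation of this interface might look like the following (not a real connector: it assumes single-process use and sizes payloads with `pickle`; a production version would subclass `OmniConnectorBase` and talk to a real backend):

```python
import pickle
from typing import Any, Optional

class InMemoryOmniConnector:
    """Toy store-backed connector: keeps serialized payloads in a dict."""

    def __init__(self) -> None:
        self._store: dict[tuple[str, str, str], bytes] = {}
        self._puts = 0

    def put(self, from_stage: str, to_stage: str, request_id: str,
            data: Any) -> tuple[bool, int]:
        # Serialize to bytes, mirroring the D2H2D path of a store backend.
        blob = pickle.dumps(data)
        self._store[(from_stage, to_stage, request_id)] = blob
        self._puts += 1
        return True, len(blob)

    def get(self, from_stage: str, to_stage: str,
            request_id: str) -> Optional[Any]:
        blob = self._store.get((from_stage, to_stage, request_id))
        return pickle.loads(blob) if blob is not None else None

    def cleanup(self, request_id: str) -> None:
        # Drop every key belonging to this request, across all connections.
        for k in [k for k in self._store if k[2] == request_id]:
            del self._store[k]

    def health(self) -> dict[str, Any]:
        return {"status": "ok", "stored_payloads": len(self._store),
                "total_puts": self._puts}
```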

Payload Handling

Rather than relying on complex schema definitions, payload data is handled flexibly:

  • D2H2D (e.g., Mooncake): The connector serializes the Python object (e.g., using torch.save or custom serializers) to bytes and stores it in the shared store.
  • D2D (e.g., NCCL/IPC): The data might be a handle or metadata description that triggers a direct GPU transfer. The put operation initiates the send, and get completes the receive.
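For the D2H2D path, the serialize/deserialize step could be as simple as the following sketch (using `pickle` as a stand-in; as noted above, a real connector would more likely use `torch.save`/`torch.load` or a custom tensor serializer):

```python
import io
import pickle
from typing import Any

def serialize_payload(data: Any) -> bytes:
    """Turn an arbitrary payload into bytes for a store-backed (D2H2D) hop."""
    buf = io.BytesIO()
    pickle.dump(data, buf)  # real impl: torch.save(data, buf)
    return buf.getvalue()

def deserialize_payload(blob: bytes) -> Any:
    """Reconstruct the payload on the consumer side."""
    return pickle.load(io.BytesIO(blob))  # real impl: torch.load(...)
```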

Configuration

Configuration is managed via OmniTransferConfig, which maps pipeline edges to ConnectorSpec definitions.

Data Structures

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class ConnectorSpec:
    name: str                    # e.g., "MooncakeConnector", "P2pNcclConnector"
    extra: dict[str, Any]        # Backend-specific config (host, port, buffer size, etc.)

@dataclass
class OmniTransferConfig:
    # Map (from_stage, to_stage) -> ConnectorSpec
    connectors: dict[tuple[str, str], ConnectorSpec]
    default_connector: Optional[ConnectorSpec] = None
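Edge lookup with fallback to `default_connector` could then work as in this sketch (the dataclasses are restated so the example is self-contained; the `connector_for` helper is hypothetical):

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class ConnectorSpec:
    name: str
    extra: dict[str, Any]

@dataclass
class OmniTransferConfig:
    connectors: dict[tuple[str, str], ConnectorSpec]
    default_connector: Optional[ConnectorSpec] = None

    def connector_for(self, from_stage: str, to_stage: str) -> ConnectorSpec:
        """Resolve the connector for an edge, falling back to the default."""
        spec = self.connectors.get((from_stage, to_stage), self.default_connector)
        if spec is None:
            raise KeyError(f"no connector configured for {from_stage} -> {to_stage}")
        return spec

cfg = OmniTransferConfig(
    connectors={
        ("encode", "prefill"): ConnectorSpec("MooncakeConnector", {"host": "127.0.0.1"}),
    },
    default_connector=ConnectorSpec("MooncakeConnector", {}),
)
```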

Example Config (YAML)

This configuration allows mixing different transport types in the same pipeline.

# OmniTransferConfig
connections:
  # Encode -> Prefill: Use high-bandwidth shared memory or Store (Mooncake)
  - from_stage: encode
    to_stage: prefill
    connector:
      name: MooncakeConnector
      extra:
        host: "127.0.0.1"
        metadata_server: "http://127.0.0.1:8080/metadata"

  # Prefill -> Decode: KV Cache transfer (could be P2P or Store)
  - from_stage: prefill
    to_stage: decode
    connector:
      name: P2pNcclConnector  # Future D2D implementation
      extra:
        nccl_num_channels: "16"

  # Decode -> Generator: Hidden states
  - from_stage: decode
    to_stage: generator
    connector:
      name: MooncakeConnector
      extra:
        host: "127.0.0.1"

Integration Logic

  1. Initialization:

    • OmniStageLLM (or equivalent stage runner) loads OmniTransferConfig.
    • It instantiates the specific OmniConnectorBase implementation for its input and output edges.
  2. Runtime (Producer - e.g., Encoder):

    # After computation
    embeddings = model.encode(inputs)
    connector.put(
        from_stage="encode",
        to_stage="prefill",
        request_id=request_id,
        data=embeddings
    )
  3. Runtime (Consumer - e.g., Prefill):

    # Before computation
    embeddings = connector.get(
        from_stage="encode",
        to_stage="prefill",
        request_id=request_id
    )
    if embeddings is None:
         # Handle wait/timeout
         pass
    process(embeddings)
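The `# Handle wait/timeout` branch above could be a simple polling loop, sketched below (a real connector might instead expose a blocking get or async notifications; the `get_with_timeout` helper is hypothetical):

```python
import time
from typing import Any, Callable, Optional

def get_with_timeout(
    get_fn: Callable[[], Optional[Any]],
    timeout_s: float = 5.0,
    poll_interval_s: float = 0.01,
) -> Any:
    """Poll a connector get() until a payload arrives or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        payload = get_fn()
        if payload is not None:
            return payload
        time.sleep(poll_interval_s)
    raise TimeoutError("payload did not arrive before the deadline")
```

Usage on the consumer side would then be, e.g., `embeddings = get_with_timeout(lambda: connector.get("encode", "prefill", request_id))`.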

Refer to #955 for the Mooncake RDMA-based design doc.
