3 changes: 1 addition & 2 deletions Dockerfile
@@ -70,7 +70,7 @@ ADD . /vllm-workspace/
COPY --from=build /workspace/vllm/*.so /vllm-workspace/vllm/
# ignore build dependencies installation because we are using pre-complied extensions
RUN rm pyproject.toml
-RUN --mount=type=cache,target=/root/.cache/pip VLLM_USE_PRECOMPILED=1 pip install . --verbose
+RUN --mount=type=cache,target=/root/.cache/pip VLLM_USE_PRECOMPILED=1 pip install .[ray] --verbose
#################### TEST IMAGE ####################


@@ -80,7 +80,6 @@ RUN --mount=type=cache,target=/root/.cache/pip VLLM_USE_PRECOMPILED=1 pip instal
# In the future it would be nice to get a container with pytorch and cuda without duplicating cuda
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04 AS vllm-base

-# libnccl required for ray
RUN apt-get update -y \
&& apt-get install -y python3-pip

1 change: 0 additions & 1 deletion requirements-rocm.txt
@@ -2,7 +2,6 @@ ninja # For faster builds.
typing-extensions>=4.8.0
starlette
psutil
-ray >= 2.9
sentencepiece # Required for LLaMA tokenizer.
numpy
tokenizers>=0.15.0
1 change: 0 additions & 1 deletion requirements.txt
@@ -1,6 +1,5 @@
ninja # For faster builds.
psutil
-ray >= 2.9
sentencepiece # Required for LLaMA tokenizer.
numpy
torch == 2.1.2
10 changes: 9 additions & 1 deletion setup.py
@@ -5,13 +5,14 @@
import subprocess
import warnings
from pathlib import Path
-from typing import List, Set
+from typing import List, Set, Dict

from packaging.version import parse, Version
import setuptools
import torch
import torch.utils.cpp_extension as torch_cpp_ext
from torch.utils.cpp_extension import BuildExtension, CUDAExtension, CUDA_HOME, ROCM_HOME
+from typing import Optional

ROOT_DIR = os.path.dirname(__file__)

@@ -434,6 +435,12 @@ def get_requirements() -> List[str]:
    return requirements


+def get_ray_requirement() -> Optional[Dict[str, List[str]]]:
+    if _is_neuron():
+        return None
+    return {"ray": ["ray >= 2.9"]}


package_data = {
    "vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"]
}
@@ -467,6 +474,7 @@ def get_requirements() -> List[str]:
"examples", "tests")),
    python_requires=">=3.8",
    install_requires=get_requirements(),
+    extras_requires=get_ray_requirement(),
    ext_modules=ext_modules,
    cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {},
    package_data=package_data,
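For orientation, a minimal self-contained sketch (hypothetical package name "example-pkg"; not the real vLLM setup.py) of how a conditional extra like the one registered above behaves with setuptools and at install time.

# Sketch of a setup.py that registers Ray as an optional extra.
from typing import Dict, List, Optional

from setuptools import setup


def get_ray_requirement() -> Optional[Dict[str, List[str]]]:
    # The helper in the diff above returns None on Neuron so that no extra is
    # registered there; this sketch always registers it.
    return {"ray": ["ray >= 2.9"]}


setup(
    name="example-pkg",
    version="0.1",
    install_requires=["numpy"],
    # `pip install example-pkg` leaves Ray out, while `pip install "example-pkg[ray]"`
    # pulls it in, which is what the Dockerfile's `pip install .[ray]` relies on.
    extras_require=get_ray_requirement() or {},
)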
66 changes: 66 additions & 0 deletions tests/engine/test_local_worker.py
@@ -0,0 +1,66 @@
import pytest
import torch
import multiprocessing as mp
from vllm import LLM, SamplingParams

TENSOR_PARALLEL_SIZE = 2
MAX_GENERATION_TOKENS = 256


def llm_generate(result_queue, prompt_token_ids, worker_use_ray=False):
    try:
        llm = LLM(model="facebook/opt-350m",
                  tensor_parallel_size=TENSOR_PARALLEL_SIZE,
                  worker_use_ray=worker_use_ray)

        output = llm.generate(
            prompt_token_ids=prompt_token_ids,
            sampling_params=SamplingParams(max_tokens=MAX_GENERATION_TOKENS))
    except BaseException as e:
        output = e

    result_queue.put(output)


def run_llm(prompt_token_ids, worker_use_ray=False):
    result_queue = mp.Queue()
    proc = mp.Process(target=llm_generate,
                      args=(result_queue, prompt_token_ids, worker_use_ray))
    proc.start()
    result = result_queue.get()
    proc.join()
    if isinstance(result, BaseException):
        raise result
    return result


def get_prompts():
    # https://github.com/vllm-project/vllm/issues/367#issuecomment-1629872996
    batch_size = 32
    dim = 120
    max_token_id = 32000
    torch.manual_seed(42)
    batch = torch.randint(max_token_id, (batch_size, dim))
    prompt_token_ids = [tokens.tolist() for tokens in batch]
    return prompt_token_ids


@pytest.mark.skip("Requires multiple GPUs")
def test_local_worker():
    # Similar to tests/lora/test_llama.py
    # Cannot use as it will initialize torch.cuda too early...
    # if torch.cuda.device_count() < 2:
    # pytest.skip(f"Not enough GPUs for tensor parallelism {2}")

    prompt_token_ids = get_prompts()
    output1 = run_llm(prompt_token_ids, worker_use_ray=False)
    output2 = run_llm(prompt_token_ids, worker_use_ray=True)
    assert len(output1) == len(output2)

    completion_token_ids1 = [item.outputs[0].token_ids for item in output1]
    completion_token_ids2 = [item.outputs[0].token_ids for item in output2]
    assert completion_token_ids1 == completion_token_ids2


if __name__ == "__main__":
    test_local_worker()
4 changes: 2 additions & 2 deletions vllm/__init__.py
@@ -3,7 +3,7 @@
from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.engine.llm_engine import LLMEngine
-from vllm.engine.ray_utils import initialize_cluster
+from vllm.engine.ray_utils import initialize_ray_cluster
from vllm.entrypoints.llm import LLM
from vllm.outputs import CompletionOutput, RequestOutput
from vllm.sampling_params import SamplingParams
@@ -19,5 +19,5 @@
    "EngineArgs",
    "AsyncLLMEngine",
    "AsyncEngineArgs",
-    "initialize_cluster",
+    "initialize_ray_cluster",
]
22 changes: 12 additions & 10 deletions vllm/config.py
@@ -1,3 +1,4 @@
+import importlib.util
from typing import Optional, Union, ClassVar
from dataclasses import dataclass
import os
@@ -376,9 +377,9 @@ class ParallelConfig:
    Args:
        pipeline_parallel_size: Number of pipeline parallel groups.
        tensor_parallel_size: Number of tensor parallel groups.
-        worker_use_ray: Whether to use Ray for model workers. Will be set to
+        worker_use_ray: Whether to use Ray for model workers. Will default to
            True if either pipeline_parallel_size or tensor_parallel_size is
-            greater than 1.
+            greater than 1 and Ray is installed.
        max_parallel_loading_workers: Maximum number of multiple batches
            when load model sequentially. To avoid RAM OOM when using tensor
            parallel and large models.
@@ -392,7 +393,7 @@ def __init__(
        self,
        pipeline_parallel_size: int,
        tensor_parallel_size: int,
-        worker_use_ray: bool,
+        worker_use_ray: Optional[bool] = None,
        max_parallel_loading_workers: Optional[int] = None,
        disable_custom_all_reduce: bool = False,
        ray_workers_use_nsight: bool = False,
@@ -412,9 +413,10 @@ def __init__(
        self.ray_workers_use_nsight = ray_workers_use_nsight

        self.world_size = pipeline_parallel_size * self.tensor_parallel_size
-        # Ray worker is not supported for Neuron backend.
-        if self.world_size > 1 and not is_neuron():
-            self.worker_use_ray = True
+        if self.worker_use_ray is None:
+            ray_found = importlib.util.find_spec("ray") is not None
+            self.worker_use_ray = ray_found and self.world_size > 1

        self._verify_args()

    def _verify_args(self) -> None:
@@ -498,12 +500,12 @@ class DeviceConfig:
    def __init__(self, device: str = "auto") -> None:
        if device == "auto":
            # Automated device type detection
-            if torch.cuda.is_available():
-                self.device_type = "cuda"
-            elif is_neuron():
+            if is_neuron():
                self.device_type = "neuron"
            else:
-                raise RuntimeError("No supported device detected.")
+                # We don't call torch.cuda.is_available() here to
+                # avoid initializing CUDA before workers are forked
+                self.device_type = "cuda"
        else:
            # Device type is assigned explicitly
            self.device_type = device
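As background for the hunk above, a small standalone sketch of the detection pattern: importlib.util.find_spec reports whether a module is installed without importing it, so constructing a config never triggers Ray's import-time side effects. The variable names below are illustrative, not vLLM's.

# Sketch of auto-detecting an optional dependency without importing it.
import importlib.util


def ray_is_installed() -> bool:
    # find_spec only searches sys.path and import hooks; it does not import
    # ray, so nothing Ray-related is initialized by this check.
    return importlib.util.find_spec("ray") is not None


world_size = 4         # pipeline_parallel_size * tensor_parallel_size
worker_use_ray = None  # None means "decide automatically", the new default
if worker_use_ray is None:
    worker_use_ray = ray_is_installed() and world_size > 1
print(worker_use_ray)  # True only when ray is importable and world_size > 1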
12 changes: 7 additions & 5 deletions vllm/engine/arg_utils.py
@@ -20,7 +20,7 @@ class EngineArgs:
    kv_cache_dtype: str = 'auto'
    seed: int = 0
    max_model_len: Optional[int] = None
-    worker_use_ray: bool = False
+    worker_use_ray: Optional[bool] = None
    pipeline_parallel_size: int = 1
    tensor_parallel_size: int = 1
    max_parallel_loading_workers: Optional[int] = None
@@ -149,10 +149,12 @@ def add_cli_args(
                            help='model context length. If unspecified, '
                            'will be automatically derived from the model.')
        # Parallel arguments
-        parser.add_argument('--worker-use-ray',
-                            action='store_true',
-                            help='use Ray for distributed serving, will be '
-                            'automatically set when using more than 1 GPU')
+        parser.add_argument(
+            '--worker-use-ray',
+            action=argparse.BooleanOptionalAction,
+            default=None,
+            help='use Ray for distributed serving, will default '
+            'to true when ray is installed and more than 1 GPU is used')
        parser.add_argument('--pipeline-parallel-size',
                            '-pp',
                            type=int,
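A standalone sketch of the tri-state flag introduced above: argparse.BooleanOptionalAction (available since Python 3.9) generates both --worker-use-ray and --no-worker-use-ray, and default=None keeps "not specified" distinguishable from an explicit choice so ParallelConfig can auto-detect. The flag name follows the diff; everything else is illustrative.

# Sketch of a tri-state boolean CLI flag.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--worker-use-ray",
    action=argparse.BooleanOptionalAction,  # also registers --no-worker-use-ray
    default=None,                           # None = not specified, decide later
    help="use Ray for distributed serving")

print(parser.parse_args([]).worker_use_ray)                       # None
print(parser.parse_args(["--worker-use-ray"]).worker_use_ray)     # True
print(parser.parse_args(["--no-worker-use-ray"]).worker_use_ray)  # False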
18 changes: 12 additions & 6 deletions vllm/engine/async_llm_engine.py
@@ -9,7 +9,7 @@
from vllm.config import ModelConfig
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.llm_engine import LLMEngine
-from vllm.engine.ray_utils import initialize_cluster, ray
+from vllm.engine.ray_utils import initialize_ray_cluster, ray
from vllm.logger import init_logger
from vllm.outputs import RequestOutput
from vllm.sampling_params import SamplingParams
@@ -287,9 +287,15 @@ async def _run_workers_async(
        coros.append(asyncio.get_event_loop().run_in_executor(
            None, partial(driver_executor, *driver_args, **driver_kwargs)))

-        # Run the ray workers asynchronously.
-        for worker in self.workers:
-            coros.append(worker.execute_method.remote(method, *args, **kwargs))
+        # Run the workers asynchronously.
+        if self.parallel_config.worker_use_ray:
+            for worker in self.workers:
+                coros.append(
+                    worker.execute_method.remote(method, *args, **kwargs))
+        else:
+            for worker in self.workers:
+                coros.append(
+                    worker.execute_method_async(method, *args, **kwargs))

        all_outputs = await asyncio.gather(*coros)
        return all_outputs
@@ -674,8 +680,8 @@ def from_engine_args(cls,
        engine_configs = engine_args.create_engine_configs()
        parallel_config = engine_configs[2]
        # Initialize the cluster.
-        placement_group = initialize_cluster(parallel_config,
-                                             engine_args.engine_use_ray)
+        placement_group = initialize_ray_cluster(parallel_config,
+                                                 engine_args.engine_use_ray)
        # Create the async LLM engine.
        engine = cls(parallel_config.worker_use_ray,
                     engine_args.engine_use_ray,
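Finally, a toy sketch of the dispatch pattern in the hunk above: the same method call is fanned out either to Ray actors via .remote(), whose ObjectRefs are awaitable, or to in-process workers via a coroutine, and gathered in one place. This is not vLLM's implementation; LocalWorker and its execute_method_async are hypothetical stand-ins, and the Ray branch is shown for shape only and needs real Ray actors to run.

# Sketch of fanning one method call out to Ray or local workers.
import asyncio


class LocalWorker:
    # Hypothetical in-process worker exposing an async dispatch method.
    async def execute_method_async(self, method, *args, **kwargs):
        return getattr(self, method)(*args, **kwargs)

    def ping(self) -> str:
        return "pong"


async def run_workers_async(workers, method, *args, worker_use_ray=False, **kwargs):
    if worker_use_ray:
        # Ray actor handles: .remote() returns awaitable ObjectRefs.
        coros = [w.execute_method.remote(method, *args, **kwargs) for w in workers]
    else:
        coros = [w.execute_method_async(method, *args, **kwargs) for w in workers]
    return await asyncio.gather(*coros)


print(asyncio.run(run_workers_async([LocalWorker(), LocalWorker()], "ping")))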