From a1f193194b3b6f392cd5ff24dacb6a4ec9b98a52 Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Fri, 13 Sep 2024 11:58:13 -0700
Subject: [PATCH 01/12] init version

---
 vllm/distributed/parallel_state.py | 103 +++++++++++++++++++++++++++--
 1 file changed, 97 insertions(+), 6 deletions(-)

diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py
index 6755b20eec9b..35b59e02541c 100644
--- a/vllm/distributed/parallel_state.py
+++ b/vllm/distributed/parallel_state.py
@@ -21,6 +21,7 @@
 """
 import contextlib
 import pickle
+import weakref
 from collections import namedtuple
 from contextlib import contextmanager, nullcontext
 from dataclasses import dataclass
@@ -69,6 +70,57 @@ def _split_tensor_dict(
     return metadata_list, tensor_list
 
 
+_group_name_counter: Dict[str, int] = {}
+
+
+def _get_unique_name(name: str) -> str:
+    """Get a unique name for the group.
+    Example:
+    _get_unique_name("tp") -> "tp:0"
+    _get_unique_name("tp") -> "tp:1"
+    """
+    if name not in _group_name_counter:
+        _group_name_counter[name] = 0
+    newname = f"{name}:{_group_name_counter[name]}"
+    _group_name_counter[name] += 1
+    return newname
+
+
+_groups: Dict[str, weakref.ref["GroupCoordinator"]] = {}
+
+
+def _register_group(group: "GroupCoordinator") -> None:
+    _groups[group.unique_name] = weakref.ref(group)
+
+
+@torch.library.custom_op("vllm::inplace_all_reduce", mutates_args=["tensor"])
+def inplace_all_reduce(tensor: torch.Tensor, group_name: str) -> None:
+    assert group_name in _groups, f"Group {group_name} is not found."
+    group = _groups[group_name]()
+    if group is None:
+        raise ValueError(f"Group {group_name} is destroyed.")
+    group._all_reduce(tensor)
+
+
+@inplace_all_reduce.register_fake
+def _(tensor: torch.Tensor, group_name: str) -> None:
+    return
+
+
+@torch.library.custom_op("vllm::outplace_all_reduce", mutates_args=[])
+def outplace_all_reduce(tensor: torch.Tensor, group_name: str) -> torch.Tensor:
+    assert group_name in _groups, f"Group {group_name} is not found."
+    group = _groups[group_name]()
+    if group is None:
+        raise ValueError(f"Group {group_name} is destroyed.")
+    return group._all_reduce(tensor)
+
+
+@outplace_all_reduce.register_fake
+def _(tensor: torch.Tensor, group_name: str) -> torch.Tensor:
+    return torch.empty_like(tensor)
+
+
 class GroupCoordinator:
     """
     PyTorch ProcessGroup wrapper for a group of processes.
@@ -111,7 +163,11 @@ def __init__(
         use_custom_allreduce: bool,
         use_tpu_communicator: bool,
         use_message_queue_broadcaster: bool = False,
+        group_name: Optional[str] = None,
     ):
+        group_name = group_name or "anonymous"
+        self.unique_name = _get_unique_name(group_name)
+        _register_group(self)
         self.rank = torch.distributed.get_rank()
         self.local_rank = local_rank
@@ -264,16 +320,46 @@ def graph_capture(
 
     def all_reduce(self, input_: torch.Tensor) -> torch.Tensor:
         """
+        User-facing all-reduce function before we actually call the
+        all-reduce operation.
+
+        We need this because Dynamo does not support passing an arbitrary
+        object (`self` in this case) to a custom op. We need to pass the
+        group name as a string, and then look up the group coordinator from
+        the group name, dispatch the all-reduce operation to the group
+        coordinator.
+        
+        In addition, PyTorch custom ops do not support mutation or returning
+        a new tensor in the same op. So we need to figure out if the op is
+        in-place or out-of-place ahead of time.
+        """
+        # Bypass the function if we are using only 1 GPU.
+        if self.world_size == 1:
+            return input_
+
+        if self.tpu_communicator is not None and \
+            not self.tpu_communicator.disabled:
+            # TPU handles Dynamo with its own logic.
+            return self._all_reduce(input_)
+
+        if self.ca_comm is not None and self.ca_comm.should_custom_ar(input_):
+            return torch.ops.vllm.outplace_all_reduce(
+                input_, group_name=self.unique_name)
+        else:
+            torch.ops.vllm.inplace_all_reduce(input_,
+                                              group_name=self.unique_name)
+            return input_
+
+    def _all_reduce(self, input_: torch.Tensor) -> torch.Tensor:
+        """
+        The actual all-reduce implementation.
+
         NOTE: This operation will be applied in-place or out-of-place.
         Always assume this function modifies its input, but use the return
         value as the output.
         """
         ca_comm = self.ca_comm
 
-        # Bypass the function if we are using only 1 GPU.
-        if self.world_size == 1:
-            return input_
-
         # For TPUs, use TPU communicator.
         tpu_comm = self.tpu_communicator
         if tpu_comm is not None and not tpu_comm.disabled:
@@ -758,6 +844,7 @@ def init_world_group(ranks: List[int], local_rank: int,
         use_pynccl=False,
         use_custom_allreduce=False,
         use_tpu_communicator=False,
+        group_name="world",
     )
 
 
@@ -767,6 +854,7 @@ def init_model_parallel_group(
     backend: str,
     use_custom_allreduce: Optional[bool] = None,
     use_message_queue_broadcaster: bool = False,
+    group_name: Optional[str] = None,
 ) -> GroupCoordinator:
     if use_custom_allreduce is None:
         use_custom_allreduce = _ENABLE_CUSTOM_ALL_REDUCE
@@ -778,6 +866,7 @@ def init_model_parallel_group(
         use_custom_allreduce=use_custom_allreduce,
         use_tpu_communicator=True,
         use_message_queue_broadcaster=use_message_queue_broadcaster,
+        group_name=group_name,
     )
 
 
@@ -931,7 +1020,8 @@ def initialize_model_parallel(
     _TP = init_model_parallel_group(group_ranks,
                                     get_world_group().local_rank,
                                     backend,
-                                    use_message_queue_broadcaster=True)
+                                    use_message_queue_broadcaster=True,
+                                    group_name="tp")
 
     # Build the pipeline model-parallel groups.
     num_pipeline_model_parallel_groups: int = (world_size //
@@ -947,7 +1037,8 @@ def initialize_model_parallel(
     _PP = init_model_parallel_group(group_ranks,
                                     get_world_group().local_rank,
                                     backend,
-                                    use_custom_allreduce=False)
+                                    use_custom_allreduce=False,
+                                    group_name="pp")
 
 
 def ensure_model_parallel_initialized(

From bb2615bfd032fe1d3e8dd0b33abb2d41d120741d Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 12:32:03 -0700
Subject: [PATCH 02/12] remove spaces

---
 vllm/distributed/parallel_state.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py
index 35b59e02541c..0e42078bee57 100644
--- a/vllm/distributed/parallel_state.py
+++ b/vllm/distributed/parallel_state.py
@@ -328,7 +328,7 @@ def all_reduce(self, input_: torch.Tensor) -> torch.Tensor:
         group name as a string, and then look up the group coordinator from
         the group name, dispatch the all-reduce operation to the group
         coordinator.
-        
+
         In addition, PyTorch custom ops do not support mutation or returning
         a new tensor in the same op. So we need to figure out if the op is
         in-place or out-of-place ahead of time.
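For readers unfamiliar with the pattern introduced in PATCH 01/12, the following standalone sketch illustrates how a custom op that only accepts a string can still dispatch to a stateful Python object through a weakref registry, which is what keeps the call traceable by Dynamo. It is not part of the patches: the `demo::double` op, the `Doubler` class, and the `"my_group"` name are made up for illustration, and it assumes a PyTorch version that provides `torch.library.custom_op` and `register_fake` (2.4+).

    import weakref
    from typing import Callable, Dict

    import torch

    # Registry keyed by name; weak references avoid keeping objects alive.
    _registry: Dict[str, Callable[[], "Doubler"]] = {}


    class Doubler:
        """Stand-in for a coordinator object that a custom op cannot accept."""

        def __init__(self, name: str):
            self.name = name
            _registry[name] = weakref.ref(self)

        def _double(self, t: torch.Tensor) -> torch.Tensor:
            return t * 2


    @torch.library.custom_op("demo::double", mutates_args=[])
    def double(t: torch.Tensor, obj_name: str) -> torch.Tensor:
        # Only the string crosses the op boundary; the object is looked up here.
        obj = _registry[obj_name]()
        assert obj is not None, f"{obj_name} has been garbage collected"
        return obj._double(t)


    @double.register_fake
    def _(t: torch.Tensor, obj_name: str) -> torch.Tensor:
        # Shape/dtype-only implementation so the op can be traced and compiled.
        return torch.empty_like(t)


    if __name__ == "__main__":
        d = Doubler("my_group")
        x = torch.ones(4)
        print(torch.ops.demo.double(x, "my_group"))  # tensor([2., 2., 2., 2.])

The in-place variant in the patch works the same way, except that it declares `mutates_args=["tensor"]` and returns nothing, which is why the caller must decide between the two ops ahead of time.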
From bbdfd6bf3bc35524e81acd32bc93e8c000e63564 Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 15:48:39 -0700
Subject: [PATCH 03/12] move should_custom_ar to python

---
 .../device_communicators/custom_all_reduce.py | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/vllm/distributed/device_communicators/custom_all_reduce.py b/vllm/distributed/device_communicators/custom_all_reduce.py
index 6229f1d6ec78..e3025bf402ea 100644
--- a/vllm/distributed/device_communicators/custom_all_reduce.py
+++ b/vllm/distributed/device_communicators/custom_all_reduce.py
@@ -33,6 +33,12 @@ def _can_p2p(rank: int, world_size: int) -> bool:
     return True
 
 
+def is_weak_contiguous(inp: torch.Tensor):
+    return inp.is_contiguous() or (inp.storage().nbytes() -
+                                   inp.storage_offset() * inp.element_size()
+                                   == inp.numel() * inp.element_size())
+
+
 class CustomAllreduce:
 
     _SUPPORTED_WORLD_SIZES = [2, 4, 6, 8]
@@ -224,8 +230,17 @@ def register_graph_buffers(self):
         ops.register_graph_buffers(self._ptr, handles, offsets)
 
     def should_custom_ar(self, inp: torch.Tensor):
-        return ops.should_custom_ar(inp, self.max_size, self.world_size,
-                                    self.full_nvlink)
+        inp_size = inp.numel() * inp.element_size()
+        # custom allreduce requires input byte size to be multiples of 16
+        if inp_size % 16 != 0:
+            return False
+        if not is_weak_contiguous(inp):
+            return False
+        # for 4 or more non NVLink-capable GPUs, custom allreduce provides
+        # little performance improvement over NCCL.
+        if self.world_size == 2 or self.full_nvlink:
+            return inp_size < self.max_size
+        return False
 
     # all reduce, assuming inp tensor is IPC registered with register_buffer,
     # or, in the context of cuda graphs, register_graph_buffers

From 9b4b8029901d635dd7c418261719e414875c150b Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 15:55:00 -0700
Subject: [PATCH 04/12] remove should_custom_ar from c

---
 csrc/custom_all_reduce.cu | 12 ------------
 csrc/ops.h                |  2 --
 csrc/torch_bindings.cpp   |  5 -----
 vllm/_custom_ops.py       |  6 ------
 4 files changed, 25 deletions(-)

diff --git a/csrc/custom_all_reduce.cu b/csrc/custom_all_reduce.cu
index 82a3563979f1..9b82bec44c3c 100644
--- a/csrc/custom_all_reduce.cu
+++ b/csrc/custom_all_reduce.cu
@@ -55,18 +55,6 @@ bool _is_weak_contiguous(torch::Tensor& t) {
           t.numel() * t.element_size());
 }
 
-bool should_custom_ar(torch::Tensor& inp, int64_t max_size, int64_t world_size,
-                      bool full_nvlink) {
-  auto inp_size = inp.numel() * inp.element_size();
-  // custom allreduce requires input byte size to be multiples of 16
-  if (inp_size % 16 != 0) return false;
-  if (!_is_weak_contiguous(inp)) return false;
-  if (world_size == 2 || full_nvlink) return inp_size <= max_size;
-  // for 4 or more non NVLink-capable GPUs, custom allreduce provides little
-  // performance improvement over NCCL.
-  return false;
-}
-
 void _all_reduce(fptr_t _fa, torch::Tensor& inp, torch::Tensor& out,
                  cudaStream_t stream) {
   auto fa = reinterpret_cast(_fa);
diff --git a/csrc/ops.h b/csrc/ops.h
index 681ab4b898ca..ee89ad32cb02 100644
--- a/csrc/ops.h
+++ b/csrc/ops.h
@@ -241,8 +241,6 @@ fptr_t init_custom_ar(torch::Tensor& meta, torch::Tensor& rank_data,
                       const std::vector& handles,
                       const std::vector& offsets, int64_t rank,
                       bool full_nvlink);
-bool should_custom_ar(torch::Tensor& inp, int64_t max_size, int64_t world_size,
-                      bool full_nvlink);
 void all_reduce_reg(fptr_t _fa, torch::Tensor& inp, torch::Tensor& out);
 void all_reduce_unreg(fptr_t _fa, torch::Tensor& inp, torch::Tensor& reg_buffer,
                       torch::Tensor& out);
diff --git a/csrc/torch_bindings.cpp b/csrc/torch_bindings.cpp
index d7f7547fbef5..7009180a8687 100644
--- a/csrc/torch_bindings.cpp
+++ b/csrc/torch_bindings.cpp
@@ -411,11 +411,6 @@ TORCH_LIBRARY_EXPAND(CONCAT(TORCH_EXTENSION_NAME, _custom_ar), custom_ar) {
       "bool full_nvlink) -> int");
   custom_ar.impl("init_custom_ar", torch::kCUDA, &init_custom_ar);
 
-  custom_ar.def(
-      "should_custom_ar(Tensor inp, int max_size, int world_size, "
-      "bool full_nvlink) -> bool");
-  custom_ar.impl("should_custom_ar", torch::kCUDA, &should_custom_ar);
-
   custom_ar.def("all_reduce_reg(int fa, Tensor inp, Tensor! out) -> ()");
   custom_ar.impl("all_reduce_reg", torch::kCUDA, &all_reduce_reg);
 
diff --git a/vllm/_custom_ops.py b/vllm/_custom_ops.py
index d5b3d7bc6dd5..ac90895b11c3 100644
--- a/vllm/_custom_ops.py
+++ b/vllm/_custom_ops.py
@@ -870,12 +870,6 @@ def init_custom_ar(meta: torch.Tensor, rank_data: torch.Tensor,
                                             offsets, rank, full_nvlink)
 
 
-def should_custom_ar(inp: torch.Tensor, max_size: int, world_size: int,
-                     full_nvlink: bool) -> bool:
-    return torch.ops._C_custom_ar.should_custom_ar(inp, max_size, world_size,
-                                                   full_nvlink)
-
-
 def all_reduce_reg(fa: int, inp: torch.Tensor, out: torch.Tensor) -> None:
     torch.ops._C_custom_ar.all_reduce_reg(fa, inp, out)
 

From 8376f3510f9c3478dd4260fb06d5359248e966f1 Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 15:56:23 -0700
Subject: [PATCH 05/12] slight changes

---
 vllm/distributed/parallel_state.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py
index 0e42078bee57..740b059f71a7 100644
--- a/vllm/distributed/parallel_state.py
+++ b/vllm/distributed/parallel_state.py
@@ -205,28 +205,24 @@ def __init__(
         from vllm.distributed.device_communicators.pynccl import (
             PyNcclCommunicator)
 
-        self.pynccl_comm: Optional[PyNcclCommunicator]
+        self.pynccl_comm: Optional[PyNcclCommunicator] = None
         if use_pynccl and self.world_size > 1:
             self.pynccl_comm = PyNcclCommunicator(
                 group=self.cpu_group,
                 device=self.device,
             )
-        else:
-            self.pynccl_comm = None
 
-        self.ca_comm: Optional[CustomAllreduce]
+        self.ca_comm: Optional[CustomAllreduce] = None
         if use_custom_allreduce and self.world_size > 1:
             # Initialize a custom fast all-reduce implementation.
             self.ca_comm = CustomAllreduce(
                 group=self.cpu_group,
                 device=self.device,
             )
-        else:
-            self.ca_comm = None
 
         from vllm.distributed.device_communicators.tpu_communicator import (
             TpuCommunicator)
-        self.tpu_communicator: Optional[TpuCommunicator]
+        self.tpu_communicator: Optional[TpuCommunicator] = None
         if use_tpu_communicator and self.world_size > 1:
             self.tpu_communicator = TpuCommunicator(group=self.cpu_group)

From 82e9581548e5f0b52d10258a4fedf61aab8335a0 Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 16:03:12 -0700
Subject: [PATCH 06/12] add tests

---
 .buildkite/test-pipeline.yaml    | 10 +++-------
 tests/compile/test_full_graph.py | 10 +++++++++-
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml
index 9b0cb6663a55..9483adcc5d58 100644
--- a/.buildkite/test-pipeline.yaml
+++ b/.buildkite/test-pipeline.yaml
@@ -163,13 +163,6 @@ steps:
   - python3 tensorize_vllm_model.py --model facebook/opt-125m serialize --serialized-directory /tmp/ --suffix v1 && python3 tensorize_vllm_model.py --model facebook/opt-125m deserialize --path-to-tensors /tmp/vllm/facebook/opt-125m/v1/model.tensors
   - python3 offline_inference_encoder_decoder.py
 
-- label: torch compile integration test
-  source_file_dependencies:
-  - vllm/
-  commands:
-  - pytest -v -s ./compile/test_full_graph.py
-  - pytest -v -s ./compile/test_wrapper.py
-
 - label: Prefix Caching Test # 7min
   #mirror_hardwares: [amd]
   source_file_dependencies:
@@ -348,7 +341,10 @@ steps:
   - vllm/executor/
   - vllm/model_executor/models/
   - tests/distributed/
+  - vllm/compilation
   commands:
+  - pytest -v -s ./compile/test_full_graph.py
+  - pytest -v -s ./compile/test_wrapper.py
  - VLLM_TEST_SAME_HOST=1 torchrun --nproc-per-node=4 distributed/test_same_node.py | grep -q 'Same node test passed'
  - TARGET_TEST_SUITE=L4 pytest basic_correctness/ -v -s -m distributed_2_gpus
  # Avoid importing model tests that cause CUDA reinitialization error
diff --git a/tests/compile/test_full_graph.py b/tests/compile/test_full_graph.py
index 5452ce6be811..8b6aee751aa1 100644
--- a/tests/compile/test_full_graph.py
+++ b/tests/compile/test_full_graph.py
@@ -2,9 +2,17 @@
 
 import pytest
 
+from vllm.utils import cuda_device_count_stateless
+
 
 @pytest.mark.parametrize("model", ["meta-llama/Meta-Llama-3-8B"])
-def test_full_graph(model):
+@pytest.mark.parametrize("tp_size", [1, 2])
+def test_full_graph(model, tp_size):
+
+    # Skip the test if there are not enough CUDA devices.
+    if cuda_device_count_stateless() < tp_size:
+        pytest.skip("Not enough CUDA devices for the test.")
+
     # make sure these models can be captured in full graph mode
     if "VLLM_TEST_DYNAMO_GRAPH_CAPTURE" not in os.environ:
         os.environ["VLLM_TEST_DYNAMO_GRAPH_CAPTURE"] = "1"

From d25c8a4832326e456725b7e2837d5fdb1848f8c4 Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 16:06:22 -0700
Subject: [PATCH 07/12] add tests

---
 tests/compile/test_full_graph.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/compile/test_full_graph.py b/tests/compile/test_full_graph.py
index 8b6aee751aa1..a65963d4a298 100644
--- a/tests/compile/test_full_graph.py
+++ b/tests/compile/test_full_graph.py
@@ -25,7 +25,7 @@ def test_full_graph(model, tp_size):
         "The future of AI is",
     ]
     sampling_params = SamplingParams(temperature=0)
-    llm = LLM(model=model, enforce_eager=True)
+    llm = LLM(model=model, enforce_eager=True, tensor_parallel_size=tp_size)
 
     outputs = llm.generate(prompts, sampling_params)
 

From 244bbd4eb629bdb2df80b9cb0472627d2294d11d Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 16:37:33 -0700
Subject: [PATCH 08/12] type annotation

---
 vllm/distributed/parallel_state.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py
index 740b059f71a7..3d303d855ed6 100644
--- a/vllm/distributed/parallel_state.py
+++ b/vllm/distributed/parallel_state.py
@@ -86,7 +86,7 @@ def _get_unique_name(name: str) -> str:
     return newname
 
 
-_groups: Dict[str, weakref.ref["GroupCoordinator"]] = {}
+_groups: Dict[str, weakref.ReferenceType["GroupCoordinator"]] = {}
 
 
 def _register_group(group: "GroupCoordinator") -> None:

From 2da8f0688860fdc06d0673def2d769ca4ca1e9db Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 16:50:06 -0700
Subject: [PATCH 09/12] fix type

---
 vllm/distributed/parallel_state.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py
index 3d303d855ed6..1c864bcd5d70 100644
--- a/vllm/distributed/parallel_state.py
+++ b/vllm/distributed/parallel_state.py
@@ -26,7 +26,7 @@
 from contextlib import contextmanager, nullcontext
 from dataclasses import dataclass
 from multiprocessing import shared_memory
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 from unittest.mock import patch
 
 import torch
@@ -86,11 +86,12 @@ def _get_unique_name(name: str) -> str:
     return newname
 
 
-_groups: Dict[str, weakref.ReferenceType["GroupCoordinator"]] = {}
+_groups: Dict[str, Callable[[], "GroupCoordinator"]] = {}
 
 
 def _register_group(group: "GroupCoordinator") -> None:
-    _groups[group.unique_name] = weakref.ref(group)
+    # looks like Python 3.8 does not understand `ReferenceType`
+    _groups[group.unique_name] = weakref.ref(group)  # type: ignore

From 7da3dc87264bfbde6c9c9eabdd1aac9c12fb8046 Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 18:49:21 -0700
Subject: [PATCH 10/12] add one more condition

---
 vllm/distributed/device_communicators/custom_all_reduce.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/vllm/distributed/device_communicators/custom_all_reduce.py b/vllm/distributed/device_communicators/custom_all_reduce.py
index e3025bf402ea..d239d645edc1 100644
--- a/vllm/distributed/device_communicators/custom_all_reduce.py
+++ b/vllm/distributed/device_communicators/custom_all_reduce.py
@@ -230,6 +230,8 @@ def register_graph_buffers(self):
         ops.register_graph_buffers(self._ptr, handles, offsets)
 
     def should_custom_ar(self, inp: torch.Tensor):
+        if self.disabled:
+            return False
         inp_size = inp.numel() * inp.element_size()
         # custom allreduce requires input byte size to be multiples of 16
         if inp_size % 16 != 0:

From 9f9359cfa25a0d6df102fc1d16a2ed89c1d003ac Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 19:57:29 -0700
Subject: [PATCH 11/12] update tests

---
 tests/compile/test_full_graph.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/compile/test_full_graph.py b/tests/compile/test_full_graph.py
index a65963d4a298..6fc445539bbb 100644
--- a/tests/compile/test_full_graph.py
+++ b/tests/compile/test_full_graph.py
@@ -4,9 +4,12 @@
 
 from vllm.utils import cuda_device_count_stateless
 
+from ..utils import fork_new_process_for_each_test
+
 
 @pytest.mark.parametrize("model", ["meta-llama/Meta-Llama-3-8B"])
 @pytest.mark.parametrize("tp_size", [1, 2])
+@fork_new_process_for_each_test
 def test_full_graph(model, tp_size):
 
     # Skip the test if there are not enough CUDA devices.

From e32ee5694756c9fcb088a20f08a145faa0b1efb8 Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 16 Sep 2024 20:29:28 -0700
Subject: [PATCH 12/12] add package

---
 tests/compile/__init__.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 tests/compile/__init__.py

diff --git a/tests/compile/__init__.py b/tests/compile/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
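As a quick sanity check on the size heuristic that PATCH 03/12 and PATCH 10/12 move into Python, the following standalone sketch reproduces the accept/reject logic for a few tensor shapes. It is illustrative only: the 8 MiB `max_size` is an assumed value (the real limit comes from the CustomAllreduce buffer size), and plain `is_contiguous()` stands in for the weak-contiguity check.

    import torch

    def qualifies(t: torch.Tensor, world_size: int, full_nvlink: bool,
                  max_size: int = 8 * 1024 * 1024) -> bool:
        # Mirrors the checks in CustomAllreduce.should_custom_ar above,
        # with is_weak_contiguous simplified to is_contiguous.
        size = t.numel() * t.element_size()
        if size % 16 != 0 or not t.is_contiguous():
            return False
        if world_size == 2 or full_nvlink:
            return size < max_size
        return False

    print(qualifies(torch.empty(1024, 1024, dtype=torch.float16), 2, True))  # True: 2 MiB
    print(qualifies(torch.empty(4096, 4096, dtype=torch.float16), 2, True))  # False: 32 MiB over the cap
    print(qualifies(torch.empty(7, dtype=torch.float16), 2, True))           # False: 14 bytes, not a multiple of 16
    print(qualifies(torch.empty(1024, 1024, dtype=torch.float16), 4, False)) # False: 4 GPUs without NVLink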