6 changes: 3 additions & 3 deletions tests/distributed/test_comm_ops.py
@@ -24,7 +24,7 @@ def all_reduce_test_worker(tensor_parallel_size: int, rank: int,
     del os.environ["CUDA_VISIBLE_DEVICES"]
     device = torch.device(f"cuda:{rank}")
     torch.cuda.set_device(device)
-    init_test_distributed_environment(1, tensor_parallel_size, rank, rank,
+    init_test_distributed_environment(1, tensor_parallel_size, rank,
                                       distributed_init_port)
     num_elements = 8
     all_tensors = [
@@ -46,7 +46,7 @@ def all_gather_test_worker(tensor_parallel_size: int, rank: int,
     del os.environ["CUDA_VISIBLE_DEVICES"]
     device = torch.device(f"cuda:{rank}")
     torch.cuda.set_device(device)
-    init_test_distributed_environment(1, tensor_parallel_size, rank, rank,
+    init_test_distributed_environment(1, tensor_parallel_size, rank,
                                       distributed_init_port)
     num_dimensions = 3
     tensor_size = list(range(2, num_dimensions + 2))
@@ -74,7 +74,7 @@ def broadcast_tensor_dict_test_worker(tensor_parallel_size: int, rank: int,
     del os.environ["CUDA_VISIBLE_DEVICES"]
     device = torch.device(f"cuda:{rank}")
     torch.cuda.set_device(device)
-    init_test_distributed_environment(1, tensor_parallel_size, rank, rank,
+    init_test_distributed_environment(1, tensor_parallel_size, rank,
                                       distributed_init_port)
     test_dict = {
         "a": torch.arange(8, dtype=torch.float32, device="cuda"),

4 changes: 2 additions & 2 deletions tests/distributed/test_custom_all_reduce.py
@@ -23,7 +23,7 @@ def graph_allreduce(world_size, rank, distributed_init_port):
     del os.environ["CUDA_VISIBLE_DEVICES"]
     device = torch.device(f"cuda:{rank}")
     torch.cuda.set_device(device)
-    init_test_distributed_environment(1, world_size, rank, rank,
+    init_test_distributed_environment(1, world_size, rank,
                                       distributed_init_port)

     custom_ar.init_custom_ar()
@@ -58,7 +58,7 @@ def eager_allreduce(world_size, rank, distributed_init_port):
     del os.environ["CUDA_VISIBLE_DEVICES"]
     device = torch.device(f"cuda:{rank}")
     torch.cuda.set_device(device)
-    init_test_distributed_environment(1, world_size, rank, rank,
+    init_test_distributed_environment(1, world_size, rank,
                                       distributed_init_port)

     sz = 1024

2 changes: 2 additions & 0 deletions tests/distributed/test_pynccl.py
@@ -14,7 +14,9 @@ def distributed_run(fn, world_size):
     for i in range(number_of_processes):
         env = os.environ.copy()
         env['RANK'] = str(i)
+        env['LOCAL_RANK'] = str(i)
         env['WORLD_SIZE'] = str(number_of_processes)
+        env['LOCAL_WORLD_SIZE'] = str(number_of_processes)
         env['MASTER_ADDR'] = 'localhost'
         env['MASTER_PORT'] = '12345'
         p = multiprocessing.Process(target=fn, args=(env, ))

Comment on lines +17 to +19:

Collaborator: Should we also test the case where LOCAL_* is not present in env?

Member (author): The tests using ray already tested this, i.e. the default value of local_rank=-1.

Collaborator: got it, thanks!

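As the thread above notes, omitting LOCAL_RANK leaves local_rank at its -1 default. A minimal sketch of how a spawned worker could read these variables while preserving that default (resolve_ranks is a hypothetical helper, not part of this PR):

    def resolve_ranks(env: dict) -> tuple:
        # RANK and WORLD_SIZE are always set by distributed_run above;
        # LOCAL_RANK may be absent, in which case -1 is kept as the sentinel
        # that the pynccl layer maps to the global rank.
        rank = int(env["RANK"])
        world_size = int(env["WORLD_SIZE"])
        local_rank = int(env.get("LOCAL_RANK", -1))
        return rank, world_size, local_rank
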
7 changes: 6 additions & 1 deletion vllm/model_executor/parallel_utils/pynccl.py
@@ -202,11 +202,11 @@ def __init__(
         init_method=None,
         timeout=datetime.timedelta(seconds=10),
         world_size: int = -1,
-        local_rank: int = -1,
         rank: int = -1,
         store=None,
         group_name: str = "",
         pg_options=None,
+        local_rank: int = -1,
     ):
         if not dist.is_initialized():
             backend = backend or "nccl"
@@ -220,6 +220,11 @@ def __init__(
                                     store=store,
                                     group_name=group_name,
                                     pg_options=pg_options)
+        self.rank = dist.get_rank()
+        self.world_size = dist.get_world_size()
+        if local_rank == -1:
+            local_rank = self.rank
+        self.local_rank = local_rank
         torch.cuda.set_device(local_rank)
         if rank == 0:
             self.unique_id = ncclGetUniqueId()

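A hedged usage sketch of the new default: when local_rank is omitted, the constructor pins the CUDA device to the global rank. The class name NCCLCommunicator and the env-driven launch (RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT already exported, one GPU per process) are assumptions beyond what the diff itself shows:

    from vllm.model_executor.parallel_utils.pynccl import NCCLCommunicator

    comm = NCCLCommunicator()            # local_rank keeps its -1 default
    assert comm.local_rank == comm.rank  # falls back to the global rank
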
6 changes: 4 additions & 2 deletions vllm/model_executor/parallel_utils/pynccl_utils.py
@@ -35,8 +35,10 @@ def set_pynccl_stream(stream: torch.cuda.Stream):
         pass


-def init_process_group(world_size: int, local_rank: int, rank: int,
-                       init_method: str) -> None:
+def init_process_group(world_size: int,
+                       rank: int,
+                       init_method: str,
+                       local_rank: int = -1) -> None:
     assert not is_initialized()
     global comm
     logger.info(f"vLLM is using nccl=={ncclGetVersion()}")

6 changes: 3 additions & 3 deletions vllm/test_utils.py
@@ -8,19 +8,19 @@
 def init_test_distributed_environment(
     pipeline_parallel_size: int,
     tensor_parallel_size: int,
-    local_rank: int,
     rank: int,
     distributed_init_port: str,
+    local_rank: int = -1,
 ) -> None:
     parallel_config = ParallelConfig(pipeline_parallel_size,
                                      tensor_parallel_size,
                                      worker_use_ray=True)
     distributed_init_method = f"tcp://localhost:{distributed_init_port}"
     init_distributed_environment(
         parallel_config,
-        local_rank,
         rank,
-        distributed_init_method=distributed_init_method)
+        distributed_init_method=distributed_init_method,
+        local_rank=local_rank)


 def multi_process_tensor_parallel(

7 changes: 4 additions & 3 deletions vllm/worker/worker.py
@@ -97,8 +97,9 @@ def init_device(self) -> None:
             raise RuntimeError(
                 f"Not support device type: {self.device_config.device}")
         # Initialize the distributed environment.
-        init_distributed_environment(self.parallel_config, self.local_rank,
-                                     self.rank, self.distributed_init_method)
+        init_distributed_environment(self.parallel_config, self.rank,
+                                     self.distributed_init_method,
+                                     self.local_rank)
         # Set random seed.
         set_random_seed(self.model_config.seed)

@@ -249,9 +250,9 @@ def get_cache_block_size_bytes(self, block_size: int,

 def init_distributed_environment(
     parallel_config: ParallelConfig,
-    local_rank: int,
     rank: int,
     distributed_init_method: Optional[str] = None,
+    local_rank: int = -1,
 ) -> None:
     """Initialize the distributed environment."""
     if torch.distributed.is_initialized():

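For reference, a minimal calling sketch of the reordered signature; the concrete values are placeholders rather than anything from this PR, and local_rank is now trailing and optional:

    from vllm.config import ParallelConfig
    from vllm.worker.worker import init_distributed_environment

    parallel_config = ParallelConfig(pipeline_parallel_size=1,
                                     tensor_parallel_size=2,
                                     worker_use_ray=True)

    # Explicit local_rank, e.g. one process per GPU on a single node.
    init_distributed_environment(parallel_config,
                                 rank=0,
                                 distributed_init_method="tcp://localhost:29500",
                                 local_rank=0)

    # Omitting local_rank keeps the -1 default; pynccl then falls back to the
    # global rank when selecting the CUDA device.
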