vllm/distributed/parallel_state.py (39 changes: 13 additions & 26 deletions)
@@ -897,29 +897,22 @@ def initialize_model_parallel(
         get_world_group().device_group)
 
     data_parallel_size = 1
-    has_external_dp = False
     from vllm.config import get_current_vllm_config
     config = get_current_vllm_config()
     if config is not None:
-        if config.parallel_config.world_size != world_size:
-            # detect external data parallelism.
-            # dp in vllm means all dp instances need to run together.
-            # if the world size does not match, it means this dp is external,
-            # and the dp instances can run independently, e.g. in rlhf workflow
-            # from https://github.com/volcengine/verl .
-            # in that case, we treat the rest dimensions as if they are
-            # data parallel, and create a dummy dp group that is not used.
-            data_parallel_size = world_size // (pipeline_model_parallel_size *
-                                                tensor_model_parallel_size)
-            has_external_dp = True
-        else:
-            data_parallel_size = config.parallel_config.data_parallel_size
-
-    # the layout order is: DP x PP x TP
+        data_parallel_size = config.parallel_config.data_parallel_size
+
+    # the layout order is: ExternalDP x DP x PP x TP
+    # ExternalDP is the data parallel group that is not part of the model,
+    # every dp rank can generate independently (in verl integration).
+    # DP is the data parallel group that is part of the model,
+    # all the ranks in the same DP group should generate simultaneously,
+    # i.e. the `generate` call in the same DP group should be called together,
+    # otherwise it will cause deadlock.
+    # to get group_ranks for each dimension, transpose that dimension to the
+    # last dimension, then reshape to 2D, then unbind the last dimension
     all_ranks = torch.arange(world_size).reshape(
-        data_parallel_size, pipeline_model_parallel_size,
+        -1, data_parallel_size, pipeline_model_parallel_size,
         tensor_model_parallel_size)  # noqa
 
     # Build the tensor model-parallel groups.
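
Note: the comment block added above rests on one indexing trick. All ranks are laid out as a 4-D tensor in ExternalDP x DP x PP x TP order; to get the rank groups for any dimension, transpose that dimension to the last axis, flatten to 2-D, and unbind the rows. A minimal sketch of the trick (the sizes here are made up for illustration, not taken from the PR):

```python
import torch

# Illustrative sizes: one engine instance (external dp = 1),
# dp = 2, pp = 2, tp = 2, so world_size = 8.
dp_size, pp_size, tp_size = 2, 2, 2
world_size = dp_size * pp_size * tp_size

# Layout order: ExternalDP x DP x PP x TP (TP ranks are adjacent).
all_ranks = torch.arange(world_size).reshape(-1, dp_size, pp_size, tp_size)

# TP is already the last dimension: reshape to 2-D and unbind the rows.
tp_groups = [x.tolist() for x in all_ranks.reshape(-1, tp_size).unbind(0)]
print(tp_groups)  # [[0, 1], [2, 3], [4, 5], [6, 7]]

# PP: transpose dim 2 (PP) to the last axis, then do the same.
pp_groups = [
    x.tolist()
    for x in all_ranks.transpose(2, 3).reshape(-1, pp_size).unbind(0)
]
print(pp_groups)  # [[0, 2], [1, 3], [4, 6], [5, 7]]
```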
@@ -939,7 +932,7 @@ def initialize_model_parallel(
     global _PP
     assert _PP is None, (
         "pipeline model parallel group is already initialized")
-    group_ranks = all_ranks.transpose(1, 2).reshape(
+    group_ranks = all_ranks.transpose(2, 3).reshape(
         -1, pipeline_model_parallel_size).unbind(0)
     group_ranks = [x.tolist() for x in group_ranks]
     _PP = init_model_parallel_group(group_ranks,
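
The index shift in this hunk is purely mechanical: `all_ranks` gained a leading ExternalDP dimension, so the old `transpose(1, 2)` on the 3-D `DP x PP x TP` layout becomes `transpose(2, 3)` on the 4-D layout. When there is no external DP, the leading dimension has size 1 and the two forms produce identical PP groups; a quick check under illustrative sizes:

```python
import torch

dp_size, pp_size, tp_size = 2, 2, 2  # illustrative sizes

old = torch.arange(8).reshape(dp_size, pp_size, tp_size)      # old DP x PP x TP
new = torch.arange(8).reshape(-1, dp_size, pp_size, tp_size)  # leading dim == 1

old_pp = old.transpose(1, 2).reshape(-1, pp_size).unbind(0)
new_pp = new.transpose(2, 3).reshape(-1, pp_size).unbind(0)
assert [x.tolist() for x in old_pp] == [x.tolist() for x in new_pp]
```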
@@ -949,16 +942,10 @@ def initialize_model_parallel(
 
     global _DP
     assert _DP is None, ("data parallel group is already initialized")
-    group_ranks = all_ranks.transpose(0,
-                                      2).reshape(-1,
+    group_ranks = all_ranks.transpose(1,
+                                      3).reshape(-1,
                                                  data_parallel_size).unbind(0)
     group_ranks = [x.tolist() for x in group_ranks]
-    if has_external_dp:
-        # create a dummy dp group that is not used actually,
-        # since this dp is external.
-        # a dummy dp group means every rank is a group itself.
-        # this way, no communication is needed, no memory is wasted.
-        group_ranks = [[x] for x in range(world_size)]
     _DP = init_model_parallel_group(group_ranks,
                                     get_world_group().local_rank,
                                     backend,
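
With the dummy-group branch gone, the external-DP case falls out of the general layout: an external launcher (e.g. verl) starts several independent engines inside one process group, each engine's config has `data_parallel_size = 1`, and the leading `-1` dimension of `all_ranks` absorbs the extra replicas. The DP groups then come out as singletons, which is exactly what the deleted `has_external_dp` branch constructed by hand. A sketch under those assumptions (sizes are illustrative):

```python
import torch

# Illustrative external-DP setup: two independent tp=2 engines share one
# process group, so world_size = 4 while each engine's config has dp = 1.
world_size, dp_size, pp_size, tp_size = 4, 1, 1, 2

all_ranks = torch.arange(world_size).reshape(
    -1, dp_size, pp_size, tp_size)  # shape (2, 1, 1, 2)
dp_groups = [
    x.tolist()
    for x in all_ranks.transpose(1, 3).reshape(-1, dp_size).unbind(0)
]
# Every rank is its own DP group -- the same groups the deleted
# `group_ranks = [[x] for x in range(world_size)]` fallback produced.
print(dp_groups)  # [[0], [1], [2], [3]]
```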