Merged
Changes from 9 commits
30 changes: 28 additions & 2 deletions python/paddle/distributed/auto_parallel/process_mesh.py
@@ -21,6 +21,8 @@
import numpy as np

import paddle
from paddle.distributed import fleet
from paddle.distributed.collective import _get_group_map
from paddle.distributed.communication.group import is_initialized
from paddle.framework import core

@@ -442,8 +444,32 @@ def get_group(
f"{dim_name} not in the dimension names {self._dim_names}"
)
else:
pg = paddle.distributed.new_group(self._process_ids)
return pg
if hasattr(fleet.fleet, "_hcg"):
hcg = fleet.get_hybrid_communicate_group()
if hcg is not None:

parallel_group_map = {
"pp": hcg.get_pipe_parallel_group,
"dp": hcg.get_data_parallel_group,
"mp": hcg.get_model_parallel_group,
"sep": hcg.get_sep_parallel_group,
"sharding": hcg.get_sharding_parallel_group,
}

if dim_name not in parallel_group_map:
raise ValueError(
f"{dim_name} is not a valid dim name."
)

return parallel_group_map[dim_name]()
existing_group = None
Contributor: Redundant variable; in the branch where set(group.ranks) == set(self._process_ids), just return group directly.

Contributor Author: Done

group_map = _get_group_map()
for group in group_map.values():
if set(group.ranks) == set(self._process_ids):
existing_group = group
if existing_group is not None:
return existing_group
return paddle.distributed.new_group(self._process_ids)
else:
if dim_name not in self._dim_names:
raise ValueError(
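For reference, a minimal sketch (not the PR's verbatim code) of the reuse-or-create behavior this hunk gives ProcessMesh.get_group when no hybrid communicate group is available, with the reviewer's suggestion applied, i.e. returning the matching group straight from the branch instead of tracking an existing_group variable. It assumes, as the hunk above does, that _get_group_map() returns a dict whose values are Group objects exposing ranks:

import paddle
from paddle.distributed.collective import _get_group_map


def _reuse_or_create_group(process_ids):
    # Reuse any already-created communication group that covers
    # exactly the requested ranks, regardless of their order.
    for group in _get_group_map().values():
        if set(group.ranks) == set(process_ids):
            return group  # reviewer's suggestion: return directly here
    # No match found: fall back to creating a fresh group.
    return paddle.distributed.new_group(process_ids)

Reusing an existing group avoids allocating a duplicate communicator for the same set of ranks, which is also what makes the group id checks in the tests below pass.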
2 changes: 1 addition & 1 deletion test/auto_parallel/hybrid_strategy/CMakeLists.txt
@@ -173,6 +173,6 @@ if((WITH_GPU) AND (LINUX))
py_test_modules(
test_process_mesh MODULES test_process_mesh ENVS
"http_proxy=;https_proxy=;PYTHONPATH=../..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_process_mesh PROPERTIES TIMEOUT "60" LABELS
set_tests_properties(test_process_mesh PROPERTIES TIMEOUT "150" LABELS
"RUN_TYPE=HYBRID")
endif()
44 changes: 44 additions & 0 deletions test/auto_parallel/hybrid_strategy/fleet_test_dp.py
@@ -0,0 +1,44 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle.distributed as dist
from paddle.distributed import fleet


def test_dp_parallel():
dist_strategy = fleet.DistributedStrategy()
dist_strategy.hybrid_configs = {
"dp_degree": 2,
"mp_degree": 1,
"pp_degree": 1,
}
fleet.init(is_collective=True, strategy=dist_strategy)

mesh = dist.ProcessMesh([0, 1], dim_names=["dp"])

hcg = fleet.get_hybrid_communicate_group()

group = mesh.get_group(dim_name="dp")
hcg_group = hcg.get_data_parallel_group()

group_ranks = group.ranks
hcg_group_ranks = hcg_group.ranks
assert set(group_ranks) == set(hcg_group_ranks)
group_id = group.id
hcg_group_id = hcg_group.id
assert group_id == hcg_group_id


if __name__ == "__main__":
test_dp_parallel()
Contributor: The fleet_test_xx files are all very similar; can they be merged into a single file?

Contributor Author: Done

44 changes: 44 additions & 0 deletions test/auto_parallel/hybrid_strategy/fleet_test_mp.py
@@ -0,0 +1,44 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle.distributed as dist
from paddle.distributed import fleet


def test_mp_parallel():
dist_strategy = fleet.DistributedStrategy()
dist_strategy.hybrid_configs = {
"dp_degree": 1,
"mp_degree": 2,
"pp_degree": 1,
}
fleet.init(is_collective=True, strategy=dist_strategy)

mesh = dist.ProcessMesh([0, 1], dim_names=["mp"])

hcg = fleet.get_hybrid_communicate_group()

group = mesh.get_group(dim_name="mp")
hcg_group = hcg.get_model_parallel_group()

group_ranks = group.ranks
hcg_group_ranks = hcg_group.ranks
assert set(group_ranks) == set(hcg_group_ranks)
group_id = group.id
hcg_group_id = hcg_group.id
assert group_id == hcg_group_id


if __name__ == "__main__":
test_mp_parallel()
44 changes: 44 additions & 0 deletions test/auto_parallel/hybrid_strategy/fleet_test_pp.py
@@ -0,0 +1,44 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle.distributed as dist
from paddle.distributed import fleet


def test_pp_parallel():
dist_strategy = fleet.DistributedStrategy()
dist_strategy.hybrid_configs = {
"dp_degree": 1,
"mp_degree": 1,
"pp_degree": 2,
}
fleet.init(is_collective=True, strategy=dist_strategy)

mesh = dist.ProcessMesh([0, 1], dim_names=["pp"])

hcg = fleet.get_hybrid_communicate_group()

group = mesh.get_group(dim_name="pp")
hcg_group = hcg.get_pipe_parallel_group()

group_ranks = group.ranks
hcg_group_ranks = hcg_group.ranks
assert set(group_ranks) == set(hcg_group_ranks)
group_id = group.id
hcg_group_id = hcg_group.id
assert group_id == hcg_group_id


if __name__ == "__main__":
test_pp_parallel()
45 changes: 45 additions & 0 deletions test/auto_parallel/hybrid_strategy/fleet_test_sep.py
@@ -0,0 +1,45 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle.distributed as dist
from paddle.distributed import fleet


def test_sep_parallel():
dist_strategy = fleet.DistributedStrategy()
dist_strategy.hybrid_configs = {
"dp_degree": 1,
"mp_degree": 1,
"pp_degree": 1,
"sep_degree": 2,
}
fleet.init(is_collective=True, strategy=dist_strategy)

mesh = dist.ProcessMesh([0, 1], dim_names=["sep"])

hcg = fleet.get_hybrid_communicate_group()

group = mesh.get_group(dim_name="sep")
hcg_group = hcg.get_sep_parallel_group()

group_ranks = group.ranks
hcg_group_ranks = hcg_group.ranks
assert set(group_ranks) == set(hcg_group_ranks)
group_id = group.id
hcg_group_id = hcg_group.id
assert group_id == hcg_group_id


if __name__ == "__main__":
test_sep_parallel()
45 changes: 45 additions & 0 deletions test/auto_parallel/hybrid_strategy/fleet_test_sharding.py
@@ -0,0 +1,45 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle.distributed as dist
from paddle.distributed import fleet


def test_sharding_parallel():
dist_strategy = fleet.DistributedStrategy()
dist_strategy.hybrid_configs = {
"dp_degree": 1,
"mp_degree": 1,
"pp_degree": 1,
"sharding_degree": 2,
}
fleet.init(is_collective=True, strategy=dist_strategy)

mesh = dist.ProcessMesh([0, 1], dim_names=["sharding"])

hcg = fleet.get_hybrid_communicate_group()

group = mesh.get_group(dim_name="sharding")
hcg_group = hcg.get_sharding_parallel_group()

group_ranks = group.ranks
hcg_group_ranks = hcg_group.ranks
assert set(group_ranks) == set(hcg_group_ranks)
group_id = group.id
hcg_group_id = hcg_group.id
assert group_id == hcg_group_id


if __name__ == "__main__":
test_sharding_parallel()
test/auto_parallel/hybrid_strategy/process_mesh_demo_unittest.py
@@ -99,7 +99,7 @@ def test_get_group(self):
assert isinstance(
group_1d_with_name, dist.communication.group.Group
)

assert group_1d_with_name.id == group_1d.id
# Test case 3: Single dimension mesh with wrong dim_name
try:
mesh_1d.get_group(dim_name="wrong_name")
19 changes: 14 additions & 5 deletions test/auto_parallel/hybrid_strategy/test_process_mesh.py
@@ -21,7 +21,7 @@ class TestProcessMeshPass(test_base.CommunicationTestDistBase):
def setUp(self):
super().setUp(
num_of_devices=2,
timeout=50,
timeout=150,
)
self._default_envs = {
"FLAGS_cudnn_deterministic": "1",
@@ -35,11 +35,20 @@ def test_process_mesh(self):
envs_list = test_base.gen_product_envs_list(
self._default_envs, self._changeable_envs
)
test_files = [
"fleet_test_dp.py",
"fleet_test_mp.py",
"fleet_test_pp.py",
"fleet_test_sep.py",
"fleet_test_sharding.py",
"process_mesh_demo_unittest.py",
]
for envs in envs_list:
self.run_test_case(
"process_mesh_demo_unittest.py",
user_defined_envs=envs,
)
for test_file in test_files:
self.run_test_case(
test_file,
user_defined_envs=envs,
)


if __name__ == "__main__":