From f78816f325453808709e9ccc42aca7fcc6ba0196 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Thu, 21 Jan 2021 21:00:32 +0800 Subject: [PATCH 1/8] add ascend_group.py --- .../fluid/tests/unittests/ascend_group.py | 85 +++++++++++++++++++ .../tests/unittests/test_ascend_group.sh | 31 +++++++ 2 files changed, 116 insertions(+) create mode 100644 python/paddle/fluid/tests/unittests/ascend_group.py create mode 100644 python/paddle/fluid/tests/unittests/test_ascend_group.sh diff --git a/python/paddle/fluid/tests/unittests/ascend_group.py b/python/paddle/fluid/tests/unittests/ascend_group.py new file mode 100644 index 00000000000000..9af518775abdb7 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ascend_group.py @@ -0,0 +1,85 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time + +def init_communicator(program, main_program, current_endpoint, endpoints, ring_id): + nranks = len(endpoints) + other_endpoints = endpoints[:] + other_endpoints.remove(current_endpoint) + group_rank=endpoints.index(current_endpoint) + assert group_rank >=0 + + block = program.global_block() + nccl_id_var = block.create_var( + name=unique_name.generate('nccl_id'), + persistable=True, + type=core.VarDesc.VarType.RAW) + block.append_op( + type='c_gen_nccl_id', + inputs={}, + outputs={'Out': nccl_id_var}, + attrs={ + 'rank': group_rank, + 'endpoint': current_endpoint, + 'other_endpoints': other_endpoints, + OP_ROLE_KEY: OpRole.Forward, + }) + block.append_op( + type='c_comm_init', + inputs={'X': nccl_id_var}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': group_rank, + 'ring_id': ring_id, + OP_ROLE_KEY: OpRole.Forward, + }) + block.create_var( + name="data", + persistable=True, + dtype='float32') + + with fluid.program_guard(main_program): + data=fluid.layers.fill_constant(shape=[1, positive_2], dtype='float32', value=2.5) + helper.append_op( + type=op_type, + inputs={'X': [data]}, + outputs={'Out': [data]}, + attrs={'ring_id': ring_id, + 'use_calc_stream': True}) + +def train(): + startup_programs=[] + main_programs=[] + + trainer_endpoints=["127.0.0.1:6071","127.0.0.1:6072","127.0.0.1:6073","127.0.0.1:6074"] + groups=[] + groups[0]=[trainer_endpoints[0], trainer_endpoints[1]] + groups[1]=[trainer_endpoints[2], trainer_endpoints[3]] + groups[3]=[trainer_endpoints[0], trainer_endpoints[2]] + + for idx, group in enumerate(groups): + for te in group: + startup_program = fluid.program() + init_communicator(startup_program, main_program, te, group, str(idx)) + startup_programs.append(startup_program) + main_programs.append(main_program) + + print(startup_program[0]) + print(main_program[0]) + + diff --git a/python/paddle/fluid/tests/unittests/test_ascend_group.sh b/python/paddle/fluid/tests/unittests/test_ascend_group.sh new file mode 100644 index 00000000000000..b8b0182e37916b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_ascend_group.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +# use paddlecloud +echo "begin test use paddlecloud" +cluster_node_ips="127.0.0.1" +export PADDLE_TRAINERS_NUM=4 +export POD_IP=127.0.0.1 +export PADDLE_TRAINERS=127.0.0.1 +export PADDLE_TRAINER_ID=0 + +export PADDLE_PORT=35789 +export TRAINER_PORTS_NUM=4 + +distributed_args="--ips=${cluster_node_ips} --ascend_npus=0,1,2,3 --log_dir=testlog" +python -m paddle.distributed.fleet.launch ${distributed_args} ascend_multi_process_collective.py fleetlaunchascend \ No newline at end of file From 9732e8a2a2a4f897a5edd07a20361dece4ba166a Mon Sep 17 00:00:00 2001 From: gongweibao Date: Thu, 21 Jan 2021 21:02:47 +0800 Subject: [PATCH 2/8] add ascend_group.py --- .../{ascend_group.py => test_ascend_group.py} | 0 .../tests/unittests/test_ascend_group.sh | 31 ------------------- 2 files changed, 31 deletions(-) rename python/paddle/fluid/tests/unittests/{ascend_group.py => test_ascend_group.py} (100%) delete mode 100644 python/paddle/fluid/tests/unittests/test_ascend_group.sh diff --git a/python/paddle/fluid/tests/unittests/ascend_group.py b/python/paddle/fluid/tests/unittests/test_ascend_group.py similarity index 100% rename from python/paddle/fluid/tests/unittests/ascend_group.py rename to python/paddle/fluid/tests/unittests/test_ascend_group.py diff --git a/python/paddle/fluid/tests/unittests/test_ascend_group.sh b/python/paddle/fluid/tests/unittests/test_ascend_group.sh deleted file mode 100644 index b8b0182e37916b..00000000000000 --- a/python/paddle/fluid/tests/unittests/test_ascend_group.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash - -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -e - -# use paddlecloud -echo "begin test use paddlecloud" -cluster_node_ips="127.0.0.1" -export PADDLE_TRAINERS_NUM=4 -export POD_IP=127.0.0.1 -export PADDLE_TRAINERS=127.0.0.1 -export PADDLE_TRAINER_ID=0 - -export PADDLE_PORT=35789 -export TRAINER_PORTS_NUM=4 - -distributed_args="--ips=${cluster_node_ips} --ascend_npus=0,1,2,3 --log_dir=testlog" -python -m paddle.distributed.fleet.launch ${distributed_args} ascend_multi_process_collective.py fleetlaunchascend \ No newline at end of file From ec51542d806792d6f689e37353fb49ca35084495 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Thu, 21 Jan 2021 21:17:44 +0800 Subject: [PATCH 3/8] add with_nccl --- .../fluid/operators/collective/CMakeLists.txt | 1 + .../operators/collective/c_gen_nccl_id_op.cc | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/paddle/fluid/operators/collective/CMakeLists.txt b/paddle/fluid/operators/collective/CMakeLists.txt index 55ad52f6447174..a119518c76d346 100644 --- a/paddle/fluid/operators/collective/CMakeLists.txt +++ b/paddle/fluid/operators/collective/CMakeLists.txt @@ -22,6 +22,7 @@ endif() if(WITH_ASCEND) op_library(gen_nccl_id_op) + op_library(c_gen_nccl_id_op) endif() diff --git a/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc b/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc index 26f639ebc98b9e..18f2a40f3ddd0b 100644 --- a/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc +++ b/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc @@ -23,11 +23,15 @@ limitations under the License. */ #include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/place.h" +#ifdef PADDLE_WITH_NCCL #include "paddle/fluid/operators/collective/gen_nccl_id_op_helper.h" +#endif namespace paddle { namespace operators { +#ifdef PADDLE_WITH_NCCL + class CGenNCCLIdOp : public framework::OperatorBase { public: CGenNCCLIdOp(const std::string& type, @@ -57,6 +61,22 @@ class CGenNCCLIdOp : public framework::OperatorBase { } }; +#else +class CGenNCCLIdOp : public framework::OperatorBase { + public: + CGenNCCLIdOp(const std::string& type, + const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + } +}; + +#endif + class CGenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { From 287bf2fdd1caff00e7e53f9334c746c38f68ea67 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Thu, 21 Jan 2021 21:37:45 +0800 Subject: [PATCH 4/8] add --- .../tests/unittests/test_ascend_group.py | 43 +++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_ascend_group.py b/python/paddle/fluid/tests/unittests/test_ascend_group.py index 9af518775abdb7..0ed9fc0b92bcf5 100644 --- a/python/paddle/fluid/tests/unittests/test_ascend_group.py +++ b/python/paddle/fluid/tests/unittests/test_ascend_group.py @@ -15,15 +15,26 @@ import os import sys import time +import paddle.fluid as fluid +from paddle.fluid import unique_name +import paddle.fluid.core as core +import paddle +from paddle.fluid.layer_helper import LayerHelper -def init_communicator(program, main_program, current_endpoint, endpoints, ring_id): +paddle.enable_static() + +OpRole = core.op_proto_and_checker_maker.OpRole +OP_ROLE_KEY = core.op_proto_and_checker_maker.kOpRoleAttrName() +OP_ROLE_VAR_KEY = core.op_proto_and_checker_maker.kOpRoleVarAttrName() + +def init_communicator(startup_program, main_program, current_endpoint, endpoints, ring_id): nranks = len(endpoints) other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) group_rank=endpoints.index(current_endpoint) assert group_rank >=0 - block = program.global_block() + block = startup_program.global_block() nccl_id_var = block.create_var( name=unique_name.generate('nccl_id'), persistable=True, @@ -54,7 +65,9 @@ def init_communicator(program, main_program, current_endpoint, endpoints, ring_i dtype='float32') with fluid.program_guard(main_program): - data=fluid.layers.fill_constant(shape=[1, positive_2], dtype='float32', value=2.5) + op_type="c_allreduce_sum" + data=fluid.layers.fill_constant(shape=[1], dtype='float32', value=2.5) + helper = LayerHelper(op_type, **locals()) helper.append_op( type=op_type, inputs={'X': [data]}, @@ -66,20 +79,26 @@ def train(): startup_programs=[] main_programs=[] + trainer_endpoints=["127.0.0.1:6071","127.0.0.1:6072","127.0.0.1:6073","127.0.0.1:6074"] - groups=[] + groups=[[], [], []] groups[0]=[trainer_endpoints[0], trainer_endpoints[1]] groups[1]=[trainer_endpoints[2], trainer_endpoints[3]] - groups[3]=[trainer_endpoints[0], trainer_endpoints[2]] + groups[2]=[trainer_endpoints[0], trainer_endpoints[2]] + + for i in range(len(trainer_endpoints)): + startup_programs.append(fluid.Program()) + main_programs.append(fluid.Program()) for idx, group in enumerate(groups): for te in group: - startup_program = fluid.program() - init_communicator(startup_program, main_program, te, group, str(idx)) - startup_programs.append(startup_program) - main_programs.append(main_program) - - print(startup_program[0]) - print(main_program[0]) + te_idx = trainer_endpoints.index(te) + startup_program = startup_programs[te_idx] + main_program=main_programs[te_idx] + init_communicator(startup_program, main_program, te, group, idx) + print(len(startup_programs)) + print(startup_programs[0]) + print(main_programs[0]) +train() From 79a202c11bb6fff2a5a7db4174b0ae2a94e93a25 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Fri, 22 Jan 2021 10:11:48 +0800 Subject: [PATCH 5/8] add test --- .../{test_ascend_group.py => ascend_group.py} | 19 ++++++++++-- .../tests/unittests/test_ascend_group.sh | 30 +++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) rename python/paddle/fluid/tests/unittests/{test_ascend_group.py => ascend_group.py} (84%) create mode 100644 python/paddle/fluid/tests/unittests/test_ascend_group.sh diff --git a/python/paddle/fluid/tests/unittests/test_ascend_group.py b/python/paddle/fluid/tests/unittests/ascend_group.py similarity index 84% rename from python/paddle/fluid/tests/unittests/test_ascend_group.py rename to python/paddle/fluid/tests/unittests/ascend_group.py index 0ed9fc0b92bcf5..d144d4765a278a 100644 --- a/python/paddle/fluid/tests/unittests/test_ascend_group.py +++ b/python/paddle/fluid/tests/unittests/ascend_group.py @@ -20,6 +20,7 @@ import paddle.fluid.core as core import paddle from paddle.fluid.layer_helper import LayerHelper +from paddle.distributed import fleet paddle.enable_static() @@ -27,6 +28,9 @@ OP_ROLE_KEY = core.op_proto_and_checker_maker.kOpRoleAttrName() OP_ROLE_VAR_KEY = core.op_proto_and_checker_maker.kOpRoleVarAttrName() +role = fleet.PaddleCloudRoleMaker(is_collective=True) +fleet.init(role) + def init_communicator(startup_program, main_program, current_endpoint, endpoints, ring_id): nranks = len(endpoints) other_endpoints = endpoints[:] @@ -75,12 +79,13 @@ def init_communicator(startup_program, main_program, current_endpoint, endpoints attrs={'ring_id': ring_id, 'use_calc_stream': True}) -def train(): +def train(world_endpoints, world_device_ids, local_device_ids): startup_programs=[] main_programs=[] - trainer_endpoints=["127.0.0.1:6071","127.0.0.1:6072","127.0.0.1:6073","127.0.0.1:6074"] + #trainer_endpoints=["127.0.0.1:6071","127.0.0.1:6072","127.0.0.1:6073","127.0.0.1:6074"] + trainer_endpoints=world_endpoints groups=[[], [], []] groups[0]=[trainer_endpoints[0], trainer_endpoints[1]] groups[1]=[trainer_endpoints[2], trainer_endpoints[3]] @@ -101,4 +106,12 @@ def train(): print(startup_programs[0]) print(main_programs[0]) -train() +worker_endpoints=fleet.worker_endpoints() +world_device_ids=fleet.world_device_ids() +local_device_ids=fleet.local_device_ids() + +print("worker_endpoints:", worker_endpoints) +print("world_device_ids:", world_device_ids) +print("local_device_ids:", local_device_ids) + +train(worker_endpoints, world_device_ids,local_device_ids) diff --git a/python/paddle/fluid/tests/unittests/test_ascend_group.sh b/python/paddle/fluid/tests/unittests/test_ascend_group.sh new file mode 100644 index 00000000000000..5f901d59ad4829 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_ascend_group.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +cluster_node_ips="127.0.0.1" +export PADDLE_TRAINERS_NUM=4 +export POD_IP=127.0.0.1 +export PADDLE_TRAINERS=127.0.0.1 +export PADDLE_TRAINER_ID=0 + +export PADDLE_PORT=35789 +export TRAINER_PORTS_NUM=4 + +distributed_args="--ips=${cluster_node_ips} --ascend_npus=0,1,2,3 --log_dir=testlog" +python -m paddle.distributed.fleet.launch ${distributed_args} \ + ascend_group.py fleetascendgroup \ No newline at end of file From 7480190fad002a01bedce44d8e67b96a6cd37936 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Fri, 22 Jan 2021 10:13:43 +0800 Subject: [PATCH 6/8] add test --- python/paddle/fluid/tests/unittests/CMakeLists.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 129453149c6d37..1f4648f7963811 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -39,6 +39,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_run_random_port) list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_async) list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_cloud) list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_ascend) +list(APPEND MIXED_DIST_TEST_OPS test_ascend_group) list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_nproc) list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input) list(APPEND MIXED_DIST_TEST_OPS test_collective_optimizer) @@ -524,6 +525,7 @@ if(WITH_DISTRIBUTE) bash_test_modules(test_fleet_launch_cloud START_BASH test_fleet_launch_cloud.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) bash_test_modules(test_fleet_launch_ascend START_BASH test_fleet_launch_ascend.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) bash_test_modules(test_fleet_launch_nproc START_BASH test_fleet_launch_nproc.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_ascend_group START_BASH test_ascend_group.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) # port range (20000, 23000) is reserved for dist-ops set(dist_ut_port 20001) From 76b2ad6dd527fd4b8d9390ef3d528f76bf1defca Mon Sep 17 00:00:00 2001 From: gongweibao Date: Fri, 22 Jan 2021 10:16:11 +0800 Subject: [PATCH 7/8] add test --- python/paddle/fluid/tests/unittests/ascend_group.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/ascend_group.py b/python/paddle/fluid/tests/unittests/ascend_group.py index d144d4765a278a..2b6df7558b4f26 100644 --- a/python/paddle/fluid/tests/unittests/ascend_group.py +++ b/python/paddle/fluid/tests/unittests/ascend_group.py @@ -79,7 +79,7 @@ def init_communicator(startup_program, main_program, current_endpoint, endpoints attrs={'ring_id': ring_id, 'use_calc_stream': True}) -def train(world_endpoints, world_device_ids, local_device_ids): +def train(world_endpoints, world_device_ids, local_device_ids,local_rank): startup_programs=[] main_programs=[] @@ -103,15 +103,17 @@ def train(world_endpoints, world_device_ids, local_device_ids): init_communicator(startup_program, main_program, te, group, idx) print(len(startup_programs)) - print(startup_programs[0]) - print(main_programs[0]) + print(startup_programs[local_rank]) + print(main_programs[local_rank]) worker_endpoints=fleet.worker_endpoints() world_device_ids=fleet.world_device_ids() local_device_ids=fleet.local_device_ids() +local_rank=fleet.local_rank() print("worker_endpoints:", worker_endpoints) print("world_device_ids:", world_device_ids) print("local_device_ids:", local_device_ids) -train(worker_endpoints, world_device_ids,local_device_ids) + +train(worker_endpoints, world_device_ids,local_device_ids,local_rank) From 923e9c6b16ec4a5b447e40319d7b8a631405aaf8 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Fri, 22 Jan 2021 10:23:58 +0800 Subject: [PATCH 8/8] modify local_rank --- python/paddle/fluid/tests/unittests/ascend_group.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/ascend_group.py b/python/paddle/fluid/tests/unittests/ascend_group.py index 2b6df7558b4f26..2d5b709a48eeff 100644 --- a/python/paddle/fluid/tests/unittests/ascend_group.py +++ b/python/paddle/fluid/tests/unittests/ascend_group.py @@ -83,7 +83,6 @@ def train(world_endpoints, world_device_ids, local_device_ids,local_rank): startup_programs=[] main_programs=[] - #trainer_endpoints=["127.0.0.1:6071","127.0.0.1:6072","127.0.0.1:6073","127.0.0.1:6074"] trainer_endpoints=world_endpoints groups=[[], [], []] @@ -109,11 +108,11 @@ def train(world_endpoints, world_device_ids, local_device_ids,local_rank): worker_endpoints=fleet.worker_endpoints() world_device_ids=fleet.world_device_ids() local_device_ids=fleet.local_device_ids() -local_rank=fleet.local_rank() +local_rank=int(fleet.local_rank()) print("worker_endpoints:", worker_endpoints) print("world_device_ids:", world_device_ids) print("local_device_ids:", local_device_ids) - +print("local_rank:", local_rank) train(worker_endpoints, world_device_ids,local_device_ids,local_rank)