1 change: 1 addition & 0 deletions python/paddle/distributed/fleet/base/role_maker.py
@@ -651,6 +651,7 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker):
def __init__(self, is_collective=False, init_gloo=False, **kwargs):
super(UserDefinedRoleMaker, self).__init__(
is_collective=is_collective, init_gloo=init_gloo, **kwargs)
self._init_gloo = init_gloo
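The constructor change above records the `init_gloo` flag on the instance. For context, a minimal construction of this role maker, mirroring the docstring examples later in this diff (endpoint and id values are illustrative):

.. code-block:: python

    import paddle.distributed.fleet.base.role_maker as role_maker

    # Illustrative values; any free local ports would do.
    role = role_maker.UserDefinedRoleMaker(
        is_collective=False,
        init_gloo=False,
        current_id=0,
        role=role_maker.Role.WORKER,
        worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
        server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])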

def _user_defined_ps_env(self):
self._server_endpoints = self._kwargs.get("server_endpoints")
203 changes: 181 additions & 22 deletions python/paddle/distributed/fleet/base/util_factory.py
@@ -16,20 +16,18 @@
"""basic collective operations in python"""
"""remote file system"""

__all__ = ['UtilBase']

import numpy as np
import os

import subprocess
from paddle.fluid import core
from collections import OrderedDict
import paddle.fluid as fluid
from google.protobuf import text_format
from paddle.fluid import debugger
from paddle.fluid.framework import Program
from paddle.fluid.proto import framework_pb2
from ..utils.fs import FS, LocalFS, HDFSClient
from paddle.fluid.proto import framework_pb2
from paddle.fluid.framework import Program
from paddle.fluid import debugger
from google.protobuf import text_format
import paddle.fluid as fluid
from collections import OrderedDict
from paddle.fluid import core
import subprocess
import os
import numpy as np
__all__ = ['UtilBase']


class UtilFactory(object):
@@ -53,7 +51,7 @@ def _set_strategy(self, dist_strategy):
def _set_role_maker(self, role_maker):
self.role_maker = role_maker

def set_file_system(self, fs_client):
def _set_file_system(self, fs_client):
assert isinstance(
fs_client, FS
), "fs_client must be the instance of paddle.distributed.fleet.utils.FS"
@@ -87,36 +85,174 @@ def __check_comm_world(self, comm_world="worker"):
return _comm_world
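Only the tail of `__check_comm_world` is visible in this hunk. A minimal sketch of the kind of validation such a helper performs (assumed for illustration; the actual body also resolves the gloo comm world from the role maker):

.. code-block:: python

    # Assumed sketch, not the actual implementation: reject unsupported
    # collection names up front.
    def check_comm_world(comm_world="worker"):
        supported = ("worker", "server", "all")
        assert comm_world in supported, \
            "comm_world must be one of %s, got %s" % (supported, comm_world)
        return comm_world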

def all_reduce(self, input, mode, comm_world="worker"):
"""
All reduce `input` between specified collection. This is a distributed API.
Args:
input (list|numpy.array): The input variable to do all_reduce between specified collection.
mode (string): "sum" or "min" or "max".
comm_world (String, optional): Collection used to execute all_reduce operation. Currently, supported
collection incudes `worker` , `server` and `all` . The default is `worker` .

Returns:
output (numpy.array|None): A numpy array with the same shape as the `input` .

Examples:
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
.. code-block:: python
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys
import numpy as np

def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
fleet_util._set_role_maker(role)

if fleet.is_server():
input = [1, 2]
output = fleet_util.all_reduce(input, "sum", "server")
print(output)
# [2, 4]
elif fleet.is_worker():
input = np.array([3, 4])
output = fleet_util.all_reduce(input, "sum", "worker")
print(output)
# [6, 8]
output = fleet_util.all_reduce(input, "sum", "all")
print(output)
# [8, 12]
if __name__ == "__main__":
train()
"""
_comm_world = self.__check_comm_world(comm_world)
return self.role_maker._all_reduce(_comm_world, input, mode)
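The three modes reduce element-wise across the collection. A quick numpy sketch of the expected semantics (standalone and illustrative; it does not call Paddle):

.. code-block:: python

    import numpy as np

    # Two members contribute [1, 2] and [3, 4].
    a, b = np.array([1, 2]), np.array([3, 4])
    print(np.sum([a, b], axis=0))  # "sum" -> [4 6]
    print(np.min([a, b], axis=0))  # "min" -> [1 2]
    print(np.max([a, b], axis=0))  # "max" -> [3 4]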

def barrier(self, comm_world="worker"):
"""
Barrier between specified collection.
Args:
comm_world (String, optional): Collection used to execute barrier operation. Currently, supported
collection incudes `worker` , `server` and `all` . The default is `worker` .

Examples:
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
.. code-block:: python
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys

def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
fleet_util._set_role_maker(role)

if fleet.is_server():
fleet_util.barrier("server")
print("all server arrive here")
elif fleet.is_worker():
fleet_util.barrier("worker")
print("all server arrive here")
fleet_util.barrier("all")
print("all servers and workers arrive here")

if __name__ == "__main__":
train()
"""
_comm_world = self.__check_comm_world(comm_world)
self.role_maker._barrier(_comm_world)

def all_gather(self, input, comm_world="worker"):
"""
All gather `input` between specified collection.
Args:
input (int|float): The input variable to do all_gather between specified collection.
comm_world (String, optional): Collection used to execute all_reduce operation. Currently, supported
collection incudes `worker` , `server` and `all` . The default is `worker` .

Returns:
output (list): A list of gathered values.

Examples:
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
.. code-block:: python
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys

def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
fleet_util._set_role_maker(role)

if fleet.is_server():
input = fleet.server_index()
output = fleet_util.all_gather(input, "server")
print(output)
# output = [0, 1]
elif fleet.is_worker():
input = fleet.worker_index()
output = fleet_util.all_gather(input, "worker")
# output = [0, 1]
print(output)
output = fleet_util.all_gather(input, "all")
print(output)
# output = [0, 1, 0, 1]

if __name__ == "__main__":
train()
"""
_comm_world = self.__check_comm_world(comm_world)
return self.role_maker._all_gather(_comm_world, input)

def broadcast(self):
def _broadcast(self):
pass

def scatter(self):
def _scatter(self):
pass

def get_file_shard(self, files):
"""
split files before distributed training,
example 1: files is [a, b, c ,d, e] and trainer_num = 2, then trainer
0 gets [a, b, c] and trainer 1 gets [d, e].
example 2: files is [a, b], and trainer_num = 3, then trainer 0 gets
[a], trainer 1 gets [b], trainer 2 gets []
Split files before distributed training, and return the filelist assigned to the current trainer.

.. code-block:: text

    example 1: files is [a, b, c, d, e] and trainer_num = 2, then trainer
    0 gets [a, b, c] and trainer 1 gets [d, e].
    example 2: files is [a, b], and trainer_num = 3, then trainer 0 gets
    [a], trainer 1 gets [b], trainer 2 gets [].

Args:
    files (list): The file list that needs to be read.

Returns:
    list: Files that belong to this worker.

Examples:
    .. code-block:: python

        from paddle.distributed.fleet.base.util_factory import fleet_util
        import paddle.distributed.fleet.base.role_maker as role_maker

        role = role_maker.UserDefinedRoleMaker(
            is_collective=False,
            init_gloo=False,
            current_id=0,
            role=role_maker.Role.WORKER,
            worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
            server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
        fleet_util._set_role_maker(role)
        files = fleet_util.get_file_shard(["file1", "file2", "file3"])
        # files = ["file1", "file2"]
"""
if not isinstance(files, list):
raise TypeError("files should be a list of files to be read.")
@@ -140,6 +276,29 @@ def get_file_shard(self, files):
return trainer_files[trainer_id]
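The split rule illustrated in the docstring (an even split, with the remainder spread over the lowest-ranked trainers) can be reproduced by this standalone sketch; `shard_files` is an illustrative name, not the code in this file:

.. code-block:: python

    def shard_files(files, trainer_id, trainer_num):
        # Even split; the first len(files) % trainer_num trainers
        # each take one extra file.
        blocks = len(files) // trainer_num
        remainder = len(files) % trainer_num
        begin = trainer_id * blocks + min(trainer_id, remainder)
        end = begin + blocks + (1 if trainer_id < remainder else 0)
        return files[begin:end]

    assert shard_files(["a", "b", "c", "d", "e"], 0, 2) == ["a", "b", "c"]
    assert shard_files(["a", "b", "c", "d", "e"], 1, 2) == ["d", "e"]
    assert shard_files(["a", "b"], 2, 3) == []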

def print_on_rank(self, message, rank_id):
"""
Woker of rank `rank_id` print some message.
Args:
message(String): Log to be printed.
rank_id(int): trainer id.

Returns:
None.

Examples:
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet.base.role_maker as role_maker

role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.WORKER,
worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
fleet_util._set_role_maker(role)
fleet_util.print_on_rank("I'm worker 0", 0)
"""
if self.role_maker.worker_index() != rank_id:
return
print(message)
@@ -297,7 +456,7 @@ def check_not_expected_ops(prog, not_expected_op_types):
with fluid.scope_guard(scope):
inference_program, feed_target_names, fetch_targets = \
fluid.io.load_inference_model(config.dump_model_dir, exe, model_filename=model_filename,
params_filename=config.save_params_filename)
params_filename=config.save_params_filename)

# check program vars and saved vars shape
orig_para_shape = {
27 changes: 17 additions & 10 deletions python/paddle/distributed/fleet/launch.py
@@ -87,7 +87,7 @@ def _parse_args():
see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
''')

#Optional arguments for the launch helper
# Optional arguments for the launch helper
parser.add_argument(
"--ips",
type=str,
@@ -115,7 +115,7 @@ def _parse_args():
default="log",
help="The path for each process's log. If it's not set, the log will be printed to the default pipe."
)
#positional
# positional
parser.add_argument(
"training_script",
type=str,
@@ -124,7 +124,7 @@ def _parse_args():
"followed by all the arguments for the "
"training script")

#rest from the training program
# rest from the training program
parser.add_argument('training_script_args', nargs=REMAINDER)
return parser.parse_args()

@@ -138,7 +138,7 @@ def get_cluster_from_args(args, gpus):

# node_ip = args.node_ip
assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \
% (node_ip, node_ips)
% (node_ip, node_ips)
node_rank = node_ips.index(node_ip)

logger.debug("parsed from args: node_ips:{} node_ip:{} node_rank:{}".format(
@@ -175,8 +175,8 @@ def get_gpus(gpus):
cuda_visible_devices_list = cuda_visible_devices.split(',')
for x in gpus.split(','):
assert x in cuda_visible_devices_list, "Can't find "\
"your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
% (x, cuda_visible_devices)
"your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
% (x, cuda_visible_devices)
gpus = [
cuda_visible_devices_list.index(x.strip())
for x in gpus.split(',')
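The remapping above converts absolute device ids into positions within CUDA_VISIBLE_DEVICES. A worked example of the same logic with illustrative values:

.. code-block:: python

    # With CUDA_VISIBLE_DEVICES="4,5,6,7" and --gpus="5,7",
    # the launcher addresses local device indices [1, 3].
    cuda_visible_devices_list = "4,5,6,7".split(',')
    selected = [
        cuda_visible_devices_list.index(x.strip())
        for x in "5,7".split(',')
    ]
    assert selected == [1, 3]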
@@ -273,7 +273,7 @@ def launch_ps(args):
_, current_node_ip = get_host_name_ip()

assert current_node_ip in node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
% (current_node_ip, node_ips)
% (current_node_ip, node_ips)
node_rank = node_ips.index(current_node_ip)
logger.debug(
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}, server_ports:{}".
@@ -316,10 +316,12 @@ def launch_ps(args):
for idx, cur_server in enumerate(pod.servers):
proc_env = {
"PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
"PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
"PADDLE_PORT": cur_server.endpoint.split(":")[1],
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(worker_num),
"POD_IP": cur_server.endpoint.split(":")[0]
"POD_IP": cur_server.endpoint.split(":")[0],
"PADDLE_WITH_GLOO": "1"
}
current_env.update(proc_env)

@@ -358,7 +360,8 @@ def launch_ps(args):
"PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(cur_worker.rank)
"PADDLE_TRAINER_ID": str(cur_worker.rank),
"PADDLE_WITH_GLOO": "1"
}
current_env.update(proc_env)
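Both the server and worker environments now export PADDLE_WITH_GLOO=1. A sketch of how a consumer might read the flag (assumed usage for illustration; the actual consumer is the role maker, which is not shown in this diff):

.. code-block:: python

    import os

    # Assumed usage: gate gloo initialization on the launcher's flag.
    if os.environ.get("PADDLE_WITH_GLOO", "0") == "1":
        pass  # initialize the gloo communicator here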

@@ -423,7 +426,11 @@ def launch():
co_arg for co_arg in collective_args
if co_arg in " ".join(sys.argv[1:-1])
]
cuda_device_num = fluid.core.get_cuda_device_count()
if fluid.core.is_compiled_with_cuda():
cuda_device_num = fluid.core.get_cuda_device_count()
else:
cuda_device_num = 0

if len(has_ps_args) > 0 or cuda_device_num == 0:
logger.info(
"Run parameter-sever cpu mode. pserver arguments:{}, cuda count:{}".
2 changes: 1 addition & 1 deletion python/paddle/fluid/tests/unittests/test_fleet_util.py
@@ -86,7 +86,7 @@ def test_fs(self):
dirs, files = fs.ls_dir("test_tmp")
dirs, files = fs.ls_dir("./")
self.assertFalse(fs.need_upload_download())
fleet_util.set_file_system(fs)
fleet_util._set_file_system(fs)

def test_barrier(self):
try: