Skip to content
3 changes: 2 additions & 1 deletion python/paddle/distributed/fleet/base/role_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ def __get_default_iface_from_interfaces(self):
return "lo"

def __start_kv_server(self, http_server_d, size_d):
from paddle.distributed.fleet.utils import KVServer
from paddle.distributed.fleet.utils.http_server import KVServer
http_server = KVServer(int(self._http_ip_port[1]), size_d)
http_server.start()
wait_seconds = 5
Expand All @@ -651,6 +651,7 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker):
def __init__(self, is_collective=False, init_gloo=False, **kwargs):
super(UserDefinedRoleMaker, self).__init__(
is_collective=is_collective, init_gloo=init_gloo, **kwargs)
self._init_gloo = init_gloo

def _user_defined_ps_env(self):
self._server_endpoints = self._kwargs.get("server_endpoints")
Expand Down
217 changes: 193 additions & 24 deletions python/paddle/distributed/fleet/base/util_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@
"""basic collective operations in python"""
"""remote file system"""

__all__ = ['UtilBase']

import numpy as np
import os

import subprocess
from paddle.fluid import core
from collections import OrderedDict
import paddle.fluid as fluid
from google.protobuf import text_format
from paddle.fluid import debugger
from paddle.fluid.framework import Program
from paddle.fluid.proto import framework_pb2
from ..utils.fs import FS, LocalFS, HDFSClient
from paddle.fluid.proto import framework_pb2
from paddle.fluid.framework import Program
from paddle.fluid import debugger
from google.protobuf import text_format
import paddle.fluid as fluid
from collections import OrderedDict
from paddle.fluid import core
import subprocess
import os
import numpy as np
__all__ = ['UtilBase']


class UtilFactory(object):
Expand All @@ -53,7 +51,7 @@ def _set_strategy(self, dist_strategy):
def _set_role_maker(self, role_maker):
self.role_maker = role_maker

def set_file_system(self, fs_client):
def _set_file_system(self, fs_client):
assert isinstance(
fs_client, FS
), "fs_client must be the instance of paddle.distributed.fleet.utils.FS"
Expand Down Expand Up @@ -87,36 +85,183 @@ def __check_comm_world(self, comm_world="worker"):
return _comm_world

def all_reduce(self, input, mode, comm_world="worker"):
    """
    All reduce `input` between specified collection. This is a distributed API.

    Args:
        input (list|numpy.array): The input variable to do all_reduce between specified collection.
        mode (str): "sum" or "min" or "max".
        comm_world (str, optional): Collection used to execute all_reduce operation. Supported collections include `worker` , `server` and `all` . The default is `worker` .

    Returns:
        output(numpy.array|None): A numpy array with the same shape as the `input` .

    Examples:
        .. code-block:: python

            # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
            from paddle.distributed.fleet.base.util_factory import fleet_util
            import paddle.distributed.fleet as fleet
            from paddle.distributed.fleet import PaddleCloudRoleMaker
            import sys
            import numpy as np

            def train():
                role = PaddleCloudRoleMaker(
                    is_collective=False,
                    init_gloo=True,
                    path="./tmp_gloo")
                fleet.init(role)
                fleet_util._set_role_maker(role)

                if fleet.is_server():
                    input = [1, 2]
                    output = fleet_util.all_reduce(input, "sum", "server")
                    print(output)
                    # [2, 4]
                elif fleet.is_worker():
                    input = np.array([3, 4])
                    output = fleet_util.all_reduce(input, "sum", "worker")
                    print(output)
                    # [6, 8]
                output = fleet_util.all_reduce(input, "sum", "all")
                print(output)
                # [8, 12]
            if __name__ == "__main__":
                train()
    """
    # Pass `comm_world` through the private checker (defined outside this
    # view) and hand its result, not the raw argument, to the role maker.
    _comm_world = self.__check_comm_world(comm_world)
    return self.role_maker._all_reduce(_comm_world, input, mode)

def barrier(self, comm_world="worker"):
    """
    Barrier between specified collection.

    Args:
        comm_world (str, optional): Collection used to execute barrier operation. Supported collections include `worker` , `server` and `all` . The default is `worker` .

    Examples:

        .. code-block:: python

            # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .

            from paddle.distributed.fleet.base.util_factory import fleet_util
            import paddle.distributed.fleet as fleet
            from paddle.distributed.fleet import PaddleCloudRoleMaker
            import sys

            def train():
                role = PaddleCloudRoleMaker(
                    is_collective=False,
                    init_gloo=True,
                    path="./tmp_gloo")
                fleet.init(role)
                fleet_util._set_role_maker(role)

                if fleet.is_server():
                    fleet_util.barrier("server")
                    print("all servers arrive here")
                elif fleet.is_worker():
                    fleet_util.barrier("worker")
                    print("all workers arrive here")
                fleet_util.barrier("all")
                print("all servers and workers arrive here")

            if __name__ == "__main__":
                train()
    """
    # Pass `comm_world` through the private checker (defined outside this
    # view); the role maker's `_barrier` receives the checked value.
    _comm_world = self.__check_comm_world(comm_world)
    self.role_maker._barrier(_comm_world)

def all_gather(self, input, comm_world="worker"):
    """
    All gather `input` between specified collection.

    Args:
        input (Int|Float): The input variable to do all_gather between specified collection.
        comm_world (str, optional): Collection used to execute all_gather operation. Supported collections include `worker` , `server` and `all` . The default is `worker` .

    Returns:
        output (List): A list of gathered values.

    Examples:

        .. code-block:: python

            # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
            from paddle.distributed.fleet.base.util_factory import fleet_util
            import paddle.distributed.fleet as fleet
            from paddle.distributed.fleet import PaddleCloudRoleMaker
            import sys

            def train():
                role = PaddleCloudRoleMaker(
                    is_collective=False,
                    init_gloo=True,
                    path="./tmp_gloo")
                fleet.init(role)
                fleet_util._set_role_maker(role)

                if fleet.is_server():
                    input = fleet.server_index()
                    output = fleet_util.all_gather(input, "server")
                    print(output)
                    # output = [0, 1]
                elif fleet.is_worker():
                    input = fleet.worker_index()
                    output = fleet_util.all_gather(input, "worker")
                    # output = [0, 1]
                    print(output)
                output = fleet_util.all_gather(input, "all")
                print(output)
                # output = [0, 1, 0, 1]

            if __name__ == "__main__":
                train()
    """
    # Pass `comm_world` through the private checker (defined outside this
    # view) and hand its result, not the raw argument, to the role maker.
    _comm_world = self.__check_comm_world(comm_world)
    return self.role_maker._all_gather(_comm_world, input)

def broadcast(self):
def _broadcast(self):
pass

def scatter(self):
def _scatter(self):
pass

def get_file_shard(self, files):
"""
split files before distributed training,
example 1: files is [a, b, c ,d, e] and trainer_num = 2, then trainer
0 gets [a, b, c] and trainer 1 gets [d, e].
example 2: files is [a, b], and trainer_num = 3, then trainer 0 gets
[a], trainer 1 gets [b], trainer 2 gets []
Split files before distributed training, and return filelist assigned to the current trainer.

.. code-block:: text

example 1: files is [a, b, c ,d, e] and trainer_num = 2, then trainer
0 gets [a, b, c] and trainer 1 gets [d, e].
example 2: files is [a, b], and trainer_num = 3, then trainer 0 gets
[a], trainer 1 gets [b], trainer 2 gets []

Args:
files(list): file list need to be read.
files(list): File list need to be read.

Returns:
list: files belongs to this worker.
List: Files belong to this worker.

Examples:

.. code-block:: python

from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet.base.role_maker as role_maker

role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.WORKER,
worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
fleet_util._set_role_maker(role)
files = fleet_util.get_file_shard(["file1", "file2", "file3"])
# files = ["file1", "file2"]
"""
if not isinstance(files, list):
raise TypeError("files should be a list of file need to be read.")
Expand All @@ -140,6 +285,30 @@ def get_file_shard(self, files):
return trainer_files[trainer_id]

def print_on_rank(self, message, rank_id):
    """
    Worker of rank `rank_id` prints some message.

    Args:
        message(str): Log to be printed.
        rank_id(int): Trainer id; compared against `role_maker.worker_index()` .

    Examples:

        .. code-block:: python

            from paddle.distributed.fleet.base.util_factory import fleet_util
            import paddle.distributed.fleet.base.role_maker as role_maker

            role = role_maker.UserDefinedRoleMaker(
                is_collective=False,
                init_gloo=False,
                current_id=0,
                role=role_maker.Role.WORKER,
                worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
                server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
            fleet_util._set_role_maker(role)
            fleet_util.print_on_rank("I'm worker 0", 0)
    """
    # Only the process whose worker index equals `rank_id` prints; every
    # other caller returns without output.
    if self.role_maker.worker_index() != rank_id:
        return
    print(message)
Expand Down Expand Up @@ -297,7 +466,7 @@ def check_not_expected_ops(prog, not_expected_op_types):
with fluid.scope_guard(scope):
inference_program, feed_target_names, fetch_targets = \
fluid.io.load_inference_model(config.dump_model_dir, exe, model_filename=model_filename,
params_filename=config.save_params_filename)
params_filename=config.save_params_filename)

# check program vars and saved vars shape
orig_para_shape = {
Expand Down
23 changes: 15 additions & 8 deletions python/paddle/distributed/fleet/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _parse_args():
see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
''')

#Optional arguments for the launch helper
# Optional arguments for the launch helper
parser.add_argument(
"--ips",
type=str,
Expand Down Expand Up @@ -115,7 +115,7 @@ def _parse_args():
default="log",
help="The path for each process's log.If it's not set, the log will printed to default pipe."
)
#positional
# positional
parser.add_argument(
"training_script",
type=str,
Expand All @@ -124,7 +124,7 @@ def _parse_args():
"followed by all the arguments for the "
"training script")

#rest from the training program
# rest from the training program
parser.add_argument('training_script_args', nargs=REMAINDER)
return parser.parse_args()

Expand All @@ -138,7 +138,7 @@ def get_cluster_from_args(args, gpus):

# node_ip = args.node_ip
assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \
% (node_ip, node_ips)
% (node_ip, node_ips)
node_rank = node_ips.index(node_ip)

logger.debug("parsed from args: node_ips:{} node_ip:{} node_rank:{}".format(
Expand Down Expand Up @@ -280,7 +280,7 @@ def launch_ps(args):
_, current_node_ip = get_host_name_ip()

assert current_node_ip in node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
% (current_node_ip, node_ips)
% (current_node_ip, node_ips)
node_rank = node_ips.index(current_node_ip)
logger.debug(
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}, server_ports:{}".
Expand Down Expand Up @@ -323,10 +323,12 @@ def launch_ps(args):
for idx, cur_server in enumerate(pod.servers):
proc_env = {
"PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
"PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
"PADDLE_PORT": cur_server.endpoint.split(":")[1],
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(worker_num),
"POD_IP": cur_server.endpoint.split(":")[0]
"POD_IP": cur_server.endpoint.split(":")[0],
"PADDLE_WITH_GLOO": "1"
}
current_env.update(proc_env)

Expand Down Expand Up @@ -365,7 +367,8 @@ def launch_ps(args):
"PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(cur_worker.rank)
"PADDLE_TRAINER_ID": str(cur_worker.rank),
"PADDLE_WITH_GLOO": "1"
}
current_env.update(proc_env)

Expand Down Expand Up @@ -430,7 +433,11 @@ def launch():
co_arg for co_arg in collective_args
if co_arg in " ".join(sys.argv[1:-1])
]
cuda_device_num = fluid.core.get_cuda_device_count()
if fluid.core.is_compiled_with_cuda():
cuda_device_num = fluid.core.get_cuda_device_count()
else:
cuda_device_num = 0

if len(has_ps_args) > 0 or cuda_device_num == 0:
logger.info(
"Run parameter-sever cpu mode. pserver arguments:{}, cuda count:{}".
Expand Down
5 changes: 0 additions & 5 deletions python/paddle/distributed/fleet/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .fs import *
from .http_server import KVHandler, KVHTTPServer, KVServer

#__all__ = ['KVHandler', 'KVHTTPServer', 'KVServer'] + fs.__all__
Loading