Skip to content

Commit f36b9a7

Browse files
authored
【Fleet2.0 Util】 add documents (#26698)
* test=develop, util documents
1 parent e9a0fbf commit f36b9a7

14 files changed

Lines changed: 694 additions & 67 deletions

File tree

python/paddle/distributed/fleet/base/role_maker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ def __get_default_iface_from_interfaces(self):
637637
return "lo"
638638

639639
def __start_kv_server(self, http_server_d, size_d):
640-
from paddle.distributed.fleet.utils import KVServer
640+
from paddle.distributed.fleet.utils.http_server import KVServer
641641
http_server = KVServer(int(self._http_ip_port[1]), size_d)
642642
http_server.start()
643643
wait_seconds = 5
@@ -651,6 +651,7 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker):
651651
def __init__(self, is_collective=False, init_gloo=False, **kwargs):
652652
super(UserDefinedRoleMaker, self).__init__(
653653
is_collective=is_collective, init_gloo=init_gloo, **kwargs)
654+
self._init_gloo = init_gloo
654655

655656
def _user_defined_ps_env(self):
656657
self._server_endpoints = self._kwargs.get("server_endpoints")

python/paddle/distributed/fleet/base/util_factory.py

Lines changed: 193 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,18 @@
1616
"""basic collective operations in python"""
1717
"""remote file system"""
1818

19-
__all__ = ['UtilBase']
20-
21-
import numpy as np
22-
import os
23-
24-
import subprocess
25-
from paddle.fluid import core
26-
from collections import OrderedDict
27-
import paddle.fluid as fluid
28-
from google.protobuf import text_format
29-
from paddle.fluid import debugger
30-
from paddle.fluid.framework import Program
31-
from paddle.fluid.proto import framework_pb2
3219
from ..utils.fs import FS, LocalFS, HDFSClient
20+
from paddle.fluid.proto import framework_pb2
21+
from paddle.fluid.framework import Program
22+
from paddle.fluid import debugger
23+
from google.protobuf import text_format
24+
import paddle.fluid as fluid
25+
from collections import OrderedDict
26+
from paddle.fluid import core
27+
import subprocess
28+
import os
29+
import numpy as np
30+
__all__ = ['UtilBase']
3331

3432

3533
class UtilFactory(object):
@@ -53,7 +51,7 @@ def _set_strategy(self, dist_strategy):
5351
def _set_role_maker(self, role_maker):
5452
self.role_maker = role_maker
5553

56-
def set_file_system(self, fs_client):
54+
def _set_file_system(self, fs_client):
5755
assert isinstance(
5856
fs_client, FS
5957
), "fs_client must be the instance of paddle.distributed.fleet.utils.FS"
@@ -87,36 +85,183 @@ def __check_comm_world(self, comm_world="worker"):
8785
return _comm_world
8886

8987
def all_reduce(self, input, mode, comm_world="worker"):
88+
"""
89+
All reduce `input` between specified collection. This is a distributed API.
90+
91+
Args:
92+
input (list|numpy.array): The input variable to do all_reduce between specified collection.
93+
mode (str): "sum" or "min" or "max".
94+
comm_world (str, optional): Collection used to execute all_reduce operation. Supported collections include `worker` , `server` and `all` . The default is `worker` .
95+
96+
Returns:
97+
output(Numpy.array|None): A numpy array with the same shape as the `input` .
98+
99+
Examples:
100+
.. code-block:: python
101+
102+
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
103+
from paddle.distributed.fleet.base.util_factory import fleet_util
104+
import paddle.distributed.fleet as fleet
105+
from paddle.distributed.fleet import PaddleCloudRoleMaker
106+
import sys
107+
import numpy as np
108+
109+
def train():
110+
role = PaddleCloudRoleMaker(
111+
is_collective=False,
112+
init_gloo=True,
113+
path="./tmp_gloo")
114+
fleet.init(role)
115+
fleet_util._set_role_maker(role)
116+
117+
if fleet.is_server():
118+
input = [1, 2]
119+
output = fleet_util.all_reduce(input, "sum", "server")
120+
print(output)
121+
# [2, 4]
122+
elif fleet.is_worker():
123+
input = np.array([3, 4])
124+
output = fleet_util.all_reduce(input, "sum", "worker")
125+
print(output)
126+
# [6, 8]
127+
output = fleet_util.all_reduce(input, "sum", "all")
128+
print(output)
129+
# [8, 12]
130+
if __name__ == "__main__":
131+
train()
132+
"""
90133
_comm_world = self.__check_comm_world(comm_world)
91134
return self.role_maker._all_reduce(_comm_world, input, mode)
92135

93136
def barrier(self, comm_world="worker"):
137+
"""
138+
Barrier between specified collection.
139+
140+
Args:
141+
comm_world (str, optional): Collection used to execute barrier operation. Supported collections include `worker` , `server` and `all` . The default is `worker` .
142+
143+
Examples:
144+
145+
.. code-block:: python
146+
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
147+
148+
from paddle.distributed.fleet.base.util_factory import fleet_util
149+
import paddle.distributed.fleet as fleet
150+
from paddle.distributed.fleet import PaddleCloudRoleMaker
151+
import sys
152+
153+
def train():
154+
role = PaddleCloudRoleMaker(
155+
is_collective=False,
156+
init_gloo=True,
157+
path="./tmp_gloo")
158+
fleet.init(role)
159+
fleet_util._set_role_maker(role)
160+
161+
if fleet.is_server():
162+
fleet_util.barrier("server")
163+
print("all server arrive here")
164+
elif fleet.is_worker():
165+
fleet_util.barrier("worker")
166+
print("all server arrive here")
167+
fleet_util.barrier("all")
168+
print("all servers and workers arrive here")
169+
170+
if __name__ == "__main__":
171+
train()
172+
"""
94173
_comm_world = self.__check_comm_world(comm_world)
95174
self.role_maker._barrier(_comm_world)
96175

97176
def all_gather(self, input, comm_world="worker"):
177+
"""
178+
All gather `input` between specified collection.
179+
180+
Args:
181+
input (Int|Float): The input variable to do all_gather between specified collection.
182+
comm_world (str, optional): Collection used to execute all_gather operation. Supported collections include `worker` , `server` and `all` . The default is `worker` .
183+
184+
Returns:
185+
output (List): A list of gathered values.
186+
187+
Examples:
188+
189+
.. code-block:: python
190+
191+
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
192+
from paddle.distributed.fleet.base.util_factory import fleet_util
193+
import paddle.distributed.fleet as fleet
194+
from paddle.distributed.fleet import PaddleCloudRoleMaker
195+
import sys
196+
197+
def train():
198+
role = PaddleCloudRoleMaker(
199+
is_collective=False,
200+
init_gloo=True,
201+
path="./tmp_gloo")
202+
fleet.init(role)
203+
fleet_util._set_role_maker(role)
204+
205+
if fleet.is_server():
206+
input = fleet.server_index()
207+
output = fleet_util.all_gather(input, "server")
208+
print(output)
209+
# output = [0, 1]
210+
elif fleet.is_worker():
211+
input = fleet.worker_index()
212+
output = fleet_util.all_gather(input, "worker")
213+
# output = [0, 1]
214+
print(output)
215+
output = fleet_util.all_gather(input, "all")
216+
print(output)
217+
# output = [0, 1, 0, 1]
218+
219+
if __name__ == "__main__":
220+
train()
221+
"""
98222
_comm_world = self.__check_comm_world(comm_world)
99223
return self.role_maker._all_gather(_comm_world, input)
100224

101-
def broadcast(self):
225+
def _broadcast(self):
102226
pass
103227

104-
def scatter(self):
228+
def _scatter(self):
105229
pass
106230

107231
def get_file_shard(self, files):
108232
"""
109-
split files before distributed training,
110-
example 1: files is [a, b, c ,d, e] and trainer_num = 2, then trainer
111-
0 gets [a, b, c] and trainer 1 gets [d, e].
112-
example 2: files is [a, b], and trainer_num = 3, then trainer 0 gets
113-
[a], trainer 1 gets [b], trainer 2 gets []
233+
Split files before distributed training, and return filelist assigned to the current trainer.
234+
235+
.. code-block:: text
236+
237+
example 1: files is [a, b, c ,d, e] and trainer_num = 2, then trainer
238+
0 gets [a, b, c] and trainer 1 gets [d, e].
239+
example 2: files is [a, b], and trainer_num = 3, then trainer 0 gets
240+
[a], trainer 1 gets [b], trainer 2 gets []
114241
115242
Args:
116-
files(list): file list need to be read.
243+
files(list): File list need to be read.
117244
118245
Returns:
119-
list: files belongs to this worker.
246+
List: Files belong to this worker.
247+
248+
Examples:
249+
250+
.. code-block:: python
251+
252+
from paddle.distributed.fleet.base.util_factory import fleet_util
253+
import paddle.distributed.fleet.base.role_maker as role_maker
254+
255+
role = role_maker.UserDefinedRoleMaker(
256+
is_collective=False,
257+
init_gloo=False,
258+
current_id=0,
259+
role=role_maker.Role.WORKER,
260+
worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
261+
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
262+
fleet_util._set_role_maker(role)
263+
files = fleet_util.get_file_shard(["file1", "file2", "file3"])
264+
# files = ["file1", "file2"]
120265
"""
121266
if not isinstance(files, list):
122267
raise TypeError("files should be a list of file need to be read.")
@@ -140,6 +285,30 @@ def get_file_shard(self, files):
140285
return trainer_files[trainer_id]
141286

142287
def print_on_rank(self, message, rank_id):
288+
"""
289+
Worker of rank `rank_id` prints some message.
290+
291+
Args:
292+
message(str): Log to be printed.
293+
rank_id(int): trainer id.
294+
295+
Examples:
296+
297+
.. code-block:: python
298+
299+
from paddle.distributed.fleet.base.util_factory import fleet_util
300+
import paddle.distributed.fleet.base.role_maker as role_maker
301+
302+
role = role_maker.UserDefinedRoleMaker(
303+
is_collective=False,
304+
init_gloo=False,
305+
current_id=0,
306+
role=role_maker.Role.WORKER,
307+
worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
308+
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
309+
fleet_util._set_role_maker(role)
310+
fleet_util.print_on_rank("I'm worker 0", 0)
311+
"""
143312
if self.role_maker.worker_index() != rank_id:
144313
return
145314
print(message)
@@ -297,7 +466,7 @@ def check_not_expected_ops(prog, not_expected_op_types):
297466
with fluid.scope_guard(scope):
298467
inference_program, feed_target_names, fetch_targets = \
299468
fluid.io.load_inference_model(config.dump_model_dir, exe, model_filename=model_filename,
300-
params_filename=config.save_params_filename)
469+
params_filename=config.save_params_filename)
301470

302471
# check program vars and saved vars shape
303472
orig_para_shape = {

python/paddle/distributed/fleet/launch.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def _parse_args():
8787
see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
8888
''')
8989

90-
#Optional arguments for the launch helper
90+
# Optional arguments for the launch helper
9191
parser.add_argument(
9292
"--ips",
9393
type=str,
@@ -115,7 +115,7 @@ def _parse_args():
115115
default="log",
116116
help="The path for each process's log.If it's not set, the log will printed to default pipe."
117117
)
118-
#positional
118+
# positional
119119
parser.add_argument(
120120
"training_script",
121121
type=str,
@@ -124,7 +124,7 @@ def _parse_args():
124124
"followed by all the arguments for the "
125125
"training script")
126126

127-
#rest from the training program
127+
# rest from the training program
128128
parser.add_argument('training_script_args', nargs=REMAINDER)
129129
return parser.parse_args()
130130

@@ -138,7 +138,7 @@ def get_cluster_from_args(args, gpus):
138138

139139
# node_ip = args.node_ip
140140
assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \
141-
% (node_ip, node_ips)
141+
% (node_ip, node_ips)
142142
node_rank = node_ips.index(node_ip)
143143

144144
logger.debug("parsed from args: node_ips:{} node_ip:{} node_rank:{}".format(
@@ -280,7 +280,7 @@ def launch_ps(args):
280280
_, current_node_ip = get_host_name_ip()
281281

282282
assert current_node_ip in node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
283-
% (current_node_ip, node_ips)
283+
% (current_node_ip, node_ips)
284284
node_rank = node_ips.index(current_node_ip)
285285
logger.debug(
286286
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}, server_ports:{}".
@@ -323,10 +323,12 @@ def launch_ps(args):
323323
for idx, cur_server in enumerate(pod.servers):
324324
proc_env = {
325325
"PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
326+
"PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
326327
"PADDLE_PORT": cur_server.endpoint.split(":")[1],
327328
"TRAINING_ROLE": "PSERVER",
328329
"PADDLE_TRAINERS_NUM": str(worker_num),
329-
"POD_IP": cur_server.endpoint.split(":")[0]
330+
"POD_IP": cur_server.endpoint.split(":")[0],
331+
"PADDLE_WITH_GLOO": "1"
330332
}
331333
current_env.update(proc_env)
332334

@@ -365,7 +367,8 @@ def launch_ps(args):
365367
"PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
366368
"PADDLE_TRAINERS_NUM": str(worker_num),
367369
"TRAINING_ROLE": "TRAINER",
368-
"PADDLE_TRAINER_ID": str(cur_worker.rank)
370+
"PADDLE_TRAINER_ID": str(cur_worker.rank),
371+
"PADDLE_WITH_GLOO": "1"
369372
}
370373
current_env.update(proc_env)
371374

@@ -430,7 +433,11 @@ def launch():
430433
co_arg for co_arg in collective_args
431434
if co_arg in " ".join(sys.argv[1:-1])
432435
]
433-
cuda_device_num = fluid.core.get_cuda_device_count()
436+
if fluid.core.is_compiled_with_cuda():
437+
cuda_device_num = fluid.core.get_cuda_device_count()
438+
else:
439+
cuda_device_num = 0
440+
434441
if len(has_ps_args) > 0 or cuda_device_num == 0:
435442
logger.info(
436443
"Run parameter-sever cpu mode. pserver arguments:{}, cuda count:{}".

python/paddle/distributed/fleet/utils/__init__.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,3 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
15-
from .fs import *
16-
from .http_server import KVHandler, KVHTTPServer, KVServer
17-
18-
#__all__ = ['KVHandler', 'KVHTTPServer', 'KVServer'] + fs.__all__

0 commit comments

Comments
 (0)