From a8e71970e736c7acfdcb50ef050097ab077441b3 Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Thu, 13 May 2021 09:20:55 +0000
Subject: [PATCH 01/19] elastic etcd ready

---
 python/paddle/distributed/fleet/elastic.py | 142 +++++++++++++
 python/paddle/distributed/fleet/launch.py  | 234 ++++++++++++++++-----
 2 files changed, 318 insertions(+), 58 deletions(-)
 create mode 100644 python/paddle/distributed/fleet/elastic.py

diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py
new file mode 100644
index 00000000000000..25d83d8f81c62c
--- /dev/null
+++ b/python/paddle/distributed/fleet/elastic.py
@@ -0,0 +1,142 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import etcd3
+import time
+import socket
+import os
+import six
+import logging
+
+
+class Status:
+    READY = 'ready'
+    RUNNING = 'running'
+    ERROR = 'error'
+    COMPLETED = 'completed'
+
+
+class ElasticManager(object):
+    def __init__(self, server, name, np, host=None, scale=0, force=False):
+
+        logging.info('[elastic] init with server {} host {}'.format(server,
+                                                                     host))
+
+        srv, port = server.split(':')
+        self.etcd = etcd3.client(host=srv, port=port)
+        self.host = host if host else self._get_host()
+        self.hosts = []
+
+        # etcd data
+        self.prefix = "/paddle/" + name
+        self.node_prefix = self.prefix + '/nodes/'
+        self.np_path = self.prefix + '/np'
+        self.host_path = self.node_prefix + self.host
+
+        self.np = np + scale
+        '''
+        0 group mode, be aware of healthy status of other workers
+        1 decouple mode, check own status only
+        '''
+        self.etcd.put(self.prefix, b'0')
+
+        # host
+        # register self host to etcd
+        # register watch to reset host after host been deleted
+        self.etcd.delete_prefix(self.node_prefix)
+        self.etcd.put(self.host_path, six.b(self.host))
+
+        def host_call_back(event):
+            if self.etcd.get(self.host_path)[0] == None:
+                logging.info('[elastic] register host agin {}'.format(
+                    self.host))
+                self.etcd.put(self.host_path, six.b(self.host))
+
+        host_watch = self.etcd.add_watch_callback(self.host_path,
+                                                  host_call_back)
+
+        # np
+        #
+        inp = int(self.etcd.get(self.np_path)[0] or 0)
+        if scale == 0 and not force:
+            assert (
+                inp == np,
+                "[elastic] np {} is not consistent with np in etcd {}, maybe the job with the same name exited unexpected, try --force=true".
+                format(np, inp))
+        else:
+            assert (inp == np or inp == self.np,
+                    "[elastic] np {} scale to {} by {} is not allowed".format(
+                        inp, self.np, scale))
+
+        self.etcd.put(self.np_path, six.b("%d" % (self.np)))
+
+        def np_call_back(event):
+            gnp = int(self.etcd.get(self.np_path)[0])
+            if gnp != self.np:
+                logging.info("[elastic] scale np {} to {} ".format(self.np,
+                                                                   gnp))
+                self.np = gnp
+
+        np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back)
+
+        self.watches = [host_watch, np_watch]
+
+    def exit(self, completed=False):
+        logging.info('[elastic] manager exist completed {}'.format(completed))
+
+        if completed:
+            self.etcd.put(self.prefix, b'1')
+
+        for watch in self.watches:
+            self.etcd.cancel_watch(watch)
+        self.etcd.delete(self.host_path)
+
+        hosts = [i for i in self.etcd.get_prefix(self.node_prefix)]
+        if len(hosts) == 0:
+            self.etcd.delete_prefix(self.prefix)
+
+    def _get_host(self):
+        try:
+            return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
+        except:
+            return '127.0.0.1'
+
+    def _completed(self):
+        return int(self.etcd.get(self.prefix)[0]) == 1
+
+    def _match(self):
+        self.hosts = [
+            six.ensure_str(i[0]) for i in self.etcd.get_prefix(self.node_prefix)
+        ]
+        if len(self.hosts) == self.np:
+            return True
+        else:
+            return False
+
+    def ready(self):
+        while True:
+            if self._match():
+                logging.info('[elastic] ready with hosts {}'.format(self.hosts))
+                return True
+            logging.info('[elastic] not ready for np {} with hosts {}'.format(
+                self.np, self.hosts))
+            time.sleep(3)
+        return False
+
+    def health(self):
+        return self._completed() or self._match()
+
+    def signal_handler(self, sigint, frame):
+        self.exit()
+        exit(0)
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 25b10133191788..0cc528d3c62f2f 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -70,11 +70,13 @@
 import paddle.fluid as fluid
 from paddle.distributed.fleet import launch_utils
 
-# TODO(danleifeng): Don't import * from a module
+#from paddle.distributed.fleet.launch_utils import DistributeMode, DeviceMode, get_logger, get_cluster
 from paddle.distributed.fleet.launch_utils import *
 import paddle.distributed.fleet.cloud_utils as cloud_utils
 import paddle.distributed.fleet.ascend_utils as ascend_utils
 
+from paddle.distributed.fleet.elastic import ElasticManager
+
 __all__ = []
 
@@ -175,6 +177,16 @@ def _parse_args():
         "--heter_worker_num", type=int, help="number of heter_workers")
     ps_group.add_argument("--http_port", type=int, help="Gloo http Port")
 
+    # parameter elastic mode
+    elastic_group = parser.add_argument_group("Elastic Parameters")
+    elastic_group.add_argument("--elastic_server", type=str, help="")
+    elastic_group.add_argument("--job_name", type=str, help="")
+    elastic_group.add_argument("--np", type=int, help="")
+    elastic_group.add_argument("--scale", type=int, default=0, help="")
+    elastic_group.add_argument("--port", type=int, help="")
+    elastic_group.add_argument("--host", type=str, help="")
+    elastic_group.add_argument("--force", type=bool, default=False, help="")
+
     return parser.parse_args()
 
@@ -214,65 +226,128 @@ def get_cluster_from_args(args, device_mode, devices_per_proc):
                                 devices_per_proc)
 
 
-def launch_collective(args):
-    # parse arguments, used for cloud-single-machine and local
-    (device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
-    trainers_num = cloud_utils.get_trainers_num()
-    logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format(
-        trainers_num, device_mode, devices_per_proc))
-
-    cluster = None
-    pod = None
-
-    start_port = 6170
-    if os.environ.get('FLAGS_START_PORT') is not None:
-        start_port = os.environ.get('FLAGS_START_PORT')
-    if cloud_utils.use_paddlecloud() and trainers_num != 1:
-        cluster, pod = cloud_utils.get_cloud_cluster(
-            args.ips, device_mode, devices_per_proc, start_port)
-        logger.debug("get cluster from cloud:{}".format(cluster))
-    elif device_mode == DeviceMode.ASCEND_NPU:
-        # for ascend
-        cluster, pod = ascend_utils.get_cloud_cluster(
-            rank_table_file=os.getenv("RANK_TABLE_FILE", None),
-            device_mode=device_mode,
-            start_port=start_port)
-    else:
-        # trainers_num = 1 or not use paddlecloud ips="a,b"
-        cluster, pod = get_cluster_from_args(args, device_mode,
-                                             devices_per_proc)
-        logger.debug("get cluster from args:{}".format(cluster))
+class LauncherInterface(object):
+    def __init__(self, args):
+        self.args = args
+        self.procs = []
+
+    def _terminate_procs(self):
+        for p in self.procs:
+            if p.proc.poll() is None:
+                p.proc.terminate()
+                if p.log_fn:
+                    p.log_fn.close()
+                logger.debug("terminate process id:{}".format(p.proc.pid))
+
+        for step in range(0, 50):
+            alive = False
+            for p in procs:
+                if p.proc.poll() is None:  # not termniate
+                    os.kill(p.proc.pid, signal.SIGKILL)
+                    alive = True
+
+            if not alive:
+                logger.info("terminate all the procs")
+                return True
+
+            time.sleep(1)
+        return False
+
+    def _check_procs(self):
+        alive = False
+        result = None
+        for p in self.procs:
+            ret = p.proc.poll()
+            if ret is None:
+                alive = True
+            elif ret != 0:
+                logger.error("ERROR rank {} error with code {}".format(p.rank,
+                                                                       ret))
+                result = ret
+        if not alive and result is None:
+            return 0
+        else:
+            return result
 
-    global_envs = copy.copy(os.environ.copy())
-    gloo_rendezvous_dir = tempfile.mkdtemp()
-    # add gloo env
-    global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
-    global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
-    global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
+    def launch(self):
+        raise NotImplementedError
 
-    procs = start_local_trainers(
-        cluster,
-        pod,
-        training_script=args.training_script,
-        training_script_args=args.training_script_args,
-        log_dir=args.log_dir,
-        envs=global_envs)
+    def stop(self):
+        raise NotImplementedError
 
-    for idx, proc in enumerate(procs):
-        print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
+    def watch(self):
+        raise NotImplementedError
 
-    while True:
-        alive = watch_local_trainers(procs, cluster.trainers_nranks())
 
-        if not alive:
-            logger.info("Local processes completed.")
-            logger.debug("POD info:{}".format(pod))
-            break
+class CollectiveLauncher(LauncherInterface):
+    def __init__(self, args):
+        self.args = args
+        self.procs = []
 
-        time.sleep(3)
+    def launch(self):
+        print("collective lauchner launch ...")
+        args = self.args
+        # parse arguments, used for cloud-single-machine and local
+        (device_mode,
+         devices_per_proc) = launch_utils.get_device_proc_info(args)
+        trainers_num = cloud_utils.get_trainers_num()
+        logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".
+                     format(trainers_num, device_mode, devices_per_proc))
 
-    if os.path.exists(gloo_rendezvous_dir):
-        shutil.rmtree(gloo_rendezvous_dir)
+        cluster = None
+        pod = None
+
+        start_port = 6170
+        if os.environ.get('FLAGS_START_PORT') is not None:
+            start_port = os.environ.get('FLAGS_START_PORT')
+        if cloud_utils.use_paddlecloud() and trainers_num != 1:
+            cluster, pod = cloud_utils.get_cloud_cluster(
+                args.ips, device_mode, devices_per_proc, start_port)
+            logger.debug("get cluster from cloud:{}".format(cluster))
+        elif device_mode == DeviceMode.ASCEND_NPU:
+            # for ascend
+            cluster, pod = ascend_utils.get_cloud_cluster(
+                rank_table_file=os.getenv("RANK_TABLE_FILE", None),
+                device_mode=device_mode,
+                start_port=start_port)
+        else:
+            # trainers_num = 1 or not use paddlecloud ips="a,b"
+            cluster, pod = get_cluster_from_args(args, device_mode,
+                                                 devices_per_proc)
+            logger.debug("get cluster from args:{}".format(cluster))
+
+        global_envs = copy.copy(os.environ.copy())
+        self.gloo_rendezvous_dir = tempfile.mkdtemp()
+        # add gloo env
+        global_envs["PADDLE_WITH_GLOO"] = str(
+            os.getenv("PADDLE_WITH_GLOO", "0"))
+        global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
+        global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
+
+        procs = start_local_trainers(
+            cluster,
+            pod,
+            training_script=args.training_script,
+            training_script_args=args.training_script_args,
+            log_dir=args.log_dir,
+            envs=global_envs)
+
+        for idx, proc in enumerate(procs):
+            print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
+
+    def stop(self):
+        print("collective lauchner stop ...")
+        self._terminate_procs()
+        if os.path.exists(self.gloo_rendezvous_dir):
+            shutil.rmtree(self.gloo_rendezvous_dir)
+
+    def watch(self):
+        print("collective lauchner watch ...")
+        for p in self.procs:
+            if p.log_fn and p.local_rank == 0:
+                pull_worker_log(p)
+        ret = self._check_procs()
+        return ret
 
 
 def launch_ps(args, distribute_mode):
@@ -361,11 +436,7 @@ def which_distributed_mode(args):
     return DistributeMode.COLLECTIVE
 
 
-def launch():
-    args = _parse_args()
-    logger = get_logger()
-    _print_arguments(args)
-
+def launch_elastic():
     distribute_mode = which_distributed_mode(args)
     if distribute_mode == DistributeMode.COLLECTIVE:
         launch_collective(args)
@@ -373,5 +444,52 @@ def launch():
         launch_ps(args, distribute_mode)
 
 
+def launch():
+    args = _parse_args()
+    logger = get_logger()
+    _print_arguments(args)
+
+    print('launch host', args.host)
+    elastic = ElasticManager(
+        args.elastic_server,
+        args.job_name,
+        args.np,
+        args.host,
+        scale=args.scale,
+        force=args.force, )
+    signal.signal(signal.SIGTERM, elastic.signal_handler)
+    signal.signal(signal.SIGABRT, elastic.signal_handler)
+    signal.signal(signal.SIGINT, elastic.signal_handler)
+
+    while elastic.ready():
+
+        args.ips = ','.join(elastic.hosts)
+        os.environ['PADDLE_TRAINERS'] = ','.join(elastic.hosts)
+
+        distribute_mode = which_distributed_mode(args)
+        if distribute_mode == DistributeMode.COLLECTIVE:
+            launcher = CollectiveLauncher(args)
+
+        launcher.launch()
+
+        while True:
+
+            ret = launcher.watch()
+
+            if ret == 0:  # completed
+                launcher.stop()
+                elastic.exit(completed=True)
+                exit(0)
+            elif ret is not None:  # error
+                launcher.stop()
+                break
+
+            if not elastic.health():
+                launcher.stop()
+                break
+
+            time.sleep(3)
+
+
 if __name__ == "__main__":
     launch()
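The node registration added in this first patch relies on an etcd watch to put a host key back whenever it disappears. A minimal standalone sketch of that pattern with the python-etcd3 client the patch imports; the endpoint 127.0.0.1:2379, the job name test-job and the host value are illustrative assumptions, not values taken from the patch:

import time
import etcd3

# Assumed endpoint and key layout for illustration; the patch builds the
# prefix from the --job_name argument ("/paddle/" + name).
client = etcd3.client(host='127.0.0.1', port=2379)
host_path = '/paddle/test-job/nodes/10.0.0.1'

def host_call_back(event):
    # If the key was deleted (for example by a peer's cleanup), re-register
    # so the node stays visible under the job prefix.
    if client.get(host_path)[0] is None:
        client.put(host_path, b'10.0.0.1')

client.put(host_path, b'10.0.0.1')
watch_id = client.add_watch_callback(host_path, host_call_back)

# Simulate an external deletion; the callback restores the key.
client.delete(host_path)
time.sleep(1)
print(client.get(host_path)[0])  # expected: b'10.0.0.1'

client.cancel_watch(watch_id)

The ready() loop in the patch then only has to count the keys under the nodes/ prefix and compare against np.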
From ac7dbee513c4cd11b5327ba4cc06c7b62503922f Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Fri, 14 May 2021 09:34:55 +0000
Subject: [PATCH 02/19] collective demo ready

---
 python/paddle/distributed/fleet/elastic.py | 60 ++++++++++++---------
 python/paddle/distributed/fleet/launch.py  | 63 +++++++++++++---------
 2 files changed, 73 insertions(+), 50 deletions(-)

diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py
index 25d83d8f81c62c..9d0042445b6845 100644
--- a/python/paddle/distributed/fleet/elastic.py
+++ b/python/paddle/distributed/fleet/elastic.py
@@ -1,18 +1,17 @@
 # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
-# 
+#
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
-# 
+#
 #     http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import etcd3
 import time
 import socket
 import os
@@ -20,23 +19,25 @@
 import logging
 
 
-class Status:
-    READY = 'ready'
-    RUNNING = 'running'
-    ERROR = 'error'
-    COMPLETED = 'completed'
-
-
 class ElasticManager(object):
     def __init__(self, server, name, np, host=None, scale=0, force=False):
 
-        logging.info('[elastic] init with server {} host {}'.format(server,
-                                                                     host))
+        print('[elastic] init with server {} host {}'.format(server, host))
+
+        self.hosts = []
+        self.stopped = False
+
+        if not server or not name or not np:
+            self.enable = False
+            return
+        else:
+            self.enable = True
+
+        import etcd3
 
         srv, port = server.split(':')
         self.etcd = etcd3.client(host=srv, port=port)
         self.host = host if host else self._get_host()
-        self.hosts = []
 
         # etcd data
         self.prefix = "/paddle/" + name
@@ -59,8 +60,7 @@ def __init__(self, server, name, np, host=None, scale=0, force=False):
 
         def host_call_back(event):
             if self.etcd.get(self.host_path)[0] == None:
-                logging.info('[elastic] register host agin {}'.format(
-                    self.host))
+                print('[elastic] register host agin {}'.format(self.host))
                 self.etcd.put(self.host_path, six.b(self.host))
 
         host_watch = self.etcd.add_watch_callback(self.host_path,
@@ -84,8 +84,7 @@ def host_call_back(event):
         def np_call_back(event):
             gnp = int(self.etcd.get(self.np_path)[0])
             if gnp != self.np:
-                logging.info("[elastic] scale np {} to {} ".format(self.np,
-                                                                   gnp))
+                print("[elastic] scale np {} to {} ".format(self.np, gnp))
                 self.np = gnp
 
         np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back)
@@ -93,7 +92,10 @@ def np_call_back(event):
         self.watches = [host_watch, np_watch]
 
     def exit(self, completed=False):
-        logging.info('[elastic] manager exist completed {}'.format(completed))
+        print('[elastic] manager exist completed {}'.format(completed))
+
+        if not self.enable:
+            return
 
         if completed:
             self.etcd.put(self.prefix, b'1')
@@ -125,18 +127,28 @@ def _match(self):
             return False
 
     def ready(self):
-        while True:
+        if not self.enable:
+            return True
+
+        while not self.stopped:
             if self._match():
-                logging.info('[elastic] ready with hosts {}'.format(self.hosts))
+                print('[elastic] ready with hosts {}'.format(self.hosts))
                 return True
-            logging.info('[elastic] not ready for np {} with hosts {}'.format(
+            print('[elastic] not ready for np {} with hosts {}'.format(
                 self.np, self.hosts))
             time.sleep(3)
         return False
 
     def health(self):
+        if self.stopped:
+            return False
+
+        if not self.enable:
+            return True
+
         return self._completed() or self._match()
 
     def signal_handler(self, sigint, frame):
-        self.exit()
-        exit(0)
+        if self.enable:
+            self.exit()
+        self.stopped = True
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 0cc528d3c62f2f..0c80c7f4097298 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -195,7 +195,10 @@ def get_cluster_from_args(args, device_mode, devices_per_proc):
     if len(node_ips) == 1:
         node_ip = node_ips[0]
     else:
-        _, node_ip = get_host_name_ip()
+        if args.host:
+            node_ip = args.host
+        else:
+            _, node_ip = get_host_name_ip()
 
     assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \
         % (node_ip, node_ips)
@@ -241,7 +244,7 @@ def _terminate_procs(self):
 
         for step in range(0, 50):
             alive = False
-            for p in procs:
+            for p in self.procs:
                 if p.proc.poll() is None:  # not termniate
                     os.kill(p.proc.pid, signal.SIGKILL)
                     alive = True
@@ -324,7 +327,7 @@ def launch(self):
         global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
         global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
 
-        procs = start_local_trainers(
+        self.procs = start_local_trainers(
             cluster,
             pod,
             training_script=args.training_script,
@@ -332,7 +335,7 @@ def launch(self):
             log_dir=args.log_dir,
             envs=global_envs)
 
-        for idx, proc in enumerate(procs):
+        for idx, proc in enumerate(self.procs):
             print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
 
     def stop(self):
@@ -436,53 +439,58 @@ def which_distributed_mode(args):
     return DistributeMode.COLLECTIVE
 
 
-def launch_elastic():
-    distribute_mode = which_distributed_mode(args)
-    if distribute_mode == DistributeMode.COLLECTIVE:
-        launch_collective(args)
-    else:
-        launch_ps(args, distribute_mode)
-
-
 def launch():
     args = _parse_args()
     logger = get_logger()
     _print_arguments(args)
 
     print('launch host', args.host)
+    elastic_server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
+    job_name = args.job_name or os.getenv('PADDLE_JOB_NAME')
+    np = args.np or int(os.getenv('PADDLE_NP', 0))
+    host = args.host or os.getenv('POD_IP')
+    scale = args.scale or int(os.getenv('PADDLE_SCALE', 0))
+    force = args.force or os.getenv('PADDLE_FORCE')
+
     elastic = ElasticManager(
-        args.elastic_server,
-        args.job_name,
-        args.np,
-        args.host,
-        scale=args.scale,
-        force=args.force, )
+        elastic_server,
+        job_name,
+        np,
+        host,
+        scale=scale,
+        force=force, )
     signal.signal(signal.SIGTERM, elastic.signal_handler)
     signal.signal(signal.SIGABRT, elastic.signal_handler)
     signal.signal(signal.SIGINT, elastic.signal_handler)
 
     while elastic.ready():
 
-        args.ips = ','.join(elastic.hosts)
-        os.environ['PADDLE_TRAINERS'] = ','.join(elastic.hosts)
+        if elastic.enable:
+            args.ips = ','.join(elastic.hosts)
+            os.environ['PADDLE_TRAINERS'] = ','.join(elastic.hosts)
 
         distribute_mode = which_distributed_mode(args)
         if distribute_mode == DistributeMode.COLLECTIVE:
            launcher = CollectiveLauncher(args)
+        else:
+            # TODO(kuizhiqing) ps elastic support later
+            launch_ps(args, distribute_mode)
+            return
 
         launcher.launch()
 
         while True:
 
             ret = launcher.watch()
+            print("launch watch ret", ret)
 
-            if ret == 0:  # completed
-                launcher.stop()
-                elastic.exit(completed=True)
-                exit(0)
-            elif ret is not None:  # error
+            if ret is not None:
+                print('job exit', ret)
+                # process is completed if ret >= 0 or error else
+                completed = True if ret == 0 else False
                 launcher.stop()
-                break
+                elastic.exit(completed=completed)
+                sys.exit(ret)
 
             if not elastic.health():
                 launcher.stop()
@@ -490,6 +498,9 @@ def launch():
 
             time.sleep(3)
 
+    launcher.stop()
+    sys.exit(0)
+
 
 if __name__ == "__main__":
     launch()
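The np key watched by np_call_back is also the hook for resizing a running job: any write to it is observed by every node's ElasticManager, which updates self.np so that _match() starts waiting for the new world size. A hedged operator-side sketch of triggering such a scale event; the server address, job name and target value of 4 are assumptions for illustration and not taken from the patch:

import etcd3

# Illustrative values; the real prefix is "/paddle/" + the --job_name value.
client = etcd3.client(host='127.0.0.1', port=2379)
np_path = '/paddle/test-job/np'

current = int(client.get(np_path)[0] or 0)
print('current np:', current)

# Request a resize to 4 workers; each running ElasticManager sees the change
# through its np watch callback and adjusts self.np accordingly.
client.put(np_path, b'4')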
From e9ff9a81fdcef7f0249a616856c13323b62d7960 Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Tue, 25 May 2021 12:56:00 +0000
Subject: [PATCH 03/19] update rank, use logger

---
 python/paddle/distributed/fleet/elastic.py | 159 ++++++++++++++++++---
 python/paddle/distributed/fleet/launch.py  | 139 +++++-------------
 2 files changed, 171 insertions(+), 127 deletions(-)

diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py
index 9d0042445b6845..3b8d22770c2574 100644
--- a/python/paddle/distributed/fleet/elastic.py
+++ b/python/paddle/distributed/fleet/elastic.py
@@ -17,17 +17,97 @@
 import os
 import six
 import logging
+import signal
+
+logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper())
+logger = logging.getLogger("ELASTIC")
+
+ELASTIC_EXIT_CODE = 10001
+
+
+class ElasticStatus:
+    COMPLETED = "completed"
+    ERROR = "error"
+    HOLD = "hold"
+    RESTART = "restart"
+    EXIT = "exit"
+
+
+class LauncherInterface(object):
+    def __init__(self, args):
+        self.args = args
+        self.procs = []
+
+    def _terminate_procs(self):
+        for p in self.procs:
+            if p.proc.poll() is None:
+                p.proc.terminate()
+                if p.log_fn:
+                    p.log_fn.close()
+                logger.info("terminate process id:{}".format(p.proc.pid))
+
+        for step in range(0, 50):
+            alive = False
+            for p in self.procs:
+                if p.proc.poll() is None:  # not termniate
+                    os.kill(p.proc.pid, signal.SIGKILL)
+                    alive = True
+
+            if not alive:
+                logger.info("terminate all the procs")
+                return True
+
+            time.sleep(1)
+        return False
+
+    def _check_procs(self):
+        alive = False
+        result = None
+        for p in self.procs:
+            ret = p.proc.poll()
+            if ret is None:
+                alive = True
+            elif ret != 0:
+                logger.error("ERROR rank {} error with code {}".format(p.rank,
+                                                                       ret))
+                result = ret
+        if not alive and result is None:
+            return 0
+        else:
+            return result
+
+    def launch(self):
+        raise NotImplementedError
+
+    def stop(self):
+        raise NotImplementedError
+
+    def watch(self):
+        raise NotImplementedError
 
 
 class ElasticManager(object):
-    def __init__(self, server, name, np, host=None, scale=0, force=False):
+    def __init__(self, args):
+
+        self.args = args
+        server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
+        name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
+        np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0))
+        host = args.host or os.getenv('POD_IP')
+        scale = args.scale or int(os.getenv('PADDLE_SCALE', 0))
+        force = args.force or os.getenv('PADDLE_FORCE')
+
+        self.elastic_level = int(
+            os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1))
 
-        print('[elastic] init with server {} host {}'.format(server, host))
+        #elastic_timeout = os.getenv('PADDLE_ELASTIC_TIMEOUT',1)
+
+        logger.debug('init with server {} host {}'.format(server, host))
 
         self.hosts = []
         self.stopped = False
 
-        if not server or not name or not np:
+        if not server or ':' not in server or not name or not np:
             self.enable = False
             return
         else:
@@ -60,23 +140,22 @@ def __init__(self, args):
 
         def host_call_back(event):
             if self.etcd.get(self.host_path)[0] == None:
-                print('[elastic] register host agin {}'.format(self.host))
+                logger.debug('register host again {}'.format(self.host))
                 self.etcd.put(self.host_path, six.b(self.host))
 
         host_watch = self.etcd.add_watch_callback(self.host_path,
                                                   host_call_back)
 
-        # np
-        #
+        # np describes the exact number of nodes to run the job
        inp = int(self.etcd.get(self.np_path)[0] or 0)
         if scale == 0 and not force:
             assert (
                 inp == np,
-                "[elastic] np {} is not consistent with np in etcd {}, maybe the job with the same name exited unexpected, try --force=true".
+                "np {} is not consistent with np in etcd {}, maybe the job with the same name exited unexpected, try --force=true".
                 format(np, inp))
         else:
             assert (inp == np or inp == self.np,
-                    "[elastic] np {} scale to {} by {} is not allowed".format(
+                    "np {} scale to {} by {} is not allowed".format(
                         inp, self.np, scale))
 
         self.etcd.put(self.np_path, six.b("%d" % (self.np)))
@@ -84,7 +163,7 @@ def host_call_back(event):
         def np_call_back(event):
             gnp = int(self.etcd.get(self.np_path)[0])
             if gnp != self.np:
-                print("[elastic] scale np {} to {} ".format(self.np, gnp))
+                logger.info("scale np {} to {} ".format(self.np, gnp))
                 self.np = gnp
 
         np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back)
@@ -92,7 +171,7 @@ def np_call_back(event):
         self.watches = [host_watch, np_watch]
 
     def exit(self, completed=False):
-        print('[elastic] manager exist completed {}'.format(completed))
+        logger.info('manager exist completed {}'.format(completed))
 
         if not self.enable:
             return
@@ -115,6 +194,9 @@ def _get_host(self):
             return '127.0.0.1'
 
     def _completed(self):
+        if not self.enable:
+            return True
+
         return int(self.etcd.get(self.prefix)[0]) == 1
 
     def _match(self):
@@ -126,27 +208,60 @@ def _match(self):
         else:
             return False
 
-    def ready(self):
+    def _update_hosts(self):
+        assert (len(self.hosts) != 0, 'hosts empty')
+        hosts = ','.join(self.hosts)
+        self.args.ips = hosts
+        os.environ['PADDLE_TRAINERS'] = hosts
+        os.environ['PADDLE_TRAINER_ID'] = '{}'.format(
+            self.hosts.index(self.host))
+
+    def wait(self):
         if not self.enable:
-            return True
+            return
 
         while not self.stopped:
             if self._match():
-                print('[elastic] ready with hosts {}'.format(self.hosts))
-                return True
-            print('[elastic] not ready for np {} with hosts {}'.format(
-                self.np, self.hosts))
+                logger.info('ready with hosts {}'.format(self.hosts))
+                self._update_hosts()
+                return
+            logger.info('not ready for np {} with hosts {}'.format(self.np,
+                                                                   self.hosts))
             time.sleep(3)
-        return False
+        return
 
-    def health(self):
+    def run(self, launcher):
         if self.stopped:
-            return False
+            return
 
-        if not self.enable:
-            return True
+        self.launcher = launcher(self.args)
+        self.launcher.launch()
+
+    def watch(self):
+
+        while not self.stopped:
+            ret = self.launcher.watch()
+
+            if ret is not None:  # self terminated
+                logger.info('job exit with code {}'.format(ret))
+                # process is completed if ret >= 0 or error else
+                completed = True if ret == 0 else False
+                self.launcher.stop()
+                self.exit(completed=completed)
+                if completed:
+                    return ElasticStatus.COMPLETED
+                if self.elastic_level == 1:
+                    return ElasticStatus.RESTART
+                else:
+                    return ElasticStatus.ERROR
+
+            if not self._completed() and not self._match():
+                self.launcher.stop()
+                return ElasticStatus.HOLD
+
+            time.sleep(3)
 
-        return self._completed() or self._match()
+        return ElasticStatus.EXIT
 
     def signal_handler(self, sigint, frame):
         if self.enable:
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 0c80c7f4097298..c151ac3777c26d 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -69,6 +69,7 @@
 import paddle
 import paddle.fluid as fluid
 from paddle.distributed.fleet import launch_utils
+import signal
 
 #from paddle.distributed.fleet.launch_utils import DistributeMode, DeviceMode, get_logger, get_cluster
 from paddle.distributed.fleet.launch_utils import *
@@ -76,6 +77,9 @@
 import paddle.distributed.fleet.ascend_utils as ascend_utils
 
 from paddle.distributed.fleet.elastic import ElasticManager
+from paddle.distributed.fleet.elastic import LauncherInterface
+from paddle.distributed.fleet.elastic import ElasticStatus
+from paddle.distributed.fleet.elastic import ELASTIC_EXIT_CODE
 
 __all__ = []
 
@@ -180,7 +184,7 @@ def _parse_args():
     # parameter elastic mode
     elastic_group = parser.add_argument_group("Elastic Parameters")
     elastic_group.add_argument("--elastic_server", type=str, help="")
-    elastic_group.add_argument("--job_name", type=str, help="")
+    elastic_group.add_argument("--job_id", type=str, help="")
     elastic_group.add_argument("--np", type=int, help="")
     elastic_group.add_argument("--scale", type=int, default=0, help="")
     elastic_group.add_argument("--port", type=int, help="")
@@ -229,66 +233,13 @@ def get_cluster_from_args(args, device_mode, devices_per_proc):
                                 devices_per_proc)
 
 
-class LauncherInterface(object):
-    def __init__(self, args):
-        self.args = args
-        self.procs = []
-
-    def _terminate_procs(self):
-        for p in self.procs:
-            if p.proc.poll() is None:
-                p.proc.terminate()
-                if p.log_fn:
-                    p.log_fn.close()
-                logger.debug("terminate process id:{}".format(p.proc.pid))
-
-        for step in range(0, 50):
-            alive = False
-            for p in self.procs:
-                if p.proc.poll() is None:  # not termniate
-                    os.kill(p.proc.pid, signal.SIGKILL)
-                    alive = True
-
-            if not alive:
-                logger.info("terminate all the procs")
-                return True
-
-            time.sleep(1)
-        return False
-
-    def _check_procs(self):
-        alive = False
-        result = None
-        for p in self.procs:
-            ret = p.proc.poll()
-            if ret is None:
-                alive = True
-            elif ret != 0:
-                logger.error("ERROR rank {} error with code {}".format(p.rank,
-                                                                       ret))
-                result = ret
-        if not alive and result is None:
-            return 0
-        else:
-            return result
-
-    def launch(self):
-        raise NotImplementedError
-
-    def stop(self):
-        raise NotImplementedError
-
-    def watch(self):
-        raise NotImplementedError
-
-
 class CollectiveLauncher(LauncherInterface):
     def __init__(self, args):
         self.args = args
         self.procs = []
 
     def launch(self):
-        print("collective lauchner launch ...")
+        logger.info("collective lauchner launch ...")
         args = self.args
         # parse arguments, used for cloud-single-machine and local
         (device_mode,
@@ -324,7 +287,16 @@ def launch(self):
             envs=global_envs)
 
         for idx, proc in enumerate(self.procs):
-            print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
+            logger.info("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
 
     def stop(self):
-        print("collective lauchner stop ...")
+        logger.info("collective lauchner stop ...")
         self._terminate_procs()
         if os.path.exists(self.gloo_rendezvous_dir):
             shutil.rmtree(self.gloo_rendezvous_dir)
 
     def watch(self):
-        print("collective lauchner watch ...")
+        logger.debug("collective lauchner watch ...")
         for p in self.procs:
             if p.log_fn and p.local_rank == 0:
                 pull_worker_log(p)
@@ -395,61 +367,39 @@ def launch():
     args = _parse_args()
     logger = get_logger()
     _print_arguments(args)
 
-    print('launch host', args.host)
-    elastic_server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
-    job_name = args.job_name or os.getenv('PADDLE_JOB_NAME')
-    np = args.np or int(os.getenv('PADDLE_NP', 0))
-    host = args.host or os.getenv('POD_IP')
-    scale = args.scale or int(os.getenv('PADDLE_SCALE', 0))
-    force = args.force or os.getenv('PADDLE_FORCE')
-
-    elastic = ElasticManager(
-        elastic_server,
-        job_name,
-        np,
-        host,
-        scale=scale,
-        force=force, )
+    distribute_mode = which_distributed_mode(args)
+    # TODO(kuizhiqing) support ps later
+    if not distribute_mode == DistributeMode.COLLECTIVE:
+        launch_ps(args, distribute_mode)
+        return
+
+    elastic = ElasticManager(args)
+
     signal.signal(signal.SIGTERM, elastic.signal_handler)
     signal.signal(signal.SIGABRT, elastic.signal_handler)
     signal.signal(signal.SIGINT, elastic.signal_handler)
 
-    while elastic.ready():
-
-        if elastic.enable:
-            args.ips = ','.join(elastic.hosts)
-            os.environ['PADDLE_TRAINERS'] = ','.join(elastic.hosts)
-
-        distribute_mode = which_distributed_mode(args)
-        if distribute_mode == DistributeMode.COLLECTIVE:
-            launcher = CollectiveLauncher(args)
-        else:
-            # TODO(kuizhiqing) ps elastic support later
-            launch_ps(args, distribute_mode)
-            return
-
-        launcher.launch()
-
-        while True:
-
-            ret = launcher.watch()
-            print("launch watch ret", ret)
+    while True:
 
-            if ret is not None:
-                print('job exit', ret)
-                # process is completed if ret >= 0 or error else
-                completed = True if ret == 0 else False
-                launcher.stop()
-                elastic.exit(completed=completed)
-                sys.exit(ret)
+        # wait for all nodes ready to run
+        elastic.wait()
 
-            if not elastic.health():
-                launcher.stop()
-                break
+        # run self with specified launcher
+        elastic.run(CollectiveLauncher)
 
-            time.sleep(3)
+        # keep wathing the health status of self and being notified for other's failure
+        ret = elastic.watch()
 
+        if ret == ElasticStatus.COMPLETED:
+            break
+        if ret == ElasticStatus.HOLD:
+            continue
+        if ret == ElasticStatus.EXIT:
+            break
+        if ret == ElasticStatus.ERROR:
+            sys.exit(3)
+        if ret == ElasticStatus.RESTART:
+            sys.exit(ELASTIC_EXIT_CODE)
 
-    launcher.stop()
     sys.exit(0)
 
 
 if __name__ == "__main__":
     launch()
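With this patch the launcher reports a needed relaunch by exiting with ELASTIC_EXIT_CODE (10001 at this point in the series; a later patch changes it to 101). A minimal sketch of an external supervisor that keeps relaunching while the job asks for a restart; the command line is a hypothetical example, not part of the patch:

import subprocess
import sys

ELASTIC_EXIT_CODE = 10001  # value as of this patch; changed to 101 later in the series

# Hypothetical launch command; substitute the actual training invocation.
cmd = [sys.executable, '-m', 'paddle.distributed.fleet.launch', 'train.py']

while True:
    ret = subprocess.call(cmd)
    if ret == ELASTIC_EXIT_CODE:
        # The job requested a restart (peer lost or scale event): relaunch.
        continue
    # Completed (0) or a real error: propagate the exit code.
    sys.exit(ret)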
From 6b3393ceea0756163a70762448b2c4a90ffbb7f9 Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Wed, 26 May 2021 09:08:34 +0000
Subject: [PATCH 04/19] fix assert

---
 python/paddle/distributed/fleet/elastic.py | 17 +++++++----------
 python/paddle/distributed/fleet/launch.py  |  2 +-
 2 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py
index 3b8d22770c2574..6b7c069b9ef067 100644
--- a/python/paddle/distributed/fleet/elastic.py
+++ b/python/paddle/distributed/fleet/elastic.py
@@ -95,7 +95,7 @@ def __init__(self, args):
         np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0))
         host = args.host or os.getenv('POD_IP')
         scale = args.scale or int(os.getenv('PADDLE_SCALE', 0))
-        force = args.force or os.getenv('PADDLE_FORCE')
+        force = args.force or os.getenv('PADDLE_ELASTIC_FORCE')
 
         self.elastic_level = int(
             os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1))
@@ -136,7 +136,6 @@ def __init__(self, args):
         # register self host to etcd
         # register watch to reset host after host been deleted
         self.etcd.delete_prefix(self.node_prefix)
-        self.etcd.put(self.host_path, six.b(self.host))
 
         def host_call_back(event):
             if self.etcd.get(self.host_path)[0] == None:
@@ -145,18 +144,16 @@ def host_call_back(event):
 
         host_watch = self.etcd.add_watch_callback(self.host_path,
                                                   host_call_back)
+        self.etcd.put(self.host_path, six.b(self.host))
 
         # np describes the exact number of nodes to run the job
         inp = int(self.etcd.get(self.np_path)[0] or 0)
         if scale == 0 and not force:
-            assert (
-                inp == np,
-                "np {} is not consistent with np in etcd {}, maybe the job with the same name exited unexpected, try --force=true".
-                format(np, inp))
+            assert inp == np or inp == 0, "np {} is not consistent with np in etcd {}".format(
+                np, inp)
         else:
-            assert (inp == np or inp == self.np,
-                    "np {} scale to {} by {} is not allowed".format(
-                        inp, self.np, scale))
+            assert inp == np or inp == self.np, "np {} scale to {} by {} is not allowed".format(
+                inp, self.np, scale)
 
         self.etcd.put(self.np_path, six.b("%d" % (self.np)))
 
@@ -209,7 +206,7 @@ def _match(self):
             return False
 
     def _update_hosts(self):
-        assert (len(self.hosts) != 0, 'hosts empty')
+        assert len(self.hosts) != 0, 'hosts empty'
         hosts = ','.join(self.hosts)
         self.args.ips = hosts
         os.environ['PADDLE_TRAINERS'] = hosts
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index c151ac3777c26d..21573c5f03c995 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -71,7 +71,7 @@
 from paddle.distributed.fleet import launch_utils
 import signal
 
-#from paddle.distributed.fleet.launch_utils import DistributeMode, DeviceMode, get_logger, get_cluster
+# TODO(danleifeng): Don't import * from a module
 from paddle.distributed.fleet.launch_utils import *
 import paddle.distributed.fleet.cloud_utils as cloud_utils
 import paddle.distributed.fleet.ascend_utils as ascend_utils

From 4401c3ec74385707d78f3d56d5f455bd5e84192e Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Fri, 4 Jun 2021 11:36:49 +0000
Subject: [PATCH 05/19] handle force kill

---
 python/paddle/distributed/fleet/elastic.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py
index 6b7c069b9ef067..3e3a1716cbd653 100644
--- a/python/paddle/distributed/fleet/elastic.py
+++ b/python/paddle/distributed/fleet/elastic.py
@@ -106,6 +106,10 @@ def __init__(self, args):
 
         self.hosts = []
         self.stopped = False
+        '''
+        set to True when other failed, then restart self
+        '''
+        self.restart_flag = False
 
         if not server or ':' not in server or not name or not np:
             self.enable = False
@@ -139,6 +143,9 @@ def __init__(self, args):
 
         def host_call_back(event):
             if self.etcd.get(self.host_path)[0] == None:
+                self.restart_flag = True
+                time.sleep(5)
+
                 logger.debug('register host again {}'.format(self.host))
                 self.etcd.put(self.host_path, six.b(self.host))
 
@@ -252,6 +259,11 @@ def watch(self):
                 else:
                     return ElasticStatus.ERROR
 
+            if self.restart_flag:
+                self.restart_flag = False
+                self.launcher.stop()
+                return ElasticStatus.HOLD
+
             if not self._completed() and not self._match():
                 self.launcher.stop()
                 return ElasticStatus.HOLD

From 1623655992b1dfacd2c932564007c869447f410d Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Mon, 7 Jun 2021 02:37:04 +0000
Subject: [PATCH 06/19] exit code 101

---
 python/paddle/distributed/fleet/elastic.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py
index 3e3a1716cbd653..896922fa380dcd 100644
--- a/python/paddle/distributed/fleet/elastic.py
+++ b/python/paddle/distributed/fleet/elastic.py
@@ -22,7 +22,7 @@
 logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper())
 logger = logging.getLogger("ELASTIC")
 
-ELASTIC_EXIT_CODE = 10001
+ELASTIC_EXIT_CODE = 101
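The "fix assert" commit addresses a classic pitfall introduced by the earlier patches: writing assert (condition, "message") asserts a two-element tuple, which is always truthy, so the check can never fire. A short illustration of the difference:

np, inp = 2, 4

# Before patch 04: the whole (condition, message) tuple is the assert
# expression; a non-empty tuple is truthy, so this never raises.
assert (inp == np, "np {} is not consistent with np in etcd {}".format(np, inp))

# After patch 04: condition and message are separate, so a mismatch raises
# AssertionError with the formatted message.
try:
    assert inp == np, "np {} is not consistent with np in etcd {}".format(np, inp)
except AssertionError as e:
    print(e)  # np 2 is not consistent with np in etcd 4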
python/paddle/distributed/fleet/launch.py | 234 ++++++++++++++++----- 2 files changed, 318 insertions(+), 58 deletions(-) create mode 100644 python/paddle/distributed/fleet/elastic.py diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py new file mode 100644 index 00000000000000..25d83d8f81c62c --- /dev/null +++ b/python/paddle/distributed/fleet/elastic.py @@ -0,0 +1,142 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import etcd3 +import time +import socket +import os +import six +import logging + + +class Status: + READY = 'ready' + RUNNING = 'running' + ERROR = 'error' + COMPLETED = 'completed' + + +class ElasticManager(object): + def __init__(self, server, name, np, host=None, scale=0, force=False): + + logging.info('[elastic] init with server {} host {}'.format(server, + host)) + + srv, port = server.split(':') + self.etcd = etcd3.client(host=srv, port=port) + self.host = host if host else self._get_host() + self.hosts = [] + + # etcd data + self.prefix = "/paddle/" + name + self.node_prefix = self.prefix + '/nodes/' + self.np_path = self.prefix + '/np' + self.host_path = self.node_prefix + self.host + + self.np = np + scale + ''' + 0 group mode, be aware of healthy status of other workers + 1 decouple mode, check own status only + ''' + self.etcd.put(self.prefix, b'0') + + # host + # register self host to etcd + # register watch to reset host after host been deleted + self.etcd.delete_prefix(self.node_prefix) + self.etcd.put(self.host_path, six.b(self.host)) + + def host_call_back(event): + if self.etcd.get(self.host_path)[0] == None: + logging.info('[elastic] register host agin {}'.format( + self.host)) + self.etcd.put(self.host_path, six.b(self.host)) + + host_watch = self.etcd.add_watch_callback(self.host_path, + host_call_back) + + # np + # + inp = int(self.etcd.get(self.np_path)[0] or 0) + if scale == 0 and not force: + assert ( + inp == np, + "[elastic] np {} is not consistent with np in etcd {}, maybe the job with the same name exited unexpected, try --force=true". 
+ format(np, inp)) + else: + assert (inp == np or inp == self.np, + "[elastic] np {} scale to {} by {} is not allowed".format( + inp, self.np, scale)) + + self.etcd.put(self.np_path, six.b("%d" % (self.np))) + + def np_call_back(event): + gnp = int(self.etcd.get(self.np_path)[0]) + if gnp != self.np: + logging.info("[elastic] scale np {} to {} ".format(self.np, + gnp)) + self.np = gnp + + np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back) + + self.watches = [host_watch, np_watch] + + def exit(self, completed=False): + logging.info('[elastic] manager exist completed {}'.format(completed)) + + if completed: + self.etcd.put(self.prefix, b'1') + + for watch in self.watches: + self.etcd.cancel_watch(watch) + self.etcd.delete(self.host_path) + + hosts = [i for i in self.etcd.get_prefix(self.node_prefix)] + if len(hosts) == 0: + self.etcd.delete_prefix(self.prefix) + + def _get_host(self): + try: + return socket.gethostbyname(socket.getfqdn(socket.gethostname())) + except: + return '127.0.0.1' + + def _completed(self): + return int(self.etcd.get(self.prefix)[0]) == 1 + + def _match(self): + self.hosts = [ + six.ensure_str(i[0]) for i in self.etcd.get_prefix(self.node_prefix) + ] + if len(self.hosts) == self.np: + return True + else: + return False + + def ready(self): + while True: + if self._match(): + logging.info('[elastic] ready with hosts {}'.format(self.hosts)) + return True + logging.info('[elastic] not ready for np {} with hosts {}'.format( + self.np, self.hosts)) + time.sleep(3) + return False + + def health(self): + return self._completed() or self._match() + + def signal_handler(self, sigint, frame): + self.exit() + exit(0) diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index 25b10133191788..0cc528d3c62f2f 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -70,11 +70,13 @@ import paddle.fluid as fluid from paddle.distributed.fleet import launch_utils -# TODO(danleifeng): Don't import * from a module +#from paddle.distributed.fleet.launch_utils import DistributeMode, DeviceMode, get_logger, get_cluster from paddle.distributed.fleet.launch_utils import * import paddle.distributed.fleet.cloud_utils as cloud_utils import paddle.distributed.fleet.ascend_utils as ascend_utils +from paddle.distributed.fleet.elastic import ElasticManager + __all__ = [] @@ -175,6 +177,16 @@ def _parse_args(): "--heter_worker_num", type=int, help="number of heter_workers") ps_group.add_argument("--http_port", type=int, help="Gloo http Port") + # parameter elastic mode + elastic_group = parser.add_argument_group("Elastic Parameters") + elastic_group.add_argument("--elastic_server", type=str, help="") + elastic_group.add_argument("--job_name", type=str, help="") + elastic_group.add_argument("--np", type=int, help="") + elastic_group.add_argument("--scale", type=int, default=0, help="") + elastic_group.add_argument("--port", type=int, help="") + elastic_group.add_argument("--host", type=str, help="") + elastic_group.add_argument("--force", type=bool, default=False, help="") + return parser.parse_args() @@ -214,65 +226,128 @@ def get_cluster_from_args(args, device_mode, devices_per_proc): devices_per_proc) -def launch_collective(args): - # parse arguments, used for cloud-single-machine and local - (device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args) - trainers_num = cloud_utils.get_trainers_num() - logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format( 
- trainers_num, device_mode, devices_per_proc)) - - cluster = None - pod = None - - start_port = 6170 - if os.environ.get('FLAGS_START_PORT') is not None: - start_port = os.environ.get('FLAGS_START_PORT') - if cloud_utils.use_paddlecloud() and trainers_num != 1: - cluster, pod = cloud_utils.get_cloud_cluster( - args.ips, device_mode, devices_per_proc, start_port) - logger.debug("get cluster from cloud:{}".format(cluster)) - elif device_mode == DeviceMode.ASCEND_NPU: - # for ascend - cluster, pod = ascend_utils.get_cloud_cluster( - rank_table_file=os.getenv("RANK_TABLE_FILE", None), - device_mode=device_mode, - start_port=start_port) - else: - # trainers_num = 1 or not use paddlecloud ips="a,b" - cluster, pod = get_cluster_from_args(args, device_mode, - devices_per_proc) - logger.debug("get cluster from args:{}".format(cluster)) +class LauncherInterface(object): + def __init__(self, args): + self.args = args + self.procs = [] + + def _terminate_procs(self): + for p in self.procs: + if p.proc.poll() is None: + p.proc.terminate() + if p.log_fn: + p.log_fn.close() + logger.debug("terminate process id:{}".format(p.proc.pid)) + + for step in range(0, 50): + alive = False + for p in procs: + if p.proc.poll() is None: # not termniate + os.kill(p.proc.pid, signal.SIGKILL) + alive = True + + if not alive: + logger.info("terminate all the procs") + return True + + time.sleep(1) + return False + + def _check_procs(self): + alive = False + result = None + for p in self.procs: + ret = p.proc.poll() + if ret is None: + alive = True + elif ret != 0: + logger.error("ERROR rank {} error with code {}".format(p.rank, + ret)) + result = ret + if not alive and result is None: + return 0 + else: + return result - global_envs = copy.copy(os.environ.copy()) - gloo_rendezvous_dir = tempfile.mkdtemp() - # add gloo env - global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0")) - global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3" - global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir + def launch(self): + raise NotImplementedError - procs = start_local_trainers( - cluster, - pod, - training_script=args.training_script, - training_script_args=args.training_script_args, - log_dir=args.log_dir, - envs=global_envs) + def stop(self): + raise NotImplementedError - for idx, proc in enumerate(procs): - print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx)) + def watch(self): + raise NotImplementedError - while True: - alive = watch_local_trainers(procs, cluster.trainers_nranks()) - if not alive: - logger.info("Local processes completed.") - logger.debug("POD info:{}".format(pod)) - break +class CollectiveLauncher(LauncherInterface): + def __init__(self, args): + self.args = args + self.procs = [] - time.sleep(3) + def launch(self): + print("collective lauchner launch ...") + args = self.args + # parse arguments, used for cloud-single-machine and local + (device_mode, + devices_per_proc) = launch_utils.get_device_proc_info(args) + trainers_num = cloud_utils.get_trainers_num() + logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}". 
+ format(trainers_num, device_mode, devices_per_proc)) - if os.path.exists(gloo_rendezvous_dir): - shutil.rmtree(gloo_rendezvous_dir) + cluster = None + pod = None + + start_port = 6170 + if os.environ.get('FLAGS_START_PORT') is not None: + start_port = os.environ.get('FLAGS_START_PORT') + if cloud_utils.use_paddlecloud() and trainers_num != 1: + cluster, pod = cloud_utils.get_cloud_cluster( + args.ips, device_mode, devices_per_proc, start_port) + logger.debug("get cluster from cloud:{}".format(cluster)) + elif device_mode == DeviceMode.ASCEND_NPU: + # for ascend + cluster, pod = ascend_utils.get_cloud_cluster( + rank_table_file=os.getenv("RANK_TABLE_FILE", None), + device_mode=device_mode, + start_port=start_port) + else: + # trainers_num = 1 or not use paddlecloud ips="a,b" + cluster, pod = get_cluster_from_args(args, device_mode, + devices_per_proc) + logger.debug("get cluster from args:{}".format(cluster)) + + global_envs = copy.copy(os.environ.copy()) + self.gloo_rendezvous_dir = tempfile.mkdtemp() + # add gloo env + global_envs["PADDLE_WITH_GLOO"] = str( + os.getenv("PADDLE_WITH_GLOO", "0")) + global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3" + global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir + + procs = start_local_trainers( + cluster, + pod, + training_script=args.training_script, + training_script_args=args.training_script_args, + log_dir=args.log_dir, + envs=global_envs) + + for idx, proc in enumerate(procs): + print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx)) + + def stop(self): + print("collective lauchner stop ...") + self._terminate_procs() + if os.path.exists(self.gloo_rendezvous_dir): + shutil.rmtree(self.gloo_rendezvous_dir) + + def watch(self): + print("collective lauchner watch ...") + for p in self.procs: + if p.log_fn and p.local_rank == 0: + pull_worker_log(p) + ret = self._check_procs() + return ret def launch_ps(args, distribute_mode): @@ -361,11 +436,7 @@ def which_distributed_mode(args): return DistributeMode.COLLECTIVE -def launch(): - args = _parse_args() - logger = get_logger() - _print_arguments(args) - +def launch_elastic(): distribute_mode = which_distributed_mode(args) if distribute_mode == DistributeMode.COLLECTIVE: launch_collective(args) @@ -373,5 +444,52 @@ def launch(): launch_ps(args, distribute_mode) +def launch(): + args = _parse_args() + logger = get_logger() + _print_arguments(args) + + print('launch host', args.host) + elastic = ElasticManager( + args.elastic_server, + args.job_name, + args.np, + args.host, + scale=args.scale, + force=args.force, ) + signal.signal(signal.SIGTERM, elastic.signal_handler) + signal.signal(signal.SIGABRT, elastic.signal_handler) + signal.signal(signal.SIGINT, elastic.signal_handler) + + while elastic.ready(): + + args.ips = ','.join(elastic.hosts) + os.environ['PADDLE_TRAINERS'] = ','.join(elastic.hosts) + + distribute_mode = which_distributed_mode(args) + if distribute_mode == DistributeMode.COLLECTIVE: + launcher = CollectiveLauncher(args) + + launcher.launch() + + while True: + + ret = launcher.watch() + + if ret == 0: # completed + launcher.stop() + elastic.exit(completed=True) + exit(0) + elif ret is not None: # error + launcher.stop() + break + + if not elastic.health(): + launcher.stop() + break + + time.sleep(3) + + if __name__ == "__main__": launch() From d30f89ad231d94b8ece5e61a508380e3c11d84cd Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Fri, 14 May 2021 09:34:55 +0000 Subject: [PATCH 08/19] collective demo ready --- python/paddle/distributed/fleet/elastic.py | 60 ++++++++++++--------- 
python/paddle/distributed/fleet/launch.py | 63 +++++++++++++--------- 2 files changed, 73 insertions(+), 50 deletions(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 25d83d8f81c62c..9d0042445b6845 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -1,18 +1,17 @@ # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import etcd3 import time import socket import os @@ -20,23 +19,25 @@ import logging -class Status: - READY = 'ready' - RUNNING = 'running' - ERROR = 'error' - COMPLETED = 'completed' - - class ElasticManager(object): def __init__(self, server, name, np, host=None, scale=0, force=False): - logging.info('[elastic] init with server {} host {}'.format(server, - host)) + print('[elastic] init with server {} host {}'.format(server, host)) + + self.hosts = [] + self.stopped = False + + if not server or not name or not np: + self.enable = False + return + else: + self.enable = True + + import etcd3 srv, port = server.split(':') self.etcd = etcd3.client(host=srv, port=port) self.host = host if host else self._get_host() - self.hosts = [] # etcd data self.prefix = "/paddle/" + name @@ -59,8 +60,7 @@ def __init__(self, server, name, np, host=None, scale=0, force=False): def host_call_back(event): if self.etcd.get(self.host_path)[0] == None: - logging.info('[elastic] register host agin {}'.format( - self.host)) + print('[elastic] register host agin {}'.format(self.host)) self.etcd.put(self.host_path, six.b(self.host)) host_watch = self.etcd.add_watch_callback(self.host_path, @@ -84,8 +84,7 @@ def host_call_back(event): def np_call_back(event): gnp = int(self.etcd.get(self.np_path)[0]) if gnp != self.np: - logging.info("[elastic] scale np {} to {} ".format(self.np, - gnp)) + print("[elastic] scale np {} to {} ".format(self.np, gnp)) self.np = gnp np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back) @@ -93,7 +92,10 @@ def np_call_back(event): self.watches = [host_watch, np_watch] def exit(self, completed=False): - logging.info('[elastic] manager exist completed {}'.format(completed)) + print('[elastic] manager exist completed {}'.format(completed)) + + if not self.enable: + return if completed: self.etcd.put(self.prefix, b'1') @@ -125,18 +127,28 @@ def _match(self): return False def ready(self): - while True: + if not self.enable: + return True + + while not self.stopped: if self._match(): - logging.info('[elastic] ready with hosts {}'.format(self.hosts)) + print('[elastic] ready with hosts {}'.format(self.hosts)) return True - logging.info('[elastic] not ready for np {} with hosts {}'.format( + print('[elastic] not ready for np {} with hosts {}'.format( self.np, self.hosts)) time.sleep(3) return False def health(self): + if self.stopped: + return False + + if not self.enable: + return True + return self._completed() or self._match() def signal_handler(self, sigint, frame): - self.exit() - exit(0) + if 
self.enable: + self.exit() + self.stopped = True diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index 0cc528d3c62f2f..0c80c7f4097298 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -195,7 +195,10 @@ def get_cluster_from_args(args, device_mode, devices_per_proc): if len(node_ips) == 1: node_ip = node_ips[0] else: - _, node_ip = get_host_name_ip() + if args.host: + node_ip = args.host + else: + _, node_ip = get_host_name_ip() assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \ % (node_ip, node_ips) @@ -241,7 +244,7 @@ def _terminate_procs(self): for step in range(0, 50): alive = False - for p in procs: + for p in self.procs: if p.proc.poll() is None: # not termniate os.kill(p.proc.pid, signal.SIGKILL) alive = True @@ -324,7 +327,7 @@ def launch(self): global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3" global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir - procs = start_local_trainers( + self.procs = start_local_trainers( cluster, pod, training_script=args.training_script, @@ -332,7 +335,7 @@ def launch(self): log_dir=args.log_dir, envs=global_envs) - for idx, proc in enumerate(procs): + for idx, proc in enumerate(self.procs): print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx)) def stop(self): @@ -436,53 +439,58 @@ def which_distributed_mode(args): return DistributeMode.COLLECTIVE -def launch_elastic(): - distribute_mode = which_distributed_mode(args) - if distribute_mode == DistributeMode.COLLECTIVE: - launch_collective(args) - else: - launch_ps(args, distribute_mode) - - def launch(): args = _parse_args() logger = get_logger() _print_arguments(args) print('launch host', args.host) + elastic_server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER') + job_name = args.job_name or os.getenv('PADDLE_JOB_NAME') + np = args.np or int(os.getenv('PADDLE_NP', 0)) + host = args.host or os.getenv('POD_IP') + scale = args.scale or int(os.getenv('PADDLE_SCALE', 0)) + force = args.force or os.getenv('PADDLE_FORCE') + elastic = ElasticManager( - args.elastic_server, - args.job_name, - args.np, - args.host, - scale=args.scale, - force=args.force, ) + elastic_server, + job_name, + np, + host, + scale=scale, + force=force, ) signal.signal(signal.SIGTERM, elastic.signal_handler) signal.signal(signal.SIGABRT, elastic.signal_handler) signal.signal(signal.SIGINT, elastic.signal_handler) while elastic.ready(): - args.ips = ','.join(elastic.hosts) - os.environ['PADDLE_TRAINERS'] = ','.join(elastic.hosts) + if elastic.enable: + args.ips = ','.join(elastic.hosts) + os.environ['PADDLE_TRAINERS'] = ','.join(elastic.hosts) distribute_mode = which_distributed_mode(args) if distribute_mode == DistributeMode.COLLECTIVE: launcher = CollectiveLauncher(args) + else: + # TODO(kuizhiqing) ps elastic support later + launch_ps(args, distribute_mode) + return launcher.launch() while True: ret = launcher.watch() + print("launch watch ret", ret) - if ret == 0: # completed - launcher.stop() - elastic.exit(completed=True) - exit(0) - elif ret is not None: # error + if ret is not None: + print('job exit', ret) + # process is completed if ret >= 0 or error else + completed = True if ret == 0 else False launcher.stop() - break + elastic.exit(completed=completed) + sys.exit(ret) if not elastic.health(): launcher.stop() @@ -490,6 +498,9 @@ def launch(): time.sleep(3) + launcher.stop() + sys.exit(0) + if __name__ == "__main__": launch() From e0ff65682b4e9952dc1446e6f68ebf8ffe4353b2 Mon 
Sep 17 00:00:00 2001 From: kuizhiqing Date: Tue, 25 May 2021 12:56:00 +0000 Subject: [PATCH 09/19] update rank, use logger --- python/paddle/distributed/fleet/elastic.py | 159 ++++++++++++++++++--- python/paddle/distributed/fleet/launch.py | 139 +++++------------- 2 files changed, 171 insertions(+), 127 deletions(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 9d0042445b6845..3b8d22770c2574 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -17,17 +17,97 @@ import os import six import logging +import signal + +logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper()) +logger = logging.getLogger("ELASTIC") + +ELASTIC_EXIT_CODE = 10001 + + +class ElasticStatus: + COMPLETED = "completed" + ERROR = "error" + HOLD = "hold" + RESTART = "restart" + EXIT = "exit" + + +class LauncherInterface(object): + def __init__(self, args): + self.args = args + self.procs = [] + + def _terminate_procs(self): + for p in self.procs: + if p.proc.poll() is None: + p.proc.terminate() + if p.log_fn: + p.log_fn.close() + logger.info("terminate process id:{}".format(p.proc.pid)) + + for step in range(0, 50): + alive = False + for p in self.procs: + if p.proc.poll() is None: # not termniate + os.kill(p.proc.pid, signal.SIGKILL) + alive = True + + if not alive: + logger.info("terminate all the procs") + return True + + time.sleep(1) + return False + + def _check_procs(self): + alive = False + result = None + for p in self.procs: + ret = p.proc.poll() + if ret is None: + alive = True + elif ret != 0: + logger.error("ERROR rank {} error with code {}".format(p.rank, + ret)) + result = ret + if not alive and result is None: + return 0 + else: + return result + + def launch(self): + raise NotImplementedError + + def stop(self): + raise NotImplementedError + + def watch(self): + raise NotImplementedError class ElasticManager(object): - def __init__(self, server, name, np, host=None, scale=0, force=False): + def __init__(self, args): + + self.args = args + server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER') + name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID') + np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0)) + host = args.host or os.getenv('POD_IP') + scale = args.scale or int(os.getenv('PADDLE_SCALE', 0)) + force = args.force or os.getenv('PADDLE_FORCE') + + self.elastic_level = int( + os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1)) - print('[elastic] init with server {} host {}'.format(server, host)) + #elastic_timeout = os.getenv('PADDLE_ELASTIC_TIMEOUT',1) + + logger.debug('init with server {} host {}'.format(server, host)) self.hosts = [] self.stopped = False - if not server or not name or not np: + if not server or ':' not in server or not name or not np: self.enable = False return else: @@ -60,23 +140,22 @@ def __init__(self, server, name, np, host=None, scale=0, force=False): def host_call_back(event): if self.etcd.get(self.host_path)[0] == None: - print('[elastic] register host agin {}'.format(self.host)) + logger.debug('register host again {}'.format(self.host)) self.etcd.put(self.host_path, six.b(self.host)) host_watch = self.etcd.add_watch_callback(self.host_path, host_call_back) - # np - # + # np describes the exact number of nodes to run the job inp = int(self.etcd.get(self.np_path)[0] or 0) if scale == 0 and not force: assert ( inp == np, - "[elastic] np {} is not consistent with np in etcd {}, maybe the job with the same name exited unexpected, try 
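The _terminate_procs helper above uses a two-stage shutdown: ask every child to terminate, then SIGKILL whatever is still alive, retrying for up to roughly 50 seconds. A stripped-down version of the same idea with bare subprocess.Popen objects (a sketch, not the launcher's API):

    import os
    import signal
    import time

    def terminate_all(popen_list):
        for p in popen_list:
            if p.poll() is None:
                p.terminate()                      # polite request first
        for _ in range(50):
            still_alive = [p for p in popen_list if p.poll() is None]
            if not still_alive:
                return True                        # all children are gone
            for p in still_alive:
                os.kill(p.pid, signal.SIGKILL)     # escalate
            time.sleep(1)
        return False                               # gave up after ~50s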
--force=true". + "np {} is not consistent with np in etcd {}, maybe the job with the same name exited unexpected, try --force=true". format(np, inp)) else: assert (inp == np or inp == self.np, - "[elastic] np {} scale to {} by {} is not allowed".format( + "np {} scale to {} by {} is not allowed".format( inp, self.np, scale)) self.etcd.put(self.np_path, six.b("%d" % (self.np))) @@ -84,7 +163,7 @@ def host_call_back(event): def np_call_back(event): gnp = int(self.etcd.get(self.np_path)[0]) if gnp != self.np: - print("[elastic] scale np {} to {} ".format(self.np, gnp)) + logger.info("scale np {} to {} ".format(self.np, gnp)) self.np = gnp np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back) @@ -92,7 +171,7 @@ def np_call_back(event): self.watches = [host_watch, np_watch] def exit(self, completed=False): - print('[elastic] manager exist completed {}'.format(completed)) + logger.info('manager exist completed {}'.format(completed)) if not self.enable: return @@ -115,6 +194,9 @@ def _get_host(self): return '127.0.0.1' def _completed(self): + if not self.enable: + return True + return int(self.etcd.get(self.prefix)[0]) == 1 def _match(self): @@ -126,27 +208,60 @@ def _match(self): else: return False - def ready(self): + def _update_hosts(self): + assert (len(self.hosts) != 0, 'hosts empty') + hosts = ','.join(self.hosts) + self.args.ips = hosts + os.environ['PADDLE_TRAINERS'] = hosts + os.environ['PADDLE_TRAINER_ID'] = '{}'.format( + self.hosts.index(self.host)) + + def wait(self): if not self.enable: - return True + return while not self.stopped: if self._match(): - print('[elastic] ready with hosts {}'.format(self.hosts)) - return True - print('[elastic] not ready for np {} with hosts {}'.format( - self.np, self.hosts)) + logger.info('ready with hosts {}'.format(self.hosts)) + self._update_hosts() + return + logger.info('not ready for np {} with hosts {}'.format(self.np, + self.hosts)) time.sleep(3) - return False + return - def health(self): + def run(self, launcher): if self.stopped: - return False + return - if not self.enable: - return True + self.launcher = launcher(self.args) + self.launcher.launch() + + def watch(self): + + while not self.stopped: + ret = self.launcher.watch() + + if ret is not None: # self terminated + logger.info('job exit with code {}'.format(ret)) + # process is completed if ret >= 0 or error else + completed = True if ret == 0 else False + self.launcher.stop() + self.exit(completed=completed) + if completed: + return ElasticStatus.COMPLETED + if self.elastic_level == 1: + return ElasticStatus.RESTART + else: + return ElasticStatus.ERROR + + if not self._completed() and not self._match(): + self.launcher.stop() + return ElasticStatus.HOLD + + time.sleep(3) - return self._completed() or self._match() + return ElasticStatus.EXIT def signal_handler(self, sigint, frame): if self.enable: diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index 0c80c7f4097298..c151ac3777c26d 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -69,6 +69,7 @@ import paddle import paddle.fluid as fluid from paddle.distributed.fleet import launch_utils +import signal #from paddle.distributed.fleet.launch_utils import DistributeMode, DeviceMode, get_logger, get_cluster from paddle.distributed.fleet.launch_utils import * @@ -76,6 +77,9 @@ import paddle.distributed.fleet.ascend_utils as ascend_utils from paddle.distributed.fleet.elastic import ElasticManager +from 
paddle.distributed.fleet.elastic import LauncherInterface +from paddle.distributed.fleet.elastic import ElasticStatus +from paddle.distributed.fleet.elastic import ELASTIC_EXIT_CODE __all__ = [] @@ -180,7 +184,7 @@ def _parse_args(): # parameter elastic mode elastic_group = parser.add_argument_group("Elastic Parameters") elastic_group.add_argument("--elastic_server", type=str, help="") - elastic_group.add_argument("--job_name", type=str, help="") + elastic_group.add_argument("--job_id", type=str, help="") elastic_group.add_argument("--np", type=int, help="") elastic_group.add_argument("--scale", type=int, default=0, help="") elastic_group.add_argument("--port", type=int, help="") @@ -229,66 +233,13 @@ def get_cluster_from_args(args, device_mode, devices_per_proc): devices_per_proc) -class LauncherInterface(object): - def __init__(self, args): - self.args = args - self.procs = [] - - def _terminate_procs(self): - for p in self.procs: - if p.proc.poll() is None: - p.proc.terminate() - if p.log_fn: - p.log_fn.close() - logger.debug("terminate process id:{}".format(p.proc.pid)) - - for step in range(0, 50): - alive = False - for p in self.procs: - if p.proc.poll() is None: # not termniate - os.kill(p.proc.pid, signal.SIGKILL) - alive = True - - if not alive: - logger.info("terminate all the procs") - return True - - time.sleep(1) - return False - - def _check_procs(self): - alive = False - result = None - for p in self.procs: - ret = p.proc.poll() - if ret is None: - alive = True - elif ret != 0: - logger.error("ERROR rank {} error with code {}".format(p.rank, - ret)) - result = ret - if not alive and result is None: - return 0 - else: - return result - - def launch(self): - raise NotImplementedError - - def stop(self): - raise NotImplementedError - - def watch(self): - raise NotImplementedError - - class CollectiveLauncher(LauncherInterface): def __init__(self, args): self.args = args self.procs = [] def launch(self): - print("collective lauchner launch ...") + logger.info("collective lauchner launch ...") args = self.args # parse arguments, used for cloud-single-machine and local (device_mode, @@ -336,16 +287,16 @@ def launch(self): envs=global_envs) for idx, proc in enumerate(self.procs): - print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx)) + logger.info("launch proc_id:{} idx:{}".format(proc.proc.pid, idx)) def stop(self): - print("collective lauchner stop ...") + logger.info("collective lauchner stop ...") self._terminate_procs() if os.path.exists(self.gloo_rendezvous_dir): shutil.rmtree(self.gloo_rendezvous_dir) def watch(self): - print("collective lauchner watch ...") + logger.debug("collective lauchner watch ...") for p in self.procs: if p.log_fn and p.local_rank == 0: pull_worker_log(p) @@ -444,61 +395,39 @@ def launch(): logger = get_logger() _print_arguments(args) - print('launch host', args.host) - elastic_server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER') - job_name = args.job_name or os.getenv('PADDLE_JOB_NAME') - np = args.np or int(os.getenv('PADDLE_NP', 0)) - host = args.host or os.getenv('POD_IP') - scale = args.scale or int(os.getenv('PADDLE_SCALE', 0)) - force = args.force or os.getenv('PADDLE_FORCE') - - elastic = ElasticManager( - elastic_server, - job_name, - np, - host, - scale=scale, - force=force, ) + distribute_mode = which_distributed_mode(args) + # TODO(kuizhiqing) support ps later + if not distribute_mode == DistributeMode.COLLECTIVE: + launch_ps(args, distribute_mode) + return + + elastic = ElasticManager(args) + 
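For readers unfamiliar with the etcd3 usage in ElasticManager: each node keeps its own key under the job's /paddle/<job>/nodes/ prefix and re-creates that key from a watch callback if it ever disappears, which is what keeps the membership list current. A self-contained sketch of that register-and-watch pattern with the same etcd3 calls; the server address, job name and IP below are placeholders:

    import etcd3
    import six

    etcd = etcd3.client(host="127.0.0.1", port=2379)   # placeholder etcd endpoint
    host = "10.0.0.1"                                   # placeholder node IP
    host_path = "/paddle/demo-job/nodes/" + host

    def host_call_back(event):
        # if the key was removed (cleanup, another node's delete_prefix, ...), put it back
        if etcd.get(host_path)[0] is None:
            etcd.put(host_path, six.b(host))

    watch_id = etcd.add_watch_callback(host_path, host_call_back)
    etcd.put(host_path, six.b(host))
    # on shutdown: etcd.cancel_watch(watch_id); etcd.delete(host_path)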
signal.signal(signal.SIGTERM, elastic.signal_handler) signal.signal(signal.SIGABRT, elastic.signal_handler) signal.signal(signal.SIGINT, elastic.signal_handler) - while elastic.ready(): - - if elastic.enable: - args.ips = ','.join(elastic.hosts) - os.environ['PADDLE_TRAINERS'] = ','.join(elastic.hosts) - - distribute_mode = which_distributed_mode(args) - if distribute_mode == DistributeMode.COLLECTIVE: - launcher = CollectiveLauncher(args) - else: - # TODO(kuizhiqing) ps elastic support later - launch_ps(args, distribute_mode) - return - - launcher.launch() - - while True: - - ret = launcher.watch() - print("launch watch ret", ret) + while True: - if ret is not None: - print('job exit', ret) - # process is completed if ret >= 0 or error else - completed = True if ret == 0 else False - launcher.stop() - elastic.exit(completed=completed) - sys.exit(ret) + # wait for all nodes ready to run + elastic.wait() - if not elastic.health(): - launcher.stop() - break + # run self with specified launcher + elastic.run(CollectiveLauncher) - time.sleep(3) + # keep wathing the health status of self and being notified for other's failure + ret = elastic.watch() + if ret == ElasticStatus.COMPLETED: + break + if ret == ElasticStatus.HOLD: + continue + if ret == ElasticStatus.EXIT: + break + if ret == ElasticStatus.ERROR: + sys.exit(3) + if ret == ElasticStatus.RESTART: + sys.exit(ELASTIC_EXIT_CODE) - launcher.stop() sys.exit(0) From 5f990a77fda9011315f1a15d03cea8128dcd8d5d Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Wed, 26 May 2021 09:08:34 +0000 Subject: [PATCH 10/19] fix assert --- python/paddle/distributed/fleet/elastic.py | 17 +++++++---------- python/paddle/distributed/fleet/launch.py | 2 +- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 3b8d22770c2574..6b7c069b9ef067 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -95,7 +95,7 @@ def __init__(self, args): np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0)) host = args.host or os.getenv('POD_IP') scale = args.scale or int(os.getenv('PADDLE_SCALE', 0)) - force = args.force or os.getenv('PADDLE_FORCE') + force = args.force or os.getenv('PADDLE_ELASTIC_FORCE') self.elastic_level = int( os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1)) @@ -136,7 +136,6 @@ def __init__(self, args): # register self host to etcd # register watch to reset host after host been deleted self.etcd.delete_prefix(self.node_prefix) - self.etcd.put(self.host_path, six.b(self.host)) def host_call_back(event): if self.etcd.get(self.host_path)[0] == None: @@ -145,18 +144,16 @@ def host_call_back(event): host_watch = self.etcd.add_watch_callback(self.host_path, host_call_back) + self.etcd.put(self.host_path, six.b(self.host)) # np describes the exact number of nodes to run the job inp = int(self.etcd.get(self.np_path)[0] or 0) if scale == 0 and not force: - assert ( - inp == np, - "np {} is not consistent with np in etcd {}, maybe the job with the same name exited unexpected, try --force=true". 
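The assert rewrite in PATCH 10 (continued just below) fixes a classic Python pitfall: applying assert to a parenthesized (condition, message) pair tests a non-empty tuple, which is always truthy, so the check can never fire. A tiny reproduction with made-up values:

    np, inp = 4, 8                                    # made-up values
    assert (np == inp, "np mismatch")                 # passes silently: a tuple is truthy
    try:
        assert np == inp, "np {} != np in etcd {}".format(np, inp)   # corrected form
    except AssertionError as e:
        print(e)                                      # np 4 != np in etcd 8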
- format(np, inp)) + assert inp == np or inp == 0, "np {} is not consistent with np in etcd {}".format( + np, inp) else: - assert (inp == np or inp == self.np, - "np {} scale to {} by {} is not allowed".format( - inp, self.np, scale)) + assert inp == np or inp == self.np, "np {} scale to {} by {} is not allowed".format( + inp, self.np, scale) self.etcd.put(self.np_path, six.b("%d" % (self.np))) @@ -209,7 +206,7 @@ def _match(self): return False def _update_hosts(self): - assert (len(self.hosts) != 0, 'hosts empty') + assert len(self.hosts) != 0, 'hosts empty' hosts = ','.join(self.hosts) self.args.ips = hosts os.environ['PADDLE_TRAINERS'] = hosts diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index c151ac3777c26d..21573c5f03c995 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -71,7 +71,7 @@ from paddle.distributed.fleet import launch_utils import signal -#from paddle.distributed.fleet.launch_utils import DistributeMode, DeviceMode, get_logger, get_cluster +# TODO(danleifeng): Don't import * from a module from paddle.distributed.fleet.launch_utils import * import paddle.distributed.fleet.cloud_utils as cloud_utils import paddle.distributed.fleet.ascend_utils as ascend_utils From b7860528f4ed66da31e9982ca22a013eeb2370bb Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Fri, 4 Jun 2021 11:36:49 +0000 Subject: [PATCH 11/19] handle force kill --- python/paddle/distributed/fleet/elastic.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 6b7c069b9ef067..3e3a1716cbd653 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -106,6 +106,10 @@ def __init__(self, args): self.hosts = [] self.stopped = False + ''' + set to True when other failed, then restart self + ''' + self.restart_flag = False if not server or ':' not in server or not name or not np: self.enable = False @@ -139,6 +143,9 @@ def __init__(self, args): def host_call_back(event): if self.etcd.get(self.host_path)[0] == None: + self.restart_flag = True + time.sleep(5) + logger.debug('register host again {}'.format(self.host)) self.etcd.put(self.host_path, six.b(self.host)) @@ -252,6 +259,11 @@ def watch(self): else: return ElasticStatus.ERROR + if self.restart_flag: + self.restart_flag = False + self.launcher.stop() + return ElasticStatus.HOLD + if not self._completed() and not self._match(): self.launcher.stop() return ElasticStatus.HOLD From b2ceb3b7e1b551d432f9ca40a155280c969e3386 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Mon, 7 Jun 2021 02:37:04 +0000 Subject: [PATCH 12/19] exit code 101 --- python/paddle/distributed/fleet/elastic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 3e3a1716cbd653..896922fa380dcd 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -22,7 +22,7 @@ logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper()) logger = logging.getLogger("ELASTIC") -ELASTIC_EXIT_CODE = 10001 +ELASTIC_EXIT_CODE = 101 class ElasticStatus: From 0a7216d85cab63b34395ce6fb9e5e4e62d95f69e Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Mon, 7 Jun 2021 11:06:28 +0000 Subject: [PATCH 13/19] keep rank as env --- python/paddle/distributed/fleet/elastic.py | 13 +++++++++++-- 1 file changed, 11 
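PATCH 12 above narrows ELASTIC_EXIT_CODE from 10001 to 101, a value that survives the 0-255 range of a process exit status. The dedicated code is what lets an outer supervisor tell "relaunch me with the new world size" apart from a real failure. A hypothetical wrapper illustrating that contract; the flag values and training script are placeholders, not part of this patch:

    import subprocess
    import sys

    ELASTIC_EXIT_CODE = 101       # value used from PATCH 12 on

    while True:
        ret = subprocess.call([
            sys.executable, "-m", "paddle.distributed.fleet.launch",
            "--elastic_server=127.0.0.1:2379", "--job_id=demo-job", "--np=2",
            "train.py",           # placeholder training script
        ])
        if ret != ELASTIC_EXIT_CODE:
            sys.exit(ret)         # finished (0) or a genuine error: stop retrying
        # ELASTIC_EXIT_CODE: membership changed, loop and relaunch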
insertions(+), 2 deletions(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 896922fa380dcd..b3c2417d9e23ff 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -214,11 +214,20 @@ def _match(self): def _update_hosts(self): assert len(self.hosts) != 0, 'hosts empty' + + rank = int(os.getenv('PADDLE_TRAINER_ID', -1)) + idx = self.hosts.index(self.host) + + # swap if self.host not in the right position + if rank >= 0: + self.hosts[idx] = self.hosts[rank] + self.hosts[rank] = self.host + else: + os.environ['PADDLE_TRAINER_ID'] = '{}'.format(idx) + hosts = ','.join(self.hosts) self.args.ips = hosts os.environ['PADDLE_TRAINERS'] = hosts - os.environ['PADDLE_TRAINER_ID'] = '{}'.format( - self.hosts.index(self.host)) def wait(self): if not self.enable: From 5ed9f5ece7f27b4d128b1f1ebe2509326c5e5a6e Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Thu, 10 Jun 2021 07:21:03 +0000 Subject: [PATCH 14/19] add comment --- python/paddle/distributed/fleet/elastic.py | 5 ++++- python/paddle/distributed/fleet/launch.py | 16 +++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index b3c2417d9e23ff..891c8d87651aa4 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -94,7 +94,7 @@ def __init__(self, args): name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID') np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0)) host = args.host or os.getenv('POD_IP') - scale = args.scale or int(os.getenv('PADDLE_SCALE', 0)) + scale = args.scale or int(os.getenv('PADDLE_ELASTIC_SCALE', 0)) force = args.force or os.getenv('PADDLE_ELASTIC_FORCE') self.elastic_level = int( @@ -112,6 +112,9 @@ def __init__(self, args): self.restart_flag = False if not server or ':' not in server or not name or not np: + logger.info( + 'Elastic is not enabled with server {} name {} and np {}'. 
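The host reorder introduced in PATCH 13 above keeps a restarted node at its previous PADDLE_TRAINER_ID by swapping it back into that slot of the freshly collected host list; only genuinely new nodes take a fresh index. A small illustration with invented addresses:

    hosts = ["10.0.0.3", "10.0.0.1", "10.0.0.2"]   # order as collected from etcd
    host = "10.0.0.1"                              # this node
    prev_rank = 0                                  # its PADDLE_TRAINER_ID before restart

    idx = hosts.index(host)
    if prev_rank >= 0:
        # swap so this node keeps rank 0
        hosts[idx], hosts[prev_rank] = hosts[prev_rank], hosts[idx]
    print(",".join(hosts))                         # 10.0.0.1,10.0.0.3,10.0.0.2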
+ format(server, name, np)) self.enable = False return else: diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index 21573c5f03c995..50a05690a14e98 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -183,13 +183,15 @@ def _parse_args(): # parameter elastic mode elastic_group = parser.add_argument_group("Elastic Parameters") - elastic_group.add_argument("--elastic_server", type=str, help="") - elastic_group.add_argument("--job_id", type=str, help="") - elastic_group.add_argument("--np", type=int, help="") - elastic_group.add_argument("--scale", type=int, default=0, help="") - elastic_group.add_argument("--port", type=int, help="") - elastic_group.add_argument("--host", type=str, help="") - elastic_group.add_argument("--force", type=bool, default=False, help="") + elastic_group.add_argument( + "--elastic_server", type=str, help="etcd server host:port") + elastic_group.add_argument("--job_id", type=str, help="job unique id") + elastic_group.add_argument("--np", type=int, help="job pod/node number") + elastic_group.add_argument("--scale", type=int, default=0, help="scale np") + elastic_group.add_argument( + "--host", type=str, help="bind host, default to POD_IP env") + elastic_group.add_argument( + "--force", type=bool, default=False, help="update np force") return parser.parse_args() From e16267fcf5a0013ef106a24929b8908dc7f2be46 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Thu, 10 Jun 2021 13:28:43 +0000 Subject: [PATCH 15/19] add support for ports --- python/paddle/distributed/fleet/elastic.py | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 891c8d87651aa4..1e7fbece89183b 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -97,6 +97,8 @@ def __init__(self, args): scale = args.scale or int(os.getenv('PADDLE_ELASTIC_SCALE', 0)) force = args.force or os.getenv('PADDLE_ELASTIC_FORCE') + self.dte = os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', '') + self.elastic_level = int( os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1)) @@ -106,10 +108,6 @@ def __init__(self, args): self.hosts = [] self.stopped = False - ''' - set to True when other failed, then restart self - ''' - self.restart_flag = False if not server or ':' not in server or not name or not np: logger.info( @@ -130,6 +128,7 @@ def __init__(self, args): self.prefix = "/paddle/" + name self.node_prefix = self.prefix + '/nodes/' self.np_path = self.prefix + '/np' + self.dte_path = self.prefix + '/dte' self.host_path = self.node_prefix + self.host self.np = np + scale @@ -146,10 +145,10 @@ def __init__(self, args): def host_call_back(event): if self.etcd.get(self.host_path)[0] == None: - self.restart_flag = True + # ensure unmatch trigger + logger.info('register host again {}'.format(self.host)) time.sleep(5) - logger.debug('register host again {}'.format(self.host)) self.etcd.put(self.host_path, six.b(self.host)) host_watch = self.etcd.add_watch_callback(self.host_path, @@ -175,7 +174,17 @@ def np_call_back(event): np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back) - self.watches = [host_watch, np_watch] + # dte handle DISTRIBUTED_TRAINER_ENDPOINTS + self.etcd.put(self.dte_path, six.b(self.dte)) + + def dte_call_back(event): + logger.info("etcd: set DISTRIBUTED_TRAINER_ENDPOINTS to {} ".format( + self.dte)) + self.dte = 
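The flags documented in PATCH 14 above all fall back to PADDLE_ELASTIC_* environment variables, and elastic mode stays off unless a well-formed server, a job id and a node count are resolved. A short restatement of that resolution order with illustrative values:

    import os

    os.environ["PADDLE_ELASTIC_SERVER"] = "127.0.0.1:2379"   # example values only
    os.environ["PADDLE_ELASTIC_JOB_ID"] = "demo-job"
    os.environ["PADDLE_ELASTIC_NP"] = "2"

    args_elastic_server = None            # i.e. --elastic_server was not passed
    server = args_elastic_server or os.getenv("PADDLE_ELASTIC_SERVER")
    name = os.getenv("PADDLE_ELASTIC_JOB_ID")
    np = int(os.getenv("PADDLE_ELASTIC_NP", 0))
    enable = bool(server and ":" in server and name and np)
    print(server, np, enable)             # 127.0.0.1:2379 2 True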
six.ensure_str(self.etcd.get(self.dte_path)[0] or '') + + dte_watch = self.etcd.add_watch_callback(self.dte_path, dte_call_back) + + self.watches = [host_watch, np_watch, dte_watch] def exit(self, completed=False): logger.info('manager exist completed {}'.format(completed)) @@ -218,6 +227,12 @@ def _match(self): def _update_hosts(self): assert len(self.hosts) != 0, 'hosts empty' + if self.host in self.dte: + os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dte + logger.info("update self DISTRIBUTED_TRAINER_ENDPOINTS {} ".format( + self.dte)) + return + rank = int(os.getenv('PADDLE_TRAINER_ID', -1)) idx = self.hosts.index(self.host) @@ -271,11 +286,6 @@ def watch(self): else: return ElasticStatus.ERROR - if self.restart_flag: - self.restart_flag = False - self.launcher.stop() - return ElasticStatus.HOLD - if not self._completed() and not self._match(): self.launcher.stop() return ElasticStatus.HOLD From 531f6eebfbcb404060037405a75373f536238a5f Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Fri, 11 Jun 2021 03:04:42 +0000 Subject: [PATCH 16/19] sync trainers --- python/paddle/distributed/fleet/elastic.py | 44 ++++++++++++++-------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 1e7fbece89183b..7ad846aa4f4e20 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -97,7 +97,9 @@ def __init__(self, args): scale = args.scale or int(os.getenv('PADDLE_ELASTIC_SCALE', 0)) force = args.force or os.getenv('PADDLE_ELASTIC_FORCE') - self.dte = os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', '') + self.endpoints = '{}|{}'.format( + os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', ''), + os.getenv('PADDLE_TRAINERS', '')) self.elastic_level = int( os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1)) @@ -128,7 +130,7 @@ def __init__(self, args): self.prefix = "/paddle/" + name self.node_prefix = self.prefix + '/nodes/' self.np_path = self.prefix + '/np' - self.dte_path = self.prefix + '/dte' + self.endpoints_path = self.prefix + '/endpoints' self.host_path = self.node_prefix + self.host self.np = np + scale @@ -174,17 +176,20 @@ def np_call_back(event): np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back) - # dte handle DISTRIBUTED_TRAINER_ENDPOINTS - self.etcd.put(self.dte_path, six.b(self.dte)) + # endpoints handle DISTRIBUTED_TRAINER_ENDPOINTS and PADDLE_TRAINERS + self.etcd.put(self.endpoints_path, six.b(self.endpoints)) - def dte_call_back(event): - logger.info("etcd: set DISTRIBUTED_TRAINER_ENDPOINTS to {} ".format( - self.dte)) - self.dte = six.ensure_str(self.etcd.get(self.dte_path)[0] or '') + def endpoints_call_back(event): + logger.info( + "etcd: set DISTRIBUTED_TRAINER_ENDPOINTS|PADDLE_TRAINERS to {} ". 
+ format(self.endpoints)) + self.endpoints = six.ensure_str( + self.etcd.get(self.endpoints_path)[0] or '') - dte_watch = self.etcd.add_watch_callback(self.dte_path, dte_call_back) + endpoints_watch = self.etcd.add_watch_callback(self.endpoints_path, + endpoints_call_back) - self.watches = [host_watch, np_watch, dte_watch] + self.watches = [host_watch, np_watch, endpoints_watch] def exit(self, completed=False): logger.info('manager exist completed {}'.format(completed)) @@ -227,11 +232,20 @@ def _match(self): def _update_hosts(self): assert len(self.hosts) != 0, 'hosts empty' - if self.host in self.dte: - os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dte - logger.info("update self DISTRIBUTED_TRAINER_ENDPOINTS {} ".format( - self.dte)) - return + endpoints, trainers = self.endpoints.split('|') + if ':' in endpoints: + if self.host in endpoints: + os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = endpoints + os.environ['PADDLE_TRAINERS'] = trainers + logger.info("update DISTRIBUTED_TRAINER_ENDPOINTS {} ".format( + endpoints)) + logger.info("update PADDLE_TRAINERS {} ".format(trainers)) + return + else: + self.exit() + raise Exception( + "ENV DISTRIBUTED_TRAINER_ENDPOINTS {} set but incorrect ". + format(endpoints)) rank = int(os.getenv('PADDLE_TRAINER_ID', -1)) idx = self.hosts.index(self.host) From 5888d4636d03c4029187e75e484a5ead1dea90f3 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Fri, 11 Jun 2021 09:30:07 +0000 Subject: [PATCH 17/19] fix same ip --- python/paddle/distributed/fleet/elastic.py | 43 ++++++++++------------ 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 7ad846aa4f4e20..303b3f5d8ddf78 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -97,9 +97,8 @@ def __init__(self, args): scale = args.scale or int(os.getenv('PADDLE_ELASTIC_SCALE', 0)) force = args.force or os.getenv('PADDLE_ELASTIC_FORCE') - self.endpoints = '{}|{}'.format( - os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', ''), - os.getenv('PADDLE_TRAINERS', '')) + self.endpoints = os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS', '') + self.trainers = os.getenv('PADDLE_TRAINERS', '') self.elastic_level = int( os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1)) @@ -131,7 +130,7 @@ def __init__(self, args): self.node_prefix = self.prefix + '/nodes/' self.np_path = self.prefix + '/np' self.endpoints_path = self.prefix + '/endpoints' - self.host_path = self.node_prefix + self.host + self.host_path = '{}{}'.format(self.node_prefix, time.time()) self.np = np + scale ''' @@ -177,14 +176,17 @@ def np_call_back(event): np_watch = self.etcd.add_watch_callback(self.np_path, np_call_back) # endpoints handle DISTRIBUTED_TRAINER_ENDPOINTS and PADDLE_TRAINERS - self.etcd.put(self.endpoints_path, six.b(self.endpoints)) + self.etcd.put(self.endpoints_path, + six.b('{}|{}'.format(self.endpoints, self.trainers))) def endpoints_call_back(event): - logger.info( - "etcd: set DISTRIBUTED_TRAINER_ENDPOINTS|PADDLE_TRAINERS to {} ". 
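PATCH 16 and PATCH 17 share DISTRIBUTED_TRAINER_ENDPOINTS and PADDLE_TRAINERS between nodes through one etcd key, packed as '<endpoints>|<trainers>'. A minimal round-trip of that packing, with illustrative values:

    endpoints = "10.0.0.1:6070,10.0.0.1:6071,10.0.0.2:6070,10.0.0.2:6071"
    trainers = "10.0.0.1,10.0.0.2"

    packed = "{}|{}".format(endpoints, trainers)        # what gets written to etcd
    restored_endpoints, restored_trainers = packed.split("|")
    assert restored_endpoints == endpoints
    assert restored_trainers == trainers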
- format(self.endpoints)) - self.endpoints = six.ensure_str( - self.etcd.get(self.endpoints_path)[0] or '') + if not self.endpoints: + return + edps = six.ensure_str(self.etcd.get(self.endpoints_path)[0] or '') + self.endpoints, self.trainers = edps.split('|') + logger.info("set DISTRIBUTED_TRAINER_ENDPOINTS {} ".format( + self.endpoints)) + logger.info("set PADDLE_TRAINERS {} ".format(self.trainers)) endpoints_watch = self.etcd.add_watch_callback(self.endpoints_path, endpoints_call_back) @@ -232,20 +234,13 @@ def _match(self): def _update_hosts(self): assert len(self.hosts) != 0, 'hosts empty' - endpoints, trainers = self.endpoints.split('|') - if ':' in endpoints: - if self.host in endpoints: - os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = endpoints - os.environ['PADDLE_TRAINERS'] = trainers - logger.info("update DISTRIBUTED_TRAINER_ENDPOINTS {} ".format( - endpoints)) - logger.info("update PADDLE_TRAINERS {} ".format(trainers)) - return - else: - self.exit() - raise Exception( - "ENV DISTRIBUTED_TRAINER_ENDPOINTS {} set but incorrect ". - format(endpoints)) + if self.host in self.endpoints: + os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.endpoints + os.environ['PADDLE_TRAINERS'] = self.trainers + logger.info("update env DISTRIBUTED_TRAINER_ENDPOINTS {} ".format( + self.endpoints)) + logger.info("update env PADDLE_TRAINERS {} ".format(self.trainers)) + return rank = int(os.getenv('PADDLE_TRAINER_ID', -1)) idx = self.hosts.index(self.host) From 59cdd5ab8b875151dc20b3c400266f7138fc3472 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Tue, 15 Jun 2021 08:31:39 +0000 Subject: [PATCH 18/19] pass sigint --- python/paddle/distributed/fleet/elastic.py | 3 +++ python/paddle/distributed/fleet/launch.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py index 303b3f5d8ddf78..b919c4737576d5 100644 --- a/python/paddle/distributed/fleet/elastic.py +++ b/python/paddle/distributed/fleet/elastic.py @@ -110,6 +110,8 @@ def __init__(self, args): self.hosts = [] self.stopped = False + self.sigint = 0 + if not server or ':' not in server or not name or not np: logger.info( 'Elastic is not enabled with server {} name {} and np {}'. 
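Recording the received signal (self.sigint) lets the launcher finish with the conventional 128 + N exit status when it is stopped by a signal, a value shells and supervisors already recognize, while a normal completion keeps exit code 0 (see PATCH 19 just below). A one-line illustration using SIGTERM:

    import signal

    received = signal.SIGTERM        # e.g. what the handler above recorded
    print(128 + int(received))       # 143, the conventional "killed by SIGTERM" status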
@@ -306,4 +308,5 @@ def watch(self):
     def signal_handler(self, sigint, frame):
         if self.enable:
             self.exit()
+        self.sigint = sigint
         self.stopped = True
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 50a05690a14e98..e253fb13973569 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -430,7 +430,7 @@ def launch():
         if ret == ElasticStatus.RESTART:
             sys.exit(ELASTIC_EXIT_CODE)

-    sys.exit(0)
+    sys.exit(128 + int(elastic.sigint))


 if __name__ == "__main__":

From 9fe0f6af41f2934e764695cf2f77775dcace89a0 Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Wed, 16 Jun 2021 06:37:28 +0000
Subject: [PATCH 19/19] exit 0

---
 python/paddle/distributed/fleet/launch.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index e253fb13973569..07862a07c92c41 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -430,7 +430,10 @@ def launch():
         if ret == ElasticStatus.RESTART:
             sys.exit(ELASTIC_EXIT_CODE)

-    sys.exit(128 + int(elastic.sigint))
+    if int(elastic.sigint) > 0:
+        sys.exit(128 + int(elastic.sigint))
+    else:
+        sys.exit(0)


 if __name__ == "__main__":