From 618780db47f24d6cb995597068901c567c6cb359 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Thu, 28 Jan 2021 11:33:05 +0800 Subject: [PATCH 1/3] fix compilation on 20.2 --- paddle/fluid/framework/fleet/ascend_wrapper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/framework/fleet/ascend_wrapper.h b/paddle/fluid/framework/fleet/ascend_wrapper.h index 912d1b1c040b97..cba98699fc5e40 100644 --- a/paddle/fluid/framework/fleet/ascend_wrapper.h +++ b/paddle/fluid/framework/fleet/ascend_wrapper.h @@ -40,7 +40,7 @@ namespace framework { typedef ge::Graph AscendGraphDesc; #ifdef PADDLE_WITH_ASCEND_STRING -using AscendString = AscendString; +using AscendString = ge::AscendString; #else using AscendString = std::string; #endif From 21f4af6ca65d24a503af8bec4018b3d9eaec6897 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Sun, 31 Jan 2021 18:15:34 +0800 Subject: [PATCH 2/3] run under rank table --- .../ascend/ascend_optimizer.py | 37 +++++++++++++------ .../meta_optimizers/ascend/ascend_parser.py | 32 ++++++++-------- 2 files changed, 41 insertions(+), 28 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py index 89a19b64795eef..5934a9f902fbc6 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py @@ -26,10 +26,11 @@ class AscendIRParser(object): - def __init__(self): + def __init__(self, auto_dp=False): self.graph_idx = 0 self.hcom_endpoints = {} self.groups_to_create = [] + self._auto_dp=auto_dp def _construct_input_map(self, input_varlist): ret_map = {} @@ -91,13 +92,11 @@ def parse_op(self, op): print("append to create group: %s, with rank_ids: %s" % (group_name, global_rank_ids)) elif op.type in ascend_parser.registerd_op: - print("Op[%s] has been registered, begin to parse it" % (op.type)) op_parser = self.parser_factory.create_parse( ascend_parser.registerd_op[op.type]) op_parser.apply(op) else: - print("Op[%s] has not been registered, so we have to skip it" % - (op.type)) + assert False, "Op[%s] has not been registered, so we have to skip it" % (op.type) def _parse_program(self, graph_name, @@ -161,6 +160,14 @@ def parse_program(self, startup_program, main_program, input_varlist, startup_graph = self._parse_program("startup", startup_program) main_graph = self._parse_program("main", main_program, input_varlist, fetch_list) + if self._auto_dp: + assert len(self.groups_to_create)==0, "can't parse program under auto_dp mode" + + from paddle.distributed import fleet + self.groups_to_create.append( + HcomGroupConfig( + name="hcom_group_0", nranks=fleet.world_size(), rank_ids=[x for x in range(fleet.world_size())])) + return startup_graph, main_graph @@ -196,7 +203,8 @@ def minimize(self, startup_program=None, parameter_list=None, no_grad_set=None, - auto_dp=False): + auto_dp=False, + rank_table_file=None): minimized = None if self.inner_opt: minimized = self.inner_opt.minimize( @@ -210,19 +218,24 @@ def minimize(self, t = ascend_transpiler.AscendTranspiler(startup_program, loss.block.program) t.transpile() - print(loss.block.program) + #print(loss.block.program) # Config about Graph Engine can be found in https://support.huaweicloud.com/ config = { "ge.exec.deviceId": str(fleet.local_device_ids()), "ge.graphRunMode": "1", "ge.exec.precision_mode": "must_keep_origin_dtype", - # if multi mode - "ge.exec.rankTableFile": os.getenv("RANK_TABLE_FILE"), - "ge.exec.rankId": str(fleet.worker_index()), - "ge.exec.isUseHcom": "1", - "ge.exec.deployMode": "0", + #"ge.exec.rankTableFile" = rank_table_file + #"ge.exec.rankId" = str(fleet.worker_index()) + #"ge.exec.isUseHcom" = "0" + #"ge.exec.deployMode" = "0" } + # if multi trainers + if rank_table_file: + config["ge.exec.rankTableFile"] = rank_table_file + config["ge.exec.rankId"] = str(fleet.worker_index()) + config["ge.exec.isUseHcom"] = "1" + config["ge.exec.deployMode"] = "0" print("ge_initialize config:", config) core.ge_initialize(config) @@ -230,7 +243,7 @@ def minimize(self, self.ascend_instance.init_global_resources() main_block = loss.block - self.parser = AscendIRParser() + self.parser = AscendIRParser(auto_dp=auto_dp) input_varlist = self._get_input_varlist(main_block.program) diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py index 1dc67f59dd80c2..1eb9a6c1e605b1 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py +++ b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py @@ -164,7 +164,7 @@ def update_output(self, geop_list, index_list): self.parser_name, len(index_list), output_num) for output_id in range(output_num): arguments = self.op.output(self.op.output_names[output_id]) - print("%d argument: %s" % (output_id, str(arguments))) + #print("%d argument: %s" % (output_id, str(arguments))) if len(arguments) > 0: assert len(arguments) == len( index_list[output_id] @@ -172,8 +172,8 @@ def update_output(self, geop_list, index_list): self.parser_name, output_id, len(index_list[output_id]), len(arguments)) for i in range(len(arguments)): - print("assgin index_list[%d][%d] to %s" % - (output_id, i, arguments[i])) + #print("assgin index_list[%d][%d] to %s" % + # (output_id, i, arguments[i])) self.var2geop[arguments[i]] = geop_list[index_list[ output_id][i]] @@ -184,7 +184,7 @@ def apply(self, op): self.op = op assert self.op.type == self.parser_name, "op [%s] != parser_name[%s]" % ( self.op.type, self.parser_name) - print("begin to parse op %s" % (self.parser_name)) + #print("begin to parse op %s" % (self.parser_name)) geop_list, index_list = self._apply() self.update_output(geop_list, index_list) @@ -786,8 +786,8 @@ def _apply(self): "Const").set_attr_tensor("value", tensor) self._mark_as_input(const) if self.op.block.var(self.op.output('Out')[0]).persistable: - print("%s is Persistable in fill_constant" % - (self.op.output('Out')[0])) + #print("%s is Persistable in fill_constant" % + # (self.op.output('Out')[0])) var = core.GEOperatorFactory.create_operator( self.op.output('Out')[0], "Variable") var.update_output_desc("y", @@ -799,10 +799,10 @@ def _apply(self): "assign" + self._accumulated_op_id(), "Assign").set_input( "value", const).set_input("ref", var) return [const], [[0]] - else: - print( - "self.op.output('Out')[0]: %s is not persistable in fill_constant" - % (self.op.output('Out')[0])) + #else: + # print( + # "self.op.output('Out')[0]: %s is not persistable in fill_constant" + # % (self.op.output('Out')[0])) return [const], [[0]] @@ -856,8 +856,8 @@ def _apply(self): ## wirte the output of truncatedNormal from startup_program to main_program if self.op.block.var(self.op.output('Out')[0]).persistable: - print("%s is Persistable in truncated_normal" % - (self.op.output('Out')[0])) + #print("%s is Persistable in truncated_normal" % + # (self.op.output('Out')[0])) var = core.GEOperatorFactory.create_operator( self.op.output('Out')[0], "Variable") var.update_output_desc("y", @@ -872,10 +872,10 @@ def _apply(self): shape_tensor, mean_tensor, std_tensor, min_tensor, max_tensor, truncated_normal ], [[-1]] - else: - print( - "self.op.output('Out')[0] is not persistable in truncated_noraml" - ) + #else: + # print( + # "self.op.output('Out')[0] is not persistable in truncated_noraml" + # ) return [truncated_normal], [[0]] From bdbb5c06917746f236314408fba15de2a108d429 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Mon, 1 Feb 2021 15:03:28 +0800 Subject: [PATCH 3/3] add ranksize --- .../ascend/ascend_optimizer.py | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py index 5934a9f902fbc6..71b22d9519d6bf 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py @@ -26,11 +26,12 @@ class AscendIRParser(object): - def __init__(self, auto_dp=False): + def __init__(self, auto_dp=False, world_rank_size=1): self.graph_idx = 0 self.hcom_endpoints = {} self.groups_to_create = [] - self._auto_dp=auto_dp + self._auto_dp = auto_dp + self._world_rank_size = world_rank_size def _construct_input_map(self, input_varlist): ret_map = {} @@ -96,7 +97,8 @@ def parse_op(self, op): ascend_parser.registerd_op[op.type]) op_parser.apply(op) else: - assert False, "Op[%s] has not been registered, so we have to skip it" % (op.type) + assert False, "Op[%s] has not been registered, so we have to skip it" % ( + op.type) def _parse_program(self, graph_name, @@ -160,13 +162,16 @@ def parse_program(self, startup_program, main_program, input_varlist, startup_graph = self._parse_program("startup", startup_program) main_graph = self._parse_program("main", main_program, input_varlist, fetch_list) - if self._auto_dp: - assert len(self.groups_to_create)==0, "can't parse program under auto_dp mode" + if self._auto_dp and self._world_rank_size > 1: + assert len(self.groups_to_create + ) == 0, "can't parse program under auto_dp mode" from paddle.distributed import fleet self.groups_to_create.append( HcomGroupConfig( - name="hcom_group_0", nranks=fleet.world_size(), rank_ids=[x for x in range(fleet.world_size())])) + name="hcom_group_0", + nranks=fleet.world_size(), + rank_ids=[x for x in range(fleet.world_size())])) return startup_graph, main_graph @@ -213,7 +218,7 @@ def minimize(self, self.ascend_instance = core.AscendInstance() from paddle.distributed import fleet - if auto_dp and fleet.worker_num() > 1: + if auto_dp and fleet.world_size() > 1: from paddle.fluid.transpiler import ascend_transpiler t = ascend_transpiler.AscendTranspiler(startup_program, loss.block.program) @@ -225,15 +230,11 @@ def minimize(self, "ge.exec.deviceId": str(fleet.local_device_ids()), "ge.graphRunMode": "1", "ge.exec.precision_mode": "must_keep_origin_dtype", - #"ge.exec.rankTableFile" = rank_table_file - #"ge.exec.rankId" = str(fleet.worker_index()) - #"ge.exec.isUseHcom" = "0" - #"ge.exec.deployMode" = "0" } # if multi trainers - if rank_table_file: + if rank_table_file and fleet.world_size() > 1: config["ge.exec.rankTableFile"] = rank_table_file - config["ge.exec.rankId"] = str(fleet.worker_index()) + config["ge.exec.rankId"] = str(fleet.worker_index()) config["ge.exec.isUseHcom"] = "1" config["ge.exec.deployMode"] = "0" print("ge_initialize config:", config) @@ -243,7 +244,8 @@ def minimize(self, self.ascend_instance.init_global_resources() main_block = loss.block - self.parser = AscendIRParser(auto_dp=auto_dp) + self.parser = AscendIRParser( + auto_dp=auto_dp, world_rank_size=fleet.world_size()) input_varlist = self._get_input_varlist(main_block.program) @@ -251,9 +253,9 @@ def minimize(self, startup_program, main_block.program, input_varlist, self.fetch_list) for cfg in self.parser.groups_to_create: - hccl.create_group(cfg.name, cfg.nranks, cfg.rank_ids) print("create group (%s), nranks: %d, rank_ids: %s" % (cfg.name, cfg.nranks, cfg.rank_ids)) + hccl.create_group(cfg.name, cfg.nranks, cfg.rank_ids) self.ascend_instance.add_ascend_subgraph(0, startup_graph) self.ascend_instance.add_ascend_subgraph(1, main_graph)