From 6fbdd8a0ec42dacfeec2899c8b3b1c9224d2fad5 Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 11:40:59 +0800 Subject: [PATCH 01/23] out dp as optimizer sharding --- .../framework/distributed_strategy.proto | 2 + .../meta_optimizers/sharding/fp16_helper.py | 9 +- .../meta_optimizers/sharding_optimizer.py | 90 +++++++++++++------ 3 files changed, 67 insertions(+), 34 deletions(-) diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 4674ba4007f0af..808b5d27d62eba 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -43,6 +43,8 @@ message ShardingConfig { optional bool pp_allreduce_in_optimize = 10 [ default = false ]; optional int32 pp_degree = 11 [ default = 1 ]; optional bool optimize_cast = 12 [ default = false ]; + // optimizer sharding + optional bool dp_as_opt_sharding = 13 [ default = false ]; } message HybridConfig { diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/fp16_helper.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/fp16_helper.py index e939ac765b2c9e..cdc94263ddb39b 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/fp16_helper.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/fp16_helper.py @@ -105,7 +105,6 @@ def prune_fp16(block, shard, reduced_grads_to_param, ring_ids): if op.type == "update_loss_scaling": update_loss_scaling_op_idx = idx inf_var_name = op.desc.input('FoundInfinite')[0] - op._rename_input(inf_var_name, inf_var_name + "@sharding") if op.type in ["check_finite_and_unscale", "update_loss_scaling"]: reversed_x = [] reversed_x_paramname = [] @@ -142,10 +141,6 @@ def prune_fp16(block, shard, reduced_grads_to_param, ring_ids): name=inf_var_name + "@cast_int32", shape=inf_var.shape, dtype=core.VarDesc.VarType.INT32) - inf_var_sharding = block.create_var( - name=inf_var_name + "@sharding", - shape=inf_var.shape, - dtype=inf_var.dtype) block._insert_op_without_sync( update_loss_scaling_op_idx, @@ -179,10 +174,10 @@ def prune_fp16(block, shard, reduced_grads_to_param, ring_ids): update_loss_scaling_op_idx, type='cast', inputs={'X': inf_var_int32}, - outputs={'Out': inf_var_sharding}, + outputs={'Out': inf_var}, attrs={ "in_dtype": inf_var_int32.dtype, - "out_dtype": inf_var_sharding.dtype, + "out_dtype": inf_var.dtype, OP_ROLE_KEY: OpRole.Optimize }) update_loss_scaling_op_idx += 1 diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index a76a70cdcab3df..4e169dd7ef73e4 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -15,19 +15,20 @@ import paddle from paddle.fluid import unique_name, core import paddle.fluid as fluid -from paddle.distributed.fleet.meta_optimizers.common import OpRole, OP_ROLE_VAR_KEY, CollectiveHelper -from paddle.distributed.fleet.meta_optimizers.common import is_backward_op, is_optimizer_op, is_update_op -from paddle.distributed.fleet.meta_optimizers.meta_optimizer_base import MetaOptimizerBase -from paddle.distributed.fleet.meta_optimizers.sharding.shard import Shard, ProgramSegment -from paddle.distributed.fleet.meta_optimizers.sharding.fp16_helper import FP16Utils -from paddle.distributed.fleet.meta_optimizers.sharding.weight_decay_helper import WeightDecayHelper -from 
paddle.distributed.fleet.meta_optimizers.sharding.gradient_clip_helper import GradientClipHelper -from .sharding.offload_helper import OffloadHelper -from paddle.distributed.fleet.meta_optimizers.sharding.prune import ProgramDeps -from paddle.distributed.fleet.meta_optimizers.sharding.utils import * -from paddle.fluid.framework import Program, Variable, name_scope, default_main_program, default_startup_program, device_guard +from paddle.static import default_startup_program, device_guard from paddle.fluid import layers +from .common import OpRole, OP_ROLE_VAR_KEY, CollectiveHelper +from .common import is_backward_op, is_optimizer_op, is_update_op +from .meta_optimizer_base import MetaOptimizerBase +from .sharding.shard import Shard, ProgramSegment +from .sharding.fp16_helper import FP16Utils +from .sharding.weight_decay_helper import WeightDecayHelper +from .sharding.gradient_clip_helper import GradientClipHelper +from .sharding.offload_helper import OffloadHelper +from .sharding.prune import ProgramDeps +from .sharding.utils import * + import logging logger = logging.getLogger(__name__) formatter = logging.Formatter( @@ -35,7 +36,6 @@ ch = logging.StreamHandler() ch.setFormatter(formatter) logger.addHandler(ch) -from functools import reduce __all__ = [] @@ -64,6 +64,7 @@ def __init__(self, optimizer): # reduced grads to param name self._reduced_grads_to_param = {} self._shard = Shard() + self._opt_shard = Shard() self._verbose = False # use sharding as outer parallelism (e.g. inner:Megatron & outer sharding) @@ -287,8 +288,44 @@ def _apply_sharding_pass(self, params_grads): startup_block._sync_with_cpp() # step4: remove unneeded ops and vars from block - self._prune_main_program(main_block) - self._prune_startup_program(startup_block) + self._prune_main_program( + main_block, self._shard, + [self.mp_ring_id, self.sharding_ring_id, self.pp_ring_id]) + self._prune_startup_program(startup_block, self._shard) + + def _apply_opt_sharding_pass(self, params_grads): + strategy = self.user_defined_strategy + sharding_configs = strategy.sharding_configs + + # if sharding_configs.dp_as_opt_sharding is False: return + if self.dp_degree == 1: return + + # TODO(wangxi): dp_as_opt_sharding need support with sharding + if self.sharding_degree > 1: return + + main_block = self._main_program.global_block() + startup_block = self._startup_program.global_block() + + # step1: build shard + self._opt_params = set([x[0].name for x in params_grads]) + self._opt_shard.setup(params_grads, self.dp_rank, self.dp_degree) + + # step 3: get broadcast vars + self._opt_broadcast_vars = self._shard.find_broadcast_params(main_block) + + main_block._sync_with_cpp() + startup_block._sync_with_cpp() + + # step4: remove unneeded ops and vars from block + self._prune_main_program( + main_block, + self._opt_shard, + # sharding_ring_id will be -1 for now + [ + self.mp_ring_id, self.sharding_ring_id, self.pp_ring_id, + self.dp_ring_id + ]) + self._prune_startup_program(startup_block, self._opt_shard) def _insert_allreduce_for_pp(self): if self.pp_degree == 1: return @@ -449,6 +486,9 @@ def minimize_impl(self, self._apply_sharding_pass(params_grads) + # for tmp, can only apply when sharding_degree<=1 + self._apply_opt_sharding_pass(params_grads) + self._insert_allreduce_for_pp() self._adapt_amp_clip_without_sharding() @@ -787,7 +827,7 @@ def _split_program(self, block): self._segments[idx_]._end_idx].desc.input_arg_names())) return - def _prune_main_program(self, block): + def _prune_main_program(self, block, shard, rings): """ 
calculate deps from allredce op to optimize op, remove ops and vars not needed in this worker @@ -799,21 +839,17 @@ def _prune_main_program(self, block): """ weightdecay_helper = WeightDecayHelper() - weightdecay_helper.prune_weight_decay(block, self._shard) + weightdecay_helper.prune_weight_decay(block, shard) # FIXME(wangxi): mp should prune duplicated param_grads # NOTE (JZ-LIANG) the sync of FoundInfinite should among one entire Model Parallelism # group. and each Data Parallelism group should have its own sync of FoundInfinite # amp could use global group for sync - FP16Utils.prune_fp16( - block, self._shard, self._reduced_grads_to_param, - [self.mp_ring_id, self.sharding_ring_id, self.pp_ring_id]) + FP16Utils.prune_fp16(block, shard, self._reduced_grads_to_param, rings) # clipbyglobalnorm should only use the Model paramllelism group (mp-sharding-pp) gradientclip_helper = GradientClipHelper(None) - gradientclip_helper.prune_gradient_clip( - block, self._shard, - [self.mp_ring_id, self.sharding_ring_id, self.pp_ring_id]) + gradientclip_helper.prune_gradient_clip(block, shard, rings) # build prog deps reduced_grads = [] @@ -828,8 +864,8 @@ def _prune_main_program(self, block): # prune optimizer state and param pruned_opti_vars = [] for var_name in list(block.vars.keys()): - if self._shard.is_opti_var(var_name) and \ - not self._shard.has_opt_var(var_name): + if shard.is_opti_var(var_name) and \ + not shard.has_opt_var(var_name): pruned_opti_vars.append(var_name) program_deps = ProgramDeps(block, reduced_grads, pruned_opti_vars) @@ -1112,17 +1148,17 @@ def _add_broadcast_allreduce(self, block): return - def _prune_startup_program(self, block): + def _prune_startup_program(self, block, shard): for idx, op in reversed(list(enumerate(block.ops))): for output_name in op.desc.output_arg_names(): - if self._shard.has_var(output_name): + if shard.has_var(output_name): continue #TODO why do we remove op, when only one var is removed block._remove_op(idx, sync=False) break for var_name in list(block.vars.keys()): - if self._shard.has_var(var_name): + if shard.has_var(var_name): continue block._remove_var(var_name, sync=False) block._sync_with_cpp() From 81240b94305ef147ba0de95d7bc970fd3332e6ad Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 12:12:40 +0800 Subject: [PATCH 02/23] refine --- .../meta_optimizers/sharding_optimizer.py | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 4e169dd7ef73e4..fe4c9753a7fcc0 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -64,7 +64,6 @@ def __init__(self, optimizer): # reduced grads to param name self._reduced_grads_to_param = {} self._shard = Shard() - self._opt_shard = Shard() self._verbose = False # use sharding as outer parallelism (e.g. 
inner:Megatron & outer sharding) @@ -155,8 +154,8 @@ def _get_hybrid_degree(self): def _get_hybrid_dp_mode(self): """ get - self.hybrid_dp_mode - self.gradient_merge_mode + self.hybrid_dp_mode = 'pp_hybrid_dp' or 'sharding_hybrid_dp' + self.gradient_merge_mode = 'pp_gm' or 'sharding_gm' self._gradient_merge_acc_step self.pp_allreduce_in_optimize """ @@ -297,35 +296,26 @@ def _apply_opt_sharding_pass(self, params_grads): strategy = self.user_defined_strategy sharding_configs = strategy.sharding_configs - # if sharding_configs.dp_as_opt_sharding is False: return - if self.dp_degree == 1: return - - # TODO(wangxi): dp_as_opt_sharding need support with sharding + # TODO(wangxi): need support dp_as_opt_sharding with sharding if self.sharding_degree > 1: return + if sharding_configs.dp_as_opt_sharding is False: return + if self.dp_degree == 1: return main_block = self._main_program.global_block() startup_block = self._startup_program.global_block() # step1: build shard - self._opt_params = set([x[0].name for x in params_grads]) - self._opt_shard.setup(params_grads, self.dp_rank, self.dp_degree) + self._params = set([x[0].name for x in params_grads]) + self._shard.setup(params_grads, self.dp_rank, self.dp_degree) # step 3: get broadcast vars - self._opt_broadcast_vars = self._shard.find_broadcast_params(main_block) - - main_block._sync_with_cpp() - startup_block._sync_with_cpp() + self._broadcast_vars = self._shard.find_broadcast_params(main_block) # step4: remove unneeded ops and vars from block self._prune_main_program( - main_block, - self._opt_shard, - # sharding_ring_id will be -1 for now - [ - self.mp_ring_id, self.sharding_ring_id, self.pp_ring_id, - self.dp_ring_id - ]) - self._prune_startup_program(startup_block, self._opt_shard) + main_block, self._shard, + [self.mp_ring_id, self.pp_ring_id, self.dp_ring_id]) + self._prune_startup_program(startup_block, self._shard) def _insert_allreduce_for_pp(self): if self.pp_degree == 1: return From 9df39b4d29436f1c1cc7373b28ca5c757d48643e Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 15:45:33 +0800 Subject: [PATCH 03/23] fix adapt amp clip --- .../fleet/meta_optimizers/sharding/fp16_helper.py | 4 ---- .../fleet/meta_optimizers/sharding_optimizer.py | 9 +++++++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/fp16_helper.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/fp16_helper.py index cdc94263ddb39b..c5b2d9227bc16a 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/fp16_helper.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/fp16_helper.py @@ -205,10 +205,6 @@ def sync_amp_check_nan_inf(block, ring_ids): name=inf_var_name + "@cast_int32", shape=inf_var.shape, dtype=core.VarDesc.VarType.INT32) - inf_var_global = block.create_var( - name=inf_var_name + "@GLOBAL_WORLD", - shape=inf_var.shape, - dtype=inf_var.dtype) block._insert_op_without_sync( update_loss_scaling_op_idx, type='cast', diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index fe4c9753a7fcc0..f67af902ec953a 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -298,7 +298,7 @@ def _apply_opt_sharding_pass(self, params_grads): # TODO(wangxi): need support dp_as_opt_sharding with sharding if self.sharding_degree > 1: return - if 
sharding_configs.dp_as_opt_sharding is False: return + if sharding_configs['dp_as_opt_sharding'] is False: return if self.dp_degree == 1: return main_block = self._main_program.global_block() @@ -388,9 +388,14 @@ def _insert_allreduce_for_pp(self): # FIXME(wangxi): if fp16_allreduce, put cast fp16->fp32 to there? def _adapt_amp_clip_without_sharding(self): - if self.sharding_degree > 1: return + strategy = self.user_defined_strategy + sharding_configs = strategy.sharding_configs + # if not use sharding, adapt amp/clip, for remain parallelism. # cast --> amp --> clip --> opt + if self.sharding_degree > 1: return + if sharding_configs['dp_as_opt_sharding'] and self.dp_degree > 1: + return main_block = self._main_program.global_block() startup_block = self._startup_program.global_block() From d14845b627ac6dbcf945726648768f3809578e96 Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 18:27:11 +0800 Subject: [PATCH 04/23] add reduce and broadcast op --- .../fleet/meta_optimizers/sharding/shard.py | 10 ++-- .../fleet/meta_optimizers/sharding/utils.py | 51 ++++++++++++++++++- .../meta_optimizers/sharding_optimizer.py | 46 ++++++++++++++--- python/paddle/fluid/optimizer.py | 9 ++-- 4 files changed, 100 insertions(+), 16 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/shard.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/shard.py index 0c33a78120cb84..52dfed83d33c45 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/shard.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/shard.py @@ -24,7 +24,8 @@ def __init__(self, ): self.global_params = set([]) self.worker_idx = -1 self.worker_num = -1 - self.global_param2device = {} + self.global_param2device = dict() + self.device2global_params = dict() def setup(self, params_grads, worker_idx, worker_num): # param names of all devices @@ -33,8 +34,9 @@ def setup(self, params_grads, worker_idx, worker_num): self.worker_idx = worker_idx self.worker_num = worker_num # global_param2device contains fp32 params and fp16 params - self.global_param2device = self._split_params(params_grads, worker_idx, - worker_num) + # device2global_params only contains fp32 params + self.global_param2device, self.device2global_params \ + = self._split_params(params_grads, worker_idx, worker_num) def has_param(self, var_name): return var_name in self.global_param2device and \ @@ -64,7 +66,7 @@ def _split_params(self, params_grads, worker_idx, worker_num): device2params[device_idx].append(param_name) param2device[param_name] = device_idx mem_accu += mem - return param2device + return param2device, device2params def _var_device_id(self, var_name): if var_name in self.global_param2device: diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index 16fbc7bea6c8b6..e22dbbd917a4c5 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -438,7 +438,7 @@ def insert_reduce_ops(block, use_calc_stream=False, rank=None): """ - _add_allreduce_ops + _add_reduce_ops """ grad_in_this_device = [] for var in reduce_vars: @@ -463,6 +463,55 @@ def insert_reduce_ops(block, return grad_in_this_device +def insert_broadcast_param_ops(block, + insert_idx, + ring_id, + params, + shard, + op_role=OpRole.Optimize, + use_calc_stream=False, + rank=None): + """ + _add_broadcast_param_ops + """ + param_in_this_device = [] + for param in 
params: + root_id = shard.device(param) + assert root_id >= 0, "root id should be a positive int, but now root id is {}".format( + root_id) + if rank is not None and rank == root_id: + param_in_this_device.append(param) + block._insert_op_without_sync( + insert_idx, + type='c_broadcast', + inputs={'X': param}, + outputs={'Out': param}, + attrs={ + 'ring_id': ring_id, + 'root_id': root_id, + 'use_calc_stream': use_calc_stream, + OP_ROLE_KEY: op_role + }) + + return param_in_this_device + + for broadcast_name, root_device in broadcast2root: + block._insert_op_without_sync + paddle.fluid.framework.Block.append_op() + block._insert_op_without_sync( + insert_idx, + type='c_broadcast', + inputs={'X': broadcast_name}, + outputs={'Out': broadcast_name}, + attrs={ + 'ring_id': ring_id, + 'root': root_device, + OP_ROLE_KEY: op_role + }) + + return + + def get_grad_device(grad_name, shard): assert "@GRAD" in grad_name, "[{}] should be a grad variable.".format( grad_name) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index f67af902ec953a..88a2b1308ba524 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -27,6 +27,7 @@ from .sharding.gradient_clip_helper import GradientClipHelper from .sharding.offload_helper import OffloadHelper from .sharding.prune import ProgramDeps +from .sharding import utils from .sharding.utils import * import logging @@ -293,6 +294,7 @@ def _apply_sharding_pass(self, params_grads): self._prune_startup_program(startup_block, self._shard) def _apply_opt_sharding_pass(self, params_grads): + """ outer dp as optimizer sharding""" strategy = self.user_defined_strategy sharding_configs = strategy.sharding_configs @@ -301,6 +303,9 @@ def _apply_opt_sharding_pass(self, params_grads): if sharding_configs['dp_as_opt_sharding'] is False: return if self.dp_degree == 1: return + # TODO(wangxi): need support without pp in future + if self.pp_degree == 1: return + main_block = self._main_program.global_block() startup_block = self._startup_program.global_block() @@ -321,7 +326,7 @@ def _insert_allreduce_for_pp(self): if self.pp_degree == 1: return strategy = self.user_defined_strategy - fp16_allreduce = strategy.fp16_allreduce + sharding_configs = strategy.sharding_configs main_block = self._main_program.global_block() startup_block = self._startup_program.global_block() @@ -345,10 +350,14 @@ def _insert_allreduce_for_pp(self): if in_name not in main_block.vars: main_block._remove_op(idx) + optimizer_sharding = False + if sharding_configs['dp_as_opt_sharding'] and self.dp_degree > 1: + # TODO(wangxi): support fuse grad merge with optimizer sharding + strategy.fuse_grad_merge = False + optimizer_sharding = True + accumulated_grad_names = self._pp_optimizer._accumulate_gradients( - main_block, - fp16_allreduce=fp16_allreduce, - user_defined_strategy=strategy) + main_block, strategy=strategy) len_of_ops = len(main_block.ops) first_optimize_op_index = get_first_optimize_op_idx(main_block) @@ -373,7 +382,33 @@ def _insert_allreduce_for_pp(self): first_optimize_op_index += (len(main_block.ops) - len_of_ops) len_of_ops = len(main_block.ops) - if self.hybrid_dp and self.hybrid_dp_mode == "pp_hybrid_dp": + if optimizer_sharding: + logger.info("Pipeline Persistable grad is {}".format( + accumulated_grad_names)) + accumulated_grad_names = insert_reduce_ops( + main_block, + 
first_optimize_op_index, + self.dp_ring_id, + accumulated_grad_names, + self._shard, + OpRole.Optimize, + use_calc_stream=True, + rank=self.dp_rank) + logger.info("PP-Sharding grad is {}".format(accumulated_grad_names)) + first_optimize_op_index += (len(main_block.ops) - len_of_ops) + len_of_ops = len(main_block.ops) + + utils.insert_broadcast_param_ops( + main_block, + len_of_ops, + self.dp_ring_id, + self._shard.global_params, + self._shard, + OpRole.Optimize, + use_calc_stream=True, + rank=self.dp_rank) + + elif self.hybrid_dp and self.hybrid_dp_mode == "pp_hybrid_dp": insert_allreduce_ops( main_block, first_optimize_op_index, @@ -481,7 +516,6 @@ def minimize_impl(self, self._apply_sharding_pass(params_grads) - # for tmp, can only apply when sharding_degree<=1 self._apply_opt_sharding_pass(params_grads) self._insert_allreduce_for_pp() diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index bd5daf35a751ae..29cc80c893c2da 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5049,16 +5049,15 @@ def _rename_gradient_var_name(self, block): def _accumulate_gradients(self, block, pp_allreduce_in_optimize=False, - fp16_allreduce=False, - user_defined_strategy=None): + strategy=None): """ Create a new merged gradient for each parameter and accumulate the corresponding gradient to it. """ - if user_defined_strategy and user_defined_strategy.fuse_grad_merge: + fp16_allreduce = strategy.fp16_allreduce if strategy else False + if strategy and strategy.fuse_grad_merge: fused_gradient_names = self._accumulate_gradients_with_fuse( - block, fp16_allreduce, - user_defined_strategy.fuse_grad_size_in_MB) + block, fp16_allreduce, strategy.fuse_grad_size_in_MB) return fused_gradient_names merged_gradient_names = [] From bbbb760ef1757a88d6ca0f143c0afb7edfdfbb4a Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 18:57:49 +0800 Subject: [PATCH 05/23] reserved param --- .../fleet/meta_optimizers/sharding/prune.py | 6 +++++- .../meta_optimizers/sharding_optimizer.py | 21 +++++++++++-------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py index dd4e16b576fcf0..e063695a0b8a95 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py @@ -117,12 +117,16 @@ def crop_output_var_from_op(self, op_idx, var_name): var_name] == []: self._block._remove_var(var_name, sync=False) - def remove_op(self, op_idx): + def remove_op(self, op_idx, reserved_vars=None): # update deps op = self._block.ops[op_idx] for input_name in op.desc.input_arg_names(): + if reserved_vars is not None and input_name in reserved_vars: + continue self.crop_input_var_from_op(op_idx, input_name) for output_name in op.desc.output_arg_names(): + if reserved_vars is not None and output_name in reserved_vars: + continue self.crop_output_var_from_op(op_idx, output_name) self._block._remove_op(op_idx, sync=False) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 88a2b1308ba524..4d1a362e5c0c30 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -159,6 +159,7 @@ def _get_hybrid_dp_mode(self): self.gradient_merge_mode = 'pp_gm' or 
'sharding_gm' self._gradient_merge_acc_step self.pp_allreduce_in_optimize + self.optimizer_sharding """ strategy = self.user_defined_strategy sharding_configs = strategy.sharding_configs @@ -195,9 +196,16 @@ def _get_hybrid_dp_mode(self): logger.info("Gradient merge in [{}], acc step = [{}]".format( gm_mode, gm_acc_step)) + optimizer_sharding = False + # TODO(wangxi): need support dp_as_opt_sharding with sharding + if self.sharding_degree == 1 and self.dp_degree > 1 \ + and sharding_configs['dp_as_opt_sharding']: + optimizer_sharding = True + self.hybrid_dp_mode = dp_mode self.gradient_merge_mode = gm_mode self._gradient_merge_acc_step = gm_acc_step + self.optimizer_sharding = optimizer_sharding # this feature is design for ascend, and should NOT be used in GPU training self.pp_allreduce_in_optimize = sharding_configs[ @@ -295,13 +303,8 @@ def _apply_sharding_pass(self, params_grads): def _apply_opt_sharding_pass(self, params_grads): """ outer dp as optimizer sharding""" - strategy = self.user_defined_strategy - sharding_configs = strategy.sharding_configs - - # TODO(wangxi): need support dp_as_opt_sharding with sharding - if self.sharding_degree > 1: return - if sharding_configs['dp_as_opt_sharding'] is False: return - if self.dp_degree == 1: return + if self.optimizer_sharding is False: + return # TODO(wangxi): need support without pp in future if self.pp_degree == 1: return @@ -407,7 +410,6 @@ def _insert_allreduce_for_pp(self): OpRole.Optimize, use_calc_stream=True, rank=self.dp_rank) - elif self.hybrid_dp and self.hybrid_dp_mode == "pp_hybrid_dp": insert_allreduce_ops( main_block, @@ -946,7 +948,8 @@ def _prune_main_program(self, block, shard, rings): # if all outputs of this op are in _should_removed_var # _should_removed_var: opt state not cur shard if program_deps.should_remove_op(idx): - program_deps.remove_op(idx) + reserved_vars = self._params if self.optimizer_sharding else None + program_deps.remove_op(idx, reserved_vars) # NOTE (JZ-LIANG) revise and unify logic here # sharding support fp16_allreduce logic From 6ce85dcae1af9afa6138324ed456fd55c010120c Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 21:07:55 +0800 Subject: [PATCH 06/23] fix cast --- .../meta_optimizers/sharding_optimizer.py | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 4d1a362e5c0c30..8529ac46686779 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -319,6 +319,10 @@ def _apply_opt_sharding_pass(self, params_grads): # step 3: get broadcast vars self._broadcast_vars = self._shard.find_broadcast_params(main_block) + # NOTE(wangxi): prune_main_program will prune cast if not add this + for param, grad in params_grads: + self._reduced_grads_to_param[grad.name] = param.name + # step4: remove unneeded ops and vars from block self._prune_main_program( main_block, self._shard, @@ -353,6 +357,10 @@ def _insert_allreduce_for_pp(self): if in_name not in main_block.vars: main_block._remove_op(idx) + if self.optimizer_sharding: + # TODO(wangxi): support fuse grad merge with optimizer sharding + strategy.fuse_grad_merge = False + optimizer_sharding = False if sharding_configs['dp_as_opt_sharding'] and self.dp_degree > 1: # TODO(wangxi): support fuse grad merge with optimizer sharding @@ -386,9 +394,7 @@ def 
_insert_allreduce_for_pp(self): len_of_ops = len(main_block.ops) if optimizer_sharding: - logger.info("Pipeline Persistable grad is {}".format( - accumulated_grad_names)) - accumulated_grad_names = insert_reduce_ops( + accumulated_grad_names = utils.insert_reduce_ops( main_block, first_optimize_op_index, self.dp_ring_id, @@ -397,19 +403,22 @@ def _insert_allreduce_for_pp(self): OpRole.Optimize, use_calc_stream=True, rank=self.dp_rank) - logger.info("PP-Sharding grad is {}".format(accumulated_grad_names)) + logger.info("Optimizer grad in this rank {}".format( + accumulated_grad_names)) first_optimize_op_index += (len(main_block.ops) - len_of_ops) len_of_ops = len(main_block.ops) - utils.insert_broadcast_param_ops( + optimizer_param = utils.insert_broadcast_param_ops( main_block, len_of_ops, self.dp_ring_id, - self._shard.global_params, + self._params, self._shard, OpRole.Optimize, use_calc_stream=True, rank=self.dp_rank) + logger.info("Optimizer param in this rank {}".format( + optimizer_param)) elif self.hybrid_dp and self.hybrid_dp_mode == "pp_hybrid_dp": insert_allreduce_ops( main_block, From d6732cc835885cdf124732d934f7eb21611a1cce Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 21:22:18 +0800 Subject: [PATCH 07/23] fix c_broadcast root --- .../paddle/distributed/fleet/meta_optimizers/sharding/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index e22dbbd917a4c5..6e2cb698505c03 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -488,7 +488,7 @@ def insert_broadcast_param_ops(block, outputs={'Out': param}, attrs={ 'ring_id': ring_id, - 'root_id': root_id, + 'root': root_id, 'use_calc_stream': use_calc_stream, OP_ROLE_KEY: op_role }) From 6bc6962ec69c20c5196996b675965431bba1474f Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 21:52:56 +0800 Subject: [PATCH 08/23] fix startup program --- .../distributed/fleet/meta_optimizers/sharding_optimizer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 8529ac46686779..51b59021039652 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -957,6 +957,7 @@ def _prune_main_program(self, block, shard, rings): # if all outputs of this op are in _should_removed_var # _should_removed_var: opt state not cur shard if program_deps.should_remove_op(idx): + # NOTE(wangxi): need reserve all param in optimizer_sharding reserved_vars = self._params if self.optimizer_sharding else None program_deps.remove_op(idx, reserved_vars) @@ -1194,6 +1195,8 @@ def _prune_startup_program(self, block, shard): for output_name in op.desc.output_arg_names(): if shard.has_var(output_name): continue + if self.optimizer_sharding and shard.is_param(output_name): + continue #TODO why do we remove op, when only one var is removed block._remove_op(idx, sync=False) break @@ -1201,6 +1204,8 @@ def _prune_startup_program(self, block, shard): for var_name in list(block.vars.keys()): if shard.has_var(var_name): continue + if self.optimizer_sharding and shard.is_param(var_name): + continue block._remove_var(var_name, sync=False) block._sync_with_cpp() 
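Taken together, patches 01-08 make the outer data-parallel group behave as an optimizer-sharding group: each dp rank owns a shard of the parameters, accumulated gradients are reduced (c_reduce_sum) to the owning rank, only that rank keeps the optimizer states and runs the update, and the updated parameters are broadcast (c_broadcast) back to the dp group. A minimal usage sketch follows; the config keys are assumptions drawn from the ShardingConfig proto touched in this series (the flag starts as dp_as_opt_sharding and is renamed to _dp_as_optimizer_sharding in a later patch), not code taken verbatim from these patches.

# Sketch only: how a training script might opt in to dp-as-optimizer-sharding.
# The sharding_configs keys below are assumptions based on the ShardingConfig
# proto edited in this series; adjust the flag name to whatever the final
# patch exposes (_dp_as_optimizer_sharding).
import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()
fleet.init(is_collective=True)

strategy = fleet.DistributedStrategy()
strategy.sharding = True
strategy.sharding_configs = {
    "sharding_degree": 1,        # the pass only runs when sharding itself is off
    "mp_degree": 1,
    "pp_degree": 2,              # pipeline parallelism is required by the current pass
    "dp_degree": 2,              # outer dp group that holds the optimizer shards
    "dp_as_opt_sharding": True,  # flag added in patch 01 (later renamed)
}

optimizer = paddle.fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
# optimizer.minimize(loss) would then trigger _apply_opt_sharding_pass and the
# reduce/broadcast insertion done in _insert_allreduce_for_pp.

The sketch mirrors the early-return checks in the series: optimizer sharding is taken only when sharding_degree == 1, dp_degree > 1 and pp_degree > 1, matching _apply_opt_sharding_pass and _get_hybrid_dp_mode.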
From 39a562635444c00c71e29f98321c4548badfd96a Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 7 Sep 2021 23:05:30 +0800 Subject: [PATCH 09/23] fix prune and broadcast order --- .../fleet/meta_optimizers/sharding/prune.py | 7 +++++++ .../fleet/meta_optimizers/sharding_optimizer.py | 12 +++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py index e063695a0b8a95..908e9df8c52c50 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py @@ -136,6 +136,13 @@ def should_remove_op(self, op_idx): # remove check_finite_and_unscale op if its input 'X' is empty if op.type == 'check_finite_and_unscale' and len(op.input('X')) == 0: return True + + # NOTE: At present, it is found that the OP without output is + # only send_v2 and partial_send op, which will be used in + # all device + if len(op.desc.output_arg_names) == 0: + return False + for output_name in op.desc.output_arg_names(): if output_name not in self._should_removed_var: return False diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 51b59021039652..e8eeef8d50ec05 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -329,7 +329,7 @@ def _apply_opt_sharding_pass(self, params_grads): [self.mp_ring_id, self.pp_ring_id, self.dp_ring_id]) self._prune_startup_program(startup_block, self._shard) - def _insert_allreduce_for_pp(self): + def _insert_allreduce_for_pp(self, params_grads): if self.pp_degree == 1: return strategy = self.user_defined_strategy @@ -411,14 +411,14 @@ def _insert_allreduce_for_pp(self): optimizer_param = utils.insert_broadcast_param_ops( main_block, len_of_ops, - self.dp_ring_id, - self._params, + self.dp_ring_id, [x[0].name for x in params_grads], self._shard, OpRole.Optimize, use_calc_stream=True, rank=self.dp_rank) logger.info("Optimizer param in this rank {}".format( optimizer_param)) + assert len(accumulated_grad_names) == len(optimizer_param) elif self.hybrid_dp and self.hybrid_dp_mode == "pp_hybrid_dp": insert_allreduce_ops( main_block, @@ -529,7 +529,7 @@ def minimize_impl(self, self._apply_opt_sharding_pass(params_grads) - self._insert_allreduce_for_pp() + self._insert_allreduce_for_pp(params_grads) self._adapt_amp_clip_without_sharding() @@ -896,7 +896,9 @@ def _prune_main_program(self, block, shard, rings): for idx, op in enumerate(block.ops): input_names = op.desc.input_arg_names() output_names = op.desc.output_arg_names() - if op.type == "c_allreduce_sum": + # FIXME(wangxi): need use grads, pipeline grad is @GRAD@MERGE + if op.type == "c_allreduce_sum" and op.attr( + 'use_model_parallel') is False: assert (len(output_names) == 1) output_name = output_names[0] reduced_grads.append(output_name) From 6ea8f6ae96b9b254129f421f1c4c1f7add26309c Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 8 Sep 2021 11:35:44 +0800 Subject: [PATCH 10/23] fix --- .../paddle/distributed/fleet/meta_optimizers/sharding/prune.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py index 908e9df8c52c50..1e974695ec395c 100755 --- 
a/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py @@ -140,7 +140,7 @@ def should_remove_op(self, op_idx): # NOTE: At present, it is found that the OP without output is # only send_v2 and partial_send op, which will be used in # all device - if len(op.desc.output_arg_names) == 0: + if len(op.desc.output_arg_names()) == 0: return False for output_name in op.desc.output_arg_names(): From 408f217012f94803b9f406302709d738ad35086f Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 8 Sep 2021 16:48:13 +0800 Subject: [PATCH 11/23] fix --- .../meta_optimizers/sharding_optimizer.py | 50 +++++++------------ 1 file changed, 19 insertions(+), 31 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index e8eeef8d50ec05..42223f4d8bdaf3 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -159,7 +159,7 @@ def _get_hybrid_dp_mode(self): self.gradient_merge_mode = 'pp_gm' or 'sharding_gm' self._gradient_merge_acc_step self.pp_allreduce_in_optimize - self.optimizer_sharding + self._optimizer_sharding """ strategy = self.user_defined_strategy sharding_configs = strategy.sharding_configs @@ -198,14 +198,15 @@ def _get_hybrid_dp_mode(self): optimizer_sharding = False # TODO(wangxi): need support dp_as_opt_sharding with sharding - if self.sharding_degree == 1 and self.dp_degree > 1 \ - and sharding_configs['dp_as_opt_sharding']: + # need support without pp in future + if self.sharding_degree == 1 and self.dp_degree > 1 and \ + sharding_configs['dp_as_opt_sharding'] and self.pp_degree > 1: optimizer_sharding = True self.hybrid_dp_mode = dp_mode self.gradient_merge_mode = gm_mode self._gradient_merge_acc_step = gm_acc_step - self.optimizer_sharding = optimizer_sharding + self._optimizer_sharding = optimizer_sharding # this feature is design for ascend, and should NOT be used in GPU training self.pp_allreduce_in_optimize = sharding_configs[ @@ -285,7 +286,8 @@ def _apply_sharding_pass(self, params_grads): startup_block = self._startup_program.global_block() # step1: build shard - self._build_shard(params_grads) + self._build_shard(params_grads, self.sharding_rank, + self.sharding_degree) # step2: split_program self._split_program(main_block) @@ -302,22 +304,15 @@ def _apply_sharding_pass(self, params_grads): self._prune_startup_program(startup_block, self._shard) def _apply_opt_sharding_pass(self, params_grads): - """ outer dp as optimizer sharding""" - if self.optimizer_sharding is False: + """ outer dp as optimizer sharding """ + if self._optimizer_sharding is False: return - # TODO(wangxi): need support without pp in future - if self.pp_degree == 1: return - main_block = self._main_program.global_block() startup_block = self._startup_program.global_block() # step1: build shard - self._params = set([x[0].name for x in params_grads]) - self._shard.setup(params_grads, self.dp_rank, self.dp_degree) - - # step 3: get broadcast vars - self._broadcast_vars = self._shard.find_broadcast_params(main_block) + self._build_shard(params_grads, self.dp_rank, self.dp_degree) # NOTE(wangxi): prune_main_program will prune cast if not add this for param, grad in params_grads: @@ -357,16 +352,10 @@ def _insert_allreduce_for_pp(self, params_grads): if in_name not in main_block.vars: main_block._remove_op(idx) - if 
self.optimizer_sharding: + if self._optimizer_sharding: # TODO(wangxi): support fuse grad merge with optimizer sharding strategy.fuse_grad_merge = False - optimizer_sharding = False - if sharding_configs['dp_as_opt_sharding'] and self.dp_degree > 1: - # TODO(wangxi): support fuse grad merge with optimizer sharding - strategy.fuse_grad_merge = False - optimizer_sharding = True - accumulated_grad_names = self._pp_optimizer._accumulate_gradients( main_block, strategy=strategy) @@ -393,7 +382,7 @@ def _insert_allreduce_for_pp(self, params_grads): first_optimize_op_index += (len(main_block.ops) - len_of_ops) len_of_ops = len(main_block.ops) - if optimizer_sharding: + if self._optimizer_sharding: accumulated_grad_names = utils.insert_reduce_ops( main_block, first_optimize_op_index, @@ -440,8 +429,7 @@ def _adapt_amp_clip_without_sharding(self): # if not use sharding, adapt amp/clip, for remain parallelism. # cast --> amp --> clip --> opt if self.sharding_degree > 1: return - if sharding_configs['dp_as_opt_sharding'] and self.dp_degree > 1: - return + if self._optimizer_sharding: return main_block = self._main_program.global_block() startup_block = self._startup_program.global_block() @@ -710,7 +698,7 @@ def _init_comm(self): startup_block._sync_with_cpp() - def _build_shard(self, params_grads): + def _build_shard(self, params_grads, shard_rank, shard_size): # step 2: split params self._params = set([x[0].name for x in params_grads]) self._shard.setup(params_grads, self.sharding_rank, @@ -897,8 +885,8 @@ def _prune_main_program(self, block, shard, rings): input_names = op.desc.input_arg_names() output_names = op.desc.output_arg_names() # FIXME(wangxi): need use grads, pipeline grad is @GRAD@MERGE - if op.type == "c_allreduce_sum" and op.attr( - 'use_model_parallel') is False: + if op.type == "c_allreduce_sum" and \ + op.attr('use_model_parallel') is False: assert (len(output_names) == 1) output_name = output_names[0] reduced_grads.append(output_name) @@ -960,7 +948,7 @@ def _prune_main_program(self, block, shard, rings): # _should_removed_var: opt state not cur shard if program_deps.should_remove_op(idx): # NOTE(wangxi): need reserve all param in optimizer_sharding - reserved_vars = self._params if self.optimizer_sharding else None + reserved_vars = self._params if self._optimizer_sharding else None program_deps.remove_op(idx, reserved_vars) # NOTE (JZ-LIANG) revise and unify logic here @@ -1197,7 +1185,7 @@ def _prune_startup_program(self, block, shard): for output_name in op.desc.output_arg_names(): if shard.has_var(output_name): continue - if self.optimizer_sharding and shard.is_param(output_name): + if self._optimizer_sharding and shard.is_param(output_name): continue #TODO why do we remove op, when only one var is removed block._remove_op(idx, sync=False) @@ -1206,7 +1194,7 @@ def _prune_startup_program(self, block, shard): for var_name in list(block.vars.keys()): if shard.has_var(var_name): continue - if self.optimizer_sharding and shard.is_param(var_name): + if self._optimizer_sharding and shard.is_param(var_name): continue block._remove_var(var_name, sync=False) block._sync_with_cpp() From d534f9fa6144d2bca67a6328b18a810723023e63 Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 8 Sep 2021 19:08:52 +0800 Subject: [PATCH 12/23] add fuse reduce_sum --- .../fleet/meta_optimizers/sharding/utils.py | 97 +++++++++++++++++-- .../meta_optimizers/sharding_optimizer.py | 9 +- 2 files changed, 93 insertions(+), 13 deletions(-) diff --git 
a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index 6e2cb698505c03..c7874384886521 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -365,20 +365,15 @@ def insert_allreduce_ops(block, return -def insert_fused_allreduce_ops(block, - insert_idx, - ring_id, - allreduce_vars, - op_role=OpRole.Backward, - use_calc_stream=False, - fuse_grad_size_in_MB=32): +def get_fused_groups(block, allreduce_vars, fuse_grad_size): + """ coalesce tensor, get fused group """ segments = [] cur_size = 0. last_dtype = None for var in allreduce_vars: real_var = block.var(var) var_size = get_var_size(real_var) - if cur_size + var_size > fuse_grad_size_in_MB \ + if cur_size + var_size > fuse_grad_size \ or len(segments) == 0 \ or real_var.dtype != last_dtype: segments.append([real_var]) @@ -387,6 +382,17 @@ def insert_fused_allreduce_ops(block, else: segments[-1].append(real_var) cur_size += var_size + return segments + + +def insert_fused_allreduce_ops(block, + insert_idx, + ring_id, + allreduce_vars, + op_role=OpRole.Backward, + use_calc_stream=False, + fuse_grad_size_in_MB=32): + segments = get_fused_groups(block, allreduce_vars, fuse_grad_size_in_MB) fused_vars = [] for segment in segments: @@ -429,6 +435,72 @@ def insert_fused_allreduce_ops(block, attrs={OP_ROLE_KEY: op_role}) +def insert_fused_reduce_ops(block, + insert_idx, + ring_id, + reduce_vars, + shard, + op_role=OpRole.Backward, + use_calc_stream=False, + rank=None, + fuse_grad_size=32): + nranks = shard.worker_num + device_to_vars = [[] for _ in range(nranks)] + + for var in reduce_vars: + root_id = get_grad_device(var, shard) + assert 0 <= root_id < nranks, "root_id should >=0 and < nranks, " \ + "but now nranks={}, the root_id of var={} is {}"\ + .format(nranks, var, root_id) + device_to_vars[root_id].append(var) + + for root_id, vars_name in enumerate(device_to_vars): + segments = get_fused_groups(block, vars_name, fuse_grad_size) + + fused_vars = [] + for segment in segments: + tmp_var = block.create_var( + name=unique_name.generate('FusedOutput_{}'.format(segment[0] + .name)), + dtype=segment[0].dtype, + persistable=False, + stop_gradient=True) + fused_vars.append(tmp_var) + block._insert_op_without_sync( + insert_idx, + type="coalesce_tensor", + inputs={"Input": segment}, + outputs={"Output": segment, + "FusedOutput": tmp_var}, + attrs={ + "copy_data": True, + "use_align": True, + "dtype": segment[0].dtype, + OP_ROLE_KEY: op_role + }) + + for fused_var in fused_vars: + block._insert_op_without_sync( + insert_idx + len(fused_vars), + type='c_reduce_sum', + inputs={'X': fused_var}, + outputs={'Out': fused_var}, + attrs={ + 'ring_id': ring_id, + 'root_id': root_id, + 'use_calc_stream': use_calc_stream, + OP_ROLE_KEY: op_role + }) + if not use_calc_stream: + block._insert_op_without_sync( + insert_idx + len(fused_vars), + type='c_sync_calc_stream', + inputs={'X': fused_var}, + outputs={'Out': fused_var}, + attrs={OP_ROLE_KEY: op_role}) + return [] if rank is None else device_to_vars[rank] + + def insert_reduce_ops(block, insert_idx, ring_id, @@ -436,10 +508,17 @@ def insert_reduce_ops(block, shard, op_role=OpRole.Backward, use_calc_stream=False, - rank=None): + rank=None, + strategy=None): """ _add_reduce_ops """ + if strategy and strategy.fuse_all_reduce_ops and \ + not strategy.fuse_grad_merge: + return insert_fused_reduce_ops(block, insert_idx, ring_id, 
reduce_vars, + shard, op_role, use_calc_stream, rank, + strategy.fuse_grad_size_in_MB) + grad_in_this_device = [] for var in reduce_vars: diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 42223f4d8bdaf3..2233606f8b2286 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -28,6 +28,7 @@ from .sharding.offload_helper import OffloadHelper from .sharding.prune import ProgramDeps from .sharding import utils +# FIXME: import * from .sharding.utils import * import logging @@ -305,8 +306,7 @@ def _apply_sharding_pass(self, params_grads): def _apply_opt_sharding_pass(self, params_grads): """ outer dp as optimizer sharding """ - if self._optimizer_sharding is False: - return + if self._optimizer_sharding is False: return main_block = self._main_program.global_block() startup_block = self._startup_program.global_block() @@ -328,7 +328,6 @@ def _insert_allreduce_for_pp(self, params_grads): if self.pp_degree == 1: return strategy = self.user_defined_strategy - sharding_configs = strategy.sharding_configs main_block = self._main_program.global_block() startup_block = self._startup_program.global_block() @@ -353,8 +352,10 @@ def _insert_allreduce_for_pp(self, params_grads): main_block._remove_op(idx) if self._optimizer_sharding: - # TODO(wangxi): support fuse grad merge with optimizer sharding + # TODO(wangxi): support fuse_grad_merge and fp16_allreduce + # with optimizer sharding strategy.fuse_grad_merge = False + strategy.fp16_allreduce = False accumulated_grad_names = self._pp_optimizer._accumulate_gradients( main_block, strategy=strategy) From b5ba5b14fb21847330b26279d621e00a4421d4ca Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 8 Sep 2021 19:48:39 +0800 Subject: [PATCH 13/23] add fuse broadcast --- .../fleet/meta_optimizers/sharding/utils.py | 94 +++++++++++++++---- .../meta_optimizers/sharding_optimizer.py | 9 +- 2 files changed, 81 insertions(+), 22 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index c7874384886521..cba965fe76aa04 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -498,6 +498,7 @@ def insert_fused_reduce_ops(block, inputs={'X': fused_var}, outputs={'Out': fused_var}, attrs={OP_ROLE_KEY: op_role}) + return [] if rank is None else device_to_vars[rank] @@ -542,6 +543,73 @@ def insert_reduce_ops(block, return grad_in_this_device +def insert_fused_broadcast_param_ops(block, + insert_idx, + ring_id, + params, + shard, + op_role=OpRole.Optimize, + use_calc_stream=False, + rank=None, + fuse_grad_size=32): + nranks = shard.worker_num + device_to_vars = [[] for _ in range(nranks)] + + for var in params: + root_id = shard.device(var) + assert 0 <= root_id < nranks, "root_id should >=0 and < nranks, " \ + "but now nranks={}, the root_id of var={} is {}"\ + .format(nranks, var, root_id) + device_to_vars[root_id].append(var) + + for root_id, vars_name in enumerate(device_to_vars): + segments = get_fused_groups(block, vars_name, fuse_grad_size) + + fused_vars = [] + for segment in segments: + tmp_var = block.create_var( + name=unique_name.generate('FusedParam_{}'.format(segment[0] + .name)), + dtype=segment[0].dtype, + persistable=False, + stop_gradient=True) + 
fused_vars.append(tmp_var) + block._insert_op_without_sync( + insert_idx, + type="coalesce_tensor", + inputs={"Input": segment}, + outputs={"Output": segment, + "FusedOutput": tmp_var}, + attrs={ + "copy_data": True, + "use_align": True, + "dtype": segment[0].dtype, + OP_ROLE_KEY: op_role + }) + + for fused_var in fused_vars: + block._insert_op_without_sync( + insert_idx + len(fused_vars), + type='c_broadcast', + inputs={'X': fused_var}, + outputs={'Out': fused_var}, + attrs={ + 'ring_id': ring_id, + 'root': root_id, + 'use_calc_stream': use_calc_stream, + OP_ROLE_KEY: op_role + }) + if not use_calc_stream: + block._insert_op_without_sync( + insert_idx + len(fused_vars), + type='c_sync_calc_stream', + inputs={'X': fused_var}, + outputs={'Out': fused_var}, + attrs={OP_ROLE_KEY: op_role}) + + return [] if rank is None else device_to_vars[rank] + + def insert_broadcast_param_ops(block, insert_idx, ring_id, @@ -549,10 +617,16 @@ def insert_broadcast_param_ops(block, shard, op_role=OpRole.Optimize, use_calc_stream=False, - rank=None): + rank=None, + strategy=None): """ - _add_broadcast_param_ops + add broadcast param ops """ + if strategy and strategy.fuse_all_reduce_ops: + return insert_fused_broadcast_param_ops( + block, insert_idx, ring_id, params, shard, op_role, use_calc_stream, + rank, strategy.fuse_grad_size_in_MB) + param_in_this_device = [] for param in params: root_id = shard.device(param) @@ -574,22 +648,6 @@ def insert_broadcast_param_ops(block, return param_in_this_device - for broadcast_name, root_device in broadcast2root: - block._insert_op_without_sync - paddle.fluid.framework.Block.append_op() - block._insert_op_without_sync( - insert_idx, - type='c_broadcast', - inputs={'X': broadcast_name}, - outputs={'Out': broadcast_name}, - attrs={ - 'ring_id': ring_id, - 'root': root_device, - OP_ROLE_KEY: op_role - }) - - return - def get_grad_device(grad_name, shard): assert "@GRAD" in grad_name, "[{}] should be a grad variable.".format( diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 2233606f8b2286..259771ddccbd28 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -392,7 +392,8 @@ def _insert_allreduce_for_pp(self, params_grads): self._shard, OpRole.Optimize, use_calc_stream=True, - rank=self.dp_rank) + rank=self.dp_rank, + strategy=strategy) logger.info("Optimizer grad in this rank {}".format( accumulated_grad_names)) first_optimize_op_index += (len(main_block.ops) - len_of_ops) @@ -405,7 +406,8 @@ def _insert_allreduce_for_pp(self, params_grads): self._shard, OpRole.Optimize, use_calc_stream=True, - rank=self.dp_rank) + rank=self.dp_rank, + strategy=strategy) logger.info("Optimizer param in this rank {}".format( optimizer_param)) assert len(accumulated_grad_names) == len(optimizer_param) @@ -702,8 +704,7 @@ def _init_comm(self): def _build_shard(self, params_grads, shard_rank, shard_size): # step 2: split params self._params = set([x[0].name for x in params_grads]) - self._shard.setup(params_grads, self.sharding_rank, - self.sharding_degree) + self._shard.setup(params_grads, shard_rank, shard_size) # step 3: get broadcast vars self._broadcast_vars = self._shard.find_broadcast_params( From 31101be0192ed993c1d000937ce2446fba8860f3 Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 8 Sep 2021 20:25:16 +0800 Subject: [PATCH 14/23] add fuse broadcast --- 
.../distributed/fleet/meta_optimizers/sharding/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index cba965fe76aa04..dd81cbd128e0a0 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -459,6 +459,10 @@ def insert_fused_reduce_ops(block, fused_vars = [] for segment in segments: + if len(segment) == 1: + fused_vars.append(segment[0]) + continue + tmp_var = block.create_var( name=unique_name.generate('FusedOutput_{}'.format(segment[0] .name)), From ccb3aa4345b91cdcab77199b188186b98474e557 Mon Sep 17 00:00:00 2001 From: WangXi Date: Thu, 9 Sep 2021 15:03:24 +0800 Subject: [PATCH 15/23] optimizer sharding support fuse gradient merge --- .../framework/distributed_strategy.proto | 4 +- .../fleet/meta_optimizers/sharding/utils.py | 165 ++++++++---------- .../meta_optimizers/sharding_optimizer.py | 15 +- python/paddle/fluid/optimizer.py | 107 +++++++----- 4 files changed, 149 insertions(+), 142 deletions(-) diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 808b5d27d62eba..17d15a94c7287b 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -43,8 +43,8 @@ message ShardingConfig { optional bool pp_allreduce_in_optimize = 10 [ default = false ]; optional int32 pp_degree = 11 [ default = 1 ]; optional bool optimize_cast = 12 [ default = false ]; - // optimizer sharding - optional bool dp_as_opt_sharding = 13 [ default = false ]; + // Optimizer sharding. Temporary plans and may be deprecated + optional bool _dp_as_optimizer_sharding = 13 [ default = false ]; } message HybridConfig { diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index dd81cbd128e0a0..023daee5142955 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -365,24 +365,61 @@ def insert_allreduce_ops(block, return -def get_fused_groups(block, allreduce_vars, fuse_grad_size): - """ coalesce tensor, get fused group """ - segments = [] - cur_size = 0. - last_dtype = None - for var in allreduce_vars: - real_var = block.var(var) - var_size = get_var_size(real_var) - if cur_size + var_size > fuse_grad_size \ - or len(segments) == 0 \ - or real_var.dtype != last_dtype: - segments.append([real_var]) - cur_size = var_size - last_dtype = real_var.dtype - else: - segments[-1].append(real_var) - cur_size += var_size - return segments +class FuseHelper(object): + @staticmethod + def get_fused_groups(block, vars_name, fuse_size=32.): + """ coalesce tensor, get fused group """ + groups = [] + cur_size = 0. 
+ last_dtype = None + for var_name in vars_name: + real_var = block.var(var_name) + var_size = get_var_size(real_var) + if cur_size + var_size > fuse_size \ + or len(groups) == 0 \ + or real_var.dtype != last_dtype: + groups.append([real_var]) + cur_size = var_size + last_dtype = real_var.dtype + else: + groups[-1].append(real_var) + cur_size += var_size + return groups + + @staticmethod + def coalesce_tensor(block, + index, + groups, + op_role=OpRole.Backward, + prefix="Output"): + fused_vars = [] + for group in groups: + assert len(group) >= 1 + if len(group) == 1: + # no need fuse + fused_vars.append(group[0]) + continue + + fused_var = block.create_var( + name=unique_name.generate('Fused{}_{}'.format(prefix, group[0] + .name)), + dtype=group[0].dtype, + persistable=False, + stop_gradient=True) + fused_vars.append(fused_var) + block._insert_op_without_sync( + index, + type="coalesce_tensor", + inputs={"Input": group}, + outputs={"Output": group, + "FusedOutput": fused_var}, + attrs={ + "copy_data": True, + "use_align": True, + "dtype": group[0].dtype, + OP_ROLE_KEY: op_role + }) + return fused_vars def insert_fused_allreduce_ops(block, @@ -392,28 +429,11 @@ def insert_fused_allreduce_ops(block, op_role=OpRole.Backward, use_calc_stream=False, fuse_grad_size_in_MB=32): - segments = get_fused_groups(block, allreduce_vars, fuse_grad_size_in_MB) - - fused_vars = [] - for segment in segments: - tmp_var = block.create_var( - name=unique_name.generate('FusedOutput_{}'.format(segment[0].name)), - dtype=segment[0].dtype, - persistable=False, - stop_gradient=True) - fused_vars.append(tmp_var) - block._insert_op_without_sync( - insert_idx, - type="coalesce_tensor", - inputs={"Input": segment}, - outputs={"Output": segment, - "FusedOutput": tmp_var}, - attrs={ - "copy_data": True, - "use_align": True, - "dtype": segment[0].dtype, - OP_ROLE_KEY: op_role - }) + groups = FuseHelper.get_fused_groups(block, allreduce_vars, + fuse_grad_size_in_MB) + + fused_vars = FuseHelper.coalesce_tensor( + block, insert_idx, groups, op_role, prefix="Grad") for fused_var in fused_vars: block._insert_op_without_sync( @@ -455,33 +475,10 @@ def insert_fused_reduce_ops(block, device_to_vars[root_id].append(var) for root_id, vars_name in enumerate(device_to_vars): - segments = get_fused_groups(block, vars_name, fuse_grad_size) - - fused_vars = [] - for segment in segments: - if len(segment) == 1: - fused_vars.append(segment[0]) - continue + groups = FuseHelper.get_fused_groups(block, vars_name, fuse_grad_size) - tmp_var = block.create_var( - name=unique_name.generate('FusedOutput_{}'.format(segment[0] - .name)), - dtype=segment[0].dtype, - persistable=False, - stop_gradient=True) - fused_vars.append(tmp_var) - block._insert_op_without_sync( - insert_idx, - type="coalesce_tensor", - inputs={"Input": segment}, - outputs={"Output": segment, - "FusedOutput": tmp_var}, - attrs={ - "copy_data": True, - "use_align": True, - "dtype": segment[0].dtype, - OP_ROLE_KEY: op_role - }) + fused_vars = FuseHelper.coalesce_tensor( + block, insert_idx, groups, op_role, prefix="Grad") for fused_var in fused_vars: block._insert_op_without_sync( @@ -526,8 +523,12 @@ def insert_reduce_ops(block, grad_in_this_device = [] for var in reduce_vars: - - root_id = get_grad_device(var, shard) + grad_var = var + if strategy.fuse_grad_merge: + # TODO(wangxi): if support fp16_allreduce, need be + # 'FusedMergedGrad.cast_fp16._' + grad_var = var.replace('FusedMergedGrad_', '') + root_id = get_grad_device(grad_var, shard) assert root_id >= 0, "root id should 
be a positive int, but now root id is {}".format( root_id) if rank is not None and rank == root_id: @@ -555,7 +556,7 @@ def insert_fused_broadcast_param_ops(block, op_role=OpRole.Optimize, use_calc_stream=False, rank=None, - fuse_grad_size=32): + fuse_size=32): nranks = shard.worker_num device_to_vars = [[] for _ in range(nranks)] @@ -567,29 +568,10 @@ def insert_fused_broadcast_param_ops(block, device_to_vars[root_id].append(var) for root_id, vars_name in enumerate(device_to_vars): - segments = get_fused_groups(block, vars_name, fuse_grad_size) + groups = FuseHelper.get_fused_groups(block, vars_name, fuse_size) - fused_vars = [] - for segment in segments: - tmp_var = block.create_var( - name=unique_name.generate('FusedParam_{}'.format(segment[0] - .name)), - dtype=segment[0].dtype, - persistable=False, - stop_gradient=True) - fused_vars.append(tmp_var) - block._insert_op_without_sync( - insert_idx, - type="coalesce_tensor", - inputs={"Input": segment}, - outputs={"Output": segment, - "FusedOutput": tmp_var}, - attrs={ - "copy_data": True, - "use_align": True, - "dtype": segment[0].dtype, - OP_ROLE_KEY: op_role - }) + fused_vars = FuseHelper.coalesce_tensor( + block, insert_idx, groups, op_role, prefix="Param") for fused_var in fused_vars: block._insert_op_without_sync( @@ -627,6 +609,7 @@ def insert_broadcast_param_ops(block, add broadcast param ops """ if strategy and strategy.fuse_all_reduce_ops: + # TODO(wangxi): put fused var in startup_program, only need exec once return insert_fused_broadcast_param_ops( block, insert_idx, ring_id, params, shard, op_role, use_calc_stream, rank, strategy.fuse_grad_size_in_MB) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 259771ddccbd28..4b3acb4a7e9242 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -200,8 +200,9 @@ def _get_hybrid_dp_mode(self): optimizer_sharding = False # TODO(wangxi): need support dp_as_opt_sharding with sharding # need support without pp in future - if self.sharding_degree == 1 and self.dp_degree > 1 and \ - sharding_configs['dp_as_opt_sharding'] and self.pp_degree > 1: + if self.sharding_degree == 1 and self.dp_degree > 1 \ + and sharding_configs['_dp_as_optimizer_sharding'] \ + and self.pp_degree > 1: optimizer_sharding = True self.hybrid_dp_mode = dp_mode @@ -352,13 +353,12 @@ def _insert_allreduce_for_pp(self, params_grads): main_block._remove_op(idx) if self._optimizer_sharding: - # TODO(wangxi): support fuse_grad_merge and fp16_allreduce - # with optimizer sharding - strategy.fuse_grad_merge = False + # TODO(wangxi): support fp16_allreduce with optimizer sharding strategy.fp16_allreduce = False + shard = self._shard if self._optimizer_sharding else None accumulated_grad_names = self._pp_optimizer._accumulate_gradients( - main_block, strategy=strategy) + main_block, strategy=strategy, shard=shard) len_of_ops = len(main_block.ops) first_optimize_op_index = get_first_optimize_op_idx(main_block) @@ -410,7 +410,8 @@ def _insert_allreduce_for_pp(self, params_grads): strategy=strategy) logger.info("Optimizer param in this rank {}".format( optimizer_param)) - assert len(accumulated_grad_names) == len(optimizer_param) + if not strategy.fuse_grad_merge: + assert len(accumulated_grad_names) == len(optimizer_param) elif self.hybrid_dp and self.hybrid_dp_mode == "pp_hybrid_dp": insert_allreduce_ops( main_block, 
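
For context, the user-facing switch checked above (`_dp_as_optimizer_sharding`) is driven through fleet.DistributedStrategy; below is a minimal sketch of the configuration exercised by the new unit test added later in this series (model building, data feeding and the distributed launch environment are omitted, and the Momentum optimizer is only illustrative):

    import paddle
    import paddle.distributed.fleet as fleet

    paddle.enable_static()
    fleet.init(is_collective=True)

    strategy = fleet.DistributedStrategy()
    strategy.pipeline = True
    strategy.pipeline_configs = {
        "schedule_mode": "1F1B",
        "micro_batch_size": 2,
        "accumulate_steps": 4,
    }
    strategy.sharding = True
    strategy.sharding_configs = {
        "sharding_degree": 1,       # plain sharding disabled
        "pp_degree": 2,             # two pipeline stages
        "dp_degree": 2,             # dp group reused for optimizer sharding
        "_dp_as_optimizer_sharding": True,
    }
    strategy.fuse_all_reduce_ops = True
    strategy.fuse_grad_size_in_MB = 32

    optimizer = paddle.fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
    optimizer = fleet.distributed_optimizer(optimizer, strategy)
    # optimizer.minimize(avg_cost) would then route through ShardingOptimizer
    # with optimizer sharding enabled on the dp group.

With sharding_degree == 1, dp_degree > 1 and pp_degree > 1, minimize() takes the optimizer-sharding path that the changes above and the new test_fleet_hybrid_meta_optimizer.py assertions cover.
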
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 29cc80c893c2da..93eca99066859c 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5049,7 +5049,8 @@ def _rename_gradient_var_name(self, block): def _accumulate_gradients(self, block, pp_allreduce_in_optimize=False, - strategy=None): + strategy=None, + shard=None): """ Create a new merged gradient for each parameter and accumulate the corresponding gradient to it. @@ -5057,7 +5058,7 @@ def _accumulate_gradients(self, fp16_allreduce = strategy.fp16_allreduce if strategy else False if strategy and strategy.fuse_grad_merge: fused_gradient_names = self._accumulate_gradients_with_fuse( - block, fp16_allreduce, strategy.fuse_grad_size_in_MB) + block, fp16_allreduce, strategy.fuse_grad_size_in_MB, shard) return fused_gradient_names merged_gradient_names = [] @@ -5189,44 +5190,9 @@ def _accumulate_gradients(self, return merged_gradient_names - def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): - first_opt_op_idx = None - grad_param_pairs = [] - # obtain all param/grad pairs that needed to be fused - for index, op in reversed(tuple(enumerate(list(main_block.ops)))): - # remove the cast op of fp16 grad to fp32 grad - if self._is_optimize_op(op) and op.type == 'cast': - in_name = op.input_arg_names[0] - out_name = op.output_arg_names[0] - if out_name.strip('@GRAD') in self._param_device_map: - assert in_name.replace('.cast_fp16', '') == out_name - main_block._remove_op(index) - continue - - if self._is_backward_op(op) and first_opt_op_idx is None: - first_opt_op_idx = index + 1 - # no optimize phase - if first_opt_op_idx == len(main_block.ops): - return - - if self._is_backward_op(op) and ( - self._op_role_var_key in op.attr_names): - op_role_var = op.attr(self._op_role_var_key) - if len(op_role_var) == 0: - continue - assert len(op_role_var) % 2 == 0 - for i in range(0, len(op_role_var), 2): - param_name = op_role_var[i] - if not main_block.has_var(param_name): - continue - if '@BroadCast' in param_name: - continue - grad_param_pairs.append( - (op_role_var[i + 1], op_role_var[i])) - - if len(grad_param_pairs) == 0: - return - + def _insert_accumulate_gradients_with_fuse(self, main_block, fp16, + fused_size, grad_param_pairs, + first_opt_op_idx): grad_param_pairs = self._sort_grad_param_by_dtype(main_block, grad_param_pairs) @@ -5425,9 +5391,66 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): for i in range(len(fused_merged_gradients)): fused_merged_gradients[i] = fused_merged_gradients[i].name - main_block._sync_with_cpp() + return fused_merged_gradients, first_opt_op_idx + + def _accumulate_gradients_with_fuse(self, + main_block, + fp16, + fused_size, + shard=None): + first_opt_op_idx = None + grad_param_pairs = [] + # obtain all param/grad pairs that needed to be fused + for index, op in reversed(tuple(enumerate(list(main_block.ops)))): + # remove the cast op of fp16 grad to fp32 grad + if self._is_optimize_op(op) and op.type == 'cast': + in_name = op.input_arg_names[0] + out_name = op.output_arg_names[0] + if out_name.strip('@GRAD') in self._param_device_map: + assert in_name.replace('.cast_fp16', '') == out_name + main_block._remove_op(index) + continue + + if self._is_backward_op(op) and first_opt_op_idx is None: + first_opt_op_idx = index + 1 + # no optimize phase + if first_opt_op_idx == len(main_block.ops): + return - return fused_merged_gradients + if self._is_backward_op(op) and ( + self._op_role_var_key in 
op.attr_names): + op_role_var = op.attr(self._op_role_var_key) + if len(op_role_var) == 0: + continue + assert len(op_role_var) % 2 == 0 + for i in range(0, len(op_role_var), 2): + param_name = op_role_var[i] + if not main_block.has_var(param_name): + continue + if '@BroadCast' in param_name: + continue + grad_param_pairs.append( + (op_role_var[i + 1], op_role_var[i])) + + if len(grad_param_pairs) == 0: + return + + nranks = shard.worker_num if shard else 1 + device_to_pairs = [[] for _ in range(nranks)] + for pair in grad_param_pairs: + root_id = shard.device(pair[1]) if shard else 0 + assert 0 <= root_id < nranks + device_to_pairs[root_id].append(pair) + + all_fused_merged_gradients = [] + for pairs in device_to_pairs: + fused_merged_gradients, first_opt_op_idx = \ + self._insert_accumulate_gradients_with_fuse( + main_block, fp16, fused_size, pairs, first_opt_op_idx) + all_fused_merged_gradients += fused_merged_gradients + + main_block._sync_with_cpp() + return all_fused_merged_gradients def _sort_grad_param_by_dtype(self, main_block, grad_param_pairs): # sort the grad param paris by the dtype From 09cc1ac9e152305f2a18e6faca95275bd8bfc8e9 Mon Sep 17 00:00:00 2001 From: WangXi Date: Thu, 9 Sep 2021 15:58:37 +0800 Subject: [PATCH 16/23] fix insert_num --- .../fleet/meta_optimizers/sharding/utils.py | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index 023daee5142955..be4d47f91f78f7 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -387,12 +387,13 @@ def get_fused_groups(block, vars_name, fuse_size=32.): return groups @staticmethod - def coalesce_tensor(block, - index, - groups, - op_role=OpRole.Backward, - prefix="Output"): + def insert_coalesce_tensor(block, + index, + groups, + op_role=OpRole.Backward, + prefix="Output"): fused_vars = [] + insert_num = 0 for group in groups: assert len(group) >= 1 if len(group) == 1: @@ -419,7 +420,8 @@ def coalesce_tensor(block, "dtype": group[0].dtype, OP_ROLE_KEY: op_role }) - return fused_vars + insert_num += 1 + return fused_vars, insert_num def insert_fused_allreduce_ops(block, @@ -432,12 +434,12 @@ def insert_fused_allreduce_ops(block, groups = FuseHelper.get_fused_groups(block, allreduce_vars, fuse_grad_size_in_MB) - fused_vars = FuseHelper.coalesce_tensor( + fused_vars, insert_num = FuseHelper.insert_coalesce_tensor( block, insert_idx, groups, op_role, prefix="Grad") for fused_var in fused_vars: block._insert_op_without_sync( - insert_idx + len(fused_vars), + insert_idx + insert_num, type='c_allreduce_sum', inputs={'X': fused_var}, outputs={'Out': fused_var}, @@ -448,7 +450,7 @@ def insert_fused_allreduce_ops(block, }) if not use_calc_stream: block._insert_op_without_sync( - insert_idx + len(fused_vars), + insert_idx + insert_num, type='c_sync_calc_stream', inputs={'X': fused_var}, outputs={'Out': fused_var}, @@ -477,12 +479,12 @@ def insert_fused_reduce_ops(block, for root_id, vars_name in enumerate(device_to_vars): groups = FuseHelper.get_fused_groups(block, vars_name, fuse_grad_size) - fused_vars = FuseHelper.coalesce_tensor( + fused_vars, insert_num = FuseHelper.insert_coalesce_tensor( block, insert_idx, groups, op_role, prefix="Grad") for fused_var in fused_vars: block._insert_op_without_sync( - insert_idx + len(fused_vars), + insert_idx + insert_num, 
type='c_reduce_sum', inputs={'X': fused_var}, outputs={'Out': fused_var}, @@ -494,7 +496,7 @@ def insert_fused_reduce_ops(block, }) if not use_calc_stream: block._insert_op_without_sync( - insert_idx + len(fused_vars), + insert_idx + insert_num, type='c_sync_calc_stream', inputs={'X': fused_var}, outputs={'Out': fused_var}, @@ -570,12 +572,12 @@ def insert_fused_broadcast_param_ops(block, for root_id, vars_name in enumerate(device_to_vars): groups = FuseHelper.get_fused_groups(block, vars_name, fuse_size) - fused_vars = FuseHelper.coalesce_tensor( + fused_vars, insert_num = FuseHelper.insert_coalesce_tensor( block, insert_idx, groups, op_role, prefix="Param") for fused_var in fused_vars: block._insert_op_without_sync( - insert_idx + len(fused_vars), + insert_idx + insert_num, type='c_broadcast', inputs={'X': fused_var}, outputs={'Out': fused_var}, @@ -587,7 +589,7 @@ def insert_fused_broadcast_param_ops(block, }) if not use_calc_stream: block._insert_op_without_sync( - insert_idx + len(fused_vars), + insert_idx + insert_num, type='c_sync_calc_stream', inputs={'X': fused_var}, outputs={'Out': fused_var}, From fcf1e8df7d58c07f80bb58845427b787ef8c5f39 Mon Sep 17 00:00:00 2001 From: WangXi Date: Thu, 9 Sep 2021 19:42:31 +0800 Subject: [PATCH 17/23] clean --- .../distributed/fleet/meta_optimizers/sharding_optimizer.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 4b3acb4a7e9242..7f5c780f1f5cfa 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -427,9 +427,6 @@ def _insert_allreduce_for_pp(self, params_grads): # FIXME(wangxi): if fp16_allreduce, put cast fp16->fp32 to there? def _adapt_amp_clip_without_sharding(self): - strategy = self.user_defined_strategy - sharding_configs = strategy.sharding_configs - # if not use sharding, adapt amp/clip, for remain parallelism. 
# cast --> amp --> clip --> opt if self.sharding_degree > 1: return From 01af7621a0fb590e48e6790f460ee024fca29c8f Mon Sep 17 00:00:00 2001 From: WangXi Date: Fri, 10 Sep 2021 19:01:09 +0800 Subject: [PATCH 18/23] fix sharding prune --- .../amp/check_finite_and_unscale_op.cc | 37 ++++++++++--------- .../amp/check_finite_and_unscale_op.cu | 2 + .../operators/amp/update_loss_scaling_op.cc | 32 +++++++++++----- .../operators/amp/update_loss_scaling_op.cu | 2 + .../sharding/gradient_clip_helper.py | 24 +++++++++++- .../fleet/meta_optimizers/sharding/prune.py | 4 -- 6 files changed, 70 insertions(+), 31 deletions(-) diff --git a/paddle/fluid/operators/amp/check_finite_and_unscale_op.cc b/paddle/fluid/operators/amp/check_finite_and_unscale_op.cc index c7520dbd34f6a9..d104fba6d542bb 100644 --- a/paddle/fluid/operators/amp/check_finite_and_unscale_op.cc +++ b/paddle/fluid/operators/amp/check_finite_and_unscale_op.cc @@ -26,27 +26,28 @@ class CheckFiniteAndUnscaleOp : public framework::OperatorWithKernel { : OperatorWithKernel(type, inputs, outputs, attrs) {} void InferShape(framework::InferShapeContext* ctx) const override { - OP_INOUT_CHECK(ctx->HasInputs("X"), "Input", "X", - "check_finite_and_unscale"); - OP_INOUT_CHECK(ctx->HasOutputs("Out"), "Output", "Out", - "check_finite_and_unscale"); - PADDLE_ENFORCE_EQ( - ctx->Inputs("X").size(), ctx->Outputs("Out").size(), - platform::errors::InvalidArgument( - "The input(X) and output(Out) should have same size in " - "Operator(check_finite_and_unscale), size of input(X) is %d " - "and size of output(Out) is %d.", - ctx->Inputs("X").size(), ctx->Outputs("Out").size())); - auto x_dims = ctx->GetInputsDim("X"); - ctx->SetOutputsDim("Out", x_dims); + if (ctx->HasInputs("X") || ctx->HasOutputs("Out")) { + PADDLE_ENFORCE_EQ( + ctx->Inputs("X").size(), ctx->Outputs("Out").size(), + platform::errors::InvalidArgument( + "The input(X) and output(Out) should have same size in " + "Operator(check_finite_and_unscale), size of input(X) is %d " + "and size of output(Out) is %d.", + ctx->Inputs("X").size(), ctx->Outputs("Out").size())); + auto x_dims = ctx->GetInputsDim("X"); + ctx->SetOutputsDim("Out", x_dims); + } ctx->SetOutputDim("FoundInfinite", {1}); } protected: framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext& ctx) const override { - return framework::OpKernelType( - OperatorWithKernel::IndicateVarDataType(ctx, "X"), ctx.GetPlace()); + auto dtype = framework::proto::VarType::FP32; + if (ctx.MultiInputVar("X").size() >= 1) { + dtype = OperatorWithKernel::IndicateVarDataType(ctx, "X"); + } + return framework::OpKernelType(dtype, ctx.GetPlace()); } }; @@ -56,7 +57,8 @@ class CheckFiniteAndUnscaleOpMaker : public framework::OpProtoAndCheckerMaker { AddInput( "X", "(Tensors) The input tensors of check_finite_and_unscale operator.") - .AsDuplicable(); + .AsDuplicable() + .AsDispensable(); AddInput("Scale", "(Tensor) 1-dim tensor, the scale of check_finite_and_unscale " "operator."); @@ -69,7 +71,8 @@ class CheckFiniteAndUnscaleOpMaker : public framework::OpProtoAndCheckerMaker { AddOutput("Out", "(Tensors) The scaled output tensor of " "check_finite_and_unscale operator.") - .AsDuplicable(); + .AsDuplicable() + .AsDispensable(); AddOutput("FoundInfinite", "(Tensor) 1-dim tensor, contains a bool scalar, which indicates " "if there there is infinite or nan item in input X."); diff --git a/paddle/fluid/operators/amp/check_finite_and_unscale_op.cu b/paddle/fluid/operators/amp/check_finite_and_unscale_op.cu index 
c699486a9140a3..f8c0426d7b1fbc 100644 --- a/paddle/fluid/operators/amp/check_finite_and_unscale_op.cu +++ b/paddle/fluid/operators/amp/check_finite_and_unscale_op.cu @@ -97,6 +97,8 @@ class CheckFiniteAndUnscaleGpuKernel : public framework::OpKernel { scale_data, inverse_scale_v, found_inf_data); size_t xs_size = xs.size(); + if (xs_size == 0) return; + const auto& cpu_place = platform::CPUPlace(); // calculate each tensor's start index and copy to device auto h_starts_tensor = diff --git a/paddle/fluid/operators/amp/update_loss_scaling_op.cc b/paddle/fluid/operators/amp/update_loss_scaling_op.cc index 1ac286ef4ad1c9..2098674038fa2e 100644 --- a/paddle/fluid/operators/amp/update_loss_scaling_op.cc +++ b/paddle/fluid/operators/amp/update_loss_scaling_op.cc @@ -26,7 +26,6 @@ class UpdateLossScalingOp : public framework::OperatorWithKernel { using framework::OperatorWithKernel::OperatorWithKernel; void InferShape(framework::InferShapeContext* ctx) const override { - OP_INOUT_CHECK(ctx->HasInputs("X"), "Input", "X", "update_loss_scaling"); OP_INOUT_CHECK(ctx->HasInput("FoundInfinite"), "Input", "FoundInfinite", "update_loss_scaling"); OP_INOUT_CHECK(ctx->HasInput("PrevLossScaling"), "Input", "PrevLossScaling", @@ -35,16 +34,25 @@ class UpdateLossScalingOp : public framework::OperatorWithKernel { "update_loss_scaling"); OP_INOUT_CHECK(ctx->HasInput("InBadSteps"), "Input", "InBadSteps", "update_loss_scaling"); - OP_INOUT_CHECK(ctx->HasOutputs("Out"), "Output", "Out", - "update_loss_scaling"); OP_INOUT_CHECK(ctx->HasOutput("LossScaling"), "Output", "LossScaling", "update_loss_scaling"); OP_INOUT_CHECK(ctx->HasOutput("OutGoodSteps"), "Output", "OutGoodSteps", "update_loss_scaling"); OP_INOUT_CHECK(ctx->HasOutput("OutBadSteps"), "Output", "OutBadSteps", "update_loss_scaling"); - auto x_dims = ctx->GetInputsDim("X"); - ctx->SetOutputsDim("Out", x_dims); + + if (ctx->HasInputs("X") || ctx->HasOutputs("Out")) { + PADDLE_ENFORCE_EQ( + ctx->Inputs("X").size(), ctx->Outputs("Out").size(), + platform::errors::InvalidArgument( + "The input(X) and output(Out) should have same size in " + "Operator(update_loss_scaling), size of input(X) is %d " + "and size of output(Out) is %d.", + ctx->Inputs("X").size(), ctx->Outputs("Out").size())); + auto x_dims = ctx->GetInputsDim("X"); + ctx->SetOutputsDim("Out", x_dims); + } + ctx->SetOutputDim("LossScaling", {1}); ctx->SetOutputDim("OutGoodSteps", {1}); ctx->SetOutputDim("OutBadSteps", {1}); @@ -53,8 +61,12 @@ class UpdateLossScalingOp : public framework::OperatorWithKernel { protected: framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext& ctx) const override { - return framework::OpKernelType( - OperatorWithKernel::IndicateVarDataType(ctx, "X"), ctx.GetPlace()); + auto dtype = framework::proto::VarType::FP32; + if (ctx.MultiInputVar("X").size() >= 1) { + dtype = OperatorWithKernel::IndicateVarDataType(ctx, "X"); + } + + return framework::OpKernelType(dtype, ctx.GetPlace()); } }; @@ -63,7 +75,8 @@ class UpdateLossScalingOpMaker : public framework::OpProtoAndCheckerMaker { void Make() override { AddInput("X", "(Tensors) The input tensors of update_loss_scaling operator.") - .AsDuplicable(); + .AsDuplicable() + .AsDispensable(); AddInput("FoundInfinite", "(Tensor) 1-dim tensor, contains a bool scalar, which indicates " "whether there is any infinite gradient."); @@ -77,7 +90,8 @@ class UpdateLossScalingOpMaker : public framework::OpProtoAndCheckerMaker { "gradients are infinite."); AddOutput("Out", "(Tensors) The output tensor of 
update_loss_scaling operator.") - .AsDuplicable(); + .AsDuplicable() + .AsDispensable(); AddOutput("LossScaling", "(Tensor) 1-dim tensor, updated loss scaling."); AddOutput("OutGoodSteps", "(Tensor) 1-dim tensor, pdated good steps."); AddOutput("OutBadSteps", "(Tensor) 1-dim tensor, updated bad steps."); diff --git a/paddle/fluid/operators/amp/update_loss_scaling_op.cu b/paddle/fluid/operators/amp/update_loss_scaling_op.cu index de1f83c1ee50d0..ba8e2bd15874f9 100644 --- a/paddle/fluid/operators/amp/update_loss_scaling_op.cu +++ b/paddle/fluid/operators/amp/update_loss_scaling_op.cu @@ -95,6 +95,8 @@ class LazyZeros { const std::vector& xs, const std::vector& outs) const { size_t xs_size = xs.size(); + if (xs_size == 0) return; + const auto& cpu_place = platform::CPUPlace(); // alloc each tensor's start index and copy to device auto h_in_starts_mem = diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/gradient_clip_helper.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/gradient_clip_helper.py index e3d344dca25b36..8fde8de5adff7e 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/gradient_clip_helper.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/gradient_clip_helper.py @@ -71,8 +71,8 @@ def prune_gradient_clip(self, block, shard, ring_ids): if idx in deperate_op_idx: block._remove_op(idx, sync=False) continue - reversed_inputs = [] if op.type == "sum": + reversed_inputs = [] global_norm_sum_op_idx = idx for input_name in op.desc.input_arg_names(): if input_name not in deperated_vars: @@ -82,6 +82,28 @@ def prune_gradient_clip(self, block, shard, ring_ids): assert (len(op.desc.output_arg_names()) == 1) sum_res = op.desc.output_arg_names()[0] + # NOTE(wangxi): If we have 2 param, but sharding is 4, + # then the sum op in some cards will not have input. + # So we use fill_constant_op to set `sum_var` to zero, + # which does not affect correctness. 
+ if len(reversed_inputs) == 0: + sum_var = block.var(sum_res) + namescope = op.attr("op_namescope") + + block._remove_op(idx, sync=False) + op = block._insert_op_without_sync( + idx, + type='fill_constant', + inputs={}, + outputs={'Out': sum_res}, + attrs={ + 'shape': sum_var.shape, + 'dtype': sum_var.dtype, + 'value': 0.0, + OP_ROLE_KEY: OpRole.Optimize + }) + op._set_attr('op_namescope', namescope) + # allreduce(mp)->allreduce(sharding)->allreduce(pp) idx_offset = 1 for ring_id in ring_ids: diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py index 1e974695ec395c..9e577ca0c670af 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/prune.py @@ -132,10 +132,6 @@ def remove_op(self, op_idx, reserved_vars=None): def should_remove_op(self, op_idx): op = self._block.ops[op_idx] - # TODO (JZ-LIANG) revise this for uniform mixed parallelism - # remove check_finite_and_unscale op if its input 'X' is empty - if op.type == 'check_finite_and_unscale' and len(op.input('X')) == 0: - return True # NOTE: At present, it is found that the OP without output is # only send_v2 and partial_send op, which will be used in From c8974c8e086ea27caae1d9bbc6986e8ce891c66d Mon Sep 17 00:00:00 2001 From: WangXi Date: Fri, 10 Sep 2021 19:39:52 +0800 Subject: [PATCH 19/23] fix sharding prune --- paddle/fluid/operators/amp/check_finite_and_unscale_op.cc | 6 ++---- paddle/fluid/operators/amp/update_loss_scaling_op.cc | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/operators/amp/check_finite_and_unscale_op.cc b/paddle/fluid/operators/amp/check_finite_and_unscale_op.cc index d104fba6d542bb..8df77d69980c1f 100644 --- a/paddle/fluid/operators/amp/check_finite_and_unscale_op.cc +++ b/paddle/fluid/operators/amp/check_finite_and_unscale_op.cc @@ -57,8 +57,7 @@ class CheckFiniteAndUnscaleOpMaker : public framework::OpProtoAndCheckerMaker { AddInput( "X", "(Tensors) The input tensors of check_finite_and_unscale operator.") - .AsDuplicable() - .AsDispensable(); + .AsDuplicable(); AddInput("Scale", "(Tensor) 1-dim tensor, the scale of check_finite_and_unscale " "operator."); @@ -71,8 +70,7 @@ class CheckFiniteAndUnscaleOpMaker : public framework::OpProtoAndCheckerMaker { AddOutput("Out", "(Tensors) The scaled output tensor of " "check_finite_and_unscale operator.") - .AsDuplicable() - .AsDispensable(); + .AsDuplicable(); AddOutput("FoundInfinite", "(Tensor) 1-dim tensor, contains a bool scalar, which indicates " "if there there is infinite or nan item in input X."); diff --git a/paddle/fluid/operators/amp/update_loss_scaling_op.cc b/paddle/fluid/operators/amp/update_loss_scaling_op.cc index 2098674038fa2e..b974f606720b2a 100644 --- a/paddle/fluid/operators/amp/update_loss_scaling_op.cc +++ b/paddle/fluid/operators/amp/update_loss_scaling_op.cc @@ -75,8 +75,7 @@ class UpdateLossScalingOpMaker : public framework::OpProtoAndCheckerMaker { void Make() override { AddInput("X", "(Tensors) The input tensors of update_loss_scaling operator.") - .AsDuplicable() - .AsDispensable(); + .AsDuplicable(); AddInput("FoundInfinite", "(Tensor) 1-dim tensor, contains a bool scalar, which indicates " "whether there is any infinite gradient."); @@ -90,8 +89,7 @@ class UpdateLossScalingOpMaker : public framework::OpProtoAndCheckerMaker { "gradients are infinite."); AddOutput("Out", "(Tensors) The output tensor of update_loss_scaling 
operator.") - .AsDuplicable() - .AsDispensable(); + .AsDuplicable(); AddOutput("LossScaling", "(Tensor) 1-dim tensor, updated loss scaling."); AddOutput("OutGoodSteps", "(Tensor) 1-dim tensor, pdated good steps."); AddOutput("OutBadSteps", "(Tensor) 1-dim tensor, updated bad steps."); From 6d60e9374ac14aa252107e1254d583f571d9ecc9 Mon Sep 17 00:00:00 2001 From: WangXi Date: Fri, 10 Sep 2021 23:13:13 +0800 Subject: [PATCH 20/23] fix boundary --- .../fleet/meta_optimizers/sharding/gradient_clip_helper.py | 6 +++++- python/paddle/fluid/optimizer.py | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/gradient_clip_helper.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/gradient_clip_helper.py index 8fde8de5adff7e..3580e85fc89c19 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/gradient_clip_helper.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/gradient_clip_helper.py @@ -39,6 +39,7 @@ def prune_gradient_clip(self, block, shard, ring_ids): if not self._is_gradient_clip_op(op): continue if op.type == "sum": + global_norm_sum_op_idx = idx continue deperate_op = False for input_name in op.desc.input_arg_names(): @@ -61,7 +62,10 @@ def prune_gradient_clip(self, block, shard, ring_ids): if output_name not in op.desc.input_arg_names(): deperated_vars.add(output_name) - if not deperated_vars: + # NOTE(wangxi): If only have 2 sharding, and 1 param. + # sharding 0 will not deperated_vars, will return, only + # sharding 1 will insert allreduce, then hang. + if not deperated_vars and global_norm_sum_op_idx == -1: # got no gradient_clip op return diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 93eca99066859c..bd5a6a26cc89ba 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5079,8 +5079,8 @@ def _accumulate_gradients(self, if self._is_backward_op(op) and first_opt_op_idx is None: first_opt_op_idx = index + 1 - # no optimize phase - if first_opt_op_idx == len(block.ops): return + # maybe have no optimize + # if first_opt_op_idx == len(block.ops): return if self._is_backward_op(op) and ( self._op_role_var_key in op.attr_names): From 6a35978f4d505b0a03f73accc9588b83903382f3 Mon Sep 17 00:00:00 2001 From: WangXi Date: Mon, 13 Sep 2021 16:07:45 +0800 Subject: [PATCH 21/23] add optimizer sharding unittest --- .../fleet/meta_optimizers/sharding/utils.py | 3 +- .../unittests/fleet_meta_optimizer_base.py | 39 ++ .../test_fleet_hybrid_meta_optimizer.py | 434 ++++++++++++++++++ .../test_fleet_sharding_meta_optimizer.py | 2 +- 4 files changed, 476 insertions(+), 2 deletions(-) create mode 100755 python/paddle/fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index be4d47f91f78f7..d6acf541be5472 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -526,7 +526,8 @@ def insert_reduce_ops(block, grad_in_this_device = [] for var in reduce_vars: grad_var = var - if strategy.fuse_grad_merge: + if strategy and strategy.fuse_all_reduce_ops and \ + strategy.fuse_grad_merge: # TODO(wangxi): if support fp16_allreduce, need be # 'FusedMergedGrad.cast_fp16._' grad_var = var.replace('FusedMergedGrad_', '') diff --git 
a/python/paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py b/python/paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py index 730fa4ca60d31e..fe4b60c11b1219 100755 --- a/python/paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py +++ b/python/paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect import unittest import paddle from paddle import fluid @@ -25,6 +26,23 @@ def setUp(self): os.environ["PADDLE_TRAINER_ID"] = "1" os.environ[ "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.1:36002" + self._debug = False + + def debug_program(self, main_prog, startup_prog): + if not self._debug: return + + main_prog_ops = main_prog.global_block().ops + startup_prog_ops = startup_prog.global_block().ops + + main_prog_op_types = [op.type for op in main_prog_ops] + startup_prog_op_types = [op.type for op in startup_prog_ops] + + print("=== debug program and ops in func [{}] ===" + .format(inspect.stack()[1].function)) + print(main_prog) + print(main_prog_op_types) + print(startup_prog) + print(startup_prog_op_types) def net(self, main_prog, startup_prog): with fluid.program_guard(main_prog, startup_prog): @@ -82,6 +100,20 @@ def fc_block(input_x): strategy = paddle.distributed.fleet.DistributedStrategy() return avg_cost, strategy + def boundary_net(self, main_prog, startup_prog): + with fluid.program_guard(main_prog, startup_prog): + fleet.init(is_collective=True) + x = paddle.static.data(name='x', shape=[-1, 4], dtype='float32') + with paddle.static.device_guard('gpu:0'): + linear = paddle.nn.Linear(4, 8, bias_attr=False) + out = linear(x) + with paddle.static.device_guard('gpu:1'): + linear = paddle.nn.Linear(8, 5, bias_attr=False) + out = linear(out) + avg_cost = paddle.mean(out) + strategy = fleet.DistributedStrategy() + return avg_cost, strategy + def optimizer(self, loss, strategy, @@ -190,5 +222,12 @@ def set_strategy(self, strategy, name): "enable_offload": True, "checkpoint_shape": [256] } + elif name == "pipeline": + strategy.pipeline = True + strategy.pipeline_configs = { + "schedule_mode": "1F1B", + "micro_batch_size": 2, + "accumulate_steps": 4, + } else: raise NotImplementedError() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py new file mode 100755 index 00000000000000..80252e31db0265 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py @@ -0,0 +1,434 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import paddle +import paddle.static as static +import unittest + +from fleet_meta_optimizer_base import TestFleetMetaOptimizer + +paddle.enable_static() + + +class TestFleetHybridOptimizer(TestFleetMetaOptimizer): + def setUp(self): + os.environ["PADDLE_TRAINER_ID"] = "3" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = \ + "127.0.0.1:36001,127.0.0.1:36002,127.0.0.1:36003,127.0.0.1:36004" + # pre-assigned ring id + self.mp_ring_id = 0 + self.sharding_ring_id = 1 + self.dp_ring_id = 2 + self.global_ring_id = 3 + self.pp_pair_ring_id = 20 + self._debug = True + + def test_opt_sharding_with_pp(self): + train_prog, startup_prog = static.Program(), static.Program() + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + + self.set_strategy(strategy, 'pipeline') + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "_dp_as_optimizer_sharding": True, + } + strategy.fuse_all_reduce_ops = False + + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + self.debug_program(train_prog, startup_prog) + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + # global, sharding, pp_send, pp_recv + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul', + 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', + 'elementwise_add', 'softmax', 'cross_entropy2', 'mean', + 'fill_constant', 'scale', 'scale', 'mean_grad', + 'cross_entropy_grad2', 'softmax_grad', 'elementwise_add_grad', + 'mul_grad', 'tanh_grad', 'elementwise_add_grad', 'mul_grad', + 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'fill_constant', 'sum', 'fill_constant', 'sum', 'fill_constant', + 'sum', 'fill_constant', 'sum', 'fill_constant', 'sum', + 'fill_constant', 'sum', 'fill_constant', 'sum', 'fill_constant', + 'sum', 'c_reduce_sum', 'c_reduce_sum', 'c_reduce_sum', + 'c_reduce_sum', 'c_reduce_sum', 'c_reduce_sum', 'c_reduce_sum', + 'c_reduce_sum', 'c_sync_comm_stream', 'momentum', 'momentum', + 'momentum', 'momentum', 'momentum', 'c_broadcast', 'c_broadcast', + 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', + 'c_broadcast', 'c_broadcast' + ]) + + # should has ring id for pp + created_ring_ids = [ + op.desc.attr("ring_id") for op in startup_prog_ops + if op.type == "c_comm_init" + ] + self.assertIn(self.dp_ring_id, created_ring_ids) + self.assertIn(self.pp_pair_ring_id, created_ring_ids) + + # check correctness of pp group + pp_group_waiting_prots = None + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and \ + op.desc.output_arg_names()[0] == "comm_id_0": + pp_group_waiting_prots = op.desc.attr("other_endpoints") + 
self.assertEqual(pp_group_waiting_prots, ['127.0.0.1:36003']) + + # check correctness of sharding group + dp_group_waiting_ports = None + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" \ + and op.desc.output_arg_names()[0] == "comm_id_3": + dp_group_waiting_ports = op.desc.attr("other_endpoints") + self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) + + def test_opt_sharding_with_pp_with_allreduce_fuse(self): + train_prog, startup_prog = static.Program(), static.Program() + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + + self.set_strategy(strategy, 'pipeline') + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "_dp_as_optimizer_sharding": True, + } + strategy.fuse_all_reduce_ops = True + strategy.fuse_grad_size_in_MB = 32 + + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + # global, sharding, pp_send, pp_recv + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul', + 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', + 'elementwise_add', 'softmax', 'cross_entropy2', 'mean', + 'fill_constant', 'scale', 'scale', 'mean_grad', + 'cross_entropy_grad2', 'softmax_grad', 'elementwise_add_grad', + 'mul_grad', 'tanh_grad', 'elementwise_add_grad', 'mul_grad', + 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'fill_constant', 'sum', 'fill_constant', 'sum', 'fill_constant', + 'sum', 'fill_constant', 'sum', 'fill_constant', 'sum', + 'fill_constant', 'sum', 'fill_constant', 'sum', 'fill_constant', + 'sum', 'coalesce_tensor', 'c_reduce_sum', 'coalesce_tensor', + 'c_reduce_sum', 'c_sync_comm_stream', 'momentum', 'momentum', + 'momentum', 'momentum', 'momentum', 'coalesce_tensor', + 'c_broadcast', 'coalesce_tensor', 'c_broadcast' + ]) + + def test_opt_sharding_with_pp_amp_gclip(self): + train_prog, startup_prog = static.Program(), static.Program() + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + + self.set_strategy(strategy, 'amp') + self.set_strategy(strategy, 'pipeline') + + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "_dp_as_optimizer_sharding": True, + } + strategy.fuse_all_reduce_ops = True + strategy.fuse_grad_size_in_MB = 32 + clip = paddle.fluid.clip.GradientClipByGlobalNorm(1.0) + + self.optimizer( + avg_cost, strategy, train_prog, startup_prog, grad_clip=clip) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + self.debug_program(train_prog, startup_prog) + + 
startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + # global, sharding, pp_send, pp_recv + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'c_gen_nccl_id', + 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', + 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'cast', 'mul', 'cast', 'elementwise_add', 'cast', + 'tanh', 'cast', 'cast', 'mul', 'cast', 'elementwise_add', 'cast', + 'tanh', 'cast', 'cast', 'mul', 'cast', 'elementwise_add', 'cast', + 'tanh', 'cast', 'cast', 'mul', 'cast', 'elementwise_add', 'softmax', + 'cast', 'cross_entropy2', 'mean', 'elementwise_mul', + 'fill_constant', 'scale', 'scale', 'elementwise_mul_grad', + 'mean_grad', 'cross_entropy_grad2', 'cast', 'softmax_grad', + 'elementwise_add_grad', 'mul_grad', 'cast', 'tanh_grad', 'cast', + 'elementwise_add_grad', 'mul_grad', 'cast', 'tanh_grad', 'cast', + 'elementwise_add_grad', 'mul_grad', 'cast', 'tanh_grad', 'cast', + 'elementwise_add_grad', 'mul_grad', 'cast', 'c_sync_calc_stream', + 'send_v2', 'fill_constant', 'cast', 'sum', 'fill_constant', 'cast', + 'sum', 'fill_constant', 'cast', 'sum', 'fill_constant', 'cast', + 'sum', 'fill_constant', 'cast', 'sum', 'fill_constant', 'cast', + 'sum', 'fill_constant', 'cast', 'sum', 'fill_constant', 'cast', + 'sum', 'coalesce_tensor', 'c_reduce_sum', 'coalesce_tensor', + 'c_reduce_sum', 'c_sync_comm_stream', 'check_finite_and_unscale', + 'cast', 'c_allreduce_max', 'c_allreduce_max', 'cast', + 'update_loss_scaling', 'squared_l2_norm', 'squared_l2_norm', + 'squared_l2_norm', 'squared_l2_norm', 'squared_l2_norm', 'sum', + 'c_allreduce_sum', 'c_allreduce_sum', 'sqrt', 'fill_constant', + 'elementwise_max', 'elementwise_div', 'elementwise_mul', + 'elementwise_mul', 'elementwise_mul', 'elementwise_mul', + 'elementwise_mul', 'momentum', 'momentum', 'momentum', 'momentum', + 'momentum', 'coalesce_tensor', 'c_broadcast', 'coalesce_tensor', + 'c_broadcast' + ]) + + def test_opt_sharding_with_pp_amp_gclip_fuse_gm(self): + train_prog, startup_prog = static.Program(), static.Program() + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + + self.set_strategy(strategy, 'amp') + self.set_strategy(strategy, 'pipeline') + + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "_dp_as_optimizer_sharding": True, + } + strategy.fuse_all_reduce_ops = True + strategy.fuse_grad_size_in_MB = 32 + strategy.fuse_grad_merge = True + clip = paddle.fluid.clip.GradientClipByGlobalNorm(1.0) + + self.optimizer( + avg_cost, strategy, train_prog, startup_prog, grad_clip=clip) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + self.debug_program(train_prog, startup_prog) + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in 
main_prog_ops] + + # global, sharding, pp_send, pp_recv + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'c_gen_nccl_id', + 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', + 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'cast', 'mul', 'cast', 'elementwise_add', 'cast', + 'tanh', 'cast', 'cast', 'mul', 'cast', 'elementwise_add', 'cast', + 'tanh', 'cast', 'cast', 'mul', 'cast', 'elementwise_add', 'cast', + 'tanh', 'cast', 'cast', 'mul', 'cast', 'elementwise_add', 'softmax', + 'cast', 'cross_entropy2', 'mean', 'elementwise_mul', + 'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor', + 'coalesce_tensor', 'fill_constant', 'scale', 'scale', + 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', 'cast', + 'softmax_grad', 'elementwise_add_grad', 'mul_grad', 'cast', + 'tanh_grad', 'cast', 'elementwise_add_grad', 'mul_grad', 'cast', + 'tanh_grad', 'cast', 'elementwise_add_grad', 'mul_grad', 'cast', + 'tanh_grad', 'cast', 'elementwise_add_grad', 'mul_grad', 'cast', + 'c_sync_calc_stream', 'send_v2', 'cast', 'sum', 'cast', 'sum', + 'c_reduce_sum', 'c_reduce_sum', 'c_sync_comm_stream', + 'check_finite_and_unscale', 'cast', 'c_allreduce_max', + 'c_allreduce_max', 'cast', 'update_loss_scaling', 'squared_l2_norm', + 'squared_l2_norm', 'squared_l2_norm', 'squared_l2_norm', + 'squared_l2_norm', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', + 'sqrt', 'fill_constant', 'elementwise_max', 'elementwise_div', + 'elementwise_mul', 'elementwise_mul', 'elementwise_mul', + 'elementwise_mul', 'elementwise_mul', 'momentum', 'momentum', + 'momentum', 'momentum', 'momentum', 'coalesce_tensor', + 'c_broadcast', 'coalesce_tensor', 'c_broadcast' + ]) + + +class TestFleetHybridOptimizerBoundary(TestFleetMetaOptimizer): + def setUp(self): + os.environ["PADDLE_TRAINER_ID"] = "3" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = \ + "127.0.0.1:36001,127.0.0.1:36002,127.0.0.1:36003,127.0.0.1:36004" + # pre-assigned ring id + self.mp_ring_id = 0 + self.sharding_ring_id = 1 + self.dp_ring_id = 2 + self.global_ring_id = 3 + self.pp_pair_ring_id = 20 + self._debug = False + + def test_opt_sharding_with_pp_amp_gclip_boundary(self): + """ test optimizer sharding without parameter """ + train_prog, startup_prog = static.Program(), static.Program() + avg_cost, strategy = self.boundary_net(train_prog, startup_prog) + + self.set_strategy(strategy, 'amp') + self.set_strategy(strategy, 'pipeline') + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "_dp_as_optimizer_sharding": True, + } + strategy.fuse_all_reduce_ops = True + strategy.fuse_grad_size_in_MB = 32 + clip = paddle.fluid.clip.GradientClipByGlobalNorm(1.0) + + self.optimizer( + avg_cost, strategy, train_prog, startup_prog, grad_clip=clip) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + self.debug_program(train_prog, startup_prog) + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = 
[op.type for op in main_prog_ops] + + # global, sharding, pp_send, pp_recv + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', + 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', + 'c_comm_init', 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'matmul', 'cast', 'reduce_mean', + 'elementwise_mul', 'fill_constant', 'scale', 'scale', + 'elementwise_mul_grad', 'reduce_mean_grad', 'cast', 'matmul_grad', + 'c_sync_calc_stream', 'send_v2', 'fill_constant', 'cast', 'sum', + 'c_reduce_sum', 'c_sync_comm_stream', 'check_finite_and_unscale', + 'cast', 'c_allreduce_max', 'c_allreduce_max', 'cast', + 'update_loss_scaling', 'fill_constant', 'c_allreduce_sum', + 'c_allreduce_sum', 'sqrt', 'fill_constant', 'elementwise_max', + 'elementwise_div', 'c_broadcast' + ]) + + def test_opt_sharding_with_pp_amp_gclip_boundary_card1(self): + """ test optimizer sharding without parameter in card0 """ + os.environ["PADDLE_TRAINER_ID"] = "1" + train_prog, startup_prog = static.Program(), static.Program() + avg_cost, strategy = self.boundary_net(train_prog, startup_prog) + + self.set_strategy(strategy, 'amp') + self.set_strategy(strategy, 'pipeline') + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "_dp_as_optimizer_sharding": True, + } + strategy.fuse_all_reduce_ops = True + strategy.fuse_grad_size_in_MB = 32 + clip = paddle.fluid.clip.GradientClipByGlobalNorm(1.0) + + self.optimizer( + avg_cost, strategy, train_prog, startup_prog, grad_clip=clip) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + self.debug_program(train_prog, startup_prog) + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + # global, sharding, pp_send, pp_recv + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'matmul', 'cast', 'reduce_mean', + 'elementwise_mul', 'fill_constant', 'scale', 'scale', + 'elementwise_mul_grad', 'reduce_mean_grad', 'cast', 'matmul_grad', + 'c_sync_calc_stream', 'send_v2', 'fill_constant', 'cast', 'sum', + 'c_reduce_sum', 'c_sync_comm_stream', 'check_finite_and_unscale', + 'cast', 'c_allreduce_max', 'c_allreduce_max', 'cast', + 'update_loss_scaling', 'squared_l2_norm', 'sum', 'c_allreduce_sum', + 'c_allreduce_sum', 'sqrt', 'fill_constant', 'elementwise_max', + 'elementwise_div', 'elementwise_mul', 'momentum', 'c_broadcast' + ]) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py index c462896eed22d1..34f9cbeb8d4ed1 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py @@ -613,7 +613,7 @@ def 
test_sharding_dp_with_allreduce_fuse(self): for op in main_prog_ops: if op.type == 'c_allreduce_sum': - assert 'FusedOutput' in op.input_arg_names[0] + assert 'FusedGrad' in op.input_arg_names[0] def test_hybrid_with_mp_pp_amp_gclip(self): train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( From b042db42348a5b83a658bfe6e68b1240b5b82f9f Mon Sep 17 00:00:00 2001 From: WangXi Date: Mon, 13 Sep 2021 20:47:35 +0800 Subject: [PATCH 22/23] fix ci --- python/paddle/fluid/tests/unittests/CMakeLists.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 9f32443e757620..eecfda76ae2016 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -70,6 +70,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_amp_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_amp_init) list(APPEND MIXED_DIST_TEST_OPS test_fleet_gradient_merge_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_sharding_meta_optimizer) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_hybrid_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_localsgd_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_lars_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_lamb_meta_optimizer) @@ -568,6 +569,7 @@ if(WITH_DISTRIBUTE) py_test_modules(test_fleet_graph_executor MODULES test_fleet_graph_executor ENVS ${dist_ENVS}) py_test_modules(test_fleet_gradient_merge_meta_optimizer MODULES test_fleet_gradient_merge_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_sharding_meta_optimizer MODULES test_fleet_sharding_meta_optimizer ENVS ${dist_ENVS}) + py_test_modules(test_fleet_hybrid_meta_optimizer MODULES test_fleet_hybrid_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_amp_meta_optimizer MODULES test_fleet_amp_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_amp_init MODULES test_fleet_amp_init ENVS ${dist_ENVS}) py_test_modules(test_fleet_fp16_allreduce_meta_optimizer MODULES test_fleet_fp16_allreduce_meta_optimizer ENVS ${dist_ENVS}) From f5a55971056db6397ebdbcc0dec3728633476186 Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 14 Sep 2021 10:28:30 +0800 Subject: [PATCH 23/23] fix develop Linear matmul_v2 --- .../paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py | 4 ++-- .../fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py b/python/paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py index fe4b60c11b1219..6150df5c29a9b5 100755 --- a/python/paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py +++ b/python/paddle/fluid/tests/unittests/fleet_meta_optimizer_base.py @@ -105,10 +105,10 @@ def boundary_net(self, main_prog, startup_prog): fleet.init(is_collective=True) x = paddle.static.data(name='x', shape=[-1, 4], dtype='float32') with paddle.static.device_guard('gpu:0'): - linear = paddle.nn.Linear(4, 8, bias_attr=False) + linear = fluid.Linear(4, 8, bias_attr=False) out = linear(x) with paddle.static.device_guard('gpu:1'): - linear = paddle.nn.Linear(8, 5, bias_attr=False) + linear = fluid.Linear(8, 5, bias_attr=False) out = linear(out) avg_cost = paddle.mean(out) strategy = fleet.DistributedStrategy() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py 
b/python/paddle/fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py index 80252e31db0265..a832bf8adfcbc0 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_hybrid_meta_optimizer.py @@ -33,7 +33,7 @@ def setUp(self): self.dp_ring_id = 2 self.global_ring_id = 3 self.pp_pair_ring_id = 20 - self._debug = True + self._debug = False def test_opt_sharding_with_pp(self): train_prog, startup_prog = static.Program(), static.Program()
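
The op sequences asserted in these new tests all follow one pattern: each dp rank owns a subset of the parameters, every gradient is c_reduce_sum'ed to its owning rank, only that rank runs momentum for its subset, and the updated parameters are c_broadcast back to the group. A standalone sketch of that ownership bookkeeping (plain Python, not the actual Shard class; the least-loaded-rank assignment and the parameter names are assumed for illustration):

    def assign_params_to_ranks(param_sizes, dp_degree):
        """Assign each parameter to the dp rank with the smallest load so far."""
        rank_load = [0] * dp_degree
        param2rank = {}
        for name, size in param_sizes.items():
            rank = rank_load.index(min(rank_load))
            param2rank[name] = rank
            rank_load[rank] += size
        return param2rank

    param2rank = assign_params_to_ranks(
        {"fc_0.w_0": 4096, "fc_1.w_0": 4096, "fc_2.w_0": 2048}, dp_degree=2)

    # each rank updates only the parameters mapped to it and broadcasts them
    # back, mirroring the asserted sequence: c_reduce_sum -> momentum -> c_broadcast
    for rank in range(2):
        owned = [p for p, r in param2rank.items() if r == rank]
        print("rank", rank, "updates", owned)
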