From 037d6473aa3162125b5bb4f7349e27eb35cf3ce7 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 13 Aug 2021 12:08:49 +0800 Subject: [PATCH 01/32] fuse grad startup --- .../framework/distributed_strategy.proto | 1 + .../fleet/base/distributed_strategy.py | 22 + .../fleet/meta_optimizers/sharding/utils.py | 4 +- .../meta_optimizers/sharding_optimizer.py | 5 +- python/paddle/fluid/optimizer.py | 546 +++++++++++++++++- 5 files changed, 575 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 546b9d2601df57..1d443ebc39a0ef 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -200,6 +200,7 @@ message DistributedStrategy { optional int32 fuse_grad_size_in_num = 31 [ default = 8 ]; optional bool calc_comm_same_stream = 32 [ default = false ]; optional bool asp = 33 [ default = false ]; + optional bool fuse_param_grad = 34 [ default = false ]; optional RecomputeConfig recompute_configs = 101; optional AMPConfig amp_configs = 102; diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index d43292ddbd32e9..49229c8350821c 100644 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -967,6 +967,28 @@ def _calc_comm_same_stream(self, same): "WARNING: calc_comm_same_stream should have value of boolean type" ) + @property + def fuse_param_grad(self): + """ + Set whether fuse the param and grad. + Note: this flag only supports adam optimizer with GlobalGradClip + The default value for the fuse_param_grad is False + Examples: + .. 
code-block:: python + import paddle.distributed.fleet as fleet + strategy = fleet.DistributedStrategy() + strategy.fuse_param_grad = True + """ + return self.strategy.fuse_param_grad + + @fuse_param_grad.setter + @is_strict_auto + def fuse_param_grad(self, fuse_param_grad): + if isinstance(fuse_param_grad, bool): + self.strategy.fuse_param_grad = fuse_param_grad + else: + print("WARNING: fuse_param_grad should have value of boolean type") + @property def fuse_grad_size_in_num(self): """ diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index 52ef843aa0d751..b3c353b23c3484 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -341,7 +341,9 @@ def insert_allreduce_ops(block, if len(allreduce_vars) == 0: return - if user_defined_strategy and user_defined_strategy.fuse_all_reduce_ops: + if user_defined_strategy and \ + user_defined_strategy.fuse_all_reduce_ops and \ + not user_defined_strategy.fuse_param_grad: insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars, op_role, use_calc_stream, user_defined_strategy.fuse_grad_size_in_MB) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 5c2f24054f835c..0a2eff6628fb67 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -319,7 +319,10 @@ def _insert_allreduce_for_pp(self): main_block._remove_op(idx) accumulated_grad_names = self._pp_optimizer._accumulate_gradients( - main_block, fp16_allreduce=fp16_allreduce) + main_block, + startup_block, + fp16_allreduce=fp16_allreduce, + user_defined_strategy=strategy) len_of_ops = len(main_block.ops) first_optimize_op_index = get_first_optimize_op_idx(main_block) 
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 9e87681c4bef30..2e6f073ec1048d 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5036,12 +5036,20 @@ def _rename_gradient_var_name(self, block): def _accumulate_gradients(self, block, + startup_block, pp_allreduce_in_optimize=False, - fp16_allreduce=False): + fp16_allreduce=False, + user_defined_strategy=None): """ Create a new merged gradient for each parameter and accumulate the corresponding gradient to it. """ + if user_defined_strategy.fuse_param_grad: + fused_gradient_names, fused_param_names = self._accumulate_gradients_with_fuse( + block, startup_block, fp16_allreduce, + user_defined_strategy.fuse_grad_size_in_MB) + return fused_gradient_names + merged_gradient_names = [] first_opt_op_idx = None @@ -5171,6 +5179,542 @@ def _accumulate_gradients(self, return merged_gradient_names + def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, + fused_size): + first_opt_op_idx = None + grad_param_pairs = [] + # obtain all param/grad pairs that needed to be fused + for index, op in reversed(tuple(enumerate(list(main_block.ops)))): + # remove the cast op of fp16 grad to fp32 grad + if self._is_optimize_op(op) and op.type == 'cast': + in_name = op.input_arg_names[0] + out_name = op.output_arg_names[0] + if out_name.strip('@GRAD') in self._param_device_map: + assert in_name.replace('.cast_fp16', '') == out_name + main_block._remove_op(index) + continue + + if self._is_backward_op(op) and first_opt_op_idx is None: + first_opt_op_idx = index + 1 + # no optimize phase + if first_opt_op_idx == len(main_block.ops): + return + + if self._is_backward_op(op) and ( + self._op_role_var_key in op.attr_names): + op_role_var = op.attr(self._op_role_var_key) + if len(op_role_var) == 0: + continue + assert len(op_role_var) % 2 == 0 + for i in range(0, len(op_role_var), 2): + param_name = op_role_var[i] + if not 
main_block.has_var(param_name): + continue + if '@BroadCast' in param_name: + continue + grad_param_pairs.append( + (op_role_var[i + 1], op_role_var[i])) + + if len(grad_param_pairs) == 0: + return + + # allocate each gard/param paris in different segments + # structure of grad_param_segments is + # [([grad0, grad1], [param0, param1]), ([grad2, grad3], [param2, param3])] + # each entry of the list is a tuple stores the grads segment list and + # the corresponding params segment list + # Two strategy determine create new segment: + # 1. current segment's size is reach the limits (defined by fuse_grad_size_in_MB) + # 2. new grad has different weight decay performance + grad_param_segments = [] + cur_size = 0. + last_dtype = None + # split the grad based on dtype and fused size + for grad, param in grad_param_pairs: + real_grad = main_block.var(grad) + real_param = main_block.var(param) + tmp_size = self._get_var_size(real_grad) + if len(grad_param_segments) == 0 \ + or cur_size + tmp_size > fused_size \ + or real_grad.dtype != last_dtype: + grad_param_segments.append(([real_grad], [real_param])) + last_dtype = real_grad.dtype + cur_size = 0. 
+ else: + grad_param_segments[-1][0].append(real_grad) + grad_param_segments[-1][1].append(real_param) + cur_size += tmp_size + + merged_gradients = [] + merged_params = [] + # create fused vars for grad and param + for grad_param_segment in grad_param_segments: + grad_segment = grad_param_segment[0] + param_segment = grad_param_segment[1] + shape = 0 + for i in range(len(grad_segment)): + assert grad_segment[i].shape == param_segment[i].shape + shape += reduce((lambda x, y: x * y), grad_segment[i].shape) + grad_fused = main_block.create_var( + name='FusedOutput_{}'.format(grad_segment[0].name), + dtype=grad_segment[0].dtype, + shape=[shape], + persistable=True, + stop_gradient=True) + param_fused_startup = startup_block.create_var( + name='FusedInput_{}'.format(param_segment[0].name), + dtype=param_segment[0].dtype, + shape=[shape], + persistable=True, + stop_gradient=True) + param_fused_main = main_block.create_var( + name='FusedInput_{}'.format(param_segment[0].name), + dtype=param_segment[0].dtype, + shape=[shape], + persistable=True, + stop_gradient=True) + merged_gradients.append(grad_fused) + merged_params.append(param_fused_startup) + + assert len(merged_gradients) == len(grad_param_segments) + assert len(merged_params) == len(grad_param_segments) + + # insert coalesce op to init fused values + first_opt_op_idx = None + for index, op in reversed(tuple(enumerate(list(main_block.ops)))): + if self._is_backward_op(op) and first_opt_op_idx is None: + first_opt_op_idx = index + 1 + break + assert first_opt_op_idx is not None + offset = 0 + fuse_param_pos = len(startup_block.ops) + for i in range(len(grad_param_segments)): + fused_grad = merged_gradients[i] + fused_param = merged_params[i] + grads = grad_param_segments[i][0] + params = grad_param_segments[i][1] + main_block._insert_op_without_sync( + first_opt_op_idx + offset, + type="coalesce_tensor", + inputs={"Input": grads}, + outputs={"Output": grads, + "FusedOutput": fused_grad}, + attrs={ + "copy_data": 
True, + "use_align": True, + "dtype": grads[0].dtype, + self._op_role_key: self._op_role.Backward + }) + startup_block._insert_op_without_sync( + fuse_param_pos + offset, + type="coalesce_tensor", + inputs={"Input": params}, + outputs={"Output": params, + "FusedOutput": fused_param}, + attrs={ + "copy_data": True, + "use_align": True, + "dtype": params[0].dtype, + self._op_role_key: self._op_role.Forward + }) + offset += 1 + + first_opt_op_idx += offset + offset = 0 + merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' + dtype = paddle.float16 if fp16 else None + merged_gradients_tmp = [] + for i in range(len(merged_gradients)): + param = merged_params[i] + grad_var = merged_gradients[i] + assert param.shape == grad_var.shape + grad_name = grad_var.name + param_grad_name = param.name + core.grad_var_suffix() + merged_param_grad_name = param_grad_name + merged_suffix + merged_param_grad_var = main_block.create_var( + name=merged_param_grad_name, + dtype=dtype if dtype else param.dtype, + shape=param.shape, + persistable=True, + stop_gradient=True) + + main_block._insert_op( + index=first_opt_op_idx + offset, + type='fill_constant', + inputs={}, + outputs={'Out': [merged_param_grad_var]}, + attrs={ + 'shape': merged_param_grad_var.shape, + 'dtype': merged_param_grad_var.dtype, + 'value': float(0), + self._op_role_key: self._op_role.Optimize.LRSched, + }) + offset += 1 + + is_fp16_grad = 'cast_fp16' in grad_name + need_cast = (is_fp16_grad is not fp16) + + if need_cast: + cast_grad_var_name = param_grad_name + '@TMP' + cast_grad_var = main_block.create_var( + name=cast_grad_var_name, + dtype=dtype if dtype else merged_param_grad_var.dtype, + shape=grad_var.shape, + persistable=False, + stop_gradient=True) + main_block._insert_op( + index=first_opt_op_idx + offset, + type='cast', + inputs={'X': grad_var}, + outputs={'Out': cast_grad_var}, + attrs={ + 'in_dtype': grad_var.dtype, + 'out_dtype': cast_grad_var.dtype, + self._op_role_key: self._op_role.Backward, + }) + 
offset += 1 + grad_var = cast_grad_var + + assert merged_param_grad_var.shape == grad_var.shape + main_block._insert_op( + index=first_opt_op_idx + offset, + type='sum', + inputs={'X': [merged_param_grad_var, grad_var]}, + outputs={'Out': merged_param_grad_var}, + attrs={self._op_role_key: self._op_role.Backward, }) + offset += 1 + merged_gradients_tmp.append(merged_param_grad_var) + merged_gradients = merged_gradients_tmp + + # insert the fused grad to check_finite_and_unscale and update_loss_scaling + for index, op in reversed(tuple(enumerate(list(main_block.ops)))): + if op.type == 'check_finite_and_unscale': + extend_x = [] + for grad in merged_gradients: + extend_x.append(grad.name) + op.desc.set_input('X', extend_x) + op.desc.set_output('Out', extend_x) + + if op.type == 'update_loss_scaling': + extend_x = [] + for grad in merged_gradients: + extend_x.append(grad.name) + op.desc.set_input('X', extend_x) + op.desc.set_output('Out', extend_x) + + fp32_gradients = None + if fp16: + # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 + fp32_gradients = [] + for fp16_grad in merged_gradients: + fp32_grad = main_block.create_var( + name=fp16_grad.name + '@BACK@FP32', + dtype=paddle.float32, + shape=fp16_grad.shape, + persistable=False, + stop_gradient=True) + main_block._insert_op( + index=first_opt_op_idx + offset, + type='cast', + inputs={'X': fp16_grad}, + outputs={'Out': fp32_grad}, + attrs={ + 'in_dtype': paddle.float16, + 'out_dtype': paddle.float32, + self._op_role_key: self._op_role.Optimize, + }) + offset += 1 + fp32_gradients.append(fp32_grad) + + # prune the grad clip ops and adam optimizer opds + self._prune_grad_clip(main_block, merged_gradients + if not fp32_gradients else fp32_gradients) + self._prune_adam_optimizer(main_block, startup_block, merged_params, + merged_gradients + if not fp32_gradients else fp32_gradients) + + # repalce the var with it's name + for i in range(len(merged_gradients)): + merged_gradients[i] = 
merged_gradients[i].name + merged_params[i] = merged_params[i].name + + return merged_gradients, merged_params + + def _prune_adam_optimizer(self, main_block, startup_block, merged_params, + merged_gradients): + # remove adam op then reinsert them with fused param and grad + removed_idx = [] + insert_idx = None + lr = None + beta1 = None + beta2 = None + epsilon = None + for idx, op in enumerate(main_block.ops): + if op.type == 'adam': + if insert_idx is None: + lr = op.input('LearningRate')[0] + beta1 = op.attr('beta1') + beta2 = op.attr('beta2') + epsilon = op.attr('epsilon') + insert_idx = idx + removed_idx.append(idx) + if len(removed_idx) == 0: + return + assert insert_idx is not None + removed_idx = sorted(removed_idx, reverse=True) + for idx in removed_idx: + main_block._remove_op(idx, sync=False) + + beta1_suffix = '_beta1_pow_acc_0' + beta2_suffix = '_beta2_pow_acc_0' + moment1_suffix = '_moment1_0' + moment2_suffix = '_moment2_0' + offset = 0 + for i in range(len(merged_gradients)): + grad = merged_gradients[i] + param = merged_params[i] + moment1 = self._insert_adam_var_helper(main_block, startup_block, + grad.name + moment1_suffix, + grad.dtype, grad.shape) + moment2 = self._insert_adam_var_helper(main_block, startup_block, + grad.name + moment2_suffix, + grad.dtype, grad.shape) + beta1_pow_acc = self._insert_adam_var_helper( + main_block, startup_block, grad.name + beta1_suffix, grad.dtype, + [1], 0.9) + beta2_pow_acc = self._insert_adam_var_helper( + main_block, startup_block, grad.name + beta2_suffix, grad.dtype, + [1], 0.999) + inputs = { + "Param": [param], + "Grad": [grad], + "LearningRate": [lr], + "Moment1": [moment1], + "Moment2": [moment2], + "Beta1Pow": [beta1_pow_acc], + "Beta2Pow": [beta2_pow_acc] + } + + outputs = { + "ParamOut": [param], + "Moment1Out": [moment1], + "Moment2Out": [moment2], + "Beta1PowOut": [beta1_pow_acc], + "Beta2PowOut": [beta2_pow_acc], + } + attrs = { + "lazy_mode": False, + "min_row_size_to_use_multithread": 1000, + 
'use_global_beta_pow': False, + 'beta1': beta1, + 'beta2': beta2, + 'epsilon': epsilon, + self._op_role_key: self._op_role.Optimize + } + main_block._insert_op( + index=insert_idx + offset, + type='adam', + inputs=inputs, + outputs=outputs, + attrs=attrs) + return + + def _insert_adam_var_helper(self, + main_block, + startup_block, + name, + dtype, + shape, + value=0.): + startup_block.create_var( + name=name, + dtype=dtype, + shape=shape, + persistable=True, + stop_gradient=False, + initializer=Constant(value=float(value))) + return main_block.create_var( + name=name, + dtype=dtype, + shape=shape, + persistable=True, + stop_gradient=False) + + def _prune_grad_clip(self, main_block, merged_gradients): + # remove ops related with ClipGradByGlobalNorm + # squared_l2_norm, sum, sqrt, fill_constant, elementwise_{max/div/mul} + removed_idx = [] + removed_list = [ + 'squared_l2_norm', 'sum', 'sqrt', 'fill_constant', + 'elementwise_max', 'elementwise_div', 'elementwise_mul' + ] + insert_idx = None + for idx, op in enumerate(main_block.ops): + if op.type == 'update_loss_scaling' and insert_idx is None: + insert_idx = idx + 1 + continue + if op.type in removed_list and insert_idx is not None and idx >= insert_idx: + removed_idx.append(idx) + if len(removed_idx) == 0: + return + assert insert_idx is not None + removed_idx = sorted(removed_idx, reverse=True) + for idx in removed_idx: + main_block._remove_op(idx, sync=False) + + with framework.name_scope('gradient_clip'): + # namespace for gradient clip + squared_l2_norm_tmp = [] + offset = 0 + l2_norm_prefix = 'squared_l2_norm_' + # insert squared l2 norm op + for idx, grad in enumerate(merged_gradients): + squared_l2_norm_var = main_block.create_var( + name=l2_norm_prefix + str(idx), + dtype=grad.dtype, + shape=[1], + persistable=False, + stop_gradient=False) + main_block._insert_op( + index=insert_idx + offset, + type='squared_l2_norm', + inputs={'X': grad}, + outputs={'Out': squared_l2_norm_var}, + 
attrs={self._op_role_key: self._op_role.Optimize, }) + offset += 1 + squared_l2_norm_tmp.append(squared_l2_norm_var) + + # insert sum op + sum_rst = main_block.create_var( + name=unique_name.generate('gradient_clip_sum'), + dtype=squared_l2_norm_tmp[0].dtype, + shape=[1], + persistable=False, + stop_gradient=False) + main_block._insert_op( + index=insert_idx + offset, + type='sum', + inputs={'X': squared_l2_norm_tmp}, + outputs={'Out': sum_rst}, + attrs={ + self._op_role_key: self._op_role.Optimize, + 'use_mkldnn': False + }) + offset += 1 + + # insert sqrt op + sqrt_rst = main_block.create_var( + name=unique_name.generate('gradient_clip_sqrt'), + dtype=sum_rst.dtype, + shape=[1], + persistable=False, + stop_gradient=False) + main_block._insert_op( + index=insert_idx + offset, + type='sqrt', + inputs={'X': sum_rst}, + outputs={'Out': sqrt_rst}, + attrs={ + self._op_role_key: self._op_role.Optimize, + 'use_mkldnn': False, + 'use_cudnn': False + }) + offset += 1 + + # insert fill constant op + fill_constant_rst = main_block.create_var( + name=unique_name.generate('gradient_clip_fill_constant'), + dtype=sqrt_rst.dtype, + shape=[1], + persistable=False, + stop_gradient=False) + main_block._insert_op( + index=insert_idx + offset, + type='fill_constant', + inputs={}, + outputs={'Out': [fill_constant_rst]}, + attrs={ + 'shape': fill_constant_rst.shape, + 'dtype': fill_constant_rst.dtype, + 'value': float(1.0), + self._op_role_key: self._op_role.Optimize, + }) + offset += 1 + + # insert elementwise_max op + elementwise_max_rst = main_block.create_var( + name=unique_name.generate('gradient_clip_elementwise_max'), + dtype=fill_constant_rst.dtype, + shape=[1], + persistable=False, + stop_gradient=False) + main_block._insert_op( + index=insert_idx + offset, + type='elementwise_max', + inputs={'X': sqrt_rst, + 'Y': fill_constant_rst}, + outputs={'Out': [elementwise_max_rst]}, + attrs={ + self._op_role_key: self._op_role.Optimize, + 'use_mkldnn': False, + 'use_quantizer': False, 
+ 'axis': -1 + }) + offset += 1 + + # insert elementwise_div op + elementwise_div_rst = main_block.create_var( + name=unique_name.generate('gradient_clip_elementwise_div'), + dtype=elementwise_max_rst.dtype, + shape=[1], + persistable=False, + stop_gradient=False) + main_block._insert_op( + index=insert_idx + offset, + type='elementwise_div', + inputs={'X': fill_constant_rst, + 'Y': elementwise_max_rst}, + outputs={'Out': [elementwise_div_rst]}, + attrs={ + self._op_role_key: self._op_role.Optimize, + 'use_mkldnn': False, + 'use_quantizer': False, + 'axis': -1 + }) + offset += 1 + + # insert elementwise_mul op + for idx, grad in enumerate(merged_gradients): + main_block._insert_op( + index=insert_idx + offset, + type='elementwise_mul', + inputs={'X': grad, + 'Y': elementwise_div_rst}, + outputs={'Out': grad}, + attrs={ + self._op_role_key: self._op_role.Optimize, + 'use_mkldnn': False, + 'use_quantizer': False, + 'axis': -1 + }) + offset += 1 + return + + def _get_var_size(self, var): + dtype_to_size = { + core.VarDesc.VarType.FP16: 2, + core.VarDesc.VarType.FP32: 4, + core.VarDesc.VarType.FP64: 8, + core.VarDesc.VarType.INT16: 2, + core.VarDesc.VarType.INT32: 4, + core.VarDesc.VarType.INT64: 8, + core.VarDesc.VarType.BOOL: 1, + core.VarDesc.VarType.UINT8: 1, + } + assert -1 not in var.shape + return reduce(lambda x, y: x * y, + var.shape) * dtype_to_size[var.dtype] / 1024.0 / 1024.0 + def _add_sub_blocks(self, main_block, program_list): main_program = main_block.program for prog in program_list: From c9c9e843c6541bd4b77c3d5f1b0f5c033e5de5cd Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 17 Aug 2021 12:33:41 +0800 Subject: [PATCH 02/32] remove shape, move fused_merged_grad to startup --- python/paddle/fluid/optimizer.py | 397 ++++--------------------------- 1 file changed, 43 insertions(+), 354 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 2e6f073ec1048d..102bdc8ecfc10a 100755 --- 
a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5045,7 +5045,7 @@ def _accumulate_gradients(self, corresponding gradient to it. """ if user_defined_strategy.fuse_param_grad: - fused_gradient_names, fused_param_names = self._accumulate_gradients_with_fuse( + fused_gradient_names = self._accumulate_gradients_with_fuse( block, startup_block, fp16_allreduce, user_defined_strategy.fuse_grad_size_in_MB) return fused_gradient_names @@ -5223,9 +5223,6 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, # [([grad0, grad1], [param0, param1]), ([grad2, grad3], [param2, param3])] # each entry of the list is a tuple stores the grads segment list and # the corresponding params segment list - # Two strategy determine create new segment: - # 1. current segment's size is reach the limits (defined by fuse_grad_size_in_MB) - # 2. new grad has different weight decay performance grad_param_segments = [] cur_size = 0. last_dtype = None @@ -5245,39 +5242,34 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, grad_param_segments[-1][1].append(real_param) cur_size += tmp_size - merged_gradients = [] - merged_params = [] + merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' + fused_gradients = [] + fused_merged_gradients = [] # create fused vars for grad and param for grad_param_segment in grad_param_segments: grad_segment = grad_param_segment[0] - param_segment = grad_param_segment[1] - shape = 0 - for i in range(len(grad_segment)): - assert grad_segment[i].shape == param_segment[i].shape - shape += reduce((lambda x, y: x * y), grad_segment[i].shape) - grad_fused = main_block.create_var( - name='FusedOutput_{}'.format(grad_segment[0].name), + fused_grad = main_block.create_var( + name='FusedGrad_{}'.format(grad_segment[0].name), dtype=grad_segment[0].dtype, - shape=[shape], - persistable=True, + persistable=False, stop_gradient=True) - param_fused_startup = startup_block.create_var( - 
name='FusedInput_{}'.format(param_segment[0].name), - dtype=param_segment[0].dtype, - shape=[shape], + fused_merged_grad = main_block.create_var( + name='FusedMergedGrad_{}'.format(grad_segment[0].name) + + merged_suffix, + dtype=grad_segment[0].dtype, persistable=True, stop_gradient=True) - param_fused_main = main_block.create_var( - name='FusedInput_{}'.format(param_segment[0].name), - dtype=param_segment[0].dtype, - shape=[shape], + fused_merged_grad_main = main_block.create_var( + name='FusedMergedGrad_{}'.format(grad_segment[0].name) + + merged_suffix, + dtype=grad_segment[0].dtype, persistable=True, stop_gradient=True) - merged_gradients.append(grad_fused) - merged_params.append(param_fused_startup) + fused_gradients.append(fused_grad) + fused_merged_gradients.append(fused_merged_grad) - assert len(merged_gradients) == len(grad_param_segments) - assert len(merged_params) == len(grad_param_segments) + assert len(fused_gradients) == len(grad_param_segments) + assert len(fused_merged_gradients) == len(grad_param_segments) # insert coalesce op to init fused values first_opt_op_idx = None @@ -5289,8 +5281,8 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, offset = 0 fuse_param_pos = len(startup_block.ops) for i in range(len(grad_param_segments)): - fused_grad = merged_gradients[i] - fused_param = merged_params[i] + fused_grad = fused_gradients[i] + fused_merged_grad = fused_merged_gradients[i] grads = grad_param_segments[i][0] params = grad_param_segments[i][1] main_block._insert_op_without_sync( @@ -5300,7 +5292,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, outputs={"Output": grads, "FusedOutput": fused_grad}, attrs={ - "copy_data": True, + "copy_data": False, "use_align": True, "dtype": grads[0].dtype, self._op_role_key: self._op_role.Backward @@ -5309,104 +5301,69 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, fuse_param_pos + offset, type="coalesce_tensor", 
inputs={"Input": params}, - outputs={"Output": params, - "FusedOutput": fused_param}, + outputs={"Output": grads, + "FusedOutput": fused_merged_grad}, attrs={ - "copy_data": True, + "copy_data": False, "use_align": True, "dtype": params[0].dtype, self._op_role_key: self._op_role.Forward }) offset += 1 + dtype = paddle.float16 if fp16 else None first_opt_op_idx += offset offset = 0 - merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' - dtype = paddle.float16 if fp16 else None - merged_gradients_tmp = [] - for i in range(len(merged_gradients)): - param = merged_params[i] - grad_var = merged_gradients[i] - assert param.shape == grad_var.shape - grad_name = grad_var.name - param_grad_name = param.name + core.grad_var_suffix() - merged_param_grad_name = param_grad_name + merged_suffix - merged_param_grad_var = main_block.create_var( - name=merged_param_grad_name, - dtype=dtype if dtype else param.dtype, - shape=param.shape, - persistable=True, - stop_gradient=True) - + for i in range(len(fused_gradients)): + fused_grad = fused_gradients[i] + fused_merged_grad = fused_merged_gradients[i] main_block._insert_op( index=first_opt_op_idx + offset, type='fill_constant', inputs={}, - outputs={'Out': [merged_param_grad_var]}, + outputs={'Out': [fused_merged_grad]}, attrs={ - 'shape': merged_param_grad_var.shape, - 'dtype': merged_param_grad_var.dtype, + 'dtype': fused_merged_grad.dtype, 'value': float(0), self._op_role_key: self._op_role.Optimize.LRSched, }) offset += 1 - is_fp16_grad = 'cast_fp16' in grad_name + is_fp16_grad = 'cast_fp16' in fused_grad need_cast = (is_fp16_grad is not fp16) if need_cast: - cast_grad_var_name = param_grad_name + '@TMP' + cast_grad_var_name = fused_grad.name + '@TMP' cast_grad_var = main_block.create_var( name=cast_grad_var_name, - dtype=dtype if dtype else merged_param_grad_var.dtype, - shape=grad_var.shape, + dtype=dtype if dtype else fused_merged_grad.dtype, persistable=False, stop_gradient=True) main_block._insert_op( 
index=first_opt_op_idx + offset, type='cast', - inputs={'X': grad_var}, + inputs={'X': fused_grad}, outputs={'Out': cast_grad_var}, attrs={ - 'in_dtype': grad_var.dtype, + 'in_dtype': fused_grad.dtype, 'out_dtype': cast_grad_var.dtype, self._op_role_key: self._op_role.Backward, }) offset += 1 - grad_var = cast_grad_var - - assert merged_param_grad_var.shape == grad_var.shape + fused_grad = cast_grad_var main_block._insert_op( index=first_opt_op_idx + offset, type='sum', - inputs={'X': [merged_param_grad_var, grad_var]}, - outputs={'Out': merged_param_grad_var}, + inputs={'X': [fused_merged_grad, fused_grad]}, + outputs={'Out': fused_merged_grad}, attrs={self._op_role_key: self._op_role.Backward, }) offset += 1 - merged_gradients_tmp.append(merged_param_grad_var) - merged_gradients = merged_gradients_tmp - - # insert the fused grad to check_finite_and_unscale and update_loss_scaling - for index, op in reversed(tuple(enumerate(list(main_block.ops)))): - if op.type == 'check_finite_and_unscale': - extend_x = [] - for grad in merged_gradients: - extend_x.append(grad.name) - op.desc.set_input('X', extend_x) - op.desc.set_output('Out', extend_x) - - if op.type == 'update_loss_scaling': - extend_x = [] - for grad in merged_gradients: - extend_x.append(grad.name) - op.desc.set_input('X', extend_x) - op.desc.set_output('Out', extend_x) fp32_gradients = None if fp16: # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 fp32_gradients = [] - for fp16_grad in merged_gradients: + for fp16_grad in fused_merged_gradients: fp32_grad = main_block.create_var( name=fp16_grad.name + '@BACK@FP32', dtype=paddle.float32, @@ -5426,279 +5383,11 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, offset += 1 fp32_gradients.append(fp32_grad) - # prune the grad clip ops and adam optimizer opds - self._prune_grad_clip(main_block, merged_gradients - if not fp32_gradients else fp32_gradients) - self._prune_adam_optimizer(main_block, 
startup_block, merged_params, - merged_gradients - if not fp32_gradients else fp32_gradients) - # repalce the var with it's name - for i in range(len(merged_gradients)): - merged_gradients[i] = merged_gradients[i].name - merged_params[i] = merged_params[i].name - - return merged_gradients, merged_params - - def _prune_adam_optimizer(self, main_block, startup_block, merged_params, - merged_gradients): - # remove adam op then reinsert them with fused param and grad - removed_idx = [] - insert_idx = None - lr = None - beta1 = None - beta2 = None - epsilon = None - for idx, op in enumerate(main_block.ops): - if op.type == 'adam': - if insert_idx is None: - lr = op.input('LearningRate')[0] - beta1 = op.attr('beta1') - beta2 = op.attr('beta2') - epsilon = op.attr('epsilon') - insert_idx = idx - removed_idx.append(idx) - if len(removed_idx) == 0: - return - assert insert_idx is not None - removed_idx = sorted(removed_idx, reverse=True) - for idx in removed_idx: - main_block._remove_op(idx, sync=False) - - beta1_suffix = '_beta1_pow_acc_0' - beta2_suffix = '_beta2_pow_acc_0' - moment1_suffix = '_moment1_0' - moment2_suffix = '_moment2_0' - offset = 0 - for i in range(len(merged_gradients)): - grad = merged_gradients[i] - param = merged_params[i] - moment1 = self._insert_adam_var_helper(main_block, startup_block, - grad.name + moment1_suffix, - grad.dtype, grad.shape) - moment2 = self._insert_adam_var_helper(main_block, startup_block, - grad.name + moment2_suffix, - grad.dtype, grad.shape) - beta1_pow_acc = self._insert_adam_var_helper( - main_block, startup_block, grad.name + beta1_suffix, grad.dtype, - [1], 0.9) - beta2_pow_acc = self._insert_adam_var_helper( - main_block, startup_block, grad.name + beta2_suffix, grad.dtype, - [1], 0.999) - inputs = { - "Param": [param], - "Grad": [grad], - "LearningRate": [lr], - "Moment1": [moment1], - "Moment2": [moment2], - "Beta1Pow": [beta1_pow_acc], - "Beta2Pow": [beta2_pow_acc] - } + for i in range(len(fused_merged_gradients)): + 
fused_merged_gradients[i] = fused_merged_gradients[i].name - outputs = { - "ParamOut": [param], - "Moment1Out": [moment1], - "Moment2Out": [moment2], - "Beta1PowOut": [beta1_pow_acc], - "Beta2PowOut": [beta2_pow_acc], - } - attrs = { - "lazy_mode": False, - "min_row_size_to_use_multithread": 1000, - 'use_global_beta_pow': False, - 'beta1': beta1, - 'beta2': beta2, - 'epsilon': epsilon, - self._op_role_key: self._op_role.Optimize - } - main_block._insert_op( - index=insert_idx + offset, - type='adam', - inputs=inputs, - outputs=outputs, - attrs=attrs) - return - - def _insert_adam_var_helper(self, - main_block, - startup_block, - name, - dtype, - shape, - value=0.): - startup_block.create_var( - name=name, - dtype=dtype, - shape=shape, - persistable=True, - stop_gradient=False, - initializer=Constant(value=float(value))) - return main_block.create_var( - name=name, - dtype=dtype, - shape=shape, - persistable=True, - stop_gradient=False) - - def _prune_grad_clip(self, main_block, merged_gradients): - # remove ops related with ClipGradByGlobalNorm - # squared_l2_norm, sum, sqrt, fill_constant, elementwise_{max/div/mul} - removed_idx = [] - removed_list = [ - 'squared_l2_norm', 'sum', 'sqrt', 'fill_constant', - 'elementwise_max', 'elementwise_div', 'elementwise_mul' - ] - insert_idx = None - for idx, op in enumerate(main_block.ops): - if op.type == 'update_loss_scaling' and insert_idx is None: - insert_idx = idx + 1 - continue - if op.type in removed_list and insert_idx is not None and idx >= insert_idx: - removed_idx.append(idx) - if len(removed_idx) == 0: - return - assert insert_idx is not None - removed_idx = sorted(removed_idx, reverse=True) - for idx in removed_idx: - main_block._remove_op(idx, sync=False) - - with framework.name_scope('gradient_clip'): - # namespace for gradient clip - squared_l2_norm_tmp = [] - offset = 0 - l2_norm_prefix = 'squared_l2_norm_' - # insert squared l2 norm op - for idx, grad in enumerate(merged_gradients): - squared_l2_norm_var = 
main_block.create_var( - name=l2_norm_prefix + str(idx), - dtype=grad.dtype, - shape=[1], - persistable=False, - stop_gradient=False) - main_block._insert_op( - index=insert_idx + offset, - type='squared_l2_norm', - inputs={'X': grad}, - outputs={'Out': squared_l2_norm_var}, - attrs={self._op_role_key: self._op_role.Optimize, }) - offset += 1 - squared_l2_norm_tmp.append(squared_l2_norm_var) - - # insert sum op - sum_rst = main_block.create_var( - name=unique_name.generate('gradient_clip_sum'), - dtype=squared_l2_norm_tmp[0].dtype, - shape=[1], - persistable=False, - stop_gradient=False) - main_block._insert_op( - index=insert_idx + offset, - type='sum', - inputs={'X': squared_l2_norm_tmp}, - outputs={'Out': sum_rst}, - attrs={ - self._op_role_key: self._op_role.Optimize, - 'use_mkldnn': False - }) - offset += 1 - - # insert sqrt op - sqrt_rst = main_block.create_var( - name=unique_name.generate('gradient_clip_sqrt'), - dtype=sum_rst.dtype, - shape=[1], - persistable=False, - stop_gradient=False) - main_block._insert_op( - index=insert_idx + offset, - type='sqrt', - inputs={'X': sum_rst}, - outputs={'Out': sqrt_rst}, - attrs={ - self._op_role_key: self._op_role.Optimize, - 'use_mkldnn': False, - 'use_cudnn': False - }) - offset += 1 - - # insert fill constant op - fill_constant_rst = main_block.create_var( - name=unique_name.generate('gradient_clip_fill_constant'), - dtype=sqrt_rst.dtype, - shape=[1], - persistable=False, - stop_gradient=False) - main_block._insert_op( - index=insert_idx + offset, - type='fill_constant', - inputs={}, - outputs={'Out': [fill_constant_rst]}, - attrs={ - 'shape': fill_constant_rst.shape, - 'dtype': fill_constant_rst.dtype, - 'value': float(1.0), - self._op_role_key: self._op_role.Optimize, - }) - offset += 1 - - # insert elementwise_max op - elementwise_max_rst = main_block.create_var( - name=unique_name.generate('gradient_clip_elementwise_max'), - dtype=fill_constant_rst.dtype, - shape=[1], - persistable=False, - stop_gradient=False) 
- main_block._insert_op( - index=insert_idx + offset, - type='elementwise_max', - inputs={'X': sqrt_rst, - 'Y': fill_constant_rst}, - outputs={'Out': [elementwise_max_rst]}, - attrs={ - self._op_role_key: self._op_role.Optimize, - 'use_mkldnn': False, - 'use_quantizer': False, - 'axis': -1 - }) - offset += 1 - - # insert elementwise_div op - elementwise_div_rst = main_block.create_var( - name=unique_name.generate('gradient_clip_elementwise_div'), - dtype=elementwise_max_rst.dtype, - shape=[1], - persistable=False, - stop_gradient=False) - main_block._insert_op( - index=insert_idx + offset, - type='elementwise_div', - inputs={'X': fill_constant_rst, - 'Y': elementwise_max_rst}, - outputs={'Out': [elementwise_div_rst]}, - attrs={ - self._op_role_key: self._op_role.Optimize, - 'use_mkldnn': False, - 'use_quantizer': False, - 'axis': -1 - }) - offset += 1 - - # insert elementwise_mul op - for idx, grad in enumerate(merged_gradients): - main_block._insert_op( - index=insert_idx + offset, - type='elementwise_mul', - inputs={'X': grad, - 'Y': elementwise_div_rst}, - outputs={'Out': grad}, - attrs={ - self._op_role_key: self._op_role.Optimize, - 'use_mkldnn': False, - 'use_quantizer': False, - 'axis': -1 - }) - offset += 1 - return + return fused_merged_gradients def _get_var_size(self, var): dtype_to_size = { From 2562a29a6d54f8dc26256b7d0054ec8a0755347d Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 17 Aug 2021 13:35:48 +0800 Subject: [PATCH 03/32] remove merged grad back to main --- python/paddle/fluid/optimizer.py | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 102bdc8ecfc10a..36489a597544b4 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5259,12 +5259,6 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, dtype=grad_segment[0].dtype, persistable=True, stop_gradient=True) - 
fused_merged_grad_main = main_block.create_var( - name='FusedMergedGrad_{}'.format(grad_segment[0].name) + - merged_suffix, - dtype=grad_segment[0].dtype, - persistable=True, - stop_gradient=True) fused_gradients.append(fused_grad) fused_merged_gradients.append(fused_merged_grad) @@ -5272,12 +5266,12 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, assert len(fused_merged_gradients) == len(grad_param_segments) # insert coalesce op to init fused values - first_opt_op_idx = None - for index, op in reversed(tuple(enumerate(list(main_block.ops)))): - if self._is_backward_op(op) and first_opt_op_idx is None: - first_opt_op_idx = index + 1 + first_back_op_idx = None + for index, op in enumerate(main_block.ops): + if self._is_backward_op(op) and first_back_op_idx is None: + first_back_op_idx = index + 1 break - assert first_opt_op_idx is not None + assert first_back_op_idx is not None offset = 0 fuse_param_pos = len(startup_block.ops) for i in range(len(grad_param_segments)): @@ -5286,9 +5280,9 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, grads = grad_param_segments[i][0] params = grad_param_segments[i][1] main_block._insert_op_without_sync( - first_opt_op_idx + offset, + first_back_op_idx + offset, type="coalesce_tensor", - inputs={"Input": grads}, + inputs={"Input": params}, outputs={"Output": grads, "FusedOutput": fused_grad}, attrs={ @@ -5297,8 +5291,9 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, "dtype": grads[0].dtype, self._op_role_key: self._op_role.Backward }) - startup_block._insert_op_without_sync( - fuse_param_pos + offset, + offset += 1 + main_block._insert_op_without_sync( + first_back_op_idx + offset, type="coalesce_tensor", inputs={"Input": params}, outputs={"Output": grads, @@ -5306,7 +5301,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, attrs={ "copy_data": False, "use_align": True, - "dtype": params[0].dtype, + "dtype": 
grads[0].dtype, self._op_role_key: self._op_role.Forward }) offset += 1 From b497b1ebba16f382dfc3836bcb00f6975ab76999 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 17 Aug 2021 14:52:56 +0800 Subject: [PATCH 04/32] rewrite the dtype --- python/paddle/fluid/optimizer.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 36489a597544b4..298d589c886079 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5243,6 +5243,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, cur_size += tmp_size merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' + dtype = paddle.float16 if fp16 else None fused_gradients = [] fused_merged_gradients = [] # create fused vars for grad and param @@ -5256,7 +5257,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, fused_merged_grad = main_block.create_var( name='FusedMergedGrad_{}'.format(grad_segment[0].name) + merged_suffix, - dtype=grad_segment[0].dtype, + dtype=dtype if dtype is not None else grad_segment[0].dtype, persistable=True, stop_gradient=True) fused_gradients.append(fused_grad) @@ -5273,7 +5274,6 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, break assert first_back_op_idx is not None offset = 0 - fuse_param_pos = len(startup_block.ops) for i in range(len(grad_param_segments)): fused_grad = fused_gradients[i] fused_merged_grad = fused_merged_gradients[i] @@ -5301,12 +5301,11 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, attrs={ "copy_data": False, "use_align": True, - "dtype": grads[0].dtype, + "dtype": dtype if dtype is not None else grads[0].dtype, self._op_role_key: self._op_role.Forward }) offset += 1 - dtype = paddle.float16 if fp16 else None first_opt_op_idx += offset offset = 0 for i in range(len(fused_gradients)): @@ -5324,14 +5323,14 @@ def 
_accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, }) offset += 1 - is_fp16_grad = 'cast_fp16' in fused_grad + is_fp16_grad = 'cast_fp16' in fused_grad.name need_cast = (is_fp16_grad is not fp16) if need_cast: cast_grad_var_name = fused_grad.name + '@TMP' cast_grad_var = main_block.create_var( name=cast_grad_var_name, - dtype=dtype if dtype else fused_merged_grad.dtype, + dtype=dtype if dtype is not None else fused_grad.dtype, persistable=False, stop_gradient=True) main_block._insert_op( From 8553cd9c6389407365e0510697d24a9933627679 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 17 Aug 2021 15:24:28 +0800 Subject: [PATCH 05/32] add fp32 cast back --- python/paddle/fluid/optimizer.py | 48 ++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 298d589c886079..72129189e1f501 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5246,6 +5246,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, dtype = paddle.float16 if fp16 else None fused_gradients = [] fused_merged_gradients = [] + fused_merged_gradients_back_fp32 = [] # create fused vars for grad and param for grad_param_segment in grad_param_segments: grad_segment = grad_param_segment[0] @@ -5262,6 +5263,14 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, stop_gradient=True) fused_gradients.append(fused_grad) fused_merged_gradients.append(fused_merged_grad) + if fp16: + fused_merged_grad_fp32 = main_block.create_var( + name='FusedMergedGrad_{}'.format(grad_segment[0].name) + + merged_suffix + '@BACK@FP32', + dtype=paddle.float32, + persistable=True, + stop_gradient=True) + fused_merged_gradients_back_fp32.append(fused_merged_grad_fp32) assert len(fused_gradients) == len(grad_param_segments) assert len(fused_merged_gradients) == len(grad_param_segments) @@ -5295,16 +5304,34 @@ def 
_accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, main_block._insert_op_without_sync( first_back_op_idx + offset, type="coalesce_tensor", - inputs={"Input": params}, - outputs={"Output": grads, - "FusedOutput": fused_merged_grad}, + inputs={"Input": fused_grad}, + outputs={ + "Output": fused_grad, + "FusedOutput": fused_merged_grad + }, attrs={ "copy_data": False, "use_align": True, "dtype": dtype if dtype is not None else grads[0].dtype, - self._op_role_key: self._op_role.Forward + self._op_role_key: self._op_role.Backward }) offset += 1 + if fp16: + fused_merged_grad_fp32 = fused_merged_gradients_back_fp32[i] + main_block._insert_op_without_sync( + first_back_op_idx + offset, + type="coalesce_tensor", + inputs={"Input": fused_merged_grad}, + outputs={ + "Output": fused_merged_grad, + "FusedOutput": fused_merged_grad_fp32 + }, + attrs={ + "copy_data": False, + "use_align": True, + "dtype": paddle.float32, + self._op_role_key: self._op_role.Backward + }) first_opt_op_idx += offset offset = 0 @@ -5353,17 +5380,11 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, attrs={self._op_role_key: self._op_role.Backward, }) offset += 1 - fp32_gradients = None if fp16: # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 - fp32_gradients = [] - for fp16_grad in fused_merged_gradients: - fp32_grad = main_block.create_var( - name=fp16_grad.name + '@BACK@FP32', - dtype=paddle.float32, - shape=fp16_grad.shape, - persistable=False, - stop_gradient=True) + for i in range(len(fused_merged_gradients)): + fp16_grad = fused_merged_gradients[i] + fp32_grad = fused_merged_gradients_back_fp32[i] main_block._insert_op( index=first_opt_op_idx + offset, type='cast', @@ -5375,7 +5396,6 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, self._op_role_key: self._op_role.Optimize, }) offset += 1 - fp32_gradients.append(fp32_grad) # repalce the var with it's name for i in 
range(len(fused_merged_gradients)): From 2899fae952859ef758f103d883ddf29a2a8c42b1 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 17 Aug 2021 17:16:03 +0800 Subject: [PATCH 06/32] create @GRAD@MERGED vars --- python/paddle/fluid/optimizer.py | 86 ++++++++++++++------------------ 1 file changed, 38 insertions(+), 48 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 72129189e1f501..bf0c6d6ac44326 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5218,35 +5218,39 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, if len(grad_param_pairs) == 0: return - # allocate each gard/param paris in different segments - # structure of grad_param_segments is - # [([grad0, grad1], [param0, param1]), ([grad2, grad3], [param2, param3])] - # each entry of the list is a tuple stores the grads segment list and - # the corresponding params segment list grad_param_segments = [] + merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' + dtype = paddle.float16 if fp16 else None cur_size = 0. last_dtype = None # split the grad based on dtype and fused size for grad, param in grad_param_pairs: real_grad = main_block.var(grad) + merged_grad_name = grad.name + merged_suffix + if not main_block.has_var(merged_grad_name): + main_block.create_var( + name=merged_grad_name, + dtype=dtype if dtype is not None else paddle.float32, + persistable=False, + stop_gradient=True) + merged_grad_var = main_block.var(merged_grad_name) real_param = main_block.var(param) tmp_size = self._get_var_size(real_grad) if len(grad_param_segments) == 0 \ or cur_size + tmp_size > fused_size \ or real_grad.dtype != last_dtype: - grad_param_segments.append(([real_grad], [real_param])) + grad_param_segments.append( + ([real_grad], [real_param], [merged_grad_var])) last_dtype = real_grad.dtype cur_size = 0. 
else: grad_param_segments[-1][0].append(real_grad) grad_param_segments[-1][1].append(real_param) + grad_param_segments[-1][2].append(merged_grad_var) cur_size += tmp_size - merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' - dtype = paddle.float16 if fp16 else None fused_gradients = [] fused_merged_gradients = [] - fused_merged_gradients_back_fp32 = [] # create fused vars for grad and param for grad_param_segment in grad_param_segments: grad_segment = grad_param_segment[0] @@ -5263,14 +5267,6 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, stop_gradient=True) fused_gradients.append(fused_grad) fused_merged_gradients.append(fused_merged_grad) - if fp16: - fused_merged_grad_fp32 = main_block.create_var( - name='FusedMergedGrad_{}'.format(grad_segment[0].name) + - merged_suffix + '@BACK@FP32', - dtype=paddle.float32, - persistable=True, - stop_gradient=True) - fused_merged_gradients_back_fp32.append(fused_merged_grad_fp32) assert len(fused_gradients) == len(grad_param_segments) assert len(fused_merged_gradients) == len(grad_param_segments) @@ -5288,6 +5284,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, fused_merged_grad = fused_merged_gradients[i] grads = grad_param_segments[i][0] params = grad_param_segments[i][1] + merged_grads = grad_param_segments[i][2] main_block._insert_op_without_sync( first_back_op_idx + offset, type="coalesce_tensor", @@ -5304,9 +5301,9 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, main_block._insert_op_without_sync( first_back_op_idx + offset, type="coalesce_tensor", - inputs={"Input": fused_grad}, + inputs={"Input": params}, outputs={ - "Output": fused_grad, + "Output": merged_grads, "FusedOutput": fused_merged_grad }, attrs={ @@ -5316,22 +5313,6 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, self._op_role_key: self._op_role.Backward }) offset += 1 - if fp16: - fused_merged_grad_fp32 = 
fused_merged_gradients_back_fp32[i] - main_block._insert_op_without_sync( - first_back_op_idx + offset, - type="coalesce_tensor", - inputs={"Input": fused_merged_grad}, - outputs={ - "Output": fused_merged_grad, - "FusedOutput": fused_merged_grad_fp32 - }, - attrs={ - "copy_data": False, - "use_align": True, - "dtype": paddle.float32, - self._op_role_key: self._op_role.Backward - }) first_opt_op_idx += offset offset = 0 @@ -5383,19 +5364,28 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, if fp16: # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 for i in range(len(fused_merged_gradients)): - fp16_grad = fused_merged_gradients[i] - fp32_grad = fused_merged_gradients_back_fp32[i] - main_block._insert_op( - index=first_opt_op_idx + offset, - type='cast', - inputs={'X': fp16_grad}, - outputs={'Out': fp32_grad}, - attrs={ - 'in_dtype': paddle.float16, - 'out_dtype': paddle.float32, - self._op_role_key: self._op_role.Optimize, - }) - offset += 1 + fp16_grads = grad_param_segments[i][2] + grads = grad_param_segments[i][0] + for i in range(len(fp16_grads)): + fp16_grad = fp16_grads[i] + grad = grads[i] + fp32_grad_name = grad.name + '@MERGED' + fp32_grad = main_block.create_var( + name=fp32_grad_name, + dtype=paddle.float32, + persistable=False, + stop_gradient=True) + main_block._insert_op( + index=first_opt_op_idx + offset, + type='cast', + inputs={'X': fp16_grad}, + outputs={'Out': fp32_grad}, + attrs={ + 'in_dtype': paddle.float16, + 'out_dtype': paddle.float32, + self._op_role_key: self._op_role.Optimize, + }) + offset += 1 # repalce the var with it's name for i in range(len(fused_merged_gradients)): From 1e89e788953470cc509e30747dbfd67afedbc879 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 09:04:38 +0800 Subject: [PATCH 07/32] resource the dtype --- python/paddle/fluid/optimizer.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git 
a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index bf0c6d6ac44326..a9ec07a2dd3564 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5226,14 +5226,11 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, # split the grad based on dtype and fused size for grad, param in grad_param_pairs: real_grad = main_block.var(grad) - merged_grad_name = grad.name + merged_suffix - if not main_block.has_var(merged_grad_name): - main_block.create_var( - name=merged_grad_name, - dtype=dtype if dtype is not None else paddle.float32, - persistable=False, - stop_gradient=True) - merged_grad_var = main_block.var(merged_grad_name) + merged_grad_var = main_block.create_var( + name=grad + merged_suffix, + dtype=dtype if dtype is not None else paddle.float32, + persistable=False, + stop_gradient=True) real_param = main_block.var(param) tmp_size = self._get_var_size(real_grad) if len(grad_param_segments) == 0 \ @@ -5254,15 +5251,15 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, # create fused vars for grad and param for grad_param_segment in grad_param_segments: grad_segment = grad_param_segment[0] + fused_grad_segment = grad_param_segments[2] fused_grad = main_block.create_var( name='FusedGrad_{}'.format(grad_segment[0].name), dtype=grad_segment[0].dtype, persistable=False, stop_gradient=True) fused_merged_grad = main_block.create_var( - name='FusedMergedGrad_{}'.format(grad_segment[0].name) + - merged_suffix, - dtype=dtype if dtype is not None else grad_segment[0].dtype, + name='FusedMergedGrad_{}'.format(fused_grad_segment[0].name), + dtype=fused_grad_segment[0].dtype, persistable=True, stop_gradient=True) fused_gradients.append(fused_grad) @@ -5275,7 +5272,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, first_back_op_idx = None for index, op in enumerate(main_block.ops): if self._is_backward_op(op) and first_back_op_idx is None: - 
first_back_op_idx = index + 1 + first_back_op_idx = index break assert first_back_op_idx is not None offset = 0 @@ -5309,11 +5306,12 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, attrs={ "copy_data": False, "use_align": True, - "dtype": dtype if dtype is not None else grads[0].dtype, + "dtype": merged_grads[0].dtype, self._op_role_key: self._op_role.Backward }) offset += 1 + # insert gradient merge relating ops first_opt_op_idx += offset offset = 0 for i in range(len(fused_gradients)): @@ -5358,7 +5356,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, type='sum', inputs={'X': [fused_merged_grad, fused_grad]}, outputs={'Out': fused_merged_grad}, - attrs={self._op_role_key: self._op_role.Backward, }) + attrs={self._op_role_key: self._op_role.Backward}) offset += 1 if fp16: From 4faee51b85f326f56cf8c0810454155e7531b86b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 09:29:28 +0800 Subject: [PATCH 08/32] add shape info for MERGED var --- python/paddle/fluid/optimizer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index a9ec07a2dd3564..f1dd747266c075 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5229,6 +5229,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, merged_grad_var = main_block.create_var( name=grad + merged_suffix, dtype=dtype if dtype is not None else paddle.float32, + shape=real_grad.shape, persistable=False, stop_gradient=True) real_param = main_block.var(param) @@ -5251,7 +5252,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, # create fused vars for grad and param for grad_param_segment in grad_param_segments: grad_segment = grad_param_segment[0] - fused_grad_segment = grad_param_segments[2] + fused_grad_segment = grad_param_segment[2] fused_grad = main_block.create_var( 
name='FusedGrad_{}'.format(grad_segment[0].name), dtype=grad_segment[0].dtype, @@ -5371,6 +5372,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, fp32_grad = main_block.create_var( name=fp32_grad_name, dtype=paddle.float32, + shape=grad.shape, persistable=False, stop_gradient=True) main_block._insert_op( From 3b64dc1efd8d1f8e2b3284209c7cf637ac6d8efb Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 10:41:15 +0800 Subject: [PATCH 09/32] update loop condition --- python/paddle/fluid/optimizer.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index f1dd747266c075..d1cae224033294 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5046,7 +5046,7 @@ def _accumulate_gradients(self, """ if user_defined_strategy.fuse_param_grad: fused_gradient_names = self._accumulate_gradients_with_fuse( - block, startup_block, fp16_allreduce, + block, fp16_allreduce, user_defined_strategy.fuse_grad_size_in_MB) return fused_gradient_names @@ -5179,8 +5179,7 @@ def _accumulate_gradients(self, return merged_gradient_names - def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, - fused_size): + def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): first_opt_op_idx = None grad_param_pairs = [] # obtain all param/grad pairs that needed to be fused @@ -5230,8 +5229,8 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, name=grad + merged_suffix, dtype=dtype if dtype is not None else paddle.float32, shape=real_grad.shape, - persistable=False, - stop_gradient=True) + persistable=True, + stop_gradient=False) real_param = main_block.var(param) tmp_size = self._get_var_size(real_grad) if len(grad_param_segments) == 0 \ @@ -5339,7 +5338,7 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, name=cast_grad_var_name, 
dtype=dtype if dtype is not None else fused_grad.dtype, persistable=False, - stop_gradient=True) + stop_gradient=False) main_block._insert_op( index=first_opt_op_idx + offset, type='cast', @@ -5365,16 +5364,16 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, for i in range(len(fused_merged_gradients)): fp16_grads = grad_param_segments[i][2] grads = grad_param_segments[i][0] - for i in range(len(fp16_grads)): - fp16_grad = fp16_grads[i] - grad = grads[i] + for j in range(len(fp16_grads)): + fp16_grad = fp16_grads[j] + grad = grads[j] fp32_grad_name = grad.name + '@MERGED' fp32_grad = main_block.create_var( name=fp32_grad_name, dtype=paddle.float32, shape=grad.shape, - persistable=False, - stop_gradient=True) + persistable=True, + stop_gradient=False) main_block._insert_op( index=first_opt_op_idx + offset, type='cast', @@ -5391,6 +5390,8 @@ def _accumulate_gradients_with_fuse(self, main_block, startup_block, fp16, for i in range(len(fused_merged_gradients)): fused_merged_gradients[i] = fused_merged_gradients[i].name + main_block._sync_with_cpp() + return fused_merged_gradients def _get_var_size(self, var): From 1b9b657fe5ce03ddb5b42c1f2895353f20ffd959 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 11:08:53 +0800 Subject: [PATCH 10/32] rewrite cast logic --- python/paddle/fluid/optimizer.py | 46 +++++++++++++++----------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index d1cae224033294..690e577d165744 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5361,30 +5361,28 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): if fp16: # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 - for i in range(len(fused_merged_gradients)): - fp16_grads = grad_param_segments[i][2] - grads = grad_param_segments[i][0] - for j in range(len(fp16_grads)): - 
fp16_grad = fp16_grads[j] - grad = grads[j] - fp32_grad_name = grad.name + '@MERGED' - fp32_grad = main_block.create_var( - name=fp32_grad_name, - dtype=paddle.float32, - shape=grad.shape, - persistable=True, - stop_gradient=False) - main_block._insert_op( - index=first_opt_op_idx + offset, - type='cast', - inputs={'X': fp16_grad}, - outputs={'Out': fp32_grad}, - attrs={ - 'in_dtype': paddle.float16, - 'out_dtype': paddle.float32, - self._op_role_key: self._op_role.Optimize, - }) - offset += 1 + for grad, param in grad_param_pairs: + fp16_grad_name = grad.name + '@MERGED@FP16' + assert main_block.has_var(fp16_grad_name) + fp16_grad = main_block.var(fp16_grad_name) + fp32_grad_name = grad.name + '@MERGED' + fp32_grad = main_block.create_var( + name=fp32_grad_name, + dtype=paddle.float32, + shape=grad.shape, + persistable=True, + stop_gradient=False) + main_block._insert_op( + index=first_opt_op_idx + offset, + type='cast', + inputs={'X': fp16_grad}, + outputs={'Out': fp32_grad}, + attrs={ + 'in_dtype': paddle.float16, + 'out_dtype': paddle.float32, + self._op_role_key: self._op_role.Optimize, + }) + offset += 1 # repalce the var with it's name for i in range(len(fused_merged_gradients)): From 22bfde7d96a9f0402ed96528fbeb12e3166c974b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 11:29:48 +0800 Subject: [PATCH 11/32] remove cast_fp16 in grad's name --- python/paddle/fluid/optimizer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 690e577d165744..c2049f532eedec 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5226,7 +5226,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): for grad, param in grad_param_pairs: real_grad = main_block.var(grad) merged_grad_var = main_block.create_var( - name=grad + merged_suffix, + name=grad.replace('.cast_fp16', '') + merged_suffix, dtype=dtype if dtype 
is not None else paddle.float32, shape=real_grad.shape, persistable=True, @@ -5362,14 +5362,15 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): if fp16: # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 for grad, param in grad_param_pairs: - fp16_grad_name = grad.name + '@MERGED@FP16' + real_grad = main_block.var(grad) + fp16_grad_name = grad.replace('.cast_fp16', '') + '@MERGED@FP16' assert main_block.has_var(fp16_grad_name) fp16_grad = main_block.var(fp16_grad_name) - fp32_grad_name = grad.name + '@MERGED' + fp32_grad_name = grad.replace('.cast_fp16', '') + '@MERGED' fp32_grad = main_block.create_var( name=fp32_grad_name, dtype=paddle.float32, - shape=grad.shape, + shape=real_grad.shape, persistable=True, stop_gradient=False) main_block._insert_op( From 72a3120ab4e6e458c24b8aa19e129cea72157d5c Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 11:41:29 +0800 Subject: [PATCH 12/32] update grad name generate method --- python/paddle/fluid/optimizer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index c2049f532eedec..69a84d6f80fbff 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5226,8 +5226,8 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): for grad, param in grad_param_pairs: real_grad = main_block.var(grad) merged_grad_var = main_block.create_var( - name=grad.replace('.cast_fp16', '') + merged_suffix, - dtype=dtype if dtype is not None else paddle.float32, + name=param + core.grad_var_suffix() + merged_suffix, + dtype=dtype if dtype is not None else real_grad.dtype, shape=real_grad.shape, persistable=True, stop_gradient=False) @@ -5363,10 +5363,10 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 for grad, param in 
grad_param_pairs: real_grad = main_block.var(grad) - fp16_grad_name = grad.replace('.cast_fp16', '') + '@MERGED@FP16' + fp16_grad_name = param + core.grad_var_suffix() + '@MERGED@FP16' assert main_block.has_var(fp16_grad_name) fp16_grad = main_block.var(fp16_grad_name) - fp32_grad_name = grad.replace('.cast_fp16', '') + '@MERGED' + fp32_grad_name = param + core.grad_var_suffix() + '@MERGED' fp32_grad = main_block.create_var( name=fp32_grad_name, dtype=paddle.float32, From 9050872855b3ca52ca594f92aeb26558abb3ba7b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 12:16:22 +0800 Subject: [PATCH 13/32] copy data for colaesce --- python/paddle/fluid/optimizer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 69a84d6f80fbff..38fb38c8faa846 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5256,12 +5256,12 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): name='FusedGrad_{}'.format(grad_segment[0].name), dtype=grad_segment[0].dtype, persistable=False, - stop_gradient=True) + stop_gradient=False) fused_merged_grad = main_block.create_var( name='FusedMergedGrad_{}'.format(fused_grad_segment[0].name), dtype=fused_grad_segment[0].dtype, persistable=True, - stop_gradient=True) + stop_gradient=False) fused_gradients.append(fused_grad) fused_merged_gradients.append(fused_merged_grad) @@ -5304,7 +5304,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): "FusedOutput": fused_merged_grad }, attrs={ - "copy_data": False, + "copy_data": True, "use_align": True, "dtype": merged_grads[0].dtype, self._op_role_key: self._op_role.Backward From ab608226a1560d296be9f293c6dbce6f11be7035 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 13:38:18 +0800 Subject: [PATCH 14/32] fix dtype of coalesce op --- paddle/fluid/operators/coalesce_tensor_op.cc | 4 +-- 
python/paddle/fluid/optimizer.py | 29 +++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 6ea8809dae13f2..6177fa50bb881b 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -122,9 +122,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel { } } else if (context.Attr("set_constant")) { // TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION. - math::SetConstant set_constant; + math::SetConstant set_constant; set_constant(dev_ctx, fused_tensor, - static_cast(context.Attr("constant"))); + static_cast(context.Attr("constant"))); } else if (context.Attr("persist_output")) { for (size_t i = 0; i < out_var_names.size(); ++i) { size_t len = static_cast(out_tensors[i]->numel()); diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 38fb38c8faa846..1cec3a68d3b8de 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5304,10 +5304,12 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): "FusedOutput": fused_merged_grad }, attrs={ - "copy_data": True, + "set_constant": True, + "constant": float(0.0), + "copy_data": False, "use_align": True, "dtype": merged_grads[0].dtype, - self._op_role_key: self._op_role.Backward + self._op_role_key: self._op_role.Optimize.LRSched }) offset += 1 @@ -5317,17 +5319,18 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): for i in range(len(fused_gradients)): fused_grad = fused_gradients[i] fused_merged_grad = fused_merged_gradients[i] - main_block._insert_op( - index=first_opt_op_idx + offset, - type='fill_constant', - inputs={}, - outputs={'Out': [fused_merged_grad]}, - attrs={ - 'dtype': fused_merged_grad.dtype, - 'value': float(0), - self._op_role_key: self._op_role.Optimize.LRSched, - }) - offset += 1 + # main_block._insert_op( + # 
index=first_opt_op_idx + offset, + # type='fill_constant', + # inputs={}, + # outputs={'Out': [fused_merged_grad]}, + # attrs={ + # 'shape': fused_merged_grad.shape, + # 'dtype': fused_merged_grad.dtype, + # 'value': float(0), + # self._op_role_key: self._op_role.Optimize.LRSched, + # }) + # offset += 1 is_fp16_grad = 'cast_fp16' in fused_grad.name need_cast = (is_fp16_grad is not fp16) From 4a26c7f94bc259f5f68d337dc5c0832aeef54385 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 13:57:47 +0800 Subject: [PATCH 15/32] dytpe supports for colaesce op --- paddle/fluid/operators/coalesce_tensor_op.cc | 34 +++++++++++++++++--- python/paddle/fluid/optimizer.py | 14 -------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 6177fa50bb881b..508078822ed547 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -24,6 +24,33 @@ namespace paddle { namespace operators { +template +struct FillConstantVisitor { + FillConstantVisitor(const DeviceContext &dev_ctx, + framework::LoDTensor *tensor, const float value) + : dev_ctx_(dev_ctx), tensor_(tensor), value_(value) {} + + template + void apply(typename std::enable_if::value || + std::is_same::value>::type * = + nullptr) const { + PADDLE_THROW(platform::errors::InvalidArgument( + "Not support data type for set_constant attr")); + } + + template + void apply(typename std::enable_if::value || + std::is_same::value)>::type + * = nullptr) const { + math::SetConstant set_constant; + set_constant(dev_ctx_, tensor_, static_cast(value_)); + } + + const DeviceContext &dev_ctx_; + framework::LoDTensor *tensor_; + float value_; +}; + template class CoalesceTensorOpKernel : public framework::OpKernel { public: @@ -121,10 +148,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel { : len; } } else if (context.Attr("set_constant")) { - // TODO(Liu yuang) 
ADD NPU SET_CONSTANT FUNCTION. - math::SetConstant set_constant; - set_constant(dev_ctx, fused_tensor, - static_cast(context.Attr("constant"))); + framework::VisitDataType( + dtype, FillConstantVisitor( + dev_ctx, fused_tensor, context.Attr("constant"))); } else if (context.Attr("persist_output")) { for (size_t i = 0; i < out_var_names.size(); ++i) { size_t len = static_cast(out_tensors[i]->numel()); diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 1cec3a68d3b8de..69aad7dd13a484 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5319,22 +5319,8 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): for i in range(len(fused_gradients)): fused_grad = fused_gradients[i] fused_merged_grad = fused_merged_gradients[i] - # main_block._insert_op( - # index=first_opt_op_idx + offset, - # type='fill_constant', - # inputs={}, - # outputs={'Out': [fused_merged_grad]}, - # attrs={ - # 'shape': fused_merged_grad.shape, - # 'dtype': fused_merged_grad.dtype, - # 'value': float(0), - # self._op_role_key: self._op_role.Optimize.LRSched, - # }) - # offset += 1 - is_fp16_grad = 'cast_fp16' in fused_grad.name need_cast = (is_fp16_grad is not fp16) - if need_cast: cast_grad_var_name = fused_grad.name + '@TMP' cast_grad_var = main_block.create_var( From d7f0da3f3b1123a198f742131902676d56fc3511 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 16:13:23 +0800 Subject: [PATCH 16/32] no fp16 allreduce supports --- python/paddle/fluid/optimizer.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 69aad7dd13a484..82df27d4741ac5 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5257,8 +5257,12 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): dtype=grad_segment[0].dtype, persistable=False, stop_gradient=False) + 
fused_merged_grad_name_prefix = 'FusedMergedGrad.cast_fp16.' if \ + fused_grad_segment[0].dtype == paddle.float16 else 'FusedMergedGrad' + fused_merged_grad_name = fused_merged_grad_name_prefix + '_{}'.format( + fused_grad_segment[0].name) fused_merged_grad = main_block.create_var( - name='FusedMergedGrad_{}'.format(fused_grad_segment[0].name), + name=fused_merged_grad_name, dtype=fused_grad_segment[0].dtype, persistable=True, stop_gradient=False) From ff629651518a985de670830be350fa1db85532d1 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 16:51:21 +0800 Subject: [PATCH 17/32] update dytpe value --- python/paddle/fluid/optimizer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 82df27d4741ac5..8470485dade39d 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5219,7 +5219,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): grad_param_segments = [] merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' - dtype = paddle.float16 if fp16 else None + dtype = paddle.float16 if fp16 else paddle.float32 cur_size = 0. 
last_dtype = None # split the grad based on dtype and fused size @@ -5227,7 +5227,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): real_grad = main_block.var(grad) merged_grad_var = main_block.create_var( name=param + core.grad_var_suffix() + merged_suffix, - dtype=dtype if dtype is not None else real_grad.dtype, + dtype=dtype, shape=real_grad.shape, persistable=True, stop_gradient=False) @@ -5329,7 +5329,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): cast_grad_var_name = fused_grad.name + '@TMP' cast_grad_var = main_block.create_var( name=cast_grad_var_name, - dtype=dtype if dtype is not None else fused_grad.dtype, + dtype=dtype, persistable=False, stop_gradient=False) main_block._insert_op( From b1122788301e99de97c782077d1c5fd8d93ab8b7 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 18 Aug 2021 18:11:45 +0800 Subject: [PATCH 18/32] add size of dtype --- paddle/fluid/operators/coalesce_tensor_op.cc | 11 +++++++++-- python/paddle/fluid/optimizer.py | 2 ++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 508078822ed547..1870b476eed6a5 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -97,6 +97,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel { auto in_tensors = context.MultiInput("Input"); bool use_align = context.Attr("use_align"); auto align_size = context.Attr("align_size"); + auto size_of_dtype = context.Attr("size_of_dtype"); if (context.Attr("check_name")) { for (size_t i = 0; i < in_var_names.size(); ++i) { @@ -121,7 +122,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel { size_t numel = 0; auto dtype = static_cast( context.Attr("dtype")); - size_t size_of_dtype = framework::SizeOfType(dtype); + if (size_of_dtype == -1) { + size_of_dtype = framework::SizeOfType(dtype); + } 
GetMemSizeAndDtype(in_tensors, in_var_names, &numel, size_of_dtype, context.GetPlace(), use_align, align_size); @@ -253,10 +256,13 @@ class CoalesceTensorOp : public framework::OperatorWithKernel { } auto use_align = ctx->Attrs().Get("use_align"); auto align_size = ctx->Attrs().Get("align_size"); + auto size_of_dtype = ctx->Attrs().Get("size_of_dtype"); auto dtype = static_cast( ctx->Attrs().Get("dtype")); - size_t size_of_dtype = framework::SizeOfType(dtype); + if (size_of_dtype == -1) { + size_of_dtype = framework::SizeOfType(dtype); + } auto alignment = [](size_t size, size_t align_size) { size_t remaining = size % align_size; @@ -334,6 +340,7 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker { .SetDefault(true); AddAttr("align_size", "The alignment size when use_align is True") .SetDefault(-1); + AddAttr("size_of_dtype", "The size of dtype").SetDefault(-1); AddComment(R"DOC( CoalesceTensor Operator. diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 8470485dade39d..b99c9f0bf23d53 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5293,6 +5293,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): outputs={"Output": grads, "FusedOutput": fused_grad}, attrs={ + "size_of_dtype": 2, "copy_data": False, "use_align": True, "dtype": grads[0].dtype, @@ -5308,6 +5309,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): "FusedOutput": fused_merged_grad }, attrs={ + "size_of_dtype": 2, "set_constant": True, "constant": float(0.0), "copy_data": False, From 3549d31adb7d014a933008aa2c2c71fa669b9fb7 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 09:37:24 +0800 Subject: [PATCH 19/32] remove startup param --- .../distributed/fleet/meta_optimizers/sharding_optimizer.py | 1 - python/paddle/fluid/optimizer.py | 1 - 2 files changed, 2 deletions(-) diff --git 
a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 0a2eff6628fb67..c94bd572f05878 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -320,7 +320,6 @@ def _insert_allreduce_for_pp(self): accumulated_grad_names = self._pp_optimizer._accumulate_gradients( main_block, - startup_block, fp16_allreduce=fp16_allreduce, user_defined_strategy=strategy) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index b99c9f0bf23d53..9bb91eaa74a14c 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5036,7 +5036,6 @@ def _rename_gradient_var_name(self, block): def _accumulate_gradients(self, block, - startup_block, pp_allreduce_in_optimize=False, fp16_allreduce=False, user_defined_strategy=None): From 447a1933889c25d4848def4ebe903a5f87612d8c Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 09:52:13 +0800 Subject: [PATCH 20/32] rename the dist strategy flag --- paddle/fluid/framework/distributed_strategy.proto | 2 +- .../distributed/fleet/base/distributed_strategy.py | 14 +++++++------- .../fleet/meta_optimizers/sharding/utils.py | 4 +++- python/paddle/fluid/optimizer.py | 2 +- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 1d443ebc39a0ef..58ae35f2689799 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -200,7 +200,7 @@ message DistributedStrategy { optional int32 fuse_grad_size_in_num = 31 [ default = 8 ]; optional bool calc_comm_same_stream = 32 [ default = false ]; optional bool asp = 33 [ default = false ]; - optional bool fuse_param_grad = 34 [ default = false ]; + optional bool fuse_grad_merge = 34 [ 
default = false ]; optional RecomputeConfig recompute_configs = 101; optional AMPConfig amp_configs = 102; diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index 49229c8350821c..28456325a22194 100644 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -968,7 +968,7 @@ def _calc_comm_same_stream(self, same): ) @property - def fuse_param_grad(self): + def fuse_grad_merge(self): """ Set whether fuse the param and grad. Note: this flag only supports adam optimizer with GlobalGradClip @@ -979,15 +979,15 @@ def fuse_param_grad(self): strategy = fleet.DistributedStrategy() strategy.fuse_param_grad = True """ - return self.strategy.fuse_param_grad + return self.strategy.fuse_grad_merge - @fuse_param_grad.setter + @fuse_grad_merge.setter @is_strict_auto - def fuse_param_grad(self, fuse_param_grad): - if isinstance(fuse_param_grad, bool): - self.strategy.fuse_param_grad = fuse_param_grad + def fuse_grad_merge(self, fuse_grad_merge): + if isinstance(fuse_grad_merge, bool): + self.strategy.fuse_grad_merge = fuse_grad_merge else: - print("WARNING: fuse_param_grad should have value of boolean type") + print("WARNING: fuse_grad_merge should have value of boolean type") @property def fuse_grad_size_in_num(self): diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index b3c353b23c3484..16fbc7bea6c8b6 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -343,7 +343,9 @@ def insert_allreduce_ops(block, if user_defined_strategy and \ user_defined_strategy.fuse_all_reduce_ops and \ - not user_defined_strategy.fuse_param_grad: + not user_defined_strategy.fuse_grad_merge: + # If fuse_grad_merge is enable, the grad vars have 
already been fused during + # gradient merge pass, therefore, those vars are not need to be fused here insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars, op_role, use_calc_stream, user_defined_strategy.fuse_grad_size_in_MB) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 9bb91eaa74a14c..8d1eb348efb56b 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5043,7 +5043,7 @@ def _accumulate_gradients(self, Create a new merged gradient for each parameter and accumulate the corresponding gradient to it. """ - if user_defined_strategy.fuse_param_grad: + if user_defined_strategy.fuse_grad_merge: fused_gradient_names = self._accumulate_gradients_with_fuse( block, fp16_allreduce, user_defined_strategy.fuse_grad_size_in_MB) From d2734bc66bc14fe00a89e37b2f7067b57a627953 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 09:59:43 +0800 Subject: [PATCH 21/32] rename the size_of_dtype attr for coalesce op --- paddle/fluid/operators/coalesce_tensor_op.cc | 14 +++++++++++--- python/paddle/fluid/optimizer.py | 4 ++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 1870b476eed6a5..13491f34b0b080 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -97,7 +97,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel { auto in_tensors = context.MultiInput("Input"); bool use_align = context.Attr("use_align"); auto align_size = context.Attr("align_size"); - auto size_of_dtype = context.Attr("size_of_dtype"); + auto size_of_dtype = context.Attr("user_defined_size_of_dtype"); if (context.Attr("check_name")) { for (size_t i = 0; i < in_var_names.size(); ++i) { @@ -256,7 +256,7 @@ class CoalesceTensorOp : public framework::OperatorWithKernel { } auto use_align = ctx->Attrs().Get("use_align"); auto align_size = 
ctx->Attrs().Get("align_size"); - auto size_of_dtype = ctx->Attrs().Get("size_of_dtype"); + auto size_of_dtype = ctx->Attrs().Get("user_defined_size_of_dtype"); auto dtype = static_cast( ctx->Attrs().Get("dtype")); @@ -340,7 +340,15 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker { .SetDefault(true); AddAttr("align_size", "The alignment size when use_align is True") .SetDefault(-1); - AddAttr("size_of_dtype", "The size of dtype").SetDefault(-1); + AddAttr("user_defined_size_of_dtype", + "The user defined size of dtype. This is used to coalesce " + "grad vars and merged_grad vars at the same time. For some " + "strategy, the dtype of fused_grad_vars and the dtype of " + "fused_grad_merged_vars are not identical, which will cause " + "the shape of these two coalesced vars are different. To " + "make sure the shape of these two vars are identical with " + "each other, this attr is added.") + .SetDefault(-1); AddComment(R"DOC( CoalesceTensor Operator. diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 8d1eb348efb56b..809a720c30e5b2 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5292,7 +5292,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): outputs={"Output": grads, "FusedOutput": fused_grad}, attrs={ - "size_of_dtype": 2, + "user_defined_size_of_dtype": 2, "copy_data": False, "use_align": True, "dtype": grads[0].dtype, @@ -5308,7 +5308,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): "FusedOutput": fused_merged_grad }, attrs={ - "size_of_dtype": 2, + "user_defined_size_of_dtype": 2, "set_constant": True, "constant": float(0.0), "copy_data": False, From 00b7636f4598550dff03362c07757edda7b413ea Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 10:17:59 +0800 Subject: [PATCH 22/32] update some comment --- .../fleet/base/distributed_strategy.py | 6 +++--- python/paddle/fluid/optimizer.py | 15 
+++++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index 28456325a22194..d19cfd21698021 100644 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -970,9 +970,9 @@ def _calc_comm_same_stream(self, same): @property def fuse_grad_merge(self): """ - Set whether fuse the param and grad. - Note: this flag only supports adam optimizer with GlobalGradClip - The default value for the fuse_param_grad is False + Set whether fuse the grad for gradient merge. + Note: this flag will only effect the gradient merge under pipeline mode + The default value for the fuse_grad_merge is False Examples: .. code-block:: python import paddle.distributed.fleet as fleet diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 809a720c30e5b2..d3af1166269eaa 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5224,6 +5224,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): # split the grad based on dtype and fused size for grad, param in grad_param_pairs: real_grad = main_block.var(grad) + # create the gradient merged var for each grad merged_grad_var = main_block.create_var( name=param + core.grad_var_suffix() + merged_suffix, dtype=dtype, @@ -5232,6 +5233,9 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): stop_gradient=False) real_param = main_block.var(param) tmp_size = self._get_var_size(real_grad) + # two strategies for splitting the grad + # 1. the current segment's size reach the user defined grad_size_in_MB + # 2. 
the upcoming grad holds different dtype compared with grads in current segment if len(grad_param_segments) == 0 \ or cur_size + tmp_size > fused_size \ or real_grad.dtype != last_dtype: @@ -5256,6 +5260,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): dtype=grad_segment[0].dtype, persistable=False, stop_gradient=False) + # keep the '.cast_fp16' info in the fuse var name fused_merged_grad_name_prefix = 'FusedMergedGrad.cast_fp16.' if \ fused_grad_segment[0].dtype == paddle.float16 else 'FusedMergedGrad' fused_merged_grad_name = fused_merged_grad_name_prefix + '_{}'.format( @@ -5271,7 +5276,8 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): assert len(fused_gradients) == len(grad_param_segments) assert len(fused_merged_gradients) == len(grad_param_segments) - # insert coalesce op to init fused values + # insert coalesce op at the start of the backward pass + # use param as the coalesce input to make sure the two Fused vars are in same shape first_back_op_idx = None for index, op in enumerate(main_block.ops): if self._is_backward_op(op) and first_back_op_idx is None: @@ -5299,6 +5305,9 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): self._op_role_key: self._op_role.Backward }) offset += 1 + # For the gradient_merged_fused_var, given a init value during the coalesce op + # this will remove a problematic fill_constant op. 
This op role of this coalesce + # is set to be LRSched to make this coalesce (with init) only run once main_block._insert_op_without_sync( first_back_op_idx + offset, type="coalesce_tensor", @@ -5327,6 +5336,8 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): is_fp16_grad = 'cast_fp16' in fused_grad.name need_cast = (is_fp16_grad is not fp16) if need_cast: + # for fp16 allreduce, cast fp32 grad to fp16 + # for fp32 allreduce, cast fp16 grad to fp32 cast_grad_var_name = fused_grad.name + '@TMP' cast_grad_var = main_block.create_var( name=cast_grad_var_name, @@ -5379,7 +5390,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): }) offset += 1 - # repalce the var with it's name + # replace the var with it's name, which will be used for inserting allreduce for i in range(len(fused_merged_gradients)): fused_merged_gradients[i] = fused_merged_gradients[i].name From e9829596fc30f0de7025c48a38392b27761ebd8f Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 10:41:32 +0800 Subject: [PATCH 23/32] add todo, test=notest --- paddle/fluid/operators/coalesce_tensor_op.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 13491f34b0b080..a6eeb1f314a9b4 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -30,6 +30,7 @@ struct FillConstantVisitor { framework::LoDTensor *tensor, const float value) : dev_ctx_(dev_ctx), tensor_(tensor), value_(value) {} + // TODO(WHO?) 
set_constant supports NPU template void apply(typename std::enable_if::value || std::is_same::value>::type * = From f120ed78f9b5a58c59dc8dfce17e40c14600280b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 11:37:58 +0800 Subject: [PATCH 24/32] support NPU set constant for coalesce op, test=notest --- paddle/fluid/operators/coalesce_tensor_op.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index a6eeb1f314a9b4..1d9d8f84f7cb60 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -43,8 +43,17 @@ struct FillConstantVisitor { void apply(typename std::enable_if::value || std::is_same::value)>::type * = nullptr) const { +#ifdef PADDLE_WITH_ASCEND_CL + if (platform::is_npu_place(dev_ctx_.GetPlace())) { + FillNpuTensorWithConstant(tensor_, value_); + } else { + math::SetConstant set_constant; + set_constant(dev_ctx_, tensor_, static_cast(value_)); + } +#else math::SetConstant set_constant; set_constant(dev_ctx_, tensor_, static_cast(value_)); +#endif } const DeviceContext &dev_ctx_; From 91f05de791d6c7bd1b18845fe9b5aeb2723fc975 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 12:32:28 +0800 Subject: [PATCH 25/32] add test for optimizer pass, test=notest --- .../test_fleet_sharding_meta_optimizer.py | 183 ++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py index 5a981a470cb4ef..ac43b9e358cc2b 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py @@ -1050,6 +1050,189 @@ def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_offload(self): self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) + def 
test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_cast_with_gradient_fuse( + self): + train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( + ) + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + strategy.amp = True + strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], } + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "mp_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "optimize_cast": True, + } + strategy.pipeline = True + strategy.pipeline_configs = { + "schedule_mode": "1F1B", + "micro_batch_size": 2, + "accumulate_steps": 4, + } + strategy.fp16_allreduce = True + strategy.fuse_grad_merge = True + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + # ring: mp, pp_group, pp_pair, pp_pair + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', + 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', + 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add', + 'softmax', 'cross_entropy2', 
'mean', 'elementwise_mul', + 'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor', + 'coalesce_tensor', 'fill_constant', 'scale', 'scale', + 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', + 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', + 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'sum', 'cast', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', 'cast', + 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', + 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', + 'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', + 'momentum', 'momentum', 'momentum', 'momentum', 'momentum', + 'momentum', 'momentum' + ]) + + # amp check_finite_and_unscale, allreduce(pp) + self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1) + + # should has ring id for pp + created_ring_ids = [ + op.desc.attr("ring_id") for op in startup_prog_ops + if op.type == "c_comm_init" + ] + self.assertIn(self.pp_pair_ring_id, created_ring_ids) + self.assertIn(self.dp_ring_id, created_ring_ids) + + # check correctness of pp group + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ + 0] == "comm_id_0": + pp_group_waiting_ports = op.desc.attr("other_endpoints") + + self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003']) + + # check correctness of dp group + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ + 0] == "comm_id_3": + dp_group_waiting_ports = op.desc.attr("other_endpoints") + + self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) + + def test_hybrid_with_pp_dp_amp_with_gradient_fuse(self): + train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( + ) + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + strategy.amp = True + strategy.amp_configs = {'custom_black_varnames': 
['fc_6.b_0'], } + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "mp_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "optimize_cast": True, + } + strategy.pipeline = True + strategy.pipeline_configs = { + "schedule_mode": "1F1B", + "micro_batch_size": 2, + "accumulate_steps": 4, + } + strategy.fuse_grad_merge = True + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + # ring: mp, pp_group, pp_pair, pp_pair + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', + 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', + 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add', + 'softmax', 'cross_entropy2', 'mean', 'elementwise_mul', + 'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor', + 'coalesce_tensor', 'fill_constant', 'scale', 'scale', + 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', + 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', + 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 
'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', + 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', + 'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', + 'momentum', 'momentum', 'momentum', 'momentum', 'momentum', + 'momentum', 'momentum' + ]) + + # amp check_finite_and_unscale, allreduce(pp) + self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1) + + # should has ring id for pp + created_ring_ids = [ + op.desc.attr("ring_id") for op in startup_prog_ops + if op.type == "c_comm_init" + ] + self.assertIn(self.pp_pair_ring_id, created_ring_ids) + self.assertIn(self.dp_ring_id, created_ring_ids) + + # check correctness of pp group + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ + 0] == "comm_id_0": + pp_group_waiting_ports = op.desc.attr("other_endpoints") + + self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003']) + + # check correctness of dp group + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ + 0] == "comm_id_3": + dp_group_waiting_ports = op.desc.attr("other_endpoints") + + self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) + if __name__ == "__main__": unittest.main() From 1257a614b871122d6cf631d38a0d3088eba66df8 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 13:55:45 +0800 Subject: [PATCH 26/32] update NPU fill constant, test=notest --- paddle/fluid/operators/coalesce_tensor_op.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 1d9d8f84f7cb60..872d3b61e55d23 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -20,6 +20,9 @@ #include "paddle/fluid/framework/var_type.h" #include 
"paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/platform/device_memory_aligment.h" +#ifdef PADDLE_WITH_ASCEND_CL +#include "paddle/fluid/operators/npu_op_runner.h" +#endif namespace paddle { namespace operators { @@ -45,7 +48,7 @@ struct FillConstantVisitor { * = nullptr) const { #ifdef PADDLE_WITH_ASCEND_CL if (platform::is_npu_place(dev_ctx_.GetPlace())) { - FillNpuTensorWithConstant(tensor_, value_); + FillNpuTensorWithConstant(tensor_, static_cast(value_)); } else { math::SetConstant set_constant; set_constant(dev_ctx_, tensor_, static_cast(value_)); From 5b0269623e2f464570ef48f5ede37bbb6e27f6d9 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 14:03:10 +0800 Subject: [PATCH 27/32] gpu test for coalesce, test=notest --- python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py b/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py index a5b30330448d29..868a72334247d0 100644 --- a/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py +++ b/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py @@ -92,7 +92,8 @@ def init_attr(self): "copy_data": False, "set_constant": True, "constant": 0.5, - "dtype": self.fluid_dtype + "dtype": self.fluid_dtype, + "user_defined_size_of_dtype": 2 } def test_check_output(self): From 98fa3170b1c20262a45b119fa5bb10acf6878bae Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 14:04:15 +0800 Subject: [PATCH 28/32] npu test for coalesce --- .../fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py b/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py index f1bbf0becf1950..93a969bf10f030 100644 --- a/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py +++ 
b/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py @@ -90,7 +90,8 @@ def init_attr(self): "set_constant": False, "constant": 0.5, "use_align": True, - "dtype": self.fluid_dtype + "dtype": self.fluid_dtype, + "user_defined_size_of_dtype": 2 } def test_check_output(self): From bb61c9dd73e2a24f9631764cdca69f619296426a Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 14:10:01 +0800 Subject: [PATCH 29/32] remove useless todo --- paddle/fluid/operators/coalesce_tensor_op.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 872d3b61e55d23..4c5f3a2a47bd84 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -33,7 +33,6 @@ struct FillConstantVisitor { framework::LoDTensor *tensor, const float value) : dev_ctx_(dev_ctx), tensor_(tensor), value_(value) {} - // TODO(WHO?) set_constant supports NPU template void apply(typename std::enable_if::value || std::is_same::value>::type * = From 9316944cba2770b190e3da4b91db550dde7438c7 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 15:26:28 +0800 Subject: [PATCH 30/32] fix bug if user_defined_strategy is None --- python/paddle/fluid/optimizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index d3af1166269eaa..7a30b3c5745373 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5043,7 +5043,7 @@ def _accumulate_gradients(self, Create a new merged gradient for each parameter and accumulate the corresponding gradient to it. 
""" - if user_defined_strategy.fuse_grad_merge: + if user_defined_strategy and user_defined_strategy.fuse_grad_merge: fused_gradient_names = self._accumulate_gradients_with_fuse( block, fp16_allreduce, user_defined_strategy.fuse_grad_size_in_MB) From f68967ae295d877d7358150b00edc1d71e0cf772 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 17:04:14 +0800 Subject: [PATCH 31/32] tmp solution for fuse_grad_merge + optimize_cast --- .../sharding/offload_helper.py | 3 ++ .../test_fleet_sharding_meta_optimizer.py | 30 +++++++++---------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py index a96705b09e835e..8aee34960332ac 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py @@ -122,6 +122,9 @@ def remove_param(input_name): for idx, op in enumerate(block.ops): if is_optimizer_op(op): break + # TODO (Yuang Liu): tmp solution for fuse_grad_merge + optimize_cast + if not offload and op.type == 'coalesce_tensor': + continue for input_name in op.desc.input_arg_names(): if input_name not in param_to_idx: continue diff --git a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py index ac43b9e358cc2b..3b0df74d3e6b4b 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py @@ -1086,24 +1086,24 @@ def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_cast_with_gradient_fuse( # ring: mp, pp_group, pp_pair, pp_pair self.assertEqual(startup_prog_op_types, [ - 'uniform_random', 'fill_constant', 'uniform_random', - 'fill_constant', 'uniform_random', 'fill_constant', - 'uniform_random', 
'fill_constant', 'fill_constant', 'fill_constant', + 'uniform_random', 'cast', 'fill_constant', 'cast', 'uniform_random', + 'cast', 'fill_constant', 'cast', 'uniform_random', 'cast', + 'fill_constant', 'cast', 'uniform_random', 'cast', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', - 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_sync_comm_stream' ]) self.assertEqual(main_prog_op_types, [ - 'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', - 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', - 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add', - 'softmax', 'cross_entropy2', 'mean', 'elementwise_mul', - 'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor', - 'coalesce_tensor', 'fill_constant', 'scale', 'scale', - 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', + 'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul', + 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', + 'cast', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean', + 'elementwise_mul', 'coalesce_tensor', 'coalesce_tensor', + 'coalesce_tensor', 'coalesce_tensor', 'fill_constant', 'scale', + 'scale', 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', @@ -1112,8 +1112,9 @@ def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_cast_with_gradient_fuse( 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', 
'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', - 'momentum', 'momentum', 'momentum', 'momentum', 'momentum', - 'momentum', 'momentum' + 'cast', 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'cast', + 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'momentum', + 'cast' ]) # amp check_finite_and_unscale, allreduce(pp) @@ -1155,7 +1156,6 @@ def test_hybrid_with_pp_dp_amp_with_gradient_fuse(self): "mp_degree": 1, "pp_degree": 2, "dp_degree": 2, - "optimize_cast": True, } strategy.pipeline = True strategy.pipeline_configs = { From c87cb9dd230966aeffe5cb78ae9f7688eaa1af3e Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 19 Aug 2021 17:24:14 +0800 Subject: [PATCH 32/32] add more comment, rename var name --- python/paddle/fluid/optimizer.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 7a30b3c5745373..58f61b77fd1fe0 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5254,7 +5254,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): # create fused vars for grad and param for grad_param_segment in grad_param_segments: grad_segment = grad_param_segment[0] - fused_grad_segment = grad_param_segment[2] + merged_grad_segment = grad_param_segment[2] fused_grad = main_block.create_var( name='FusedGrad_{}'.format(grad_segment[0].name), dtype=grad_segment[0].dtype, @@ -5262,12 +5262,12 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): stop_gradient=False) # keep the '.cast_fp16' info in the fuse var name fused_merged_grad_name_prefix = 'FusedMergedGrad.cast_fp16.' 
if \ - fused_grad_segment[0].dtype == paddle.float16 else 'FusedMergedGrad' + merged_grad_segment[0].dtype == paddle.float16 else 'FusedMergedGrad' fused_merged_grad_name = fused_merged_grad_name_prefix + '_{}'.format( - fused_grad_segment[0].name) + merged_grad_segment[0].name) fused_merged_grad = main_block.create_var( name=fused_merged_grad_name, - dtype=fused_grad_segment[0].dtype, + dtype=merged_grad_segment[0].dtype, persistable=True, stop_gradient=False) fused_gradients.append(fused_grad) @@ -5298,6 +5298,17 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): outputs={"Output": grads, "FusedOutput": fused_grad}, attrs={ + # Explanation of user_defined_size_of_dtype: + # In coalesce op, the align size is 256 bytes + # the float takes 4 bytes while fp16 takes 2 bytes. + # To meet the requirement, 128 fp16 or 64 float will be aligned + # Suppose the total shape of the input tensors is [64], + # if the dtype is float, then the shape of the fuse var is [64] + # however if the dtype is fp16, the shape of the fuse var is [128], + # which will cause the fused vars' shapes to differ from each other. + # To make sure the shapes of the fused vars are identical, + # we set the user-defined size of dtype for both float and fp16 to 2. + # In this way, the fused vars' shapes for float and fp16 are all [128] "user_defined_size_of_dtype": 2, "copy_data": False, "use_align": True, @@ -5376,7 +5387,7 @@ def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): name=fp32_grad_name, dtype=paddle.float32, shape=real_grad.shape, - persistable=True, + persistable=False, stop_gradient=False) main_block._insert_op( index=first_opt_op_idx + offset,