diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 58ae35f2689799..1de6d26d05b9e4 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -42,7 +42,6 @@ message ShardingConfig { optional bool optimize_offload = 9 [ default = false ]; optional bool pp_allreduce_in_optimize = 10 [ default = false ]; optional int32 pp_degree = 11 [ default = 1 ]; - optional bool optimize_cast = 12 [ default = false ]; } message HybridConfig { @@ -200,7 +199,6 @@ message DistributedStrategy { optional int32 fuse_grad_size_in_num = 31 [ default = 8 ]; optional bool calc_comm_same_stream = 32 [ default = false ]; optional bool asp = 33 [ default = false ]; - optional bool fuse_grad_merge = 34 [ default = false ]; optional RecomputeConfig recompute_configs = 101; optional AMPConfig amp_configs = 102; diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index c9cc01b8b17dc7..029876ad9d23f1 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -127,7 +127,6 @@ class CoalesceTensorOpKernel : public framework::OpKernel { auto in_tensors = context.MultiInput("Input"); bool use_align = context.Attr("use_align"); auto align_size = context.Attr("align_size"); - auto size_of_dtype = context.Attr("user_defined_size_of_dtype"); if (context.Attr("check_name")) { for (size_t i = 0; i < in_var_names.size(); ++i) { @@ -152,9 +151,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel { size_t numel = 0; auto dtype = static_cast( context.Attr("dtype")); - if (size_of_dtype == -1) { - size_of_dtype = framework::SizeOfType(dtype); - } + size_t size_of_dtype = framework::SizeOfType(dtype); GetMemSizeAndDtype(in_tensors, in_var_names, &numel, size_of_dtype, context.GetPlace(), use_align, align_size); @@ -167,6 +164,12 @@ class CoalesceTensorOpKernel : public framework::OpKernel { auto out_tensors = context.MultiOutput("Output"); size_t offset = 0; if (context.Attr("copy_data")) { +#ifdef PADDLE_WITH_ASCEND_CL + framework::VisitDataType( + dtype, + FillConstantVisitor( + dev_ctx, fused_tensor, static_cast(0.0), dtype, context)); +#endif for (size_t i = 0; i < in_var_names.size(); ++i) { size_t len = static_cast(in_tensors[i]->numel()); auto sub_tensor = fused_tensor->Slice( @@ -181,10 +184,10 @@ class CoalesceTensorOpKernel : public framework::OpKernel { : len; } } else if (context.Attr("set_constant")) { - framework::VisitDataType( - dtype, FillConstantVisitor( - dev_ctx, fused_tensor, context.Attr("constant"), - dtype, context)); + // TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION. + math::SetConstant set_constant; + set_constant(dev_ctx, fused_tensor, + static_cast(context.Attr("constant"))); } else if (context.Attr("persist_output")) { for (size_t i = 0; i < out_var_names.size(); ++i) { size_t len = static_cast(out_tensors[i]->numel()); @@ -287,13 +290,10 @@ class CoalesceTensorOp : public framework::OperatorWithKernel { } auto use_align = ctx->Attrs().Get("use_align"); auto align_size = ctx->Attrs().Get("align_size"); - auto size_of_dtype = ctx->Attrs().Get("user_defined_size_of_dtype"); auto dtype = static_cast( ctx->Attrs().Get("dtype")); - if (size_of_dtype == -1) { - size_of_dtype = framework::SizeOfType(dtype); - } + size_t size_of_dtype = framework::SizeOfType(dtype); auto alignment = [](size_t size, size_t align_size) { size_t remaining = size % align_size; @@ -371,15 +371,6 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker { .SetDefault(true); AddAttr("align_size", "The alignment size when use_align is True") .SetDefault(-1); - AddAttr("user_defined_size_of_dtype", - "The user defined size of dtype. This is used to coalesce " - "grad vars and merged_grad vars at the same time. For some " - "strategy, the dtype of fused_grad_vars and the dtype of " - "fused_grad_merged_vars are not identical, which will cause " - "the shape of these two coalesced vars are different. To " - "make sure the shape of these two vars are identical with " - "each other, this attr is added.") - .SetDefault(-1); AddComment(R"DOC( CoalesceTensor Operator. diff --git a/paddle/fluid/operators/collective/c_allreduce_op.h b/paddle/fluid/operators/collective/c_allreduce_op.h index 4eff406893757e..1076e84e613f4a 100644 --- a/paddle/fluid/operators/collective/c_allreduce_op.h +++ b/paddle/fluid/operators/collective/c_allreduce_op.h @@ -45,10 +45,6 @@ limitations under the License. */ #include "paddle/fluid/platform/hccl_helper.h" #endif -#if defined(PADDLE_WITH_ASCEND_CL) -DECLARE_bool(hccl_check_nan); -#endif - namespace paddle { namespace operators { @@ -144,7 +140,6 @@ inline bool ContainsNan(const paddle::platform::NPUDeviceContext& dev_ctx, try { const auto& runner_mean = paddle::operators::NpuOpRunner( "ReduceMeanD", {*in}, {mean}, {{"axes", axes}, {"keep_dims", false}}); - runner_mean.Run(stream); TensorToVector(mean, dev_ctx, &vec); } catch (...) { LOG(WARNING) << "ContainsNan catch exception"; @@ -238,11 +233,9 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel { break; } case framework::proto::VarType::FP32: { - if (FLAGS_hccl_check_nan) { - VLOG(3) << "prepare to FoundNanInf"; - found_nan = ContainsNan(*dev_ctx, dev_ctx->stream(), in); - VLOG(3) << "check_numerics:" << found_nan; - } + VLOG(4) << "prepare to FoundNanInf"; + found_nan = ContainsNan(*dev_ctx, dev_ctx->stream(), in); + VLOG(4) << "check_numerics:" << found_nan; break; } default: diff --git a/paddle/fluid/operators/elementwise/elementwise_add_op_npu.cc b/paddle/fluid/operators/elementwise/elementwise_add_op_npu.cc index cd1d50a017c363..1ba6c4cb1932b1 100644 --- a/paddle/fluid/operators/elementwise/elementwise_add_op_npu.cc +++ b/paddle/fluid/operators/elementwise/elementwise_add_op_npu.cc @@ -42,22 +42,27 @@ class ElementwiseAddNPUKernel : public framework::OpKernel { auto y_dims = y->dims(); axis = (axis == -1 ? std::abs(x_dims.size() - y_dims.size()) : axis); if (x_dims.size() >= y_dims.size()) { - direct_compute = x_dims.size() == (y_dims.size() + axis); + direct_compute = + y_dims == framework::slice_ddim(x_dims, axis, x_dims.size()); } else { - direct_compute = y_dims.size() == (x_dims.size() + axis); + direct_compute = + x_dims == framework::slice_ddim(y_dims, axis, y_dims.size()); } + Tensor transformed_x, transformed_y; if (direct_compute) { - const auto& runner = NpuOpRunner("Add", {*x, *y}, {*out}, {}); - runner.Run(dev_ctx.stream()); + transformed_x.ShareDataWith(*x); + transformed_y.ShareDataWith(*y); } else { - Tensor transformed_x, transformed_y; NpuElementWiseOpBroadcast(dev_ctx, x, y, axis, &transformed_x, &transformed_y); - const auto& runner = - NpuOpRunner("Add", {transformed_x, transformed_y}, {*out}, {}); - runner.Run(dev_ctx.stream()); } + const auto& runner = + NpuOpRunner("Add", {transformed_x, transformed_y}, {*out}, {}); + auto stream = + ctx.template device_context() + .stream(); + runner.Run(stream); } }; diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc index f18eab3246547b..33d9c6efef852d 100644 --- a/paddle/fluid/platform/flags.cc +++ b/paddle/fluid/platform/flags.cc @@ -93,9 +93,6 @@ DEFINE_string(selected_npus, "", "This option is useful when doing multi process training and " "each process have only one device (NPU). If you want to use " "all visible devices, set this to empty string."); -DEFINE_bool(hccl_check_nan, false, - "Check Nan in tensor before hccl_allreduce_sum otherwise it'll " - "core when meets Nan value"); DEFINE_string( npu_config_path, "", "The absolute path of configuration json file, like: /tmp/config.json. " diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index d19cfd21698021..051f6b11c2609a 100644 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -888,9 +888,6 @@ def sharding_configs(self): pp_allreduce_in_optimize(bool, optional): [Hybrid parallelism ONLY] move the allreduce operations from backward stage to update(optimize) stage when pipeline parallelsim is on. This configuration will affect the communication speed of Hybrid parallelism training depeneded on network topology. this strategy is experimental by now.. Default is False. - optimize_cast(bool, optional): [Hybrid parallelism ONLY] Move the cast op of AMP which cast fp32 param to fp16 param to optimizer. optimize_cast will persist fp16 param, it - will take more memory, but will be faster, trade space for time. Recommend to turn on only when using pipeline or gradient_merge_acc_step large. - Examples: @@ -967,28 +964,6 @@ def _calc_comm_same_stream(self, same): "WARNING: calc_comm_same_stream should have value of boolean type" ) - @property - def fuse_grad_merge(self): - """ - Set whether fuse the grad for gradient merge. - Note: this flag will only effect the gradient merge under pipeline mode - The default value for the fuse_grad_merge is False - Examples: - .. code-block:: python - import paddle.distributed.fleet as fleet - strategy = fleet.DistributedStrategy() - strategy.fuse_param_grad = True - """ - return self.strategy.fuse_grad_merge - - @fuse_grad_merge.setter - @is_strict_auto - def fuse_grad_merge(self, fuse_grad_merge): - if isinstance(fuse_grad_merge, bool): - self.strategy.fuse_grad_merge = fuse_grad_merge - else: - print("WARNING: fuse_grad_merge should have value of boolean type") - @property def fuse_grad_size_in_num(self): """ diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py index 8aee34960332ac..f6741b165ce072 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ..common import is_optimizer_op, OP_ROLE_KEY, OpRole, is_update_op +from ..common import is_optimizer_op, OP_ROLE_KEY, OpRole from paddle.fluid import core, unique_name __all__ = [] @@ -84,7 +84,7 @@ def _create_offload_var(self, var_name, offload_var_name, blocks): dtype=var.dtype, persistable=True) - def offload_fp32param(self, block, startup_block, offload=True): + def offload_fp32param(self, block, startup_block): """ (p_fp16) = cast(p) (p_fp16_recompute) = cast(p) @@ -113,23 +113,19 @@ def remove_param(input_name): # step1: record param for idx, op in reversed(list(enumerate(block.ops))): - if is_update_op(op): + if op.type in ('adam', 'momentum', 'lars', 'lamb'): param = op.desc.input("Param")[0] param_to_idx[param] = idx - # step2: remove param which can't offload and - # record param->fp16param, fp16param->recompute_var + # step2: remove param which can't offload for idx, op in enumerate(block.ops): if is_optimizer_op(op): break - # TODO (Yuang Liu): tmp solution for fuse_grad_merge + optimize_cast - if not offload and op.type == 'coalesce_tensor': - continue for input_name in op.desc.input_arg_names(): if input_name not in param_to_idx: continue - # param which will be used by fp32 op + # param is real used by fp32 op if op.type != 'cast': remove_param(input_name) continue @@ -158,19 +154,17 @@ def remove_param(input_name): # step3: main_block add offload, cast op # change recompute to fp16, remove cast(param) to fp16 for idx, op in reversed(list(enumerate(block.ops))): - if is_update_op(op): + if op.type in ('adam', 'momentum', 'lars', 'lamb'): param = op.desc.input("Param")[0] if param not in param_to_idx: continue # step3.1: create offload_var offload_var_name = self._get_offload_var_name(param) param_name_to_offload_name[param] = offload_var_name - if offload: - self._create_offload_var(param, offload_var_name, - [block, startup_block]) + self._create_offload_var(param, offload_var_name, + [block, startup_block]) - # step3.2: insert cast op and offload op - self._insert_offload_op(block, idx + 1, param, - offload_var_name) + # step3.2: insert cast op and offload op + self._insert_offload_op(block, idx + 1, param, offload_var_name) assert param in param_to_fp16 fp16_param_name = param_to_fp16[param] @@ -179,9 +173,8 @@ def remove_param(input_name): self._insert_cast_op(block, idx + 1, param, param_to_fp16[param]) - if offload: - # step3.3: insert fetch op - self._insert_fetch_op(block, idx, offload_var_name, param) + # step3.3: insert fetch op + self._insert_fetch_op(block, idx, offload_var_name, param) continue # step3.4: remove cast op @@ -213,10 +206,9 @@ def remove_param(input_name): if out_name in param_name_to_offload_name: var_name = out_name - if offload: - offload_var_name = param_name_to_offload_name[var_name] - self._insert_offload_op(startup_block, idx + 1, - var_name, offload_var_name) + offload_var_name = param_name_to_offload_name[var_name] + self._insert_offload_op(startup_block, idx + 1, var_name, + offload_var_name) self._insert_cast_op(startup_block, idx + 1, var_name, param_to_fp16[var_name]) @@ -225,19 +217,6 @@ def remove_param(input_name): block._sync_with_cpp() startup_block._sync_with_cpp() - def cast_fp32param_in_optimize(self, block, startup_block): - """ - (p_fp16) = cast(p) - (p_fp16_recompute) = cast(p) - (pout,) = adam(p) - ===========================> - rename(p_fp16_recompute, p_fp16) - - (pout,) = adam(p) - (p_fp16) = cast(p) - """ - self.offload_fp32param(block, startup_block, offload=False) - def offload(self, block, startup_block): """ (m1, m2) = prefetch(m1@offload, m2@offload) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index 16fbc7bea6c8b6..52ef843aa0d751 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -341,11 +341,7 @@ def insert_allreduce_ops(block, if len(allreduce_vars) == 0: return - if user_defined_strategy and \ - user_defined_strategy.fuse_all_reduce_ops and \ - not user_defined_strategy.fuse_grad_merge: - # If fuse_grad_merge is enable, the grad vars have already been fused during - # gradient merge pass, therefore, those vars are not need to be fused here + if user_defined_strategy and user_defined_strategy.fuse_all_reduce_ops: insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars, op_role, use_calc_stream, user_defined_strategy.fuse_grad_size_in_MB) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index a76a70cdcab3df..5d2095200942c7 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -319,9 +319,7 @@ def _insert_allreduce_for_pp(self): main_block._remove_op(idx) accumulated_grad_names = self._pp_optimizer._accumulate_gradients( - main_block, - fp16_allreduce=fp16_allreduce, - user_defined_strategy=strategy) + main_block, fp16_allreduce=fp16_allreduce) len_of_ops = len(main_block.ops) first_optimize_op_index = get_first_optimize_op_idx(main_block) @@ -405,14 +403,7 @@ def _apply_optimize_offload_pass(self): logger.info("Sharding with optimize offload !") offload_helper = OffloadHelper() offload_helper.offload(main_block, startup_block) - # The optimize_cast is already included in offload_fp32param offload_helper.offload_fp32param(main_block, startup_block) - elif sharding_configs['optimize_cast']: - logger.info("Sharding with optimize cast !") - # NOTE(wangxi): optimize_cast will persist fp16 param, it - # will take more memory, but will be faster. Trade space for time. - offload_helper = OffloadHelper() - offload_helper.cast_fp32param_in_optimize(main_block, startup_block) def _dump_program_for_debug(self): main_block = self._main_program.global_block() @@ -456,7 +447,6 @@ def minimize_impl(self, # loss div dp_degree self._insert_loss_grad_scale_op() - # apply optimize offload or optimize cast self._apply_optimize_offload_pass() # step6: (optional) sharding gradient merge diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 8bb4d82b724785..5d1274a1f05324 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -248,7 +248,6 @@ def __bootstrap__(): 'gpu_memory_limit_mb', 'npu_config_path', 'get_host_by_name_time', - 'hccl_check_nan', ] core.init_gflags(["--tryfromenv=" + ",".join(read_env_flags)]) diff --git a/python/paddle/fluid/clip.py b/python/paddle/fluid/clip.py index d48cea48a76fd4..04fb45cd3ae22d 100644 --- a/python/paddle/fluid/clip.py +++ b/python/paddle/fluid/clip.py @@ -40,7 +40,7 @@ def _squared_l2_norm(x): This OP returns the squared L2 norm of a tensor. """ - if core.is_compiled_with_xpu(): + if core.is_compiled_with_npu() or core.is_compiled_with_xpu(): square = layers.square(x) sum_square = layers.reduce_sum(square) return sum_square diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index eb3d559ddcde9e..378902d8dde813 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5044,18 +5044,11 @@ def _rename_gradient_var_name(self, block): def _accumulate_gradients(self, block, pp_allreduce_in_optimize=False, - fp16_allreduce=False, - user_defined_strategy=None): + fp16_allreduce=False): """ Create a new merged gradient for each parameter and accumulate the corresponding gradient to it. """ - if user_defined_strategy and user_defined_strategy.fuse_grad_merge: - fused_gradient_names = self._accumulate_gradients_with_fuse( - block, fp16_allreduce, - user_defined_strategy.fuse_grad_size_in_MB) - return fused_gradient_names - merged_gradient_names = [] first_opt_op_idx = None @@ -5185,273 +5178,6 @@ def _accumulate_gradients(self, return merged_gradient_names - def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): - first_opt_op_idx = None - grad_param_pairs = [] - # obtain all param/grad pairs that needed to be fused - for index, op in reversed(tuple(enumerate(list(main_block.ops)))): - # remove the cast op of fp16 grad to fp32 grad - if self._is_optimize_op(op) and op.type == 'cast': - in_name = op.input_arg_names[0] - out_name = op.output_arg_names[0] - if out_name.strip('@GRAD') in self._param_device_map: - assert in_name.replace('.cast_fp16', '') == out_name - main_block._remove_op(index) - continue - - if self._is_backward_op(op) and first_opt_op_idx is None: - first_opt_op_idx = index + 1 - # no optimize phase - if first_opt_op_idx == len(main_block.ops): - return - - if self._is_backward_op(op) and ( - self._op_role_var_key in op.attr_names): - op_role_var = op.attr(self._op_role_var_key) - if len(op_role_var) == 0: - continue - assert len(op_role_var) % 2 == 0 - for i in range(0, len(op_role_var), 2): - param_name = op_role_var[i] - if not main_block.has_var(param_name): - continue - if '@BroadCast' in param_name: - continue - grad_param_pairs.append( - (op_role_var[i + 1], op_role_var[i])) - - if len(grad_param_pairs) == 0: - return - - grad_param_pairs = self._sort_grad_param_by_dtype(main_block, - grad_param_pairs) - - grad_param_segments = [] - merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' - dtype = paddle.float16 if fp16 else paddle.float32 - cur_size = 0. - last_dtype = None - # split the grad based on dtype and fused size - for grad, param in grad_param_pairs: - real_grad = main_block.var(grad) - # create the gradient merged var for each grad - merged_grad_var = main_block.create_var( - name=param + core.grad_var_suffix() + merged_suffix, - dtype=dtype, - shape=real_grad.shape, - persistable=True, - stop_gradient=False) - real_param = main_block.var(param) - tmp_size = self._get_var_size(real_grad) - # two strategies for splitting the grad - # 1. the current segment's size reach the user defined grad_size_in_MB - # 2. the upcoming grad holds different dtype compared with grads in current segment - if len(grad_param_segments) == 0 \ - or cur_size + tmp_size > fused_size \ - or real_grad.dtype != last_dtype: - grad_param_segments.append( - ([real_grad], [real_param], [merged_grad_var])) - last_dtype = real_grad.dtype - cur_size = 0. - else: - grad_param_segments[-1][0].append(real_grad) - grad_param_segments[-1][1].append(real_param) - grad_param_segments[-1][2].append(merged_grad_var) - cur_size += tmp_size - - fused_gradients = [] - fused_merged_gradients = [] - # create fused vars for grad and param - for grad_param_segment in grad_param_segments: - grad_segment = grad_param_segment[0] - merged_grad_segment = grad_param_segment[2] - fused_grad = main_block.create_var( - name='FusedGrad_{}'.format(grad_segment[0].name), - dtype=grad_segment[0].dtype, - persistable=False, - stop_gradient=False) - # keep the '.cast_fp16' info in the fuse var name - fused_merged_grad_name_prefix = 'FusedMergedGrad.cast_fp16.' if \ - merged_grad_segment[0].dtype == paddle.float16 else 'FusedMergedGrad' - fused_merged_grad_name = fused_merged_grad_name_prefix + '_{}'.format( - merged_grad_segment[0].name) - fused_merged_grad = main_block.create_var( - name=fused_merged_grad_name, - dtype=merged_grad_segment[0].dtype, - persistable=True, - stop_gradient=False) - fused_gradients.append(fused_grad) - fused_merged_gradients.append(fused_merged_grad) - - assert len(fused_gradients) == len(grad_param_segments) - assert len(fused_merged_gradients) == len(grad_param_segments) - - # insert coalesce op at the start of the backward pass - # use param as the coalesce input to make sure the two Fused vars are in same shape - first_back_op_idx = None - for index, op in enumerate(main_block.ops): - if self._is_backward_op(op) and first_back_op_idx is None: - first_back_op_idx = index - break - assert first_back_op_idx is not None - offset = 0 - for i in range(len(grad_param_segments)): - fused_grad = fused_gradients[i] - fused_merged_grad = fused_merged_gradients[i] - grads = grad_param_segments[i][0] - params = grad_param_segments[i][1] - merged_grads = grad_param_segments[i][2] - main_block._insert_op_without_sync( - first_back_op_idx + offset, - type="coalesce_tensor", - inputs={"Input": params}, - outputs={"Output": grads, - "FusedOutput": fused_grad}, - attrs={ - # Explanation of user_defined_size_of_dtype: - # In coalesce op, the align size is 256 bytes - # the float takes 4 bytes while fp16 takes 2 bytes. - # To meet the requirement, 128 fp16 or 64 float will be aligned - # Think the total shape of the input tensors if [64], - # if the dtype is float, then the shape of the fuse var is [64] - # however if the dytpe if fp16, the shape of the fuse var is [128], - # which will cause the fused vars' shape vary between each other. - # To make sure the shape of the fused vars are identical, - # we set the dtype of float and fp16 both to 2. - # Under this way, the fused vars' shape for float and fp16 are all [128] - "user_defined_size_of_dtype": 2, - "copy_data": False, - "use_align": True, - "dtype": grads[0].dtype, - self._op_role_key: self._op_role.Backward - }) - offset += 1 - # For the gradient_merged_fused_var, given a init value during the coalesce op - # this will remove a problematic fill_constant op. This op role of this coalesce - # is set to be LRSched to make this coalesce (with init) only run once - main_block._insert_op_without_sync( - first_back_op_idx + offset, - type="coalesce_tensor", - inputs={"Input": params}, - outputs={ - "Output": merged_grads, - "FusedOutput": fused_merged_grad - }, - attrs={ - "user_defined_size_of_dtype": 2, - "set_constant": True, - "constant": float(0.0), - "copy_data": False, - "use_align": True, - "dtype": merged_grads[0].dtype, - self._op_role_key: self._op_role.Optimize.LRSched - }) - offset += 1 - - # insert gradient merge relating ops - first_opt_op_idx += offset - offset = 0 - for i in range(len(fused_gradients)): - fused_grad = fused_gradients[i] - fused_merged_grad = fused_merged_gradients[i] - is_fp16_grad = 'cast_fp16' in fused_grad.name - need_cast = (is_fp16_grad is not fp16) - if need_cast: - # for fp16 allreduce, cast fp32 grad to fp16 - # for fp32 allreduce, cast fp16 grad to fp32 - cast_grad_var_name = fused_grad.name + '@TMP' - cast_grad_var = main_block.create_var( - name=cast_grad_var_name, - dtype=dtype, - persistable=False, - stop_gradient=False) - main_block._insert_op( - index=first_opt_op_idx + offset, - type='cast', - inputs={'X': fused_grad}, - outputs={'Out': cast_grad_var}, - attrs={ - 'in_dtype': fused_grad.dtype, - 'out_dtype': cast_grad_var.dtype, - self._op_role_key: self._op_role.Backward, - }) - offset += 1 - fused_grad = cast_grad_var - main_block._insert_op( - index=first_opt_op_idx + offset, - type='sum', - inputs={'X': [fused_merged_grad, fused_grad]}, - outputs={'Out': fused_merged_grad}, - attrs={self._op_role_key: self._op_role.Backward}) - offset += 1 - - if fp16: - # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 - for grad, param in grad_param_pairs: - real_grad = main_block.var(grad) - fp16_grad_name = param + core.grad_var_suffix() + '@MERGED@FP16' - assert main_block.has_var(fp16_grad_name) - fp16_grad = main_block.var(fp16_grad_name) - fp32_grad_name = param + core.grad_var_suffix() + '@MERGED' - fp32_grad = main_block.create_var( - name=fp32_grad_name, - dtype=paddle.float32, - shape=real_grad.shape, - persistable=False, - stop_gradient=False) - main_block._insert_op( - index=first_opt_op_idx + offset, - type='cast', - inputs={'X': fp16_grad}, - outputs={'Out': fp32_grad}, - attrs={ - 'in_dtype': paddle.float16, - 'out_dtype': paddle.float32, - self._op_role_key: self._op_role.Optimize, - }) - offset += 1 - - # replace the var with it's name, which will be used for inserting allreduce - for i in range(len(fused_merged_gradients)): - fused_merged_gradients[i] = fused_merged_gradients[i].name - - main_block._sync_with_cpp() - - return fused_merged_gradients - - def _sort_grad_param_by_dtype(self, main_block, grad_param_pairs): - # sort the grad param paris by the dtype - fp16_pairs = [] - fp32_pairs = [] - other_pairs = [] - for pairs in grad_param_pairs: - dtype = main_block.var(pairs[0]).dtype - if dtype == paddle.float32: - fp32_pairs.append(pairs) - elif dtype == paddle.float16: - fp16_pairs.append(pairs) - else: - other_pairs.append(pairs) - sorted_pairs = fp16_pairs - sorted_pairs.extend(fp32_pairs) - sorted_pairs.extend(other_pairs) - return sorted_pairs - - def _get_var_size(self, var): - dtype_to_size = { - core.VarDesc.VarType.FP16: 2, - core.VarDesc.VarType.FP32: 4, - core.VarDesc.VarType.FP64: 8, - core.VarDesc.VarType.INT16: 2, - core.VarDesc.VarType.INT32: 4, - core.VarDesc.VarType.INT64: 8, - core.VarDesc.VarType.BOOL: 1, - core.VarDesc.VarType.UINT8: 1, - } - assert -1 not in var.shape - return reduce(lambda x, y: x * y, - var.shape) * dtype_to_size[var.dtype] / 1024.0 / 1024.0 - def _add_sub_blocks(self, main_block, program_list): main_program = main_block.program for prog in program_list: diff --git a/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py b/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py index 93a969bf10f030..f1bbf0becf1950 100644 --- a/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py +++ b/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py @@ -90,8 +90,7 @@ def init_attr(self): "set_constant": False, "constant": 0.5, "use_align": True, - "dtype": self.fluid_dtype, - "user_defined_size_of_dtype": 2 + "dtype": self.fluid_dtype } def test_check_output(self): diff --git a/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py b/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py index 868a72334247d0..a5b30330448d29 100644 --- a/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py +++ b/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py @@ -92,8 +92,7 @@ def init_attr(self): "copy_data": False, "set_constant": True, "constant": 0.5, - "dtype": self.fluid_dtype, - "user_defined_size_of_dtype": 2 + "dtype": self.fluid_dtype } def test_check_output(self): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py index 3b0df74d3e6b4b..d70a58c7d8ab41 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py @@ -859,380 +859,6 @@ def test_hybrid_with_sharding_pp_amp_fp16allreduce_in_optimize(self): self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36002']) - def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_cast(self): - train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( - ) - avg_cost, strategy = self.pp_net(train_prog, startup_prog) - strategy.amp = True - strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], } - strategy.sharding = True - strategy.sharding_configs = { - "sharding_degree": 1, - "mp_degree": 1, - "pp_degree": 2, - "dp_degree": 2, - "optimize_cast": True, - } - strategy.pipeline = True - strategy.pipeline_configs = { - "schedule_mode": "1F1B", - "micro_batch_size": 2, - "accumulate_steps": 4, - } - strategy.fp16_allreduce = True - self.optimizer(avg_cost, strategy, train_prog, startup_prog) - train_prog = train_prog._pipeline_opt['section_program'] - startup_prog = startup_prog._pipeline_opt['startup_program'] - - startup_prog_ops = startup_prog.global_block().ops - main_prog_ops = train_prog.global_block().ops - - # check program - startup_prog_op_types = [op.type for op in startup_prog_ops] - main_prog_op_types = [op.type for op in main_prog_ops] - - # ring: mp, pp_group, pp_pair, pp_pair - self.assertEqual(startup_prog_op_types, [ - 'uniform_random', 'cast', 'fill_constant', 'cast', 'uniform_random', - 'cast', 'fill_constant', 'cast', 'uniform_random', 'cast', - 'fill_constant', 'cast', 'uniform_random', 'cast', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', - 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', - 'c_sync_comm_stream' - ]) - - self.assertEqual(main_prog_op_types, [ - 'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul', - 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', - 'cast', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean', - 'elementwise_mul', 'fill_constant', 'scale', 'scale', - 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', - 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', - 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', - 'elementwise_add_grad', 'mul_grad', 'tanh_grad', - 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', - 'fill_constant', 'cast', 'sum', 'fill_constant', 'sum', - 'fill_constant', 'sum', 'fill_constant', 'sum', 'fill_constant', - 'sum', 'fill_constant', 'sum', 'fill_constant', 'sum', - 'fill_constant', 'sum', 'coalesce_tensor', 'c_allreduce_sum', - 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', - 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', - 'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', - 'cast', 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'cast', - 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'momentum', - 'cast' - ]) - - # amp check_finite_and_unscale, allreduce(pp) - self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1) - - # should has ring id for pp - created_ring_ids = [ - op.desc.attr("ring_id") for op in startup_prog_ops - if op.type == "c_comm_init" - ] - self.assertIn(self.pp_pair_ring_id, created_ring_ids) - self.assertIn(self.dp_ring_id, created_ring_ids) - - # check correctness of pp group - for op in startup_prog_ops: - if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ - 0] == "comm_id_0": - pp_group_waiting_ports = op.desc.attr("other_endpoints") - - self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003']) - - # check correctness of dp group - for op in startup_prog_ops: - if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ - 0] == "comm_id_3": - dp_group_waiting_ports = op.desc.attr("other_endpoints") - - self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) - - def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_offload(self): - train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( - ) - avg_cost, strategy = self.pp_net(train_prog, startup_prog) - strategy.amp = True - strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], } - strategy.sharding = True - strategy.sharding_configs = { - "sharding_degree": 1, - "mp_degree": 1, - "pp_degree": 2, - "dp_degree": 2, - "optimize_offload": True, - } - strategy.pipeline = True - strategy.pipeline_configs = { - "schedule_mode": "1F1B", - "micro_batch_size": 2, - "accumulate_steps": 4, - } - strategy.fp16_allreduce = True - self.optimizer(avg_cost, strategy, train_prog, startup_prog) - train_prog = train_prog._pipeline_opt['section_program'] - startup_prog = startup_prog._pipeline_opt['startup_program'] - - startup_prog_ops = startup_prog.global_block().ops - main_prog_ops = train_prog.global_block().ops - - # check program - startup_prog_op_types = [op.type for op in startup_prog_ops] - main_prog_op_types = [op.type for op in main_prog_ops] - - # ring: mp, pp_group, pp_pair, pp_pair - self.assertEqual(startup_prog_op_types, [ - 'uniform_random', 'cast', 'memcpy', 'fill_constant', 'cast', - 'memcpy', 'uniform_random', 'cast', 'memcpy', 'fill_constant', - 'cast', 'memcpy', 'uniform_random', 'cast', 'memcpy', - 'fill_constant', 'cast', 'memcpy', 'uniform_random', 'cast', - 'memcpy', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', - 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', - 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' - ]) - - self.assertEqual(main_prog_op_types, [ - 'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul', - 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', - 'cast', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean', - 'elementwise_mul', 'fill_constant', 'scale', 'scale', - 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', - 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', - 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', - 'elementwise_add_grad', 'mul_grad', 'tanh_grad', - 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', - 'fill_constant', 'cast', 'sum', 'fill_constant', 'sum', - 'fill_constant', 'sum', 'fill_constant', 'sum', 'fill_constant', - 'sum', 'fill_constant', 'sum', 'fill_constant', 'sum', - 'fill_constant', 'sum', 'coalesce_tensor', 'c_allreduce_sum', - 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', - 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', - 'c_allreduce_max', 'cast', 'update_loss_scaling', 'memcpy', - 'momentum', 'cast', 'memcpy', 'memcpy', 'momentum', 'cast', - 'memcpy', 'memcpy', 'momentum', 'cast', 'memcpy', 'memcpy', - 'momentum', 'cast', 'memcpy', 'memcpy', 'momentum', 'cast', - 'memcpy', 'memcpy', 'momentum', 'cast', 'memcpy', 'momentum', - 'memcpy', 'momentum', 'cast', 'memcpy' - ]) - - # amp check_finite_and_unscale, allreduce(pp) - self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1) - - # should has ring id for pp - created_ring_ids = [ - op.desc.attr("ring_id") for op in startup_prog_ops - if op.type == "c_comm_init" - ] - self.assertIn(self.pp_pair_ring_id, created_ring_ids) - self.assertIn(self.dp_ring_id, created_ring_ids) - - # check correctness of pp group - for op in startup_prog_ops: - if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ - 0] == "comm_id_0": - pp_group_waiting_ports = op.desc.attr("other_endpoints") - - self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003']) - - # check correctness of dp group - for op in startup_prog_ops: - if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ - 0] == "comm_id_3": - dp_group_waiting_ports = op.desc.attr("other_endpoints") - - self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) - - def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_cast_with_gradient_fuse( - self): - train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( - ) - avg_cost, strategy = self.pp_net(train_prog, startup_prog) - strategy.amp = True - strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], } - strategy.sharding = True - strategy.sharding_configs = { - "sharding_degree": 1, - "mp_degree": 1, - "pp_degree": 2, - "dp_degree": 2, - "optimize_cast": True, - } - strategy.pipeline = True - strategy.pipeline_configs = { - "schedule_mode": "1F1B", - "micro_batch_size": 2, - "accumulate_steps": 4, - } - strategy.fp16_allreduce = True - strategy.fuse_grad_merge = True - self.optimizer(avg_cost, strategy, train_prog, startup_prog) - train_prog = train_prog._pipeline_opt['section_program'] - startup_prog = startup_prog._pipeline_opt['startup_program'] - - startup_prog_ops = startup_prog.global_block().ops - main_prog_ops = train_prog.global_block().ops - - # check program - startup_prog_op_types = [op.type for op in startup_prog_ops] - main_prog_op_types = [op.type for op in main_prog_ops] - - # ring: mp, pp_group, pp_pair, pp_pair - self.assertEqual(startup_prog_op_types, [ - 'uniform_random', 'cast', 'fill_constant', 'cast', 'uniform_random', - 'cast', 'fill_constant', 'cast', 'uniform_random', 'cast', - 'fill_constant', 'cast', 'uniform_random', 'cast', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', - 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', - 'c_sync_comm_stream' - ]) - - self.assertEqual(main_prog_op_types, [ - 'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul', - 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', - 'cast', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean', - 'elementwise_mul', 'coalesce_tensor', 'coalesce_tensor', - 'coalesce_tensor', 'coalesce_tensor', 'fill_constant', 'scale', - 'scale', 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', - 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', - 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', - 'elementwise_add_grad', 'mul_grad', 'tanh_grad', - 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', - 'sum', 'cast', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', 'cast', - 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', - 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', - 'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', - 'cast', 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'cast', - 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'momentum', - 'cast' - ]) - - # amp check_finite_and_unscale, allreduce(pp) - self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1) - - # should has ring id for pp - created_ring_ids = [ - op.desc.attr("ring_id") for op in startup_prog_ops - if op.type == "c_comm_init" - ] - self.assertIn(self.pp_pair_ring_id, created_ring_ids) - self.assertIn(self.dp_ring_id, created_ring_ids) - - # check correctness of pp group - for op in startup_prog_ops: - if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ - 0] == "comm_id_0": - pp_group_waiting_ports = op.desc.attr("other_endpoints") - - self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003']) - - # check correctness of dp group - for op in startup_prog_ops: - if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ - 0] == "comm_id_3": - dp_group_waiting_ports = op.desc.attr("other_endpoints") - - self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) - - def test_hybrid_with_pp_dp_amp_with_gradient_fuse(self): - train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( - ) - avg_cost, strategy = self.pp_net(train_prog, startup_prog) - strategy.amp = True - strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], } - strategy.sharding = True - strategy.sharding_configs = { - "sharding_degree": 1, - "mp_degree": 1, - "pp_degree": 2, - "dp_degree": 2, - } - strategy.pipeline = True - strategy.pipeline_configs = { - "schedule_mode": "1F1B", - "micro_batch_size": 2, - "accumulate_steps": 4, - } - strategy.fuse_grad_merge = True - self.optimizer(avg_cost, strategy, train_prog, startup_prog) - train_prog = train_prog._pipeline_opt['section_program'] - startup_prog = startup_prog._pipeline_opt['startup_program'] - - startup_prog_ops = startup_prog.global_block().ops - main_prog_ops = train_prog.global_block().ops - - # check program - startup_prog_op_types = [op.type for op in startup_prog_ops] - main_prog_op_types = [op.type for op in main_prog_ops] - - # ring: mp, pp_group, pp_pair, pp_pair - self.assertEqual(startup_prog_op_types, [ - 'uniform_random', 'fill_constant', 'uniform_random', - 'fill_constant', 'uniform_random', 'fill_constant', - 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', - 'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', - 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', - 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' - ]) - - self.assertEqual(main_prog_op_types, [ - 'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', - 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', - 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add', - 'softmax', 'cross_entropy2', 'mean', 'elementwise_mul', - 'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor', - 'coalesce_tensor', 'fill_constant', 'scale', 'scale', - 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', - 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', - 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', - 'elementwise_add_grad', 'mul_grad', 'tanh_grad', - 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', - 'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', - 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', - 'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', - 'momentum', 'momentum', 'momentum', 'momentum', 'momentum', - 'momentum', 'momentum' - ]) - - # amp check_finite_and_unscale, allreduce(pp) - self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1) - - # should has ring id for pp - created_ring_ids = [ - op.desc.attr("ring_id") for op in startup_prog_ops - if op.type == "c_comm_init" - ] - self.assertIn(self.pp_pair_ring_id, created_ring_ids) - self.assertIn(self.dp_ring_id, created_ring_ids) - - # check correctness of pp group - for op in startup_prog_ops: - if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ - 0] == "comm_id_0": - pp_group_waiting_ports = op.desc.attr("other_endpoints") - - self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003']) - - # check correctness of dp group - for op in startup_prog_ops: - if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ - 0] == "comm_id_3": - dp_group_waiting_ports = op.desc.attr("other_endpoints") - - self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) - if __name__ == "__main__": unittest.main()