Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions paddle/fluid/framework/distributed_strategy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ message ShardingConfig {
optional bool optimize_offload = 9 [ default = false ];
optional bool pp_allreduce_in_optimize = 10 [ default = false ];
optional int32 pp_degree = 11 [ default = 1 ];
optional bool optimize_cast = 12 [ default = false ];
}

message HybridConfig {
Expand Down Expand Up @@ -200,7 +199,6 @@ message DistributedStrategy {
optional int32 fuse_grad_size_in_num = 31 [ default = 8 ];
optional bool calc_comm_same_stream = 32 [ default = false ];
optional bool asp = 33 [ default = false ];
optional bool fuse_grad_merge = 34 [ default = false ];

optional RecomputeConfig recompute_configs = 101;
optional AMPConfig amp_configs = 102;
Expand Down
33 changes: 12 additions & 21 deletions paddle/fluid/operators/coalesce_tensor_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
bool use_align = context.Attr<bool>("use_align");
auto align_size = context.Attr<int>("align_size");
auto size_of_dtype = context.Attr<int>("user_defined_size_of_dtype");

if (context.Attr<bool>("check_name")) {
for (size_t i = 0; i < in_var_names.size(); ++i) {
Expand All @@ -152,9 +151,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
size_t numel = 0;
auto dtype = static_cast<framework::proto::VarType::Type>(
context.Attr<int>("dtype"));
if (size_of_dtype == -1) {
size_of_dtype = framework::SizeOfType(dtype);
}
size_t size_of_dtype = framework::SizeOfType(dtype);
GetMemSizeAndDtype(in_tensors, in_var_names, &numel, size_of_dtype,
context.GetPlace(), use_align, align_size);

Expand All @@ -167,6 +164,12 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
auto out_tensors = context.MultiOutput<framework::LoDTensor>("Output");
size_t offset = 0;
if (context.Attr<bool>("copy_data")) {
#ifdef PADDLE_WITH_ASCEND_CL
framework::VisitDataType(
dtype,
FillConstantVisitor<DeviceContext>(
dev_ctx, fused_tensor, static_cast<float>(0.0), dtype, context));
#endif
for (size_t i = 0; i < in_var_names.size(); ++i) {
size_t len = static_cast<size_t>(in_tensors[i]->numel());
auto sub_tensor = fused_tensor->Slice(
Expand All @@ -181,10 +184,10 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
: len;
}
} else if (context.Attr<bool>("set_constant")) {
framework::VisitDataType(
dtype, FillConstantVisitor<DeviceContext>(
dev_ctx, fused_tensor, context.Attr<float>("constant"),
dtype, context));
// TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION.
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx, fused_tensor,
static_cast<T>(context.Attr<float>("constant")));
} else if (context.Attr<bool>("persist_output")) {
for (size_t i = 0; i < out_var_names.size(); ++i) {
size_t len = static_cast<size_t>(out_tensors[i]->numel());
Expand Down Expand Up @@ -287,13 +290,10 @@ class CoalesceTensorOp : public framework::OperatorWithKernel {
}
auto use_align = ctx->Attrs().Get<bool>("use_align");
auto align_size = ctx->Attrs().Get<int>("align_size");
auto size_of_dtype = ctx->Attrs().Get<int>("user_defined_size_of_dtype");

auto dtype = static_cast<framework::proto::VarType::Type>(
ctx->Attrs().Get<int>("dtype"));
if (size_of_dtype == -1) {
size_of_dtype = framework::SizeOfType(dtype);
}
size_t size_of_dtype = framework::SizeOfType(dtype);

auto alignment = [](size_t size, size_t align_size) {
size_t remaining = size % align_size;
Expand Down Expand Up @@ -371,15 +371,6 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker {
.SetDefault(true);
AddAttr<int>("align_size", "The alignment size when use_align is True")
.SetDefault(-1);
AddAttr<int>("user_defined_size_of_dtype",
"The user defined size of dtype. This is used to coalesce "
"grad vars and merged_grad vars at the same time. For some "
"strategy, the dtype of fused_grad_vars and the dtype of "
"fused_grad_merged_vars are not identical, which will cause "
"the shape of these two coalesced vars are different. To "
"make sure the shape of these two vars are identical with "
"each other, this attr is added.")
.SetDefault(-1);
AddComment(R"DOC(
CoalesceTensor Operator.

Expand Down
13 changes: 3 additions & 10 deletions paddle/fluid/operators/collective/c_allreduce_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ limitations under the License. */
#include "paddle/fluid/platform/hccl_helper.h"
#endif

#if defined(PADDLE_WITH_ASCEND_CL)
DECLARE_bool(hccl_check_nan);
#endif

namespace paddle {
namespace operators {

Expand Down Expand Up @@ -144,7 +140,6 @@ inline bool ContainsNan(const paddle::platform::NPUDeviceContext& dev_ctx,
try {
const auto& runner_mean = paddle::operators::NpuOpRunner(
"ReduceMeanD", {*in}, {mean}, {{"axes", axes}, {"keep_dims", false}});
runner_mean.Run(stream);
TensorToVector(mean, dev_ctx, &vec);
} catch (...) {
LOG(WARNING) << "ContainsNan catch exception";
Expand Down Expand Up @@ -238,11 +233,9 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
break;
}
case framework::proto::VarType::FP32: {
if (FLAGS_hccl_check_nan) {
VLOG(3) << "prepare to FoundNanInf";
found_nan = ContainsNan(*dev_ctx, dev_ctx->stream(), in);
VLOG(3) << "check_numerics:" << found_nan;
}
VLOG(4) << "prepare to FoundNanInf";
found_nan = ContainsNan(*dev_ctx, dev_ctx->stream(), in);
VLOG(4) << "check_numerics:" << found_nan;
break;
}
default:
Expand Down
21 changes: 13 additions & 8 deletions paddle/fluid/operators/elementwise/elementwise_add_op_npu.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,27 @@ class ElementwiseAddNPUKernel : public framework::OpKernel<T> {
auto y_dims = y->dims();
axis = (axis == -1 ? std::abs(x_dims.size() - y_dims.size()) : axis);
if (x_dims.size() >= y_dims.size()) {
direct_compute = x_dims.size() == (y_dims.size() + axis);
direct_compute =
y_dims == framework::slice_ddim(x_dims, axis, x_dims.size());
} else {
direct_compute = y_dims.size() == (x_dims.size() + axis);
direct_compute =
x_dims == framework::slice_ddim(y_dims, axis, y_dims.size());
}

Tensor transformed_x, transformed_y;
if (direct_compute) {
const auto& runner = NpuOpRunner("Add", {*x, *y}, {*out}, {});
runner.Run(dev_ctx.stream());
transformed_x.ShareDataWith(*x);
transformed_y.ShareDataWith(*y);
} else {
Tensor transformed_x, transformed_y;
NpuElementWiseOpBroadcast<T>(dev_ctx, x, y, axis, &transformed_x,
&transformed_y);
const auto& runner =
NpuOpRunner("Add", {transformed_x, transformed_y}, {*out}, {});
runner.Run(dev_ctx.stream());
}
const auto& runner =
NpuOpRunner("Add", {transformed_x, transformed_y}, {*out}, {});
auto stream =
ctx.template device_context<paddle::platform::NPUDeviceContext>()
.stream();
runner.Run(stream);
}
};

Expand Down
3 changes: 0 additions & 3 deletions paddle/fluid/platform/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ DEFINE_string(selected_npus, "",
"This option is useful when doing multi process training and "
"each process have only one device (NPU). If you want to use "
"all visible devices, set this to empty string.");
DEFINE_bool(hccl_check_nan, false,
"Check Nan in tensor before hccl_allreduce_sum otherwise it'll "
"core when meets Nan value");
DEFINE_string(
npu_config_path, "",
"The absolute path of configuration json file, like: /tmp/config.json. "
Expand Down
25 changes: 0 additions & 25 deletions python/paddle/distributed/fleet/base/distributed_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,9 +888,6 @@ def sharding_configs(self):
pp_allreduce_in_optimize(bool, optional): [Hybrid parallelism ONLY] move the allreduce operations from backward stage to update(optimize) stage when pipeline parallelsim is on.
This configuration will affect the communication speed of Hybrid parallelism training depeneded on network topology. this strategy is experimental by now.. Default is False.

optimize_cast(bool, optional): [Hybrid parallelism ONLY] Move the cast op of AMP which cast fp32 param to fp16 param to optimizer. optimize_cast will persist fp16 param, it
will take more memory, but will be faster, trade space for time. Recommend to turn on only when using pipeline or gradient_merge_acc_step large.


Examples:

Expand Down Expand Up @@ -967,28 +964,6 @@ def _calc_comm_same_stream(self, same):
"WARNING: calc_comm_same_stream should have value of boolean type"
)

@property
def fuse_grad_merge(self):
"""
Set whether fuse the grad for gradient merge.
Note: this flag will only effect the gradient merge under pipeline mode
The default value for the fuse_grad_merge is False
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.fuse_param_grad = True
"""
return self.strategy.fuse_grad_merge

@fuse_grad_merge.setter
@is_strict_auto
def fuse_grad_merge(self, fuse_grad_merge):
if isinstance(fuse_grad_merge, bool):
self.strategy.fuse_grad_merge = fuse_grad_merge
else:
print("WARNING: fuse_grad_merge should have value of boolean type")

@property
def fuse_grad_size_in_num(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ..common import is_optimizer_op, OP_ROLE_KEY, OpRole, is_update_op
from ..common import is_optimizer_op, OP_ROLE_KEY, OpRole
from paddle.fluid import core, unique_name

__all__ = []
Expand Down Expand Up @@ -84,7 +84,7 @@ def _create_offload_var(self, var_name, offload_var_name, blocks):
dtype=var.dtype,
persistable=True)

def offload_fp32param(self, block, startup_block, offload=True):
def offload_fp32param(self, block, startup_block):
"""
(p_fp16) = cast(p)
(p_fp16_recompute) = cast(p)
Expand Down Expand Up @@ -113,23 +113,19 @@ def remove_param(input_name):

# step1: record param
for idx, op in reversed(list(enumerate(block.ops))):
if is_update_op(op):
if op.type in ('adam', 'momentum', 'lars', 'lamb'):
param = op.desc.input("Param")[0]
param_to_idx[param] = idx

# step2: remove param which can't offload and
# record param->fp16param, fp16param->recompute_var
# step2: remove param which can't offload
for idx, op in enumerate(block.ops):
if is_optimizer_op(op):
break
# TODO (Yuang Liu): tmp solution for fuse_grad_merge + optimize_cast
if not offload and op.type == 'coalesce_tensor':
continue
for input_name in op.desc.input_arg_names():
if input_name not in param_to_idx:
continue

# param which will be used by fp32 op
# param is real used by fp32 op
if op.type != 'cast':
remove_param(input_name)
continue
Expand Down Expand Up @@ -158,19 +154,17 @@ def remove_param(input_name):
# step3: main_block add offload, cast op
# change recompute to fp16, remove cast(param) to fp16
for idx, op in reversed(list(enumerate(block.ops))):
if is_update_op(op):
if op.type in ('adam', 'momentum', 'lars', 'lamb'):
param = op.desc.input("Param")[0]
if param not in param_to_idx: continue
# step3.1: create offload_var
offload_var_name = self._get_offload_var_name(param)
param_name_to_offload_name[param] = offload_var_name
if offload:
self._create_offload_var(param, offload_var_name,
[block, startup_block])
self._create_offload_var(param, offload_var_name,
[block, startup_block])

# step3.2: insert cast op and offload op
self._insert_offload_op(block, idx + 1, param,
offload_var_name)
# step3.2: insert cast op and offload op
self._insert_offload_op(block, idx + 1, param, offload_var_name)

assert param in param_to_fp16
fp16_param_name = param_to_fp16[param]
Expand All @@ -179,9 +173,8 @@ def remove_param(input_name):
self._insert_cast_op(block, idx + 1, param,
param_to_fp16[param])

if offload:
# step3.3: insert fetch op
self._insert_fetch_op(block, idx, offload_var_name, param)
# step3.3: insert fetch op
self._insert_fetch_op(block, idx, offload_var_name, param)
continue

# step3.4: remove cast op
Expand Down Expand Up @@ -213,10 +206,9 @@ def remove_param(input_name):

if out_name in param_name_to_offload_name:
var_name = out_name
if offload:
offload_var_name = param_name_to_offload_name[var_name]
self._insert_offload_op(startup_block, idx + 1,
var_name, offload_var_name)
offload_var_name = param_name_to_offload_name[var_name]
self._insert_offload_op(startup_block, idx + 1, var_name,
offload_var_name)
self._insert_cast_op(startup_block, idx + 1, var_name,
param_to_fp16[var_name])

Expand All @@ -225,19 +217,6 @@ def remove_param(input_name):
block._sync_with_cpp()
startup_block._sync_with_cpp()

def cast_fp32param_in_optimize(self, block, startup_block):
"""
(p_fp16) = cast(p)
(p_fp16_recompute) = cast(p)
(pout,) = adam(p)
===========================>
rename(p_fp16_recompute, p_fp16)

(pout,) = adam(p)
(p_fp16) = cast(p)
"""
self.offload_fp32param(block, startup_block, offload=False)

def offload(self, block, startup_block):
"""
(m1, m2) = prefetch(m1@offload, m2@offload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,7 @@ def insert_allreduce_ops(block,
if len(allreduce_vars) == 0:
return

if user_defined_strategy and \
user_defined_strategy.fuse_all_reduce_ops and \
not user_defined_strategy.fuse_grad_merge:
# If fuse_grad_merge is enable, the grad vars have already been fused during
# gradient merge pass, therefore, those vars are not need to be fused here
if user_defined_strategy and user_defined_strategy.fuse_all_reduce_ops:
insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars,
op_role, use_calc_stream,
user_defined_strategy.fuse_grad_size_in_MB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,7 @@ def _insert_allreduce_for_pp(self):
main_block._remove_op(idx)

accumulated_grad_names = self._pp_optimizer._accumulate_gradients(
main_block,
fp16_allreduce=fp16_allreduce,
user_defined_strategy=strategy)
main_block, fp16_allreduce=fp16_allreduce)

len_of_ops = len(main_block.ops)
first_optimize_op_index = get_first_optimize_op_idx(main_block)
Expand Down Expand Up @@ -405,14 +403,7 @@ def _apply_optimize_offload_pass(self):
logger.info("Sharding with optimize offload !")
offload_helper = OffloadHelper()
offload_helper.offload(main_block, startup_block)
# The optimize_cast is already included in offload_fp32param
offload_helper.offload_fp32param(main_block, startup_block)
elif sharding_configs['optimize_cast']:
logger.info("Sharding with optimize cast !")
# NOTE(wangxi): optimize_cast will persist fp16 param, it
# will take more memory, but will be faster. Trade space for time.
offload_helper = OffloadHelper()
offload_helper.cast_fp32param_in_optimize(main_block, startup_block)

def _dump_program_for_debug(self):
main_block = self._main_program.global_block()
Expand Down Expand Up @@ -456,7 +447,6 @@ def minimize_impl(self,
# loss div dp_degree
self._insert_loss_grad_scale_op()

# apply optimize offload or optimize cast
self._apply_optimize_offload_pass()

# step6: (optional) sharding gradient merge
Expand Down
1 change: 0 additions & 1 deletion python/paddle/fluid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ def __bootstrap__():
'gpu_memory_limit_mb',
'npu_config_path',
'get_host_by_name_time',
'hccl_check_nan',
]

core.init_gflags(["--tryfromenv=" + ",".join(read_env_flags)])
Expand Down
2 changes: 1 addition & 1 deletion python/paddle/fluid/clip.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _squared_l2_norm(x):
This OP returns the squared L2 norm of a tensor.
"""

if core.is_compiled_with_xpu():
if core.is_compiled_with_npu() or core.is_compiled_with_xpu():
square = layers.square(x)
sum_square = layers.reduce_sum(square)
return sum_square
Expand Down
Loading