2 changes: 1 addition & 1 deletion paddle/fluid/framework/distributed_strategy.proto
@@ -172,7 +172,7 @@ message DistributedStrategy {
optional bool fp16_allreduce = 25 [ default = false ];
optional bool sharding = 26 [ default = false ];
optional float last_comm_group_size_MB = 27 [ default = 1 ];
-  optional bool find_unused_parameters = 28 [ default = true ];
+  optional bool find_unused_parameters = 28 [ default = false ];
optional bool tensor_parallel = 29 [ default = false ];
optional bool without_graph_optimization = 30 [ default = false ];

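With this default flipped to false, unused-parameter detection becomes opt-in. A minimal sketch of enabling it through fleet's DistributedStrategy (the same flag the updated tests below set); the launcher setup is assumed context, not part of this PR:

```python
import paddle.distributed.fleet as fleet

# find_unused_parameters now defaults to False, so models whose forward
# pass can skip some parameters (e.g. data-dependent control flow) must
# enable the per-step detection explicitly.
strategy = fleet.DistributedStrategy()
strategy.find_unused_parameters = True

# Assumed context: started via `python -m paddle.distributed.launch`.
fleet.init(is_collective=True, strategy=strategy)
```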
110 changes: 64 additions & 46 deletions paddle/fluid/imperative/reducer.cc
@@ -297,7 +297,7 @@ Reducer::Reducer(const std::vector<std::shared_ptr<imperative::VarBase>> &vars,
is_sparse_gradient_(is_sparse_gradient),
parallel_ctx_(parallel_ctx),
group_size_limits_(group_size_limits),
-      find_unused_vars_(find_unused_vars) {
+      find_unused_vars_each_step_(find_unused_vars) {
VLOG(3) << "Start construct the Reducer ...";
nrings_ = parallel_ctx->GetNRings();
nranks_ = parallel_ctx->GetNRanks();
@@ -457,42 +457,8 @@ void Reducer::PrepareDeps(const std::unordered_set<GradOpNode *> &init_nodes) {
}
}

-// After each batch is calculated, the counter of each group(group.pending_)
-// and allreudce sequence counter(next_group_) will be cleaned up again.
-void Reducer::PrepareForBackward(
+void Reducer::TraverseBackwardGraph(
const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
-  VLOG(3) << "after forward, then reset count for backward.";
-  next_group_ = 0;
-  std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
-    group.pending_ = group.variable_indices_.size();
-    group.sparse_contents_ = nullptr;
-  });
-
-  // reinitialize vars_marked_ready_ for next iteration
-  vars_marked_ready_.clear();
-  vars_marked_ready_.resize(vars_.size(), false);
-
-  PADDLE_ENFORCE_EQ(
-      groups_need_finalize_, false,
-      platform::errors::PreconditionNotMet(
-          "A serious error has occurred here. There may be several reasons: "
-          "1) Please note that all forward outputs derived from the module "
-          "parameters must participate in the calculation of losses and "
-          "subsequent gradient calculations. If not, the wrapper will hang, "
-          "waiting for autograd to generate gradients for these parameters. "
-          "you can use detach or stop_gradient to make the unused parameters "
-          "detached from the autograd graph. "
-          "2) Used multiple forwards and one backward. You may be able to wrap "
-          "multiple forwards in a model."));
-
-  // The first var to trigger the unused parameter
-  has_marked_unused_vars_ = false;
-  unused_vars_.clear();
-
-  if (!find_unused_vars_) {
-    return;
-  }
-
node_deps_.clear();
std::queue<std::shared_ptr<GradOpNode>> q;
std::unordered_set<VariableWrapper *> var_visited;
@@ -554,8 +520,50 @@ void Reducer::PrepareForBackward(
<< "] is not used";
}
}
+}
+
-  if (unused_vars_.empty()) {
+// After each batch is calculated, the counter of each group (group.pending_)
+// and the allreduce sequence counter (next_group_) are reset again.
+void Reducer::PrepareForBackward(
+    const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
+  VLOG(3) << "after forward, then reset count for backward.";
+  next_group_ = 0;
+  std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
+    group.pending_ = group.variable_indices_.size();
+    group.sparse_contents_ = nullptr;
+  });
+
+  // reinitialize vars_marked_ready_ for the next iteration
+  vars_marked_ready_.clear();
+  vars_marked_ready_.resize(vars_.size(), false);
+
+  PADDLE_ENFORCE_EQ(
+      groups_need_finalize_, false,
+      platform::errors::PreconditionNotMet(
+          "A serious error has occurred here. Please "
+          "set find_unused_parameters=True to traverse the backward graph "
+          "in each step and prepare the reduce in advance. If you have "
+          "set it, there may be several reasons for this error: "
+          "1) Please note that all forward outputs derived from the module "
+          "parameters must participate in the calculation of losses and "
+          "subsequent gradient calculations. If not, the wrapper will hang, "
+          "waiting for autograd to generate gradients for these parameters. "
+          "You can use detach or stop_gradient to make the unused parameters "
+          "detached from the autograd graph. "
+          "2) Used multiple forwards and one backward. You may be able to wrap "
+          "multiple forwards in a model."));
+
+  // The first ready var will trigger marking the unused parameters
+  has_marked_unused_vars_ = false;
+
+  if (find_unused_vars_once_ || find_unused_vars_each_step_) {
+    unused_vars_.clear();
+    TraverseBackwardGraph(outputs);
+    // only check once, in the first step
+    find_unused_vars_once_ = false;
+  }
+
+  if (find_unused_vars_each_step_ && unused_vars_.empty()) {
LOG_FIRST_N(WARNING, 1)
<< "All parameters are involved in the backward pass. "
"It is recommended to set find_unused_parameters to False "
@@ -564,7 +572,9 @@ void Reducer::PrepareForBackward(
"will occur. Please make it clear that in the subsequent "
"training, there will be no parameters that are not used "
"in the backward pass, and then set find_unused_parameters";
-  } else if (unused_vars_.size() == vars_.size()) {
+  }
+
+  if (unused_vars_.size() == vars_.size()) {
LOG_FIRST_N(WARNING, 1)
<< "There is no parameter in the device involved "
"in the backward calculation. If there are "
@@ -595,13 +605,13 @@ void Reducer::AddDistHook(size_t var_index) {

local_used_vars_[var_index] = 1;

-  // rebuild group when find_unused_vars_ is false
+  // rebuild group when find_unused_vars_each_step_ is false
if (NeedRebuildGroup()) {
rebuild_vars_.push_back(vars_[var_index]);
rebuild_var_indices_.push_back(var_index);
}

-  if (!has_marked_unused_vars_ && find_unused_vars_) {
+  if (!has_marked_unused_vars_) {
has_marked_unused_vars_ = true;
for (const auto &unused_index : unused_vars_) {
MarkVarReady(unused_index, false);
@@ -622,7 +632,9 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
if (vars_marked_ready_[var_index]) {
auto error_info = string::Sprintf(
"Error happened, when parameter[%d][%s] has been ready before. "
"There may be several reasons for this error: "
"Please set find_unused_parameters=True to traverse backward graph "
"in each step to prepare reduce in advance. If you have set, "
"there may be several reasons for this error: "
"1) In multiple reentrant backward phase, some parameters are reused."
"2) Using model parameters outside of forward function. Please "
"make sure that model parameters are not shared in concurrent "
@@ -690,10 +702,16 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
}
} else {
// process sparse group
-    PADDLE_ENFORCE_EQ(HasGrad(var_index), true,
-                      platform::errors::PreconditionNotMet(
-                          "The sparse parameter[%d][%s] must have a gradient",
-                          var_index, vars_[var_index]->Name()));
+    PADDLE_ENFORCE_EQ(
+        HasGrad(var_index), true,
+        platform::errors::PreconditionNotMet(
+            "The sparse parameter[%d][%s] should have a gradient. "
+            "Currently, DataParallel does not support sparse "
+            "parameters that do not generate gradients during training. "
+            "For example, if is_sparse=True is used in Embedding, and this "
+            "parameter cannot generate a gradient in the current step "
+            "because of stop_gradient/detach, an error will occur.",
+            var_index, vars_[var_index]->Name()));
auto var_base = vars_[var_index]->GradVarBase();
// need to check tensor type
PADDLE_ENFORCE_EQ(
@@ -943,7 +961,7 @@ void Reducer::FinalizeBackward() {
InitializeGroups(group_indices_);
}

-  if (find_unused_vars_) {
+  if (find_unused_vars_each_step_) {
// TODO(liuyuhui) support xpu about Tensorcopy/TensorFromVector/TensorToVector
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
ProcessUnusedDenseVars();
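In summary, the reducer now traverses the backward graph once on the first step regardless of the user flag (so the sanity warnings above can still fire), and on every subsequent step only when find_unused_parameters is enabled. A Python sketch of just this decision structure, not the actual C++ Reducer:

```python
class ReducerSketch:
    """Illustrative sketch of the Reducer's unused-variable bookkeeping."""

    def __init__(self, find_unused_vars_each_step):
        self.find_unused_vars_each_step = find_unused_vars_each_step
        self.find_unused_vars_once = True  # the first step always checks
        self.unused_vars = []

    def prepare_for_backward(self, outputs):
        # The first step traverses even with the flag off, so the reducer
        # can warn about misconfiguration; later steps traverse only when
        # per-step detection was requested.
        if self.find_unused_vars_once or self.find_unused_vars_each_step:
            self.unused_vars = self.traverse_backward_graph(outputs)
            self.find_unused_vars_once = False

        if self.find_unused_vars_each_step and not self.unused_vars:
            print("warning: every parameter received a gradient; consider "
                  "find_unused_parameters=False to skip this graph walk")

    def traverse_backward_graph(self, outputs):
        # Stand-in for the real TraverseBackwardGraph graph walk.
        return []
```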
8 changes: 6 additions & 2 deletions paddle/fluid/imperative/reducer.h
@@ -162,13 +162,16 @@ class Reducer {
std::vector<std::vector<size_t>> RebuildGruops();

inline bool NeedRebuildGroup() {
-    return !has_rebuilt_group_ && !find_unused_vars_;
+    return !has_rebuilt_group_ && !find_unused_vars_each_step_;
}

void ProcessUnusedDenseVars();

bool HasGrad(size_t var_index);

+  void TraverseBackwardGraph(
+      const std::vector<std::shared_ptr<imperative::VarBase>>& outputs);

private:
std::vector<std::shared_ptr<imperative::VarBase>> vars_;
std::vector<std::vector<size_t>> group_indices_;
@@ -195,7 +198,8 @@ class Reducer {
std::unordered_map<VariableWrapper*, size_t> var_index_map_;
std::vector<size_t> unused_vars_;
bool has_marked_unused_vars_{false};
-  bool find_unused_vars_{false};
+  bool find_unused_vars_each_step_{false};
+  bool find_unused_vars_once_{true};
bool groups_need_finalize_{false};
#ifdef PADDLE_WITH_XPU_BKCL
// comm_pool_ is used for scheduling allreduce in multi Kunlun cards training.
@@ -626,7 +626,7 @@ def find_unused_parameters(self):
Indicating whether we are using find_unused_parameters to
find unused parameters in DataParallel.

-        Default value: True
+        Default value: False

Examples:

15 changes: 6 additions & 9 deletions python/paddle/fluid/dygraph/parallel.py
@@ -417,14 +417,15 @@ class DataParallel(layers.Layer):
Note that setting the find_unused_parameters to True
will affect computing performance. Therefore, if all parameters
are sure to participate in the loss calculation and the
-            autograd graph construction, please set it False. Default: True.
+            autograd graph construction, please set it False. Default: False.

Returns:
Layer: The data paralleled module.

Examples:
.. code-block:: python


+            # required: distributed
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
@@ -474,7 +475,7 @@ def __init__(self,
strategy=None,
comm_buffer_size=25,
last_comm_buffer_size=1,
-                 find_unused_parameters=True):
+                 find_unused_parameters=False):
super(DataParallel,
self).__init__(layers.full_name() + "_data_parallel")

@@ -576,12 +577,8 @@ def _find_varbase(self, obj):
def forward(self, *inputs, **kwargs):
outputs = self._layers(*inputs, **kwargs)
if self._strategy.nranks > 1 and framework._dygraph_tracer()._has_grad:
-            if self.find_unused_parameters:
-                self._reducer.prepare_for_backward(
-                    list(self._find_varbase(outputs)))
-            else:
-                self._reducer.prepare_for_backward(list(self._find_varbase([])))
-
+            self._reducer.prepare_for_backward(
+                list(self._find_varbase(outputs)))
return outputs

@deprecated(
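Since the default is now False, models with data-dependent control flow, where some parameters can miss the backward pass, must pass the flag themselves, as the updated tests below do. A minimal sketch under the assumption of a two-branch layer; the layer and its names are illustrative:

```python
import paddle
import paddle.nn as nn

class BranchyNet(nn.Layer):
    """Illustrative layer: fc2 is skipped on some inputs, so its
    parameters may receive no gradient in a given step."""

    def __init__(self):
        super(BranchyNet, self).__init__()
        self.fc1 = nn.Linear(8, 8)
        self.fc2 = nn.Linear(8, 8)

    def forward(self, x):
        x = self.fc1(x)
        if paddle.mean(x) > 0:  # data-dependent branch
            x = self.fc2(x)
        return x

# required: distributed
paddle.distributed.init_parallel_env()
# find_unused_parameters must now be enabled explicitly for such models.
model = paddle.DataParallel(BranchyNet(), find_unused_parameters=True)
```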
@@ -74,8 +74,8 @@ def test_multiple_gpus(self):
state_dict = model_a.state_dict()
model_b.set_state_dict(state_dict)

-        model_a = paddle.DataParallel(model_a)
-        model_b = paddle.DataParallel(model_b)
+        model_a = paddle.DataParallel(model_a, find_unused_parameters=True)
+        model_b = paddle.DataParallel(model_b, find_unused_parameters=True)

ones_input = paddle.ones(shape=(batch, in_dim))
ones_input.stop_gradient = True
1 change: 1 addition & 0 deletions python/paddle/fluid/tests/unittests/spawn_runner_base.py
@@ -27,6 +27,7 @@
class SpawnAssistTestArgs(object):
update_method = "local"
trainer_id = 0
+    find_unused_parameters = False


class TestDistSpawnRunner(unittest.TestCase):
11 changes: 7 additions & 4 deletions python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -548,7 +548,10 @@ def run_trainer_with_spawn(self, args):
# 4. train model
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
-            model = paddle.DataParallel(model)
+            if args.find_unused_parameters:
+                model = paddle.DataParallel(model, find_unused_parameters=True)
+            else:
+                model = paddle.DataParallel(model, find_unused_parameters=False)

out_losses = []
for step_id, data in enumerate(train_reader()):
@@ -581,8 +584,8 @@ def run_use_fleet_api_trainer(self, args):

# set strategy
strategy = fleet.DistributedStrategy()
-        if not args.find_unused_parameters:
-            strategy.find_unused_parameters = False
+        if args.find_unused_parameters:
+            strategy.find_unused_parameters = True

# 3. init parallel env
if args.update_method == "nccl2" or "bkcl":
@@ -737,7 +740,7 @@ def setUp(self):
self._save_model = False
self._fuse_all_reduce = None
self._accumulate_gradient = False
-        self._find_unused_parameters = True
+        self._find_unused_parameters = False
self._setup_config()

global DIST_UT_PORT
@@ -30,6 +30,7 @@ def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
+        self._find_unused_parameters = True

def test_net(self):
if fluid.core.is_compiled_with_cuda():
@@ -46,6 +47,7 @@ def _setup_config(self):
self._nccl2_mode = True
self._dygraph = True
self._use_fleet_api = True
+        self._find_unused_parameters = True


class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame):
@@ -54,13 +56,15 @@ def _setup_config(self):
self._nccl2_mode = True
self._dygraph = True
self._accumulate_gradient = True
+        self._find_unused_parameters = True


class TestDygraphControlFlowDiff(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
+        self._find_unused_parameters = True

def test_net(self):
if fluid.core.is_compiled_with_cuda():
@@ -77,6 +81,7 @@ def _setup_config(self):
self._nccl2_mode = True
self._dygraph = True
self._use_fleet_api = True
+        self._find_unused_parameters = True


class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff):
@@ -85,6 +90,7 @@ def _setup_config(self):
self._nccl2_mode = True
self._dygraph = True
self._accumulate_gradient = True
+        self._find_unused_parameters = True


if __name__ == "__main__":
@@ -31,6 +31,7 @@ def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
+        self._find_unused_parameters = True

def test_mnist(self):
if fluid.core.is_compiled_with_cuda():