From 92a6c7a04906e7d26196ac795eccace84156d42d Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Wed, 16 Jan 2019 10:08:14 +0800 Subject: [PATCH 01/37] init async ssa executor --- .../details/async_ssa_graph_executor.cc | 99 +++++++++++++++++++ .../details/async_ssa_graph_executor.h | 51 ++++++++++ 2 files changed, 150 insertions(+) create mode 100644 paddle/fluid/framework/details/async_ssa_graph_executor.cc create mode 100644 paddle/fluid/framework/details/async_ssa_graph_executor.h diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc new file mode 100644 index 00000000000000..9b26fdd545c9a5 --- /dev/null +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -0,0 +1,99 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/async_ssa_graph_executor.h" + +namespace paddle { +namespace framework { +namespace details { + +AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( + const ExecutionStrategy &strategy, const std::vector &local_scopes, + const std::vector &places, + std::vector> &&graphs) + : strategy_(std::move(strategy)), + local_scopes_(std::move(local_scopes)), + pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr), + places_(std::move(places)), + graphs_(std::move(graphs)) { + PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); + + // set the correct size of thread pool to each device. + strategy_.num_threads_ = strategy_.num_threads_ < places_.size() + ? 1UL + : strategy_.num_threads_ / places_.size(); + VLOG(1) << "set num_threads: " << strategy_.num_threads_ + << " to run the operators of the graph on each device."; + for (size_t i = 0; i < places.size(); ++i) { + executors_.emplace_back(new details::ThreadedSSAGraphExecutor( + strategy_, {local_scopes_[i]}, {places_[i]}, std::move(graphs_[i]))); + } +} + +FeedFetchList AsyncSSAGraphExecutor::Run( + const std::vector &fetch_tensors) { + std::vector> run_futures; + + std::vector fetch_data; + FeedFetchList ret; + + fetch_data.reserve(places_.size()); + ret.reserve(fetch_tensors.size()); + exception_holder_.Clear(); + + for (size_t i = 0; i < places_.size(); ++i) { + auto call = [this, i, &fetch_tensors]() -> FeedFetchList { + try { + return executors_[i]->Run(fetch_tensors); + } catch (...) { + exception_holder_.Catch(std::current_exception()); + } + return FeedFetchList(); + }; + + if (pool_) { + run_futures.emplace_back(pool_->enqueue(std::move(call))); + } else { + fetch_data.emplace_back(std::move(call())); + } + } + + if (pool_) { + for (auto &f : run_futures) { + if (exception_holder_.IsCaught()) { + f.wait(); + } else { + fetch_data.emplace_back(std::move(f.get())); + } + } + } + if (exception_holder_.IsCaught()) { + exception_holder_.ReThrow(); + } + + for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) { + std::vector lodtensor_ptrs; + lodtensor_ptrs.reserve(local_scopes_.size()); + for (size_t scope_idx = 0; scope_idx < local_scopes_.size(); ++scope_idx) { + lodtensor_ptrs.push_back(&fetch_data.at(scope_idx).at(fetch_idx)); + } + ret.emplace_back(); + ret.back().MergeLoDTensor(lodtensor_ptrs, platform::CPUPlace()); + } + return ret; +} + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.h b/paddle/fluid/framework/details/async_ssa_graph_executor.h new file mode 100644 index 00000000000000..4091c56d743c6e --- /dev/null +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.h @@ -0,0 +1,51 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "ThreadPool.h" +#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" + +namespace paddle { +namespace framework { +namespace details { + +class AsyncSSAGraphExecutor : public SSAGraphExecutor { + public: + AsyncSSAGraphExecutor(const ExecutionStrategy &strategy, + const std::vector &local_scopes, + const std::vector &places, + std::vector> &&graphs); + ~AsyncSSAGraphExecutor() final = default; + const ir::Graph &Graph() const override { return *graphs_[0]; } + + FeedFetchList Run(const std::vector &fetch_tensors) override; + + private: + ExecutionStrategy strategy_; + std::vector local_scopes_; + std::unique_ptr<::ThreadPool> pool_{nullptr}; + std::vector places_; + std::vector> graphs_; + + std::vector> executors_; + ExceptionHolder exception_holder_; +}; + +} // namespace details +} // namespace framework +} // namespace paddle From afda84012643353fbf9849fb5f26bbcd0c45bcea Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Wed, 16 Jan 2019 10:32:56 +0800 Subject: [PATCH 02/37] init communicator --- paddle/fluid/framework/communicator.h | 45 +++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 paddle/fluid/framework/communicator.h diff --git a/paddle/fluid/framework/communicator.h b/paddle/fluid/framework/communicator.h new file mode 100644 index 00000000000000..e459729f5c339f --- /dev/null +++ b/paddle/fluid/framework/communicator.h @@ -0,0 +1,45 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include +#include +#include +#include +#include +#include "paddle/fluid/framework/data_layout.h" +#include "paddle/fluid/framework/ddim.h" +#include "paddle/fluid/framework/framework.pb.h" +#include "paddle/fluid/memory/memory.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/enforce.h" +#include "paddle/fluid/platform/place.h" + +namespace paddle { + +namespace framework { + +class Communicator { + public: + Communicator() {} + ~Communicator() {} + + private: +}; + +} // namespace framework +} // namespace paddle + +#include "paddle/fluid/framework/tensor_impl.h" From ea66979684c53743b9eb749106e0400542ec83da Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Thu, 17 Jan 2019 13:28:15 +0800 Subject: [PATCH 03/37] can run --- paddle/fluid/framework/CMakeLists.txt | 2 +- paddle/fluid/framework/details/CMakeLists.txt | 2 + .../details/async_ssa_graph_executor.cc | 1 + .../fluid/framework/details/build_strategy.cc | 5 +- .../fluid/framework/details/build_strategy.h | 1 + .../details/multi_devices_graph_pass.cc | 2 + .../details/multi_devices_graph_pass.h | 16 ++++++- paddle/fluid/framework/parallel_executor.cc | 46 +++++++++++++++---- paddle/fluid/pybind/pybind.cc | 3 ++ 9 files changed, 65 insertions(+), 13 deletions(-) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index a167511160d074..e22c7f8a403be1 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -184,7 +184,7 @@ endif() target_link_libraries(executor garbage_collector) cc_library(parallel_executor SRCS parallel_executor.cc DEPS - threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor + threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor async_ssa_graph_executor graph build_strategy fast_threaded_ssa_graph_executor variable_helper) diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index c1ba6606f10647..01c24b0d824d30 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -79,6 +79,8 @@ cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS cc_library(parallel_ssa_graph_executor SRCS parallel_ssa_graph_executor.cc DEPS threaded_ssa_graph_executor) +cc_library(async_ssa_graph_executor SRCS async_ssa_graph_executor.cc DEPS threaded_ssa_graph_executor) + cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory device_context broadcast_op_handle) cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 9b26fdd545c9a5..d3e4573e228705 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -27,6 +27,7 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr), places_(std::move(places)), graphs_(std::move(graphs)) { + VLOG(3) << "build AsyncSSAGraphExecutor"; PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); // set the correct size of thread pool to each device. diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index df0ff772c9d35c..f8911cd9ad4bc7 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -116,7 +116,10 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { // Convert graph to run on multi-devices. void AppendMultiDevPass(const BuildStrategy &strategy) { ir::Pass *multi_devices_pass; - if (strategy_.is_distribution_) { + + if (strategy_.async_mode_) { + multi_devices_pass = AppendPass("async_multi_devices_pass").get(); + } else if (strategy_.is_distribution_) { multi_devices_pass = AppendPass("dist_multi_devices_pass").get(); } else { if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index 15c2e01b614257..16324839657364 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -86,6 +86,7 @@ struct BuildStrategy { // num_trainers is 1, so the current fields of build_strategy doesn't tell if // it's distributed model. bool is_distribution_{false}; + bool async_mode_{false}; int num_trainers_{1}; int trainer_id_{0}; std::vector trainers_endpoints_; diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index 75f922d2cca685..d7a4b5692b3c36 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -975,3 +975,5 @@ REGISTER_MULTI_DEVICES_PASS( paddle::framework::details::AllReduceSSAGraphBuilder); REGISTER_MULTI_DEVICES_PASS(dist_multi_devices_pass, paddle::framework::details::DistSSAGraphBuilder); +REGISTER_MULTI_DEVICES_PASS(async_multi_devices_pass, + paddle::framework::details::AsyncSSAGraphBuilder); diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index 6d4386538ea7d0..e91397816c3a98 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -55,7 +55,7 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass { bool UseGPU() const; - bool NeedCollectiveOps() const; + virtual bool NeedCollectiveOps() const; bool IsScaleLossOp(ir::Node *node) const; @@ -116,6 +116,20 @@ class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { virtual void InsertPostprocessOps(ir::Graph *result) const {} }; +class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { + protected: + virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, + const std::string &g_name) const {} + + bool NeedCollectiveOps() const override { return false; } + + virtual bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const { + return false; + } + + virtual void InsertPostprocessOps(ir::Graph *result) const {} +}; + class BalanceVarSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { protected: int GetVarDeviceID(const std::string &varname) const; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index f61c9e3a911467..4173b39e10dc30 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -21,6 +21,7 @@ limitations under the License. */ #include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/details/async_ssa_graph_executor.h" #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h" @@ -282,10 +283,19 @@ ParallelExecutor::ParallelExecutor( graphs.push_back(std::move(graph)); } #else - std::unique_ptr graph = build_strategy.Apply( - main_program, member_->places_, loss_var_name, member_->local_scopes_, - member_->nranks_, member_->use_cuda_); - graphs.push_back(std::move(graph)); + if (build_strategy.async_mode_) { + for (size_t i = 0; i < member_->places_.size(); ++i) { + std::unique_ptr graph = build_strategy.Apply( + main_program, {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_); + graphs.push_back(std::move(graph)); + } + } else { + std::unique_ptr graph = build_strategy.Apply( + main_program, member_->places_, loss_var_name, member_->local_scopes_, + member_->nranks_, member_->use_cuda_); + graphs.push_back(std::move(graph)); + } #endif auto max_memory_size = GetEagerDeletionThreshold(); if (max_memory_size >= 0) { @@ -323,23 +333,31 @@ ParallelExecutor::ParallelExecutor( "please don't pass loss_var_name."; } } - - if (build_strategy.enable_parallel_graph_) { + if (build_strategy.async_mode_) { + VLOG(3) << "use AsyncSSAGraphExecutor"; + member_->executor_.reset(new details::AsyncSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->places_, + std::move(graphs))); + } else if (build_strategy.enable_parallel_graph_) { + VLOG(3) << "use ParallelSSAGraphExecutor"; member_->executor_.reset(new details::ParallelSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, std::move(graphs))); } else { if (exec_strategy.type_ == ExecutionStrategy::kDefault) { + VLOG(3) << "use ThreadedSSAGraphExecutor"; member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, std::move(graphs[0]))); } else { + VLOG(3) << "use FastThreadedSSAGraphExecutor"; member_->executor_.reset(new details::FastThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, std::move(graphs[0]))); } } + VLOG(3) << "use ScopeBufferedSSAGraphExecutor"; member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( exec_strategy, member_->local_scopes_, std::move(var_infos), member_->places_, std::move(member_->executor_))); @@ -401,14 +419,22 @@ void ParallelExecutor::BCastParamsToDevices( auto local_scope = member_->local_scopes_[i]; auto *t = local_scope->Var(var)->GetMutable(); - // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix. - if (member_->use_all_reduce_ || member_->use_cuda_ || - var == "@LR_DECAY_COUNTER@") { + auto share_memory = [&] { t->Resize(dims); t->mutable_data(cpu, main_tensor.type()); paddle::framework::TensorCopy(main_tensor, cpu, t); + }; + + auto copy_memory = [&] { t->ShareDataWith(main_tensor); }; + + // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix. + if (member_->build_strategy_.async_mode_) { + share_memory(); + } else if (member_->use_all_reduce_ || member_->use_cuda_ || + var == "@LR_DECAY_COUNTER@") { + copy_memory(); } else { - t->ShareDataWith(main_tensor); + share_memory(); } } } diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index f3f4854a9efbcf..88d12c69b77de9 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1030,6 +1030,9 @@ All parameter, weight, gradient are variables in Paddle. "is_distribution", [](const BuildStrategy &self) { return self.is_distribution_; }, [](BuildStrategy &self, bool b) { self.is_distribution_ = b; }) + .def_property("async_mode", + [](const BuildStrategy &self) { return self.async_mode_; }, + [](BuildStrategy &self, bool b) { self.async_mode_ = b; }) .def_property( "memory_early_delete", [](const BuildStrategy &self) { return self.memory_early_delete_; }, From 88d71fa2f9655c206d398088effe3cb1a43dafc4 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Thu, 17 Jan 2019 17:30:27 +0800 Subject: [PATCH 04/37] support num_iteration_per_run --- .../framework/details/async_ssa_graph_executor.cc | 3 +++ paddle/fluid/framework/details/execution_strategy.h | 2 ++ paddle/fluid/pybind/pybind.cc | 11 +++++++++++ 3 files changed, 16 insertions(+) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index d3e4573e228705..ba2e90d0528a63 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -56,6 +56,9 @@ FeedFetchList AsyncSSAGraphExecutor::Run( for (size_t i = 0; i < places_.size(); ++i) { auto call = [this, i, &fetch_tensors]() -> FeedFetchList { try { + for (size_t j = 0; j < strategy_.num_iteration_per_run_ - 1; ++j) { + executors_[i]->Run(fetch_tensors); + } return executors_[i]->Run(fetch_tensors); } catch (...) { exception_holder_.Catch(std::current_exception()); diff --git a/paddle/fluid/framework/details/execution_strategy.h b/paddle/fluid/framework/details/execution_strategy.h index 37b07e5736312b..dec4589cada44a 100644 --- a/paddle/fluid/framework/details/execution_strategy.h +++ b/paddle/fluid/framework/details/execution_strategy.h @@ -28,6 +28,8 @@ struct ExecutionStrategy { size_t num_iteration_per_drop_scope_{1}; ExecutorType type_{kDefault}; bool dry_run_{false}; + size_t num_iteration_per_run_{1}; // only use with async_ssa_graph_executor + // and pyreader with data queue }; } // namespace details diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 88d12c69b77de9..b52f99f324d54a 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -892,6 +892,17 @@ All parameter, weight, gradient are variables in Paddle. 2. In some NLP model, it may cause the GPU memory is insufficient, in this case, you should reduce `num_iteration_per_drop_scope`. )DOC") + .def_property( + "num_iteration_per_run", + [](const ExecutionStrategy &self) { + return self.num_iteration_per_run_; + }, + [](ExecutionStrategy &self, size_t num_iteration_per_run) { + self.num_iteration_per_run_ = num_iteration_per_run; + }, + R"DOC(This config that how many iteration the executor will run when + user call pe.run() in python + )DOC") .def_property("_dry_run", [](const ExecutionStrategy &self) { return self.dry_run_; }, [](ExecutionStrategy &self, bool dry_run) { From 69484f71e0c842633df77470c80dc26222f6fd3b Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Fri, 18 Jan 2019 12:25:30 +0800 Subject: [PATCH 05/37] remote communicator --- paddle/fluid/framework/communicator.h | 45 --------------------------- 1 file changed, 45 deletions(-) delete mode 100644 paddle/fluid/framework/communicator.h diff --git a/paddle/fluid/framework/communicator.h b/paddle/fluid/framework/communicator.h deleted file mode 100644 index e459729f5c339f..00000000000000 --- a/paddle/fluid/framework/communicator.h +++ /dev/null @@ -1,45 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#pragma once - -#include -#include -#include -#include -#include -#include "paddle/fluid/framework/data_layout.h" -#include "paddle/fluid/framework/ddim.h" -#include "paddle/fluid/framework/framework.pb.h" -#include "paddle/fluid/memory/memory.h" -#include "paddle/fluid/platform/device_context.h" -#include "paddle/fluid/platform/enforce.h" -#include "paddle/fluid/platform/place.h" - -namespace paddle { - -namespace framework { - -class Communicator { - public: - Communicator() {} - ~Communicator() {} - - private: -}; - -} // namespace framework -} // namespace paddle - -#include "paddle/fluid/framework/tensor_impl.h" From f3210b60ba3a5f23cfed95148c44e5d5db298f35 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Fri, 18 Jan 2019 15:49:32 +0800 Subject: [PATCH 06/37] fix copy_memory and share_memory --- paddle/fluid/framework/parallel_executor.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 4173b39e10dc30..3997294f172be5 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -419,13 +419,13 @@ void ParallelExecutor::BCastParamsToDevices( auto local_scope = member_->local_scopes_[i]; auto *t = local_scope->Var(var)->GetMutable(); - auto share_memory = [&] { + auto copy_memory = [&] { t->Resize(dims); t->mutable_data(cpu, main_tensor.type()); paddle::framework::TensorCopy(main_tensor, cpu, t); }; - auto copy_memory = [&] { t->ShareDataWith(main_tensor); }; + auto share_memory = [&] { t->ShareDataWith(main_tensor); }; // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix. if (member_->build_strategy_.async_mode_) { From fab8457e6b117be26e23171b649a1bfda14531b2 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sat, 26 Jan 2019 23:12:23 +0800 Subject: [PATCH 07/37] code optimize --- .../details/async_ssa_graph_executor.cc | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index ba2e90d0528a63..7dc269242f228c 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -68,20 +68,18 @@ FeedFetchList AsyncSSAGraphExecutor::Run( if (pool_) { run_futures.emplace_back(pool_->enqueue(std::move(call))); + for (auto &f : run_futures) { + if (exception_holder_.IsCaught()) { + f.wait(); + } else { + fetch_data.emplace_back(std::move(f.get())); + } + } } else { fetch_data.emplace_back(std::move(call())); } } - if (pool_) { - for (auto &f : run_futures) { - if (exception_holder_.IsCaught()) { - f.wait(); - } else { - fetch_data.emplace_back(std::move(f.get())); - } - } - } if (exception_holder_.IsCaught()) { exception_holder_.ReThrow(); } From 62549e071402530e862285ab1613eb8e8e5e5150 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sun, 27 Jan 2019 17:10:45 +0800 Subject: [PATCH 08/37] add GenParentScopeTreeDebugInfo --- paddle/fluid/framework/parallel_executor.cc | 1 + paddle/fluid/framework/scope.cc | 29 +++++++++++++++++++++ paddle/fluid/framework/scope.h | 1 + 3 files changed, 31 insertions(+) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 3997294f172be5..f0bc3acccc22e6 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -365,6 +365,7 @@ ParallelExecutor::ParallelExecutor( void ParallelExecutor::BCastParamsToDevices( const std::unordered_set &vars) const { + VLOG(3) << "BCastParamsToDevices"; // the initializing bcast, all vars would be bcast from device(0). for (auto &var : vars) { framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var); diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index 95361856091322..884ad3b34b363d 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -259,5 +259,34 @@ std::string GenScopeTreeDebugInfo(Scope* root) { return os.str(); } +std::string GenParentScopeTreeDebugInfo(Scope* leaf) { + std::stringstream os; + + if (!leaf) return ""; + + // level traversal + std::vector scopes; + const Scope* current_scope = leaf; + + while (current_scope != nullptr) { + scopes.push_back(current_scope); + current_scope = current_scope->parent(); + // end of a level + os << "\n------------------------------------------\n"; + } + + os << "\nDetails:\n\n"; + + for (auto* q : scopes) { + os << "====\n"; + os << q << ":\n"; + for (auto& var : q->LocalVarNames()) { + os << " - " << var << "\n"; + } + } + + return os.str(); +} + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h index f0915d2eee072b..eb5c12def6a2f9 100644 --- a/paddle/fluid/framework/scope.h +++ b/paddle/fluid/framework/scope.h @@ -144,6 +144,7 @@ class Scope { // Generate some debug string about the inherience structure of scope, quite // naive. std::string GenScopeTreeDebugInfo(Scope*); +std::string GenParentScopeTreeDebugInfo(Scope*); } // namespace framework } // namespace paddle From be738a646e2f760a53c36a658c7d07c4f75cd814 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sun, 27 Jan 2019 21:56:25 +0800 Subject: [PATCH 09/37] add some debug infor --- .../details/async_ssa_graph_executor.cc | 17 ++++++++++------- .../details/multi_devices_graph_pass.cc | 2 ++ paddle/fluid/framework/scope.cc | 12 +++++------- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 7dc269242f228c..c259ff4f747e3d 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -68,18 +68,21 @@ FeedFetchList AsyncSSAGraphExecutor::Run( if (pool_) { run_futures.emplace_back(pool_->enqueue(std::move(call))); - for (auto &f : run_futures) { - if (exception_holder_.IsCaught()) { - f.wait(); - } else { - fetch_data.emplace_back(std::move(f.get())); - } - } } else { fetch_data.emplace_back(std::move(call())); } } + if (pool_) { + for (auto &f : run_futures) { + if (exception_holder_.IsCaught()) { + f.wait(); + } else { + fetch_data.emplace_back(std::move(f.get())); + } + } + } + if (exception_holder_.IsCaught()) { exception_holder_.ReThrow(); } diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index d7a4b5692b3c36..f1347e2b0d70c6 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -249,6 +249,8 @@ void MultiDevSSAGraphBuilderBase::InsertScaleLossGradOp( break; } + VLOG(3) << "loss_scale: " << loss_scale; + if (loss_scale) { // TODO(paddle-dev): Why is there no input for this op_handle? auto loss_grad_name = node->Op()->OutputArgumentNames()[0]; diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index 884ad3b34b363d..2c76ab22f6fd2e 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -271,16 +271,14 @@ std::string GenParentScopeTreeDebugInfo(Scope* leaf) { while (current_scope != nullptr) { scopes.push_back(current_scope); current_scope = current_scope->parent(); - // end of a level - os << "\n------------------------------------------\n"; } - os << "\nDetails:\n\n"; + os << "\n--------------GenParentScopeTreeDebugInfo--------------\n"; - for (auto* q : scopes) { - os << "====\n"; - os << q << ":\n"; - for (auto& var : q->LocalVarNames()) { + for (int i = scopes.size() - 1; i >= 0; --i) { + os << "=======level [" << i << "]=======\n"; + os << scopes[i] << ":\n"; + for (auto& var : scopes[i]->LocalVarNames()) { os << " - " << var << "\n"; } } From 9da96aba956abe13aec945c1e71e338df56a13b5 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sun, 27 Jan 2019 23:04:50 +0800 Subject: [PATCH 10/37] clean code of test_async_ssa_graph_executor_mnist --- .../test_async_ssa_graph_executor_mnist.py | 214 ++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py new file mode 100644 index 00000000000000..e2b3b2b0f2dc7d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -0,0 +1,214 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +from PIL import Image +import numpy +import paddle +import paddle.fluid as fluid + +BATCH_SIZE = 64 +PASS_NUM = 5 + + +def loss_net(hidden, label): + prediction = fluid.layers.fc(input=hidden, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + avg_loss = fluid.layers.mean(loss) + acc = fluid.layers.accuracy(input=prediction, label=label) + return prediction, avg_loss, acc + + +def convolutional_neural_network(img, label): + conv_pool_1 = fluid.nets.simple_img_conv_pool( + input=img, + filter_size=5, + num_filters=20, + pool_size=2, + pool_stride=2, + act="relu") + conv_pool_1 = fluid.layers.batch_norm(conv_pool_1) + conv_pool_2 = fluid.nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + pool_size=2, + pool_stride=2, + act="relu") + return loss_net(conv_pool_2, label) + + +def train(use_cuda, + save_dirname=None, + model_filename=None, + params_filename=None): + if use_cuda and not fluid.core.is_compiled_with_cuda(): + return + + img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + prediction, avg_loss, acc = convolutional_neural_network(img, label) + + test_program = fluid.default_main_program().clone(for_test=True) + + optimizer = fluid.optimizer.Adam(learning_rate=0.001) + optimizer.minimize(avg_loss) + + def train_test(train_test_program, train_test_feed, train_test_reader): + acc_set = [] + avg_loss_set = [] + for test_data in train_test_reader(): + acc_np, avg_loss_np = exe.run(program=train_test_program, + feed=train_test_feed.feed(test_data), + fetch_list=[acc, avg_loss]) + acc_set.append(float(acc_np)) + avg_loss_set.append(float(avg_loss_np)) + # get test acc and loss + acc_val_mean = numpy.array(acc_set).mean() + avg_loss_val_mean = numpy.array(avg_loss_set).mean() + return avg_loss_val_mean, acc_val_mean + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + + exe = fluid.Executor(place) + + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.mnist.train(), buf_size=500), + batch_size=BATCH_SIZE) + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) + feeder = fluid.DataFeeder(feed_list=[img, label], place=place) + + exe.run(fluid.default_startup_program()) + main_program = fluid.default_main_program() + + exec_strategy = fluid.ExecutionStrategy() + build_strategy = fluid.BuildStrategy() + + cpu_num = int(os.environ.get('CPU_NUM')) + thread_num = int(os.getenv("NUM_THREADS")) + + print("cpu_num:" + str(cpu_num)) + print("thread_num:" + str(thread_num)) + + build_strategy.async_mode = True + + exec_strategy.num_threads = thread_num + exec_strategy.num_iteration_per_drop_scope = 1 + exec_strategy.num_iteration_per_run = 10 + + pe = fluid.ParallelExecutor( + use_cuda=False, + loss_name=avg_loss.name, + main_program=main_program, + build_strategy=build_strategy, + exec_strategy=exec_strategy) + + lists = [] + step = 0 + for epoch_id in range(PASS_NUM): + for step_id, data in enumerate(train_reader()): + loss_val, acc_val = pe.run(feed=feeder.feed(data), + fetch_list=[avg_loss.name, acc.name]) + loss_val = numpy.mean(loss_val) + acc_val = numpy.mean(acc_val) + if step % 100 == 0: + print("Pass %d, Batch %d, Cost %f" % (epoch_id, step, loss_val)) + step += 1 + # test for epoch + avg_loss_val, acc_val = train_test( + train_test_program=test_program, + train_test_reader=test_reader, + train_test_feed=feeder) + + print("Test with Epoch %d, avg_cost: %s, acc: %s" % + (epoch_id, avg_loss_val, acc_val)) + lists.append((epoch_id, avg_loss_val, acc_val)) + if save_dirname is not None: + fluid.io.save_inference_model( + save_dirname, ["img"], [prediction], + exe, + model_filename=model_filename, + params_filename=params_filename) + + # find the best pass + best = sorted(lists, key=lambda list: float(list[1]))[0] + print('Best pass is %s, testing Avgcost is %s' % (best[0], best[1])) + print('The classification accuracy is %.2f%%' % (float(best[2]) * 100)) + + +def infer(use_cuda, + save_dirname=None, + model_filename=None, + params_filename=None): + if save_dirname is None: + return + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + + def load_image(file): + im = Image.open(file).convert('L') + im = im.resize((28, 28), Image.ANTIALIAS) + im = numpy.array(im).reshape(1, 1, 28, 28).astype(numpy.float32) + im = im / 255.0 * 2.0 - 1.0 + return im + + cur_dir = os.path.dirname(os.path.realpath(__file__)) + tensor_img = load_image(cur_dir + '/image/infer_3.png') + + inference_scope = fluid.core.Scope() + with fluid.scope_guard(inference_scope): + # Use fluid.io.load_inference_model to obtain the inference program desc, + # the feed_target_names (the names of variables that will be feeded + # data using feed operators), and the fetch_targets (variables that + # we want to obtain data from using fetch operators). + [inference_program, feed_target_names, + fetch_targets] = fluid.io.load_inference_model( + save_dirname, exe, model_filename, params_filename) + + # Construct feed as a dictionary of {feed_target_name: feed_target_data} + # and results will contain a list of data corresponding to fetch_targets. + results = exe.run(inference_program, + feed={feed_target_names[0]: tensor_img}, + fetch_list=fetch_targets) + lab = numpy.argsort(results) + print("Inference result of image/infer_3.png is: %d" % lab[0][0][-1]) + + +def main(use_cuda): + model_filename = None + params_filename = None + save_dirname = "recognize_digits" + ".inference.model" + + # call train() with is_local argument to run distributed train + train( + use_cuda=use_cuda, + save_dirname=save_dirname, + model_filename=model_filename, + params_filename=params_filename) + infer( + use_cuda=use_cuda, + save_dirname=save_dirname, + model_filename=model_filename, + params_filename=params_filename) + + +if __name__ == '__main__': + use_cuda = False + main(use_cuda=use_cuda) From 7e145b7c0e8a877ce78135dc74d3d65090e9c704 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 28 Jan 2019 10:13:09 +0800 Subject: [PATCH 11/37] optimize test_async_ssa_graph_executor_mnist --- .../test_async_ssa_graph_executor_mnist.py | 138 ++++-------------- 1 file changed, 31 insertions(+), 107 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py index e2b3b2b0f2dc7d..03d7df8852e37d 100644 --- a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -15,13 +15,13 @@ from __future__ import print_function import os -from PIL import Image +import unittest + import numpy import paddle import paddle.fluid as fluid BATCH_SIZE = 64 -PASS_NUM = 5 def loss_net(hidden, label): @@ -51,11 +51,9 @@ def convolutional_neural_network(img, label): return loss_net(conv_pool_2, label) -def train(use_cuda, - save_dirname=None, - model_filename=None, - params_filename=None): +def train(use_cuda, thread_num, cpu_num): if use_cuda and not fluid.core.is_compiled_with_cuda(): + print("paddle is not compiled with cuda, exit!") return img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') @@ -84,8 +82,6 @@ def train_test(train_test_program, train_test_feed, train_test_reader): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - exe = fluid.Executor(place) - train_reader = paddle.batch( paddle.reader.shuffle( paddle.dataset.mnist.train(), buf_size=500), @@ -94,24 +90,22 @@ def train_test(train_test_program, train_test_feed, train_test_reader): paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) feeder = fluid.DataFeeder(feed_list=[img, label], place=place) + exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) - main_program = fluid.default_main_program() - exec_strategy = fluid.ExecutionStrategy() - build_strategy = fluid.BuildStrategy() - - cpu_num = int(os.environ.get('CPU_NUM')) - thread_num = int(os.getenv("NUM_THREADS")) + os.environ['CPU_NUM'] = str(cpu_num) print("cpu_num:" + str(cpu_num)) print("thread_num:" + str(thread_num)) - build_strategy.async_mode = True + build_strategy = fluid.BuildStrategy() + build_strategy.async_mode = True # enable async mode + exec_strategy = fluid.ExecutionStrategy() exec_strategy.num_threads = thread_num - exec_strategy.num_iteration_per_drop_scope = 1 - exec_strategy.num_iteration_per_run = 10 + exec_strategy.num_iteration_per_run = 2 + main_program = fluid.default_main_program() pe = fluid.ParallelExecutor( use_cuda=False, loss_name=avg_loss.name, @@ -119,96 +113,26 @@ def train_test(train_test_program, train_test_feed, train_test_reader): build_strategy=build_strategy, exec_strategy=exec_strategy) - lists = [] step = 0 - for epoch_id in range(PASS_NUM): - for step_id, data in enumerate(train_reader()): - loss_val, acc_val = pe.run(feed=feeder.feed(data), - fetch_list=[avg_loss.name, acc.name]) - loss_val = numpy.mean(loss_val) - acc_val = numpy.mean(acc_val) - if step % 100 == 0: - print("Pass %d, Batch %d, Cost %f" % (epoch_id, step, loss_val)) - step += 1 - # test for epoch - avg_loss_val, acc_val = train_test( - train_test_program=test_program, - train_test_reader=test_reader, - train_test_feed=feeder) - - print("Test with Epoch %d, avg_cost: %s, acc: %s" % - (epoch_id, avg_loss_val, acc_val)) - lists.append((epoch_id, avg_loss_val, acc_val)) - if save_dirname is not None: - fluid.io.save_inference_model( - save_dirname, ["img"], [prediction], - exe, - model_filename=model_filename, - params_filename=params_filename) - - # find the best pass - best = sorted(lists, key=lambda list: float(list[1]))[0] - print('Best pass is %s, testing Avgcost is %s' % (best[0], best[1])) - print('The classification accuracy is %.2f%%' % (float(best[2]) * 100)) - - -def infer(use_cuda, - save_dirname=None, - model_filename=None, - params_filename=None): - if save_dirname is None: - return + for step_id, data in enumerate(train_reader()): + loss_val = pe.run(feed=feeder.feed(data), fetch_list=[avg_loss.name]) + loss_val = numpy.mean(loss_val) + if step % 100 == 0: + print("Batch %d, Cost %f" % (step, loss_val)) + step += 1 + # test for epoch + avg_loss_val, acc_val = train_test( + train_test_program=test_program, + train_test_reader=test_reader, + train_test_feed=feeder) + + print("Test: avg_cost: %s, acc: %s" % (avg_loss_val, acc_val)) + + +class TestAsyncSSAGraphExecutor(unittest.TestCase): + def test_check_async_ssa_exe_train(self): + train(use_cuda=False, thread_num=2, cpu_num=2) - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - exe = fluid.Executor(place) - def load_image(file): - im = Image.open(file).convert('L') - im = im.resize((28, 28), Image.ANTIALIAS) - im = numpy.array(im).reshape(1, 1, 28, 28).astype(numpy.float32) - im = im / 255.0 * 2.0 - 1.0 - return im - - cur_dir = os.path.dirname(os.path.realpath(__file__)) - tensor_img = load_image(cur_dir + '/image/infer_3.png') - - inference_scope = fluid.core.Scope() - with fluid.scope_guard(inference_scope): - # Use fluid.io.load_inference_model to obtain the inference program desc, - # the feed_target_names (the names of variables that will be feeded - # data using feed operators), and the fetch_targets (variables that - # we want to obtain data from using fetch operators). - [inference_program, feed_target_names, - fetch_targets] = fluid.io.load_inference_model( - save_dirname, exe, model_filename, params_filename) - - # Construct feed as a dictionary of {feed_target_name: feed_target_data} - # and results will contain a list of data corresponding to fetch_targets. - results = exe.run(inference_program, - feed={feed_target_names[0]: tensor_img}, - fetch_list=fetch_targets) - lab = numpy.argsort(results) - print("Inference result of image/infer_3.png is: %d" % lab[0][0][-1]) - - -def main(use_cuda): - model_filename = None - params_filename = None - save_dirname = "recognize_digits" + ".inference.model" - - # call train() with is_local argument to run distributed train - train( - use_cuda=use_cuda, - save_dirname=save_dirname, - model_filename=model_filename, - params_filename=params_filename) - infer( - use_cuda=use_cuda, - save_dirname=save_dirname, - model_filename=model_filename, - params_filename=params_filename) - - -if __name__ == '__main__': - use_cuda = False - main(use_cuda=use_cuda) +if __name__ == "__main__": + unittest.main() From 02dab46ab8101873663a63614f88931ead7846d9 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 28 Jan 2019 16:23:06 +0800 Subject: [PATCH 12/37] add some debug info --- .../details/async_ssa_graph_executor.cc | 2 ++ .../framework/details/exception_holder.h | 17 ++++++++++++ .../fluid/operators/reader/blocking_queue.h | 1 + .../test_async_ssa_graph_executor_mnist.py | 27 ++++++++++++++++++- 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index c259ff4f747e3d..e21d5fb96dcbec 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -84,6 +84,8 @@ FeedFetchList AsyncSSAGraphExecutor::Run( } if (exception_holder_.IsCaught()) { + VLOG(3) << "caught exception " << exception_holder_.Type() + << ", rethrow it"; exception_holder_.ReThrow(); } diff --git a/paddle/fluid/framework/details/exception_holder.h b/paddle/fluid/framework/details/exception_holder.h index 1b1afce04ebbf8..77ca03b86e6aeb 100644 --- a/paddle/fluid/framework/details/exception_holder.h +++ b/paddle/fluid/framework/details/exception_holder.h @@ -14,6 +14,8 @@ #pragma once +#include + #include "glog/logging.h" #include "paddle/fluid/platform/enforce.h" @@ -64,6 +66,21 @@ class ExceptionHolder { ClearImpl(); } + std::string Type() { + std::lock_guard lock(mu_); + switch (type_) { + case kNone: + return "None"; + case kEnforceNotMet: { + return "EnforceNotMet"; + } + case kEOF: { + return "EOF"; + } + } + return "unknown"; + } + private: void ClearImpl() { exception_.reset(); diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index 51b980acb5a08d..45c3ad802fc980 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -79,6 +79,7 @@ class BlockingQueue { return true; } else { PADDLE_ENFORCE(closed_); + VLOG(3) << "queue is closed! return nothing."; return false; } } diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py index 03d7df8852e37d..6a2f829654c364 100644 --- a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -59,6 +59,13 @@ def train(use_cuda, thread_num, cpu_num): img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') label = fluid.layers.data(name='label', shape=[1], dtype='int64') + py_reader = fluid.layers.create_py_reader_by_data( + capacity=64, + feed_list=[img, label], + name='py_reader', + use_double_buffer=True) + img, label = fluid.layers.read_file(py_reader) + prediction, avg_loss, acc = convolutional_neural_network(img, label) test_program = fluid.default_main_program().clone(for_test=True) @@ -103,7 +110,7 @@ def train_test(train_test_program, train_test_feed, train_test_reader): exec_strategy = fluid.ExecutionStrategy() exec_strategy.num_threads = thread_num - exec_strategy.num_iteration_per_run = 2 + exec_strategy.num_iteration_per_run = 1 main_program = fluid.default_main_program() pe = fluid.ParallelExecutor( @@ -113,6 +120,22 @@ def train_test(train_test_program, train_test_feed, train_test_reader): build_strategy=build_strategy, exec_strategy=exec_strategy) + py_reader.decorate_paddle_reader(train_reader) + py_reader.start() + + step = 0 + try: + while True: + print("step %d in" % step) + loss_val = pe.run(fetch_list=[avg_loss.name]) + loss_val = numpy.mean(loss_val) + if step % 1 == 0: + print("Batch %d, Cost %f, queue size %d" % + (step, loss_val, py_reader.queue.size())) + step += 1 + except fluid.core.EOFException: + py_reader.reset() + """ step = 0 for step_id, data in enumerate(train_reader()): loss_val = pe.run(feed=feeder.feed(data), fetch_list=[avg_loss.name]) @@ -120,6 +143,8 @@ def train_test(train_test_program, train_test_feed, train_test_reader): if step % 100 == 0: print("Batch %d, Cost %f" % (step, loss_val)) step += 1 + """ + # test for epoch avg_loss_val, acc_val = train_test( train_test_program=test_program, From 4a172611f989eaae04638784cf96c3a2be3c6b8c Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 28 Jan 2019 17:11:48 +0800 Subject: [PATCH 13/37] complete test_async_ssa_graph_executor_mnist test=develop --- .../test_async_ssa_graph_executor_mnist.py | 162 ++++++++++-------- 1 file changed, 91 insertions(+), 71 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py index 6a2f829654c364..110460497070cf 100644 --- a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -18,60 +18,61 @@ import unittest import numpy +import time import paddle import paddle.fluid as fluid BATCH_SIZE = 64 -def loss_net(hidden, label): - prediction = fluid.layers.fc(input=hidden, size=10, act='softmax') - loss = fluid.layers.cross_entropy(input=prediction, label=label) - avg_loss = fluid.layers.mean(loss) - acc = fluid.layers.accuracy(input=prediction, label=label) - return prediction, avg_loss, acc - - -def convolutional_neural_network(img, label): - conv_pool_1 = fluid.nets.simple_img_conv_pool( - input=img, - filter_size=5, - num_filters=20, - pool_size=2, - pool_stride=2, - act="relu") - conv_pool_1 = fluid.layers.batch_norm(conv_pool_1) - conv_pool_2 = fluid.nets.simple_img_conv_pool( - input=conv_pool_1, - filter_size=5, - num_filters=50, - pool_size=2, - pool_stride=2, - act="relu") - return loss_net(conv_pool_2, label) - - -def train(use_cuda, thread_num, cpu_num): - if use_cuda and not fluid.core.is_compiled_with_cuda(): - print("paddle is not compiled with cuda, exit!") - return - - img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - py_reader = fluid.layers.create_py_reader_by_data( - capacity=64, - feed_list=[img, label], - name='py_reader', - use_double_buffer=True) - img, label = fluid.layers.read_file(py_reader) - - prediction, avg_loss, acc = convolutional_neural_network(img, label) +def convolutional_neural_network(use_py_reader): + with fluid.unique_name.guard(): + img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + py_reader = None + if use_py_reader: + py_reader = fluid.layers.create_py_reader_by_data( + capacity=64, + feed_list=[img, label], + name='py_reader', + use_double_buffer=True) + img, label = fluid.layers.read_file(py_reader) + + conv_pool_1 = fluid.nets.simple_img_conv_pool( + input=img, + filter_size=5, + num_filters=20, + pool_size=2, + pool_stride=2, + act="relu") + conv_pool_1 = fluid.layers.batch_norm(conv_pool_1) + conv_pool_2 = fluid.nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + pool_size=2, + pool_stride=2, + act="relu") + + prediction = fluid.layers.fc(input=conv_pool_2, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + avg_loss = fluid.layers.mean(loss) + acc = fluid.layers.accuracy(input=prediction, label=label) + + return img, label, prediction, avg_loss, acc, py_reader + + +def test(): + place = fluid.CPUPlace() + exe = fluid.Executor(place) - test_program = fluid.default_main_program().clone(for_test=True) + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) - optimizer = fluid.optimizer.Adam(learning_rate=0.001) - optimizer.minimize(avg_loss) + img, label, prediction, avg_loss, acc, py_reader = convolutional_neural_network( + use_py_reader=False) + feeder = fluid.DataFeeder(feed_list=[img, label], place=place) def train_test(train_test_program, train_test_feed, train_test_reader): acc_set = [] @@ -87,16 +88,33 @@ def train_test(train_test_program, train_test_feed, train_test_reader): avg_loss_val_mean = numpy.array(avg_loss_set).mean() return avg_loss_val_mean, acc_val_mean - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + # test for epoch + avg_loss_val, acc_val = train_test( + train_test_program=fluid.default_main_program(), + train_test_reader=test_reader, + train_test_feed=feeder) + + print("Test: avg_cost: %s, acc: %s" % (avg_loss_val, acc_val)) + assert acc_val > 0.96 + + +def train(use_cuda, thread_num, cpu_num): + if use_cuda and not fluid.core.is_compiled_with_cuda(): + print("paddle is not compiled with cuda, exit!") + return + + img, label, prediction, avg_loss, acc, py_reader = convolutional_neural_network( + use_py_reader=True) + + optimizer = fluid.optimizer.Adam(learning_rate=0.001) + optimizer.minimize(avg_loss) train_reader = paddle.batch( paddle.reader.shuffle( paddle.dataset.mnist.train(), buf_size=500), batch_size=BATCH_SIZE) - test_reader = paddle.batch( - paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) - feeder = fluid.DataFeeder(feed_list=[img, label], place=place) + place = fluid.CPUPlace() exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) @@ -106,11 +124,11 @@ def train_test(train_test_program, train_test_feed, train_test_reader): print("thread_num:" + str(thread_num)) build_strategy = fluid.BuildStrategy() - build_strategy.async_mode = True # enable async mode + build_strategy.async_mode = True exec_strategy = fluid.ExecutionStrategy() exec_strategy.num_threads = thread_num - exec_strategy.num_iteration_per_run = 1 + exec_strategy.num_iteration_per_run = 10 main_program = fluid.default_main_program() pe = fluid.ParallelExecutor( @@ -126,37 +144,39 @@ def train_test(train_test_program, train_test_feed, train_test_reader): step = 0 try: while True: - print("step %d in" % step) loss_val = pe.run(fetch_list=[avg_loss.name]) loss_val = numpy.mean(loss_val) - if step % 1 == 0: + if step % 100 == 0: print("Batch %d, Cost %f, queue size %d" % (step, loss_val, py_reader.queue.size())) step += 1 except fluid.core.EOFException: + print("train end") py_reader.reset() - """ - step = 0 - for step_id, data in enumerate(train_reader()): - loss_val = pe.run(feed=feeder.feed(data), fetch_list=[avg_loss.name]) - loss_val = numpy.mean(loss_val) - if step % 100 == 0: - print("Batch %d, Cost %f" % (step, loss_val)) - step += 1 - """ - - # test for epoch - avg_loss_val, acc_val = train_test( - train_test_program=test_program, - train_test_reader=test_reader, - train_test_feed=feeder) - print("Test: avg_cost: %s, acc: %s" % (avg_loss_val, acc_val)) + return step class TestAsyncSSAGraphExecutor(unittest.TestCase): def test_check_async_ssa_exe_train(self): - train(use_cuda=False, thread_num=2, cpu_num=2) + step_list = [] + for cpu_num in [1, 2, 4]: + scope = fluid.core.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard( + fluid.Program(), startup_program=fluid.Program()): + start_time = time.time() + step = train( + use_cuda=False, thread_num=cpu_num, cpu_num=cpu_num) + end_time = time.time() + step_list.append(step) + print("cpu_num -> " + str(cpu_num) + " step -> " + str(step) + + " time -> " + str(end_time - start_time)) + with fluid.program_guard( + fluid.Program(), startup_program=fluid.Program()): + test() + assert step_list[0] / 2 == step_list[1] + assert step_list[1] / 2 == step_list[2] if __name__ == "__main__": From 249f48e5397359696f1c2844473f4dcf55ce0ebe Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 29 Jan 2019 07:10:00 +0800 Subject: [PATCH 14/37] update test test=develop --- .../tests/unittests/test_async_ssa_graph_executor_mnist.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py index 110460497070cf..41fa39e06be8c9 100644 --- a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -175,8 +175,8 @@ def test_check_async_ssa_exe_train(self): with fluid.program_guard( fluid.Program(), startup_program=fluid.Program()): test() - assert step_list[0] / 2 == step_list[1] - assert step_list[1] / 2 == step_list[2] + assert int(step_list[0] / 2) == int(step_list[1]) + assert int(step_list[1] / 2) == int(step_list[2]) if __name__ == "__main__": From b1fe8d45709e0d7d0dcde4e969b5fc4e833320c6 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 4 Feb 2019 09:48:00 +0800 Subject: [PATCH 15/37] add a check for async_ssa_graph_exe test=develop --- .../framework/details/async_ssa_graph_executor.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index e21d5fb96dcbec..79b390dde48adf 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -30,6 +30,19 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( VLOG(3) << "build AsyncSSAGraphExecutor"; PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); + if (strategy_.num_iteration_per_run_ > 1) { + int read_op_num = 0; + for (auto *node : graphs_[0]->Nodes()) { + if (node->IsOp() && node->Name() == "read") { + read_op_num++; + } + } + if (read_op_num == 0) { + LOG(WARNING) << "when num_iteration_per_run_ is larger then 1, the model " + "should use pyreader to feed data!"; + } + } + // set the correct size of thread pool to each device. strategy_.num_threads_ = strategy_.num_threads_ < places_.size() ? 1UL From e72637ddd22765dd915119b96bc1821734cd28ef Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sat, 9 Feb 2019 17:11:46 +0800 Subject: [PATCH 16/37] ThreadedSSAGraphExecutor support num_iteration_per_run test=develop --- .../details/async_ssa_graph_executor.cc | 16 ------------ .../details/threaded_ssa_graph_executor.cc | 25 +++++++++++++++++-- .../details/threaded_ssa_graph_executor.h | 1 + 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 79b390dde48adf..5ce92ad826741c 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -30,19 +30,6 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( VLOG(3) << "build AsyncSSAGraphExecutor"; PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); - if (strategy_.num_iteration_per_run_ > 1) { - int read_op_num = 0; - for (auto *node : graphs_[0]->Nodes()) { - if (node->IsOp() && node->Name() == "read") { - read_op_num++; - } - } - if (read_op_num == 0) { - LOG(WARNING) << "when num_iteration_per_run_ is larger then 1, the model " - "should use pyreader to feed data!"; - } - } - // set the correct size of thread pool to each device. strategy_.num_threads_ = strategy_.num_threads_ < places_.size() ? 1UL @@ -69,9 +56,6 @@ FeedFetchList AsyncSSAGraphExecutor::Run( for (size_t i = 0; i < places_.size(); ++i) { auto call = [this, i, &fetch_tensors]() -> FeedFetchList { try { - for (size_t j = 0; j < strategy_.num_iteration_per_run_ - 1; ++j) { - executors_[i]->Run(fetch_tensors); - } return executors_[i]->Run(fetch_tensors); } catch (...) { exception_holder_.Catch(std::current_exception()); diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 677a2937945b03..16fa2a6db689b1 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -32,9 +32,22 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( places_(places), fetch_ctxs_(places), running_ops_(0), - strategy_(strategy) {} + strategy_(strategy) { + if (strategy_.num_iteration_per_run_ > 1) { + int read_op_num = 0; + for (auto *node : graph_->Nodes()) { + if (node->IsOp() && node->Name() == "read") { + read_op_num++; + } + } + if (read_op_num == 0) { + LOG(WARNING) << "when num_iteration_per_run_ is larger then 1, the model " + "should use pyreader to feed data!"; + } + } +} -FeedFetchList ThreadedSSAGraphExecutor::Run( +inline FeedFetchList ThreadedSSAGraphExecutor::RunImpl( const std::vector &fetch_tensors) { std::unique_ptr event( new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare", nullptr)); @@ -140,6 +153,14 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( return fetch_data; } +FeedFetchList ThreadedSSAGraphExecutor::Run( + const std::vector &fetch_tensors) { + for (size_t j = 0; j < strategy_.num_iteration_per_run_ - 1; ++j) { + RunImpl({}); + } + return RunImpl(fetch_tensors); +} + void ThreadedSSAGraphExecutor::InsertFetchOps( const std::vector &fetch_tensors, std::vector *fetch_ops, diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index 24da56c09e3e0f..3809b6e9ae0c43 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -51,6 +51,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ~ThreadedSSAGraphExecutor() final = default; private: + inline FeedFetchList RunImpl(const std::vector &fetch_tensors); void RunOp(const std::shared_ptr> &ready_var_q, details::OpHandleBase *op); From 84367cf8bc4195d82dc1851d116980746f7c68b6 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sun, 10 Feb 2019 19:58:50 +0800 Subject: [PATCH 17/37] support async mode in dist mode parallel executor --- .../details/multi_devices_graph_pass.cc | 35 ++++++++++++++++--- .../details/multi_devices_graph_pass.h | 12 +++---- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index f1347e2b0d70c6..a2bbfc91b73745 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -167,6 +167,10 @@ std::unique_ptr MultiDevSSAGraphBuilderBase::ApplyImpl( bool is_forwarding = true; bool insert_collection_ops = NeedCollectiveOps(); + if (strategy_.async_mode_) { + // async mode did not need to merge gradient + insert_collection_ops = false; + } for (ir::Node *node : sorted_ops) { if (DealWithSpecialOp(&result, node)) { @@ -192,8 +196,22 @@ std::unique_ptr MultiDevSSAGraphBuilderBase::ApplyImpl( static_cast(boost::get(node->Op()->GetAttr( OpProtoAndCheckerMaker::OpRoleAttrName())) & static_cast(OpRole::kBackward)); + // optimize op is already processed in DealWithSpecialOp, + // here we only consider backward op if (!is_bk_op) continue; + /* + * the op that will generate the gradient of on parameter will have + one attr op_role_var + * to record the parameter and gradient, like: + attrs { + name: "op_role_var" + type: STRINGS + strings: "fc_1.b_0" + strings: "fc_1.b_0@GRAD" + } + */ + // Currently, we assume that once gradient is generated, it can be // broadcast, and each gradient is only broadcast once. auto backward_vars = @@ -204,7 +222,7 @@ std::unique_ptr MultiDevSSAGraphBuilderBase::ApplyImpl( for (size_t i = 0; i < backward_vars.size(); i += 2) { auto &p_name = backward_vars[i]; auto &g_name = backward_vars[i + 1]; - VLOG(10) << "Bcast " << g_name << " for parameter " << p_name; + VLOG(3) << "Bcast " << g_name << " for parameter " << p_name; InsertCollectiveOp(&result, p_name, g_name); } @@ -385,7 +403,7 @@ void MultiDevSSAGraphBuilderBase::CreateFusedBroadcastOp( void MultiDevSSAGraphBuilderBase::CreateComputationalOp(ir::Graph *result, ir::Node *node, - int dev_id) const { + size_t dev_id) const { result->Get(kGraphOps).emplace_back( new ComputationOpHandle(result->CreateOpNode(node->Op()), local_scopes_[dev_id], places_[dev_id], dev_id)); @@ -454,9 +472,8 @@ void MultiDevSSAGraphBuilderBase::CreateComputationalOps( } } -VarHandle *MultiDevSSAGraphBuilderBase::CreateReduceOp(ir::Graph *result, - const std::string &og, - int dst_dev_id) const { +VarHandle *MultiDevSSAGraphBuilderBase::CreateReduceOp( + ir::Graph *result, const std::string &og, size_t dst_dev_id) const { #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) result->Get(kGraphOps).emplace_back(new ReduceOpHandle( result->CreateEmptyNode("reduce", ir::Node::Type::kOperation), @@ -720,6 +737,10 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, ir::Node *node) const { bool insert_op = false; if (OpHaveRole(*node, OpRole::kRPC)) { + // in async_mode, each graph will send it's own gradient. + if (strategy_.async_mode_ && node->Op()->Type() == "send") { + return false; + } int op_dev_id = CreateRPCOp(result, node); PADDLE_ENFORCE(op_dev_id != -1, "Can not schedule the RPC operator to the right place."); @@ -737,6 +758,8 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, } else if (OpHaveRole(*node, OpRole::kDist)) { int op_dev_id = CreateDistTrainOp(result, node); if (node->Op()->Type() == "concat") { + // the input(block of parameter) of concat is on different device, + // the output(parameter) will on one device. auto origin_param_name = node->Op()->OutputArgumentNames()[0]; bcast_var_name_set_[op_dev_id].emplace(origin_param_name); } @@ -744,6 +767,7 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, } else { int op_dev_id = GetOpDeviceID(node); if (op_dev_id != -1) { // This op only runs on one specific device. + // optimize op will be processed here. CreateComputationalOp(result, node, op_dev_id); for (ir::Node *n : node->outputs) { sharded_var_device_.emplace(n->Name(), op_dev_id); @@ -905,6 +929,7 @@ int DistSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, const std::string &p_name, const std::string &g_name) const { + // collective gradient to each device size_t cur_device_id = 0; switch (strategy_.reduce_) { case BuildStrategy::ReduceStrategy::kReduce: diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index e91397816c3a98..377ba50fccf4ab 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -68,10 +68,10 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass { proto::VarType::Type dtype) const; VarHandle *CreateReduceOp(ir::Graph *result, const std::string &og, - int dst_dev_id) const; + size_t dst_dev_id) const; void CreateComputationalOp(ir::Graph *result, ir::Node *node, - int dev_id) const; + size_t dev_id) const; bool IsSparseGradient(const std::string &og) const; @@ -118,16 +118,16 @@ class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { protected: - virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, - const std::string &g_name) const {} + void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, + const std::string &g_name) const override {} bool NeedCollectiveOps() const override { return false; } - virtual bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const { + bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const override { return false; } - virtual void InsertPostprocessOps(ir::Graph *result) const {} + void InsertPostprocessOps(ir::Graph *result) const override {} }; class BalanceVarSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { From c4ded17e8cbcbf33e68145c1a4ffe777582bf3ab Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 11 Feb 2019 09:19:48 +0800 Subject: [PATCH 18/37] async mode support dist train --- paddle/fluid/framework/details/build_strategy.cc | 6 +++--- paddle/fluid/framework/details/multi_devices_graph_pass.cc | 7 ++++++- paddle/fluid/framework/parallel_executor.cc | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index a286cb30a239a4..e917395259cfba 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -133,10 +133,10 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { void AppendMultiDevPass(const BuildStrategy &strategy) { ir::Pass *multi_devices_pass; - if (strategy_.async_mode_) { - multi_devices_pass = AppendPass("async_multi_devices_pass").get(); - } else if (strategy_.is_distribution_) { + if (strategy_.is_distribution_) { multi_devices_pass = AppendPass("dist_multi_devices_pass").get(); + } else if (strategy_.async_mode_) { + multi_devices_pass = AppendPass("async_multi_devices_pass").get(); } else { if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { multi_devices_pass = diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index a2bbfc91b73745..572d374b501a76 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -756,6 +756,11 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, insert_op = true; need_broadcast_var_ = true; } else if (OpHaveRole(*node, OpRole::kDist)) { + // in async_mode, each graph will send it's own gradient, do not need to + // merge gradient. + if (strategy_.async_mode_ && node->Op()->Type() != "concat") { + return false; + } int op_dev_id = CreateDistTrainOp(result, node); if (node->Op()->Type() == "concat") { // the input(block of parameter) of concat is on different device, @@ -827,7 +832,7 @@ int DistSSAGraphBuilder::CreateRPCOp(ir::Graph *result, ir::Node *node) const { } auto recv_param_grad = boost::get>( node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName())); - if (recv_param_grad.size() == 2U) { + if (recv_param_grad.size() == 2U && !strategy_.async_mode_) { op_dev_id = GetVarDeviceID(recv_param_grad[1]); VLOG(10) << "recv param " << recv_param_grad[0] << " get grad place: " << recv_param_grad[1] diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index f0bc3acccc22e6..c85fe4f2006817 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -283,7 +283,7 @@ ParallelExecutor::ParallelExecutor( graphs.push_back(std::move(graph)); } #else - if (build_strategy.async_mode_) { + if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { for (size_t i = 0; i < member_->places_.size(); ++i) { std::unique_ptr graph = build_strategy.Apply( main_program, {member_->places_[i]}, loss_var_name, From 2171aa77f100b53c59b8dfd615f2a7ebcf447b77 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 11 Feb 2019 09:29:36 +0800 Subject: [PATCH 19/37] async ssa exe only support local mode --- paddle/fluid/framework/parallel_executor.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index c85fe4f2006817..e8531cd8d848a3 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -333,7 +333,7 @@ ParallelExecutor::ParallelExecutor( "please don't pass loss_var_name."; } } - if (build_strategy.async_mode_) { + if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use AsyncSSAGraphExecutor"; member_->executor_.reset(new details::AsyncSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, From 9465c3d0c393f7e7c5665f561433ca65e193396c Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Thu, 21 Feb 2019 16:28:38 +0800 Subject: [PATCH 20/37] fix compile problem --- paddle/fluid/framework/parallel_executor.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index dfadfb57dbdaa7..67ccf04d057377 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -21,8 +21,8 @@ limitations under the License. */ #include "paddle/fluid/framework/ir/graph.h" -#include "paddle/fluid/framework/details/async_ssa_graph_executor.h" #include "paddle/fluid/framework/details/all_reduce_deps_pass.h" +#include "paddle/fluid/framework/details/async_ssa_graph_executor.h" #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h" @@ -260,6 +260,7 @@ ParallelExecutor::ParallelExecutor( // Step 2. Convert main_program to SSA form and dependency graph. Also, insert // ncclOp std::unique_ptr graph; + std::vector> graphs; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, member_->local_scopes_, member_->nranks_, @@ -273,10 +274,9 @@ ParallelExecutor::ParallelExecutor( graphs.push_back(std::move(graph)); } } else { - std::unique_ptr graph = build_strategy.Apply( - main_program, member_->places_, loss_var_name, member_->local_scopes_, - member_->nranks_, member_->use_cuda_); - graphs.push_back(std::move(graph)); + graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, + member_->use_cuda_); } #endif auto max_memory_size = GetEagerDeletionThreshold(); From 7f3be09045e349ef9028337083604c1d3a126169 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Thu, 21 Feb 2019 17:08:56 +0800 Subject: [PATCH 21/37] fix multi graph test=develop --- .../fluid/framework/details/build_strategy.cc | 1 + paddle/fluid/framework/parallel_executor.cc | 46 +++++++++++-------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index 1b0ec029104231..e5c108f8904425 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -249,6 +249,7 @@ std::unique_ptr BuildStrategy::Apply( graph = pass->Apply(std::move(graph)); VLOG(3) << "Finish Apply Pass " << pass->Type(); } + VLOG(3) << "All Passes Applied"; return graph; } diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 67ccf04d057377..ecae729124c216 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -259,14 +259,15 @@ ParallelExecutor::ParallelExecutor( // Step 2. Convert main_program to SSA form and dependency graph. Also, insert // ncclOp - std::unique_ptr graph; std::vector> graphs; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, - member_->local_scopes_, member_->nranks_, - member_->use_cuda_, member_->nccl_ctxs_.get()); + std::unique_ptr graph = build_strategy.Apply( + main_program, member_->places_, loss_var_name, member_->local_scopes_, + member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_.get()); + graphs.push_back(std::move(graph)); #else if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { + VLOG(3) << "use local async mode"; for (size_t i = 0; i < member_->places_.size(); ++i) { std::unique_ptr graph = build_strategy.Apply( main_program, {member_->places_[i]}, loss_var_name, @@ -274,39 +275,44 @@ ParallelExecutor::ParallelExecutor( graphs.push_back(std::move(graph)); } } else { - graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, - member_->local_scopes_, member_->nranks_, - member_->use_cuda_); + std::unique_ptr graph = build_strategy.Apply( + main_program, member_->places_, loss_var_name, member_->local_scopes_, + member_->nranks_, member_->use_cuda_); + graphs.push_back(std::move(graph)); } #endif auto max_memory_size = GetEagerDeletionThreshold(); VLOG(10) << "Eager Deletion Threshold " << static_cast(max_memory_size) / (1 << 30); if (max_memory_size >= 0) { - graph = member_->PrepareGCAndRefCnts(std::move(graph), - static_cast(max_memory_size)); + for (size_t i = 0; i < graphs.size(); ++i) { + graphs[i] = member_->PrepareGCAndRefCnts( + std::move(graphs[i]), static_cast(max_memory_size)); + } } // Step 3. Create vars in each scope. Passes may also create new vars. // skip control vars and empty vars std::vector var_infos; - for (auto &node : graph->Nodes()) { - if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { - var_infos.emplace_back(); - var_infos.back().name_ = node->Var()->Name(); - var_infos.back().type_ = node->Var()->GetType(); - var_infos.back().persistable_ = node->Var()->Persistable(); + for (auto &graph : graphs) { + for (auto &node : graph->Nodes()) { + if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { + var_infos.emplace_back(); + var_infos.back().name_ = node->Var()->Name(); + var_infos.back().type_ = node->Var()->GetType(); + var_infos.back().persistable_ = node->Var()->Persistable(); + } } } // If the loss_var_name is given, the number of graph should be only one. if (loss_var_name.size()) { - size_t graph_num = ir::GraphNum(*graph); + size_t graph_num = ir::GraphNum(*graphs[0]); if (graph_num > 1) { LOG(WARNING) << "The number of graph should be only one, " "but the current graph has " - << ir::GraphNum(*graph) + << ir::GraphNum(*graphs[0]) << " sub_graphs. If you want to see the nodes of the " "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' " "to specify the output dir. NOTES: if you not do training, " @@ -326,7 +332,7 @@ ParallelExecutor::ParallelExecutor( // allreduce_seq_pass doesn't need it as the attr. member_->executor_.reset(new details::ParallelSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, main_program, - std::move(graph))); + std::move(graphs[0]))); #else PADDLE_THROW( "Paddle should be compiled with CUDA for ParallelGraph Execution."); @@ -336,12 +342,12 @@ ParallelExecutor::ParallelExecutor( VLOG(3) << "use ThreadedSSAGraphExecutor"; member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, - std::move(graph))); + std::move(graphs[0]))); } else { VLOG(3) << "use FastThreadedSSAGraphExecutor"; member_->executor_.reset(new details::FastThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, - std::move(graph))); + std::move(graphs[0]))); } } From 12f6b8c3d623d166e77b77eb11837783ffc5fe42 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Thu, 21 Feb 2019 18:23:31 +0800 Subject: [PATCH 22/37] change the include of ThreadPool.h test=develop --- paddle/fluid/framework/details/threaded_ssa_graph_executor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index 3809b6e9ae0c43..ae9cb1ebca4f6d 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -21,8 +21,8 @@ #include #include +#include // ThreadPool in thrird party #include -#include "ThreadPool.h" // ThreadPool in thrird party #include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/details/exception_holder.h" #include "paddle/fluid/framework/details/execution_strategy.h" From f4f4816b0c1ffdf7689523f732cd728c196e5aff Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Fri, 22 Feb 2019 16:26:50 +0800 Subject: [PATCH 23/37] fix gpu error test=develop --- .../details/async_ssa_graph_executor.cc | 1 + paddle/fluid/framework/parallel_executor.cc | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 5ce92ad826741c..0780fb040a6fa0 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -29,6 +29,7 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( graphs_(std::move(graphs)) { VLOG(3) << "build AsyncSSAGraphExecutor"; PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); + PADDLE_ENFORCE_EQ(graphs_.size, local_scopes_.size()); // set the correct size of thread pool to each device. strategy_.num_threads_ = strategy_.num_threads_ < places_.size() diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index ecae729124c216..cfd6609a4b14fe 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -261,10 +261,21 @@ ParallelExecutor::ParallelExecutor( // ncclOp std::vector> graphs; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - std::unique_ptr graph = build_strategy.Apply( - main_program, member_->places_, loss_var_name, member_->local_scopes_, - member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_.get()); - graphs.push_back(std::move(graph)); + if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { + VLOG(3) << "use local async mode"; + for (size_t i = 0; i < member_->places_.size(); ++i) { + std::unique_ptr graph = build_strategy.Apply( + main_program, {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_, + member_->nccl_ctxs_.get()); + graphs.push_back(std::move(graph)); + } + } else { + std::unique_ptr graph = build_strategy.Apply( + main_program, member_->places_, loss_var_name, member_->local_scopes_, + member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_.get()); + graphs.push_back(std::move(graph)); + } #else if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use local async mode"; From ecedd531c1ba9b68a1f24bce9b7b98ced67cc128 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Fri, 22 Feb 2019 16:37:40 +0800 Subject: [PATCH 24/37] fix code bug test=develop --- paddle/fluid/framework/details/async_ssa_graph_executor.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 0780fb040a6fa0..a584b3a708be30 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -29,7 +29,7 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( graphs_(std::move(graphs)) { VLOG(3) << "build AsyncSSAGraphExecutor"; PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); - PADDLE_ENFORCE_EQ(graphs_.size, local_scopes_.size()); + PADDLE_ENFORCE_EQ(graphs_.size(), local_scopes_.size()); // set the correct size of thread pool to each device. strategy_.num_threads_ = strategy_.num_threads_ < places_.size() From b5b8e6cc9c0b219d9fea2c43944798509f035d04 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sat, 23 Feb 2019 09:28:56 +0800 Subject: [PATCH 25/37] revert the change of scope test=develop --- paddle/fluid/framework/scope.cc | 27 --------------------------- paddle/fluid/framework/scope.h | 1 - 2 files changed, 28 deletions(-) diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index 4fe843dde9c6fe..87f0f307d30bc9 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -255,32 +255,5 @@ std::string GenScopeTreeDebugInfo(Scope* root) { return os.str(); } -std::string GenParentScopeTreeDebugInfo(Scope* leaf) { - std::stringstream os; - - if (!leaf) return ""; - - // level traversal - std::vector scopes; - const Scope* current_scope = leaf; - - while (current_scope != nullptr) { - scopes.push_back(current_scope); - current_scope = current_scope->parent(); - } - - os << "\n--------------GenParentScopeTreeDebugInfo--------------\n"; - - for (int i = scopes.size() - 1; i >= 0; --i) { - os << "=======level [" << i << "]=======\n"; - os << scopes[i] << ":\n"; - for (auto& var : scopes[i]->LocalVarNames()) { - os << " - " << var << "\n"; - } - } - - return os.str(); -} - } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h index eb5c12def6a2f9..f0915d2eee072b 100644 --- a/paddle/fluid/framework/scope.h +++ b/paddle/fluid/framework/scope.h @@ -144,7 +144,6 @@ class Scope { // Generate some debug string about the inherience structure of scope, quite // naive. std::string GenScopeTreeDebugInfo(Scope*); -std::string GenParentScopeTreeDebugInfo(Scope*); } // namespace framework } // namespace paddle From 10393dd0d16e57203b8cb039174cff97b6efbc89 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 25 Feb 2019 10:09:25 +0800 Subject: [PATCH 26/37] add some check test=develop --- paddle/fluid/framework/parallel_executor.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index cfd6609a4b14fe..8236773672562e 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -217,6 +217,11 @@ ParallelExecutor::ParallelExecutor( } } + if (build_strategy.async_mode_) { + PADDLE_ENFORCE(!member_->use_cuda_, + "gpu mode does not support async_mode_ now!"); + } + // FIXME(Yancey1989): parallel graph mode get better performance // in GPU allreduce distributed training. Need an elegant way to // choice the execution strategy. From 43c82376cba493bf622d452741c395da275f0a1b Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 25 Feb 2019 22:39:34 +0800 Subject: [PATCH 27/37] use one graph --- .../details/async_ssa_graph_executor.cc | 7 +- .../details/async_ssa_graph_executor.h | 6 +- paddle/fluid/framework/parallel_executor.cc | 66 ++++++++----------- 3 files changed, 33 insertions(+), 46 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index a584b3a708be30..b6d1ee50739eb4 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -21,15 +21,14 @@ namespace details { AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::vector> &&graphs) + std::unique_ptr &&graph) : strategy_(std::move(strategy)), local_scopes_(std::move(local_scopes)), pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr), places_(std::move(places)), - graphs_(std::move(graphs)) { + graph_(std::move(graph)) { VLOG(3) << "build AsyncSSAGraphExecutor"; PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); - PADDLE_ENFORCE_EQ(graphs_.size(), local_scopes_.size()); // set the correct size of thread pool to each device. strategy_.num_threads_ = strategy_.num_threads_ < places_.size() @@ -39,7 +38,7 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( << " to run the operators of the graph on each device."; for (size_t i = 0; i < places.size(); ++i) { executors_.emplace_back(new details::ThreadedSSAGraphExecutor( - strategy_, {local_scopes_[i]}, {places_[i]}, std::move(graphs_[i]))); + strategy_, {local_scopes_[i]}, {places_[i]}, graph_.get())); } } diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.h b/paddle/fluid/framework/details/async_ssa_graph_executor.h index 4091c56d743c6e..50f207361fb1c4 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.h @@ -29,9 +29,9 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { AsyncSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::vector> &&graphs); + std::unique_ptr &&graph); ~AsyncSSAGraphExecutor() final = default; - const ir::Graph &Graph() const override { return *graphs_[0]; } + const ir::Graph &Graph() const override { return *graph_; } FeedFetchList Run(const std::vector &fetch_tensors) override; @@ -40,7 +40,7 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { std::vector local_scopes_; std::unique_ptr<::ThreadPool> pool_{nullptr}; std::vector places_; - std::vector> graphs_; + std::unique_ptr graph_; std::vector> executors_; ExceptionHolder exception_holder_; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 8236773672562e..129d3a7f0d3580 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -264,71 +264,59 @@ ParallelExecutor::ParallelExecutor( // Step 2. Convert main_program to SSA form and dependency graph. Also, insert // ncclOp - std::vector> graphs; + std::unique_ptr graph; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use local async mode"; - for (size_t i = 0; i < member_->places_.size(); ++i) { - std::unique_ptr graph = build_strategy.Apply( - main_program, {member_->places_[i]}, loss_var_name, - {member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_, - member_->nccl_ctxs_.get()); - graphs.push_back(std::move(graph)); - } + graph = + build_strategy.Apply(main_program, {member_->places_[0]}, loss_var_name, + {member_->local_scopes_[0]}, member_->nranks_, + member_->use_cuda_, member_->nccl_ctxs_.get()); } else { - std::unique_ptr graph = build_strategy.Apply( - main_program, member_->places_, loss_var_name, member_->local_scopes_, - member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_.get()); - graphs.push_back(std::move(graph)); + graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, + member_->use_cuda_, member_->nccl_ctxs_.get()); } #else if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use local async mode"; - for (size_t i = 0; i < member_->places_.size(); ++i) { - std::unique_ptr graph = build_strategy.Apply( - main_program, {member_->places_[i]}, loss_var_name, - {member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_); - graphs.push_back(std::move(graph)); - } + graph = build_strategy.Apply(main_program, {member_->places_[0]}, + loss_var_name, {member_->local_scopes_[0]}, + member_->nranks_, member_->use_cuda_); } else { - std::unique_ptr graph = build_strategy.Apply( - main_program, member_->places_, loss_var_name, member_->local_scopes_, - member_->nranks_, member_->use_cuda_); - graphs.push_back(std::move(graph)); + graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, + member_->use_cuda_); } #endif auto max_memory_size = GetEagerDeletionThreshold(); VLOG(10) << "Eager Deletion Threshold " << static_cast(max_memory_size) / (1 << 30); if (max_memory_size >= 0) { - for (size_t i = 0; i < graphs.size(); ++i) { - graphs[i] = member_->PrepareGCAndRefCnts( - std::move(graphs[i]), static_cast(max_memory_size)); - } + graph = member_->PrepareGCAndRefCnts(std::move(graph), + static_cast(max_memory_size)); } // Step 3. Create vars in each scope. Passes may also create new vars. // skip control vars and empty vars std::vector var_infos; - for (auto &graph : graphs) { - for (auto &node : graph->Nodes()) { - if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { - var_infos.emplace_back(); - var_infos.back().name_ = node->Var()->Name(); - var_infos.back().type_ = node->Var()->GetType(); - var_infos.back().persistable_ = node->Var()->Persistable(); - } + for (auto &node : graph->Nodes()) { + if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { + var_infos.emplace_back(); + var_infos.back().name_ = node->Var()->Name(); + var_infos.back().type_ = node->Var()->GetType(); + var_infos.back().persistable_ = node->Var()->Persistable(); } } // If the loss_var_name is given, the number of graph should be only one. if (loss_var_name.size()) { - size_t graph_num = ir::GraphNum(*graphs[0]); + size_t graph_num = ir::GraphNum(*graph); if (graph_num > 1) { LOG(WARNING) << "The number of graph should be only one, " "but the current graph has " - << ir::GraphNum(*graphs[0]) + << ir::GraphNum(*graph) << " sub_graphs. If you want to see the nodes of the " "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' " "to specify the output dir. NOTES: if you not do training, " @@ -340,7 +328,7 @@ ParallelExecutor::ParallelExecutor( VLOG(3) << "use AsyncSSAGraphExecutor"; member_->executor_.reset(new details::AsyncSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, - std::move(graphs))); + std::move(graph))); } else if (build_strategy.enable_parallel_graph_) { VLOG(3) << "use ParallelSSAGraphExecutor"; #ifdef PADDLE_WITH_CUDA @@ -358,12 +346,12 @@ ParallelExecutor::ParallelExecutor( VLOG(3) << "use ThreadedSSAGraphExecutor"; member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, - std::move(graphs[0]))); + std::move(graph))); } else { VLOG(3) << "use FastThreadedSSAGraphExecutor"; member_->executor_.reset(new details::FastThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, - std::move(graphs[0]))); + std::move(graph))); } } From dab7f36909a61af51beacd145228bb2a4acc4db5 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 25 Feb 2019 22:49:03 +0800 Subject: [PATCH 28/37] optimize code test=develop --- .../details/async_ssa_graph_executor.cc | 6 ++-- .../details/async_ssa_graph_executor.h | 4 +-- paddle/fluid/framework/parallel_executor.cc | 30 +++++++++---------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index b6d1ee50739eb4..8757842996f16f 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -21,12 +21,12 @@ namespace details { AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph) + ir::Graph* graph) : strategy_(std::move(strategy)), local_scopes_(std::move(local_scopes)), pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr), places_(std::move(places)), - graph_(std::move(graph)) { + graph_(graph) { VLOG(3) << "build AsyncSSAGraphExecutor"; PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); @@ -38,7 +38,7 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( << " to run the operators of the graph on each device."; for (size_t i = 0; i < places.size(); ++i) { executors_.emplace_back(new details::ThreadedSSAGraphExecutor( - strategy_, {local_scopes_[i]}, {places_[i]}, graph_.get())); + strategy_, {local_scopes_[i]}, {places_[i]}, graph_)); } } diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.h b/paddle/fluid/framework/details/async_ssa_graph_executor.h index 50f207361fb1c4..8536852a00f9ee 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.h @@ -29,7 +29,7 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { AsyncSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph); + ir::Graph *graph); ~AsyncSSAGraphExecutor() final = default; const ir::Graph &Graph() const override { return *graph_; } @@ -40,7 +40,7 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { std::vector local_scopes_; std::unique_ptr<::ThreadPool> pool_{nullptr}; std::vector places_; - std::unique_ptr graph_; + ir::Graph *graph_; std::vector> executors_; ExceptionHolder exception_holder_; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index a498ec5b0b5406..081d06b6aa2f66 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -269,25 +269,26 @@ ParallelExecutor::ParallelExecutor( #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use local async mode"; - temp_owned_graph = - build_strategy.Apply(std::move(temp_owned_graph), {member_->places_[0]}, loss_var_name, - {member_->local_scopes_[0]}, member_->nranks_, - member_->use_cuda_, member_->nccl_ctxs_.get()); + temp_owned_graph = build_strategy.Apply( + std::move(temp_owned_graph), {member_->places_[0]}, loss_var_name, + {member_->local_scopes_[0]}, member_->nranks_, member_->use_cuda_, + member_->nccl_ctxs_.get()); } else { - temp_owned_graph = build_strategy.Apply(std::move(temp_owned_graph), member_->places_, loss_var_name, - member_->local_scopes_, member_->nranks_, - member_->use_cuda_, member_->nccl_ctxs_.get()); + temp_owned_graph = build_strategy.Apply( + std::move(temp_owned_graph), member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, member_->use_cuda_, + member_->nccl_ctxs_.get()); } #else if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use local async mode"; - temp_owned_graph = build_strategy.Apply(std::move(temp_owned_graph), {member_->places_[0]}, - loss_var_name, {member_->local_scopes_[0]}, - member_->nranks_, member_->use_cuda_); + temp_owned_graph = build_strategy.Apply( + std::move(temp_owned_graph), {member_->places_[0]}, loss_var_name, + {member_->local_scopes_[0]}, member_->nranks_, member_->use_cuda_); } else { - temp_owned_graph = build_strategy.Apply(std::move(temp_owned_graph), member_->places_, loss_var_name, - member_->local_scopes_, member_->nranks_, - member_->use_cuda_); + temp_owned_graph = build_strategy.Apply( + std::move(temp_owned_graph), member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, member_->use_cuda_); } #endif @@ -333,8 +334,7 @@ ParallelExecutor::ParallelExecutor( if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use AsyncSSAGraphExecutor"; member_->executor_.reset(new details::AsyncSSAGraphExecutor( - exec_strategy, member_->local_scopes_, member_->places_, - graph)); + exec_strategy, member_->local_scopes_, member_->places_, graph)); } else if (build_strategy.enable_parallel_graph_) { VLOG(3) << "use ParallelSSAGraphExecutor"; #ifdef PADDLE_WITH_CUDA From ff01d705835c5e1ccac4d9f1e109725bf6efeb53 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 25 Feb 2019 23:31:56 +0800 Subject: [PATCH 29/37] fix style test=develop --- paddle/fluid/framework/details/async_ssa_graph_executor.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 8757842996f16f..21741667a3ac78 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -20,8 +20,7 @@ namespace details { AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, - const std::vector &places, - ir::Graph* graph) + const std::vector &places, ir::Graph *graph) : strategy_(std::move(strategy)), local_scopes_(std::move(local_scopes)), pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr), From f768fbf7157e4b500de3aa456beddaa138f00cd5 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 26 Feb 2019 15:01:59 +0800 Subject: [PATCH 30/37] support multi graph test=develop --- .../details/async_ssa_graph_executor.cc | 6 +-- .../details/async_ssa_graph_executor.h | 6 +-- paddle/fluid/framework/parallel_executor.cc | 40 ++++++++++++++----- paddle/fluid/framework/parallel_executor.h | 2 +- .../fluid/operators/reader/blocking_queue.h | 1 + .../operators/reader/create_py_reader_op.cc | 5 ++- paddle/fluid/pybind/pybind.cc | 2 +- python/paddle/fluid/parallel_executor.py | 9 ++++- 8 files changed, 50 insertions(+), 21 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 21741667a3ac78..dfb9d73dcbe520 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -20,12 +20,12 @@ namespace details { AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, - const std::vector &places, ir::Graph *graph) + const std::vector &places, std::vector graphs) : strategy_(std::move(strategy)), local_scopes_(std::move(local_scopes)), pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr), places_(std::move(places)), - graph_(graph) { + graphs_(std::move(graphs)) { VLOG(3) << "build AsyncSSAGraphExecutor"; PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); @@ -37,7 +37,7 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( << " to run the operators of the graph on each device."; for (size_t i = 0; i < places.size(); ++i) { executors_.emplace_back(new details::ThreadedSSAGraphExecutor( - strategy_, {local_scopes_[i]}, {places_[i]}, graph_)); + strategy_, {local_scopes_[i]}, {places_[i]}, graphs_[i])); } } diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.h b/paddle/fluid/framework/details/async_ssa_graph_executor.h index 8536852a00f9ee..ff85ba2c6cf1dd 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.h @@ -29,9 +29,9 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { AsyncSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - ir::Graph *graph); + std::vector graphs); ~AsyncSSAGraphExecutor() final = default; - const ir::Graph &Graph() const override { return *graph_; } + const ir::Graph &Graph() const override { return *graphs_[0]; } FeedFetchList Run(const std::vector &fetch_tensors) override; @@ -40,7 +40,7 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { std::vector local_scopes_; std::unique_ptr<::ThreadPool> pool_{nullptr}; std::vector places_; - ir::Graph *graph_; + std::vector graphs_; std::vector> executors_; ExceptionHolder exception_holder_; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 081d06b6aa2f66..b1f40911487fa0 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -188,7 +188,7 @@ ParallelExecutor::ParallelExecutor( const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy, - ir::Graph *graph) + std::vector graphs) : member_(new ParallelExecutorPrivate(places)) { member_->global_scope_ = scope; member_->use_cuda_ = exec_strategy.use_cuda_; @@ -222,6 +222,8 @@ ParallelExecutor::ParallelExecutor( PADDLE_ENFORCE(!member_->use_cuda_, "gpu mode does not support async_mode_ now!"); } + + ir::Graph *graph = graphs[0]; std::unique_ptr temp_owned_graph(graph); // FIXME(Yancey1989): parallel graph mode get better performance @@ -262,17 +264,26 @@ ParallelExecutor::ParallelExecutor( if (member_->local_scopes_.size() != 1 && local_scopes.empty()) { BCastParamsToDevices(bcast_vars); } -// Startup Program has been run. All local scopes has correct parameters. + // Startup Program has been run. All local scopes has correct parameters. -// Step 2. Convert main_program to SSA form and dependency graph. Also, insert -// ncclOp + // Step 2. Convert main_program to SSA form and dependency graph. Also, insert + // ncclOp + std::vector async_graphs(places.size()); #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use local async mode"; - temp_owned_graph = build_strategy.Apply( - std::move(temp_owned_graph), {member_->places_[0]}, loss_var_name, - {member_->local_scopes_[0]}, member_->nranks_, member_->use_cuda_, - member_->nccl_ctxs_.get()); + temp_owned_graph = + build_strategy.Apply(std::move(temp_owned_graph), {member_->places_[0]}, + loss_var_name, {member_->local_scopes_[0]}, 1, + member_->use_cuda_, member_->nccl_ctxs_.get()); + for (int i = 1; i < member_->places_.size(); ++i) { + std::unique_ptr temp_graph(graphs[i]); + temp_graph = + build_strategy.Apply(std::move(temp_graph), {member_->places_[i]}, + loss_var_name, {member_->local_scopes_[i]}, 1, + member_->use_cuda_, member_->nccl_ctxs_.get()); + async_graphs[i] = temp_graph.release(); + } } else { temp_owned_graph = build_strategy.Apply( std::move(temp_owned_graph), member_->places_, loss_var_name, @@ -284,7 +295,14 @@ ParallelExecutor::ParallelExecutor( VLOG(3) << "use local async mode"; temp_owned_graph = build_strategy.Apply( std::move(temp_owned_graph), {member_->places_[0]}, loss_var_name, - {member_->local_scopes_[0]}, member_->nranks_, member_->use_cuda_); + {member_->local_scopes_[0]}, 1, member_->use_cuda_); + for (int i = 1; i < member_->places_.size(); ++i) { + std::unique_ptr temp_graph(graphs[i]); + temp_graph = build_strategy.Apply( + std::move(temp_graph), {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, 1, member_->use_cuda_); + async_graphs[i] = temp_graph.release(); + } } else { temp_owned_graph = build_strategy.Apply( std::move(temp_owned_graph), member_->places_, loss_var_name, @@ -304,6 +322,8 @@ ParallelExecutor::ParallelExecutor( graph = temp_owned_graph.release(); } + async_graphs[0] = graph; + // Step 3. Create vars in each scope. Passes may also create new vars. // skip control vars and empty vars std::vector var_infos; @@ -334,7 +354,7 @@ ParallelExecutor::ParallelExecutor( if (build_strategy.async_mode_ && !build_strategy.is_distribution_) { VLOG(3) << "use AsyncSSAGraphExecutor"; member_->executor_.reset(new details::AsyncSSAGraphExecutor( - exec_strategy, member_->local_scopes_, member_->places_, graph)); + exec_strategy, member_->local_scopes_, member_->places_, async_graphs)); } else if (build_strategy.enable_parallel_graph_) { VLOG(3) << "use ParallelSSAGraphExecutor"; #ifdef PADDLE_WITH_CUDA diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index ddf60b39466e72..0e05b2a460a90c 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -50,7 +50,7 @@ class ParallelExecutor { const std::vector &local_scopes, const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy, - ir::Graph *graph); + std::vector graphs); ~ParallelExecutor(); diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index 45c3ad802fc980..c99b2bc593bca3 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -95,6 +95,7 @@ class BlockingQueue { void Close() { std::lock_guard lock(mutex_); + VLOG(3) << "close queue"; closed_ = true; send_cv_.notify_all(); receive_cv_.notify_all(); diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index 901a92ab5b5c74..b2469ad0eb28ea 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -35,7 +35,10 @@ class PyReader : public framework::FileReader { ~PyReader() { queue_->Close(); } - void Shutdown() override { queue_->Close(); } + void Shutdown() override { + VLOG(3) << "PyReader shutdown!"; + queue_->Close(); + } void Start() override { queue_->ReOpen(); } diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index f9e7366779462a..fdee5a6d6657df 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1230,7 +1230,7 @@ All parameter, weight, gradient are variables in Paddle. pe.def(py::init &, const std::unordered_set &, const std::string &, Scope *, std::vector &, const ExecutionStrategy &, - const BuildStrategy &, ir::Graph *>()) + const BuildStrategy &, std::vector>()) // NOTE: even we return a vec* to Python use reference policy. // We still cannot get local_scope from this vector, since the element // of vec will be freed by Python GC. We can only return Scope* diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 889156ff74d6eb..9c578ef662bfda 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -177,12 +177,17 @@ def place_obj(place): # step7: init ParallelExecutor # ParallelExecutor API will be deprecated, don't support parallel graph. - self._graph = core.Graph(main.desc) + self._graphs = [] + if build_strategy.async_mode: + for _ in range(cpu_num): + self._graphs.append(core.Graph(main.desc)) + else: + self._graphs.append(core.Graph(main.desc)) self.executor = core.ParallelExecutor( places, persistable_vars, cpt.to_text(loss_name) if loss_name else six.u(''), scope, - local_scopes, exec_strategy, build_strategy, self._graph) + local_scopes, exec_strategy, build_strategy, self._graphs) self.scope = scope From 847e4f4e854b3f73625816d152f65ca5f5c7a27e Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Fri, 1 Mar 2019 11:24:14 +0800 Subject: [PATCH 31/37] pure async mode train --- .../details/async_ssa_graph_executor.cc | 114 ++++++++++++------ .../details/async_ssa_graph_executor.h | 12 ++ .../details/threaded_ssa_graph_executor.cc | 2 + paddle/fluid/framework/parallel_executor.cc | 8 +- paddle/fluid/framework/reader.cc | 5 +- paddle/fluid/framework/reader.h | 10 +- .../fluid/operators/reader/blocking_queue.h | 3 +- .../fluid/operators/reader/buffered_reader.cc | 3 + .../operators/reader/create_py_reader_op.cc | 7 +- .../reader/lod_tensor_blocking_queue.h | 5 +- paddle/fluid/pybind/pybind.cc | 1 + .../test_async_ssa_graph_executor_mnist.py | 41 ++++--- 12 files changed, 148 insertions(+), 63 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index dfb9d73dcbe520..69f770afee9a6c 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -14,10 +14,31 @@ #include "paddle/fluid/framework/details/async_ssa_graph_executor.h" +#include "paddle/fluid/framework/variable_helper.h" + namespace paddle { namespace framework { namespace details { +inline void NewTempScopeAndInitVars(const std::vector &var_infos, + Scope *scope) { + Scope &local_scope = scope->NewScope(); + *scope->Var(details::kLocalExecScopeName)->GetMutable() = + &local_scope; + + for (auto &info : var_infos) { + if (scope->FindVar(info.name_) != nullptr) { + continue; + } + + if (info.persistable_) { // Persistable + InitializeVariable(scope->Var(info.name_), info.type_); + } else { + InitializeVariable(local_scope.Var(info.name_), info.type_); + } + } +} + AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, std::vector graphs) @@ -39,58 +60,81 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( executors_.emplace_back(new details::ThreadedSSAGraphExecutor( strategy_, {local_scopes_[i]}, {places_[i]}, graphs_[i])); } -} -FeedFetchList AsyncSSAGraphExecutor::Run( - const std::vector &fetch_tensors) { - std::vector> run_futures; - - std::vector fetch_data; - FeedFetchList ret; - - fetch_data.reserve(places_.size()); - ret.reserve(fetch_tensors.size()); - exception_holder_.Clear(); + for (auto &node : graphs_[0]->Nodes()) { + if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { + var_infos_.emplace_back(); + var_infos_.back().name_ = node->Var()->Name(); + var_infos_.back().type_ = node->Var()->GetType(); + var_infos_.back().persistable_ = node->Var()->Persistable(); + } + } + for (auto *scope : local_scopes_) { + NewTempScopeAndInitVars(var_infos_, scope); + } +} - for (size_t i = 0; i < places_.size(); ++i) { - auto call = [this, i, &fetch_tensors]() -> FeedFetchList { +void AsyncSSAGraphExecutor::StartOffPythonTrainLoop() { + VLOG(3) << "StartOffPythonTrainLoop size = " << places_.size(); + for (size_t i = 1; i < places_.size(); ++i) { + auto call = [this, i]() -> void { + VLOG(3) << "start off python thread " << i; try { - return executors_[i]->Run(fetch_tensors); + while (true) { + executors_[i]->Run({}); + } } catch (...) { exception_holder_.Catch(std::current_exception()); + VLOG(3) << "get exception type = " << exception_holder_.Type(); } - return FeedFetchList(); + VLOG(3) << "thread " << i << " exited!"; }; - - if (pool_) { - run_futures.emplace_back(pool_->enqueue(std::move(call))); - } else { - fetch_data.emplace_back(std::move(call())); - } - } - - if (pool_) { - for (auto &f : run_futures) { - if (exception_holder_.IsCaught()) { - f.wait(); - } else { - fetch_data.emplace_back(std::move(f.get())); - } - } + run_futures_.emplace_back(pool_->enqueue(std::move(call))); } +} +void AsyncSSAGraphExecutor::HandleException() { if (exception_holder_.IsCaught()) { + for (auto &f : run_futures_) { + VLOG(3) << "wait future"; + f.wait(); + } VLOG(3) << "caught exception " << exception_holder_.Type() << ", rethrow it"; + run_futures_.clear(); exception_holder_.ReThrow(); } +} + +FeedFetchList AsyncSSAGraphExecutor::Run( + const std::vector &fetch_tensors) { + // init once + if (run_futures_.size() == 0 && places_.size() > 1) { + exception_holder_.Clear(); + StartOffPythonTrainLoop(); + } + + if (places_.size() == 1) { + exception_holder_.Clear(); + } else { + HandleException(); + } + + FeedFetchList fetch_data; + fetch_data.reserve(fetch_tensors.size()); + + try { + fetch_data = executors_[0]->Run(fetch_tensors); + } catch (...) { + exception_holder_.Catch(std::current_exception()); + } + + HandleException(); + FeedFetchList ret; for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) { std::vector lodtensor_ptrs; - lodtensor_ptrs.reserve(local_scopes_.size()); - for (size_t scope_idx = 0; scope_idx < local_scopes_.size(); ++scope_idx) { - lodtensor_ptrs.push_back(&fetch_data.at(scope_idx).at(fetch_idx)); - } + lodtensor_ptrs.push_back(&fetch_data.at(fetch_idx)); ret.emplace_back(); ret.back().MergeLoDTensor(lodtensor_ptrs, platform::CPUPlace()); } diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.h b/paddle/fluid/framework/details/async_ssa_graph_executor.h index ff85ba2c6cf1dd..7d7296772d847e 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.h @@ -24,6 +24,12 @@ namespace paddle { namespace framework { namespace details { +struct VarInfo { + std::string name_; + proto::VarType::Type type_; + bool persistable_; +}; + class AsyncSSAGraphExecutor : public SSAGraphExecutor { public: AsyncSSAGraphExecutor(const ExecutionStrategy &strategy, @@ -35,6 +41,10 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { FeedFetchList Run(const std::vector &fetch_tensors) override; + private: + void StartOffPythonTrainLoop(); + void HandleException(); + private: ExecutionStrategy strategy_; std::vector local_scopes_; @@ -44,6 +54,8 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { std::vector> executors_; ExceptionHolder exception_holder_; + std::vector> run_futures_; + std::vector var_infos_; }; } // namespace details diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 84366263629361..fa0c90e1f49c3f 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -119,6 +119,8 @@ inline FeedFetchList ThreadedSSAGraphExecutor::RunImpl( if (timeout) { if (exception_holder_.IsCaught()) { + VLOG(3) << "caught exception " << exception_holder_.Type() + << ", rethrow it"; for (auto &run_op_future : run_op_futures_) { run_op_future.wait(); } diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index b1f40911487fa0..c133772e6e86d1 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -379,9 +379,11 @@ ParallelExecutor::ParallelExecutor( } VLOG(3) << "use ScopeBufferedSSAGraphExecutor"; - member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( - exec_strategy, member_->local_scopes_, std::move(var_infos), - member_->places_, std::move(member_->executor_))); + if (!build_strategy.async_mode_) { + member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( + exec_strategy, member_->local_scopes_, std::move(var_infos), + member_->places_, std::move(member_->executor_))); + } } void ParallelExecutor::BCastParamsToDevices( diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 40eafda9bf294f..d3513fb7dbed04 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -69,6 +69,9 @@ void ReaderBase::Start() { ReaderBase::~ReaderBase() {} -DecoratedReader::~DecoratedReader() { reader_->Shutdown(); } +DecoratedReader::~DecoratedReader() { + VLOG(1) << "~DecoratedReader"; + reader_->Shutdown(); +} } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 82562bf883d887..6cf0ec2937935c 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -77,7 +77,10 @@ class DecoratedReader : public ReaderBase, ~DecoratedReader(); protected: - void ShutdownImpl() override { reader_->Shutdown(); } + void ShutdownImpl() override { + VLOG(1) << "ShutdownImpl"; + reader_->Shutdown(); + } void StartImpl() override { reader_->Start(); } @@ -98,6 +101,8 @@ class ReaderHolder { reader_ = reader_base; } + ~ReaderHolder() { VLOG(1) << "~ReaderHolder"; } + const std::shared_ptr& Get() const { return reader_; } void ReadNext(std::vector* out) { @@ -106,6 +111,7 @@ class ReaderHolder { } void ResetAll() { + VLOG(1) << "ResetAll"; auto end_readers = reader_->GetEndPoints(); for (auto* reader : end_readers) { reader->Shutdown(); @@ -116,11 +122,13 @@ class ReaderHolder { } void Shutdown() { + VLOG(1) << "Shutdown"; PADDLE_ENFORCE_NOT_NULL(reader_); reader_->Shutdown(); } void Start() { + VLOG(1) << "start"; PADDLE_ENFORCE_NOT_NULL(reader_); reader_->Start(); } diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index c99b2bc593bca3..fe3f2f40317339 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -86,6 +86,7 @@ class BlockingQueue { void ReOpen() { std::lock_guard lock(mutex_); + VLOG(1) << "reopen queue"; closed_ = false; std::deque new_deque; queue_.swap(new_deque); @@ -95,7 +96,7 @@ class BlockingQueue { void Close() { std::lock_guard lock(mutex_); - VLOG(3) << "close queue"; + VLOG(1) << "close queue"; closed_ = true; send_cv_.notify_all(); receive_cv_.notify_all(); diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index defc29b91f81cb..db80fda695dc76 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -20,6 +20,7 @@ namespace paddle { namespace operators { namespace reader { BufferedReader::~BufferedReader() { + VLOG(1) << "~BufferedReader"; reader_->Shutdown(); while (!position_.empty()) { position_.front().wait(); @@ -41,6 +42,7 @@ BufferedReader::BufferedReader( thread_pool_(1), place_(place), buffer_size_(buffer_size) { + VLOG(1) << "BufferedReader"; #ifdef PADDLE_WITH_CUDA if (platform::is_gpu_place(place_)) { platform::SetDeviceId(boost::get(place_).device); @@ -121,6 +123,7 @@ void BufferedReader::ReadAsync(size_t i) { } void BufferedReader::ShutdownImpl() { + VLOG(1) << "ShutdownImpl"; reader_->Shutdown(); while (!position_.empty()) { position_.pop(); diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index b2469ad0eb28ea..2916be618cdfe7 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -33,10 +33,13 @@ class PyReader : public framework::FileReader { if (!success) out->clear(); } - ~PyReader() { queue_->Close(); } + ~PyReader() { + VLOG(1) << "~PyReader"; + queue_->Close(); + } void Shutdown() override { - VLOG(3) << "PyReader shutdown!"; + VLOG(1) << "PyReader shutdown!"; queue_->Close(); } diff --git a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h index 5b53edff5d8ea7..eeba330d66ea66 100644 --- a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h +++ b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h @@ -57,7 +57,10 @@ class LoDTensorBlockingQueue { inline void ReOpen() { queue_.ReOpen(); } - inline void Close() { queue_.Close(); } + inline void Close() { + VLOG(1) << "LoDTensorBlockingQueue close"; + queue_.Close(); + } inline bool IsClosed() const { return queue_.IsClosed(); } diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index fdee5a6d6657df..af049127aa3960 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -557,6 +557,7 @@ All parameter, weight, gradient are variables in Paddle. m.def("init_lod_tensor_blocking_queue", [](Variable &var, size_t capacity) -> std::shared_ptr { + VLOG(1) << "init_lod_tensor_blocking_queue"; auto *holder = var.GetMutable(); holder->InitOnce(capacity, FLAGS_reader_queue_speed_test_mode); return holder->GetQueue(); diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py index 41fa39e06be8c9..4fbda407f12f5b 100644 --- a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -36,7 +36,7 @@ def convolutional_neural_network(use_py_reader): capacity=64, feed_list=[img, label], name='py_reader', - use_double_buffer=True) + use_double_buffer=False) img, label = fluid.layers.read_file(py_reader) conv_pool_1 = fluid.nets.simple_img_conv_pool( @@ -139,20 +139,21 @@ def train(use_cuda, thread_num, cpu_num): exec_strategy=exec_strategy) py_reader.decorate_paddle_reader(train_reader) - py_reader.start() - - step = 0 - try: - while True: - loss_val = pe.run(fetch_list=[avg_loss.name]) - loss_val = numpy.mean(loss_val) - if step % 100 == 0: - print("Batch %d, Cost %f, queue size %d" % - (step, loss_val, py_reader.queue.size())) - step += 1 - except fluid.core.EOFException: - print("train end") - py_reader.reset() + + for pass_id in range(2): + step = 0 + py_reader.start() + try: + while True: + loss_val = pe.run(fetch_list=[avg_loss.name]) + loss_val = numpy.mean(loss_val) + if step % 10 == 0: + print("Pass %d, Batch %d, Cost %f, queue size %d" % + (pass_id, step, loss_val, py_reader.queue.size())) + step += 1 + except fluid.core.EOFException: + print("train end pass = " + str(pass_id)) + py_reader.reset() return step @@ -161,10 +162,11 @@ class TestAsyncSSAGraphExecutor(unittest.TestCase): def test_check_async_ssa_exe_train(self): step_list = [] for cpu_num in [1, 2, 4]: - scope = fluid.core.Scope() - with fluid.scope_guard(scope): + print("run cpu_num -> " + str(cpu_num)) + with fluid.scope_guard(fluid.core.Scope()): with fluid.program_guard( - fluid.Program(), startup_program=fluid.Program()): + main_program=fluid.Program(), + startup_program=fluid.Program()): start_time = time.time() step = train( use_cuda=False, thread_num=cpu_num, cpu_num=cpu_num) @@ -173,7 +175,8 @@ def test_check_async_ssa_exe_train(self): print("cpu_num -> " + str(cpu_num) + " step -> " + str(step) + " time -> " + str(end_time - start_time)) with fluid.program_guard( - fluid.Program(), startup_program=fluid.Program()): + main_program=fluid.Program(), + startup_program=fluid.Program()): test() assert int(step_list[0] / 2) == int(step_list[1]) assert int(step_list[1] / 2) == int(step_list[2]) From 8744f9a083719626c56190672b66eb7ac24d32be Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 4 Mar 2019 22:54:26 +0800 Subject: [PATCH 32/37] fix parallel executor async mode --- paddle/fluid/framework/parallel_executor.cc | 10 ++++++++-- paddle/fluid/framework/parallel_executor.h | 3 ++- paddle/fluid/pybind/pybind.cc | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index c133772e6e86d1..ae7cd800adb5f1 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -188,7 +188,7 @@ ParallelExecutor::ParallelExecutor( const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy, - std::vector graphs) + ir::Graph *graph) : member_(new ParallelExecutorPrivate(places)) { member_->global_scope_ = scope; member_->use_cuda_ = exec_strategy.use_cuda_; @@ -218,12 +218,18 @@ ParallelExecutor::ParallelExecutor( } } + std::vector graphs; if (build_strategy.async_mode_) { PADDLE_ENFORCE(!member_->use_cuda_, "gpu mode does not support async_mode_ now!"); + graphs.push_back(graph); + for (int i = 1; i < places.size(); ++i) { + auto *tmp_graph = new ir::Graph(graph->OriginProgram()); + async_graphs_.emplace_back(tmp_graph); + graphs.push_back(tmp_graph); + } } - ir::Graph *graph = graphs[0]; std::unique_ptr temp_owned_graph(graph); // FIXME(Yancey1989): parallel graph mode get better performance diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 0e05b2a460a90c..987f715066306e 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -50,7 +50,7 @@ class ParallelExecutor { const std::vector &local_scopes, const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy, - std::vector graphs); + ir::Graph *graph); ~ParallelExecutor(); @@ -76,6 +76,7 @@ class ParallelExecutor { const BuildStrategy &build_strategy) const; ParallelExecutorPrivate *member_; + std::vector> async_graphs_; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) std::unique_ptr local_nccl_id_; #endif diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 6d1fc0be232ec8..69cfe280c6b71a 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1271,7 +1271,7 @@ All parameter, weight, gradient are variables in Paddle. pe.def(py::init &, const std::unordered_set &, const std::string &, Scope *, std::vector &, const ExecutionStrategy &, - const BuildStrategy &, std::vector>()) + const BuildStrategy &, ir::Graph *>()) // NOTE: even we return a vec* to Python use reference policy. // We still cannot get local_scope from this vector, since the element // of vec will be freed by Python GC. We can only return Scope* From e92ad8a2097ecffdfa412306b60dba4df68b8541 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 5 Mar 2019 16:56:56 +0800 Subject: [PATCH 33/37] optimize test_async_ssa_graph_executor_mnist test=develop --- .../tests/unittests/test_async_ssa_graph_executor_mnist.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py index 4fbda407f12f5b..5e77ce9b811bc0 100644 --- a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -178,8 +178,8 @@ def test_check_async_ssa_exe_train(self): main_program=fluid.Program(), startup_program=fluid.Program()): test() - assert int(step_list[0] / 2) == int(step_list[1]) - assert int(step_list[1] / 2) == int(step_list[2]) + assert abs(int(step_list[0] / 2) - int(step_list[1])) < 5 + assert abs(int(step_list[1] / 2) - int(step_list[2])) < 5 if __name__ == "__main__": From f28c25845330cf47250f7f6cba67f6f4cdaae97d Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 5 Mar 2019 17:10:17 +0800 Subject: [PATCH 34/37] code clean test=develop --- .../framework/details/multi_devices_graph_pass.cc | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index 109037c3e6bc4a..c8e9c5d6870844 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -167,10 +167,6 @@ std::unique_ptr MultiDevSSAGraphBuilderBase::ApplyImpl( bool is_forwarding = true; bool insert_collection_ops = NeedCollectiveOps(); - if (strategy_.async_mode_) { - // async mode did not need to merge gradient - insert_collection_ops = false; - } for (ir::Node *node : sorted_ops) { if (DealWithSpecialOp(&result, node)) { @@ -749,10 +745,6 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, ir::Node *node) const { bool insert_op = false; if (OpHaveRole(*node, OpRole::kRPC)) { - // in async_mode, each graph will send it's own gradient. - if (strategy_.async_mode_ && node->Op()->Type() == "send") { - return false; - } int op_dev_id = CreateRPCOp(result, node); PADDLE_ENFORCE(op_dev_id != -1, "Can not schedule the RPC operator to the right place."); @@ -768,11 +760,6 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, insert_op = true; need_broadcast_var_ = true; } else if (OpHaveRole(*node, OpRole::kDist)) { - // in async_mode, each graph will send it's own gradient, do not need to - // merge gradient. - if (strategy_.async_mode_ && node->Op()->Type() != "concat") { - return false; - } int op_dev_id = CreateDistTrainOp(result, node); if (node->Op()->Type() == "concat") { // the input(block of parameter) of concat is on different device, @@ -844,7 +831,7 @@ int DistSSAGraphBuilder::CreateRPCOp(ir::Graph *result, ir::Node *node) const { } auto recv_param_grad = boost::get>( node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName())); - if (recv_param_grad.size() == 2U && !strategy_.async_mode_) { + if (recv_param_grad.size() == 2U) { op_dev_id = GetVarDeviceID(recv_param_grad[1]); VLOG(10) << "recv param " << recv_param_grad[0] << " get grad place: " << recv_param_grad[1] From c09477b05755da2c61862b37c82fc4031bbf04b1 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 5 Mar 2019 23:13:00 +0800 Subject: [PATCH 35/37] revert change --- python/paddle/fluid/parallel_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 84beb37c1d9951..2ebaab3b102487 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -104,6 +104,7 @@ def __init__(self, main_program = main_program if main_program is not None \ else framework.default_main_program() + self._compiled_program = compiler.CompiledProgram(main_program) self._compiled_program.with_data_parallel( loss_name=loss_name, From 4e218dabc5cb24c753186503389fd533087bae81 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 5 Mar 2019 23:29:09 +0800 Subject: [PATCH 36/37] code format test=develop --- paddle/fluid/framework/details/async_ssa_graph_executor.cc | 3 +++ paddle/fluid/framework/details/build_strategy.cc | 1 + paddle/fluid/framework/details/build_strategy.h | 1 + paddle/fluid/framework/details/exception_holder.h | 1 + paddle/fluid/framework/details/multi_devices_graph_pass.cc | 3 +++ paddle/fluid/framework/details/multi_devices_graph_pass.h | 3 +++ paddle/fluid/framework/details/threaded_ssa_graph_executor.h | 2 ++ paddle/fluid/framework/parallel_executor.h | 1 + paddle/fluid/operators/reader/blocking_queue.h | 1 + paddle/fluid/operators/reader/lod_tensor_blocking_queue.h | 1 + 10 files changed, 17 insertions(+) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 69f770afee9a6c..83fd8a50c37700 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -14,6 +14,9 @@ #include "paddle/fluid/framework/details/async_ssa_graph_executor.h" +#include +#include + #include "paddle/fluid/framework/variable_helper.h" namespace paddle { diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index 4c5384af613752..c073f10d8cc6b3 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -16,6 +16,7 @@ limitations under the License. */ #include #include +#include #include "paddle/fluid/framework/details/memory_optimize_helper.h" #include "paddle/fluid/framework/details/multi_devices_graph_pass.h" diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index 8cb57ad67490c6..9c807560f5c581 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include diff --git a/paddle/fluid/framework/details/exception_holder.h b/paddle/fluid/framework/details/exception_holder.h index 77ca03b86e6aeb..f8fd395bd9cc1e 100644 --- a/paddle/fluid/framework/details/exception_holder.h +++ b/paddle/fluid/framework/details/exception_holder.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include "glog/logging.h" diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index c8e9c5d6870844..8e4f0497210835 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -13,7 +13,10 @@ // limitations under the License. #include #include +#include #include +#include +#include #include #include diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index 377ba50fccf4ab..f7ec9d28de91e5 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -14,7 +14,10 @@ #pragma once +#include #include +#include +#include #include #include diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index 923e940884555a..778bbab5057726 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -16,7 +16,9 @@ #include #include +#include #include +#include #include #include #include diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 987f715066306e..9a9f4e08fe1ad4 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -14,6 +14,7 @@ limitations under the License. */ #pragma once +#include #include #include #include diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index fe3f2f40317339..2b7cb16bc73539 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -16,6 +16,7 @@ #include // NOLINT #include +#include #include "paddle/fluid/platform/enforce.h" diff --git a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h index eeba330d66ea66..be044085f14350 100644 --- a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h +++ b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include "paddle/fluid/framework/ddim.h" From 5e8de51409e52b9bc0210f32cf0759b5925995d4 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Wed, 6 Mar 2019 09:31:34 +0800 Subject: [PATCH 37/37] code format test=develop --- paddle/fluid/framework/details/async_ssa_graph_executor.cc | 3 --- paddle/fluid/framework/details/async_ssa_graph_executor.h | 2 ++ paddle/fluid/framework/parallel_executor.cc | 1 + paddle/fluid/framework/reader.h | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index 83fd8a50c37700..69f770afee9a6c 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -14,9 +14,6 @@ #include "paddle/fluid/framework/details/async_ssa_graph_executor.h" -#include -#include - #include "paddle/fluid/framework/variable_helper.h" namespace paddle { diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.h b/paddle/fluid/framework/details/async_ssa_graph_executor.h index 7d7296772d847e..6aaf8f9a165f2e 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.h @@ -14,7 +14,9 @@ #pragma once +#include #include +#include #include #include "ThreadPool.h" diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index ae7cd800adb5f1..6c5f246f95b977 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -16,6 +16,7 @@ limitations under the License. */ #include #include #include +#include #include #include "paddle/fluid/framework/ir/graph_helper.h" diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 6cf0ec2937935c..4b400e72a4cacd 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -16,6 +16,7 @@ #include #include +#include #include #include "paddle/fluid/framework/ddim.h"