Skip to content
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
92a6c7a
init async ssa executor
jacquesqiao Jan 16, 2019
afda840
init communicator
jacquesqiao Jan 16, 2019
ea66979
can run
jacquesqiao Jan 17, 2019
88d71fa
support num_iteration_per_run
jacquesqiao Jan 17, 2019
69484f7
remote communicator
jacquesqiao Jan 18, 2019
f3210b6
fix copy_memory and share_memory
jacquesqiao Jan 18, 2019
ada43e8
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Jan 25, 2019
fab8457
code optimize
jacquesqiao Jan 26, 2019
a66115b
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Jan 26, 2019
62549e0
add GenParentScopeTreeDebugInfo
jacquesqiao Jan 27, 2019
be738a6
add some debug infor
jacquesqiao Jan 27, 2019
9da96ab
clean code of test_async_ssa_graph_executor_mnist
jacquesqiao Jan 27, 2019
7e145b7
optimize test_async_ssa_graph_executor_mnist
jacquesqiao Jan 28, 2019
02dab46
add some debug info
jacquesqiao Jan 28, 2019
4a17261
complete test_async_ssa_graph_executor_mnist test=develop
jacquesqiao Jan 28, 2019
249f48e
update test test=develop
jacquesqiao Jan 28, 2019
d6c0dca
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Jan 29, 2019
16af1db
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Feb 3, 2019
b1fe8d4
add a check for async_ssa_graph_exe test=develop
jacquesqiao Feb 4, 2019
e72637d
ThreadedSSAGraphExecutor support num_iteration_per_run test=develop
jacquesqiao Feb 9, 2019
84367cf
support async mode in dist mode parallel executor
jacquesqiao Feb 10, 2019
c4ded17
async mode support dist train
jacquesqiao Feb 11, 2019
2171aa7
async ssa exe only support local mode
jacquesqiao Feb 11, 2019
cc71e89
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Feb 21, 2019
31a05d3
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Feb 21, 2019
9465c3d
fix compile problem
jacquesqiao Feb 21, 2019
7f3be09
fix multi graph test=develop
jacquesqiao Feb 21, 2019
12f6b8c
change the include of ThreadPool.h test=develop
jacquesqiao Feb 21, 2019
f4f4816
fix gpu error test=develop
jacquesqiao Feb 22, 2019
ecedd53
fix code bug test=develop
jacquesqiao Feb 22, 2019
b5b8e6c
revert the change of scope test=develop
jacquesqiao Feb 23, 2019
10393dd
add some check test=develop
jacquesqiao Feb 25, 2019
43c8237
use one graph
jacquesqiao Feb 25, 2019
cf0511f
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Feb 25, 2019
dab7f36
optimize code test=develop
jacquesqiao Feb 25, 2019
ff01d70
fix style
jacquesqiao Feb 25, 2019
f768fbf
support multi graph
jacquesqiao Feb 26, 2019
847e4f4
pure async mode train
jacquesqiao Mar 1, 2019
e70b172
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Mar 4, 2019
8744f9a
fix parallel executor async mode
jacquesqiao Mar 4, 2019
b2c082c
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
jacquesqiao Mar 5, 2019
e92ad8a
optimize test_async_ssa_graph_executor_mnist test=develop
jacquesqiao Mar 5, 2019
f28c258
code clean test=develop
jacquesqiao Mar 5, 2019
c09477b
revert change
jacquesqiao Mar 5, 2019
4e218da
code format test=develop
jacquesqiao Mar 5, 2019
5e8de51
code format test=develop
jacquesqiao Mar 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion paddle/fluid/framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ endif()
target_link_libraries(executor garbage_collector)

cc_library(parallel_executor SRCS parallel_executor.cc DEPS
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor async_ssa_graph_executor
graph build_strategy
fast_threaded_ssa_graph_executor variable_helper)

Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/details/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS

cc_library(parallel_ssa_graph_executor SRCS parallel_ssa_graph_executor.cc DEPS threaded_ssa_graph_executor)

cc_library(async_ssa_graph_executor SRCS async_ssa_graph_executor.cc DEPS threaded_ssa_graph_executor)

cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
device_context broadcast_op_handle)
cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
Expand Down
146 changes: 146 additions & 0 deletions paddle/fluid/framework/details/async_ssa_graph_executor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/async_ssa_graph_executor.h"

#include "paddle/fluid/framework/variable_helper.h"

namespace paddle {
namespace framework {
namespace details {

inline void NewTempScopeAndInitVars(const std::vector<VarInfo> &var_infos,
Scope *scope) {
Scope &local_scope = scope->NewScope();
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>() =
&local_scope;

for (auto &info : var_infos) {
if (scope->FindVar(info.name_) != nullptr) {
continue;
}

if (info.persistable_) { // Persistable
InitializeVariable(scope->Var(info.name_), info.type_);
} else {
InitializeVariable(local_scope.Var(info.name_), info.type_);
}
}
}

// Builds one single-place ThreadedSSAGraphExecutor per device and
// pre-creates all graph variables in each local scope.
//
// NOTE(review): the original initializer list applied std::move to the
// const-reference parameters; std::move on a const lvalue is a silent copy
// (clang-tidy performance-move-const-arg), so those members are now
// copy-initialized directly.  Only `graphs`, taken by value, is movable.
AsyncSSAGraphExecutor::AsyncSSAGraphExecutor(
    const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places, std::vector<ir::Graph *> graphs)
    : strategy_(strategy),
      local_scopes_(local_scopes),
      pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr),
      places_(places),
      graphs_(std::move(graphs)) {
  VLOG(3) << "build AsyncSSAGraphExecutor";
  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());

  // Split the global thread budget evenly across devices, giving each
  // device at least one thread.
  strategy_.num_threads_ = strategy_.num_threads_ < places_.size()
                               ? 1UL
                               : strategy_.num_threads_ / places_.size();
  VLOG(1) << "set num_threads: " << strategy_.num_threads_
          << " to run the operators of the graph on each device.";
  for (size_t i = 0; i < places_.size(); ++i) {
    executors_.emplace_back(new details::ThreadedSSAGraphExecutor(
        strategy_, {local_scopes_[i]}, {places_[i]}, graphs_[i]));
  }

  // Collect variable metadata from graph 0 only and apply it to every local
  // scope — assumes all graphs expose the same variable set; TODO confirm.
  for (auto &node : graphs_[0]->Nodes()) {
    if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
      var_infos_.emplace_back();
      var_infos_.back().name_ = node->Var()->Name();
      var_infos_.back().type_ = node->Var()->GetType();
      var_infos_.back().persistable_ = node->Var()->Persistable();
    }
  }
  for (auto *scope : local_scopes_) {
    NewTempScopeAndInitVars(var_infos_, scope);
  }
}

// Launches one background worker per extra place (index >= 1).  Each worker
// re-runs its executor forever; when an exception escapes, it is parked in
// exception_holder_ and that worker thread ends.  Executor 0 is not started
// here — it stays on the caller's (Python) thread.
void AsyncSSAGraphExecutor::StartOffPythonTrainLoop() {
  VLOG(3) << "StartOffPythonTrainLoop size = " << places_.size();
  for (size_t i = 1; i < places_.size(); ++i) {
    auto loop = [this, i] {
      VLOG(3) << "start off python thread " << i;
      try {
        for (;;) {
          executors_[i]->Run({});
        }
      } catch (...) {
        exception_holder_.Catch(std::current_exception());
        VLOG(3) << "get exception type = " << exception_holder_.Type();
      }
      VLOG(3) << "thread " << i << " exited!";
    };
    run_futures_.emplace_back(pool_->enqueue(std::move(loop)));
  }
}

// If any worker has recorded an exception, drain all outstanding futures,
// drop them, and rethrow the stored exception on the caller's thread.
// No-op when nothing has been caught.
void AsyncSSAGraphExecutor::HandleException() {
  if (!exception_holder_.IsCaught()) {
    return;
  }
  for (auto &future : run_futures_) {
    VLOG(3) << "wait future";
    future.wait();
  }
  VLOG(3) << "caught exception " << exception_holder_.Type()
          << ", rethrow it";
  run_futures_.clear();
  exception_holder_.ReThrow();
}

// Runs one iteration on executor 0 (the caller's thread) and returns the
// requested fetch results.  Background executors are started lazily on the
// first call when there is more than one place.
//
// NOTE(review): the original reserved capacity on `fetch_data`, which is
// immediately overwritten by assignment from executors_[0]->Run — dead
// work.  The reserve now goes on `ret`, the container that actually grows.
FeedFetchList AsyncSSAGraphExecutor::Run(
    const std::vector<std::string> &fetch_tensors) {
  // init once: start the per-place training loops on the first Run call.
  if (run_futures_.size() == 0 && places_.size() > 1) {
    exception_holder_.Clear();
    StartOffPythonTrainLoop();
  }

  if (places_.size() == 1) {
    exception_holder_.Clear();
  } else {
    HandleException();
  }

  FeedFetchList fetch_data;
  try {
    // Only executor 0 fetches; the other executors run free-running async
    // loops and their results are intentionally not merged (per review:
    // parameter versions differ across threads, observing one suffices).
    fetch_data = executors_[0]->Run(fetch_tensors);
  } catch (...) {
    exception_holder_.Catch(std::current_exception());
  }

  HandleException();

  FeedFetchList ret;
  ret.reserve(fetch_tensors.size());
  for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) {
    std::vector<const LoDTensor *> lodtensor_ptrs;
    lodtensor_ptrs.push_back(&fetch_data.at(fetch_idx));
    ret.emplace_back();
    ret.back().MergeLoDTensor(lodtensor_ptrs, platform::CPUPlace());
  }
  return ret;
}

} // namespace details
} // namespace framework
} // namespace paddle
63 changes: 63 additions & 0 deletions paddle/fluid/framework/details/async_ssa_graph_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>
#include <vector>

#include "ThreadPool.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"

namespace paddle {
namespace framework {
namespace details {

// Metadata describing one graph variable, used by AsyncSSAGraphExecutor to
// pre-create variables in its local scopes.
struct VarInfo {
  std::string name_;
  // Value-initialized so a default-constructed VarInfo carries no
  // indeterminate fields; callers overwrite these from the graph's var
  // nodes before use.
  proto::VarType::Type type_{};
  bool persistable_{false};
};

// Executes one graph per place asynchronously: each place gets its own
// ThreadedSSAGraphExecutor, places 1..N-1 run free-running training loops
// on a thread pool, while place 0 runs on the caller's thread in Run().
class AsyncSSAGraphExecutor : public SSAGraphExecutor {
 public:
  // Requires places.size() == local_scopes.size(); one graph per place.
  AsyncSSAGraphExecutor(const ExecutionStrategy &strategy,
                        const std::vector<Scope *> &local_scopes,
                        const std::vector<platform::Place> &places,
                        std::vector<ir::Graph *> graphs);
  ~AsyncSSAGraphExecutor() final = default;
  // Graph 0 is treated as the representative graph.
  const ir::Graph &Graph() const override { return *graphs_[0]; }

  // Runs one iteration on executor 0 and returns the fetched tensors;
  // rethrows any exception caught by a background worker.
  FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;

 private:
  // Starts the free-running loops for places 1..N-1 on pool_.
  void StartOffPythonTrainLoop();
  // Rethrows a stored worker exception (after draining run_futures_).
  void HandleException();

 private:
  ExecutionStrategy strategy_;
  std::vector<Scope *> local_scopes_;
  // Created only when there are >= 2 places; nullptr in single-place mode.
  std::unique_ptr<::ThreadPool> pool_{nullptr};
  std::vector<platform::Place> places_;
  std::vector<ir::Graph *> graphs_;

  // One single-place executor per place; index 0 serves Run().
  std::vector<std::unique_ptr<details::ThreadedSSAGraphExecutor>> executors_;
  ExceptionHolder exception_holder_;
  // Futures of the background training loops; empty until the first Run().
  std::vector<std::future<void>> run_futures_;
  // Variable metadata harvested from graphs_[0] in the constructor.
  std::vector<VarInfo> var_infos_;
};

} // namespace details
} // namespace framework
} // namespace paddle
4 changes: 4 additions & 0 deletions paddle/fluid/framework/details/build_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,12 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
// Convert graph to run on multi-devices.
void AppendMultiDevPass(const BuildStrategy &strategy) {
ir::Pass *multi_devices_pass;

if (strategy_.is_distribution_) {
VLOG(3) << "multi device parameter server mode";
multi_devices_pass = AppendPass("dist_multi_devices_pass").get();
} else if (strategy_.async_mode_) {
multi_devices_pass = AppendPass("async_multi_devices_pass").get();
} else {
if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) {
VLOG(3) << "multi devices collective mode with allreduce";
Expand Down Expand Up @@ -220,6 +223,7 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
graph = pass->Apply(std::move(graph));
VLOG(3) << "Finish Apply Pass " << pass->Type();
}
VLOG(3) << "All Passes Applied";
return graph;
}

Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/details/build_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct BuildStrategy {
// num_trainers is 1, so the current fields of build_strategy doesn't tell if
// it's distributed model.
bool is_distribution_{false};
bool async_mode_{false};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the relationship between async_mode and is_distribution

int num_trainers_{1};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can num_trainers > 1 and not is_distribution?

int trainer_id_{0};
std::vector<std::string> trainers_endpoints_;
Expand Down
17 changes: 17 additions & 0 deletions paddle/fluid/framework/details/exception_holder.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <string>

#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"

Expand Down Expand Up @@ -64,6 +66,21 @@ class ExceptionHolder {
ClearImpl();
}

// Human-readable name of the currently stored exception kind.
// Takes mu_ like the other accessors.
std::string Type() {
  std::lock_guard<std::mutex> lock(mu_);
  if (type_ == kNone) {
    return "None";
  }
  if (type_ == kEnforceNotMet) {
    return "EnforceNotMet";
  }
  if (type_ == kEOF) {
    return "EOF";
  }
  return "unknown";
}

private:
void ClearImpl() {
exception_.reset();
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/details/execution_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ struct ExecutionStrategy {
size_t num_iteration_per_drop_scope_{1};
ExecutorType type_{kDefault};
bool dry_run_{false};
size_t num_iteration_per_run_{1}; // only use with async_ssa_graph_executor
// and pyreader with data queue
};

} // namespace details
Expand Down
Loading