diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index f8a4d099244353..deade2739dbb7e 100644
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -351,6 +351,7 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS
           fast_threaded_ssa_graph_executor variable_helper)
 
 cc_library(executor_cache SRCS executor_cache.cc DEPS parallel_executor)
+cc_library(event_based_executor SRCS event_based_executor.cc runtime_graph.cc DEPS framework_proto)
 if(WITH_PSCORE)
   get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS)
   cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS
@@ -440,7 +441,7 @@ endif()
 #cc_binary(test_executor SRCS test_executor.cc DEPS executor op_registry ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} )
 #cc_binary(new_executor SRCS new_exec_test.cc DEPS operator op_registry executor ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} profiler)
 
-set(FLUID_FRAMEWORK_MODULES proto_desc memory lod_tensor executor data_feed_proto layer dynamic_loader custom_operator)
+set(FLUID_FRAMEWORK_MODULES proto_desc memory lod_tensor executor data_feed_proto layer dynamic_loader custom_operator event_based_executor)
 
 cc_library(paddle_framework DEPS ${FLUID_FRAMEWORK_MODULES})
diff --git a/paddle/fluid/framework/event_based_executor.cc b/paddle/fluid/framework/event_based_executor.cc
new file mode 100644
index 00000000000000..6cee7e86140c14
--- /dev/null
+++ b/paddle/fluid/framework/event_based_executor.cc
@@ -0,0 +1,49 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/event_based_executor.h"
+#include <iostream>
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/framework/runtime_graph.h"
+
+namespace paddle {
+namespace framework {
+
+EventBasedExecutor::~EventBasedExecutor() {
+  std::cout << "In EventBasedExecutor destructor" << std::endl;
+}
+
+void EventBasedExecutor::Compile(const ProgramDesc& program,
+                                 const std::string& grain) {
+  if (grain == "coarse") {
+    CompileCoarseGrainGraph(program);
+  } else {
+    CompileFineGrainGraph(program);
+  }
+}
+
+void EventBasedExecutor::CompileCoarseGrainGraph(const ProgramDesc& program) {
+  runtime_graph_.reset(new RuntimeGraph(program));
+  runtime_graph_->PrintGraph();
+}
+
+void EventBasedExecutor::CompileFineGrainGraph(const ProgramDesc& program) {
+  std::cout << "Compile Fine Grain Graph" << std::endl;
+}
+
+void EventBasedExecutor::Run() {
+  std::cout << "In Event Based Executor Run" << std::endl;
+}
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/event_based_executor.h b/paddle/fluid/framework/event_based_executor.h
new file mode 100644
index 00000000000000..7b56df3d8ff41f
--- /dev/null
+++ b/paddle/fluid/framework/event_based_executor.h
@@ -0,0 +1,42 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+#include "paddle/fluid/framework/runtime_graph.h"
+
+namespace paddle {
+namespace framework {
+
+class ProgramDesc;
+class EventBasedWorker;
+
+class EventBasedExecutor {
+ public:
+  EventBasedExecutor() = default;
+  ~EventBasedExecutor();
+
+  void Compile(const ProgramDesc& program_desc, const std::string& grain);
+  void Run();
+
+ private:
+  void CompileCoarseGrainGraph(const ProgramDesc& program_desc);
+  void CompileFineGrainGraph(const ProgramDesc& program_desc);
+
+  std::unique_ptr<RuntimeGraph> runtime_graph_;
+};
+}  // namespace framework
+}  // namespace paddle
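Note for reviewers: the executor above is driven from Python through the pybind binding added later in this diff. A minimal usage sketch follows; the program setup is only a placeholder, and only `EventBasedExecutor`, `compile`, and `run` come from this patch:

```python
# Hypothetical driver for the binding added in pybind.cc below.
import paddle
import paddle.fluid as fluid
from paddle.fluid import core

paddle.enable_static()
program = fluid.default_main_program()  # placeholder program

exe = core.EventBasedExecutor()
# "coarse" builds a RuntimeGraph over the program's blocks and prints it;
# "fine" is still a stub in this patch.
exe.compile(program.desc, "coarse")
exe.run()
```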
diff --git a/paddle/fluid/framework/runtime_graph.cc b/paddle/fluid/framework/runtime_graph.cc
new file mode 100644
index 00000000000000..99968dc5a18654
--- /dev/null
+++ b/paddle/fluid/framework/runtime_graph.cc
@@ -0,0 +1,164 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/runtime_graph.h"
+#include <iostream>
+#include "glog/logging.h"
+#include "paddle/fluid/framework/program_desc.h"
+
+namespace paddle {
+namespace framework {
+
+namespace {
+typedef std::unordered_map<std::string, std::vector<int64_t>> VarList;
+
+// Collect every op output in `block` that some later block consumes,
+// together with the ids of the consuming blocks.
+void FilterAndAddOutputVars(const BlockDesc &block,
+                            const VarList &prev_input_vars,
+                            VarList *cur_output_vars) {
+  const auto &ops_in_block = block.AllOps();
+  for (const OpDesc *op : ops_in_block) {
+    const auto &output_names = op->OutputArgumentNames();
+    for (const auto &output_name : output_names) {
+      if (prev_input_vars.find(output_name) != prev_input_vars.end()) {
+        for (int64_t consumer_id : prev_input_vars.at(output_name)) {
+          if (cur_output_vars->find(output_name) == cur_output_vars->end()) {
+            cur_output_vars->emplace(output_name, std::vector<int64_t>());
+          }
+          cur_output_vars->at(output_name).emplace_back(consumer_id);
+        }
+      }
+    }
+  }
+}
+
+// For each inter-block variable, create (or reuse) its InterVarNode and
+// wire producer/consumer edges between the task nodes.
+void CreateVarNodesAndAddDeps(const VarList &cur_output_vars,
+                              int64_t producer_id,
+                              RuntimeGraph *runtime_graph) {
+  TaskNode *producer = runtime_graph->GetTaskNode(producer_id);
+  for (const auto &output_var : cur_output_vars) {
+    const auto &var_name = output_var.first;
+    if (!runtime_graph->HasVarNode(var_name)) {
+      VarDesc *var = producer->FindVar(var_name);
+      runtime_graph->CreateAndAddVarNode(*var);
+    }
+    InterVarNode *var_node = runtime_graph->FindVarNode(var_name);
+    for (int64_t consumer_id : cur_output_vars.at(var_name)) {
+      TaskNode *consumer = runtime_graph->GetTaskNode(consumer_id);
+      var_node->AddConsumedTask(consumer_id);
+      var_node->SetProducedTask(producer_id);
+      producer->AddProducedVarNode(var_name);
+      consumer->AddConsumedVarNode(var_name);
+    }
+  }
+}
+
+// Record every op input in `block` as a variable this block consumes.
+void FilterAndAddInputVars(const BlockDesc &block, VarList *prev_input_vars) {
+  const auto &ops_in_block = block.AllOps();
+  for (const OpDesc *op : ops_in_block) {
+    const auto &var_names = op->InputArgumentNames();
+    for (const auto &name : var_names) {
+      if (prev_input_vars->find(name) == prev_input_vars->end()) {
+        prev_input_vars->emplace(name, std::vector<int64_t>());
+      }
+      prev_input_vars->at(name).emplace_back(block.ID());
+    }
+  }
+}
+}  // namespace
+
+InterVarNode::InterVarNode(const VarDesc &var)
+    : name_(var.Name()), task_produce_this_var_(-1), var_(&var) {}
+
+void InterVarNode::AddConsumedTask(int64_t task_id) {
+  tasks_consume_this_var_.insert(task_id);
+}
+
+void InterVarNode::SetProducedTask(int64_t task_id) {
+  task_produce_this_var_ = task_id;
+}
+
+TaskNode::TaskNode(const BlockDesc &block)
+    : task_id_(block.ID()), block_(&block) {}
+
+void TaskNode::AddConsumedVarNode(const std::string &var_node_name) {
+  consumed_var_node_names_.insert(var_node_name);
+}
+
+void TaskNode::AddProducedVarNode(const std::string &var_node_name) {
+  produced_var_node_names_.insert(var_node_name);
+}
+
+bool TaskNode::IsSrcTask() const { return consumed_var_node_names_.empty(); }
+
+VarDesc *TaskNode::FindVar(const std::string &name) const {
+  return block_->FindVar(name);
+}
+
+void TaskNode::PrintTaskNode() const {
+  std::cout << "consumed variables: ";
+  for (const auto &name : consumed_var_node_names_) {
+    std::cout << name << " ";
+  }
+  std::cout << std::endl;
+  std::cout << "produced variables: ";
+  for (const auto &name : produced_var_node_names_) {
+    std::cout << name << " ";
+  }
+  std::cout << std::endl;
+}
+
+RuntimeGraph::RuntimeGraph(const ProgramDesc &program) {
+  int64_t block_size = program.Size();
+  task_nodes_.resize(block_size);
+  VarList prev_input_vars;
+  // Walk blocks in reverse so that, when block i is visited, prev_input_vars
+  // already holds the inputs of every block that runs after it.
+  for (int64_t i = block_size - 1; i >= 0; --i) {
+    const auto &block = program.Block(i);
+    task_nodes_[i].reset(new TaskNode(block));
+    VarList cur_output_vars;
+    FilterAndAddOutputVars(block, prev_input_vars, &cur_output_vars);
+    CreateVarNodesAndAddDeps(cur_output_vars, block.ID(), this);
+    FilterAndAddInputVars(block, &prev_input_vars);
+  }
+}
+
+bool RuntimeGraph::HasVarNode(const std::string &name) const {
+  return var_nodes_.find(name) != var_nodes_.end();
+}
+
+InterVarNode *RuntimeGraph::FindVarNode(const std::string &name) const {
+  CHECK(var_nodes_.find(name) != var_nodes_.end());
+  return var_nodes_.at(name).get();
+}
+
+TaskNode *RuntimeGraph::GetTaskNode(int64_t id) const {
+  CHECK_LT(id, static_cast<int64_t>(task_nodes_.size()));
+  return task_nodes_[id].get();
+}
+
+InterVarNode *RuntimeGraph::CreateAndAddVarNode(const VarDesc &var) {
+  InterVarNode *var_node = new InterVarNode(var);
+  var_nodes_.emplace(var.Name(), var_node);
+  return var_node;
+}
+
+int64_t RuntimeGraph::TaskNodesNum() const { return task_nodes_.size(); }
+
+void RuntimeGraph::PrintGraph() const {
+  for (const auto &task : task_nodes_) {
+    task->PrintTaskNode();
+  }
+}
+}  // namespace framework
+}  // namespace paddle
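The `RuntimeGraph` constructor above derives inter-block dependencies in a single reverse pass over the blocks. A compact Python sketch of the same idea; `blocks` is a hypothetical stand-in (objects exposing `.ops`, each op with `.inputs`/`.outputs` name lists), not the Paddle API:

```python
# Sketch of the reverse pass in RuntimeGraph's constructor. Block index
# doubles as the task id, mirroring block.ID() in the C++ code.
from collections import defaultdict

def build_deps(blocks):
    prev_inputs = defaultdict(list)  # var name -> ids of consuming blocks
    edges = []                       # (producer id, var name, consumer id)
    for i in range(len(blocks) - 1, -1, -1):
        # Outputs of block i feed every later block already recorded as a
        # reader of the same variable (FilterAndAddOutputVars).
        for op in blocks[i].ops:
            for var in op.outputs:
                for consumer in prev_inputs.get(var, []):
                    edges.append((i, var, consumer))
        # Then register block i's own inputs (FilterAndAddInputVars).
        for op in blocks[i].ops:
            for var in op.inputs:
                prev_inputs[var].append(i)
    return edges
```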
diff --git a/paddle/fluid/framework/runtime_graph.h b/paddle/fluid/framework/runtime_graph.h
new file mode 100644
index 00000000000000..32aa423f390fcc
--- /dev/null
+++ b/paddle/fluid/framework/runtime_graph.h
@@ -0,0 +1,94 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+namespace paddle {
+namespace framework {
+
+class ProgramDesc;
+class OpDesc;
+class BlockDesc;
+class VarDesc;
+
+// A variable produced by one task (block) and consumed by others.
+class InterVarNode final {
+ public:
+  InterVarNode() = delete;
+  explicit InterVarNode(const VarDesc &var);
+  ~InterVarNode() = default;
+  InterVarNode(const InterVarNode &) = delete;
+  InterVarNode(InterVarNode &&) = delete;
+  InterVarNode &operator=(const InterVarNode &) = delete;
+  InterVarNode &operator=(InterVarNode &&) = delete;
+
+  void AddConsumedTask(int64_t task_id);
+  void SetProducedTask(int64_t task_id);
+
+ private:
+  std::string name_;
+  std::unordered_set<int64_t> tasks_consume_this_var_;
+  int64_t task_produce_this_var_;
+  const VarDesc *var_;
+};
+
+// One schedulable unit; in the coarse-grain graph this is a program block.
+class TaskNode final {
+ public:
+  TaskNode() = delete;
+  explicit TaskNode(const BlockDesc &block);
+  ~TaskNode() = default;
+  TaskNode(const TaskNode &) = delete;
+  TaskNode(TaskNode &&) = delete;
+  TaskNode &operator=(const TaskNode &) = delete;
+  TaskNode &operator=(TaskNode &&) = delete;
+
+  bool IsSrcTask() const;
+  void AddConsumedVarNode(const std::string &var_node_name);
+  void AddProducedVarNode(const std::string &var_node_name);
+  VarDesc *FindVar(const std::string &name) const;
+  void PrintTaskNode() const;
+
+ private:
+  int64_t task_id_;
+  const BlockDesc *block_;
+  std::unordered_set<std::string> consumed_var_node_names_;
+  std::unordered_set<std::string> produced_var_node_names_;
+};
+
+class RuntimeGraph final {
+ public:
+  RuntimeGraph() = delete;
+  explicit RuntimeGraph(const ProgramDesc &program);
+  ~RuntimeGraph() = default;
+  RuntimeGraph(const RuntimeGraph &) = delete;
+  RuntimeGraph(RuntimeGraph &&) = delete;
+  RuntimeGraph &operator=(const RuntimeGraph &) = delete;
+  RuntimeGraph &operator=(RuntimeGraph &&) = delete;
+
+  TaskNode *GetTaskNode(int64_t id) const;
+  InterVarNode *FindVarNode(const std::string &name) const;
+  bool HasVarNode(const std::string &name) const;
+  InterVarNode *CreateAndAddVarNode(const VarDesc &var);
+  int64_t TaskNodesNum() const;
+  void PrintGraph() const;
+
+ private:
+  std::vector<std::unique_ptr<TaskNode>> task_nodes_;
+  std::unordered_map<std::string, std::unique_ptr<InterVarNode>> var_nodes_;
+};
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index b0148e50afc548..64aab5418d2a1f 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -31,6 +31,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/custom_operator.h"
 #include "paddle/fluid/framework/data_layout.h"
+#include "paddle/fluid/framework/event_based_executor.h"
 #include "paddle/fluid/framework/executor.h"
 #include "paddle/fluid/framework/executor_cache.h"
 #include "paddle/fluid/framework/executor_gc_helper.h"
 #include "paddle/fluid/framework/feed_fetch_method.h"
@@ -1907,6 +1908,11 @@ All parameter, weight, gradient are variables in Paddle.
m.def("_get_eager_deletion_vars", &framework::GetEagerDeletionCleanVars); + py::class_(m, "EventBasedExecutor") + .def(py::init<>()) + .def("compile", &EventBasedExecutor::Compile) + .def("run", &EventBasedExecutor::Run); + py::class_(m, "Executor") .def(py::init()) .def("close", &Executor::Close) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 8ca0344962bde2..3853204c2afb09 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1131,6 +1131,16 @@ def _run_impl(self, program, feed, fetch_list, feed_var_name, else: fetch_list = [] + if isinstance(program, Program) and program._event_based_exe_opt: + program._event_based_exe_opt["program"] = \ + self._add_feed_fetch_ops( + program=program, + feed=feed, + fetch_list=fetch_list, + feed_var_name=feed_var_name, + fetch_var_name=fetch_var_name) + self._run_using_event_based_exe(program) + if isinstance(program, Program) and program._pipeline_opt: if "startup_program" in program._pipeline_opt: program = program._pipeline_opt["startup_program"] @@ -1254,6 +1264,80 @@ def _run_impl(self, program, feed, fetch_list, feed_var_name, return_numpy=return_numpy, return_merged=return_merged) + def _run_using_event_based_exe(self, program): + event_based_exe = core.EventBasedExecutor() + if program._event_based_exe_opt["grain"] == "coarse": + if isinstance(program, Program) and program._pipeline_opt: + pass + else: + # Add a list of ops and vars into block + def _add_ops_vars_into_block(block, ops, vars): + unique_var_names = set() + for op in ops: + new_op_desc = block.desc.append_op() + new_op_desc.copy_from(op.desc) + for name in op.input_arg_names: + unique_var_names.add(name) + for name in op.output_arg_names: + unique_var_names.add(name) + for name in unique_var_names: + block._clone_variable(vars[name]) + block._sync_with_cpp() + + # Partition the program to multiple blocks + def _partition_program_to_multiple_blocks(program): + new_program = framework.Program() + feed_ops = [] + forward_ops = [] + backward_ops = [] + comm_ops = [] + optimizer_ops = [] + fetch_ops = [] + comm_ops_white_list = [ + 'c_allreduce_sum', 'c_sync_comm_stream' + ] + vars = program.block(0).vars + for op in program.block(0).ops: + if op.type == "feed": + feed_ops.append(op) + elif op.type == "fetch": + fetch_ops.append(op) + elif op.attr("op_role") == 0 or op.attr( + "op_role") == 256: + forward_ops.append(op) + elif op.attr("op_role") == 1 or op.attr( + "op_role") == 257: + if op.type in comm_ops_white_list: + comm_ops.append(op) + else: + backward_ops.append(op) + elif op.attr("op_role") == 2: + optimizer_ops.append(op) + else: + print( + "Op accidently ignored by event based executor") + _add_ops_vars_into_block( + new_program.block(0), feed_ops, vars) + _add_ops_vars_into_block(new_program._create_block(), + forward_ops, vars) + _add_ops_vars_into_block(new_program._create_block(), + backward_ops, vars) + _add_ops_vars_into_block(new_program._create_block(), + comm_ops, vars) + _add_ops_vars_into_block(new_program._create_block(), + optimizer_ops, vars) + _add_ops_vars_into_block(new_program._create_block(), + fetch_ops, vars) + return new_program + new_program = \ + _partition_program_to_multiple_blocks(program._event_based_exe_opt["program"]) + event_based_exe.compile(new_program.desc, "coarse") + elif program._event_based_exe_opt["grain"] == "fine": + pass + else: + print("Unsupported partition grain") + event_based_exe.run() + def _run_program(self, program, feed, fetch_list, feed_var_name, 
     def _run_program(self, program, feed, fetch_list, feed_var_name,
                      fetch_var_name, scope, return_numpy, use_program_cache):
         from paddle.optimizer.lr import LRScheduler
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index 762bfb01fe14ed..4ca3f0a6518d49 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -3422,6 +3422,12 @@ def _clone_variable(self, var, force_persistable=True):
         elif var.type == core.VarDesc.VarType.RAW:
             ret_var = self.create_var(
                 name=var.name, persistable=var.persistable, type=var.type)
+        elif var.type == core.VarDesc.VarType.FEED_MINIBATCH:
+            ret_var = self.create_var(
+                name=var.name, persistable=var.persistable, type=var.type)
+        elif var.type == core.VarDesc.VarType.FETCH_LIST:
+            ret_var = self.create_var(
+                name=var.name, persistable=var.persistable, type=var.type)
         elif var.type == core.VarDesc.VarType.SELECTED_ROWS:
             ret_var = self.create_var(
                 name=var.name,
@@ -4382,6 +4388,9 @@ def __init__(self):
         # compiled program, i.e. Graph
         self._graph = None
 
+        # assigned if this program is executed by the event-based executor
+        self._event_based_exe_opt = None
+
     def _find_var_class_kwargs(self, new_desc):
         # NOTE: not all variables support shape/dtype/lod_level methods.
         # For example: RAW, STEP_SCOPES, etc.
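Nothing in this patch sets `_event_based_exe_opt` through a public API yet, so exercising the new path requires assigning the attribute by hand. A hypothetical end-to-end sketch under that assumption (the network itself is illustrative):

```python
# Hypothetical end-to-end sketch: _event_based_exe_opt is assigned manually,
# since no public API in this patch sets it.
import numpy as np
import paddle
import paddle.fluid as fluid

paddle.enable_static()

main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
    x = fluid.data(name="x", shape=[None, 13], dtype="float32")
    y = fluid.data(name="y", shape=[None, 1], dtype="float32")
    pred = fluid.layers.fc(input=x, size=1)
    loss = fluid.layers.reduce_mean(
        fluid.layers.square_error_cost(input=pred, label=y))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

# Opt this program into the new executor path in Executor._run_impl.
main._event_based_exe_opt = {"grain": "coarse"}

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup)  # startup still goes through the normal path
exe.run(main,
        feed={"x": np.random.rand(4, 13).astype("float32"),
              "y": np.random.rand(4, 1).astype("float32")},
        fetch_list=[loss])
```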