diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index ff8ed811ee6f84..e2c79874c74d32 100755 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -50,6 +50,7 @@ cc_library( collective_helper op_registry executor_gc_helper + flags gflags glog ${BRPC_DEPS}) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 9b023e12a8893c..411fe8872e44c1 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/distributed/fleet_executor/carrier.h" +#include "gflags/gflags.h" #include #include @@ -28,6 +29,8 @@ #include "paddle/fluid/framework/variable.h" #include "paddle/fluid/framework/variable_helper.h" +DECLARE_bool(fleetexecutor_debug_mode); + namespace paddle { namespace distributed { @@ -38,6 +41,7 @@ USE_INTERCEPTOR(Sink); USE_INTERCEPTOR(Cond); USE_INTERCEPTOR(Start); + void Carrier::Init( int64_t rank, const std::unordered_map& interceptor_id_to_rank) { @@ -48,6 +52,69 @@ void Carrier::Init( thread_num_ = 1; thread_pool_.SetThreadNum(thread_num_); thread_pool_.Start(); + + test_thread_ = std::thread([this]() { loop_to_send_msg(); }); + cache_begin_ == std::chrono::steady_clock::now(); +} + +void Carrier::loop_to_send_msg() { + //VLOG(3) << "loop_send_msg loop now"; + while(1){ + while(1){ + int q_size=0; + std::chrono::time_point c_begin; + { + std::lock_guard lock(running_mutex_); + q_size = messages_for_test_.size(); + c_begin = cache_begin_; + } + + auto now = std::chrono::steady_clock::now(); + auto delta = std::chrono::duration_cast(now - c_begin).count(); + + if(q_size<2 && delta <5000){ + //std::time_t now_c = std::chrono::system_clock::to_time_t(now); + //VLOG(3) << "messages_for_test_ q_size:" << q_size + // << ", delta:" << delta << ", will sleep 1000ms" ;//<<", now:" << now_c; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + }else{ + VLOG(3) << "messages_for_test_ q_size:" << q_size + << ", delta:" << delta << ", will send all msg" ; + break; + } + } + + { + std::lock_guard lock(running_mutex_); + while (!messages_for_test_.empty()) { + auto msg=messages_for_test_.back(); + messages_for_test_.pop_back(); + + int64_t src_id = msg.src_id(); + // TODO(liyurui): compatible solution, will be removed completely in the + // future + if (interceptor_id_to_rank_.find(src_id) == interceptor_id_to_rank_.end() && + src_id == SOURCE_ID) { + src_id = msg.dst_id(); + } + int64_t dst_id = msg.dst_id(); + int64_t dst_rank = GetRank(dst_id); + + VLOG(3) << "Send a cached message from interceptor " << src_id + << " to interceptor " << dst_id + << ", which are in different ranks, scope_idx:" << msg.scope_idx(); + + if(!GlobalVal::Get()->Send(dst_rank, msg)){ + LOG(FATAL) << "send msg error"; + } + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } + + cache_begin_ = std::chrono::steady_clock::now(); + } + } + VLOG(3) << "reset cache_begin_"; } void Carrier::Init( @@ -95,6 +162,9 @@ void Carrier::Init( thread_pool_.SetThreadNum(thread_num_); thread_pool_.Start(); + test_thread_ = std::thread([this]() { loop_to_send_msg(); }); + cache_begin_ == std::chrono::steady_clock::now(); + CreateInterceptors(); is_init_ = true; } @@ -230,12 +300,39 @@ bool Carrier::Send(const InterceptorMessage& msg) { VLOG(3) << "Send a message from interceptor " << src_id << " to interceptor " << dst_id << ", which are in the same ranks."; return EnqueueInterceptorMessage(msg); - } else { + } + + if(!FLAGS_fleetexecutor_debug_mode){ + VLOG(3) << "Send a message from interceptor " << src_id + << " to interceptor " << dst_id + << ", which are in different ranks."; + return GlobalVal::Get()->Send(dst_rank, msg); + } + + if(msg.message_type() != DATA_IS_READY){ VLOG(3) << "Send a message from interceptor " << src_id - << " to interceptor " << dst_id - << ", which are in different ranks."; + << " to interceptor " << dst_id + << ", which are in different ranks."; return GlobalVal::Get()->Send(dst_rank, msg); } + + { + VLOG(3) << "prepare executor debug"; + + std::unique_lock lock(running_mutex_); + if(messages_for_test_.empty()){ + cache_begin_ = std::chrono::steady_clock::now(); + //std::time_t now_c = std::chrono::system_clock::to_time_t(cache_begin_)); + VLOG(3) << "messages_for_test_ empty, reset cache_begin_"; + } + + VLOG(3) << "Cache message from interceptor " << src_id + << " to interceptor " << dst_id + << ", which are in different ranks, scope_idx:" << msg.scope_idx(); + messages_for_test_.emplace_back(msg); + } + + return true; } Interceptor* Carrier::SetInterceptor(int64_t interceptor_id, diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h index 8e7fad3e892d87..9591c2adcb0b1d 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.h +++ b/paddle/fluid/distributed/fleet_executor/carrier.h @@ -21,6 +21,12 @@ #include #include #include +#include +#include + +#include + + #include "paddle/fluid/distributed/fleet_executor/interceptor.h" #include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h" @@ -118,6 +124,12 @@ class Carrier final { int thread_num_; TaskLoopThreadPool thread_pool_; std::unordered_set interceptor_ids_; + + std::deque messages_for_test_; + std::thread test_thread_; + std::chrono::time_point cache_begin_; + + void loop_to_send_msg(); }; } // namespace distributed diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc index 518aabbb09ead8..cfd09f34bc9b6b 100644 --- a/paddle/fluid/platform/flags.cc +++ b/paddle/fluid/platform/flags.cc @@ -1020,3 +1020,17 @@ PADDLE_DEFINE_EXPORTED_bool( PADDLE_DEFINE_EXPORTED_string(jit_engine_type, "Predictor", "Choose default funciton type in JitLayer."); + +/** + * Executor debug FLAG + * Name: FLAGS_fleetexecutor_debug_mode + * Since Version: 2.5 + * Value Range: bool + * default=False + * Example: + * Note: + * FLAGS_fleetexecutor_debug_mode == 1, enter in debug mode + */ +PADDLE_DEFINE_EXPORTED_bool(fleetexecutor_debug_mode, + false, + "Enter in FleetExecutor debug mode."); \ No newline at end of file