From 0f8ce5cf62cdfc32e0764d7f9b179cb0c344b49e Mon Sep 17 00:00:00 2001 From: gongweibao Date: Sat, 25 Feb 2023 13:57:39 +0800 Subject: [PATCH 1/3] fix ready scope --- .../fleet_executor/compute_interceptor.cc | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc index 07f7813c0bd264..ec2d26ec6ade4e 100644 --- a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc @@ -176,6 +176,7 @@ bool ComputeInterceptor::IsInputReady() { if (!gen_step_to_scope_id_to_finish_flag_.empty()) { scope_id_to_finish_flag = gen_step_to_scope_id_to_finish_flag_.begin()->second; + VLOG(3) << "Is Input Ready in gen step " << gen_step_to_scope_id_to_finish_flag_.begin()->first; } for (int64_t i = 0; i < node_->max_run_times(); ++i) { bool flag = true; @@ -184,18 +185,26 @@ bool ComputeInterceptor::IsInputReady() { flag = flag && (ready_size_map.at(i) != 0); } if (flag) { - for (auto iter : scope_id_to_finish_flag) { - if (iter.first == i) { - break; - } else if (!iter.second) { - VLOG(3) << "The previous scope is not ready, waiting for the " - "previous scope " - << iter.first; - return false; + if (scope_id_to_finish_flag.empty()) { + cur_scope_id_ = i; + return true; + } else if (scope_id_to_finish_flag.find(i) != scope_id_to_finish_flag.end()) { + for (auto iter : scope_id_to_finish_flag) { + if (iter.first == i) { + break; + } else if (!iter.second) { + VLOG(3) << "The previous scope is not ready, waiting for the " + "previous scope " + << iter.first << " in gen_step " << gen_step_to_scope_id_to_finish_flag_.begin()->first; + return false; + } } + cur_scope_id_ = i; + return true; + } else { + VLOG(3) << "Interceptor " << GetInterceptorId() << " in scope " << i + << " is larger than gen_step " << gen_step_to_scope_id_to_finish_flag_.begin()->first; } - cur_scope_id_ = i; - return true; } else { VLOG(3) << "Interceptor " << GetInterceptorId() << " in scope " << i << "'s upstreams aren't all ready."; @@ -346,6 +355,8 @@ void ComputeInterceptor::Run() { if (!gen_step_to_scope_id_to_finish_flag_.empty()) { auto iter = gen_step_to_scope_id_to_finish_flag_.begin(); + VLOG(3) << "id=" << GetInterceptorId() + << " ComputeInterceptor running in scope " << cur_scope_id_ << " with gen_step " << iter->first; auto& scope_id_to_finish_flag = iter->second; PADDLE_ENFORCE_NE( scope_id_to_finish_flag.find(cur_scope_id_), From ebf56023b10c0259f6d8f7db9caf50911aba75c6 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Sat, 25 Feb 2023 14:41:14 +0800 Subject: [PATCH 2/3] add test --- .../distributed/fleet_executor/carrier.cc | 107 +++++++++++++++++- .../distributed/fleet_executor/carrier.h | 10 ++ 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 9b023e12a8893c..f688eeac910660 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -28,6 +28,10 @@ #include "paddle/fluid/framework/variable.h" #include "paddle/fluid/framework/variable_helper.h" +#include "gflags/gflags.h" + +DECLARE_bool(fleetexecutor_debug_mode); + namespace paddle { namespace distributed { @@ -48,6 +52,69 @@ void Carrier::Init( thread_num_ = 1; thread_pool_.SetThreadNum(thread_num_); thread_pool_.Start(); + +test_thread_ = std::thread([this]() { loop_to_send_msg(); }); + cache_begin_ == std::chrono::steady_clock::now(); +} + +void Carrier::loop_to_send_msg() { + //VLOG(3) << "loop_send_msg loop now"; + while(1){ + while(1){ + int q_size=0; + std::chrono::time_point c_begin; + { + std::lock_guard lock(running_mutex_); + q_size = messages_for_test_.size(); + c_begin = cache_begin_; + } + + auto now = std::chrono::steady_clock::now(); + auto delta = std::chrono::duration_cast(now - c_begin).count(); + + if(q_size<2 && delta <5000){ + //std::time_t now_c = std::chrono::system_clock::to_time_t(now); + //VLOG(3) << "messages_for_test_ q_size:" << q_size + // << ", delta:" << delta << ", will sleep 1000ms" ;//<<", now:" << now_c; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + }else{ + VLOG(3) << "messages_for_test_ q_size:" << q_size + << ", delta:" << delta << ", will send all msg" ; + break; + } + } + + { + std::lock_guard lock(running_mutex_); + while (!messages_for_test_.empty()) { + auto msg=messages_for_test_.back(); + messages_for_test_.pop_back(); + + int64_t src_id = msg.src_id(); + // TODO(liyurui): compatible solution, will be removed completely in the + // future + if (interceptor_id_to_rank_.find(src_id) == interceptor_id_to_rank_.end() && + src_id == SOURCE_ID) { + src_id = msg.dst_id(); + } + int64_t dst_id = msg.dst_id(); + int64_t dst_rank = GetRank(dst_id); + + VLOG(3) << "Send a cached message from interceptor " << src_id + << " to interceptor " << dst_id + << ", which are in different ranks, scope_idx:" << msg.scope_idx(); + + if(!GlobalVal::Get()->Send(dst_rank, msg)){ + LOG(FATAL) << "send msg error"; + } + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } + + cache_begin_ = std::chrono::steady_clock::now(); + } + } + VLOG(3) << "reset cache_begin_"; } void Carrier::Init( @@ -95,6 +162,9 @@ void Carrier::Init( thread_pool_.SetThreadNum(thread_num_); thread_pool_.Start(); + test_thread_ = std::thread([this]() { loop_to_send_msg(); }); + cache_begin_ == std::chrono::steady_clock::now(); + CreateInterceptors(); is_init_ = true; } @@ -230,12 +300,47 @@ bool Carrier::Send(const InterceptorMessage& msg) { VLOG(3) << "Send a message from interceptor " << src_id << " to interceptor " << dst_id << ", which are in the same ranks."; return EnqueueInterceptorMessage(msg); - } else { + } +/* + else { VLOG(3) << "Send a message from interceptor " << src_id << " to interceptor " << dst_id << ", which are in different ranks."; return GlobalVal::Get()->Send(dst_rank, msg); } +*/ + +if(!FLAGS_fleetexecutor_debug_mode){ + VLOG(3) << "Send a message from interceptor " << src_id + << " to interceptor " << dst_id + << ", which are in different ranks."; + return GlobalVal::Get()->Send(dst_rank, msg); + } + + if(msg.message_type() != DATA_IS_READY){ + VLOG(3) << "Send a message from interceptor " << src_id + << " to interceptor " << dst_id + << ", which are in different ranks."; + return GlobalVal::Get()->Send(dst_rank, msg); + } + + { + VLOG(3) << "prepare executor debug"; + + std::unique_lock lock(running_mutex_); + if(messages_for_test_.empty()){ + cache_begin_ = std::chrono::steady_clock::now(); + //std::time_t now_c = std::chrono::system_clock::to_time_t(cache_begin_)); + VLOG(3) << "messages_for_test_ empty, reset cache_begin_"; + } + + VLOG(3) << "Cache message from interceptor " << src_id + << " to interceptor " << dst_id + << ", which are in different ranks, scope_idx:" << msg.scope_idx(); + messages_for_test_.emplace_back(msg); + } + + return true; } Interceptor* Carrier::SetInterceptor(int64_t interceptor_id, diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h index 8e7fad3e892d87..c223432d8a8db8 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.h +++ b/paddle/fluid/distributed/fleet_executor/carrier.h @@ -21,6 +21,10 @@ #include #include #include +#include +#include + +#include #include "paddle/fluid/distributed/fleet_executor/interceptor.h" #include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h" @@ -118,6 +122,12 @@ class Carrier final { int thread_num_; TaskLoopThreadPool thread_pool_; std::unordered_set interceptor_ids_; + +std::deque messages_for_test_; + std::thread test_thread_; + std::chrono::time_point cache_begin_; + + void loop_to_send_msg(); }; } // namespace distributed From b4bfa314ffd87f5fce49f07ff1639e499ab7048b Mon Sep 17 00:00:00 2001 From: gongweibao Date: Sat, 25 Feb 2023 15:34:47 +0800 Subject: [PATCH 3/3] add flags --- .../distributed/fleet_executor/CMakeLists.txt | 1 + paddle/fluid/platform/flags.cc | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index ff8ed811ee6f84..c9863ace0602b0 100755 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -51,6 +51,7 @@ cc_library( op_registry executor_gc_helper gflags + flags glog ${BRPC_DEPS}) diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc index 518aabbb09ead8..3d5ee82740280c 100644 --- a/paddle/fluid/platform/flags.cc +++ b/paddle/fluid/platform/flags.cc @@ -1020,3 +1020,20 @@ PADDLE_DEFINE_EXPORTED_bool( PADDLE_DEFINE_EXPORTED_string(jit_engine_type, "Predictor", "Choose default funciton type in JitLayer."); + + + +/** + * Executor debug FLAG + * Name: FLAGS_fleetexecutor_debug_mode + * Since Version: 2.5 + * Value Range: bool + * default=False + * Example: + * Note: + * FLAGS_fleetexecutor_debug_mode == 1, enter in debug mode + */ +PADDLE_DEFINE_EXPORTED_bool(fleetexecutor_debug_mode, + false, + "Enter in FleetExecutor debug mode."); +