diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index 0a20856c0fd050..f8a4d099244353 100644
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -354,10 +354,10 @@ cc_library(executor_cache SRCS executor_cache.cc DEPS parallel_executor)
 if(WITH_PSCORE)
   get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS)
   cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS
-          conditional_block_op executor ${RPC_DEPS})
+          conditional_block_op executor gloo_wrapper ${RPC_DEPS})
 else()
   cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS
-          conditional_block_op executor)
+          conditional_block_op executor gloo_wrapper)
 endif()
 cc_library(prune SRCS prune.cc DEPS framework_proto boost)
 cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc
index 87afda459624f9..fdb24ee18eca7d 100644
--- a/paddle/fluid/framework/data_feed.cc
+++ b/paddle/fluid/framework/data_feed.cc
@@ -257,6 +257,11 @@ bool InMemoryDataFeed<T>::Start() {
     output_channel_->Write(std::move(data));
   }
 #endif
+  if (batch_offsets_.size() > 0) {
+    VLOG(3) << "batch offsets size: " << batch_offsets_.size();
+    enable_heterps_ = true;
+    this->offset_index_ = 0;
+  }
   this->finish_start_ = true;
   return true;
 }
@@ -265,34 +270,64 @@ template <typename T>
 int InMemoryDataFeed<T>::Next() {
 #ifdef _LINUX
   this->CheckStart();
-  CHECK(output_channel_ != nullptr);
-  CHECK(consume_channel_ != nullptr);
-  VLOG(3) << "output_channel_ size=" << output_channel_->Size()
-          << ", consume_channel_ size=" << consume_channel_->Size()
-          << ", thread_id=" << thread_id_;
-  int index = 0;
-  T instance;
-  std::vector<T> ins_vec;
-  ins_vec.reserve(this->default_batch_size_);
-  while (index < this->default_batch_size_) {
-    if (output_channel_->Size() == 0) {
-      break;
-    }
-    output_channel_->Get(instance);
-    ins_vec.push_back(instance);
-    ++index;
-    consume_channel_->Put(std::move(instance));
-  }
-  this->batch_size_ = index;
-  VLOG(3) << "batch_size_=" << this->batch_size_
-          << ", thread_id=" << thread_id_;
-  if (this->batch_size_ != 0) {
-    PutToFeedVec(ins_vec);
-  } else {
-    VLOG(3) << "finish reading, output_channel_ size="
-            << output_channel_->Size()
-            << ", consume_channel_ size=" << consume_channel_->Size()
-            << ", thread_id=" << thread_id_;
-  }
+  if (!enable_heterps_) {
+    CHECK(output_channel_ != nullptr);
+    CHECK(consume_channel_ != nullptr);
+    VLOG(3) << "output_channel_ size=" << output_channel_->Size()
+            << ", consume_channel_ size=" << consume_channel_->Size()
+            << ", thread_id=" << thread_id_;
+    int index = 0;
+    T instance;
+    std::vector<T> ins_vec;
+    ins_vec.reserve(this->default_batch_size_);
+    while (index < this->default_batch_size_) {
+      if (output_channel_->Size() == 0) {
+        break;
+      }
+      output_channel_->Get(instance);
+      ins_vec.push_back(instance);
+      ++index;
+      consume_channel_->Put(std::move(instance));
+    }
+    this->batch_size_ = index;
+    VLOG(3) << "batch_size_=" << this->batch_size_
+            << ", thread_id=" << thread_id_;
+    if (this->batch_size_ != 0) {
+      PutToFeedVec(ins_vec);
+    } else {
+      VLOG(3) << "finish reading, output_channel_ size="
+              << output_channel_->Size()
+              << ", consume_channel_ size=" << consume_channel_->Size()
+              << ", thread_id=" << thread_id_;
+    }
+  } else {
+    VLOG(3) << "enable heter NEXT: " << offset_index_
+            << " batch_offsets: " << batch_offsets_.size();
+    if (offset_index_ >= batch_offsets_.size()) {
+      VLOG(3) << "offset_index: " << offset_index_
+              << " batch_offsets: " << batch_offsets_.size();
+      return 0;
+    }
+    auto& batch = batch_offsets_[offset_index_++];
+    this->batch_size_ = batch.second;
+    VLOG(3) << "batch_size_=" << this->batch_size_
+            << ", thread_id=" << thread_id_;
+    if (this->batch_size_ != 0) {
+      PutToFeedVec(&records_[batch.first], this->batch_size_);
+    } else {
+      VLOG(3) << "finish reading for heterps, batch size zero, thread_id="
+              << thread_id_;
+    }
+    // Returning the remaining instances to consume_channel_ is intentionally
+    // disabled for now: records_ owns the data in heterps mode.
+    /*
+    if (offset_index_ == batch_offsets_.size() - 1) {
+      std::vector<T> data;
+      output_channel_->ReadAll(data);
+      consume_channel_->Write(std::move(data));
+    }
+    */
+    VLOG(3) << "enable heter NEXT: " << offset_index_
+            << " batch_offsets: " << batch_offsets_.size()
+            << " batch_size: " << this->batch_size_;
   }
   return this->batch_size_;
 #else
@@ -1141,6 +1176,103 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance(Record* instance) {
   return false;
 }
 
+void MultiSlotInMemoryDataFeed::PutToFeedVec(const Record* ins_vec, int num) {
+#ifdef _LINUX
+  for (size_t i = 0; i < batch_float_feasigns_.size(); ++i) {
+    batch_float_feasigns_[i].clear();
+    batch_uint64_feasigns_[i].clear();
+    offset_[i].clear();
+    offset_[i].push_back(0);
+  }
+  ins_content_vec_.clear();
+  ins_content_vec_.reserve(num);
+  ins_id_vec_.clear();
+  ins_id_vec_.reserve(num);
+  for (int i = 0; i < num; ++i) {
+    auto& r = ins_vec[i];
+    ins_id_vec_.push_back(r.ins_id_);
+    ins_content_vec_.push_back(r.content_);
+    for (auto& item : r.float_feasigns_) {
+      batch_float_feasigns_[item.slot()].push_back(item.sign().float_feasign_);
+      visit_[item.slot()] = true;
+    }
+    for (auto& item : r.uint64_feasigns_) {
+      batch_uint64_feasigns_[item.slot()].push_back(
+          item.sign().uint64_feasign_);
+      visit_[item.slot()] = true;
+    }
+    for (size_t j = 0; j < use_slots_.size(); ++j) {
+      const auto& type = all_slots_type_[j];
+      if (visit_[j]) {
+        visit_[j] = false;
+      } else {
+        // fill slot value with default value 0
+        if (type[0] == 'f') {  // float
+          batch_float_feasigns_[j].push_back(0.0);
+        } else if (type[0] == 'u') {  // uint64
+          batch_uint64_feasigns_[j].push_back(0);
+        }
+      }
+      // get offset of this ins in this slot
+      if (type[0] == 'f') {  // float
+        offset_[j].push_back(batch_float_feasigns_[j].size());
+      } else if (type[0] == 'u') {  // uint64
+        offset_[j].push_back(batch_uint64_feasigns_[j].size());
+      }
+    }
+  }
+
+  for (size_t i = 0; i < use_slots_.size(); ++i) {
+    if (feed_vec_[i] == nullptr) {
+      continue;
+    }
+    int total_instance = offset_[i].back();
+    const auto& type = all_slots_type_[i];
+    if (type[0] == 'f') {  // float
+      float* feasign = batch_float_feasigns_[i].data();
+      float* tensor_ptr =
+          feed_vec_[i]->mutable_data<float>({total_instance, 1}, this->place_);
+      CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(float));
+    } else if (type[0] == 'u') {  // uint64
+      // no uint64_t type in paddlepaddle
+      uint64_t* feasign = batch_uint64_feasigns_[i].data();
+      int64_t* tensor_ptr = feed_vec_[i]->mutable_data<int64_t>(
+          {total_instance, 1}, this->place_);
+      CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(int64_t));
+    }
+    auto& slot_offset = offset_[i];
+    if (this->input_type_ == 0) {
+      LoD data_lod{slot_offset};
+      feed_vec_[i]->set_lod(data_lod);
+    } else if (this->input_type_ == 1) {
+      if (!use_slots_is_dense_[i]) {
+        std::vector<size_t> tmp_offset;
+        PADDLE_ENFORCE_EQ(slot_offset.size(), 2,
+                          platform::errors::InvalidArgument(
+                              "In batch reader, the sparse tensor lod size "
+                              "must be 2, but received %d.",
+                              slot_offset.size()));
+        const auto& max_size = slot_offset[1];
+        tmp_offset.reserve(max_size + 1);
+        for (unsigned int k = 0; k <= max_size; k++) {
+          tmp_offset.emplace_back(k);
+        }
+        slot_offset = tmp_offset;
+        LoD data_lod{slot_offset};
+        feed_vec_[i]->set_lod(data_lod);
+      }
+    }
+    if (use_slots_is_dense_[i]) {
+      if (inductive_shape_index_[i] != -1) {
+        use_slots_shape_[i][inductive_shape_index_[i]] =
+            total_instance / total_dims_without_inductive_[i];
+      }
+      feed_vec_[i]->Resize(framework::make_ddim(use_slots_shape_[i]));
+    }
+  }
+#endif
+}
+
 void MultiSlotInMemoryDataFeed::PutToFeedVec(
     const std::vector<Record>& ins_vec) {
 #ifdef _LINUX
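The heterps branch of Next() above replaces channel reads with precomputed (start, size) slices over a flat record array. A minimal Python sketch of that behaviour, purely illustrative and not Paddle API:

class HeterPsReader(object):
    def __init__(self, records, batch_offsets):
        self.records = records                # flat array shared by all readers
        self.batch_offsets = batch_offsets    # precomputed (start, size) pairs
        self.offset_index = 0

    def next(self):
        # mirrors the enable_heterps_ branch of InMemoryDataFeed<T>::Next()
        if self.offset_index >= len(self.batch_offsets):
            return 0                          # no batches left in this epoch
        start, size = self.batch_offsets[self.offset_index]
        self.offset_index += 1
        batch = self.records[start:start + size]  # stands in for PutToFeedVec
        return len(batch)

reader = HeterPsReader(list(range(10)), [(0, 4), (4, 3), (7, 3)])
counts = []
while True:
    n = reader.next()
    if n == 0:
        break
    counts.append(n)
assert counts == [4, 3, 3]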
diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h
index 04a5b9b4d3adaf..198bc51463af35 100644
--- a/paddle/fluid/framework/data_feed.h
+++ b/paddle/fluid/framework/data_feed.h
@@ -167,7 +167,7 @@ class DLManager {
   }
 
   paddle::framework::CustomParser* Load(const std::string& name,
-                                        std::vector<SlotConf>& conf) {
+                                        const std::vector<SlotConf>& conf) {
 #ifdef _LINUX
     std::lock_guard<std::mutex> lock(mutex_);
     DLHandle handle;
@@ -195,7 +195,7 @@ class DLManager {
   }
 
   paddle::framework::CustomParser* ReLoad(const std::string& name,
-                                          std::vector<SlotConf>& conf) {
+                                          const std::vector<SlotConf>& conf) {
     Close(name);
     return Load(name, conf);
   }
@@ -422,6 +422,7 @@ class InMemoryDataFeed : public DataFeed {
   virtual void ParseOneInstanceFromSo(const char* str, T* instance,
                                       CustomParser* parser) {}
   virtual void PutToFeedVec(const std::vector<T>& ins_vec) = 0;
+  virtual void PutToFeedVec(const T* ins_vec, int num) = 0;
 
   int thread_id_;
   int thread_num_;
@@ -439,6 +440,11 @@ class InMemoryDataFeed : public DataFeed {
   paddle::framework::ChannelObject<PvInstance>* input_pv_channel_;
   paddle::framework::ChannelObject<PvInstance>* output_pv_channel_;
   paddle::framework::ChannelObject<PvInstance>* consume_pv_channel_;
+
+  std::vector<std::pair<int, int>> batch_offsets_;
+  uint64_t offset_index_ = 0;
+  bool enable_heterps_ = false;
+  T* records_ = nullptr;
 };
 
 // This class define the data type of instance(ins_vec) in MultiSlotDataFeed
@@ -601,7 +607,7 @@ paddle::framework::Archive<AR>& operator>>(paddle::framework::Archive<AR>& ar,
   for (size_t& x : offset) {
     uint64_t t;
     ar >> t;
-    x = (size_t)t;
+    x = static_cast<size_t>(t);
   }
 #endif
   ar >> ins.MutableFloatData();
@@ -777,6 +783,11 @@ class MultiSlotInMemoryDataFeed : public InMemoryDataFeed<Record> {
   MultiSlotInMemoryDataFeed() {}
   virtual ~MultiSlotInMemoryDataFeed() {}
   virtual void Init(const DataFeedDesc& data_feed_desc);
+  void SetRecord(Record* records) { records_ = records; }
+  int GetDefaultBatchSize() { return default_batch_size_; }
+  void AddBatchOffset(const std::pair<int, int>& offset) {
+    batch_offsets_.push_back(offset);
+  }
 
  protected:
   virtual bool ParseOneInstance(Record* instance);
@@ -786,6 +797,7 @@ class MultiSlotInMemoryDataFeed : public InMemoryDataFeed<Record> {
   virtual void PutToFeedVec(const std::vector<Record>& ins_vec);
   virtual void GetMsgFromLogKey(const std::string& log_key, uint64_t* search_id,
                                 uint32_t* cmatch, uint32_t* rank);
+  virtual void PutToFeedVec(const Record* ins_vec, int num);
   std::vector<std::vector<float>> batch_float_feasigns_;
   std::vector<std::vector<uint64_t>> batch_uint64_feasigns_;
   std::vector<std::vector<size_t>> offset_;
diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc
index a9903f164bda38..08c42a93d1fcbf 100644
--- a/paddle/fluid/framework/data_set.cc
+++ b/paddle/fluid/framework/data_set.cc
@@ -216,6 +216,180 @@ void DatasetImpl<T>::RegisterClientToClientMsgHandler() {
       });
   VLOG(3) << "RegisterClientToClientMsgHandler done";
 }
+static void compute_left_batch_num(const int ins_num, const int thread_num,
+                                   std::vector<std::pair<int, int>>* offset,
+                                   const int start_pos) {
+  int cur_pos = start_pos;
+  int batch_size = ins_num / thread_num;
+  int left_num = ins_num % thread_num;
+  for (int i = 0; i < thread_num; ++i) {
+    int batch_num_size = batch_size;
+    if (i == 0) {
+      batch_num_size = batch_num_size + left_num;
+    }
+    offset->push_back(std::make_pair(cur_pos, batch_num_size));
+    cur_pos += batch_num_size;
+  }
+}
+
+static void compute_batch_num(const int64_t ins_num, const int batch_size,
+                              const int thread_num,
+                              std::vector<std::pair<int, int>>* offset) {
+  int thread_batch_num = batch_size * thread_num;
+  // not even one full round of batches: split what we have evenly
+  if (static_cast<int64_t>(thread_batch_num) > ins_num) {
+    compute_left_batch_num(ins_num, thread_num, offset, 0);
+    return;
+  }
+
+  int cur_pos = 0;
+  int offset_num = static_cast<int>(ins_num / thread_batch_num) * thread_num;
+  int left_ins_num = static_cast<int>(ins_num % thread_batch_num);
+  if (left_ins_num > 0 && left_ins_num < thread_num) {
+    offset_num = offset_num - thread_num;
+    left_ins_num = left_ins_num + thread_batch_num;
+    for (int i = 0; i < offset_num; ++i) {
+      offset->push_back(std::make_pair(cur_pos, batch_size));
+      cur_pos += batch_size;
+    }
+    // the remainder is too small to split once, so merge in one full round
+    // and spread it over two averaged rounds per thread
+    compute_left_batch_num(left_ins_num, thread_num * 2, offset, cur_pos);
+  } else {
+    for (int i = 0; i < offset_num; ++i) {
+      offset->push_back(std::make_pair(cur_pos, batch_size));
+      cur_pos += batch_size;
+    }
+    if (left_ins_num > 0) {
+      compute_left_batch_num(left_ins_num, thread_num, offset, cur_pos);
+    }
+  }
+}
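The two helpers above turn ins_num instances into per-thread (start_pos, batch_size) pairs, giving the first thread any remainder. A plain-Python restatement of the same arithmetic with a worked example (illustrative only, not Paddle API):

def compute_left_batch_num(ins_num, thread_num, offset, start_pos):
    cur_pos = start_pos
    batch_size, left_num = divmod(ins_num, thread_num)
    for i in range(thread_num):
        size = batch_size + left_num if i == 0 else batch_size
        offset.append((cur_pos, size))
        cur_pos += size

def compute_batch_num(ins_num, batch_size, thread_num, offset):
    thread_batch_num = batch_size * thread_num
    if thread_batch_num > ins_num:
        # not even one full round: one small batch per thread
        compute_left_batch_num(ins_num, thread_num, offset, 0)
        return
    offset_num = (ins_num // thread_batch_num) * thread_num
    left_ins_num = ins_num % thread_batch_num
    rounds = 1
    if 0 < left_ins_num < thread_num:
        # remainder too small to split once: merge in one full round and
        # average it over two rounds instead
        offset_num -= thread_num
        left_ins_num += thread_batch_num
        rounds = 2
    cur_pos = 0
    for _ in range(offset_num):
        offset.append((cur_pos, batch_size))
        cur_pos += batch_size
    if left_ins_num > 0:
        compute_left_batch_num(left_ins_num, thread_num * rounds, offset, cur_pos)

offset = []
compute_batch_num(10, 3, 2, offset)   # 10 instances, batch size 3, 2 threads
assert offset == [(0, 3), (3, 3), (6, 2), (8, 2)]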
+static int compute_thread_batch_nccl(
+    const int thr_num, const int64_t total_instance_num,
+    const int minibatch_size, std::vector<std::pair<int, int>>* nccl_offsets) {
+  int thread_avg_batch_num = 0;
+  if (total_instance_num < static_cast<int64_t>(thr_num)) {
+    LOG(WARNING) << "compute_thread_batch_nccl total ins num:["
+                 << total_instance_num << "] is less than thread num:["
+                 << thr_num << "]";
+    return thread_avg_batch_num;
+  }
+
+  auto& offset = (*nccl_offsets);
+  // split data evenly by thread num
+  compute_batch_num(total_instance_num, minibatch_size, thr_num, &offset);
+  thread_avg_batch_num = static_cast<int>(offset.size() / thr_num);
+#ifdef PADDLE_WITH_GLOO
+  auto gloo_wrapper = paddle::framework::GlooWrapper::GetInstance();
+  if (!gloo_wrapper->IsInitialized()) {
+    VLOG(0) << "GLOO is not inited";
+    gloo_wrapper->Init();
+  }
+
+  if (gloo_wrapper->Size() > 1) {
+    // adjust batch num per thread for NCCL
+    std::vector<int> thread_avg_batch_num_vec(1, thread_avg_batch_num);
+    std::vector<int64_t> total_instance_num_vec(1, total_instance_num);
+    auto thread_max_batch_num_vec =
+        gloo_wrapper->AllReduce(thread_avg_batch_num_vec, "max");
+    auto sum_total_ins_num_vec =
+        gloo_wrapper->AllReduce(total_instance_num_vec, "sum");
+    int thread_max_batch_num = thread_max_batch_num_vec[0];
+    int64_t sum_total_ins_num = sum_total_ins_num_vec[0];
+    int diff_batch_num = thread_max_batch_num - thread_avg_batch_num;
+    VLOG(3) << "diff batch num: " << diff_batch_num
+            << " thread max batch num: " << thread_max_batch_num
+            << " thread avg batch num: " << thread_avg_batch_num;
+    if (diff_batch_num == 0) {
+      LOG(WARNING) << "total sum ins " << sum_total_ins_num << ", thread_num "
+                   << thr_num << ", ins num " << total_instance_num
+                   << ", batch num " << offset.size()
+                   << ", thread avg batch num " << thread_avg_batch_num;
+      return thread_avg_batch_num;
+    }
+
+    int need_ins_num = thread_max_batch_num * thr_num;
+    // too little data to pad up to the global max batch num
+    if (static_cast<int64_t>(need_ins_num) > total_instance_num) {
+      PADDLE_THROW(platform::errors::InvalidArgument(
+          "total instance num:[%d] is less than the needed ins num:[%d]",
+          total_instance_num, need_ins_num));
+    }
+
+    int need_batch_num = (diff_batch_num + 1) * thr_num;
+    int offset_split_index = static_cast<int>(offset.size() - thr_num);
+    int split_left_num = total_instance_num - offset[offset_split_index].first;
+    while (split_left_num < need_batch_num) {
+      need_batch_num += thr_num;
+      offset_split_index -= thr_num;
+      split_left_num = total_instance_num - offset[offset_split_index].first;
+    }
+    int split_start = offset[offset_split_index].first;
+    offset.resize(offset_split_index);
+    compute_left_batch_num(split_left_num, need_batch_num, &offset,
+                           split_start);
+    LOG(WARNING) << "total sum ins " << sum_total_ins_num << ", thread_num "
+                 << thr_num << ", ins num " << total_instance_num
+                 << ", batch num " << offset.size()
+                 << ", thread avg batch num " << thread_avg_batch_num
+                 << ", thread max batch num " << thread_max_batch_num
+                 << ", need batch num: " << (need_batch_num / thr_num)
+                 << ", split begin (" << split_start << "), num "
+                 << split_left_num;
+    thread_avg_batch_num = thread_max_batch_num;
+  } else {
+    LOG(WARNING) << "thread_num " << thr_num << ", ins num "
+                 << total_instance_num << ", batch num " << offset.size()
+                 << ", thread avg batch num " << thread_avg_batch_num;
+  }
+#else
+  PADDLE_THROW(platform::errors::Unavailable(
+      "dataset compute nccl batch number need compile with GLOO"));
+#endif
+  return thread_avg_batch_num;
+}
+
+template <typename T>
+void DatasetImpl<T>::SetHeterPs(bool enable_heterps) {
+#ifdef PADDLE_WITH_GLOO
+  enable_heterps_ = enable_heterps;
+  if (enable_heterps_) {
+    if (input_records_.size() == 0 && input_channel_ != nullptr &&
+        input_channel_->Size() != 0) {
+      input_channel_->ReadAll(input_records_);
+      VLOG(3) << "read from channel to records with records size: "
+              << input_records_.size();
+    }
+    VLOG(3) << "input records size: " << input_records_.size();
+    int64_t total_ins_num = input_records_.size();
+    std::vector<std::pair<int, int>> offset;
+    int default_batch_size =
+        reinterpret_cast<MultiSlotInMemoryDataFeed*>(readers_[0].get())
+            ->GetDefaultBatchSize();
+    VLOG(3) << "thread_num: " << thread_num_
+            << " memory size: " << total_ins_num
+            << " default batch_size: " << default_batch_size;
+    compute_thread_batch_nccl(thread_num_, total_ins_num, default_batch_size,
+                              &offset);
+    VLOG(3) << "offset size: " << offset.size();
+    for (int i = 0; i < thread_num_; i++) {
+      reinterpret_cast<MultiSlotInMemoryDataFeed*>(readers_[i].get())
+          ->SetRecord(&input_records_[0]);
+    }
+    for (size_t i = 0; i < offset.size(); i++) {
+      reinterpret_cast<MultiSlotInMemoryDataFeed*>(
+          readers_[i % thread_num_].get())
+          ->AddBatchOffset(offset[i]);
+    }
+  }
+#else
+  PADDLE_THROW(platform::errors::Unavailable(
+      "dataset set heterps need compile with GLOO"));
+#endif
+}
 
 // load data into memory, Dataset hold this memory,
 // which will later be fed into readers' channel
@@ -319,6 +493,13 @@ void DatasetImpl<T>::ReleaseMemory() {
     multi_pv_consume_[i]->Clear();
     multi_pv_consume_[i] = nullptr;
   }
+  if (enable_heterps_) {
+    VLOG(3) << "release heterps input records, size: "
+            << input_records_.size();
+    std::vector<T>().swap(input_records_);
+  }
   std::vector<paddle::framework::Channel<PvInstance>>().swap(multi_pv_consume_);
 
   std::vector<std::shared_ptr<paddle::framework::DataFeed>>().swap(readers_);
@@ -654,6 +835,9 @@ void DatasetImpl<T>::CreateReaders() {
       channel_idx = 0;
     }
   }
+  if (enable_heterps_) {
+    SetHeterPs(true);
+  }
   VLOG(3) << "readers size: " << readers_.size();
 }
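SetHeterPs above hands every reader the same flat record array and assigns the computed offsets round-robin over the thread readers. A small stand-alone illustration with hypothetical values:

thread_num = 2
offsets = [(0, 3), (3, 3), (6, 2), (8, 2)]    # from compute_thread_batch_nccl
per_reader = {i: [] for i in range(thread_num)}
for i, off in enumerate(offsets):
    # mirrors readers_[i % thread_num_]->AddBatchOffset(offset[i])
    per_reader[i % thread_num].append(off)
assert per_reader == {0: [(0, 3), (6, 2)], 1: [(3, 3), (8, 2)]}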
"paddle/fluid/framework/data_feed.h" @@ -145,6 +149,7 @@ class Dataset { virtual void DynamicAdjustReadersNum(int thread_num) = 0; // set fleet send sleep seconds virtual void SetFleetSendSleepSeconds(int seconds) = 0; + virtual void SetHeterPs(bool enable_heterps) = 0; protected: virtual int ReceiveFromClient(int msg_type, int client_id, @@ -228,6 +233,7 @@ class DatasetImpl : public Dataset { bool discard_remaining_ins = false); virtual void DynamicAdjustReadersNum(int thread_num); virtual void SetFleetSendSleepSeconds(int seconds); + virtual void SetHeterPs(bool enable_heterps); std::vector>& GetMultiOutputChannel() { return multi_output_channel_; @@ -292,6 +298,7 @@ class DatasetImpl : public Dataset { int64_t global_index_ = 0; std::vector> consume_task_pool_; std::vector input_records_; // only for paddleboxdatafeed + bool enable_heterps_ = false; }; // use std::vector or Record as data type diff --git a/paddle/fluid/framework/details/multi_devices_helper.cc b/paddle/fluid/framework/details/multi_devices_helper.cc index 01ef83518af5dd..4587c6d3e4f2a3 100644 --- a/paddle/fluid/framework/details/multi_devices_helper.cc +++ b/paddle/fluid/framework/details/multi_devices_helper.cc @@ -40,6 +40,7 @@ static std::unordered_set kMultiDeviceOps{ "c_broadcast", "c_comm_init", "c_comm_init_all", + "c_comm_init_multitrainer", "c_gen_nccl_id", "c_sync_comm_stream", "send", diff --git a/paddle/fluid/framework/dist_multi_trainer_test.cc b/paddle/fluid/framework/dist_multi_trainer_test.cc index 0e3292df3cf79b..06d84bca1273dc 100644 --- a/paddle/fluid/framework/dist_multi_trainer_test.cc +++ b/paddle/fluid/framework/dist_multi_trainer_test.cc @@ -14,7 +14,9 @@ #include "gtest/gtest.h" #include "paddle/fluid/framework/trainer.h" - +#ifdef PADDLE_WITH_GLOO +#include "paddle/fluid/framework/fleet/gloo_wrapper.h" +#endif #if defined _WIN32 || defined __APPLE__ #else #define _LINUX diff --git a/paddle/fluid/framework/fleet/CMakeLists.txt b/paddle/fluid/framework/fleet/CMakeLists.txt index a9e4691dd0a015..36c5b137013614 100644 --- a/paddle/fluid/framework/fleet/CMakeLists.txt +++ b/paddle/fluid/framework/fleet/CMakeLists.txt @@ -12,15 +12,15 @@ endif(WITH_PSLIB) if(WITH_HETERPS) if(WITH_NCCL) nv_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc - DEPS heter_ps ${BRPC_DEPS}) + DEPS heter_ps gloo_wrapper ${BRPC_DEPS}) add_subdirectory(heter_ps) elseif(WITH_RCCL) hip_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc - DEPS heter_ps ${BRPC_DEPS}) + DEPS heter_ps gloo_wrapper ${BRPC_DEPS}) add_subdirectory(heter_ps) endif(WITH_NCCL) else() - cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc) + cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc DEPS gloo_wrapper) endif(WITH_HETERPS) if(WITH_NCCL OR WITH_RCCL) diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index f8dfccf58ff960..7b3131003da79b 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -123,7 +123,7 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task) { } timeline.Pause(); - VLOG(1) << "GpuPs task unique11111 cost " << timeline.ElapsedSec() + VLOG(1) << "GpuPs task add keys cost " << timeline.ElapsedSec() << " seconds."; timeline.Start(); gpu_task->UniqueKeys(); @@ -138,19 +138,74 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task) { timeline.Start(); auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) { size_t key_size = local_keys[i].size(); + int32_t status = -1; #ifdef 
diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
index f8dfccf58ff960..7b3131003da79b 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -123,7 +123,7 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
   }
   timeline.Pause();
 
-  VLOG(1) << "GpuPs task unique11111 cost " << timeline.ElapsedSec()
+  VLOG(1) << "GpuPs task add keys cost " << timeline.ElapsedSec()
           << " seconds.";
   timeline.Start();
   gpu_task->UniqueKeys();
@@ -138,19 +138,74 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
   timeline.Start();
   auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
     size_t key_size = local_keys[i].size();
+    int32_t status = -1;
 #ifdef PADDLE_WITH_PSLIB
-    auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
-        reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
-        local_keys[i].data(), key_size);
+    // retry the pull a few times instead of failing on the first error
+    int32_t cnt = 0;
+    while (true) {
+      auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
+          reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
+          local_keys[i].data(), key_size);
+      bool flag = true;
+
+      tt.wait();
+
+      try {
+        status = tt.get();
+      } catch (const std::future_error& e) {
+        VLOG(0) << "Caught a future_error with code " << e.code()
+                << ", Message: " << e.what();
+      }
+      if (status != 0) {
+        VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
+        sleep(sleep_seconds_before_fail_exit_);
+        flag = false;
+        cnt++;
+      }
+      if (cnt > 3) {
+        VLOG(0) << "fleet pull sparse failed after retrying 3 times";
+        exit(-1);
+      }
+
+      if (flag) {
+        break;
+      }
+    }
#endif
 #ifdef PADDLE_WITH_PSCORE
-    auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr(
-        reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
-        local_keys[i].data(), key_size);
+    int32_t cnt = 0;
+    while (true) {
+      auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr(
+          reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
+          local_keys[i].data(), key_size);
+      bool flag = true;
+
+      tt.wait();
+
+      try {
+        status = tt.get();
+      } catch (const std::future_error& e) {
+        VLOG(0) << "Caught a future_error with code " << e.code()
+                << ", Message: " << e.what();
+      }
+      if (status != 0) {
+        VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
+        sleep(sleep_seconds_before_fail_exit_);
+        flag = false;
+        cnt++;
+      }
+      if (cnt > 3) {
+        VLOG(0) << "fleet pull sparse failed after retrying 3 times";
+        exit(-1);
+      }
+
+      if (flag) {
+        break;
+      }
+    }
 #endif
-    tt.wait();
-    auto status = tt.get();
-    // auto status = 0;
     if (status != 0) {
       LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]";
       sleep(300);
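The pull path above now retries instead of aborting on the first failure. A generic Python sketch of the same idea (pull_sparse, the exception type, and the sleep interval are stand-ins, not Paddle API):

import time

def pull_with_retry(pull_sparse, sleep_seconds=300, max_retries=3):
    cnt = 0
    while True:
        try:
            status = pull_sparse()             # returns 0 on success
        except RuntimeError as e:              # plays the role of std::future_error
            print("caught error:", e)
            status = -1
        if status == 0:
            return status
        print("fleet pull sparse failed, status[{}]".format(status))
        cnt += 1
        if cnt > max_retries:
            raise SystemExit("fleet pull sparse failed after retries")
        time.sleep(sleep_seconds)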
@@ -169,10 +224,27 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
   timeline.Pause();
   VLOG(1) << "pull sparse from CpuPS into GpuPS cost " << timeline.ElapsedSec()
           << " seconds.";
+  if (multi_node_) {
+    auto gloo_wrapper = paddle::framework::GlooWrapper::GetInstance();
+    if (!gloo_wrapper->IsInitialized()) {
+      VLOG(0) << "GLOO is not inited";
+      gloo_wrapper->Init();
+    }
+    gloo_wrapper->Barrier();
+  }
 
   timeline.Start();
-  auto build_func = [device_num, &local_keys, &local_ptr, &device_keys,
-                     &device_vals, &device_mutex](int i) {
+  std::vector<std::vector<std::pair<uint64_t, char*>>> pass_values;
+  uint16_t pass_id = 0;
+
+  bool record_status = false;
+  if (multi_node_) {
+    record_status = fleet_ptr->pslib_ptr_->_worker_ptr->take_sparse_record(
+        table_id_, pass_id, pass_values);
+  }
+  auto build_func = [device_num, record_status, &pass_values, &local_keys,
+                     &local_ptr, &device_keys, &device_vals,
+                     &device_mutex](int i) {
     std::vector<std::vector<uint64_t>> task_keys(device_num);
 #ifdef PADDLE_WITH_PSLIB
     std::vector<std::vector<paddle::ps::DownpourFixedFeatureValue*>> task_ptrs(
         device_num);
@@ -188,7 +260,21 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
       task_keys[shard].push_back(local_keys[i][j]);
       task_ptrs[shard].push_back(local_ptr[i][j]);
     }
-
+    if (record_status) {
+      size_t local_keys_size = local_keys.size();
+      size_t pass_values_size = pass_values.size();
+      // thread i merges every local_keys_size-th slice of the recorded pass
+      // values, starting at its own index
+      for (size_t j = i; j < pass_values_size; j += local_keys_size) {
+        auto& shard_values = pass_values[j];
+        for (size_t pair_idx = 0; pair_idx < shard_values.size(); pair_idx++) {
+          auto& cur_pair = shard_values[pair_idx];
+          int shard = cur_pair.first % device_num;
+          task_keys[shard].push_back(cur_pair.first);
+          task_ptrs[shard].push_back(
+              (paddle::ps::DownpourFixedFeatureValue*)cur_pair.second);
+        }
+      }
+    }
     for (int dev = 0; dev < device_num; dev++) {
       device_mutex[dev]->lock();
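Both the locally pulled keys and the merged multi-node pass values above are bucketed onto devices by key % device_num. A stand-alone illustration of that sharding rule:

device_num = 4
keys = [101, 202, 303, 404, 505]
task_keys = [[] for _ in range(device_num)]
for k in keys:
    task_keys[k % device_num].append(k)   # same rule as the C++ shard index
assert task_keys == [[404], [101, 505], [202], [303]]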
diff --git a/paddle/fluid/operators/collective/c_comm_init_multitrainer_op.cc b/paddle/fluid/operators/collective/c_comm_init_multitrainer_op.cc
new file mode 100644
index 00000000000000..aee10dcdc27323
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_comm_init_multitrainer_op.cc
@@ -0,0 +1,104 @@
+/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+#if defined(PADDLE_WITH_NCCL)
+#include <nccl.h>
+#endif
+#include <stdint.h>
+#include <ostream>
+#include <string>
+
+#include "paddle/fluid/framework/executor.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/op_registry.h"
+#include "paddle/fluid/framework/threadpool.h"
+// #include "paddle/fluid/operators/distributed/distributed.h"
+// #include "paddle/fluid/operators/distributed/request_handler_impl.h"
+#if defined(PADDLE_WITH_NCCL)
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/nccl_helper.h"
+#endif
+
+namespace paddle {
+namespace operators {
+
+class CCommInitMultiTrainerInferShape : public framework::InferShapeBase {
+ public:
+  ~CCommInitMultiTrainerInferShape() {}
+  void operator()(framework::InferShapeContext* ctx) const override {}
+};
+
+class CCommInitMultiTrainerOp : public framework::OperatorBase {
+ public:
+  CCommInitMultiTrainerOp(const std::string& type,
+                          const framework::VariableNameMap& inputs,
+                          const framework::VariableNameMap& outputs,
+                          const framework::AttributeMap& attrs)
+      : OperatorBase(type, inputs, outputs, attrs) {}
+
+  void RunImpl(const framework::Scope& scope,
+               const platform::Place& place) const override {
+    auto var = scope.FindVar(Input("X"));
+    PADDLE_ENFORCE_NOT_NULL(
+        var, platform::errors::InvalidArgument("Input X must be provided."));
+#if defined(PADDLE_WITH_NCCL)
+    ncclUniqueId* nccl_id = var->GetMutable<ncclUniqueId>();
+
+    int ntrainers = Attr<int>("ntrainers");
+    int train_id = Attr<int>("trainer_id");
+    int rid = Attr<int>("ring_id");
+
+    std::vector<int> devices = Attr<std::vector<int>>("devices");
+
+    if (devices.empty()) {
+      devices = platform::GetSelectedDevices();
+    }
+    platform::NCCLCommContext::Instance().CreateNCCLCommMultiTrainer(
+        devices, nccl_id, ntrainers, train_id, rid);
+#else
+    PADDLE_THROW(platform::errors::Unimplemented(
+        "PaddlePaddle should be compiled with GPU."));
+#endif
+  }
+};
+
+class CCommInitMultiTrainerOpMaker : public framework::OpProtoAndCheckerMaker {
+ public:
+  void Make() override {
+    AddInput("X", "Raw variable containing a NCCL UniqueId instance.");
+    AddComment(R"DOC(
+CCommInitMultiTrainer operator
+
+Initialize collective communication context within this trainer
+)DOC");
+    AddAttr<int>("ntrainers",
+                 "(int) The number of trainers of distributed trainers");
+    AddAttr<int>("trainer_id",
+                 "(int) The id of the trainer in distributed training.");
+    AddAttr<std::vector<int>>("devices",
+                              "(std::vector<int>) which devices does the nccl "
+                              "comm initialized on in each trainer")
+        .SetDefault({});
+    AddAttr<int>("ring_id", "(int default 0) user specified ring id")
+        .SetDefault(0);
+  }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+
+REGISTER_OPERATOR(c_comm_init_multitrainer, ops::CCommInitMultiTrainerOp,
+                  ops::CCommInitMultiTrainerInferShape,
+                  ops::CCommInitMultiTrainerOpMaker);
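The operator initializes one NCCL rank per local device inside a world of ntrainers * kDevices ranks (see CreateNCCLCommMultiTrainer below). A quick check of the rank layout it assumes, with hypothetical sizes:

ntrainers, kdevices = 2, 4                 # 2 trainers, 4 GPUs each
world_size = ntrainers * kdevices          # nranks passed to ncclCommInitRank
rank = lambda train_id, dev: train_id * kdevices + dev
assert world_size == 8
assert [rank(1, d) for d in range(kdevices)] == [4, 5, 6, 7]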
diff --git a/paddle/fluid/platform/collective_helper.cc b/paddle/fluid/platform/collective_helper.cc
index cc9f2c75989db8..a765f344daf8aa 100644
--- a/paddle/fluid/platform/collective_helper.cc
+++ b/paddle/fluid/platform/collective_helper.cc
@@ -140,6 +140,50 @@ void NCCLCommContext::CreateAllNCCLComms(const std::vector<int>& dev_ids,
       });
 }
 
+void NCCLCommContext::CreateNCCLCommMultiTrainer(
+    const std::vector<int>& dev_ids, ncclUniqueId* nccl_id, int ntrainers,
+    int train_id, int ring_id) {
+  PADDLE_ENFORCE_GT(
+      dev_ids.size(), 0,
+      paddle::platform::errors::InvalidArgument(
+          "dev_ids size = [%d], it should be greater than 0.",
+          dev_ids.size()));
+  const int kDevices = dev_ids.size();
+  VLOG(3) << "Begin CreateNCCLCommMultiTrainer. device number: " << kDevices
+          << ", ntrainers: " << ntrainers << ", train_id: " << train_id
+          << ", ring_id: " << ring_id;
+  ncclComm_t comms[kDevices];
+  {
+    PADDLE_ENFORCE_CUDA_SUCCESS(dynload::ncclGroupStart());
+    for (int i = 0; i < kDevices; i++) {
+#ifdef PADDLE_WITH_HIP
+      PADDLE_ENFORCE_CUDA_SUCCESS(hipSetDevice(i));
+#else
+      PADDLE_ENFORCE_CUDA_SUCCESS(cudaSetDevice(i));
+#endif
+      platform::dynload::ncclCommInitRank(comms + i, kDevices * ntrainers,
+                                          *nccl_id, train_id * kDevices + i);
+      VLOG(3) << "ncclCommInitRank: " << i;
+    }
+    PADDLE_ENFORCE_CUDA_SUCCESS(dynload::ncclGroupEnd());
+    VLOG(3) << "nccl group end success";
+  }
+  PADDLE_ENFORCE_EQ(comm_map_.count(ring_id), 0,
+                    platform::errors::InvalidArgument(
+                        "comm_map_ count of ring_id: %d should be 0, but %d "
+                        "is provided",
+                        ring_id, comm_map_.count(ring_id)));
+  for (int i = 0; i < kDevices; ++i) {
+    AssignNCCLComm(comms[i], kDevices * ntrainers, train_id * kDevices + i,
+                   dev_ids[i], ring_id);
+    VLOG(3) << "nccl communicator of train_id " << train_id * kDevices + i
+            << " in ring " << ring_id << " has been created on device "
+            << dev_ids[i];
+  }
+
+  std::call_once(once_flag_, []() {
+    std::atexit([]() { NCCLCommContext::Instance().ReleaseNCCLComms(); });
+  });
+}
+
 NCCLComm* NCCLCommContext::AssignNCCLComm(ncclComm_t comm, int nranks,
                                           int rank, int dev_id, int ring_id) {
   std::unique_ptr<CUDADeviceContext> dev_ctx(
diff --git a/paddle/fluid/platform/collective_helper.h b/paddle/fluid/platform/collective_helper.h
index b9be9dc8304e1e..566121a08b880f 100644
--- a/paddle/fluid/platform/collective_helper.h
+++ b/paddle/fluid/platform/collective_helper.h
@@ -77,6 +77,10 @@ class NCCLCommContext {
 
   void CreateAllNCCLComms(const std::vector<int>& dev_ids, int ring_id = 0);
 
+  void CreateNCCLCommMultiTrainer(const std::vector<int>& dev_ids,
+                                  ncclUniqueId* nccl_id, int ntrainers,
+                                  int train_id, int ring_id);
+
   // a latter comm with the same dev_id and the same ring_id
   // will override the former
   NCCLComm* AssignNCCLComm(ncclComm_t comm, int nranks, int rank, int dev_id,
diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc
index 7a32d8729fc6ca..41cf0189d3d9d0 100644
--- a/paddle/fluid/pybind/data_set_py.cc
+++ b/paddle/fluid/pybind/data_set_py.cc
@@ -309,6 +309,8 @@ void BindDataset(py::module *m) {
            &framework::Dataset::SetFleetSendSleepSeconds,
            py::call_guard<py::gil_scoped_release>())
       .def("enable_pv_merge", &framework::Dataset::EnablePvMerge,
+           py::call_guard<py::gil_scoped_release>())
+      .def("set_heter_ps", &framework::Dataset::SetHeterPs,
            py::call_guard<py::gil_scoped_release>());
 
   py::class_<IterableDatasetWrapper>(*m, "IterableDatasetWrapper")
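With the binding above, Python can toggle heterps on a dataset. A hedged usage sketch (the file name is a placeholder; _set_heter_ps, added below, is normally invoked internally by the fleet code rather than by users):

import paddle.fluid as fluid

x = fluid.layers.data(name="x", shape=[1], dtype="int64", lod_level=1)
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_batch_size(32)
dataset.set_thread(8)
dataset.set_use_var([x])
dataset.set_filelist(["train_part_0.txt"])   # placeholder file name
dataset.load_into_memory()                   # fills the input channel
dataset._set_heter_ps(True)                  # added in this patch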
+ """ + self.dataset.set_heter_ps(enable_heter_ps) + class QueueDataset(DatasetBase): """ diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 49c262607498c6..39cf3ebeb32a95 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -101,16 +101,17 @@ def init_worker(self): # barrier_all for init_worker self._role_maker._barrier_all() # prepare for client to client communication - if not self._opt_info["use_ps_gpu"]: - if self._role_maker.is_worker(): - info = self._fleet_ptr.get_clients_info() - all_info = self._role_maker._worker_gather(info[0]) - self._fleet_ptr.gather_clients(all_info) - self._fleet_ptr.set_client2client_config( - self._client2client_request_timeout_ms, - self._client2client_connect_timeout_ms, - self._client2client_max_retry) - self._fleet_ptr.create_client2client_connection() + if self._role_maker.is_worker(): + info = self._fleet_ptr.get_clients_info() + print("IIIIFO: {}".format(info)) + all_info = self._role_maker._worker_gather(info[0]) + print("ALL info: {}".format(all_info)) + self._fleet_ptr.gather_clients(all_info) + self._fleet_ptr.set_client2client_config( + self._client2client_request_timeout_ms, + self._client2client_connect_timeout_ms, + self._client2client_max_retry) + self._fleet_ptr.create_client2client_connection() # barrier for init model self._role_maker._barrier_worker() if self._role_maker.is_first_worker(): @@ -1120,14 +1121,14 @@ def minimize(self, fleet._main_programs = programs fleet._scopes = scopes if opt_info["use_ps_gpu"]: - from paddle.fluid.transpiler.collective import SingleProcessMultiThread + from paddle.fluid.transpiler.collective import MultiThread # check start program env = self.get_dist_env() if not isinstance(losses, list): startup_programs = [startup_programs] for i in range(0, len(startup_programs)): - t = SingleProcessMultiThread() + t = MultiThread() start_program = startup_programs[i] main_program = programs[i] t.transpile( diff --git a/python/paddle/fluid/transpiler/collective.py b/python/paddle/fluid/transpiler/collective.py index 308a876977cf4f..ec8602ec7e6726 100644 --- a/python/paddle/fluid/transpiler/collective.py +++ b/python/paddle/fluid/transpiler/collective.py @@ -29,7 +29,7 @@ from ..framework import Program, default_main_program, default_startup_program from .details import wait_server_ready -__all__ = ['GradAllReduce', 'LocalSGD'] +__all__ = ['GradAllReduce', 'LocalSGD', 'MultiThread'] OpRole = core.op_proto_and_checker_maker.OpRole @@ -97,8 +97,14 @@ def _transpile_startup_program(self): self.wait_port) self._broadcast_params() - def _init_communicator(self, program, current_endpoint, endpoints, rank, - ring_id, wait_port): + def _init_communicator(self, + program, + current_endpoint, + endpoints, + rank, + ring_id, + wait_port, + has_multitrainer=False): nranks = len(endpoints) other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) @@ -150,16 +156,28 @@ def _init_communicator(self, program, current_endpoint, endpoints, rank, 'other_endpoints': other_endpoints, self.op_role_key: OpRole.Forward }) - block.append_op( - type='c_comm_init', - inputs={'X': nccl_id_var}, - outputs={}, - attrs={ - 'nranks': nranks, - 'rank': rank, - 'ring_id': ring_id, - self.op_role_key: OpRole.Forward - }) + if not has_multitrainer: + block.append_op( + type='c_comm_init', + inputs={'X': nccl_id_var}, + 
+                    outputs={},
+                    attrs={
+                        'nranks': nranks,
+                        'rank': rank,
+                        'ring_id': ring_id,
+                        self.op_role_key: OpRole.Forward
+                    })
+            else:
+                block.append_op(
+                    type='c_comm_init_multitrainer',
+                    inputs={'X': nccl_id_var},
+                    outputs={},
+                    attrs={
+                        'ntrainers': nranks,
+                        'trainer_id': rank,
+                        'ring_id': ring_id,
+                        self.op_role_key: OpRole.Forward
+                    })
 
     def _broadcast_params(self):
         block = self.startup_program.global_block()
@@ -425,7 +443,7 @@ class MultiThread(GradAllReduce):
 
     def __init__(self, nrings=1):
         GradAllReduce.__init__(self, nrings)
-        self.mode = "box"
+        self.mode = "single_process_multi_thread"
 
     def _transpile_startup_program(self):
         if len(self.endpoints) > 1:
@@ -434,9 +452,9 @@ def _transpile_startup_program(self):
             print("begin to _transpile_startup_program for multi-node")
             print("total endpoints: ", self.endpoints)
             print("rank: %d, ring_id: %d" % (self.rank, self.nrings))
             for ring_id in range(self.nrings):
-                self._init_communicator(self.startup_program,
-                                        self.current_endpoint, self.endpoints,
-                                        self.rank, ring_id, self.wait_port)
+                self._init_communicator(
+                    self.startup_program, self.current_endpoint, self.endpoints,
+                    self.rank, ring_id, self.wait_port, True)
         else:
             print("begin to _transpile_startup_program for single-node")
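End-to-end, minimize() with use_ps_gpu now runs the MultiThread transpiler, which emits c_comm_init_multitrainer during startup transpilation. A hedged sketch of direct use, assuming Collective.transpile keeps its usual signature (endpoints are placeholders; wait_port=False avoids blocking on ports that are not actually open):

import paddle.fluid as fluid
from paddle.fluid.transpiler.collective import MultiThread

startup = fluid.Program()
main = fluid.Program()
t = MultiThread()
t.transpile(
    startup_program=startup,
    main_program=main,
    rank=0,
    endpoints=["127.0.0.1:6170", "127.0.0.1:6171"],
    current_endpoint="127.0.0.1:6170",
    wait_port=False)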