diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h
index 503f1513aad20c..80fee94f1c85d9 100644
--- a/paddle/fluid/framework/channel.h
+++ b/paddle/fluid/framework/channel.h
@@ -157,7 +157,19 @@ class ChannelObject {
     p.resize(finished);
     return finished;
   }
+  // read once only
+  size_t ReadOnce(std::vector<T>& p, size_t size) {  // NOLINT
+    if (size == 0) {
+      return 0;
+    }
+    std::unique_lock<std::mutex> lock(mutex_);
+    p.resize(size);
+    size_t finished = Read(size, &p[0], lock, true);
+    p.resize(finished);
+    Notify();
+    return finished;
+  }
   size_t ReadAll(std::vector<T>& p) {  // NOLINT
     p.clear();
     size_t finished = 0;
@@ -241,17 +253,21 @@ class ChannelObject {
     return !closed_;
   }
 
-  size_t Read(size_t n, T* p, std::unique_lock<std::mutex>& lock) {  // NOLINT
+  size_t Read(size_t n, T* p, std::unique_lock<std::mutex>& lock,  // NOLINT
+              bool once = false) {  // NOLINT
     size_t finished = 0;
     CHECK(n <= MaxCapacity() - reading_count_);
     reading_count_ += n;
     while (finished < n && WaitForRead(lock)) {
-      size_t m = std::min(n - finished, data_.size());
+      size_t m = (std::min)(n - finished, data_.size());
       for (size_t i = 0; i < m; i++) {
         p[finished++] = std::move(data_.front());
         data_.pop_front();
       }
       reading_count_ -= m;
+      if (once && m > 0) {
+        break;
+      }
     }
     reading_count_ -= n - finished;
     return finished;
diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc
index fdb24ee18eca7d..2d089b4721b82c 100644
--- a/paddle/fluid/framework/data_feed.cc
+++ b/paddle/fluid/framework/data_feed.cc
@@ -28,6 +28,7 @@ limitations under the License. */
 #include "paddle/fluid/platform/timer.h"
 
 USE_INT_STAT(STAT_total_feasign_num_in_mem);
+DECLARE_bool(enable_ins_parser_file);
 namespace paddle {
 namespace framework {
@@ -36,6 +37,107 @@ DLManager& global_dlmanager_pool() {
   return manager;
 }
 
+class BufferedLineFileReader {
+  typedef std::function<bool()> SampleFunc;
+  static const int MAX_FILE_BUFF_SIZE = 4 * 1024 * 1024;
+  class FILEReader {
+   public:
+    explicit FILEReader(FILE* fp) : fp_(fp) {}
+    int read(char* buf, int len) { return fread(buf, sizeof(char), len, fp_); }
+
+   private:
+    FILE* fp_;
+  };
+
+ public:
+  typedef std::function<bool(const std::string&)> LineFunc;
+
+ private:
+  template <typename T>
+  int read_lines(T* reader, LineFunc func, int skip_lines) {
+    int lines = 0;
+    size_t ret = 0;
+    char* ptr = NULL;
+    char* eol = NULL;
+    total_len_ = 0;
+    error_line_ = 0;
+
+    SampleFunc spfunc = get_sample_func();
+    std::string x;
+    while (!is_error() && (ret = reader->read(buff_, MAX_FILE_BUFF_SIZE)) > 0) {
+      total_len_ += ret;
+      ptr = buff_;
+      eol = reinterpret_cast<char*>(memchr(ptr, '\n', ret));
+      while (eol != NULL) {
+        int size = static_cast<int>((eol - ptr) + 1);
+        x.append(ptr, size - 1);
+        ++lines;
+        if (lines > skip_lines && spfunc()) {
+          if (!func(x)) {
+            ++error_line_;
+          }
+        }
+
+        x.clear();
+        ptr += size;
+        ret -= size;
+        eol = reinterpret_cast<char*>(memchr(ptr, '\n', ret));
+      }
+      if (ret > 0) {
+        x.append(ptr, ret);
+      }
+    }
+    if (!is_error() && !x.empty()) {
+      ++lines;
+      if (lines > skip_lines && spfunc()) {
+        if (!func(x)) {
+          ++error_line_;
+        }
+      }
+    }
+    return lines;
+  }
+
+ public:
+  BufferedLineFileReader()
+      : random_engine_(std::random_device()()),
+        uniform_distribution_(0.0f, 1.0f) {
+    total_len_ = 0;
+    sample_line_ = 0;
+    buff_ =
+        reinterpret_cast<char*>(calloc(MAX_FILE_BUFF_SIZE + 1, sizeof(char)));
+  }
+  ~BufferedLineFileReader() { free(buff_); }
+
+  int read_file(FILE* fp, LineFunc func, int skip_lines) {
+    FILEReader reader(fp);
+    return read_lines(&reader, func, skip_lines);
+  }
+  uint64_t file_size(void) { return total_len_; }
+  void set_sample_rate(float r) { sample_rate_ = r; }
+  size_t get_sample_line() { return sample_line_; }
+  bool is_error(void) { return (error_line_ > 10); }
+
+ private:
+  SampleFunc get_sample_func() {
+    if (std::abs(sample_rate_ - 1.0f) < 1e-5f) {
+      return [this](void) { return true; };
+    }
+    return [this](void) {
+      return (uniform_distribution_(random_engine_) < sample_rate_);
+    };
+  }
+
+ private:
+  char* buff_ = nullptr;
+  uint64_t total_len_ = 0;
+
+  std::default_random_engine random_engine_;
+  std::uniform_real_distribution<float> uniform_distribution_;
+  float sample_rate_ = 1.0f;
+  size_t sample_line_ = 0;
+  size_t error_line_ = 0;
+};
 void RecordCandidateList::ReSize(size_t length) {
   mutex_.lock();
   capacity_ = length;
@@ -301,7 +403,7 @@ int InMemoryDataFeed<T>::Next() {
                 << ", thread_id=" << thread_id_;
       }
     } else {
-      VLOG(3) << "enable heter NEXT: " << offset_index_
+      VLOG(3) << "enable heter next: " << offset_index_
               << " batch_offsets: " << batch_offsets_.size();
       if (offset_index_ >= batch_offsets_.size()) {
         VLOG(3) << "offset_index: " << offset_index_
@@ -318,14 +420,7 @@ int InMemoryDataFeed<T>::Next() {
         VLOG(3) << "finish reading for heterps, batch size zero, thread_id="
                 << thread_id_;
       }
-      /*
-      if (offset_index_ == batch_offsets_.size() - 1) {
-        std::vector<T> data;
-        output_channel_->ReadAll(data);
-        consume_channel_->Write(std::move(data));
-      }
-      */
-      VLOG(3) << "#15 enable heter NEXT: " << offset_index_
+      VLOG(3) << "enable heter next: " << offset_index_
               << " batch_offsets: " << batch_offsets_.size()
              << " baych_size: " << this->batch_size_;
     }
@@ -1835,5 +1930,646 @@ void PaddleBoxDataFeed::PutToFeedVec(const std::vector<Record*>& ins_vec) {
 #endif
 }
 
+template class InMemoryDataFeed<SlotRecord>;
+void SlotRecordInMemoryDataFeed::Init(const DataFeedDesc& data_feed_desc) {
+  finish_init_ = false;
+  finish_set_filelist_ = false;
+  finish_start_ = false;
+  PADDLE_ENFORCE(data_feed_desc.has_multi_slot_desc(),
+                 platform::errors::PreconditionNotMet(
+                     "Multi_slot_desc has not been set in data_feed_desc"));
+  paddle::framework::MultiSlotDesc multi_slot_desc =
+      data_feed_desc.multi_slot_desc();
+  SetBatchSize(data_feed_desc.batch_size());
+  size_t all_slot_num = multi_slot_desc.slots_size();
+
+  all_slots_.resize(all_slot_num);
+  all_slots_info_.resize(all_slot_num);
+  used_slots_info_.resize(all_slot_num);
+  use_slot_size_ = 0;
+  use_slots_.clear();
+
+  float_total_dims_size_ = 0;
+  float_total_dims_without_inductives_.clear();
+  for (size_t i = 0; i < all_slot_num; ++i) {
+    const auto& slot = multi_slot_desc.slots(i);
+    all_slots_[i] = slot.name();
+
+    AllSlotInfo& all_slot = all_slots_info_[i];
+    all_slot.slot = slot.name();
+    all_slot.type = slot.type();
+    all_slot.used_idx = slot.is_used() ?
use_slot_size_ : -1; + all_slot.slot_value_idx = -1; + + if (slot.is_used()) { + UsedSlotInfo& info = used_slots_info_[use_slot_size_]; + info.idx = i; + info.slot = slot.name(); + info.type = slot.type(); + info.dense = slot.is_dense(); + info.total_dims_without_inductive = 1; + info.inductive_shape_index = -1; + + // record float value and uint64_t value pos + if (info.type[0] == 'u') { + info.slot_value_idx = uint64_use_slot_size_; + all_slot.slot_value_idx = uint64_use_slot_size_; + ++uint64_use_slot_size_; + } else if (info.type[0] == 'f') { + info.slot_value_idx = float_use_slot_size_; + all_slot.slot_value_idx = float_use_slot_size_; + ++float_use_slot_size_; + } + + use_slots_.push_back(slot.name()); + + if (slot.is_dense()) { + for (int j = 0; j < slot.shape_size(); ++j) { + if (slot.shape(j) > 0) { + info.total_dims_without_inductive *= slot.shape(j); + } + if (slot.shape(j) == -1) { + info.inductive_shape_index = j; + } + } + } + if (info.type[0] == 'f') { + float_total_dims_without_inductives_.push_back( + info.total_dims_without_inductive); + float_total_dims_size_ += info.total_dims_without_inductive; + } + info.local_shape.clear(); + for (int j = 0; j < slot.shape_size(); ++j) { + info.local_shape.push_back(slot.shape(j)); + } + ++use_slot_size_; + } + } + used_slots_info_.resize(use_slot_size_); + + feed_vec_.resize(used_slots_info_.size()); + const int kEstimatedFeasignNumPerSlot = 5; // Magic Number + for (size_t i = 0; i < all_slot_num; i++) { + batch_float_feasigns_.push_back(std::vector()); + batch_uint64_feasigns_.push_back(std::vector()); + batch_float_feasigns_[i].reserve(default_batch_size_ * + kEstimatedFeasignNumPerSlot); + batch_uint64_feasigns_[i].reserve(default_batch_size_ * + kEstimatedFeasignNumPerSlot); + offset_.push_back(std::vector()); + offset_[i].reserve(default_batch_size_ + + 1); // Each lod info will prepend a zero + } + visit_.resize(all_slot_num, false); + pipe_command_ = data_feed_desc.pipe_command(); + finish_init_ = true; + input_type_ = data_feed_desc.input_type(); + size_t pos = pipe_command_.find(".so"); + if (pos != std::string::npos) { + pos = pipe_command_.rfind('|'); + if (pos == std::string::npos) { + so_parser_name_ = pipe_command_; + pipe_command_.clear(); + } else { + so_parser_name_ = pipe_command_.substr(pos + 1); + pipe_command_ = pipe_command_.substr(0, pos); + } + so_parser_name_ = paddle::string::erase_spaces(so_parser_name_); + } else { + so_parser_name_.clear(); + } +} + +void SlotRecordInMemoryDataFeed::LoadIntoMemory() { + VLOG(3) << "SlotRecord LoadIntoMemory() begin, thread_id=" << thread_id_; + if (!so_parser_name_.empty()) { + LoadIntoMemoryByLib(); + } else { + LoadIntoMemoryByCommand(); + } +} +void SlotRecordInMemoryDataFeed::LoadIntoMemoryByLib(void) { + if (true) { + // user defined file format analysis + LoadIntoMemoryByFile(); + } else { + LoadIntoMemoryByLine(); + } +} + +void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) { +#ifdef _LINUX + paddle::framework::CustomParser* parser = + global_dlmanager_pool().Load(so_parser_name_, all_slots_info_); + CHECK(parser != nullptr); + // get slotrecord object + auto pull_record_func = [this](std::vector& record_vec, + int max_fetch_num, int offset) { + if (offset > 0) { + input_channel_->WriteMove(offset, &record_vec[0]); + if (max_fetch_num > 0) { + SlotRecordPool().get(&record_vec[0], offset); + } else { // free all + max_fetch_num = static_cast(record_vec.size()); + if (max_fetch_num > offset) { + SlotRecordPool().put(&record_vec[offset], (max_fetch_num - 
offset)); + } + } + } else if (max_fetch_num > 0) { + SlotRecordPool().get(&record_vec, max_fetch_num); + } else { + SlotRecordPool().put(&record_vec); + } + }; + + std::string filename; + while (this->PickOneFile(&filename)) { + VLOG(3) << "PickOneFile, filename=" << filename + << ", thread_id=" << thread_id_; + platform::Timer timeline; + timeline.Start(); + + int lines = 0; + bool is_ok = true; + do { + int err_no = 0; + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); + + CHECK(this->fp_ != nullptr); + __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); + is_ok = parser->ParseFileInstance( + [this](char* buf, int len) { + return fread(buf, sizeof(char), len, this->fp_.get()); + }, + pull_record_func, lines); + + if (!is_ok) { + LOG(WARNING) << "parser error, filename=" << filename + << ", lines=" << lines; + } + } while (!is_ok); + timeline.Pause(); + VLOG(3) << "LoadIntoMemoryByLib() read all file, file=" << filename + << ", cost time=" << timeline.ElapsedSec() + << " seconds, thread_id=" << thread_id_ << ", lines=" << lines; + } +#endif +} + +void SlotRecordInMemoryDataFeed::LoadIntoMemoryByLine(void) { +#ifdef _LINUX + paddle::framework::CustomParser* parser = + global_dlmanager_pool().Load(so_parser_name_, all_slots_info_); + std::string filename; + BufferedLineFileReader line_reader; + line_reader.set_sample_rate(sample_rate_); + BufferedLineFileReader::LineFunc line_func = nullptr; + + while (this->PickOneFile(&filename)) { + VLOG(3) << "PickOneFile, filename=" << filename + << ", thread_id=" << thread_id_; + std::vector record_vec; + platform::Timer timeline; + timeline.Start(); + int offset = 0; + int old_offset = 0; + + SlotRecordPool().get(&record_vec, OBJPOOL_BLOCK_SIZE); + // get slotrecord object function + auto record_func = [this, &offset, &record_vec, &old_offset]( + std::vector& vec, int num) { + vec.resize(num); + if (offset + num > OBJPOOL_BLOCK_SIZE) { + input_channel_->WriteMove(offset, &record_vec[0]); + SlotRecordPool().get(&record_vec[0], offset); + record_vec.resize(OBJPOOL_BLOCK_SIZE); + offset = 0; + old_offset = 0; + } + for (int i = 0; i < num; ++i) { + auto& ins = record_vec[offset + i]; + ins->reset(); + vec[i] = ins; + } + offset = offset + num; + }; + + line_func = [this, &parser, &record_vec, &offset, &filename, &record_func, + &old_offset](const std::string& line) { + old_offset = offset; + if (!parser->ParseOneInstance(line, record_func)) { + offset = old_offset; + LOG(WARNING) << "read file:[" << filename << "] item error, line:[" + << line << "]"; + return false; + } + if (offset >= OBJPOOL_BLOCK_SIZE) { + input_channel_->Write(std::move(record_vec)); + record_vec.clear(); + SlotRecordPool().get(&record_vec, OBJPOOL_BLOCK_SIZE); + offset = 0; + } + return true; + }; + + int lines = 0; + + do { + int err_no = 0; + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); + CHECK(this->fp_ != nullptr); + __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); + lines = line_reader.read_file(this->fp_.get(), line_func, lines); + } while (line_reader.is_error()); + + if (offset > 0) { + input_channel_->WriteMove(offset, &record_vec[0]); + if (offset < OBJPOOL_BLOCK_SIZE) { + SlotRecordPool().put(&record_vec[offset], + (OBJPOOL_BLOCK_SIZE - offset)); + } + } else { + SlotRecordPool().put(&record_vec); + } + record_vec.clear(); + record_vec.shrink_to_fit(); + timeline.Pause(); + VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename + << ", cost time=" << timeline.ElapsedSec() + << " seconds, thread_id=" << thread_id_ 
<< ", lines=" << lines + << ", sample lines=" << line_reader.get_sample_line() + << ", filesize=" << line_reader.file_size() / 1024.0 / 1024.0 + << "MB"; + } + + VLOG(3) << "LoadIntoMemoryByLib() end, thread_id=" << thread_id_ + << ", total size: " << line_reader.file_size(); +#endif +} + +void SlotRecordInMemoryDataFeed::LoadIntoMemoryByCommand(void) { +#ifdef _LINUX + std::string filename; + BufferedLineFileReader line_reader; + line_reader.set_sample_rate(sample_rate_); + + while (this->PickOneFile(&filename)) { + VLOG(3) << "PickOneFile, filename=" << filename + << ", thread_id=" << thread_id_; + int lines = 0; + std::vector record_vec; + platform::Timer timeline; + timeline.Start(); + SlotRecordPool().get(&record_vec, OBJPOOL_BLOCK_SIZE); + int offset = 0; + + do { + int err_no = 0; + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); + CHECK(this->fp_ != nullptr); + __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); + + lines = line_reader.read_file( + this->fp_.get(), + [this, &record_vec, &offset, &filename](const std::string& line) { + if (ParseOneInstance(line, &record_vec[offset])) { + ++offset; + } else { + LOG(WARNING) << "read file:[" << filename + << "] item error, line:[" << line << "]"; + return false; + } + if (offset >= OBJPOOL_BLOCK_SIZE) { + input_channel_->Write(std::move(record_vec)); + record_vec.clear(); + SlotRecordPool().get(&record_vec, OBJPOOL_BLOCK_SIZE); + offset = 0; + } + return true; + }, + lines); + } while (line_reader.is_error()); + if (offset > 0) { + input_channel_->WriteMove(offset, &record_vec[0]); + if (offset < OBJPOOL_BLOCK_SIZE) { + SlotRecordPool().put(&record_vec[offset], + (OBJPOOL_BLOCK_SIZE - offset)); + } + } else { + SlotRecordPool().put(&record_vec); + } + record_vec.clear(); + record_vec.shrink_to_fit(); + timeline.Pause(); + VLOG(3) << "LoadIntoMemory() read all lines, file=" << filename + << ", lines=" << lines + << ", sample lines=" << line_reader.get_sample_line() + << ", cost time=" << timeline.ElapsedSec() + << " seconds, thread_id=" << thread_id_; + } + VLOG(3) << "LoadIntoMemory() end, thread_id=" << thread_id_ + << ", total size: " << line_reader.file_size(); +#endif +} + +static void parser_log_key(const std::string& log_key, uint64_t* search_id, + uint32_t* cmatch, uint32_t* rank) { + std::string searchid_str = log_key.substr(16, 16); + *search_id = static_cast(strtoull(searchid_str.c_str(), NULL, 16)); + std::string cmatch_str = log_key.substr(11, 3); + *cmatch = static_cast(strtoul(cmatch_str.c_str(), NULL, 16)); + std::string rank_str = log_key.substr(14, 2); + *rank = static_cast(strtoul(rank_str.c_str(), NULL, 16)); +} + +bool SlotRecordInMemoryDataFeed::ParseOneInstance(const std::string& line, + SlotRecord* ins) { + SlotRecord& rec = (*ins); + // parse line + const char* str = line.c_str(); + char* endptr = const_cast(str); + int pos = 0; + + thread_local std::vector> slot_float_feasigns; + thread_local std::vector> slot_uint64_feasigns; + slot_float_feasigns.resize(float_use_slot_size_); + slot_uint64_feasigns.resize(uint64_use_slot_size_); + + if (parse_ins_id_) { + int num = strtol(&str[pos], &endptr, 10); + CHECK(num == 1); // NOLINT + pos = endptr - str + 1; + size_t len = 0; + while (str[pos + len] != ' ') { + ++len; + } + rec->ins_id_ = std::string(str + pos, len); + pos += len + 1; + } + if (parse_logkey_) { + int num = strtol(&str[pos], &endptr, 10); + CHECK(num == 1); // NOLINT + pos = endptr - str + 1; + size_t len = 0; + while (str[pos + len] != ' ') { + ++len; + } + // parse_logkey + 
std::string log_key = std::string(str + pos, len); + uint64_t search_id; + uint32_t cmatch; + uint32_t rank; + parser_log_key(log_key, &search_id, &cmatch, &rank); + + rec->ins_id_ = log_key; + rec->search_id = search_id; + rec->cmatch = cmatch; + rec->rank = rank; + pos += len + 1; + } + + int float_total_slot_num = 0; + int uint64_total_slot_num = 0; + + for (size_t i = 0; i < all_slots_info_.size(); ++i) { + auto& info = all_slots_info_[i]; + int num = strtol(&str[pos], &endptr, 10); + PADDLE_ENFORCE(num, + "The number of ids can not be zero, you need padding " + "it in data generator; or if there is something wrong with " + "the data, please check if the data contains unresolvable " + "characters.\nplease check this error line: %s", + str); + if (info.used_idx != -1) { + if (info.type[0] == 'f') { // float + auto& slot_fea = slot_float_feasigns[info.slot_value_idx]; + slot_fea.clear(); + for (int j = 0; j < num; ++j) { + float feasign = strtof(endptr, &endptr); + if (fabs(feasign) < 1e-6 && !used_slots_info_[info.used_idx].dense) { + continue; + } + slot_fea.push_back(feasign); + ++float_total_slot_num; + } + } else if (info.type[0] == 'u') { // uint64 + auto& slot_fea = slot_uint64_feasigns[info.slot_value_idx]; + slot_fea.clear(); + for (int j = 0; j < num; ++j) { + uint64_t feasign = + static_cast(strtoull(endptr, &endptr, 10)); + if (feasign == 0 && !used_slots_info_[info.used_idx].dense) { + continue; + } + slot_fea.push_back(feasign); + ++uint64_total_slot_num; + } + } + pos = endptr - str; + } else { + for (int j = 0; j <= num; ++j) { + // pos = line.find_first_of(' ', pos + 1); + while (line[pos + 1] != ' ') { + pos++; + } + } + } + } + rec->slot_float_feasigns_.add_slot_feasigns(slot_float_feasigns, + float_total_slot_num); + rec->slot_uint64_feasigns_.add_slot_feasigns(slot_uint64_feasigns, + uint64_total_slot_num); + + return (uint64_total_slot_num > 0); +} + +void SlotRecordInMemoryDataFeed::PutToFeedVec(const SlotRecord* ins_vec, + int num) { + for (int j = 0; j < use_slot_size_; ++j) { + auto& feed = feed_vec_[j]; + if (feed == nullptr) { + continue; + } + + auto& slot_offset = offset_[j]; + slot_offset.clear(); + slot_offset.reserve(num + 1); + slot_offset.push_back(0); + + int total_instance = 0; + auto& info = used_slots_info_[j]; + // fill slot value with default value 0 + if (info.type[0] == 'f') { // float + auto& batch_fea = batch_float_feasigns_[j]; + batch_fea.clear(); + + for (int i = 0; i < num; ++i) { + auto r = ins_vec[i]; + size_t fea_num = 0; + float* slot_values = + r->slot_float_feasigns_.get_values(info.slot_value_idx, &fea_num); + batch_fea.resize(total_instance + fea_num); + memcpy(&batch_fea[total_instance], slot_values, + sizeof(float) * fea_num); + total_instance += fea_num; + slot_offset.push_back(total_instance); + } + + float* feasign = batch_fea.data(); + float* tensor_ptr = + feed->mutable_data({total_instance, 1}, this->place_); + CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(float)); + + } else if (info.type[0] == 'u') { // uint64 + auto& batch_fea = batch_uint64_feasigns_[j]; + batch_fea.clear(); + + for (int i = 0; i < num; ++i) { + auto r = ins_vec[i]; + size_t fea_num = 0; + uint64_t* slot_values = + r->slot_uint64_feasigns_.get_values(info.slot_value_idx, &fea_num); + if (fea_num > 0) { + batch_fea.resize(total_instance + fea_num); + memcpy(&batch_fea[total_instance], slot_values, + sizeof(uint64_t) * fea_num); + total_instance += fea_num; + } + if (fea_num == 0) { + batch_fea.resize(total_instance + fea_num); + 
batch_fea[total_instance] = 0; + total_instance += 1; + } + slot_offset.push_back(total_instance); + } + + // no uint64_t type in paddlepaddle + uint64_t* feasign = batch_fea.data(); + int64_t* tensor_ptr = + feed->mutable_data({total_instance, 1}, this->place_); + CopyToFeedTensor(tensor_ptr, feasign, total_instance * sizeof(int64_t)); + } + + if (info.dense) { + if (info.inductive_shape_index != -1) { + info.local_shape[info.inductive_shape_index] = + total_instance / info.total_dims_without_inductive; + } + feed->Resize(framework::make_ddim(info.local_shape)); + } else { + LoD data_lod{slot_offset}; + feed_vec_[j]->set_lod(data_lod); + } + } +} + +void SlotRecordInMemoryDataFeed::ExpandSlotRecord(SlotRecord* rec) { + SlotRecord& ins = (*rec); + if (ins->slot_float_feasigns_.slot_offsets.empty()) { + return; + } + size_t total_value_size = ins->slot_float_feasigns_.slot_values.size(); + if (float_total_dims_size_ == total_value_size) { + return; + } + int float_slot_num = + static_cast(float_total_dims_without_inductives_.size()); + CHECK(float_slot_num == float_use_slot_size_); + std::vector old_values; + std::vector old_offsets; + old_values.swap(ins->slot_float_feasigns_.slot_values); + old_offsets.swap(ins->slot_float_feasigns_.slot_offsets); + + ins->slot_float_feasigns_.slot_values.resize(float_total_dims_size_); + ins->slot_float_feasigns_.slot_offsets.assign(float_slot_num + 1, 0); + + auto& slot_offsets = ins->slot_float_feasigns_.slot_offsets; + auto& slot_values = ins->slot_float_feasigns_.slot_values; + + uint32_t offset = 0; + int num = 0; + uint32_t old_off = 0; + int dim = 0; + + for (int i = 0; i < float_slot_num; ++i) { + dim = float_total_dims_without_inductives_[i]; + old_off = old_offsets[i]; + num = static_cast(old_offsets[i + 1] - old_off); + if (num == 0) { + // fill slot value with default value 0 + for (int k = 0; k < dim; ++k) { + slot_values[k + offset] = 0.0; + } + } else { + if (num == dim) { + memcpy(&slot_values[offset], &old_values[old_off], dim * sizeof(float)); + } else { + // position fea + // record position index need fix values + int pos_idx = static_cast(old_values[old_off]); + for (int k = 0; k < dim; ++k) { + if (k == pos_idx) { + slot_values[k + offset] = 1.0; + } else { + slot_values[k + offset] = 0.0; + } + } + } + } + slot_offsets[i] = offset; + offset += dim; + } + slot_offsets[float_slot_num] = offset; + CHECK(float_total_dims_size_ == static_cast(offset)); +} + +bool SlotRecordInMemoryDataFeed::Start() { +#ifdef _LINUX + this->CheckSetFileList(); + if (input_channel_->Size() != 0) { + std::vector data; + input_channel_->Read(data); + } +#endif + if (batch_offsets_.size() > 0) { + VLOG(3) << "batch_size offsets: " << batch_offsets_.size(); + enable_heterps_ = true; + this->offset_index_ = 0; + } + this->finish_start_ = true; + return true; +} + +int SlotRecordInMemoryDataFeed::Next() { +#ifdef _LINUX + this->CheckStart(); + + VLOG(3) << "enable heter next: " << offset_index_ + << " batch_offsets: " << batch_offsets_.size(); + if (offset_index_ >= batch_offsets_.size()) { + VLOG(3) << "offset_index: " << offset_index_ + << " batch_offsets: " << batch_offsets_.size(); + return 0; + } + auto& batch = batch_offsets_[offset_index_++]; + this->batch_size_ = batch.second; + VLOG(3) << "batch_size_=" << this->batch_size_ + << ", thread_id=" << thread_id_; + if (this->batch_size_ != 0) { + PutToFeedVec(&records_[batch.first], this->batch_size_); + } else { + VLOG(3) << "finish reading for heterps, batch size zero, thread_id=" + << thread_id_; + } + 
+  VLOG(3) << "enable heter next: " << offset_index_
+          << " batch_offsets: " << batch_offsets_.size()
+          << " batch_size: " << this->batch_size_;
+
+  return this->batch_size_;
+#else
+  return 0;
+#endif
+}
+
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h
index 198bc51463af35..dcdec505179034 100644
--- a/paddle/fluid/framework/data_feed.h
+++ b/paddle/fluid/framework/data_feed.h
@@ -39,8 +39,14 @@ limitations under the License. */
 #include "paddle/fluid/framework/lod_tensor.h"
 #include "paddle/fluid/framework/reader.h"
 #include "paddle/fluid/framework/variable.h"
+#include "paddle/fluid/platform/timer.h"
 #include "paddle/fluid/string/string_helper.h"
 
+DECLARE_int32(record_pool_max_size);
+DECLARE_int32(slotpool_thread_num);
+DECLARE_bool(enable_slotpool_wait_release);
+DECLARE_bool(enable_slotrecord_reset_shrink);
+
 namespace paddle {
 namespace framework {
 class DataFeedDesc;
@@ -69,6 +75,50 @@ namespace framework {
 // while (reader->Next()) {
 //     // trainer do something
 // }
+
+template <class T>
+struct SlotValues {
+  std::vector<T> slot_values;
+  std::vector<uint32_t> slot_offsets;
+
+  void add_values(const T* values, uint32_t num) {
+    if (slot_offsets.empty()) {
+      slot_offsets.push_back(0);
+    }
+    if (num > 0) {
+      slot_values.insert(slot_values.end(), values, values + num);
+    }
+    slot_offsets.push_back(static_cast<uint32_t>(slot_values.size()));
+  }
+  T* get_values(int idx, size_t* size) {
+    uint32_t& offset = slot_offsets[idx];
+    (*size) = slot_offsets[idx + 1] - offset;
+    return &slot_values[offset];
+  }
+  void add_slot_feasigns(const std::vector<std::vector<T>>& slot_feasigns,
+                         uint32_t fea_num) {
+    slot_values.reserve(fea_num);
+    int slot_num = static_cast<int>(slot_feasigns.size());
+    slot_offsets.resize(slot_num + 1);
+    for (int i = 0; i < slot_num; ++i) {
+      auto& slot_val = slot_feasigns[i];
+      slot_offsets[i] = static_cast<uint32_t>(slot_values.size());
+      uint32_t num = static_cast<uint32_t>(slot_val.size());
+      if (num > 0) {
+        slot_values.insert(slot_values.end(), slot_val.begin(), slot_val.end());
+      }
+    }
+    slot_offsets[slot_num] = slot_values.size();
+  }
+  void clear(bool shrink) {
+    slot_offsets.clear();
+    slot_values.clear();
+    if (shrink) {
+      slot_values.shrink_to_fit();
+      slot_offsets.shrink_to_fit();
+    }
+  }
+};
 union FeatureFeasign {
   uint64_t uint64_feasign_;
   float float_feasign_;
@@ -97,6 +147,38 @@ struct FeatureItem {
   uint16_t slot_;
 };
 
+struct AllSlotInfo {
+  std::string slot;
+  std::string type;
+  int used_idx;
+  int slot_value_idx;
+};
+struct UsedSlotInfo {
+  int idx;
+  int slot_value_idx;
+  std::string slot;
+  std::string type;
+  bool dense;
+  std::vector<int> local_shape;
+  int total_dims_without_inductive;
+  int inductive_shape_index;
+};
+struct SlotRecordObject {
+  uint64_t search_id;
+  uint32_t rank;
+  uint32_t cmatch;
+  std::string ins_id_;
+  SlotValues<uint64_t> slot_uint64_feasigns_;
+  SlotValues<float> slot_float_feasigns_;
+
+  ~SlotRecordObject() { clear(true); }
+  void reset(void) { clear(FLAGS_enable_slotrecord_reset_shrink); }
+  void clear(bool shrink) {
+    slot_uint64_feasigns_.clear(shrink);
+    slot_float_feasigns_.clear(shrink);
+  }
+};
+using SlotRecord = SlotRecordObject*;
 // sizeof Record is much less than std::vector<MultiSlotType>
 struct Record {
   std::vector<FeatureItem> uint64_feasigns_;
@@ -108,6 +190,179 @@ struct Record {
   uint32_t cmatch;
 };
 
+inline SlotRecord make_slotrecord() {
+  static const size_t slot_record_byte_size = sizeof(SlotRecordObject);
+  void* p = malloc(slot_record_byte_size);
+  new (p) SlotRecordObject;
+  return reinterpret_cast<SlotRecord>(p);
+}
+
+inline void free_slotrecord(SlotRecordObject* p) {
+  p->~SlotRecordObject();
+  free(p);
+}
+
+template <class T>
+class SlotObjAllocator {
+ public:
+  explicit SlotObjAllocator(std::function<void(T*)> deleter)
+      : free_nodes_(NULL), capacity_(0), deleter_(deleter) {}
+  ~SlotObjAllocator() { clear(); }
+
+  void clear() {
+    T* tmp = NULL;
+    while (free_nodes_ != NULL) {
+      tmp = reinterpret_cast<T*>(reinterpret_cast<void*>(free_nodes_));
+      free_nodes_ = free_nodes_->next;
+      deleter_(tmp);
+      --capacity_;
+    }
+    CHECK_EQ(capacity_, static_cast<size_t>(0));
+  }
+  T* acquire(void) {
+    T* x = NULL;
+    x = reinterpret_cast<T*>(reinterpret_cast<void*>(free_nodes_));
+    free_nodes_ = free_nodes_->next;
+    --capacity_;
+    return x;
+  }
+  void release(T* x) {
+    Node* node = reinterpret_cast<Node*>(reinterpret_cast<void*>(x));
+    node->next = free_nodes_;
+    free_nodes_ = node;
+    ++capacity_;
+  }
+  size_t capacity(void) { return capacity_; }
+
+ private:
+  struct alignas(T) Node {
+    union {
+      Node* next;
+      char data[sizeof(T)];
+    };
+  };
+  Node* free_nodes_;  // a list
+  size_t capacity_;
+  std::function<void(T*)> deleter_ = nullptr;
+};
+static const int OBJPOOL_BLOCK_SIZE = 10000;
+class SlotObjPool {
+ public:
+  SlotObjPool()
+      : max_capacity_(FLAGS_record_pool_max_size), alloc_(free_slotrecord) {
+    ins_chan_ = MakeChannel<SlotRecord>();
+    ins_chan_->SetBlockSize(OBJPOOL_BLOCK_SIZE);
+    for (int i = 0; i < FLAGS_slotpool_thread_num; ++i) {
+      threads_.push_back(std::thread([this]() { run(); }));
+    }
+    disable_pool_ = false;
+    count_ = 0;
+  }
+  ~SlotObjPool() {
+    ins_chan_->Close();
+    for (auto& t : threads_) {
+      t.join();
+    }
+  }
+  void disable_pool(bool disable) { disable_pool_ = disable; }
+  void set_max_capacity(size_t max_capacity) { max_capacity_ = max_capacity; }
+  void get(std::vector<SlotRecord>* output, int n) {
+    output->resize(n);
+    return get(&(*output)[0], n);
+  }
+  void get(SlotRecord* output, int n) {
+    int size = 0;
+    mutex_.lock();
+    int left = static_cast<int>(alloc_.capacity());
+    if (left > 0) {
+      size = (left >= n) ? n : left;
+      for (int i = 0; i < size; ++i) {
+        output[i] = alloc_.acquire();
+      }
+    }
+    mutex_.unlock();
+    count_ += n;
+    if (size == n) {
+      return;
+    }
+    for (int i = size; i < n; ++i) {
+      output[i] = make_slotrecord();
+    }
+  }
+  void put(std::vector<SlotRecord>* input) {
+    size_t size = input->size();
+    if (size == 0) {
+      return;
+    }
+    put(&(*input)[0], size);
+    input->clear();
+  }
+  void put(SlotRecord* input, size_t size) {
+    CHECK(ins_chan_->WriteMove(size, input) == size);
+  }
+  void run(void) {
+    std::vector<SlotRecord> input;
+    while (ins_chan_->ReadOnce(input, OBJPOOL_BLOCK_SIZE)) {
+      if (input.empty()) {
+        continue;
+      }
+      // over max capacity
+      size_t n = input.size();
+      count_ -= n;
+      if (disable_pool_ || n + capacity() > max_capacity_) {
+        for (auto& t : input) {
+          free_slotrecord(t);
+        }
+      } else {
+        for (auto& t : input) {
+          t->reset();
+        }
+        mutex_.lock();
+        for (auto& t : input) {
+          alloc_.release(t);
+        }
+        mutex_.unlock();
+      }
+      input.clear();
+    }
+  }
+  void clear(void) {
+    platform::Timer timeline;
+    timeline.Start();
+    mutex_.lock();
+    alloc_.clear();
+    mutex_.unlock();
+    // wait release channel data
+    if (FLAGS_enable_slotpool_wait_release) {
+      while (!ins_chan_->Empty()) {
+        sleep(1);
+      }
+    }
+    timeline.Pause();
+    VLOG(3) << "clear slot pool data size=" << count_.load()
+            << ", span=" << timeline.ElapsedSec();
+  }
+  size_t capacity(void) {
+    mutex_.lock();
+    size_t total = alloc_.capacity();
+    mutex_.unlock();
+    return total;
+  }
+
+ private:
+  size_t max_capacity_;
+  Channel<SlotRecord> ins_chan_;
+  std::vector<std::thread> threads_;
+  std::mutex mutex_;
+  SlotObjAllocator<SlotRecordObject> alloc_;
+  bool disable_pool_;
+  std::atomic<long> count_;  // NOLINT
+};
+
+inline SlotObjPool& SlotRecordPool() {
+  static SlotObjPool pool;
+  return pool;
+}
 struct PvInstanceObject {
   std::vector<Record*> ads;
   void merge_instance(Record* ins) { ads.push_back(ins); }
@@ -129,7 +384,21 @@ class CustomParser {
   CustomParser() {}
   virtual ~CustomParser() {}
   virtual void Init(const std::vector<SlotConf>& slots) = 0;
+  virtual bool Init(const std::vector<AllSlotInfo>& slots) = 0;
   virtual void ParseOneInstance(const char* str, Record* instance) = 0;
+  virtual bool ParseOneInstance(
+      const std::string& line,
+      std::function<void(std::vector<SlotRecord>&, int)>
+          GetInsFunc) {  // NOLINT
+    return true;
+  }
+  virtual bool ParseFileInstance(
+      std::function<int(char* buf, int len)> ReadBuffFunc,
+      std::function<void(std::vector<SlotRecord>&, int, int)>
+          PullRecordsFunc,  // NOLINT
+      int& lines) {  // NOLINT
+    return false;
+  }
 };
 
 typedef paddle::framework::CustomParser* (*CreateParserObjectFunc)();
@@ -194,6 +463,34 @@ class DLManager {
     return nullptr;
   }
 
+  paddle::framework::CustomParser* Load(const std::string& name,
+                                        const std::vector<AllSlotInfo>& conf) {
+#ifdef _LINUX
+    std::lock_guard<std::mutex> lock(mutex_);
+    DLHandle handle;
+    std::map<std::string, DLHandle>::iterator it = handle_map_.find(name);
+    if (it != handle_map_.end()) {
+      return it->second.parser;
+    }
+    handle.module = dlopen(name.c_str(), RTLD_NOW);
+    if (handle.module == nullptr) {
+      VLOG(0) << "Create so of " << name << " fail";
+      exit(-1);
+      return nullptr;
+    }
+
+    CreateParserObjectFunc create_parser_func =
+        (CreateParserObjectFunc)dlsym(handle.module, "CreateParserObject");
+    handle.parser = create_parser_func();
+    handle.parser->Init(conf);
+    handle_map_.insert({name, handle});
+
+    return handle.parser;
+#endif
+    VLOG(0) << "Not implement in windows";
+    return nullptr;
+  }
+
   paddle::framework::CustomParser* ReLoad(const std::string& name,
                                           const std::vector<SlotConf>& conf) {
     Close(name);
@@ -415,6 +712,11 @@ class InMemoryDataFeed : public DataFeed {
   virtual void SetCurrentPhase(int current_phase);
   virtual void LoadIntoMemory();
   virtual void LoadIntoMemoryFromSo();
+  virtual void SetRecord(T* records) { records_ = records; }
+  int GetDefaultBatchSize() { return default_batch_size_; }
+  void AddBatchOffset(const std::pair<int, int>& offset) {
+    batch_offsets_.push_back(offset);
+  }
 
  protected:
   virtual bool ParseOneInstance(T* instance) = 0;
@@ -424,6 +726,11 @@ class InMemoryDataFeed : public DataFeed {
   virtual void PutToFeedVec(const std::vector<T>& ins_vec) = 0;
   virtual void PutToFeedVec(const T* ins_vec, int num) = 0;
 
+  std::vector<std::vector<float>> batch_float_feasigns_;
+  std::vector<std::vector<uint64_t>> batch_uint64_feasigns_;
+  std::vector<std::vector<size_t>> offset_;
+  std::vector<bool> visit_;
+
   int thread_id_;
   int thread_num_;
   bool parse_ins_id_;
@@ -783,11 +1090,7 @@ class MultiSlotInMemoryDataFeed : public InMemoryDataFeed<Record> {
   MultiSlotInMemoryDataFeed() {}
   virtual ~MultiSlotInMemoryDataFeed() {}
   virtual void Init(const DataFeedDesc& data_feed_desc);
-  void SetRecord(Record* records) { records_ = records; }
-  int GetDefaultBatchSize() { return default_batch_size_; }
-  void AddBatchOffset(const std::pair<int, int>& offset) {
-    batch_offsets_.push_back(offset);
-  }
+  // void SetRecord(Record* records) { records_ = records; }
 
  protected:
   virtual bool ParseOneInstance(Record* instance);
@@ -798,10 +1101,42 @@ class MultiSlotInMemoryDataFeed : public InMemoryDataFeed<Record> {
   virtual void GetMsgFromLogKey(const std::string& log_key, uint64_t* search_id,
                                 uint32_t* cmatch, uint32_t* rank);
   virtual void PutToFeedVec(const Record* ins_vec, int num);
-  std::vector<std::vector<float>> batch_float_feasigns_;
-  std::vector<std::vector<uint64_t>> batch_uint64_feasigns_;
-  std::vector<std::vector<size_t>> offset_;
-  std::vector<bool> visit_;
+};
+
+class SlotRecordInMemoryDataFeed : public InMemoryDataFeed<SlotRecord> {
+ public:
+  SlotRecordInMemoryDataFeed() {}
+  virtual ~SlotRecordInMemoryDataFeed() {}
+  virtual void Init(const DataFeedDesc& data_feed_desc);
+  virtual void LoadIntoMemory();
+  void ExpandSlotRecord(SlotRecord* ins);
+
+ protected:
+  virtual bool Start();
+  virtual int Next();
+  virtual bool ParseOneInstance(SlotRecord* instance) { return false; }
+  virtual bool ParseOneInstanceFromPipe(SlotRecord* instance) { return false; }
+  // virtual void ParseOneInstanceFromSo(const char* str, T* instance,
+  //                                     CustomParser* parser) {}
+  virtual void PutToFeedVec(const std::vector<SlotRecord>& ins_vec) {}
+
+  virtual void LoadIntoMemoryByCommand(void);
+  virtual void LoadIntoMemoryByLib(void);
+  virtual void LoadIntoMemoryByLine(void);
+  virtual void LoadIntoMemoryByFile(void);
+  virtual void SetInputChannel(void* channel) {
+    input_channel_ = static_cast<ChannelObject<SlotRecord>*>(channel);
+  }
+  bool ParseOneInstance(const std::string& line, SlotRecord* rec);
+  virtual void PutToFeedVec(const SlotRecord* ins_vec, int num);
+  float sample_rate_ = 1.0f;
+  int use_slot_size_ = 0;
+  int float_use_slot_size_ = 0;
+  int uint64_use_slot_size_ = 0;
+  std::vector<AllSlotInfo> all_slots_info_;
+  std::vector<UsedSlotInfo> used_slots_info_;
+  size_t float_total_dims_size_ = 0;
+  std::vector<int> float_total_dims_without_inductives_;
 };
 
 class PaddleBoxDataFeed : public MultiSlotInMemoryDataFeed {
diff --git a/paddle/fluid/framework/data_feed_factory.cc b/paddle/fluid/framework/data_feed_factory.cc
index ec1b8ec773fa64..e46e4aeb0124c2 100644
--- a/paddle/fluid/framework/data_feed_factory.cc
+++ b/paddle/fluid/framework/data_feed_factory.cc
@@ -58,8 +58,8 @@ std::shared_ptr<DataFeed> DataFeedFactory::CreateDataFeed(
     std::string data_feed_class) {
   if (g_data_feed_map.count(data_feed_class) < 1) {
     LOG(WARNING) << "Your DataFeed " << data_feed_class
-                 << "is not supported currently";
-    LOG(WARNING) << "Supported DataFeed: " << DataFeedTypeList();
+                 << " is not supported currently";
LOG(WARNING) << " Supported DataFeed: " << DataFeedTypeList(); exit(-1); } return g_data_feed_map[data_feed_class](); @@ -68,6 +68,7 @@ std::shared_ptr DataFeedFactory::CreateDataFeed( REGISTER_DATAFEED_CLASS(MultiSlotDataFeed); REGISTER_DATAFEED_CLASS(MultiSlotInMemoryDataFeed); REGISTER_DATAFEED_CLASS(PaddleBoxDataFeed); +REGISTER_DATAFEED_CLASS(SlotRecordInMemoryDataFeed); #if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && !defined(_WIN32) REGISTER_DATAFEED_CLASS(MultiSlotFileInstantDataFeed); #endif diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 08c42a93d1fcbf..2a071665b263c6 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -351,10 +351,8 @@ static int compute_thread_batch_nccl( return thread_avg_batch_num; } -template -void DatasetImpl::SetHeterPs(bool enable_heterps) { +void MultiSlotDataset::PrepareTrain() { #ifdef PADDLE_WITH_GLOO - enable_heterps_ = enable_heterps; if (enable_heterps_) { if (input_records_.size() == 0 && input_channel_ != nullptr && input_channel_->Size() != 0) { @@ -541,22 +539,21 @@ void DatasetImpl::LocalShuffle() { << timeline.ElapsedSec() << " seconds"; } -template -void DatasetImpl::GlobalShuffle(int thread_num) { +void MultiSlotDataset::GlobalShuffle(int thread_num) { #ifdef PADDLE_WITH_PSLIB - VLOG(3) << "DatasetImpl::GlobalShuffle() begin"; + VLOG(3) << "MultiSlotDataset::GlobalShuffle() begin"; platform::Timer timeline; timeline.Start(); auto fleet_ptr = FleetWrapper::GetInstance(); if (!input_channel_ || input_channel_->Size() == 0) { - VLOG(3) << "DatasetImpl::GlobalShuffle() end, no data to shuffle"; + VLOG(3) << "MultiSlotDataset::GlobalShuffle() end, no data to shuffle"; return; } // local shuffle input_channel_->Close(); - std::vector data; + std::vector data; input_channel_->ReadAll(data); std::shuffle(data.begin(), data.end(), fleet_ptr->LocalRandomEngine()); input_channel_->Open(); @@ -566,10 +563,10 @@ void DatasetImpl::GlobalShuffle(int thread_num) { input_channel_->Close(); input_channel_->SetBlockSize(fleet_send_batch_size_); - VLOG(3) << "DatasetImpl::GlobalShuffle() input_channel_ size " + VLOG(3) << "MultiSlotDataset::GlobalShuffle() input_channel_ size " << input_channel_->Size(); - auto get_client_id = [this, fleet_ptr](const T& data) -> size_t { + auto get_client_id = [this, fleet_ptr](const Record& data) -> size_t { if (!this->merge_by_insid_) { return fleet_ptr->LocalRandomEngine()() % this->trainer_num_; } else { @@ -580,7 +577,7 @@ void DatasetImpl::GlobalShuffle(int thread_num) { auto global_shuffle_func = [this, get_client_id]() { auto fleet_ptr = FleetWrapper::GetInstance(); - std::vector data; + std::vector data; while (this->input_channel_->Read(data)) { std::vector ars(this->trainer_num_); for (auto& t : data) { @@ -835,9 +832,6 @@ void DatasetImpl::CreateReaders() { channel_idx = 0; } } - if (enable_heterps_) { - SetHeterPs(true); - } VLOG(3) << "readers size: " << readers_.size(); } @@ -923,9 +917,8 @@ int64_t DatasetImpl::GetShuffleDataSize() { return sum; } -template -int DatasetImpl::ReceiveFromClient(int msg_type, int client_id, - const std::string& msg) { +int MultiSlotDataset::ReceiveFromClient(int msg_type, int client_id, + const std::string& msg) { #ifdef _LINUX VLOG(3) << "ReceiveFromClient msg_type=" << msg_type << ", client_id=" << client_id << ", msg length=" << msg.length(); @@ -937,9 +930,9 @@ int DatasetImpl::ReceiveFromClient(int msg_type, int client_id, if (ar.Cursor() == ar.Finish()) { return 0; } - 
std::vector data; + std::vector data; while (ar.Cursor() < ar.Finish()) { - data.push_back(ar.Get()); + data.push_back(ar.Get()); } CHECK(ar.Cursor() == ar.Finish()); @@ -966,6 +959,20 @@ int DatasetImpl::ReceiveFromClient(int msg_type, int client_id, // explicit instantiation template class DatasetImpl; +void MultiSlotDataset::DynamicAdjustReadersNum(int thread_num) { + if (thread_num_ == thread_num) { + VLOG(3) << "DatasetImpl::DynamicAdjustReadersNum thread_num_=" + << thread_num_ << ", thread_num_=thread_num, no need to adjust"; + return; + } + VLOG(3) << "adjust readers num from " << thread_num_ << " to " << thread_num; + thread_num_ = thread_num; + std::vector>().swap(readers_); + CreateReaders(); + VLOG(3) << "adjust readers num done"; + PrepareTrain(); +} + void MultiSlotDataset::PostprocessInstance() { // divide pv instance, and merge to input_channel_ if (enable_pv_merge_) { @@ -1503,5 +1510,154 @@ void MultiSlotDataset::SlotsShuffle( << ", cost time=" << timeline.ElapsedSec() << " seconds"; } +template class DatasetImpl; +void SlotRecordDataset::CreateChannel() { + if (input_channel_ == nullptr) { + input_channel_ = paddle::framework::MakeChannel(); + } +} +void SlotRecordDataset::CreateReaders() { + VLOG(3) << "Calling CreateReaders()"; + VLOG(3) << "thread num in Dataset: " << thread_num_; + VLOG(3) << "Filelist size in Dataset: " << filelist_.size(); + VLOG(3) << "channel num in Dataset: " << channel_num_; + CHECK(thread_num_ > 0) << "thread num should > 0"; + CHECK(channel_num_ > 0) << "channel num should > 0"; + CHECK(channel_num_ <= thread_num_) << "channel num should <= thread num"; + VLOG(3) << "readers size: " << readers_.size(); + if (readers_.size() != 0) { + VLOG(3) << "readers_.size() = " << readers_.size() + << ", will not create again"; + return; + } + VLOG(3) << "data feed class name: " << data_feed_desc_.name(); + for (int i = 0; i < thread_num_; ++i) { + readers_.push_back(DataFeedFactory::CreateDataFeed(data_feed_desc_.name())); + readers_[i]->Init(data_feed_desc_); + readers_[i]->SetThreadId(i); + readers_[i]->SetThreadNum(thread_num_); + readers_[i]->SetFileListMutex(&mutex_for_pick_file_); + readers_[i]->SetFileListIndex(&file_idx_); + readers_[i]->SetFeaNumMutex(&mutex_for_fea_num_); + readers_[i]->SetFeaNum(&total_fea_num_); + readers_[i]->SetFileList(filelist_); + readers_[i]->SetParseInsId(parse_ins_id_); + readers_[i]->SetParseContent(parse_content_); + readers_[i]->SetParseLogKey(parse_logkey_); + readers_[i]->SetEnablePvMerge(enable_pv_merge_); + readers_[i]->SetCurrentPhase(current_phase_); + if (input_channel_ != nullptr) { + readers_[i]->SetInputChannel(input_channel_.get()); + } + } + VLOG(3) << "readers size: " << readers_.size(); +} + +void SlotRecordDataset::ReleaseMemory() { + VLOG(3) << "SlotRecordDataset::ReleaseMemory() begin"; + platform::Timer timeline; + timeline.Start(); + + if (input_channel_) { + input_channel_->Clear(); + input_channel_ = nullptr; + } + if (enable_heterps_) { + VLOG(3) << "put pool records size: " << input_records_.size(); + SlotRecordPool().put(&input_records_); + input_records_.clear(); + input_records_.shrink_to_fit(); + VLOG(3) << "release heterps input records records size: " + << input_records_.size(); + } + + readers_.clear(); + readers_.shrink_to_fit(); + + std::vector>().swap(readers_); + + VLOG(3) << "SlotRecordDataset::ReleaseMemory() end"; + VLOG(3) << "total_feasign_num_(" << STAT_GET(STAT_total_feasign_num_in_mem) + << ") - current_fea_num_(" << total_fea_num_ << ") = (" + << 
STAT_GET(STAT_total_feasign_num_in_mem) - total_fea_num_ << ")" + << " object pool size=" << SlotRecordPool().capacity(); // For Debug + STAT_SUB(STAT_total_feasign_num_in_mem, total_fea_num_); +} +void SlotRecordDataset::GlobalShuffle(int thread_num) { + // TODO(yaoxuefeng) + return; +} + +void SlotRecordDataset::DynamicAdjustChannelNum(int channel_num, + bool discard_remaining_ins) { + if (channel_num_ == channel_num) { + VLOG(3) << "DatasetImpl::DynamicAdjustChannelNum channel_num_=" + << channel_num_ << ", channel_num_=channel_num, no need to adjust"; + return; + } + VLOG(3) << "adjust channel num from " << channel_num_ << " to " + << channel_num; + channel_num_ = channel_num; + + if (static_cast(input_channel_->Size()) >= channel_num) { + input_channel_->SetBlockSize(input_channel_->Size() / channel_num + + (discard_remaining_ins ? 0 : 1)); + } + + VLOG(3) << "adjust channel num done"; +} + +void SlotRecordDataset::PrepareTrain() { +#ifdef PADDLE_WITH_GLOO + if (enable_heterps_) { + if (input_records_.size() == 0 && input_channel_ != nullptr && + input_channel_->Size() != 0) { + input_channel_->ReadAll(input_records_); + VLOG(3) << "read from channel to records with records size: " + << input_records_.size(); + } + VLOG(3) << "input records size: " << input_records_.size(); + int64_t total_ins_num = input_records_.size(); + std::vector> offset; + int default_batch_size = + reinterpret_cast(readers_[0].get()) + ->GetDefaultBatchSize(); + VLOG(3) << "thread_num: " << thread_num_ + << " memory size: " << total_ins_num + << " default batch_size: " << default_batch_size; + compute_thread_batch_nccl(thread_num_, total_ins_num, default_batch_size, + &offset); + VLOG(3) << "offset size: " << offset.size(); + for (int i = 0; i < thread_num_; i++) { + reinterpret_cast(readers_[i].get()) + ->SetRecord(&input_records_[0]); + } + for (size_t i = 0; i < offset.size(); i++) { + reinterpret_cast( + readers_[i % thread_num_].get()) + ->AddBatchOffset(offset[i]); + } + } +#else + PADDLE_THROW(platform::errors::Unavailable( + "dataset set heterps need compile with GLOO")); +#endif + return; +} + +void SlotRecordDataset::DynamicAdjustReadersNum(int thread_num) { + if (thread_num_ == thread_num) { + VLOG(3) << "DatasetImpl::DynamicAdjustReadersNum thread_num_=" + << thread_num_ << ", thread_num_=thread_num, no need to adjust"; + return; + } + VLOG(3) << "adjust readers num from " << thread_num_ << " to " << thread_num; + thread_num_ = thread_num; + std::vector>().swap(readers_); + CreateReaders(); + VLOG(3) << "adjust readers num done"; + PrepareTrain(); +} + } // end namespace framework } // end namespace paddle diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index f3ee96fab8297f..981fb694e0fec9 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -149,7 +149,6 @@ class Dataset { virtual void DynamicAdjustReadersNum(int thread_num) = 0; // set fleet send sleep seconds virtual void SetFleetSendSleepSeconds(int seconds) = 0; - virtual void SetHeterPs(bool enable_heterps) = 0; protected: virtual int ReceiveFromClient(int msg_type, int client_id, @@ -207,7 +206,7 @@ class DatasetImpl : public Dataset { virtual void WaitPreLoadDone(); virtual void ReleaseMemory(); virtual void LocalShuffle(); - virtual void GlobalShuffle(int thread_num = -1); + virtual void GlobalShuffle(int thread_num = -1) {} virtual void SlotsShuffle(const std::set& slots_to_replace) {} virtual const std::vector& GetSlotsOriginalData() { return 
        slots_shuffle_original_data_;
   }
@@ -233,7 +232,11 @@
                                        bool discard_remaining_ins = false);
   virtual void DynamicAdjustReadersNum(int thread_num);
   virtual void SetFleetSendSleepSeconds(int seconds);
-  virtual void SetHeterPs(bool enable_heterps);
+  /* for enable_heterps_
+  virtual void EnableHeterps(bool enable_heterps) {
+    enable_heterps_ = enable_heterps;
+  }
+  */
 
   std::vector<paddle::framework::Channel<T>>& GetMultiOutputChannel() {
     return multi_output_channel_;
@@ -251,7 +254,10 @@
  protected:
   virtual int ReceiveFromClient(int msg_type, int client_id,
-                                const std::string& msg);
+                                const std::string& msg) {
+    // TODO(yaoxuefeng) for SlotRecordDataset
+    return -1;
+  }
   std::vector<std::shared_ptr<paddle::framework::DataFeed>> readers_;
   std::vector<std::shared_ptr<paddle::framework::DataFeed>> preload_readers_;
   paddle::framework::Channel<T> input_channel_;
@@ -327,6 +333,32 @@ class MultiSlotDataset : public DatasetImpl<Record> {
       const std::unordered_set<uint16_t>& slots_to_replace,
       std::vector<Record>* result);
   virtual ~MultiSlotDataset() {}
+  virtual void GlobalShuffle(int thread_num = -1);
+  virtual void DynamicAdjustReadersNum(int thread_num);
+  virtual void PrepareTrain();
+
+ protected:
+  virtual int ReceiveFromClient(int msg_type, int client_id,
+                                const std::string& msg);
+};
+
+class SlotRecordDataset : public DatasetImpl<SlotRecord> {
+ public:
+  SlotRecordDataset() { SlotRecordPool(); }
+  virtual ~SlotRecordDataset() {}
+  // create input channel
+  virtual void CreateChannel();
+  // create readers
+  virtual void CreateReaders();
+  // release memory
+  virtual void ReleaseMemory();
+  virtual void GlobalShuffle(int thread_num = -1);
+  virtual void DynamicAdjustChannelNum(int channel_num,
+                                       bool discard_remaining_ins);
+  virtual void PrepareTrain();
+  virtual void DynamicAdjustReadersNum(int thread_num);
+
+ protected:
+  bool enable_heterps_ = true;
 };
 
 }  // end namespace framework
diff --git a/paddle/fluid/framework/dataset_factory.cc b/paddle/fluid/framework/dataset_factory.cc
index aeaf9611853238..38200927c5586f 100644
--- a/paddle/fluid/framework/dataset_factory.cc
+++ b/paddle/fluid/framework/dataset_factory.cc
@@ -53,7 +53,7 @@ std::unique_ptr<Dataset> DatasetFactory::CreateDataset(
     std::string dataset_class) {
   if (g_dataset_map.count(dataset_class) < 1) {
     LOG(WARNING) << "Your Dataset " << dataset_class
-                 << "is not supported currently";
+                 << " is not supported currently";
     LOG(WARNING) << "Supported Dataset: " << DatasetTypeList();
     exit(-1);
   }
@@ -61,5 +61,6 @@
 }
 
 REGISTER_DATASET_CLASS(MultiSlotDataset);
+REGISTER_DATASET_CLASS(SlotRecordDataset);
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
index 784cbc3d90b865..d1e98a711dc9dd 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -45,9 +45,7 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
   platform::Timer timeline;
   timeline.Start();
   int device_num = heter_devices_.size();
-  MultiSlotDataset* dataset = dynamic_cast<MultiSlotDataset*>(dataset_);
   gpu_task->init(thread_keys_shard_num_, device_num);
-  auto input_channel = dataset->GetInputChannel();
   auto& local_keys = gpu_task->feature_keys_;
   auto& local_ptr = gpu_task->value_ptr_;
@@ -68,35 +66,83 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
   for (int i = 0; i < thread_keys_thread_num_; i++) {
     thread_keys_[i].resize(thread_keys_shard_num_);
   }
-  const std::deque<Record>& vec_data = input_channel->GetData();
-  size_t total_len = vec_data.size();
-  size_t len_per_thread = total_len / thread_keys_thread_num_;
-  int remain = total_len % thread_keys_thread_num_;
+
+  size_t total_len = 0;
+  size_t len_per_thread = 0;
+  int remain = 0;
   size_t begin = 0;
-  auto gen_func = [this](const std::deque<Record>& total_data, int begin_index,
-                         int end_index, int i) {
-    for (auto iter = total_data.begin() + begin_index;
-         iter != total_data.begin() + end_index; iter++) {
-      const auto& ins = *iter;
-      const auto& feasign_v = ins.uint64_feasigns_;
-      for (const auto feasign : feasign_v) {
-        uint64_t cur_key = feasign.sign().uint64_feasign_;
-        int shard_id = cur_key % thread_keys_shard_num_;
-        this->thread_keys_[i][shard_id].insert(cur_key);
+
+  std::string data_set_name = std::string(typeid(*dataset_).name());
+
+  if (data_set_name.find("SlotRecordDataset") != std::string::npos) {
+    VLOG(0) << "ps_gpu_wrapper use SlotRecordDataset";
+    SlotRecordDataset* dataset = dynamic_cast<SlotRecordDataset*>(dataset_);
+    auto input_channel = dataset->GetInputChannel();
+    VLOG(0) << "buildtask: input slot channel size: "
+            << input_channel->Size();
+    const std::deque<SlotRecord>& vec_data = input_channel->GetData();
+    total_len = vec_data.size();
+    len_per_thread = total_len / thread_keys_thread_num_;
+    remain = total_len % thread_keys_thread_num_;
+    VLOG(0) << "total len: " << total_len;
+    auto gen_func = [this](const std::deque<SlotRecord>& total_data,
+                           int begin_index, int end_index, int i) {
+      for (auto iter = total_data.begin() + begin_index;
+           iter != total_data.begin() + end_index; iter++) {
+        const auto& ins = *iter;
+        const auto& feasign_v = ins->slot_uint64_feasigns_.slot_values;
+        for (const auto feasign : feasign_v) {
+          int shard_id = feasign % thread_keys_shard_num_;
+          this->thread_keys_[i][shard_id].insert(feasign);
+        }
       }
+    };
+    for (int i = 0; i < thread_keys_thread_num_; i++) {
+      threads.push_back(
+          std::thread(gen_func, std::ref(vec_data), begin,
+                      begin + len_per_thread + (i < remain ? 1 : 0), i));
+      begin += len_per_thread + (i < remain ? 1 : 0);
     }
-  };
-  for (int i = 0; i < thread_keys_thread_num_; i++) {
-    threads.push_back(std::thread(gen_func, std::ref(vec_data), begin,
-                                  begin + len_per_thread + (i < remain ? 1 : 0),
-                                  i));
-    begin += len_per_thread + (i < remain ? 1 : 0);
-  }
-  for (std::thread& t : threads) {
-    t.join();
+    for (std::thread& t : threads) {
+      t.join();
+    }
+    timeline.Pause();
+    VLOG(1) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
+  } else {
+    CHECK(data_set_name.find("MultiSlotDataset") != std::string::npos);
+    VLOG(0) << "ps_gpu_wrapper use MultiSlotDataset";
+    MultiSlotDataset* dataset = dynamic_cast<MultiSlotDataset*>(dataset_);
+    auto input_channel = dataset->GetInputChannel();
+
+    const std::deque<Record>& vec_data = input_channel->GetData();
+    total_len = vec_data.size();
+    len_per_thread = total_len / thread_keys_thread_num_;
+    remain = total_len % thread_keys_thread_num_;
+    auto gen_func = [this](const std::deque<Record>& total_data,
+                           int begin_index, int end_index, int i) {
+      for (auto iter = total_data.begin() + begin_index;
+           iter != total_data.begin() + end_index; iter++) {
+        const auto& ins = *iter;
+        const auto& feasign_v = ins.uint64_feasigns_;
+        for (const auto feasign : feasign_v) {
+          uint64_t cur_key = feasign.sign().uint64_feasign_;
+          int shard_id = cur_key % thread_keys_shard_num_;
+          this->thread_keys_[i][shard_id].insert(cur_key);
+        }
+      }
+    };
+    for (int i = 0; i < thread_keys_thread_num_; i++) {
+      threads.push_back(
+          std::thread(gen_func, std::ref(vec_data), begin,
+                      begin + len_per_thread + (i < remain ? 1 : 0), i));
+      begin += len_per_thread + (i < remain ? 1 : 0);
+    }
+    for (std::thread& t : threads) {
+      t.join();
+    }
+    timeline.Pause();
+    VLOG(1) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
   }
-  timeline.Pause();
-  VLOG(1) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
 
   timeline.Start();
diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc
index b97c3106439bed..e33baa521630a9 100644
--- a/paddle/fluid/platform/flags.cc
+++ b/paddle/fluid/platform/flags.cc
@@ -673,3 +673,13 @@ PADDLE_DEFINE_EXPORTED_int32(get_host_by_name_time, 120,
 PADDLE_DEFINE_EXPORTED_bool(
     apply_pass_to_program, false,
     "It controls whether to apply IR pass to program when using Fleet APIs");
+
+DEFINE_int32(record_pool_max_size, 2000000,
+             "SlotRecordDataset slot record pool max size");
+DEFINE_int32(slotpool_thread_num, 1, "SlotRecordDataset slot pool thread num");
+DEFINE_bool(enable_slotpool_wait_release, false,
+            "enable slotrecord object wait release, default false");
+DEFINE_bool(enable_slotrecord_reset_shrink, false,
+            "enable slotrecord object reset shrink memory, default false");
+DEFINE_bool(enable_ins_parser_file, false,
+            "enable parser ins file, default false");
diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc
index 41cf0189d3d9d0..7a32d8729fc6ca 100644
--- a/paddle/fluid/pybind/data_set_py.cc
+++ b/paddle/fluid/pybind/data_set_py.cc
@@ -309,8 +309,6 @@ void BindDataset(py::module *m) {
            &framework::Dataset::SetFleetSendSleepSeconds,
            py::call_guard<py::gil_scoped_release>())
       .def("enable_pv_merge", &framework::Dataset::EnablePvMerge,
-           py::call_guard<py::gil_scoped_release>())
-      .def("set_heter_ps", &framework::Dataset::SetHeterPs,
           py::call_guard<py::gil_scoped_release>());
 
   py::class_<IterableDatasetWrapper>(*m, "IterableDatasetWrapper")
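
The sketch below is illustrative only and not part of the patch: it shows the intended round trip through the SlotRecordPool() object pool added in data_feed.h. The wrapper function name and the batch size of 1024 are invented for the example.

// A minimal sketch (assumed usage, not upstream code) of borrowing and
// returning SlotRecord objects from the new SlotObjPool.
#include <vector>
#include "paddle/fluid/framework/data_feed.h"

void SlotRecordPoolRoundTrip() {
  using paddle::framework::SlotRecord;
  using paddle::framework::SlotRecordPool;

  // get() reuses recycled objects when the allocator has them and falls
  // back to make_slotrecord() for the remainder.
  std::vector<SlotRecord> records;
  SlotRecordPool().get(&records, 1024);

  // ... fill records[i]->slot_uint64_feasigns_ / slot_float_feasigns_ ...

  // put() pushes the pointers into the pool's channel; a background thread
  // (see SlotObjPool::run) resets them and either re-shelves them or frees
  // them once max_capacity_ would be exceeded.
  SlotRecordPool().put(&records);
}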
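BufferedLineFileReader, added at the top of data_feed.cc, drives the LoadIntoMemoryBy* paths: read_file() hands each (optionally sampled) line to a bool callback and is_error() trips after more than ten callback failures. The sketch below only mirrors that call pattern; the class is local to data_feed.cc, so the surrounding function name and the FILE* source are assumptions.

// Assumed call pattern for the new (file-local) BufferedLineFileReader.
int CountNonEmptyLines(FILE* fp) {
  BufferedLineFileReader line_reader;
  line_reader.set_sample_rate(1.0f);  // keep every line; < 1.0f subsamples

  int kept = 0;
  int lines = line_reader.read_file(
      fp,
      [&kept](const std::string& line) {
        if (line.empty()) {
          return false;  // counted toward error_line_
        }
        ++kept;
        return true;
      },
      /*skip_lines=*/0);
  (void)lines;  // raw line count, including skipped/sampled-out lines
  return line_reader.is_error() ? -1 : kept;
}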
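SlotValues<T> is the packed per-slot storage used for both feasign types of SlotRecordObject. A small sketch of the add_slot_feasigns()/get_values() round trip; the function name and the literal values are invented for illustration.

// Assumed usage of the new SlotValues<T> container from data_feed.h.
#include <vector>
#include "paddle/fluid/framework/data_feed.h"

void SlotValuesRoundTrip() {
  paddle::framework::SlotValues<uint64_t> values;

  // Three slots: two feasigns, an empty slot, one feasign.
  std::vector<std::vector<uint64_t>> per_slot = {{11, 12}, {}, {31}};
  values.add_slot_feasigns(per_slot, /*fea_num=*/3);

  size_t num = 0;
  uint64_t* slot0 = values.get_values(0, &num);  // num == 2, points at {11, 12}
  uint64_t* slot1 = values.get_values(1, &num);  // num == 0 for the empty slot
  (void)slot0;
  (void)slot1;
}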