@@ -31,6 +31,11 @@ USE_INT_STAT(STAT_total_feasign_num_in_mem);
3131namespace paddle {
3232namespace framework {
3333
34+ DLManager& global_dlmanager_pool () {
35+ static DLManager manager;
36+ return manager;
37+ }
38+
3439void RecordCandidateList::ReSize (size_t length) {
3540 mutex_.lock ();
3641 capacity_ = length;
@@ -366,6 +371,10 @@ void InMemoryDataFeed<T>::SetParseInsId(bool parse_ins_id) {
366371template <typename T>
367372void InMemoryDataFeed<T>::LoadIntoMemory() {
368373#ifdef _LINUX
374+ if (!so_parser_name_.empty ()) {
375+ LoadIntoMemoryFromSo ();
376+ return ;
377+ }
369378 VLOG (3 ) << " LoadIntoMemory() begin, thread_id=" << thread_id_;
370379 std::string filename;
371380 while (this ->PickOneFile (&filename)) {
@@ -408,6 +417,51 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
408417#endif
409418}
410419
420+ template <typename T>
421+ void InMemoryDataFeed<T>::LoadIntoMemoryFromSo () {
422+ #ifdef _LINUX
423+ VLOG (3 ) << " LoadIntoMemoryFromSo() begin, thread_id=" << thread_id_;
424+
425+ string::LineFileReader reader;
426+ paddle::framework::CustomParser* parser =
427+ global_dlmanager_pool ().Load (so_parser_name_, slot_conf_);
428+
429+ std::string filename;
430+ while (this ->PickOneFile (&filename)) {
431+ VLOG (3 ) << " PickOneFile, filename=" << filename
432+ << " , thread_id=" << thread_id_;
433+ int err_no = 0 ;
434+ this ->fp_ = fs_open_read (filename, &err_no, this ->pipe_command_ );
435+ CHECK (this ->fp_ != nullptr );
436+ __fsetlocking (&*(this ->fp_ ), FSETLOCKING_BYCALLER);
437+
438+ paddle::framework::ChannelWriter<T> writer (input_channel_);
439+ T instance;
440+ platform::Timer timeline;
441+ timeline.Start ();
442+
443+ while (1 ) {
444+ if (!reader.getline (&*(fp_.get ()))) {
445+ break ;
446+ } else {
447+ const char * str = reader.get ();
448+ ParseOneInstanceFromSo (str, &instance, parser);
449+ }
450+
451+ writer << std::move (instance);
452+ instance = T ();
453+ }
454+
455+ writer.Flush ();
456+ timeline.Pause ();
457+ VLOG (3 ) << " LoadIntoMemoryFromSo() read all lines, file=" << filename
458+ << " , cost time=" << timeline.ElapsedSec ()
459+ << " seconds, thread_id=" << thread_id_;
460+ }
461+ VLOG (3 ) << " LoadIntoMemoryFromSo() end, thread_id=" << thread_id_;
462+ #endif
463+ }
464+
411465// explicit instantiation
412466template class InMemoryDataFeed <Record>;
413467
@@ -827,16 +881,23 @@ void MultiSlotInMemoryDataFeed::Init(
827881 inductive_shape_index_.resize (all_slot_num);
828882 use_slots_.clear ();
829883 use_slots_is_dense_.clear ();
884+ slot_conf_.resize (all_slot_num);
830885 for (size_t i = 0 ; i < all_slot_num; ++i) {
831886 const auto & slot = multi_slot_desc.slots (i);
832887 all_slots_[i] = slot.name ();
833888 all_slots_type_[i] = slot.type ();
834889 use_slots_index_[i] = slot.is_used () ? use_slots_.size () : -1 ;
890+
891+ slot_conf_[i].name = slot.name ();
892+ slot_conf_[i].type = slot.type ();
893+ slot_conf_[i].use_slots_index = use_slots_index_[i];
894+
835895 total_dims_without_inductive_[i] = 1 ;
836896 inductive_shape_index_[i] = -1 ;
837897 if (slot.is_used ()) {
838898 use_slots_.push_back (all_slots_[i]);
839899 use_slots_is_dense_.push_back (slot.is_dense ());
900+ slot_conf_[i].use_slots_is_dense = slot.is_dense ();
840901 std::vector<int > local_shape;
841902 if (slot.is_dense ()) {
842903 for (int j = 0 ; j < slot.shape_size (); ++j) {
@@ -869,6 +930,7 @@ void MultiSlotInMemoryDataFeed::Init(
869930 }
870931 visit_.resize (all_slot_num, false );
871932 pipe_command_ = data_feed_desc.pipe_command ();
933+ so_parser_name_ = data_feed_desc.so_parser_name ();
872934 finish_init_ = true ;
873935 input_type_ = data_feed_desc.input_type ();
874936}
@@ -887,6 +949,12 @@ void MultiSlotInMemoryDataFeed::GetMsgFromLogKey(const std::string& log_key,
887949 *rank = (uint32_t )strtoul (rank_str.c_str (), NULL , 16 );
888950}
889951
952+ void MultiSlotInMemoryDataFeed::ParseOneInstanceFromSo (const char * str,
953+ Record* instance,
954+ CustomParser* parser) {
955+ parser->ParseOneInstance (str, instance);
956+ }
957+
890958bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe (Record* instance) {
891959#ifdef _LINUX
892960 thread_local string::LineFileReader reader;
0 commit comments