@@ -257,6 +257,11 @@ bool InMemoryDataFeed<T>::Start() {
257257 output_channel_->Write (std::move (data));
258258 }
259259#endif
260+ if (batch_offsets_.size () > 0 ) {
261+ VLOG (3 ) << " batch_size offsets: " << batch_offsets_.size ();
262+ enable_heterps_ = true ;
263+ this ->offset_index_ = 0 ;
264+ }
260265 this ->finish_start_ = true ;
261266 return true ;
262267}
@@ -265,34 +270,64 @@ template <typename T>
265270int InMemoryDataFeed<T>::Next() {
266271#ifdef _LINUX
267272 this ->CheckStart ();
268- CHECK (output_channel_ != nullptr );
269- CHECK (consume_channel_ != nullptr );
270- VLOG (3 ) << " output_channel_ size=" << output_channel_->Size ()
271- << " , consume_channel_ size=" << consume_channel_->Size ()
272- << " , thread_id=" << thread_id_;
273- int index = 0 ;
274- T instance;
275- std::vector<T> ins_vec;
276- ins_vec.reserve (this ->default_batch_size_ );
277- while (index < this ->default_batch_size_ ) {
278- if (output_channel_->Size () == 0 ) {
279- break ;
273+ if (!enable_heterps_) {
274+ CHECK (output_channel_ != nullptr );
275+ CHECK (consume_channel_ != nullptr );
276+ VLOG (3 ) << " output_channel_ size=" << output_channel_->Size ()
277+ << " , consume_channel_ size=" << consume_channel_->Size ()
278+ << " , thread_id=" << thread_id_;
279+ int index = 0 ;
280+ T instance;
281+ std::vector<T> ins_vec;
282+ ins_vec.reserve (this ->default_batch_size_ );
283+ while (index < this ->default_batch_size_ ) {
284+ if (output_channel_->Size () == 0 ) {
285+ break ;
286+ }
287+ output_channel_->Get (instance);
288+ ins_vec.push_back (instance);
289+ ++index;
290+ consume_channel_->Put (std::move (instance));
291+ }
292+ this ->batch_size_ = index;
293+ VLOG (3 ) << " batch_size_=" << this ->batch_size_
294+ << " , thread_id=" << thread_id_;
295+ if (this ->batch_size_ != 0 ) {
296+ PutToFeedVec (ins_vec);
297+ } else {
298+ VLOG (3 ) << " finish reading, output_channel_ size="
299+ << output_channel_->Size ()
300+ << " , consume_channel_ size=" << consume_channel_->Size ()
301+ << " , thread_id=" << thread_id_;
280302 }
281- output_channel_->Get (instance);
282- ins_vec.push_back (instance);
283- ++index;
284- consume_channel_->Put (std::move (instance));
285- }
286- this ->batch_size_ = index;
287- VLOG (3 ) << " batch_size_=" << this ->batch_size_
288- << " , thread_id=" << thread_id_;
289- if (this ->batch_size_ != 0 ) {
290- PutToFeedVec (ins_vec);
291303 } else {
292- VLOG (3 ) << " finish reading, output_channel_ size="
293- << output_channel_->Size ()
294- << " , consume_channel_ size=" << consume_channel_->Size ()
304+ VLOG (3 ) << " enable heter NEXT: " << offset_index_
305+ << " batch_offsets: " << batch_offsets_.size ();
306+ if (offset_index_ >= batch_offsets_.size ()) {
307+ VLOG (3 ) << " offset_index: " << offset_index_
308+ << " batch_offsets: " << batch_offsets_.size ();
309+ return 0 ;
310+ }
311+ auto & batch = batch_offsets_[offset_index_++];
312+ this ->batch_size_ = batch.second ;
313+ VLOG (3 ) << " batch_size_=" << this ->batch_size_
295314 << " , thread_id=" << thread_id_;
315+ if (this ->batch_size_ != 0 ) {
316+ PutToFeedVec (&records_[batch.first ], this ->batch_size_ );
317+ } else {
318+ VLOG (3 ) << " finish reading for heterps, batch size zero, thread_id="
319+ << thread_id_;
320+ }
321+ /*
322+ if (offset_index_ == batch_offsets_.size() - 1) {
323+ std::vector<Record> data;
324+ output_channel_->ReadAll(data);
325+ consume_channel_->Write(std::move(data));
326+ }
327+ */
328+ VLOG (3 ) << " #15 enable heter NEXT: " << offset_index_
329+ << " batch_offsets: " << batch_offsets_.size ()
330+ << " baych_size: " << this ->batch_size_ ;
296331 }
297332 return this ->batch_size_ ;
298333#else
@@ -1141,6 +1176,103 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance(Record* instance) {
11411176 return false ;
11421177}
11431178
1179+ void MultiSlotInMemoryDataFeed::PutToFeedVec (const Record* ins_vec, int num) {
1180+ #ifdef _LINUX
1181+ for (size_t i = 0 ; i < batch_float_feasigns_.size (); ++i) {
1182+ batch_float_feasigns_[i].clear ();
1183+ batch_uint64_feasigns_[i].clear ();
1184+ offset_[i].clear ();
1185+ offset_[i].push_back (0 );
1186+ }
1187+ ins_content_vec_.clear ();
1188+ ins_content_vec_.reserve (num);
1189+ ins_id_vec_.clear ();
1190+ ins_id_vec_.reserve (num);
1191+ for (int i = 0 ; i < num; ++i) {
1192+ auto & r = ins_vec[i];
1193+ ins_id_vec_.push_back (r.ins_id_ );
1194+ ins_content_vec_.push_back (r.content_ );
1195+ for (auto & item : r.float_feasigns_ ) {
1196+ batch_float_feasigns_[item.slot ()].push_back (item.sign ().float_feasign_ );
1197+ visit_[item.slot ()] = true ;
1198+ }
1199+ for (auto & item : r.uint64_feasigns_ ) {
1200+ batch_uint64_feasigns_[item.slot ()].push_back (
1201+ item.sign ().uint64_feasign_ );
1202+ visit_[item.slot ()] = true ;
1203+ }
1204+ for (size_t j = 0 ; j < use_slots_.size (); ++j) {
1205+ const auto & type = all_slots_type_[j];
1206+ if (visit_[j]) {
1207+ visit_[j] = false ;
1208+ } else {
1209+ // fill slot value with default value 0
1210+ if (type[0 ] == ' f' ) { // float
1211+ batch_float_feasigns_[j].push_back (0.0 );
1212+ } else if (type[0 ] == ' u' ) { // uint64
1213+ batch_uint64_feasigns_[j].push_back (0 );
1214+ }
1215+ }
1216+ // get offset of this ins in this slot
1217+ if (type[0 ] == ' f' ) { // float
1218+ offset_[j].push_back (batch_float_feasigns_[j].size ());
1219+ } else if (type[0 ] == ' u' ) { // uint64
1220+ offset_[j].push_back (batch_uint64_feasigns_[j].size ());
1221+ }
1222+ }
1223+ }
1224+
1225+ for (size_t i = 0 ; i < use_slots_.size (); ++i) {
1226+ if (feed_vec_[i] == nullptr ) {
1227+ continue ;
1228+ }
1229+ int total_instance = offset_[i].back ();
1230+ const auto & type = all_slots_type_[i];
1231+ if (type[0 ] == ' f' ) { // float
1232+ float * feasign = batch_float_feasigns_[i].data ();
1233+ float * tensor_ptr =
1234+ feed_vec_[i]->mutable_data <float >({total_instance, 1 }, this ->place_ );
1235+ CopyToFeedTensor (tensor_ptr, feasign, total_instance * sizeof (float ));
1236+ } else if (type[0 ] == ' u' ) { // uint64
1237+ // no uint64_t type in paddlepaddle
1238+ uint64_t * feasign = batch_uint64_feasigns_[i].data ();
1239+ int64_t * tensor_ptr = feed_vec_[i]->mutable_data <int64_t >(
1240+ {total_instance, 1 }, this ->place_ );
1241+ CopyToFeedTensor (tensor_ptr, feasign, total_instance * sizeof (int64_t ));
1242+ }
1243+ auto & slot_offset = offset_[i];
1244+ if (this ->input_type_ == 0 ) {
1245+ LoD data_lod{slot_offset};
1246+ feed_vec_[i]->set_lod (data_lod);
1247+ } else if (this ->input_type_ == 1 ) {
1248+ if (!use_slots_is_dense_[i]) {
1249+ std::vector<size_t > tmp_offset;
1250+ PADDLE_ENFORCE_EQ (slot_offset.size (), 2 ,
1251+ platform::errors::InvalidArgument (
1252+ " In batch reader, the sparse tensor lod size "
1253+ " must be 2, but received %d." ,
1254+ slot_offset.size ()));
1255+ const auto & max_size = slot_offset[1 ];
1256+ tmp_offset.reserve (max_size + 1 );
1257+ for (unsigned int k = 0 ; k <= max_size; k++) {
1258+ tmp_offset.emplace_back (k);
1259+ }
1260+ slot_offset = tmp_offset;
1261+ LoD data_lod{slot_offset};
1262+ feed_vec_[i]->set_lod (data_lod);
1263+ }
1264+ }
1265+ if (use_slots_is_dense_[i]) {
1266+ if (inductive_shape_index_[i] != -1 ) {
1267+ use_slots_shape_[i][inductive_shape_index_[i]] =
1268+ total_instance / total_dims_without_inductive_[i];
1269+ }
1270+ feed_vec_[i]->Resize (framework::make_ddim (use_slots_shape_[i]));
1271+ }
1272+ }
1273+ #endif
1274+ }
1275+
11441276void MultiSlotInMemoryDataFeed::PutToFeedVec (
11451277 const std::vector<Record>& ins_vec) {
11461278#ifdef _LINUX
0 commit comments