@@ -309,20 +309,18 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions
309309 auto * parquet_fragment = checked_cast<ParquetFileFragment*>(fragment);
310310 std::vector<int > row_groups;
311311
312-
313- bool pre_filtered = false ;
314- auto MakeEmpty = [] { return MakeEmptyIterator<std::shared_ptr<ScanTask>>(); };
315-
316- // If RowGroup metadata is cached completely we can pre-filter RowGroups before opening
317- // a FileReader, potentially avoiding IO altogether if all RowGroups are excluded due to
318- // prior statistics knowledge. In the case where a RowGroup doesn't have statistics
319- // metdata, it will not be excluded.
320- if (parquet_fragment->metadata () != nullptr ) {
321- ARROW_ASSIGN_OR_RAISE (row_groups, parquet_fragment->FilterRowGroups (options->filter ));
322-
323- pre_filtered = true ;
324- if (row_groups.empty ()) MakeEmpty ();
325- }
312+ // FIXME REBASE: disable this feature temporarily 2021/01/26 hongze
313+ //
314+ // // If RowGroup metadata is cached completely we can pre-filter RowGroups before opening
315+ // // a FileReader, potentially avoiding IO altogether if all RowGroups are excluded due to
316+ // // prior statistics knowledge. In the case where a RowGroup doesn't have statistics
317+ // // metadata, it will not be excluded.
318+ // if (parquet_fragment->metadata() != nullptr) {
319+ // ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter));
320+ //
321+ // pre_filtered = true;
322+ // if (row_groups.empty()) MakeEmpty();
323+ // }
326324
327325 // Open the reader and pay the real IO cost.
328326
@@ -331,16 +329,46 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions
331329 ARROW_ASSIGN_OR_RAISE (std::shared_ptr<parquet::arrow::FileReader> file_reader,
332330 GetReader (source, options.get (), context.get ()));
333331
332+ auto reader = file_reader->parquet_reader ();
334333 // Ensure that parquet_fragment has FileMetaData
335334 RETURN_NOT_OK (parquet_fragment->EnsureCompleteMetadata (file_reader.get ()));
336335
337- if (!pre_filtered) {
338- ARROW_ASSIGN_OR_RAISE (row_groups, parquet_fragment->FilterRowGroups (options->filter ));
339- if (row_groups.empty ()) MakeEmpty ();
336+ ARROW_ASSIGN_OR_RAISE (row_groups, parquet_fragment->FilterRowGroups (options->filter ));
337+
338+ for (int i : row_groups) {
339+ if (i >= reader->metadata ()->num_row_groups ()) {
340+ return Status::IndexError (" trying to scan row group " , i, " but " , source.path (),
341+ " only has " , reader->metadata ()->num_row_groups (),
342+ " row groups" );
343+ }
344+ }
345+ if (source.start_offset () != -1L ) {
346+ // random read
347+ std::vector<int > random_read_selected_row_groups = std::vector<int >();
348+ for (int i : row_groups) {
349+ std::shared_ptr<parquet::ColumnChunkMetaData> leading_cc =
350+ reader->RowGroup (i)->metadata ()->ColumnChunk (0 );
351+ int64_t r_start = leading_cc->data_page_offset ();
352+ if (leading_cc->has_dictionary_page () &&
353+ r_start > leading_cc->dictionary_page_offset ()) {
354+ r_start = leading_cc->dictionary_page_offset ();
355+ }
356+ int64_t r_bytes = 0L ;
357+ for (int col_id = 0 ; col_id < reader->RowGroup (i)->metadata ()->num_columns ();
358+ col_id++) {
359+ r_bytes += reader->
360+ RowGroup (i)->metadata ()->ColumnChunk (col_id)->total_compressed_size ();
361+ }
362+ int64_t midpoint = r_start + r_bytes / 2 ;
363+ if (midpoint >= source.start_offset ()
364+ && midpoint < (source.start_offset () + source.length ())) {
365+ random_read_selected_row_groups.push_back (i);
366+ }
367+ }
368+ row_groups = random_read_selected_row_groups;
340369 }
341-
342370 if (row_groups.empty ()) {
343- return MakeEmpty ();
371+ return arrow::MakeEmptyIterator<std::shared_ptr<ScanTask>> ();
344372 }
345373
346374 auto column_projection = InferColumnProjection (*file_reader, *options);
@@ -447,51 +475,14 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* r
447475 }
448476 physical_schema_ = std::move (schema);
449477
450- auto parquet_reader = reader->parquet_reader ();
451-
452478 if (!row_groups_) {
453- auto all_row_groups = internal::Iota (reader->num_row_groups ());
454- FileSource source = this ->source ();
455- for (int i : all_row_groups) {
456- if (i >= metadata_->num_row_groups ()) {
457- return Status::IndexError (" trying to scan row group " , i, " but " , source.path (),
458- " only has " , metadata_->num_row_groups (),
459- " row groups" );
460- }
461- }
462- if (source.start_offset () != -1L ) {
463- // random read
464- std::vector<int > random_read_selected_row_groups = std::vector<int >();
465- for (int i : all_row_groups) {
466- std::shared_ptr<parquet::ColumnChunkMetaData> leading_cc =
467- parquet_reader->RowGroup (i)->metadata ()->ColumnChunk (0 );
468- int64_t r_start = leading_cc->data_page_offset ();
469- if (leading_cc->has_dictionary_page () &&
470- r_start > leading_cc->dictionary_page_offset ()) {
471- r_start = leading_cc->dictionary_page_offset ();
472- }
473- int64_t r_bytes = 0L ;
474- for (int col_id = 0 ; col_id < parquet_reader->RowGroup (i)
475- ->metadata ()->num_columns ();
476- col_id++) {
477- r_bytes += parquet_reader->
478- RowGroup (i)->metadata ()->ColumnChunk (col_id)->total_compressed_size ();
479- }
480- int64_t midpoint = r_start + r_bytes / 2 ;
481- if (midpoint >= source.start_offset ()
482- && midpoint < (source.start_offset () + source.length ())) {
483- random_read_selected_row_groups.push_back (i);
484- }
485- }
486- row_groups_ = random_read_selected_row_groups;
487- }
479+ row_groups_ = internal::Iota (reader->num_row_groups ());
488480 }
489481
490482 ARROW_ASSIGN_OR_RAISE (
491483 auto manifest,
492- GetSchemaManifest (*parquet_reader->metadata (), reader->properties ()));
493-
494- return SetMetadata (parquet_reader->metadata (), std::move (manifest));
484+ GetSchemaManifest (*reader->parquet_reader ()->metadata (), reader->properties ()));
485+ return SetMetadata (reader->parquet_reader ()->metadata (), std::move (manifest));
495486}
496487
497488Status ParquetFileFragment::SetMetadata (
0 commit comments