[FEA] Add a new Parquet reader for high-selectivity table scan #17896

@GregoryKimball

Description

Is your feature request related to a problem? Please describe.
As of 25.04, Spark-RAPIDS performs several additional processing steps as part of the cuDF-based Parquet table scan implementation. First, Spark-RAPIDS parses the file footer and prunes out columns and row groups not needed by the table scan task. Then the plugin uses parquet-mr functions to apply the filter predicate to min/max stats in the footer, further pruning row groups. Next, Spark-RAPIDS completes IO for all the column chunks that remain, optionally decompresses them on the host, and then assembles a new Parquet file in host pinned memory with a new footer. cuDF's chunked Parquet reader then materializes the data in chunks to remain within the task's memory footprint. Finally, Spark-RAPIDS applies the predicate filter row-wise.

This implementation has several inefficiencies, especially for high-selectivity table scans. First, the file footer is parsed twice in Spark-RAPIDS, written again for cuDF, and parsed again by cuDF. Second, IO for all column chunks is performed without checking bloom filters, dictionary pages, or the column index to see which data pages could actually pass the filter predicate. As a result, excess IO, decompression, and decoding are performed on data that is ultimately discarded during row-wise predicate filtering.

Describe the solution you'd like
We should add new functions to an experimental namespace, exposing the steps needed to process a high-selectivity Parquet table scan. These steps should minimize the total IO as much as possible, using metadata in the footer, column index, dictionary pages, and bloom filters to avoid IO at the data page level. The steps should also be stateless, giving Spark-RAPIDS better control of spilling, chunking, and retries.
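For illustration, a table scan task might drive these steps roughly as follows. This is a hedged sketch only: `hybrid_scan_reader`, `fetch`, and `fetch_to_device` are hypothetical placeholders for the reader object and the caller's own IO layer, not part of any existing API.

```cpp
// 1. Footer and page index (no GPU)
auto reader = hybrid_scan_reader(footer_bytes);
reader.setup_page_index(fetch(reader.page_index_byte_range()));

// 2. Row group pruning, cheapest filters first
auto rgs = reader.all_row_groups(options);
rgs = reader.filter_row_groups_with_stats(rgs, options, stream);
auto [bloom_ranges, dict_ranges] =
  reader.secondary_filters_byte_ranges(rgs, options);
rgs = reader.filter_row_groups_with_dictionary_pages(
  fetch_to_device(dict_ranges), rgs, options, stream);
rgs = reader.filter_row_groups_with_bloom_filters(
  fetch_to_device(bloom_ranges), rgs, options, stream);

// 3. Page-level pruning for filter columns
auto [row_mask, page_mask] =
  reader.filter_data_pages_with_stats(rgs, options, stream, mr);

// 4. Materialize filter columns; IO only for surviving chunks
auto filter_chunks =
  fetch_to_device(reader.filter_column_chunks_byte_ranges(rgs, options));
auto [filter_table, updated_mask] = reader.materialize_filter_columns(
  page_mask, rgs, std::move(filter_chunks), row_mask->view(), options, stream);

// 5. Materialize payload columns using the final row mask
auto payload_chunks =
  fetch_to_device(reader.payload_column_chunks_byte_ranges(rgs, options));
auto payload_table = reader.materialize_payload_columns(
  rgs, std::move(payload_chunks), updated_mask->view(), options, stream);
```

Because every call is stateless apart from the parsed footer, the task can spill, re-fetch, or retry between any two steps.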

1. Parse metadata from footer bytes.

Function parquet_metadata (no GPU):

  • Inputs
    • None
  • Outputs
    • Parquet file footer struct: FileMetaData

Function page_index_byte_range (no GPU):

  • Inputs
    • None
  • Outputs
    • byte range of parquet page index

Function setup_page_index (no GPU):

  • Inputs
    • Host span of page index bytes
  • Outputs
    • None, but calling parquet_metadata after this will return FileMetaData with the page index structs also populated

Q. Should this reader only support files with column index data near the file footer, and not the older spec with column index data in the page headers?
A. Since the reader isn't fetching the page index bytes itself, it is left to the user to either set up the page index or not.
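Step 1 could be sketched as hypothetical C++ declarations, assuming the functions are members of a reader object constructed from the footer bytes so that no state leaks between calls; `hybrid_scan_reader`, `byte_range`, and all signatures are illustrative, not the final API:

```cpp
namespace cudf::io::parquet::experimental {

// Hypothetical reader handle over a single file's footer bytes.
class hybrid_scan_reader {
 public:
  explicit hybrid_scan_reader(cudf::host_span<uint8_t const> footer_bytes);

  // Parsed footer struct (no GPU work).
  FileMetaData parquet_metadata() const;

  // Location of the page index within the file (no GPU work).
  byte_range page_index_byte_range() const;

  // Populate page index structs from user-fetched bytes; later calls to
  // parquet_metadata() return FileMetaData with the page index populated.
  void setup_page_index(cudf::host_span<uint8_t const> page_index_bytes);
};

}  // namespace cudf::io::parquet::experimental
```

Since the reader never fetches bytes itself, the caller decides whether to pay for the page index IO at all.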

2. Row Groups and pruning

Function all_row_groups (no GPU):

  • Inputs
    • Parquet reader options
  • Outputs
    • Vector of all row group indices in the file if none were selected; otherwise, the row group indices selected in the options

NOTE: The filter expression in the Parquet reader options must not change after any of the following `filter_*` row group or page pruning APIs are called.

Q. Do we need to prune columns from the parquet metadata? Or would that break the filter column indexing? I guess we could prune and reorder the columns and update the filter predicate
A. Parquet metadata will be kept as is, since it also contains information about payload columns

(Optional) Function num_rows_in_row_groups (no GPU):

  • Inputs
    • Input row group indices
  • Outputs
    • Total number of rows in the row groups indicated by the input row group indices

Function filter_row_groups_with_stats (needs GPU):

  • Inputs
    • Input row group indices
    • Parquet reader options
  • Outputs
    • Updated row group indices that survive stats filtering

Function secondary_filters_byte_ranges (no GPU):

  • Inputs
    • Input row group indices
    • Parquet reader options
  • Outputs
    • A pair of vectors of byte ranges of per-column-chunk bloom filters and dictionary pages

Function filter_row_groups_with_dictionary_pages (needs GPU):

  • Inputs
    • Dictionary page data in a vector of device buffers.
    • Input row group indices
    • Parquet reader options
  • Outputs
    • Updated row group indices that survive dictionary page filtering

Function filter_row_groups_with_bloom_filters (needs GPU):

  • Inputs
    • Bloom filter data in a vector of device buffers.
    • Input row group indices
    • Parquet reader options
  • Outputs
    • Updated row group indices that survive bloom filter pruning
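The row group pruning steps above might look like the following hypothetical member declarations, continuing the illustrative reader-object style from step 1; stream parameters mark the GPU-using functions, and all names and types are assumptions:

```cpp
// All row group indices in the file, or the ones selected in the options (no GPU).
std::vector<cudf::size_type> all_row_groups(
  parquet_reader_options const& options) const;

// Optional helper: total rows across the given row groups (no GPU).
int64_t num_rows_in_row_groups(
  cudf::host_span<cudf::size_type const> row_group_indices) const;

// Row groups surviving min/max stats filtering (needs GPU).
std::vector<cudf::size_type> filter_row_groups_with_stats(
  cudf::host_span<cudf::size_type const> row_group_indices,
  parquet_reader_options const& options,
  rmm::cuda_stream_view stream) const;

// Per-column-chunk byte ranges of bloom filters and dictionary pages (no GPU).
std::pair<std::vector<byte_range>, std::vector<byte_range>>
secondary_filters_byte_ranges(
  cudf::host_span<cudf::size_type const> row_group_indices,
  parquet_reader_options const& options) const;

// Row groups surviving dictionary page filtering (needs GPU).
std::vector<cudf::size_type> filter_row_groups_with_dictionary_pages(
  cudf::host_span<rmm::device_buffer const> dictionary_page_data,
  cudf::host_span<cudf::size_type const> row_group_indices,
  parquet_reader_options const& options,
  rmm::cuda_stream_view stream) const;

// Row groups surviving bloom filter pruning (needs GPU).
std::vector<cudf::size_type> filter_row_groups_with_bloom_filters(
  cudf::host_span<rmm::device_buffer const> bloom_filter_data,
  cudf::host_span<cudf::size_type const> row_group_indices,
  parquet_reader_options const& options,
  rmm::cuda_stream_view stream) const;
```

Each function takes the current index vector and returns a new one, so the caller can checkpoint, spill, or retry between steps.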

3. Pruning filter column data pages

Function filter_data_pages_with_stats (needs GPU for filtering):

  • Inputs
    • Input row group indices (after all row group pruning)
    • Parquet reader options
  • Outputs
    • Row mask: BOOL8 column with value true at index i if the corresponding row may survive the final predicate filter
    • Page mask: Vector containing boolean thrust::host_vectors, one per filter column, representing which data pages of that column survive page filtering
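A hypothetical signature for this step, returning an owning row mask plus per-filter-column page masks; names and types are illustrative assumptions in the same sketch style as above:

```cpp
// Row mask: BOOL8 column, true at index i if row i may survive.
// Page mask: one boolean host_vector per filter column, true for
// data pages that survive page-level stats filtering (needs GPU).
std::pair<std::unique_ptr<cudf::column>,
          std::vector<thrust::host_vector<bool>>>
filter_data_pages_with_stats(
  cudf::host_span<cudf::size_type const> row_group_indices,
  parquet_reader_options const& options,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr) const;
```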

4. Materializing Filter Columns

Function filter_column_chunks_byte_ranges (no GPU):

  • Inputs
    • Input row group indices
    • Parquet reader options
  • Outputs
    • Byte ranges for filter column chunks

Function materialize_filter_columns (needs GPU):

  • Inputs
    • Page mask: A span containing boolean thrust::host_vectors, one per filter column, representing which data pages of that column survive page filtering. An empty span means no page filtering was done.
    • Input row group indices
    • Compressed column chunk data as vector of device buffers (libcudf chooses host or device decompression)
    • Row mask: BOOL8 column indicating rows that have survived page filtering so far. An all-true column if the page mask is empty.
    • Parquet reader options
  • Outputs
    • Table containing Filter columns
    • Updated row mask
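Possible shapes for these two functions (illustrative assumptions; whether the row mask is updated in place or returned afresh is an open design choice, sketched here as a returned pair):

```cpp
// Byte ranges for the filter column chunks (no GPU).
std::vector<byte_range> filter_column_chunks_byte_ranges(
  cudf::host_span<cudf::size_type const> row_group_indices,
  parquet_reader_options const& options) const;

// Decode filter columns, applying the page mask; returns the filter-column
// table and the updated row mask (needs GPU).
std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::column>>
materialize_filter_columns(
  cudf::host_span<thrust::host_vector<bool> const> page_mask,
  cudf::host_span<cudf::size_type const> row_group_indices,
  std::vector<rmm::device_buffer>&& column_chunk_data,
  cudf::column_view row_mask,
  parquet_reader_options const& options,
  rmm::cuda_stream_view stream) const;
```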

5. Materializing Payload Columns

Function payload_column_chunks_byte_ranges (no GPU):

  • Inputs
    • Input row group indices
    • Parquet reader options
  • Outputs
    • Byte ranges for payload column chunks

Function materialize_payload_columns (needs GPU):

  • Inputs
    • Input row group indices
    • Compressed column chunk data as vector of device buffers (libcudf chooses host or device decompression)
    • Row mask: BOOL8 column from materialize_filter_columns indicating which rows survived into the filter-column table
    • Parquet reader options
  • Outputs
    • Table containing Payload columns
    • NOTE: This function internally takes care of page pruning for payload columns using the input row mask
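And correspondingly for payload columns, again as hypothetical declarations in the same illustrative style:

```cpp
// Byte ranges for the payload column chunks (no GPU).
std::vector<byte_range> payload_column_chunks_byte_ranges(
  cudf::host_span<cudf::size_type const> row_group_indices,
  parquet_reader_options const& options) const;

// Decode payload columns; page pruning is derived internally from the
// row mask produced by materialize_filter_columns (needs GPU).
std::unique_ptr<cudf::table> materialize_payload_columns(
  cudf::host_span<cudf::size_type const> row_group_indices,
  std::vector<rmm::device_buffer>&& column_chunk_data,
  cudf::column_view row_mask,
  parquet_reader_options const& options,
  rmm::cuda_stream_view stream) const;
```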

Additional context
Also see issues about refactoring the footer management in the "bulk reader" table scan (default in Spark-RAPIDS as of 25.04). #17716

Labels

Spark (Functionality that helps Spark RAPIDS), cuIO (cuIO issue), feature request (New feature or request), libcudf (Affects libcudf (C++/CUDA) code)
