Description
Is your feature request related to a problem? Please describe.
When performing multi-threaded reads over a set of parquet data sources, this is a typical pattern:
- read metadata to determine the size of the file (e.g. row splits, byte splits)
- in parallel: for all splits, call read_parquet.
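The pattern above can be sketched schematically. This is not cudf code: the types and function names below are hypothetical stand-ins (the real entry points are `parquet_reader_options` and `read_parquet`), used only to show the shape of "one metadata read, then one parallel task per split":

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical stand-in for a row split (the real reader takes skip/num rows).
struct split { std::size_t skip_rows; std::size_t num_rows; };

// Stand-in for "read metadata to determine the size of the file".
std::size_t total_rows_from_metadata() { return 1'000'000; }

// Stand-in for one read_parquet call over a single split.
std::size_t read_split(split s) { return s.num_rows; }

// The pattern: one metadata read, then one parallel task per split.
std::vector<std::size_t> read_all_splits(std::size_t rows_per_task) {
    auto const total = total_rows_from_metadata();

    std::vector<std::future<std::size_t>> tasks;
    for (std::size_t skip = 0; skip < total; skip += rows_per_task) {
        tasks.push_back(std::async(std::launch::async, read_split,
                                   split{skip, std::min(rows_per_task, total - skip)}));
    }

    std::vector<std::size_t> out;
    for (auto& t : tasks) { out.push_back(t.get()); }
    return out;
}
```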
One might ask why we don't just use the chunked parquet reader for this setup; there are two reasons:
- The chunked reader can only deliver outputs on a single stream. Therefore we can't get multi-stream pipelining (even when using multiple threads) from a single chunked reader.
- You can't drive the chunked reader from multiple host threads because it has too much internal state.
Parquet reading proceeds in two phases:
- read parquet file metadata
- read the actual data
When we split the input sources into chunks, we can't avoid reading the actual data for each chunk, but I would like to avoid reading the metadata multiple times.
Today, to minimise this, I do something like the following:
- read metadata from the first file in the datasource to guesstimate the total number of rows (bytes)
- chunk up the files so that a read will produce (hopefully) at least the target number of rows (bytes) per output chunk.
- launch the tasks on those chunks of files
I therefore end up reading metadata for each chunk num_tasks_per_chunk times, and my code is more complicated than it needs to be.
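The chunking step of this workaround can be sketched as follows. This is a hypothetical helper (not cudf API): it takes a per-file row estimate obtained from the first file's metadata and groups files so that each chunk should produce at least the target number of rows:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch of the workaround: estimated_rows_per_file comes from
// reading the first file's metadata; files are grouped greedily so each chunk
// should yield at least target_rows_per_chunk rows.
std::vector<std::vector<std::string>> chunk_files(
    std::vector<std::string> const& files,
    std::size_t estimated_rows_per_file,
    std::size_t target_rows_per_chunk)
{
    std::vector<std::vector<std::string>> chunks;
    std::size_t rows_in_chunk = 0;
    for (auto const& f : files) {
        // Start a new chunk once the current one hits the target.
        if (chunks.empty() || rows_in_chunk >= target_rows_per_chunk) {
            chunks.emplace_back();
            rows_in_chunk = 0;
        }
        chunks.back().push_back(f);
        rows_in_chunk += estimated_rows_per_file;
    }
    return chunks;
}
```

Because the estimate comes from a single file, chunks can over- or under-shoot the target when file sizes vary, which is part of why this workaround is unsatisfying.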
Describe the solution you'd like
Ideally what I would do is:
- read metadata from files
- launch tasks using read metadata
Something like:
```cpp
auto [datasources, metadata] = prepare_parquet_metadata(source_info, options);
parallel_for (...) {
    auto chunk_options = parquet_reader_options{options};
    // slice out the piece we want.
    chunk_options.set_num_rows(...);
    chunk_options.set_skip_rows(...);
    auto chunk = read_parquet(chunk_options, metadata, datasources);
    ...
}
```
This way, I only pay the cost of reading the metadata and preparing the datasources once.
Additional context
I had a look at doing this but it's significantly complicated (to my mind) by the reader_impl storing a vector of unique_ptrs to the datasources, and a unique_ptr to the prepared aggregate_reader_metadata. So it's not "just" as simple as threading everything through.