Skip to content

Commit e5e8164

Browse files
authored
Implement row group pruning with stats in experimental PQ reader (#18543)
Contributes to #17896. Part of #18011. This PR implements row group pruning with stats in the experimental Parquet reader optimized for hybrid scan queries. Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) - Bradley Dice (https://github.com/bdice) Approvers: - David Wendt (https://github.com/davidwendt) - Vukasin Milovanovic (https://github.com/vuule) - Bradley Dice (https://github.com/bdice) URL: #18543
1 parent abff71b commit e5e8164

8 files changed

Lines changed: 351 additions & 30 deletions

File tree

cpp/include/cudf/io/parquet.hpp

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,16 +118,18 @@ class parquet_reader_options {
118118
* @brief Default constructor.
119119
*
120120
* This has been added since Cython requires a default constructor to create objects on stack.
121+
* The `hybrid_scan_reader` also uses this to create `parquet_reader_options` without a source.
121122
*/
122123
explicit parquet_reader_options() = default;
123124

124125
/**
125-
* @brief Creates a parquet_reader_options_builder which will build parquet_reader_options.
126+
* @brief Creates a `parquet_reader_options_builder` to build `parquet_reader_options`.
127+
* By default, build with empty data source info.
126128
*
127129
* @param src Source information to read parquet file
128130
* @return Builder to build reader options
129131
*/
130-
static parquet_reader_options_builder builder(source_info src);
132+
static parquet_reader_options_builder builder(source_info src = source_info{});
131133

132134
/**
133135
* @brief Returns source info.
@@ -137,8 +139,7 @@ class parquet_reader_options {
137139
[[nodiscard]] source_info const& get_source() const { return _source; }
138140

139141
/**
140-
* @brief Returns true/false depending on whether strings should be converted to categories or
141-
* not.
142+
* @brief Returns boolean depending on whether strings should be converted to categories.
142143
*
143144
* @return `true` if strings should be converted to categories
144145
*/
@@ -148,21 +149,21 @@ class parquet_reader_options {
148149
}
149150

150151
/**
151-
* @brief Returns true/false depending whether to use pandas metadata or not while reading.
152+
* @brief Returns boolean depending on whether to use pandas metadata while reading.
152153
*
153154
* @return `true` if pandas metadata is used while reading
154155
*/
155156
[[nodiscard]] bool is_enabled_use_pandas_metadata() const { return _use_pandas_metadata; }
156157

157158
/**
158-
* @brief Returns true/false depending whether to use arrow schema while reading.
159+
* @brief Returns boolean depending on whether to use arrow schema while reading.
159160
*
160161
* @return `true` if arrow schema is used while reading
161162
*/
162163
[[nodiscard]] bool is_enabled_use_arrow_schema() const { return _use_arrow_schema; }
163164

164165
/**
165-
* @brief Returns true/false depending on whether to read matching projected and filter columns
166+
* @brief Returns boolean depending on whether to read matching projected and filter columns
166167
* from mismatched Parquet sources.
167168
*
168169
* @return `true` if mismatched projected and filter columns will be read from mismatched Parquet
@@ -308,23 +309,23 @@ class parquet_reader_options {
308309
/**
309310
* @brief Sets to enable/disable use of pandas metadata to read.
310311
*
311-
* @param val Boolean value whether to use pandas metadata
312+
* @param val Boolean indicating whether to use pandas metadata
312313
*/
313314
void enable_use_pandas_metadata(bool val) { _use_pandas_metadata = val; }
314315

315316
/**
316317
* @brief Sets to enable/disable use of arrow schema to read.
317318
*
318-
* @param val Boolean value whether to use arrow schema
319+
* @param val Boolean indicating whether to use arrow schema
319320
*/
320321
void enable_use_arrow_schema(bool val) { _use_arrow_schema = val; }
321322

322323
/**
323324
* @brief Sets to enable/disable reading of matching projected and filter columns from mismatched
324325
* Parquet sources.
325326
*
326-
* @param val Boolean value whether to read matching projected and filter columns from mismatched
327-
* Parquet sources.
327+
* @param val Boolean indicating whether to read matching projected and filter columns from
328+
* mismatched Parquet sources.
328329
*/
329330
void enable_allow_mismatched_pq_schemas(bool val) { _allow_mismatched_pq_schemas = val; }
330331

@@ -372,6 +373,7 @@ class parquet_reader_options_builder {
372373
* @brief Default constructor.
373374
*
374375
* This has been added since Cython requires a default constructor to create objects on stack.
376+
* The `hybrid_scan_reader` also uses this to construct `parquet_reader_options` without a source.
375377
*/
376378
parquet_reader_options_builder() = default;
377379

cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,30 @@
2929
#include <functional>
3030
#include <numeric>
3131
#include <optional>
32+
#include <unordered_set>
3233

3334
namespace cudf::io::parquet::experimental::detail {
3435

3536
using aggregate_reader_metadata_base = parquet::detail::aggregate_reader_metadata;
3637
using metadata_base = parquet::detail::metadata;
3738

39+
using io::detail::inline_column_buffer;
3840
using parquet::detail::CompactProtocolReader;
3941
using parquet::detail::equality_literals_collector;
4042
using parquet::detail::input_column_info;
4143
using parquet::detail::row_group_info;
4244

45+
namespace {
46+
47+
[[nodiscard]] auto all_row_group_indices(
48+
host_span<std::vector<cudf::size_type> const> row_group_indices)
49+
{
50+
return std::vector<std::vector<cudf::size_type>>(row_group_indices.begin(),
51+
row_group_indices.end());
52+
}
53+
54+
} // namespace
55+
4356
metadata::metadata(cudf::host_span<uint8_t const> footer_bytes)
4457
{
4558
CompactProtocolReader cp(footer_bytes.data(), footer_bytes.size());
@@ -137,4 +150,117 @@ void aggregate_reader_metadata::setup_page_index(cudf::host_span<uint8_t const>
137150
}
138151
}
139152

153+
std::tuple<std::vector<input_column_info>,
154+
std::vector<inline_column_buffer>,
155+
std::vector<cudf::size_type>>
156+
aggregate_reader_metadata::select_payload_columns(
157+
std::optional<std::vector<std::string>> const& payload_column_names,
158+
std::optional<std::vector<std::string>> const& filter_column_names,
159+
bool include_index,
160+
bool strings_to_categorical,
161+
type_id timestamp_type_id)
162+
{
163+
// If neither payload nor filter columns are specified, select all columns
164+
if (not payload_column_names.has_value() and not filter_column_names.has_value()) {
165+
// Call the base `select_columns()` method without specifying any columns
166+
return select_columns({}, {}, include_index, strings_to_categorical, timestamp_type_id);
167+
}
168+
169+
std::vector<std::string> valid_payload_columns;
170+
171+
// If payload columns are specified, only select payload columns that do not appear in the filter
172+
// expression
173+
if (payload_column_names.has_value()) {
174+
valid_payload_columns = *payload_column_names;
175+
// Remove filter columns from the provided payload column names
176+
if (filter_column_names.has_value() and not filter_column_names->empty()) {
177+
// Add filter column names to a hash set for faster lookup
178+
std::unordered_set<std::string> filter_columns_set(filter_column_names->begin(),
179+
filter_column_names->end());
180+
// Remove a payload column name if it is also present in the hash set
181+
valid_payload_columns.erase(std::remove_if(valid_payload_columns.begin(),
182+
valid_payload_columns.end(),
183+
[&filter_columns_set](auto const& col) {
184+
return filter_columns_set.count(col) > 0;
185+
}),
186+
valid_payload_columns.end());
187+
}
188+
// Call the base `select_columns()` method with valid payload columns
189+
return select_columns(
190+
valid_payload_columns, {}, include_index, strings_to_categorical, timestamp_type_id);
191+
}
192+
193+
// Else if only filter columns are specified, select all columns that do not appear in the
194+
// filter expression
195+
196+
// Add filter column names to a hash set for faster lookup
197+
std::unordered_set<std::string> filter_columns_set(filter_column_names->begin(),
198+
filter_column_names->end());
199+
200+
std::function<void(std::string, int)> add_column_path = [&](std::string path_till_now,
201+
int schema_idx) {
202+
auto const& schema_elem = get_schema(schema_idx);
203+
std::string const curr_path = path_till_now + schema_elem.name;
204+
// If the current path is not a filter column, then add it and its children to the list of valid
205+
// payload columns
206+
if (filter_columns_set.count(curr_path) == 0) {
207+
valid_payload_columns.push_back(curr_path);
208+
// Add all children as well
209+
for (auto const& child_idx : schema_elem.children_idx) {
210+
add_column_path(curr_path + ".", child_idx);
211+
}
212+
}
213+
};
214+
215+
// Add all but filter columns to valid payload columns
216+
if (not filter_column_names->empty()) {
217+
for (auto const& child_idx : get_schema(0).children_idx) {
218+
add_column_path("", child_idx);
219+
}
220+
}
221+
222+
// Call the base `select_columns()` method with all but filter columns
223+
return select_columns(
224+
valid_payload_columns, {}, include_index, strings_to_categorical, timestamp_type_id);
225+
}
226+
227+
std::vector<std::vector<cudf::size_type>> aggregate_reader_metadata::filter_row_groups_with_stats(
228+
host_span<std::vector<cudf::size_type> const> row_group_indices,
229+
host_span<data_type const> output_dtypes,
230+
host_span<int const> output_column_schemas,
231+
std::optional<std::reference_wrapper<ast::expression const>> filter,
232+
rmm::cuda_stream_view stream) const
233+
{
234+
// Return all row groups if no filter expression
235+
if (not filter.has_value()) { return all_row_group_indices(row_group_indices); }
236+
237+
// Compute total number of input row groups
238+
cudf::size_type total_row_groups = [&]() {
239+
if (not row_group_indices.empty()) {
240+
size_t const total_row_groups =
241+
std::accumulate(row_group_indices.begin(),
242+
row_group_indices.end(),
243+
size_t{0},
244+
[](auto sum, auto const& pfm) { return sum + pfm.size(); });
245+
246+
// Check if we have less than 2B total row groups.
247+
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
248+
"Total number of row groups exceed the cudf::size_type's limit");
249+
return static_cast<cudf::size_type>(total_row_groups);
250+
} else {
251+
return num_row_groups;
252+
}
253+
}();
254+
255+
// Filter stats table with StatsAST expression and collect filtered row group indices
256+
auto const stats_filtered_row_group_indices = apply_stats_filters(row_group_indices,
257+
total_row_groups,
258+
output_dtypes,
259+
output_column_schemas,
260+
filter.value(),
261+
stream);
262+
263+
return stats_filtered_row_group_indices.value_or(all_row_group_indices(row_group_indices));
264+
}
265+
140266
} // namespace cudf::io::parquet::experimental::detail

cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ namespace cudf::io::parquet::experimental::detail {
3636
using aggregate_reader_metadata_base = parquet::detail::aggregate_reader_metadata;
3737
using metadata_base = parquet::detail::metadata;
3838

39+
using io::detail::inline_column_buffer;
3940
using parquet::detail::equality_literals_collector;
4041
using parquet::detail::input_column_info;
4142
using parquet::detail::row_group_info;
@@ -124,6 +125,44 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base {
124125
* @param page_index_bytes Host span of Parquet page index buffer bytes
125126
*/
126127
void setup_page_index(cudf::host_span<uint8_t const> page_index_bytes);
128+
129+
/**
130+
* @brief Filters and reduces down to the selection of payload columns
131+
*
132+
* @param payload_column_names List of paths of select payload column names, if any
133+
* @param filter_column_names List of paths of column names present only in filter, if any
134+
* @param include_index Whether to always include the PANDAS index column(s)
135+
* @param strings_to_categorical Type conversion parameter
136+
* @param timestamp_type_id Type conversion parameter
137+
*
138+
* @return input column information, output column buffers, list of output column schema
139+
* indices
140+
*/
141+
[[nodiscard]] std::
142+
tuple<std::vector<input_column_info>, std::vector<inline_column_buffer>, std::vector<size_type>>
143+
select_payload_columns(std::optional<std::vector<std::string>> const& payload_column_names,
144+
std::optional<std::vector<std::string>> const& filter_column_names,
145+
bool include_index,
146+
bool strings_to_categorical,
147+
type_id timestamp_type_id);
148+
149+
/**
150+
* @brief Filter the row groups with statistics based on predicate filter
151+
*
152+
* @param row_group_indices Input row groups indices
153+
* @param output_dtypes Datatypes of output columns
154+
* @param output_column_schemas schema indices of output columns
155+
* @param filter Optional AST expression to filter row groups based on Column chunk statistics
156+
* @param stream CUDA stream used for device memory operations and kernel launches
157+
*
158+
* @return Surviving row group indices after stats-based filtering; all input row group indices if no filter is applied or nothing is pruned
159+
*/
160+
[[nodiscard]] std::vector<std::vector<size_type>> filter_row_groups_with_stats(
161+
host_span<std::vector<size_type> const> row_group_indices,
162+
host_span<data_type const> output_dtypes,
163+
host_span<int const> output_column_schemas,
164+
std::optional<std::reference_wrapper<ast::expression const>> filter,
165+
rmm::cuda_stream_view stream) const;
127166
};
128167

129168
} // namespace cudf::io::parquet::experimental::detail

0 commit comments

Comments
 (0)