6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
@@ -694,6 +694,12 @@ config_namespace! {
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false

/// (reading) Force the use of RowSelections for filter results, when
/// pushdown_filters is enabled. If false, the reader will automatically
/// choose between a RowSelection and a Bitmap based on the number and
/// pattern of selected rows.
pub force_filter_selections: bool, default = false
Review comment (Contributor Author):
@rluvaton suggests in #18820 (comment):

What do you think of making it an enum instead, to allow for future additions without breaking changes?

(That enum should also be non-exhaustive, so that adding a variant is not a breaking change.)

I also see that with_row_selection_policy already accepts an enum.

Making it an enum would also allow forcing the mask or configuring the threshold in the auto policy. That is also useful for testing, to force a specific path when creating a reproduction test for a bug.

I personally think it is better as a flag (an escape valve), as I don't foresee any reason to try to tune these parameters, but I would be happy to hear other opinions.
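For illustration only, a minimal sketch of what the suggested enum-based alternative might look like; the name FilterSelectionPolicy and its variants are hypothetical and not part of this PR, which keeps the boolean flag:

```rust
// Hypothetical alternative to `force_filter_selections: bool` (illustrative only).
// Marking the enum non-exhaustive would let new policies be added later
// without that addition being a breaking change.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FilterSelectionPolicy {
    /// Let the reader choose between a RowSelection and a bitmap (current default behavior)
    #[default]
    Auto,
    /// Always materialize filter results as RowSelections
    Selectors,
    /// Always materialize filter results as a bitmap / mask
    Mask,
}
```

Such a policy would then be mapped onto the reader's with_row_selection_policy in the opener, instead of the single boolean check this PR adds.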


/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_view_types: bool, default = true
1 change: 1 addition & 0 deletions datafusion/common/src/error.rs
@@ -1233,6 +1233,7 @@ mod test {
// To pass the test the environment variable RUST_BACKTRACE should be set to 1 to enforce backtrace
#[cfg(feature = "backtrace")]
#[test]
#[expect(clippy::unnecessary_literal_unwrap)]
fn test_enabled_backtrace() {
match std::env::var("RUST_BACKTRACE") {
Ok(val) if val == "1" => {}
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -199,6 +199,7 @@ impl ParquetOptions {
metadata_size_hint: _,
pushdown_filters: _,
reorder_filters: _,
force_filter_selections: _, // not used for writer props
allow_single_file_parallelism: _,
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
@@ -461,6 +462,7 @@ mod tests {
metadata_size_hint: defaults.metadata_size_hint,
pushdown_filters: defaults.pushdown_filters,
reorder_filters: defaults.reorder_filters,
force_filter_selections: defaults.force_filter_selections,
allow_single_file_parallelism: defaults.allow_single_file_parallelism,
maximum_parallel_row_group_writers: defaults
.maximum_parallel_row_group_writers,
@@ -572,6 +574,7 @@ mod tests {
metadata_size_hint: global_options_defaults.metadata_size_hint,
pushdown_filters: global_options_defaults.pushdown_filters,
reorder_filters: global_options_defaults.reorder_filters,
force_filter_selections: global_options_defaults.force_filter_selections,
allow_single_file_parallelism: global_options_defaults
.allow_single_file_parallelism,
maximum_parallel_row_group_writers: global_options_defaults
25 changes: 22 additions & 3 deletions datafusion/core/tests/parquet/filter_pushdown.rs
@@ -638,9 +638,28 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
// The cache is on by default, and used when filter pushdown is enabled
PredicateCacheTest {
expected_inner_records: 8,
// reads more than necessary from the cache as then another bitmap is applied
// See https://github.com/apache/datafusion/pull/18820 for setting and workaround
expected_records: 7,
expected_records: 7, // reads more than necessary from the cache as then another bitmap is applied
Review comment (Contributor Author):
This test shows the change in behavior after the parquet 57.1.0 upgrade; the previous result with 57.0.0 was 4.

The old result can now be obtained by setting force_filter_selections to true, as the new predicate_cache_pushdown_default_selections_only test below demonstrates.

}
.run(&ctx)
.await
}

#[tokio::test]
async fn predicate_cache_pushdown_default_selections_only(
) -> datafusion_common::Result<()> {
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.pushdown_filters = true;
// forcing filter selections minimizes the number of rows read from the cache
config
.options_mut()
.execution
.parquet
.force_filter_selections = true;
let ctx = SessionContext::new_with_config(config);
// The cache is on by default, and used when filter pushdown is enabled
PredicateCacheTest {
expected_inner_records: 8,
expected_records: 4,
}
.run(&ctx)
.await
17 changes: 16 additions & 1 deletion datafusion/datasource-parquet/src/opener.rs
@@ -53,7 +53,9 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
@@ -87,6 +89,8 @@ pub(super) struct ParquetOpener {
pub pushdown_filters: bool,
/// Should the filters be reordered to optimize the scan?
pub reorder_filters: bool,
/// Should we force the reader to use RowSelections for filtering
pub force_filter_selections: bool,
/// Should the page index be read from parquet files, if present, to skip
/// data pages
pub enable_page_index: bool,
@@ -147,6 +151,7 @@ impl FileOpener for ParquetOpener {
let partition_fields = self.partition_fields.clone();
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
let force_filter_selections = self.force_filter_selections;
let coerce_int96 = self.coerce_int96;
let enable_bloom_filter = self.enable_bloom_filter;
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
@@ -347,6 +352,10 @@ impl FileOpener for ParquetOpener {
}
};
};
if force_filter_selections {
builder =
builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
}

// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
@@ -887,6 +896,7 @@ mod test {
partition_fields: vec![],
pushdown_filters: false, // note that this is false!
reorder_filters: false,
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -960,6 +970,7 @@ mod test {
))],
pushdown_filters: false, // note that this is false!
reorder_filters: false,
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1049,6 +1060,7 @@ mod test {
))],
pushdown_filters: false, // note that this is false!
reorder_filters: false,
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1141,6 +1153,7 @@ mod test {
))],
pushdown_filters: true, // note that this is true!
reorder_filters: true,
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1233,6 +1246,7 @@ mod test {
))],
pushdown_filters: false, // note that this is false!
reorder_filters: false,
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1383,6 +1397,7 @@ mod test {
partition_fields: vec![],
pushdown_filters: true,
reorder_filters: false,
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
6 changes: 6 additions & 0 deletions datafusion/datasource-parquet/src/source.rs
@@ -410,6 +410,11 @@ impl ParquetSource {
self.table_parquet_options.global.reorder_filters
}

/// Return the value of [`datafusion_common::config::ParquetOptions::force_filter_selections`]
fn force_filter_selections(&self) -> bool {
self.table_parquet_options.global.force_filter_selections
}

/// If enabled, the reader will read the page index
/// This is used to optimize filter pushdown
/// via `RowSelector` and `RowFilter` by
@@ -595,6 +600,7 @@ impl FileSource for ParquetSource {
parquet_file_reader_factory,
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
force_filter_selections: self.force_filter_selections(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
@@ -519,6 +519,7 @@ message ParquetOptions {
bool skip_metadata = 3; // default = true
bool pushdown_filters = 5; // default = false
bool reorder_filters = 6; // default = false
bool force_filter_selections = 34; // default = false
uint64 data_pagesize_limit = 7; // default = 1024 * 1024
uint64 write_batch_size = 8; // default = 1024
string writer_version = 9; // default = "1.0"
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
@@ -943,6 +943,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
.unwrap_or(None),
pushdown_filters: value.pushdown_filters,
reorder_filters: value.reorder_filters,
force_filter_selections: value.force_filter_selections,
data_pagesize_limit: value.data_pagesize_limit as usize,
write_batch_size: value.write_batch_size as usize,
writer_version: value.writer_version.clone(),
18 changes: 18 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
@@ -5557,6 +5557,9 @@ impl serde::Serialize for ParquetOptions {
if self.reorder_filters {
len += 1;
}
if self.force_filter_selections {
len += 1;
}
if self.data_pagesize_limit != 0 {
len += 1;
}
@@ -5651,6 +5654,9 @@ impl serde::Serialize for ParquetOptions {
if self.reorder_filters {
struct_ser.serialize_field("reorderFilters", &self.reorder_filters)?;
}
if self.force_filter_selections {
struct_ser.serialize_field("forceFilterSelections", &self.force_filter_selections)?;
}
if self.data_pagesize_limit != 0 {
#[allow(clippy::needless_borrow)]
#[allow(clippy::needless_borrows_for_generic_args)]
@@ -5816,6 +5822,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"pushdownFilters",
"reorder_filters",
"reorderFilters",
"force_filter_selections",
"forceFilterSelections",
"data_pagesize_limit",
"dataPagesizeLimit",
"write_batch_size",
@@ -5875,6 +5883,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
SkipMetadata,
PushdownFilters,
ReorderFilters,
ForceFilterSelections,
DataPagesizeLimit,
WriteBatchSize,
WriterVersion,
@@ -5927,6 +5936,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"skipMetadata" | "skip_metadata" => Ok(GeneratedField::SkipMetadata),
"pushdownFilters" | "pushdown_filters" => Ok(GeneratedField::PushdownFilters),
"reorderFilters" | "reorder_filters" => Ok(GeneratedField::ReorderFilters),
"forceFilterSelections" | "force_filter_selections" => Ok(GeneratedField::ForceFilterSelections),
"dataPagesizeLimit" | "data_pagesize_limit" => Ok(GeneratedField::DataPagesizeLimit),
"writeBatchSize" | "write_batch_size" => Ok(GeneratedField::WriteBatchSize),
"writerVersion" | "writer_version" => Ok(GeneratedField::WriterVersion),
@@ -5977,6 +5987,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
let mut skip_metadata__ = None;
let mut pushdown_filters__ = None;
let mut reorder_filters__ = None;
let mut force_filter_selections__ = None;
let mut data_pagesize_limit__ = None;
let mut write_batch_size__ = None;
let mut writer_version__ = None;
@@ -6035,6 +6046,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
reorder_filters__ = Some(map_.next_value()?);
}
GeneratedField::ForceFilterSelections => {
if force_filter_selections__.is_some() {
return Err(serde::de::Error::duplicate_field("forceFilterSelections"));
}
force_filter_selections__ = Some(map_.next_value()?);
}
GeneratedField::DataPagesizeLimit => {
if data_pagesize_limit__.is_some() {
return Err(serde::de::Error::duplicate_field("dataPagesizeLimit"));
@@ -6213,6 +6230,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
skip_metadata: skip_metadata__.unwrap_or_default(),
pushdown_filters: pushdown_filters__.unwrap_or_default(),
reorder_filters: reorder_filters__.unwrap_or_default(),
force_filter_selections: force_filter_selections__.unwrap_or_default(),
data_pagesize_limit: data_pagesize_limit__.unwrap_or_default(),
write_batch_size: write_batch_size__.unwrap_or_default(),
writer_version: writer_version__.unwrap_or_default(),
3 changes: 3 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
@@ -763,6 +763,9 @@ pub struct ParquetOptions {
/// default = false
#[prost(bool, tag = "6")]
pub reorder_filters: bool,
/// default = false
#[prost(bool, tag = "34")]
pub force_filter_selections: bool,
/// default = 1024 * 1024
#[prost(uint64, tag = "7")]
pub data_pagesize_limit: u64,
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
@@ -856,6 +856,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
pushdown_filters: value.pushdown_filters,
reorder_filters: value.reorder_filters,
force_filter_selections: value.force_filter_selections,
data_pagesize_limit: value.data_pagesize_limit as u64,
write_batch_size: value.write_batch_size as u64,
writer_version: value.writer_version.clone(),
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -763,6 +763,9 @@ pub struct ParquetOptions {
/// default = false
#[prost(bool, tag = "6")]
pub reorder_filters: bool,
/// default = false
#[prost(bool, tag = "34")]
pub force_filter_selections: bool,
/// default = 1024 * 1024
#[prost(uint64, tag = "7")]
pub data_pagesize_limit: u64,
2 changes: 2 additions & 0 deletions datafusion/proto/src/logical_plan/file_formats.rs
@@ -375,6 +375,7 @@ mod parquet {
}),
pushdown_filters: global_options.global.pushdown_filters,
reorder_filters: global_options.global.reorder_filters,
force_filter_selections: global_options.global.force_filter_selections,
data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
write_batch_size: global_options.global.write_batch_size as u64,
writer_version: global_options.global.writer_version.clone(),
@@ -471,6 +472,7 @@ mod parquet {
}),
pushdown_filters: proto.pushdown_filters,
reorder_filters: proto.reorder_filters,
force_filter_selections: proto.force_filter_selections,
data_pagesize_limit: proto.data_pagesize_limit as usize,
write_batch_size: proto.write_batch_size as usize,
writer_version: proto.writer_version.clone(),
1 change: 1 addition & 0 deletions datafusion/sqllogictest/test_files/array.slt
@@ -2033,6 +2033,7 @@ query error Failed to coerce arguments to satisfy a call to 'array_slice' functi
select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeListView(Int64)'), 2, 6),
array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeListView(Utf8)'), 3, 7);


# array_slice scalar function #6 (with positive indexes; nested array)
query ?
select array_slice(make_array(make_array(1, 2, 3, 4, 5), make_array(6, 7, 8, 9, 10)), 1, 1);
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -244,6 +244,7 @@ datafusion.execution.parquet.dictionary_enabled true
datafusion.execution.parquet.dictionary_page_size_limit 1048576
datafusion.execution.parquet.enable_page_index true
datafusion.execution.parquet.encoding NULL
datafusion.execution.parquet.force_filter_selections false
datafusion.execution.parquet.max_predicate_cache_size NULL
datafusion.execution.parquet.max_row_group_size 1048576
datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2
@@ -371,6 +372,7 @@ datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionar
datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes
datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded.
datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows.
datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching.
datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read.
datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
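For reference, a minimal sketch of enabling the new option from Rust, mirroring the pattern used in the new predicate_cache_pushdown_default_selections_only test above (the helper function name is illustrative, not part of this PR):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

/// Build a session that forces RowSelections for pushed-down filter results.
fn session_with_forced_selections() -> SessionContext {
    let mut config = SessionConfig::new();
    // `force_filter_selections` only has an effect when filter pushdown is enabled.
    config.options_mut().execution.parquet.pushdown_filters = true;
    config.options_mut().execution.parquet.force_filter_selections = true;
    SessionContext::new_with_config(config)
}
```

The same key should also be settable at runtime with `SET datafusion.execution.parquet.force_filter_selections = true`, like the other Parquet reader options listed in information_schema.slt.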