diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
index efaee23366a1c..c8bebf74d4b08 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -46,7 +46,7 @@ use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}
 use datafusion::parquet::schema::types::ColumnPath;
 use datafusion::physical_expr::utils::{Guarantee, LiteralGuarantee};
 use datafusion::physical_expr::PhysicalExpr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
@@ -300,8 +300,11 @@ impl IndexTableProvider {
         // In this example, we use the PruningPredicate's literal guarantees to
         // analyze the predicate. In a real system, using
         // `PruningPredicate::prune` would likely be easier to do.
-        let pruning_predicate =
-            PruningPredicate::try_new(Arc::clone(predicate), self.schema())?;
+        let pruning_predicate = PruningPredicate::try_new(
+            Arc::clone(predicate),
+            self.schema(),
+            vec![ColumnOrdering::Unknown; self.schema().fields().len()],
+        )?;
 
         // The PruningPredicate's guarantees must all be satisfied in order for
         // the predicate to possibly evaluate to true.
diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
index e5ae3cc86bfe5..53193cc4f5714 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -40,7 +40,7 @@ use datafusion::parquet::arrow::{
     arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
 };
 use datafusion::physical_expr::PhysicalExpr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use std::any::Any;
@@ -366,8 +366,11 @@ impl ParquetMetadataIndex {
     ) -> Result> {
         // Use the PruningPredicate API to determine which files can not
         // possibly have any relevant data.
-        let pruning_predicate =
-            PruningPredicate::try_new(predicate, self.schema().clone())?;
+        let pruning_predicate = PruningPredicate::try_new(
+            predicate,
+            self.schema().clone(),
+            vec![ColumnOrdering::Unknown; self.schema().fields().len()],
+        )?;
         // Now evaluate the pruning predicate into a boolean mask, one element per
         // file in the index. If the mask is true, the file may have rows that
diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/pruning.rs
index b2d2fa13b7ed2..f421ae755fe02 100644
--- a/datafusion-examples/examples/pruning.rs
+++ b/datafusion-examples/examples/pruning.rs
@@ -24,7 +24,7 @@ use datafusion::common::pruning::PruningStatistics;
 use datafusion::common::{DFSchema, ScalarValue};
 use datafusion::execution::context::ExecutionProps;
 use datafusion::physical_expr::create_physical_expr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
 use datafusion::prelude::*;
 
 /// This example shows how to use DataFusion's `PruningPredicate` to prove
@@ -190,7 +190,12 @@ fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate
     let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
     let props = ExecutionProps::new();
     let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
-    PruningPredicate::try_new(physical_expr, schema.clone()).unwrap()
+    PruningPredicate::try_new(
+        physical_expr,
+        schema.clone(),
+        vec![ColumnOrdering::Unknown; schema.fields().len()],
+    )
+    .unwrap()
 }
 
 fn i32_array<'a>(values: impl Iterator<Item = &'a Option<i32>>) -> ArrayRef {
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 9da879a32f6b5..1c498c61d6cba 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -661,11 +661,15 @@ uint_tests!(64);
 // page-2 0 0.0 4.0
 // page-3 0 5.0 9.0
 async fn prune_f64_lt() {
+    // TODO: because NaN could be present but not accounted for in the statistics, these
+    // expressions should not be pruned at present. When IEEE 754 total order is added to
+    // the Parquet spec this can be revisited.
+    // See https://github.com/apache/parquet-format/pull/221
     test_prune(
         Scenario::Float64,
         "SELECT * FROM t where f < 1",
         Some(0),
-        Some(5),
+        Some(0),
         11,
         5,
     )
@@ -674,7 +678,7 @@ async fn prune_f64_lt() {
         Scenario::Float64,
         "SELECT * FROM t where -f > -1",
         Some(0),
-        Some(5),
+        Some(0),
         11,
         5,
     )
@@ -683,13 +687,17 @@ async fn prune_f64_lt() {
 
 #[tokio::test]
 async fn prune_f64_scalar_fun_and_gt() {
+    // TODO: because NaN could be present but not accounted for in the statistics, this
+    // expression should not be pruned at present. When IEEE 754 total order is added to
+    // the Parquet spec this can be revisited.
+    // See https://github.com/apache/parquet-format/pull/221
     // result of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
     // only use "f >= 0" to prune
     test_prune(
         Scenario::Float64,
         "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1",
         Some(0),
-        Some(10),
+        Some(0),
         1,
         5,
     )
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index 5a85f47c015a9..54e7a80b2a7f6 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -635,12 +635,16 @@ async fn prune_uint32_eq_large_in_list() {
 
 #[tokio::test]
 async fn prune_f64_lt() {
+    // TODO: because NaN could be present but not accounted for in the statistics, these
+    // expressions should not be pruned at present. When IEEE 754 total order is added to
+    // the Parquet spec this can be revisited.
+    // See https://github.com/apache/parquet-format/pull/221
     RowGroupPruningTest::new()
         .with_scenario(Scenario::Float64)
         .with_query("SELECT * FROM t where f < 1")
         .with_expected_errors(Some(0))
-        .with_matched_by_stats(Some(3))
-        .with_pruned_by_stats(Some(1))
+        .with_matched_by_stats(Some(0))
+        .with_pruned_by_stats(Some(0))
         .with_matched_by_bloom_filter(Some(0))
         .with_pruned_by_bloom_filter(Some(0))
         .with_expected_rows(11)
@@ -650,8 +654,8 @@ async fn prune_f64_lt() {
         .with_scenario(Scenario::Float64)
         .with_query("SELECT * FROM t where -f > -1")
         .with_expected_errors(Some(0))
-        .with_matched_by_stats(Some(3))
-        .with_pruned_by_stats(Some(1))
+        .with_matched_by_stats(Some(0))
+        .with_pruned_by_stats(Some(0))
         .with_matched_by_bloom_filter(Some(0))
         .with_pruned_by_bloom_filter(Some(0))
         .with_expected_rows(11)
@@ -661,14 +665,19 @@ async fn prune_f64_lt() {
 
 #[tokio::test]
 async fn prune_f64_scalar_fun_and_gt() {
+    // TODO: because NaN could be present but not accounted for in the statistics, this
+    // expression should not be pruned at present. When IEEE 754 total order is added to
+    // the Parquet spec this can be revisited.
+    // See https://github.com/apache/parquet-format/pull/221
+
     // result of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
     // only use "f >= 0" to prune
     RowGroupPruningTest::new()
         .with_scenario(Scenario::Float64)
         .with_query("SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1")
         .with_expected_errors(Some(0))
-        .with_matched_by_stats(Some(2))
-        .with_pruned_by_stats(Some(2))
+        .with_matched_by_stats(Some(0))
+        .with_pruned_by_stats(Some(0))
         .with_matched_by_bloom_filter(Some(0))
         .with_pruned_by_bloom_filter(Some(0))
         .with_expected_rows(1)
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 9e14425074f78..7a74225a8a995 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -33,7 +33,7 @@ use arrow::datatypes::{SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
 use datafusion_common::{exec_err, Result};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_optimizer::pruning::PruningPredicate;
+use datafusion_physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
 use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
 use futures::{StreamExt, TryStreamExt};
 
@@ -41,6 +41,7 @@ use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::basic::{ColumnOrder, SortOrder};
 use parquet::file::metadata::ParquetMetaDataReader;
 
 /// Implements [`FileOpener`] for a parquet file
@@ -179,10 +180,42 @@ impl FileOpener for ParquetOpener {
             }
         }
 
+        // Map the column orderings from the physical file schema to the logical file schema
+        let ordering: Vec<ColumnOrdering> = if let Some(column_orders) =
+            reader_metadata.metadata().file_metadata().column_orders()
+        {
+            logical_file_schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    match physical_file_schema.index_of(field.name()) {
+                        Ok(idx) => match column_orders[idx] {
+                            ColumnOrder::TYPE_DEFINED_ORDER(sort_order) => {
+                                match sort_order {
+                                    SortOrder::SIGNED => ColumnOrdering::Signed,
+                                    SortOrder::UNSIGNED => ColumnOrdering::Unsigned,
+                                    _ => ColumnOrdering::Undefined,
+                                }
+                            }
+                            /* TODO(ets): for future
+                            ColumnOrder::IEEE_754_TOTAL_ORDER => {
+                                ColumnOrdering::TotalOrder
+                            }*/
+                            ColumnOrder::UNDEFINED => ColumnOrdering::Unknown,
+                        },
+                        _ => ColumnOrdering::Unknown,
+                    }
+                })
+                .collect::<Vec<_>>()
+        } else {
+            vec![ColumnOrdering::Unknown; logical_file_schema.fields().len()]
+        };
+
         // Build predicates for this specific file
         let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
             predicate.as_ref(),
             &logical_file_schema,
+            ordering,
             &predicate_creation_errors,
         );
 
@@ -363,9 +396,11 @@ fn create_initial_plan(
 pub(crate) fn build_pruning_predicate(
     predicate: Arc<dyn PhysicalExpr>,
     file_schema: &SchemaRef,
+    column_orderings: Vec<ColumnOrdering>,
     predicate_creation_errors: &Count,
 ) -> Option<Arc<PruningPredicate>> {
-    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema), column_orderings)
+    {
         Ok(pruning_predicate) => {
             if !pruning_predicate.always_true() {
                 return Some(Arc::new(pruning_predicate));
@@ -385,16 +420,19 @@ pub(crate) fn build_pruning_predicate(
 pub(crate) fn build_page_pruning_predicate(
     predicate: &Arc<dyn PhysicalExpr>,
     file_schema: &SchemaRef,
+    column_orderings: Vec<ColumnOrdering>,
 ) -> Arc<PagePruningAccessPlanFilter> {
     Arc::new(PagePruningAccessPlanFilter::new(
         predicate,
         Arc::clone(file_schema),
+        column_orderings,
     ))
 }
 
 pub(crate) fn build_pruning_predicates(
     predicate: Option<&Arc<dyn PhysicalExpr>>,
     file_schema: &SchemaRef,
+    column_orderings: Vec<ColumnOrdering>,
     predicate_creation_errors: &Count,
 ) -> (
     Option<Arc<PruningPredicate>>,
@@ -406,9 +444,11 @@ pub(crate) fn build_pruning_predicates(
     let pruning_predicate = build_pruning_predicate(
         Arc::clone(predicate),
         file_schema,
+        column_orderings.clone(),
         predicate_creation_errors,
     );
-    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+    let page_pruning_predicate =
+        build_page_pruning_predicate(predicate, file_schema, column_orderings);
     (pruning_predicate, Some(page_pruning_predicate))
 }
diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs
index 84f5c4c2d6d5f..8515445174a4a 100644
--- a/datafusion/datasource-parquet/src/page_filter.rs
+++ b/datafusion/datasource-parquet/src/page_filter.rs
@@ -31,7 +31,7 @@ use arrow::{
 use datafusion_common::pruning::PruningStatistics;
 use datafusion_common::ScalarValue;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
-use datafusion_physical_optimizer::pruning::PruningPredicate;
+use datafusion_physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
 use log::{debug, trace};
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 
@@ -119,7 +119,11 @@ pub struct PagePruningAccessPlanFilter {
 impl PagePruningAccessPlanFilter {
     /// Create a new [`PagePruningAccessPlanFilter`] from a physical
     /// expression.
-    pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
+    pub fn new(
+        expr: &Arc<dyn PhysicalExpr>,
+        schema: SchemaRef,
+        column_orderings: Vec<ColumnOrdering>,
+    ) -> Self {
         // extract any single column predicates
         let predicates = split_conjunction(expr)
             .into_iter()
@@ -127,6 +131,7 @@ impl PagePruningAccessPlanFilter {
             .map(|predicate| {
                 let pp = match PruningPredicate::try_new(
                     Arc::clone(predicate),
                     Arc::clone(&schema),
+                    column_orderings.clone(),
                 ) {
                     Ok(pp) => pp,
                     Err(e) => {
diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs
index d44fa16843201..717164ea8786b 100644
--- a/datafusion/datasource-parquet/src/row_group_filter.rs
+++ b/datafusion/datasource-parquet/src/row_group_filter.rs
@@ -441,6 +441,7 @@ mod tests {
     use datafusion_common::Result;
     use datafusion_expr::{cast, col, lit, Expr};
     use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_optimizer::pruning::ColumnOrdering;
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
     use parquet::arrow::async_reader::ParquetObjectReader;
     use parquet::arrow::ArrowSchemaConverter;
@@ -502,7 +503,12 @@
         Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
         let schema_descr = get_test_schema_descr(vec![field]);
@@ -547,7 +553,12 @@
         Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
         let schema_descr = get_test_schema_descr(vec![field]);
@@ -590,7 +601,12 @@
         ]));
         let expr = col("c1").gt(lit(15)).and(col("c2").rem(lit(2)).eq(lit(0)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
 
         let schema_descr = get_test_schema_descr(vec![
             PrimitiveTypeField::new("c1", PhysicalType::INT32),
@@ -629,7 +645,12 @@
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
 
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
@@ -655,8 +676,12 @@
         ]));
         let expr = col("c1").gt(lit(0));
         let expr = logical2physical(&expr, &table_schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            table_schema.clone(),
+            vec![ColumnOrdering::Unknown; table_schema.fields().len()],
+        )
+        .unwrap();
 
         // Model a file schema's column order c2 then c1, which is the opposite
         // of the table schema
@@ -733,7 +758,12 @@
         let schema_descr = ArrowSchemaConverter::new().convert(&schema).unwrap();
         let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();
 
         let metrics = parquet_file_metrics();
@@ -764,7 +794,12 @@
         let expr = col("c1")
             .gt(lit(15))
             .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();
 
         let metrics = parquet_file_metrics();
@@ -802,7 +837,12 @@
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [1.00, 6.00]
@@ -873,7 +913,12 @@
             Decimal128(11, 2),
         ));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [100, 600]
@@ -965,7 +1010,12 @@
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [6.00, 8.00]
@@ -1026,7 +1076,12 @@
         let left = cast(col("c1"), Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         // we must use the big-endian when encode the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -1104,7 +1159,12 @@
         let left = cast(col("c1"), Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(
+            expr,
+            schema.clone(),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         // we must use the big-endian when encode the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -1271,8 +1331,9 @@
             false,
         );
         let expr = logical2physical(&expr, &schema);
+        let column_orders = vec![ColumnOrdering::Unknown; schema.fields().len()];
         let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+            PruningPredicate::try_new(expr, Arc::new(schema), column_orders).unwrap();
 
         let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
             file_name,
@@ -1488,8 +1549,9 @@
         let data = bytes::Bytes::from(std::fs::read(path).unwrap());
 
         let expr = logical2physical(&expr, &schema);
+        let column_orders = vec![ColumnOrdering::Unknown; schema.fields().len()];
         let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+            PruningPredicate::try_new(expr, Arc::new(schema), column_orders).unwrap();
 
         let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
             &file_name,
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 69347f440c365..c0e305c09df3b 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -40,6 +40,7 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_optimizer::pruning::ColumnOrdering;
 use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
 use datafusion_physical_plan::filter_pushdown::PredicateSupport;
 use datafusion_physical_plan::filter_pushdown::PredicateSupports;
@@ -572,6 +573,7 @@ impl FileSource for ParquetSource {
         if let (Some(pruning_predicate), _) = build_pruning_predicates(
             Some(predicate),
             file_schema,
+            vec![ColumnOrdering::Unknown; file_schema.fields().len()],
             &predicate_creation_errors,
         ) {
             let mut guarantees = pruning_predicate
diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs
index 1beaa0eb00186..bc6c714082c5f 100644
--- a/datafusion/physical-optimizer/src/pruning.rs
+++ b/datafusion/physical-optimizer/src/pruning.rs
@@ -405,6 +405,20 @@ impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum ColumnOrdering {
+    /// No column ordering was specified
+    Unknown,
+    /// Column ordering uses signed comparison
+    Signed,
+    /// Column ordering uses unsigned comparison
+    Unsigned,
+    /// Column ordering is undefined for the type
+    Undefined,
+    /// Column ordering uses IEEE 754 total ordering (depends on PARQUET-2249)
+    TotalOrder,
+}
+
 impl PruningPredicate {
     /// Try to create a new instance of [`PruningPredicate`]
     ///
@@ -428,7 +442,11 @@ impl PruningPredicate {
     ///
     /// See the struct level documentation on [`PruningPredicate`] for more
     /// details.
-    pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        schema: SchemaRef,
+        column_orderings: Vec<ColumnOrdering>,
+    ) -> Result<Self> {
         // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`
         // which does not handle dynamic exprs in general
         let expr = snapshot_physical_expr(expr)?;
@@ -440,6 +458,7 @@ impl PruningPredicate {
             &expr,
             schema.as_ref(),
             &mut required_columns,
+            &column_orderings,
             &unhandled_hook,
         );
 
@@ -1317,12 +1336,14 @@ impl PredicateRewriter {
         &self,
         expr: &Arc<dyn PhysicalExpr>,
         schema: &Schema,
+        column_orderings: &[ColumnOrdering],
     ) -> Arc<dyn PhysicalExpr> {
         let mut required_columns = RequiredColumns::new();
         build_predicate_expression(
             expr,
             schema,
             &mut required_columns,
+            column_orderings,
             &self.unhandled_hook,
         )
     }
@@ -1341,6 +1362,7 @@ fn build_predicate_expression(
     expr: &Arc<dyn PhysicalExpr>,
     schema: &Schema,
     required_columns: &mut RequiredColumns,
+    column_orderings: &[ColumnOrdering],
     unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
 ) -> Arc<dyn PhysicalExpr> {
     if is_always_false(expr) {
@@ -1406,6 +1428,7 @@ fn build_predicate_expression(
             &change_expr,
             schema,
             required_columns,
+            column_orderings,
             unhandled_hook,
         );
     } else {
@@ -1441,10 +1464,20 @@ fn build_predicate_expression(
     };
 
     if op == Operator::And || op == Operator::Or {
-        let left_expr =
-            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
-        let right_expr =
-            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
+        let left_expr = build_predicate_expression(
+            &left,
+            schema,
+            required_columns,
+            column_orderings,
+            unhandled_hook,
+        );
+        let right_expr = build_predicate_expression(
+            &right,
+            schema,
+            required_columns,
+            column_orderings,
+            unhandled_hook,
+        );
         // simplify boolean expression if applicable
         let expr = match (&left_expr, op, &right_expr) {
             (left, Operator::And, right)
@@ -1467,6 +1500,54 @@ fn build_predicate_expression(
         return expr;
     }
 
+    // Special handling for floats. Because current Parquet statistics do not allow NaN, and
+    // DataFusion uses total order (i.e. -NaN < -x < -0.0 < 0.0 < x < NaN), pruning predicates
+    // for floating point columns may be overly aggressive. For example, say the max for a column
+    // that also contains NaN is 1.0, and a predicate like `x > 2.0` is provided. This will
+    // be turned into a stats predicate like `max(x) > 2.0`, which will evaluate to false in this
+    // case, so the page/column chunk will be (improperly) pruned. PARQUET-2249 attempts to
+    // solve this by adding a new ColumnOrder (IEEE_754_TOTAL_ORDER) for floating point columns.
+    // This will allow NaN to appear in the statistics so pruning will be correct.
+
+    // TODO(ets): this is rather Parquet specific... still wonder if this should be done
+    // at the datasource level
+
+    // Sanity check that column orderings match the schema
+    if column_orderings.len() == schema.fields().len() {
+        let colidx = column_index_for_expr(&left, schema)
+            .or_else(|| column_index_for_expr(&right, schema));
+        if let Some(colidx) = colidx {
+            let col_order = column_orderings[colidx];
+
+            // If the ColumnOrder is undefined (as opposed to unknown), we shouldn't be pruning
+            // since min/max are invalid.
+            if col_order == ColumnOrdering::Undefined {
+                log::debug!("Cannot prune because column order is undefined");
+                return unhandled_hook.handle(expr);
+            }
+
+            // left and right should have the same type by now, so only check left
+            if left.data_type(schema).is_ok_and(|t| t.is_floating()) {
+                // By the time we've reached this code, we've narrowed down the possible expressions
+                // to binary expressions. Of those allowed by `build_statistics_expr`, we only need
+                // to worry about greater-than/less-than expressions and not-equal.
+                match op {
+                    Operator::Gt
+                    | Operator::GtEq
+                    | Operator::Lt
+                    | Operator::LtEq
+                    | Operator::NotEq => {
+                        if col_order != ColumnOrdering::TotalOrder {
+                            log::debug!("Cannot prune floating point column because NaN may be present");
+                            return unhandled_hook.handle(expr);
+                        }
+                    }
+                    _ => (),
+                }
+            }
+        }
+    }
+
     let expr_builder =
         PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
     let mut expr_builder = match expr_builder {
@@ -1483,6 +1564,28 @@ fn build_predicate_expression(
         .unwrap_or_else(|_| unhandled_hook.handle(expr))
 }
 
+// Find the column index for the given expression. Expects either column or cast-like expressions.
+fn column_index_for_expr(expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> Option<usize> {
+    if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
+        // Cannot always trust `col.index()` since the schema may be rewritten along the way.
+        let col_idx = schema.index_of(col.name());
+        // Sanity check that columns are still in the same order as the `ColumnOrder` array.
+        if col_idx.is_ok_and(|idx| idx == col.index()) {
+            Some(col.index())
+        } else {
+            None
+        }
+    } else if let Some(cast) = expr.as_any().downcast_ref::<phys_expr::CastExpr>() {
+        column_index_for_expr(cast.expr(), schema)
+    } else if let Some(cast) = expr.as_any().downcast_ref::<phys_expr::TryCastExpr>() {
+        column_index_for_expr(cast.expr(), schema)
+    } else if let Some(neg) = expr.as_any().downcast_ref::<phys_expr::NegativeExpr>() {
+        column_index_for_expr(neg.arg(), schema)
+    } else {
+        None
+    }
+}
+
 fn build_statistics_expr(
     expr_builder: &mut PruningExpressionBuilder,
 ) -> Result<Arc<dyn PhysicalExpr>> {
@@ -2223,7 +2326,12 @@
         ]));
         let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
         let expr = logical2physical(&expr, &schema);
-        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
+        let p = PruningPredicate::try_new(
+            expr,
+            Arc::clone(&schema),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         // note pruning expression refers to row_count twice
         assert_eq!(
             "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
@@ -4544,12 +4652,18 @@
             Field::new("b", DataType::Int32, true),
         ]);
 
+        let column_orderings = vec![ColumnOrdering::Unknown; schema.fields().len()];
+
         let rewriter = PredicateRewriter::new()
             .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
 
         let transform_expr = |expr| {
             let expr = logical2physical(&expr, &schema_with_b);
-            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
+            rewriter.rewrite_predicate_to_statistics_predicate(
+                &expr,
+                &schema,
+                &column_orderings,
+            )
         };
 
         // transform an arbitrary valid expression that we know is handled
@@ -4558,6 +4672,7 @@
             .rewrite_predicate_to_statistics_predicate(
                 &logical2physical(&known_expression, &schema),
                 &schema,
+                &column_orderings,
             );
 
         // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
@@ -5071,7 +5186,12 @@
     ) {
         println!("Pruning with expr: {expr}");
         let expr = logical2physical(&expr, schema);
-        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
+        let p = PruningPredicate::try_new(
+            expr,
+            Arc::<Schema>::clone(schema),
+            vec![ColumnOrdering::Unknown; schema.fields().len()],
+        )
+        .unwrap();
         let result = p.prune(statistics).unwrap();
         assert_eq!(result, expected);
     }
@@ -5082,8 +5202,15 @@
         required_columns: &mut RequiredColumns,
     ) -> Arc<dyn PhysicalExpr> {
         let expr = logical2physical(expr, schema);
+        let column_orderings = vec![ColumnOrdering::Unknown; schema.fields().len()];
         let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
-        build_predicate_expression(&expr, schema, required_columns, &unhandled_hook)
+        build_predicate_expression(
+            &expr,
+            schema,
+            required_columns,
+            &column_orderings,
+            &unhandled_hook,
+        )
     }
 
 #[test]
diff --git a/datafusion/sqllogictest/test_files/parquet_nan_statistics.slt b/datafusion/sqllogictest/test_files/parquet_nan_statistics.slt
new file mode 100644
index 0000000000000..617f217d24ea5
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/parquet_nan_statistics.slt
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests of NaN pruning
+
+# Create table with mixed values and NaNs
+
+statement ok
+CREATE TABLE with_nans (
+  f1 DOUBLE
+) AS VALUES
+(-2.0),
+(-1.0),
+(0.0),
+(1.0),
+(2.0),
+('NaN'),
+('-NaN');
+
+query I
+COPY (SELECT * FROM with_nans)
+TO 'test_files/scratch/parquet_nan_statistics/nans.parquet'
+STORED AS PARQUET;
+----
+7
+
+statement ok
+CREATE EXTERNAL TABLE test_nans
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_nan_statistics/nans.parquet';
+
+# If page pruning is performed with TypeDefinedOrder statistics,
+# this query will return 0 rows.
+# See https://github.com/apache/datafusion/issues/15812
+query R
+SELECT * from test_nans WHERE f1 > 2.0;
+----
+NaN
+
+# Test >= too
+query R
+SELECT * from test_nans WHERE f1 >= 3.0;
+----
+NaN
+
+
+# There should be no pruning predicates since Parquet float
+# statistics cannot be trusted.
+# See https://github.com/apache/datafusion/issues/15812
+query TT
+EXPLAIN SELECT * from test_nans WHERE f1 > 2.0;
+----
+logical_plan
+01)Filter: test_nans.f1 > Float64(2)
+02)--TableScan: test_nans projection=[f1], partial_filters=[test_nans.f1 > Float64(2)]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: f1@0 > 2
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_nan_statistics/nans.parquet]]}, projection=[f1], file_type=parquet, predicate=f1@0 > 2
+
+# -NaN is returned as 'NaN'
+query R
+SELECT * from test_nans WHERE f1 < -2.0;
+----
+NaN
+
+# Test <= too
+query R
+SELECT * from test_nans WHERE f1 <= -3.0;
+----
+NaN
+
+statement ok
+DROP TABLE test_nans;
+
+statement ok
+DROP TABLE with_nans;
+
+# Create table with single value and NaNs
+
+statement ok
+CREATE TABLE single_with_nans (
+  f1 DOUBLE
+) AS VALUES
+(2.0),
+(2.0),
+(2.0),
+(2.0),
+(2.0),
+('NaN'),
+('-NaN');
+
+query I
+COPY (SELECT * FROM single_with_nans)
+TO 'test_files/scratch/parquet_nan_statistics/single_nans.parquet'
+STORED AS PARQUET;
+----
+7
+
+statement ok
+CREATE EXTERNAL TABLE test_nans
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_nan_statistics/single_nans.parquet';
+
+# Test !=
+query R
+SELECT * from test_nans WHERE f1 != 2.0;
+----
+NaN
+NaN
+
+statement ok
+DROP TABLE test_nans;
+
+statement ok
+DROP TABLE single_with_nans;
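Three short notes on the patch follow; none are part of the diff itself.

First, the API change in one self-contained sketch. Every `PruningPredicate::try_new` call site now supplies one `ColumnOrdering` per field of the schema the predicate is built against. This example uses only calls that already appear in the updated `pruning.rs` example (`create_physical_expr`, `predicate_expr`); with `Unknown` orderings, the float comparison is routed to the unhandled hook, so the resulting pruning predicate degenerates to `true` rather than pruning incorrectly:

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::common::DFSchema;
    use datafusion::error::Result;
    use datafusion::execution::context::ExecutionProps;
    use datafusion::physical_expr::create_physical_expr;
    use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
    use datafusion::prelude::*;

    fn main() -> Result<()> {
        // A single float column, as in the new slt test
        let schema = Arc::new(Schema::new(vec![Field::new("f1", DataType::Float64, true)]));
        let expr = col("f1").gt(lit(2.0));
        let df_schema = DFSchema::try_from(schema.as_ref().clone())?;
        let physical_expr = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?;

        // One ordering per schema field; `Unknown` means "no ordering information"
        let orderings = vec![ColumnOrdering::Unknown; schema.fields().len()];
        let predicate =
            PruningPredicate::try_new(physical_expr, Arc::clone(&schema), orderings)?;

        // Without `TotalOrder`, `f1 > 2.0` cannot be rewritten into a min/max
        // statistics check, so this prints the constant `true`
        println!("{}", predicate.predicate_expr());
        Ok(())
    }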
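Second, the footer-to-pruning mapping that `ParquetOpener::open` performs, condensed into a standalone function for review. `to_column_ordering` is a hypothetical helper (the patch inlines this match per field); the match below covers the `parquet::basic::ColumnOrder` and `SortOrder` variants exhaustively, which is also why the patch keeps the `IEEE_754_TOTAL_ORDER` arm as a comment — that variant does not exist in the parquet crate yet:

    use datafusion_physical_optimizer::pruning::ColumnOrdering;
    use parquet::basic::{ColumnOrder, SortOrder};

    // Hypothetical helper mirroring the inline mapping in `ParquetOpener::open`
    fn to_column_ordering(order: ColumnOrder) -> ColumnOrdering {
        match order {
            // TYPE_DEFINED_ORDER carries the signed/unsigned sort order of the type
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED) => ColumnOrdering::Signed,
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED) => ColumnOrdering::Unsigned,
            // The type itself has no defined order, so min/max are meaningless
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED) => ColumnOrdering::Undefined,
            // No order was written in the footer at all
            ColumnOrder::UNDEFINED => ColumnOrdering::Unknown,
        }
    }

    fn main() {
        assert_eq!(to_column_ordering(ColumnOrder::UNDEFINED), ColumnOrdering::Unknown);
    }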
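Third, why anything other than `TotalOrder` must disable float pruning: Parquet writers omit NaN from min/max statistics, while DataFusion evaluates comparisons under a total order in which positive NaN sorts above every finite value. A self-contained illustration of the failure mode the new slt file guards against — the data mirrors `parquet_nan_statistics.slt`, and `f64::total_cmp` stands in for DataFusion's comparison semantics:

    use std::cmp::Ordering;

    fn main() {
        // Same values as the new parquet_nan_statistics.slt test
        let values = [-2.0_f64, -1.0, 0.0, 1.0, 2.0, f64::NAN, -f64::NAN];

        // Parquet statistics omit NaN, so a writer records min/max over the
        // finite values only:
        let (min, max) = (-2.0_f64, 2.0_f64);

        // The old statistics rewrite of `f1 > 2.0` is `max(f1) > 2.0`, which is
        // false here, so the row group would have been pruned...
        assert!(!(max > 2.0));

        // ...yet under total order positive NaN compares greater than 2.0, so
        // `SELECT * from test_nans WHERE f1 > 2.0` must return one NaN row
        let matching = values
            .iter()
            .filter(|v| v.total_cmp(&2.0) == Ordering::Greater)
            .count();
        assert_eq!(matching, 1); // the positive NaN

        println!("min={min}, max={max}, rows matching f1 > 2.0: {matching}");
    }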