From be6b315152d735153229db5d133c27024e5496b3 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 23 Aug 2022 01:38:52 +0300 Subject: [PATCH 1/3] Fix propagation of optimized predicates on nested projections --- datafusion/optimizer/src/filter_push_down.rs | 52 +++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index 2ac5b6e3b0ff1..647007f1a637f 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -390,6 +390,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { }) .collect::>(); + // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.) + let mut no_col_predicates = vec![]; + // re-write all filters based on this projection // E.g. in `Filter: #b\n Projection: #a > 1 as b`, we can swap them, but the filter must be "#a > 1" for (predicate, columns) in state.filters.iter_mut() { @@ -397,12 +400,26 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { columns.clear(); expr_to_columns(predicate, columns)?; + if columns.is_empty() { + no_col_predicates.push(predicate.clone()) + } } + // Don't pushdown columnless predicates. + state + .filters + .retain(|(predicate, _)| !no_col_predicates.contains(predicate)); // optimize inner let new_input = optimize(input, state)?; - - from_plan(plan, expr, &[new_input]) + let inlined_plan = from_plan(plan, expr, &[new_input])?; + if !no_col_predicates.is_empty() { + Ok(utils::add_filter( + inlined_plan, + no_col_predicates.iter().collect::>().as_slice(), + )) + } else { + Ok(inlined_plan) + } } LogicalPlan::Aggregate(Aggregate { aggr_expr, input, .. @@ -2092,4 +2109,35 @@ mod tests { Ok(()) } + + #[test] + fn test_propagation_of_optimized_inner_filters_with_projections() -> Result<()> { + // SELECT a FROM (SELECT 1 AS a) b WHERE b.a = 1 + let plan = LogicalPlanBuilder::empty(true) + .project_with_alias(vec![lit(0i64).alias("a")], Some("b".to_owned()))? + .project_with_alias(vec![col("b.a")], Some("b".to_owned()))? + .filter(col("b.a").eq(lit(1i64)))? + .project(vec![col("b.a")])? + .build()?; + + let expected_before = "\ + Projection: #b.a\ + \n Filter: #b.a = Int64(1)\ + \n Projection: #b.a, alias=b\ + \n Projection: Int64(0) AS a, alias=b\ + \n EmptyRelation"; + assert_eq!(format!("{:?}", plan), expected_before); + + // Ensure that the predicate without any columns (0 = 1) is + // still there. + let expected_after = "\ + Projection: #b.a\ + \n Projection: #b.a, alias=b\ + \n Filter: Int64(0) = Int64(1)\ + \n Projection: Int64(0) AS a, alias=b\ + \n EmptyRelation"; + assert_optimized_plan_eq(&plan, expected_after); + + Ok(()) + } } From 39c645992f601fe410f27419cdcb4c05b5777315 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Sat, 27 Aug 2022 10:49:04 +0300 Subject: [PATCH 2/3] Add SQL integration tests --- datafusion/core/tests/sql/projection.rs | 39 +++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index 6e59bd42146eb..97c6dcf8aa7fe 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -348,3 +348,42 @@ async fn project_column_with_same_name_as_relation() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn project_column_with_filters_that_cant_pushed_down_always_false() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "select * from (select 1 as a) f where f.a=2;"; + let actual = execute_to_batches(&ctx, sql).await; + + let expected = vec!["++", "++"]; + assert_batches_sorted_eq!(expected, &actual); + + Ok(()) +} + +#[tokio::test] +async fn project_column_with_filters_that_cant_pushed_down_always_true() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "select * from (select 1 as a) f where f.a=1;"; + let actual = execute_to_batches(&ctx, sql).await; + + let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "+---+"]; + assert_batches_sorted_eq!(expected, &actual); + + Ok(()) +} + +#[tokio::test] +async fn project_columns_in_memory_without_propagation() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "select column1 as a from (values (1), (2)) f where f.column1 = 2;"; + let actual = execute_to_batches(&ctx, sql).await; + + let expected = vec!["+---+", "| a |", "+---+", "| 2 |", "+---+"]; + assert_batches_sorted_eq!(expected, &actual); + + Ok(()) +} From ad0a93d77ea953eb41bc6f595869f10d21127d80 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Sat, 27 Aug 2022 20:21:43 +0300 Subject: [PATCH 3/3] Alternative implementation on `issue_filters` (#1) --- datafusion/optimizer/src/filter_push_down.rs | 56 ++++---------------- 1 file changed, 11 insertions(+), 45 deletions(-) diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index 647007f1a637f..3d0415232dbf6 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -81,6 +81,7 @@ impl State { } /// returns all predicates in `state` that depend on any of `used_columns` +/// or the ones that does not reference any columns (e.g. WHERE 1=1) fn get_predicates<'a>( state: &'a State, used_columns: &HashSet, @@ -89,10 +90,11 @@ fn get_predicates<'a>( .filters .iter() .filter(|(_, columns)| { - !columns - .intersection(used_columns) - .collect::>() - .is_empty() + columns.is_empty() + || !columns + .intersection(used_columns) + .collect::>() + .is_empty() }) .map(|&(ref a, ref b)| (a, b)) .unzip() @@ -338,34 +340,16 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { let mut predicates = vec![]; utils::split_conjunction(predicate, &mut predicates); - // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.) - let mut no_col_predicates = vec![]; - predicates .into_iter() .try_for_each::<_, Result<()>>(|predicate| { let mut columns: HashSet = HashSet::new(); expr_to_columns(predicate, &mut columns)?; - if columns.is_empty() { - no_col_predicates.push(predicate) - } else { - // collect the predicate - state.filters.push((predicate.clone(), columns)); - } + state.filters.push((predicate.clone(), columns)); Ok(()) })?; - // Predicates without columns will not be pushed down. - // As those contain only literals, they could be optimized using constant folding - // and removal of WHERE TRUE / WHERE FALSE - if !no_col_predicates.is_empty() { - Ok(utils::add_filter( - optimize(input, state)?, - &no_col_predicates, - )) - } else { - optimize(input, state) - } + optimize(input, state) } LogicalPlan::Projection(Projection { input, @@ -390,9 +374,6 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { }) .collect::>(); - // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.) - let mut no_col_predicates = vec![]; - // re-write all filters based on this projection // E.g. in `Filter: #b\n Projection: #a > 1 as b`, we can swap them, but the filter must be "#a > 1" for (predicate, columns) in state.filters.iter_mut() { @@ -400,26 +381,11 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { columns.clear(); expr_to_columns(predicate, columns)?; - if columns.is_empty() { - no_col_predicates.push(predicate.clone()) - } } - // Don't pushdown columnless predicates. - state - .filters - .retain(|(predicate, _)| !no_col_predicates.contains(predicate)); // optimize inner let new_input = optimize(input, state)?; - let inlined_plan = from_plan(plan, expr, &[new_input])?; - if !no_col_predicates.is_empty() { - Ok(utils::add_filter( - inlined_plan, - no_col_predicates.iter().collect::>().as_slice(), - )) - } else { - Ok(inlined_plan) - } + Ok(from_plan(plan, expr, &[new_input])?) } LogicalPlan::Aggregate(Aggregate { aggr_expr, input, .. @@ -2133,8 +2099,8 @@ mod tests { let expected_after = "\ Projection: #b.a\ \n Projection: #b.a, alias=b\ - \n Filter: Int64(0) = Int64(1)\ - \n Projection: Int64(0) AS a, alias=b\ + \n Projection: Int64(0) AS a, alias=b\ + \n Filter: Int64(0) = Int64(1)\ \n EmptyRelation"; assert_optimized_plan_eq(&plan, expected_after);