-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Improve TableScan with filters pushdown unparsing (joins) #13132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
15d6560
e82340e
774550a
936b076
e560ece
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1008,6 +1008,93 @@ fn test_sort_with_push_down_fetch() -> Result<()> { | |
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_join_with_table_scan_filters() -> Result<()> { | ||
| let schema_left = Schema::new(vec![ | ||
| Field::new("id", DataType::Utf8, false), | ||
| Field::new("name", DataType::Utf8, false), | ||
| ]); | ||
|
|
||
| let schema_right = Schema::new(vec![ | ||
| Field::new("id", DataType::Utf8, false), | ||
| Field::new("age", DataType::Utf8, false), | ||
| ]); | ||
|
|
||
| let left_plan = table_scan_with_filters( | ||
| Some("left_table"), | ||
| &schema_left, | ||
| None, | ||
| vec![col("name").like(lit("some_name"))], | ||
| )? | ||
| .alias("left")? | ||
| .build()?; | ||
|
|
||
| let right_plan = table_scan_with_filters( | ||
| Some("right_table"), | ||
| &schema_right, | ||
| None, | ||
| vec![col("age").gt(lit(10))], | ||
| )? | ||
| .build()?; | ||
|
|
||
| let join_plan_with_filter = LogicalPlanBuilder::from(left_plan.clone()) | ||
| .join( | ||
| right_plan.clone(), | ||
| datafusion_expr::JoinType::Inner, | ||
| (vec!["left.id"], vec!["right_table.id"]), | ||
| Some(col("left.id").gt(lit(5))), | ||
| )? | ||
| .build()?; | ||
goldmedal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| let sql = plan_to_sql(&join_plan_with_filter)?; | ||
|
|
||
| let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND ("left"."name" LIKE 'some_name' AND (age > 10)))"#; | ||
|
|
||
| assert_eq!(sql.to_string(), expected_sql); | ||
|
|
||
| let join_plan_no_filter = LogicalPlanBuilder::from(left_plan.clone()) | ||
| .join( | ||
| right_plan, | ||
| datafusion_expr::JoinType::Inner, | ||
| (vec!["left.id"], vec!["right_table.id"]), | ||
| None, | ||
| )? | ||
| .build()?; | ||
|
|
||
| let sql = plan_to_sql(&join_plan_no_filter)?; | ||
|
|
||
| let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND ("left"."name" LIKE 'some_name' AND (age > 10))"#; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed you put the pushdown condition in the join condition instead of the I did some tests for different join type and different place (join condition or filter) in DataFusion let join_type = vec![
"inner join", "left join", "right join", "full join"
];
for join in join_type {
println!("-----------------{join}-------------------");
println!("###### predicate in filter ######");
let sql = format!("select o_orderkey from orders {join} customer on o_custkey = c_custkey where c_name = 'Customer#000000001'");
println!("SQL: {}", sql);
match ctx.sql(&sql).await?.into_optimized_plan() {
Ok(plan) => {println!("{plan}")},
Err(e) => eprintln!("Error: {}", e),
}
println!("###### predicate in join condition ######");
let sql = format!("select o_orderkey from orders {join} customer on o_custkey = c_custkey and c_name = 'Customer#000000001'");
println!("SQL: {}", sql);
match ctx.sql(&sql).await?.into_optimized_plan() {
Ok(plan) => {println!("{plan}")},
Err(e) => eprintln!("Error: {}", e),
}
}The result is We can find the plan is the same in I'm not pretty sure if it's a common rule (putting the predicate in By the way, this PR is ok for me now. I think it can be improved by a follow-up PR if we care about the performance of the generated SQL.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @goldmedal - thank you for deep review. I suspect that filters were not fully pushed down for the right join and full join cases by DF during optimization for samples above as two test queries are not exactly the same as how records are filtered: It seems in all examples above the original
and we will translate it to @goldmedal - Is my understanding correct that tha main concern is that the first option is preferred as it could be executed more efficient by target engine?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, if we can push down the predicate to the table scan, it usually means it will perform better. I tried the subquery pattern: Every predicate is pushed down to the table scan. It's better 👍
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sgrebnov Do you want to improve it in this PR? or we can do it in the follow-up PR (maybe file an issue). WDYT?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @goldmedal - I would prefer the incremental approach with a follow-up PR. Thank you! |
||
|
|
||
| assert_eq!(sql.to_string(), expected_sql); | ||
|
|
||
| let right_plan_with_filter = table_scan_with_filters( | ||
| Some("right_table"), | ||
| &schema_right, | ||
| None, | ||
| vec![col("age").gt(lit(10))], | ||
| )? | ||
| .filter(col("right_table.name").eq(lit("before_join_filter_val")))? | ||
| .build()?; | ||
|
|
||
| let join_plan_multiple_filters = LogicalPlanBuilder::from(left_plan.clone()) | ||
| .join( | ||
| right_plan_with_filter, | ||
| datafusion_expr::JoinType::Inner, | ||
| (vec!["left.id"], vec!["right_table.id"]), | ||
| Some(col("left.id").gt(lit(5))), | ||
| )? | ||
| .filter(col("left.name").eq(lit("after_join_filter_val")))? | ||
| .build()?; | ||
|
|
||
| let sql = plan_to_sql(&join_plan_multiple_filters)?; | ||
|
|
||
| let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val')) AND (age > 10))) WHERE ("left"."name" = 'after_join_filter_val')"#; | ||
|
|
||
| assert_eq!(sql.to_string(), expected_sql); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_interval_lhs_eq() { | ||
| sql_round_trip( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.