Skip to content

Commit aa9dc60

Browse files
authored
Eliminate duplicated filter within (filter(TableScan)) plan (#51)
* Eliminate duplicated filter within (filter(TableScan)) plan * Updates * fix * add test * fix
1 parent 2eb5c10 commit aa9dc60

3 files changed

Lines changed: 32 additions & 10 deletions

File tree

datafusion/sql/src/unparser/expr.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
use datafusion_expr::expr::Unnest;
1919
use sqlparser::ast::Value::SingleQuotedString;
2020
use sqlparser::ast::{
21-
self, BinaryOperator, Expr as AstExpr, Function, Ident, Interval, ObjectName, TimezoneInfo, UnaryOperator
21+
self, BinaryOperator, Expr as AstExpr, Function, Ident, Interval, ObjectName,
22+
TimezoneInfo, UnaryOperator,
2223
};
2324
use std::sync::Arc;
2425
use std::vec;
@@ -217,7 +218,7 @@ impl Unparser<'_> {
217218
}
218219
};
219220

220-
let order_by =order_by
221+
let order_by = order_by
221222
.iter()
222223
.map(|sort_expr| self.sort_to_sql(sort_expr))
223224
.collect::<Result<Vec<_>>>()?;
@@ -1881,7 +1882,7 @@ mod tests {
18811882
name: "array_col".to_string(),
18821883
})),
18831884
}),
1884-
r#"UNNEST("schema"."table".array_col)"#
1885+
r#"UNNEST("schema"."table".array_col)"#,
18851886
),
18861887
];
18871888

datafusion/sql/src/unparser/utils.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{cmp::Ordering, sync::Arc, vec};
18+
use std::{cmp::Ordering, collections::HashSet, sync::Arc, vec};
1919

2020
use datafusion_common::{
2121
internal_err,
@@ -102,7 +102,7 @@ pub(crate) fn find_unnest_node_within_select(
102102
/// TableScan: j1
103103
/// And filters: [ta.j1_id < 5, ta.j1_id > 10]
104104
pub (crate) fn try_transform_to_simple_table_scan_with_filters(plan: &LogicalPlan) -> Option<(LogicalPlan, Vec<Expr>)> {
105-
let mut filters: Vec<Expr> = vec![];
105+
let mut filters: HashSet<Expr> = HashSet::new();
106106
let mut plan_stack = vec![plan];
107107
let mut table_alias = None;
108108

@@ -113,7 +113,7 @@ pub (crate) fn try_transform_to_simple_table_scan_with_filters(plan: &LogicalPla
113113
plan_stack.push(alias.input.as_ref());
114114
}
115115
LogicalPlan::Filter(filter) => {
116-
filters.push(filter.predicate.clone());
116+
filters.insert(filter.predicate.clone());
117117
plan_stack.push(filter.input.as_ref());
118118
}
119119
LogicalPlan::TableScan(table_scan) => {
@@ -138,7 +138,9 @@ pub (crate) fn try_transform_to_simple_table_scan_with_filters(plan: &LogicalPla
138138
}
139139
}).collect::<Result<Vec<_>, DataFusionError>>().ok()?;
140140

141-
filters.extend(table_scan_filters);
141+
for table_scan_filter in table_scan_filters {
142+
filters.insert(table_scan_filter);
143+
}
142144

143145
let mut builder = LogicalPlanBuilder::scan(
144146
table_scan.table_name.clone(),
@@ -151,6 +153,7 @@ pub (crate) fn try_transform_to_simple_table_scan_with_filters(plan: &LogicalPla
151153
}
152154

153155
let plan = builder.build().ok()?;
156+
let filters: Vec<Expr> = filters.into_iter().collect();
154157

155158
return Some((plan, filters));
156159

datafusion/sql/tests/cases/plan_to_sql.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,10 +1035,9 @@ fn test_join_with_table_scan_filters() -> Result<()> {
10351035

10361036
assert_eq!(sql.to_string(), expected_sql);
10371037

1038-
1039-
let join_plan_no_filter = LogicalPlanBuilder::from(left_plan)
1038+
let join_plan_no_filter = LogicalPlanBuilder::from(left_plan.clone())
10401039
.join(
1041-
right_plan,
1040+
right_plan.clone(),
10421041
datafusion_expr::JoinType::Inner,
10431042
(vec!["left.id"], vec!["right_table.id"]),
10441043
None
@@ -1051,6 +1050,25 @@ fn test_join_with_table_scan_filters() -> Result<()> {
10511050

10521051
assert_eq!(sql.to_string(), expected_sql);
10531052

1053+
let right_plan_with_filter_schema = table_scan_with_filters(Some("right_table"), &schema_right, None, vec![col("right_table.age").gt(lit(10))])?
1054+
.build()?;
1055+
let right_plan_with_duplicated_filter = LogicalPlanBuilder::from(right_plan_with_filter_schema.clone()).filter(col("right_table.age").gt(lit(10)))?.build()?;
1056+
1057+
let join_plan_duplicated_filter = LogicalPlanBuilder::from(left_plan)
1058+
.join(
1059+
right_plan_with_duplicated_filter,
1060+
datafusion_expr::JoinType::Inner,
1061+
(vec!["left.id"], vec!["right_table.id"]),
1062+
Some(col("left.id").gt(lit(5))),
1063+
)?
1064+
.build()?;
1065+
1066+
let sql = plan_to_sql(&join_plan_duplicated_filter)?;
1067+
1068+
let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND ("left"."name" LIKE 'some_name' AND (right_table.age > 10)))"#;
1069+
1070+
assert_eq!(sql.to_string(), expected_sql);
1071+
10541072
Ok(())
10551073
}
10561074

0 commit comments

Comments
 (0)