Skip to content

Commit 753e6f8

Browse files
Ted-Jiang and andygrove authored
Support limit push down for offset_plan (#2566)
* support offset push down
* change the limit and offset order
* add test for join and subquery
* fmt
* Apply suggestions from code review (Co-authored-by: Andy Grove <[email protected]>)
* add sql test in planner.rs
* fix clippy

Co-authored-by: Andy Grove <[email protected]>
1 parent c3f1d72 commit 753e6f8

2 files changed

Lines changed: 247 additions & 10 deletions

File tree

datafusion/core/src/optimizer/limit_push_down.rs

Lines changed: 205 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::logical_plan::plan::Projection;
2424
use crate::logical_plan::{Limit, TableScan};
2525
use crate::logical_plan::{LogicalPlan, Union};
2626
use crate::optimizer::optimizer::OptimizerRule;
27+
use datafusion_expr::logical_plan::Offset;
2728
use std::sync::Arc;
2829

2930
/// Optimization rule that tries pushes down LIMIT n
@@ -43,18 +44,24 @@ fn limit_push_down(
4344
upper_limit: Option<usize>,
4445
plan: &LogicalPlan,
4546
_execution_props: &ExecutionProps,
47+
is_offset: bool,
4648
) -> Result<LogicalPlan> {
4749
match (plan, upper_limit) {
4850
(LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
49-
let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n);
51+
let new_limit: usize = if is_offset {
52+
*n + upper_limit.unwrap_or(0)
53+
} else {
54+
upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n)
55+
};
5056
Ok(LogicalPlan::Limit(Limit {
51-
n: smallest,
57+
n: new_limit,
5258
// push down limit to plan (minimum of upper limit and current limit)
5359
input: Arc::new(limit_push_down(
5460
_optimizer,
55-
Some(smallest),
61+
Some(new_limit),
5662
input.as_ref(),
5763
_execution_props,
64+
false,
5865
)?),
5966
}))
6067
}
@@ -95,6 +102,7 @@ fn limit_push_down(
95102
upper_limit,
96103
input.as_ref(),
97104
_execution_props,
105+
false,
98106
)?),
99107
schema: schema.clone(),
100108
alias: alias.clone(),
@@ -119,6 +127,7 @@ fn limit_push_down(
119127
Some(upper_limit),
120128
x,
121129
_execution_props,
130+
false,
122131
)?),
123132
}))
124133
})
@@ -129,6 +138,25 @@ fn limit_push_down(
129138
schema: schema.clone(),
130139
}))
131140
}
141+
// offset 5 limit 10 then push limit 15 (5 + 10)
142+
// Limit should always be Offset's input
143+
(LogicalPlan::Offset(Offset { offset, input }), upper_limit) => {
144+
let new_limit = if let Some(ul) = upper_limit {
145+
ul + *offset
146+
} else {
147+
*offset
148+
};
149+
Ok(LogicalPlan::Offset(Offset {
150+
offset: *offset,
151+
input: Arc::new(limit_push_down(
152+
_optimizer,
153+
Some(new_limit),
154+
input.as_ref(),
155+
_execution_props,
156+
true,
157+
)?),
158+
}))
159+
}
132160
// For other nodes we can't push down the limit
133161
// But try to recurse and find other limit nodes to push down
134162
_ => {
@@ -138,7 +166,9 @@ fn limit_push_down(
138166
let inputs = plan.inputs();
139167
let new_inputs = inputs
140168
.iter()
141-
.map(|plan| limit_push_down(_optimizer, None, plan, _execution_props))
169+
.map(|plan| {
170+
limit_push_down(_optimizer, None, plan, _execution_props, false)
171+
})
142172
.collect::<Result<Vec<_>>>()?;
143173

144174
utils::from_plan(plan, &expr, &new_inputs)
@@ -152,7 +182,7 @@ impl OptimizerRule for LimitPushDown {
152182
plan: &LogicalPlan,
153183
execution_props: &ExecutionProps,
154184
) -> Result<LogicalPlan> {
155-
limit_push_down(self, None, plan, execution_props)
185+
limit_push_down(self, None, plan, execution_props, false)
156186
}
157187

158188
fn name(&self) -> &str {
@@ -167,6 +197,8 @@ mod test {
167197
logical_plan::{col, max, LogicalPlan, LogicalPlanBuilder},
168198
test::*,
169199
};
200+
use datafusion_expr::exists;
201+
use datafusion_expr::logical_plan::JoinType;
170202

171203
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
172204
let rule = LimitPushDown::new();
@@ -278,4 +310,172 @@ mod test {
278310

279311
Ok(())
280312
}
313+
314+
#[test]
315+
fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
316+
let table_scan = test_table_scan()?;
317+
318+
let plan = LogicalPlanBuilder::from(table_scan)
319+
.project(vec![col("a")])?
320+
.offset(10)?
321+
.limit(1000)?
322+
.build()?;
323+
324+
// Should push the limit down to table provider
325+
// When it has a select
326+
let expected = "Limit: 1000\
327+
\n Offset: 10\
328+
\n Projection: #test.a\
329+
\n TableScan: test projection=None, limit=1010";
330+
331+
assert_optimized_plan_eq(&plan, expected);
332+
333+
Ok(())
334+
}
335+
336+
#[test]
337+
fn limit_pushdown_with_offset_after_limit() -> Result<()> {
338+
let table_scan = test_table_scan()?;
339+
340+
let plan = LogicalPlanBuilder::from(table_scan)
341+
.project(vec![col("a")])?
342+
.limit(1000)?
343+
.offset(10)?
344+
.build()?;
345+
346+
let expected = "Offset: 10\
347+
\n Limit: 1010\
348+
\n Projection: #test.a\
349+
\n TableScan: test projection=None, limit=1010";
350+
351+
assert_optimized_plan_eq(&plan, expected);
352+
353+
Ok(())
354+
}
355+
356+
#[test]
357+
fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
358+
let table_scan = test_table_scan()?;
359+
360+
let plan = LogicalPlanBuilder::from(table_scan)
361+
.offset(10)?
362+
.limit(1000)?
363+
.limit(10)?
364+
.build()?;
365+
366+
// Should push down the smallest limit
367+
// Towards table scan
368+
// This rule doesn't replace multiple limits
369+
let expected = "Limit: 10\
370+
\n Limit: 10\
371+
\n Offset: 10\
372+
\n TableScan: test projection=None, limit=20";
373+
374+
assert_optimized_plan_eq(&plan, expected);
375+
376+
Ok(())
377+
}
378+
379+
#[test]
380+
fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
381+
let table_scan = test_table_scan()?;
382+
383+
let plan = LogicalPlanBuilder::from(table_scan)
384+
.aggregate(vec![col("a")], vec![max(col("b"))])?
385+
.offset(10)?
386+
.limit(1000)?
387+
.build()?;
388+
389+
// Limit should *not* push down aggregate node
390+
let expected = "Limit: 1000\
391+
\n Offset: 10\
392+
\n Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
393+
\n TableScan: test projection=None";
394+
395+
assert_optimized_plan_eq(&plan, expected);
396+
397+
Ok(())
398+
}
399+
400+
#[test]
401+
fn limit_should_push_down_with_offset_union() -> Result<()> {
402+
let table_scan = test_table_scan()?;
403+
404+
let plan = LogicalPlanBuilder::from(table_scan.clone())
405+
.union(LogicalPlanBuilder::from(table_scan).build()?)?
406+
.offset(10)?
407+
.limit(1000)?
408+
.build()?;
409+
410+
// Limit should push down through union
411+
let expected = "Limit: 1000\
412+
\n Offset: 10\
413+
\n Union\
414+
\n Limit: 1010\
415+
\n TableScan: test projection=None, limit=1010\
416+
\n Limit: 1010\
417+
\n TableScan: test projection=None, limit=1010";
418+
419+
assert_optimized_plan_eq(&plan, expected);
420+
421+
Ok(())
422+
}
423+
424+
#[test]
425+
fn limit_should_not_push_down_with_offset_join() -> Result<()> {
426+
let table_scan_1 = test_table_scan()?;
427+
let table_scan_2 = test_table_scan_with_name("test2")?;
428+
429+
let plan = LogicalPlanBuilder::from(table_scan_1)
430+
.join(
431+
&LogicalPlanBuilder::from(table_scan_2).build()?,
432+
JoinType::Left,
433+
(vec!["a"], vec!["a"]),
434+
)?
435+
.limit(1000)?
436+
.offset(10)?
437+
.build()?;
438+
439+
// Limit pushdown Not supported in Join
440+
let expected = "Offset: 10\
441+
\n Limit: 1010\
442+
\n Left Join: #test.a = #test2.a\
443+
\n TableScan: test projection=None\
444+
\n TableScan: test2 projection=None";
445+
446+
assert_optimized_plan_eq(&plan, expected);
447+
448+
Ok(())
449+
}
450+
451+
#[test]
452+
fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
453+
let table_scan_1 = test_table_scan_with_name("test1")?;
454+
let table_scan_2 = test_table_scan_with_name("test2")?;
455+
456+
let subquery = LogicalPlanBuilder::from(table_scan_1)
457+
.project(vec![col("a")])?
458+
.filter(col("a").eq(col("test1.a")))?
459+
.build()?;
460+
461+
let outer_query = LogicalPlanBuilder::from(table_scan_2)
462+
.project(vec![col("a")])?
463+
.filter(exists(Arc::new(subquery)))?
464+
.limit(100)?
465+
.offset(10)?
466+
.build()?;
467+
468+
// Limit pushdown Not supported in sub_query
469+
let expected = "Offset: 10\
470+
\n Limit: 110\
471+
\n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
472+
\n Projection: #test1.a\
473+
\n TableScan: test1 projection=None)\
474+
\n Projection: #test2.a\
475+
\n TableScan: test2 projection=None";
476+
477+
assert_optimized_plan_eq(&outer_query, expected);
478+
479+
Ok(())
480+
}
281481
}

datafusion/core/src/sql/planner.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
296296

297297
let plan = self.order_by(plan, query.order_by)?;
298298

299-
let plan: LogicalPlan = self.offset(plan, query.offset)?;
299+
let plan: LogicalPlan = self.limit(plan, query.limit)?;
300300

301-
self.limit(plan, query.limit)
301+
//make limit as offset's input will enable limit push down simply
302+
self.offset(plan, query.offset)
302303
}
303304

304305
fn set_expr_to_plan(
@@ -2646,6 +2647,9 @@ fn parse_sql_number(n: &str) -> Result<Expr> {
26462647
#[cfg(test)]
26472648
mod tests {
26482649
use crate::datasource::empty::EmptyTable;
2650+
use crate::execution::context::ExecutionProps;
2651+
use crate::optimizer::limit_push_down::LimitPushDown;
2652+
use crate::optimizer::optimizer::OptimizerRule;
26492653
use crate::{assert_contains, logical_plan::create_udf, sql::parser::DFParser};
26502654
use datafusion_expr::{ScalarFunctionImplementation, Volatility};
26512655

@@ -4386,6 +4390,16 @@ mod tests {
43864390
assert_eq!(format!("{:?}", plan), expected);
43874391
}
43884392

4393+
fn quick_test_with_limit_pushdown(sql: &str, expected: &str) {
4394+
let plan = logical_plan(sql).unwrap();
4395+
let rule = LimitPushDown::new();
4396+
let optimized_plan = rule
4397+
.optimize(&plan, &ExecutionProps::new())
4398+
.expect("failed to optimize plan");
4399+
let formatted_plan = format!("{:?}", optimized_plan);
4400+
assert_eq!(formatted_plan, expected);
4401+
}
4402+
43894403
struct MockContextProvider {}
43904404

43914405
impl ContextProvider for MockContextProvider {
@@ -4834,10 +4848,10 @@ mod tests {
48344848
}
48354849

48364850
#[test]
4837-
fn test_offset_with_limit() {
4851+
fn test_zero_offset_with_limit() {
48384852
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
4839-
let expected = "Limit: 5\
4840-
\n Offset: 0\
4853+
let expected = "Offset: 0\
4854+
\n Limit: 5\
48414855
\n Projection: #person.id\
48424856
\n Filter: #person.id > Int64(100)\
48434857
\n TableScan: person projection=None";
@@ -4858,6 +4872,29 @@ mod tests {
48584872
quick_test(sql, expected);
48594873
}
48604874

4875+
#[test]
4876+
fn test_offset_after_limit_with_limit_push() {
4877+
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;";
4878+
let expected = "Offset: 3\
4879+
\n Limit: 8\
4880+
\n Projection: #person.id\
4881+
\n Filter: #person.id > Int64(100)\
4882+
\n TableScan: person projection=None";
4883+
4884+
quick_test_with_limit_pushdown(sql, expected);
4885+
}
4886+
4887+
#[test]
4888+
fn test_offset_before_limit_with_limit_push() {
4889+
let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;";
4890+
let expected = "Offset: 3\
4891+
\n Limit: 8\
4892+
\n Projection: #person.id\
4893+
\n Filter: #person.id > Int64(100)\
4894+
\n TableScan: person projection=None";
4895+
quick_test_with_limit_pushdown(sql, expected);
4896+
}
4897+
48614898
fn assert_field_not_found(err: DataFusionError, name: &str) {
48624899
match err {
48634900
DataFusionError::SchemaError { .. } => {

0 commit comments

Comments (0)