diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index d7059e882e555..2f4cb174673a6 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -693,6 +693,9 @@ pub fn transpose(original: Vec>) -> Vec> { /// Computes the `skip` and `fetch` parameters of a single limit that would be /// equivalent to two consecutive limits with the given `skip`/`fetch` parameters. /// +/// This function assumes that the child skip is applied first, then the child fetch occurs at the +/// same time as the parent skip. +/// /// There are multiple cases to consider: /// /// # Case 0: Parent and child are disjoint (`child_fetch <= skip`). diff --git a/datafusion/functions/benches/repeat.rs b/datafusion/functions/benches/repeat.rs index 916c8374e5fb9..e45313660ea2c 100644 --- a/datafusion/functions/benches/repeat.rs +++ b/datafusion/functions/benches/repeat.rs @@ -67,7 +67,7 @@ fn criterion_benchmark(c: &mut Criterion) { let args = create_args::(size, 32, repeat_times, true); group.bench_function( - &format!( + format!( "repeat_string_view [size={}, repeat_times={}]", size, repeat_times ), @@ -76,7 +76,7 @@ fn criterion_benchmark(c: &mut Criterion) { let args = create_args::(size, 32, repeat_times, false); group.bench_function( - &format!( + format!( "repeat_string [size={}, repeat_times={}]", size, repeat_times ), @@ -85,7 +85,7 @@ fn criterion_benchmark(c: &mut Criterion) { let args = create_args::(size, 32, repeat_times, false); group.bench_function( - &format!( + format!( "repeat_large_string [size={}, repeat_times={}]", size, repeat_times ), @@ -103,7 +103,7 @@ fn criterion_benchmark(c: &mut Criterion) { let args = create_args::(size, 32, repeat_times, true); group.bench_function( - &format!( + format!( "repeat_string_view [size={}, repeat_times={}]", size, repeat_times ), @@ -112,7 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) { let args = create_args::(size, 32, repeat_times, false); group.bench_function( - &format!( + format!( "repeat_string [size={}, repeat_times={}]", size, repeat_times ), @@ -121,7 +121,7 @@ fn criterion_benchmark(c: &mut Criterion) { let args = create_args::(size, 32, repeat_times, false); group.bench_function( - &format!( + format!( "repeat_large_string [size={}, repeat_times={}]", size, repeat_times ), diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 7f45292f9e27c..97cfa79dfc611 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -65,12 +65,12 @@ impl PhysicalOptimizerRule for LimitPushdown { plan: Arc, _config: &ConfigOptions, ) -> Result> { - let global_state = GlobalRequirements { + let mut global_state = GlobalRequirements { fetch: None, skip: 0, satisfied: false, }; - pushdown_limits(plan, global_state) + pushdown_limits(plan, &mut global_state) } fn name(&self) -> &str { @@ -123,13 +123,18 @@ impl From for Arc { } /// This function is the main helper function of the `LimitPushDown` rule. +/// /// The helper takes an `ExecutionPlan` and a global (algorithm) state which is /// an instance of `GlobalRequirements` and modifies these parameters while /// checking if the limits can be pushed down or not. +/// +/// In effect, it pops all `GlobalLimitExec` and `LocalLimitExec` plans off the top of the +/// ExecutionPlan stack (so to speak) and tries to pushdown their effects (specifically, the +/// effects of `LIMIT` and `OFFSET` conditions) into lower ExecutionPlans pub fn pushdown_limit_helper( mut pushdown_plan: Arc, - mut global_state: GlobalRequirements, -) -> Result<(Transformed>, GlobalRequirements)> { + global_state: &mut GlobalRequirements, +) -> Transformed> { if let Some(limit_exec) = extract_limit(&pushdown_plan) { // If we have fetch/skip info in the global state already, we need to // decide which one to continue with: @@ -145,22 +150,23 @@ pub fn pushdown_limit_helper( // Now the global state has the most recent information, we can remove // the `LimitExec` plan. We will decide later if we should add it again // or not. - return Ok(( - Transformed { - data: Arc::clone(limit_exec.input()), - transformed: true, - tnr: TreeNodeRecursion::Stop, - }, - global_state, - )); + return Transformed { + data: Arc::clone(limit_exec.input()), + transformed: true, + tnr: TreeNodeRecursion::Stop, + }; } // If we have a non-limit operator with fetch capability, update global // state as necessary: if pushdown_plan.fetch().is_some() { + // if global_state.fetch.is_none(), then there were no LimitExecs on top of the + // pushdown_plan, so we can safely assume that whatever this plan has set as its `fetch` + // value is already what we need, so the requirements are satisfied. if global_state.fetch.is_none() { global_state.satisfied = true; } + (global_state.skip, global_state.fetch) = combine_limit( global_state.skip, global_state.fetch, @@ -174,17 +180,10 @@ pub fn pushdown_limit_helper( return if global_state.skip > 0 && !global_state.satisfied { // There might be a case with only offset, if so add a global limit: global_state.satisfied = true; - Ok(( - Transformed::yes(add_global_limit( - pushdown_plan, - global_state.skip, - None, - )), - global_state, - )) + Transformed::yes(add_global_limit(pushdown_plan, global_state.skip, None)) } else { // There's no info on offset or fetch, nothing to do: - Ok((Transformed::no(pushdown_plan), global_state)) + Transformed::no(pushdown_plan) }; }; @@ -194,29 +193,24 @@ pub fn pushdown_limit_helper( if !combines_input_partitions(&pushdown_plan) { // We have information in the global state and the plan pushes down, // continue: - Ok((Transformed::no(pushdown_plan), global_state)) - } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) { - // This plan is combining input partitions, so we need to add the - // fetch info to plan if possible. If not, we must add a `LimitExec` - // with the information from the global state. - global_state.fetch = skip_and_fetch; - global_state.skip = 0; - global_state.satisfied = true; - Ok((Transformed::yes(plan_with_fetch), global_state)) + return Transformed::no(pushdown_plan); } else if global_state.satisfied { // If the plan is already satisfied, do not add a limit: - Ok((Transformed::no(pushdown_plan), global_state)) - } else { - global_state.satisfied = true; - Ok(( - Transformed::yes(add_limit( - pushdown_plan, - global_state.skip, - global_fetch, - )), - global_state, - )) + return Transformed::no(pushdown_plan); } + + if global_state.skip == 0 { + if let Some(plan_with_fetch) = pushdown_plan.with_fetch(Some(global_fetch)) { + // This plan is combining input partitions, so we need to add the + // fetch info to plan if possible. If not, we must add a `LimitExec` + // with the information from the global state. + global_state.satisfied = true; + return Transformed::yes(plan_with_fetch); + } + } + + global_state.satisfied = true; + Transformed::yes(add_limit(pushdown_plan, global_state.skip, global_fetch)) } else { // The plan does not support push down and it is not a limit. We will need // to add a limit or a fetch. If the plan is already satisfied, we will try @@ -230,9 +224,9 @@ pub fn pushdown_limit_helper( let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch); if global_state.satisfied { if let Some(plan_with_fetch) = maybe_fetchable { - Ok((Transformed::yes(plan_with_fetch), global_state)) + Transformed::yes(plan_with_fetch) } else { - Ok((Transformed::no(pushdown_plan), global_state)) + Transformed::no(pushdown_plan) } } else { // Add fetch or a `LimitExec`: @@ -246,7 +240,7 @@ pub fn pushdown_limit_helper( } else { add_limit(pushdown_plan, global_skip, global_fetch) }; - Ok((Transformed::yes(pushdown_plan), global_state)) + Transformed::yes(pushdown_plan) } } } @@ -254,20 +248,20 @@ pub fn pushdown_limit_helper( /// Pushes down the limit through the plan. pub(crate) fn pushdown_limits( pushdown_plan: Arc, - global_state: GlobalRequirements, + global_state: &mut GlobalRequirements, ) -> Result> { - let (mut new_node, mut global_state) = - pushdown_limit_helper(pushdown_plan, global_state)?; + let mut new_node = pushdown_limit_helper(pushdown_plan, global_state); while new_node.tnr == TreeNodeRecursion::Stop { - (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?; + new_node = pushdown_limit_helper(new_node.data, global_state); } let children = new_node.data.children(); let new_children = children .into_iter() .map(|child| { - pushdown_limits(Arc::::clone(child), global_state.clone()) + let mut state_clone = global_state.clone(); + pushdown_limits(Arc::::clone(child), &mut state_clone) }) .collect::>()?; diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 34d4ed6ff284b..f05585109483f 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -263,13 +263,13 @@ LIMIT 10; 0 NULL Timestamp(Millisecond, Some("UTC")) 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +12 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +2 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) 14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +5 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +1 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) # Test config listing_table_ignore_subdirectory: