Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,9 @@ pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
/// Computes the `skip` and `fetch` parameters of a single limit that would be
/// equivalent to two consecutive limits with the given `skip`/`fetch` parameters.
///
/// This function assumes that the child skip is applied first, and that the child fetch is
/// applied together with the parent skip.
///
/// There are multiple cases to consider:
///
/// # Case 0: Parent and child are disjoint (`child_fetch <= skip`).
Expand Down
12 changes: 6 additions & 6 deletions datafusion/functions/benches/repeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn criterion_benchmark(c: &mut Criterion) {

let args = create_args::<i32>(size, 32, repeat_times, true);
group.bench_function(
&format!(
format!(
"repeat_string_view [size={}, repeat_times={}]",
size, repeat_times
),
Expand All @@ -76,7 +76,7 @@ fn criterion_benchmark(c: &mut Criterion) {

let args = create_args::<i32>(size, 32, repeat_times, false);
group.bench_function(
&format!(
format!(
"repeat_string [size={}, repeat_times={}]",
size, repeat_times
),
Expand All @@ -85,7 +85,7 @@ fn criterion_benchmark(c: &mut Criterion) {

let args = create_args::<i64>(size, 32, repeat_times, false);
group.bench_function(
&format!(
format!(
"repeat_large_string [size={}, repeat_times={}]",
size, repeat_times
),
Expand All @@ -103,7 +103,7 @@ fn criterion_benchmark(c: &mut Criterion) {

let args = create_args::<i32>(size, 32, repeat_times, true);
group.bench_function(
&format!(
format!(
"repeat_string_view [size={}, repeat_times={}]",
size, repeat_times
),
Expand All @@ -112,7 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) {

let args = create_args::<i32>(size, 32, repeat_times, false);
group.bench_function(
&format!(
format!(
"repeat_string [size={}, repeat_times={}]",
size, repeat_times
),
Expand All @@ -121,7 +121,7 @@ fn criterion_benchmark(c: &mut Criterion) {

let args = create_args::<i64>(size, 32, repeat_times, false);
group.bench_function(
&format!(
format!(
"repeat_large_string [size={}, repeat_times={}]",
size, repeat_times
),
Expand Down
92 changes: 43 additions & 49 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ impl PhysicalOptimizerRule for LimitPushdown {
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let global_state = GlobalRequirements {
let mut global_state = GlobalRequirements {
fetch: None,
skip: 0,
satisfied: false,
};
pushdown_limits(plan, global_state)
pushdown_limits(plan, &mut global_state)
}

fn name(&self) -> &str {
Expand Down Expand Up @@ -123,13 +123,18 @@ impl From<LimitExec> for Arc<dyn ExecutionPlan> {
}

/// This function is the main helper function of the `LimitPushDown` rule.
///
/// The helper takes an `ExecutionPlan` and a global (algorithm) state which is
/// an instance of `GlobalRequirements` and modifies these parameters while
/// checking if the limits can be pushed down or not.
///
/// In effect, it pops all `GlobalLimitExec` and `LocalLimitExec` plans off the top of the
/// ExecutionPlan stack (so to speak) and tries to push down their effects (specifically, the
/// effects of `LIMIT` and `OFFSET` conditions) into lower ExecutionPlans.
pub fn pushdown_limit_helper(
mut pushdown_plan: Arc<dyn ExecutionPlan>,
mut global_state: GlobalRequirements,
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
global_state: &mut GlobalRequirements,
) -> Transformed<Arc<dyn ExecutionPlan>> {
if let Some(limit_exec) = extract_limit(&pushdown_plan) {
// If we have fetch/skip info in the global state already, we need to
// decide which one to continue with:
Expand All @@ -145,22 +150,23 @@ pub fn pushdown_limit_helper(
// Now the global state has the most recent information, we can remove
// the `LimitExec` plan. We will decide later if we should add it again
// or not.
return Ok((
Transformed {
data: Arc::clone(limit_exec.input()),
transformed: true,
tnr: TreeNodeRecursion::Stop,
},
global_state,
));
return Transformed {
data: Arc::clone(limit_exec.input()),
transformed: true,
tnr: TreeNodeRecursion::Stop,
};
}

// If we have a non-limit operator with fetch capability, update global
// state as necessary:
if pushdown_plan.fetch().is_some() {
// If global_state.fetch.is_none(), then there were no LimitExecs on top of the
// pushdown_plan, so we can safely assume that whatever this plan has set as its `fetch`
// value is already what we need; the requirements are therefore satisfied.
if global_state.fetch.is_none() {
global_state.satisfied = true;
}

(global_state.skip, global_state.fetch) = combine_limit(
global_state.skip,
global_state.fetch,
Expand All @@ -174,17 +180,10 @@ pub fn pushdown_limit_helper(
return if global_state.skip > 0 && !global_state.satisfied {
// There might be a case with only offset, if so add a global limit:
global_state.satisfied = true;
Ok((
Transformed::yes(add_global_limit(
pushdown_plan,
global_state.skip,
None,
)),
global_state,
))
Transformed::yes(add_global_limit(pushdown_plan, global_state.skip, None))
} else {
// There's no info on offset or fetch, nothing to do:
Ok((Transformed::no(pushdown_plan), global_state))
Transformed::no(pushdown_plan)
};
};

Expand All @@ -194,29 +193,24 @@ pub fn pushdown_limit_helper(
if !combines_input_partitions(&pushdown_plan) {
// We have information in the global state and the plan pushes down,
// continue:
Ok((Transformed::no(pushdown_plan), global_state))
} else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
// This plan is combining input partitions, so we need to add the
// fetch info to plan if possible. If not, we must add a `LimitExec`
// with the information from the global state.
global_state.fetch = skip_and_fetch;
global_state.skip = 0;
global_state.satisfied = true;
Ok((Transformed::yes(plan_with_fetch), global_state))
return Transformed::no(pushdown_plan);
} else if global_state.satisfied {
// If the plan is already satisfied, do not add a limit:
Ok((Transformed::no(pushdown_plan), global_state))
} else {
global_state.satisfied = true;
Ok((
Transformed::yes(add_limit(
pushdown_plan,
global_state.skip,
global_fetch,
)),
global_state,
))
return Transformed::no(pushdown_plan);
}

if global_state.skip == 0 {
if let Some(plan_with_fetch) = pushdown_plan.with_fetch(Some(global_fetch)) {
// This plan is combining input partitions, so we need to add the
// fetch info to plan if possible. If not, we must add a `LimitExec`
// with the information from the global state.
global_state.satisfied = true;
return Transformed::yes(plan_with_fetch);
}
}

global_state.satisfied = true;
Transformed::yes(add_limit(pushdown_plan, global_state.skip, global_fetch))
} else {
// The plan does not support push down and it is not a limit. We will need
// to add a limit or a fetch. If the plan is already satisfied, we will try
Expand All @@ -230,9 +224,9 @@ pub fn pushdown_limit_helper(
let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
if global_state.satisfied {
if let Some(plan_with_fetch) = maybe_fetchable {
Ok((Transformed::yes(plan_with_fetch), global_state))
Transformed::yes(plan_with_fetch)
} else {
Ok((Transformed::no(pushdown_plan), global_state))
Transformed::no(pushdown_plan)
}
} else {
// Add fetch or a `LimitExec`:
Expand All @@ -246,28 +240,28 @@ pub fn pushdown_limit_helper(
} else {
add_limit(pushdown_plan, global_skip, global_fetch)
};
Ok((Transformed::yes(pushdown_plan), global_state))
Transformed::yes(pushdown_plan)
}
}
}

/// Pushes down the limit through the plan.
pub(crate) fn pushdown_limits(
pushdown_plan: Arc<dyn ExecutionPlan>,
global_state: GlobalRequirements,
global_state: &mut GlobalRequirements,
) -> Result<Arc<dyn ExecutionPlan>> {
let (mut new_node, mut global_state) =
pushdown_limit_helper(pushdown_plan, global_state)?;
let mut new_node = pushdown_limit_helper(pushdown_plan, global_state);

while new_node.tnr == TreeNodeRecursion::Stop {
(new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
new_node = pushdown_limit_helper(new_node.data, global_state);
}

let children = new_node.data.children();
let new_children = children
.into_iter()
.map(|child| {
pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
let mut state_clone = global_state.clone();
pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), &mut state_clone)
})
.collect::<Result<_>>()?;

Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,13 @@ LIMIT 10;
0 NULL Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
12 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
2 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
5 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
1 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
Comment on lines +267 to +272
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it's not just the ordering — the values in the first (count) column are incorrect.


# Test config listing_table_ignore_subdirectory:

Expand Down