9 changes: 9 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -146,6 +146,15 @@ pub fn pushdown_limit_helper(
global_state.skip = skip;
global_state.fetch = fetch;

if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
@mertak-synnada commented on Feb 4, 2025:
While I agree with the suggestion to check via the API, please also check with the combines_input_partitions() helper function so that SortPreservingMerge is covered as well.

In the optimizer logic, we remove the Limit operators first and then re-add them at the lowest possible point in the plan; if the plan is "satisfied", we drop the limit information. So if the plan is combining input partitions, we only add a global limit when skip information is present. Maybe we can identify whether the local limits are enough and then decide whether to add the global limit there. But in the end, I think that rather than adding a global limit, we should be able to apply the limit inside CoalescePartitionsExec or SortPreservingMerge so that they don't unnecessarily pull more data.

// Execution plans can't (yet) handle skip, so if we have one,
// we still need to add a global limit
if global_state.skip > 0 {
   new_plan =
       add_global_limit(new_plan, global_state.skip, global_state.fetch);
}
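
For context, a helper along these lines would cover both partition-combining operators. This is only a sketch based on the comment above; the actual combines_input_partitions() in limit_pushdown.rs may differ:

use std::sync::Arc;
use datafusion::physical_plan::{
    coalesce_partitions::CoalescePartitionsExec,
    sorts::sort_preserving_merge::SortPreservingMergeExec, ExecutionPlan,
};

/// Returns true if the operator merges all of its input partitions into one,
/// which is where a per-partition (local) limit alone can no longer guarantee
/// the overall limit.
fn combines_input_partitions(exec: &Arc<dyn ExecutionPlan>) -> bool {
    let exec = exec.as_any();
    exec.is::<CoalescePartitionsExec>() || exec.is::<SortPreservingMergeExec>()
}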

@zhuqi-lucas (Contributor Author) commented on Feb 4, 2025:

Thank you @mertak-synnada for the review:

While I agree with the suggestion to check via the API, please also check with the combines_input_partitions() helper function so that SortPreservingMerge is covered as well.

I agree. I already checked SortPreservingMergeExec: it supports with_fetch() and fetch(), so I think it's not affected:

impl SortPreservingMergeExec {
    /// Create a new sort execution plan
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let cache = Self::compute_properties(&input, expr.clone());
        Self {
            input,
            expr,
            metrics: ExecutionPlanMetricsSet::new(),
            fetch: None,
            cache,
            enable_round_robin_repartition: true,
        }
    }

    /// Sets the number of rows to fetch
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// Sets the selection strategy of tied winners of the loser tree algorithm
    ///
    /// If true (the default) equal output rows are placed in the merged stream
    /// in round robin fashion. This approach consumes input streams at more
    /// even rates when there are many rows with the same sort key.
    ///
    /// If false, equal output rows are always placed in the merged stream in
    /// the order of the inputs, resulting in potentially slower execution but a
    /// stable output order.
    pub fn with_round_robin_repartition(
        mut self,
        enable_round_robin_repartition: bool,
    ) -> Self {
        self.enable_round_robin_repartition = enable_round_robin_repartition;
        self
    }

    /// Input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions
    pub fn expr(&self) -> &LexOrdering {
        self.expr.as_ref()
    }

    /// Fetch
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        ordering: LexOrdering,
    ) -> PlanProperties {
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.clear_per_partition_constants();
        eq_properties.add_new_orderings(vec![ordering]);
        PlanProperties::new(
            eq_properties,                        // Equivalence Properties
            Partitioning::UnknownPartitioning(1), // Output Partitioning
            input.pipeline_behavior(),            // Pipeline Behavior
            input.boundedness(),                  // Boundedness
        )
    }
}
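
Hypothetical usage, assuming an already-built input: Arc<dyn ExecutionPlan> and a LexOrdering named ordering (a sketch, not code from this PR): the fetch rides on the merge operator itself, so no extra GlobalLimitExec is needed on top.

// Build a merging operator that stops after emitting 10 rows.
let merge = SortPreservingMergeExec::new(ordering, input).with_fetch(Some(10));
assert_eq!(merge.fetch(), Some(10));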

But in the end, I think that rather than adding a global limit, we should be able to apply the limit inside CoalescePartitionsExec or SortPreservingMerge so that they don't unnecessarily pull more data.

I totally agree with this! So I created follow-up #14446 to support a limit inside CoalescePartitionsExec; SortPreservingMerge already supports this, per the code above.

So if the plan is combining input partitions, we only add a global limit when skip information is present. Maybe we can identify whether the local limits are enough and then decide whether to add the global limit there.

This is a good point; we can create another issue to try to improve this!
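
To illustrate the idea behind follow-up #14446 (purely a sketch; the names and structure are hypothetical, not the eventual DataFusion API): a fetch applied inside the combining operator can truncate batches as they are emitted and stop polling its inputs once enough rows have been produced, instead of relying on a separate GlobalLimitExec above it.

use arrow::record_batch::RecordBatch;

/// Hypothetical state for a fetch applied inside a partition-combining
/// operator such as CoalescePartitionsExec.
struct FetchState {
    remaining: usize,
}

impl FetchState {
    /// Truncate a batch to the rows still needed; None signals that the
    /// merged stream can finish and stop polling its input partitions.
    fn apply(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
        if self.remaining == 0 {
            return None;
        }
        let take = batch.num_rows().min(self.remaining);
        self.remaining -= take;
        Some(batch.slice(0, take))
    }
}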

@zhuqi-lucas (Contributor Author) commented on Feb 4, 2025:

Updated: I confirmed SortPreservingMerge works well with fetch:

# Check output plan, expect no "output_ordering" clause in the physical_plan -> ParquetExec:
query TT
explain with selection as (
    select *
    from test_table
    ORDER BY string_col, int_col limit 1
)
select 1 as foo
from selection
order by string_col
limit 1000;
----
logical_plan
01)Projection: foo
02)--Sort: selection.string_col ASC NULLS LAST, fetch=1000
03)----Projection: Int64(1) AS foo, selection.string_col
04)------SubqueryAlias: selection
05)--------Projection: test_table.string_col
06)----------Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST, fetch=1
07)------------TableScan: test_table projection=[int_col, string_col]
physical_plan
01)ProjectionExec: expr=[foo@0 as foo]
02)--ProjectionExec: expr=[1 as foo, string_col@0 as string_col]
03)----ProjectionExec: expr=[string_col@1 as string_col]
04)------SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], fetch=1
05)--------SortExec: TopK(fetch=1), expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true]
06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet]]}, projection=[int_col, string_col]

// If the child is a `CoalescePartitionsExec`, we should not remove the limit:
// pushing the limit down through the `CoalescePartitionsExec` to each partition
// will not guarantee the overall limit.
// TODO: we may have a better solution if we can support `with_fetch` for the limit inside `CoalescePartitionsExec`.
// Follow-up issue: https://github.com/apache/datafusion/issues/14446
global_state.satisfied = true;
return Ok((Transformed::no(pushdown_plan), global_state));
}

// Now the global state has the most recent information, we can remove
// the `LimitExec` plan. We will decide later if we should add it again
// or not.
104 changes: 104 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
@@ -772,3 +772,107 @@ physical_plan

statement ok
drop table testSubQueryLimit;


# Test push down limit with more than one partition
statement ok
set datafusion.explain.logical_plan_only = false;

# Set up 3 partitions
statement ok
set datafusion.execution.target_partitions = 3;

# automatically partition all files over 1 byte
statement ok
set datafusion.optimizer.repartition_file_min_size = 1;

# Create a table as a data source
statement ok
CREATE TABLE src_table (
part_key INT,
value INT
) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);


# Set up 3 files, i.e., as many as there are partitions:

# File 1:
query I
COPY (SELECT * FROM src_table where part_key = 1)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet'
STORED AS PARQUET;
----
3

# File 2:
query I
COPY (SELECT * FROM src_table where part_key = 2)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet'
STORED AS PARQUET;
----
4

# File 3:
query I
COPY (SELECT * FROM src_table where part_key = 3)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet'
STORED AS PARQUET;
----
3

statement ok
CREATE EXTERNAL TABLE test_limit_with_partitions
(
part_key INT,
value INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_limit_with_partitions/';

query TT
explain
with selection as (
select *
from test_limit_with_partitions
limit 1
)
select 1 as foo
from selection
order by part_key
limit 1000;
----
logical_plan
01)Projection: foo
02)--Sort: selection.part_key ASC NULLS LAST, fetch=1000
03)----Projection: Int64(1) AS foo, selection.part_key
04)------SubqueryAlias: selection
05)--------Limit: skip=0, fetch=1
06)----------TableScan: test_limit_with_partitions projection=[part_key], fetch=1
physical_plan
01)ProjectionExec: expr=[foo@0 as foo]
02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
04)------GlobalLimitExec: skip=0, fetch=1
05)--------CoalescePartitionsExec
06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1

query I
with selection as (
select *
from test_limit_with_partitions
limit 1
)
select 1 as foo
from selection
order by part_key
limit 1000;
----
1

# Tear down test_limit_with_partitions table:
statement ok
DROP TABLE test_limit_with_partitions;

# Tear down src_table table:
statement ok
DROP TABLE src_table;