8 changes: 8 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -146,6 +146,14 @@ pub fn pushdown_limit_helper(
global_state.skip = skip;
global_state.fetch = fetch;

if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
    // If the child is a `CoalescePartitionsExec`, do not remove the limit:
    // pushing the limit through the `CoalescePartitionsExec` into each
    // partition does not guarantee the global limit, since every partition
    // may emit up to `fetch` rows.
    // TODO: a better solution may be possible if we can support `with_fetch`
    // for the limit inside `CoalescePartitionsExec`.
    global_state.satisfied = true;
    return Ok((Transformed::no(pushdown_plan), global_state));
}

// Now the global state has the most recent information, we can remove
// the `LimitExec` plan. We will decide later if we should add it again
// or not.
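The reasoning behind the new guard can be illustrated with a minimal, self-contained sketch (plain Rust over vectors, not DataFusion's actual operators; the helper names are made up for illustration): pushing `fetch` into each partition and then coalescing can still return up to `fetch` rows *per partition*, so the limit sitting above `CoalescePartitionsExec` has to stay.

```rust
// Hypothetical sketch: per-partition fetch + coalesce is not a global fetch.

// Apply the limit inside each partition, then coalesce (concatenate) them.
fn per_partition_fetch(partitions: &[Vec<i32>], fetch: usize) -> Vec<i32> {
    partitions
        .iter()
        .flat_map(|p| p.iter().copied().take(fetch))
        .collect()
}

// Apply the limit once, after the partitions have been merged.
fn global_fetch(rows: impl IntoIterator<Item = i32>, fetch: usize) -> Vec<i32> {
    rows.into_iter().take(fetch).collect()
}

fn main() {
    // Three "partitions", mirroring the three parquet files in the test below.
    let partitions = vec![vec![0, 1, 100], vec![0, 2, 2, 100], vec![4, 5, 6]];

    // Pushing fetch = 1 into each partition and coalescing still yields one
    // row per partition: three rows, not one.
    let pushed_only = per_partition_fetch(&partitions, 1);
    assert_eq!(pushed_only.len(), 3);

    // Only a limit kept above the coalesce restores the requested fetch = 1.
    let with_global_limit = global_fetch(pushed_only, 1);
    assert_eq!(with_global_limit.len(), 1);
}
```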
104 changes: 104 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
@@ -772,3 +772,107 @@ physical_plan

statement ok
drop table testSubQueryLimit;


# Test limit pushdown with more than one partition
statement ok
set datafusion.explain.logical_plan_only = false;

# Set up 3 partitions
statement ok
set datafusion.execution.target_partitions = 3;

# automatically partition all files over 1 byte
statement ok
set datafusion.optimizer.repartition_file_min_size = 1;

# Create a table as a data source
statement ok
CREATE TABLE src_table (
part_key INT,
value INT
) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);


# Set up 3 files, i.e., as many as there are partitions:

# File 1:
query I
COPY (SELECT * FROM src_table where part_key = 1)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet'
STORED AS PARQUET;
----
3

# File 2:
query I
COPY (SELECT * FROM src_table where part_key = 2)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet'
STORED AS PARQUET;
----
4

# File 3:
query I
COPY (SELECT * FROM src_table where part_key = 3)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet'
STORED AS PARQUET;
----
3

statement ok
CREATE EXTERNAL TABLE test_limit_with_partitions
(
part_key INT,
value INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_limit_with_partitions/';

query TT
explain
with selection as (
select *
from test_limit_with_partitions
limit 1
)
select 1 as foo
from selection
order by part_key
limit 1000;
----
logical_plan
01)Projection: foo
02)--Sort: selection.part_key ASC NULLS LAST, fetch=1000
03)----Projection: Int64(1) AS foo, selection.part_key
04)------SubqueryAlias: selection
05)--------Limit: skip=0, fetch=1
06)----------TableScan: test_limit_with_partitions projection=[part_key], fetch=1
physical_plan
01)ProjectionExec: expr=[foo@0 as foo]
02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
04)------GlobalLimitExec: skip=0, fetch=1
05)--------CoalescePartitionsExec
06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1

query I
with selection as (
select *
from test_limit_with_partitions
limit 1
)
select 1 as foo
from selection
order by part_key
limit 1000;
----
1

# Tear down test_limit_with_partitions table:
statement ok
DROP TABLE test_limit_with_partitions;

# Tear down src_table table:
statement ok
DROP TABLE src_table;