From 954df19a3024f8f96417bb67aa8882773a603f0b Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Sun, 2 Feb 2025 23:44:34 +0800
Subject: [PATCH 1/4] fix: Limits are not applied correctly

---
 datafusion/physical-optimizer/src/limit_pushdown.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 5887cb51a727..a87ddcd6e023 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -300,6 +300,9 @@ pub(crate) fn pushdown_limits(
 /// [`GlobalLimitExec`] or a [`LocalLimitExec`].
 fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
     if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
+        if global_limit.input().as_any().is::<CoalescePartitionsExec>() {
+            return None;
+        }
         Some(LimitExec::Global(GlobalLimitExec::new(
             Arc::clone(global_limit.input()),
             global_limit.skip(),

From 952a85808902b6c143e2229a247fd55defa622a8 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Mon, 3 Feb 2025 14:20:00 +0800
Subject: [PATCH 2/4] Add easy fix

---
 datafusion/physical-optimizer/src/limit_pushdown.rs | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index a87ddcd6e023..7f8a12049d04 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -146,6 +146,13 @@ pub fn pushdown_limit_helper(
     global_state.skip = skip;
     global_state.fetch = fetch;
 
+    if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
+        // If the child is a `CoalescePartitionsExec`, we should not remove the limit:
+        // pushing it down through the `CoalescePartitionsExec` to each partition does not guarantee the overall limit.
+        global_state.satisfied = true;
+        return Ok((Transformed::no(pushdown_plan), global_state));
+    }
+
     // Now the global state has the most recent information, we can remove
     // the `LimitExec` plan. We will decide later if we should add it again
     // or not.
@@ -300,9 +307,6 @@ pub(crate) fn pushdown_limits(
 /// [`GlobalLimitExec`] or a [`LocalLimitExec`].
 fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
     if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
-        if global_limit.input().as_any().is::<CoalescePartitionsExec>() {
-            return None;
-        }
         Some(LimitExec::Global(GlobalLimitExec::new(
             Arc::clone(global_limit.input()),
             global_limit.skip(),

From 4bb1a468c1e08b46cd7ab7bc12bc2ca0fad8d4a9 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Mon, 3 Feb 2025 14:20:24 +0800
Subject: [PATCH 3/4] Add fix

---
 datafusion/physical-optimizer/src/limit_pushdown.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 7f8a12049d04..1f027282c480 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -149,6 +149,7 @@ pub fn pushdown_limit_helper(
     if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
         // If the child is a `CoalescePartitionsExec`, we should not remove the limit:
         // pushing it down through the `CoalescePartitionsExec` to each partition does not guarantee the overall limit.
+        // TODO: we may find a better solution if we can support `with_fetch` for the limit inside `CoalescePartitionsExec`.
         global_state.satisfied = true;
         return Ok((Transformed::no(pushdown_plan), global_state));
     }

From 45cf6120a174e48b6114f2947ff4dbe53fa1f9dc Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Mon, 3 Feb 2025 14:44:43 +0800
Subject: [PATCH 4/4] Add slt testing

---
 datafusion/sqllogictest/test_files/limit.slt | 104 +++++++++++++++++++
 1 file changed, 104 insertions(+)

diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt
index 308e759fa9fa..65f35d40fcf5 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -772,3 +772,107 @@ physical_plan
 
 statement ok
 drop table testSubQueryLimit;
+
+
+# Test push down limit with more than one partition
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+# Set up 3 partitions
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+# Automatically partition all files over 1 byte
+statement ok
+set datafusion.optimizer.repartition_file_min_size = 1;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+  part_key INT,
+  value INT
+) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);
+
+
+# Set up 3 files, i.e., as many as there are partitions:
+
+# File 1:
+query I
+COPY (SELECT * FROM src_table where part_key = 1)
+TO 'test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query I
+COPY (SELECT * FROM src_table where part_key = 2)
+TO 'test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet'
+STORED AS PARQUET;
+----
+4
+
+# File 3:
+query I
+COPY (SELECT * FROM src_table where part_key = 3)
+TO 'test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet'
+STORED AS PARQUET;
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE test_limit_with_partitions
+(
+    part_key INT,
+    value INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/test_limit_with_partitions/';
+
+query TT
+explain
+with selection as (
+    select *
+    from test_limit_with_partitions
+    limit 1
+)
+select 1 as foo
+from selection
+order by part_key
+limit 1000;
+----
+logical_plan
+01)Projection: foo
+02)--Sort: selection.part_key ASC NULLS LAST, fetch=1000
+03)----Projection: Int64(1) AS foo, selection.part_key
+04)------SubqueryAlias: selection
+05)--------Limit: skip=0, fetch=1
+06)----------TableScan: test_limit_with_partitions projection=[part_key], fetch=1
+physical_plan
+01)ProjectionExec: expr=[foo@0 as foo]
+02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
+04)------GlobalLimitExec: skip=0, fetch=1
+05)--------CoalescePartitionsExec
+06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1
+
+query I
+with selection as (
+    select *
+    from test_limit_with_partitions
+    limit 1
+)
+select 1 as foo
+from selection
+order by part_key
+limit 1000;
+----
+1
+
+# Tear down test_limit_with_partitions table:
+statement ok
+DROP TABLE test_limit_with_partitions;
+
+# Tear down src_table table:
+statement ok
+DROP TABLE src_table;
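
Note on why the limit must stay above `CoalescePartitionsExec`: once the fetch is pushed into each partition, every partition may still return up to `fetch` rows, so the merged stream can carry more rows than the limit allows unless a global limit is re-applied after the merge. The standalone Rust sketch below models this behavior; it does not use DataFusion's APIs, and the `fetch` value and the three single-row partitions are made-up illustration data.

// Illustrative model only: three partitions, each already truncated to `fetch`
// rows by a per-partition limit, still produce more than `fetch` rows in total
// once they are merged, unless a global limit runs after the merge.
fn main() {
    let fetch = 1;
    let partitions = vec![vec![0], vec![2], vec![4]];

    // Merge all partitions into a single stream (the role of `CoalescePartitionsExec`).
    let coalesced: Vec<i32> = partitions.into_iter().flatten().collect();
    assert_eq!(coalesced.len(), 3); // 3 rows, even though fetch = 1

    // Re-applying the limit above the merge restores the expected row count,
    // which is why the patch keeps `GlobalLimitExec` above `CoalescePartitionsExec`.
    let limited: Vec<i32> = coalesced.into_iter().take(fetch).collect();
    assert_eq!(limited.len(), fetch);
    println!("rows after global limit: {}", limited.len());
}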