Skip to content

Commit 67fd737

Browse files
committed
init
1 parent f80fe21 commit 67fd737

3 files changed

Lines changed: 38 additions & 8 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
8585
OptimizeWindowFunctions,
8686
CollapseWindow,
8787
CombineFilters,
88-
CombineLimits,
88+
EliminateLimits,
8989
CombineUnions,
9090
// Constant folding and strength reduction
9191
TransposeWindow,
@@ -1452,11 +1452,23 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
14521452
}
14531453

14541454
/**
1455-
* Combines two adjacent [[Limit]] operators into one, merging the
1456-
* expressions into one single expression.
1455+
* 1. Eliminates a [[Limit]] operator if its child's max row count <= limit.
1456+
* 2. Combines two adjacent [[Limit]] operators into one, merging the
1457+
* expressions into one single expression.
14571458
*/
1458-
object CombineLimits extends Rule[LogicalPlan] {
1459-
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1459+
object EliminateLimits extends Rule[LogicalPlan] {
1460+
private def canEliminate(limitExpr: Expression, childMaxRow: Option[Long]): Boolean = {
1461+
limitExpr.foldable &&
1462+
childMaxRow.isDefined &&
1463+
childMaxRow.get <= limitExpr.eval().toString.toInt
1464+
}
1465+
1466+
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
1467+
case GlobalLimit(l, child) if canEliminate(l, child.maxRows) =>
1468+
child
1469+
case LocalLimit(l, child) if canEliminate(l, child.maxRows) =>
1470+
child
1471+
14601472
case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
14611473
GlobalLimit(Least(Seq(ne, le)), grandChild)
14621474
case LocalLimit(le, LocalLimit(ne, grandChild)) =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ class CombiningLimitsSuite extends PlanTest {
3030
Batch("Column Pruning", FixedPoint(100),
3131
ColumnPruning,
3232
RemoveNoopOperators) ::
33-
Batch("Combine Limit", FixedPoint(10),
34-
CombineLimits) ::
33+
Batch("Eliminate Limit", FixedPoint(10),
34+
EliminateLimits) ::
3535
Batch("Constant Folding", FixedPoint(10),
3636
NullPropagation,
3737
ConstantFolding,
@@ -90,4 +90,22 @@ class CombiningLimitsSuite extends PlanTest {
9090

9191
comparePlans(optimized, correctAnswer)
9292
}
93+
94+
test("SPARK-33442: Change Combine Limit to Eliminate limit using max row") {
95+
// test child max row <= limit.
96+
val query1 = testRelation.select().groupBy()(count(1)).limit(1).analyze
97+
val optimized1 = Optimize.execute(query1)
98+
val expected1 = testRelation.select().groupBy()(count(1)).analyze
99+
comparePlans(optimized1, expected1)
100+
101+
// test child max row > limit.
102+
val query2 = testRelation.select().groupBy()(count(1)).limit(0).analyze
103+
val optimized2 = Optimize.execute(query2)
104+
comparePlans(optimized2, query2)
105+
106+
// test child max row is none
107+
val query3 = testRelation.select(Symbol("a")).limit(1).analyze
108+
val optimized3 = Optimize.execute(query3)
109+
comparePlans(optimized3, query3)
110+
}
93111
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
3333
EliminateSubqueryAliases) ::
3434
Batch("Limit pushdown", FixedPoint(100),
3535
LimitPushDown,
36-
CombineLimits,
36+
EliminateLimits,
3737
ConstantFolding,
3838
BooleanSimplification) :: Nil
3939
}

0 commit comments

Comments
 (0)