Skip to content

Commit b5ce20b

Browse files
beliefer authored and chenzhx committed
[SPARK-39057][SQL] Offset could work without Limit
### What changes were proposed in this pull request? Currently, `Offset` must be used together with `Limit`. This behavior does not allow us to use offset alone or to add an offset API to `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in this way. 2. If `Offset` is an intermediate operator, shuffle all of the results to one task and drop/skip the first n (offset value) rows; the result is then passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` is parsed to the logical plan below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then to the physical plan below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR is merged, users can run SQL such as the following: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 added support for the offset clause and created a logical node named `GlobalLimitAndOffset`. In fact, we can avoid this node and use `Offset` instead, which also gives the operators a unified name. ### Why are the changes needed? Improve the implementation of the offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Existing test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 415bef2 commit b5ce20b

14 files changed

Lines changed: 538 additions & 160 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
404404

405405
case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)
406406

407+
case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
408+
&& o.children.exists(_.isInstanceOf[Offset]) =>
409+
failAnalysis(
410+
s"""
411+
|The OFFSET clause is allowed in the LIMIT clause or be the outermost node,
412+
|but the OFFSET clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
413+
407414
case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)
408415

409416
case _: Union | _: SetOperation if operator.children.length > 1 =>
@@ -564,7 +571,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
564571
}
565572
}
566573
checkCollectedMetrics(plan)
567-
checkOffsetOperator(plan)
568574
extendedCheckRules.foreach(_(plan))
569575
plan.foreachUp {
570576
case o if !o.resolved =>
@@ -712,30 +718,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
712718
check(plan)
713719
}
714720

715-
/**
716-
* Validate whether the [[Offset]] is valid.
717-
*/
718-
private def checkOffsetOperator(plan: LogicalPlan): Unit = {
719-
plan.foreachUp {
720-
case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
721-
&& o.children.exists(_.isInstanceOf[Offset]) =>
722-
failAnalysis(
723-
s"""
724-
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
725-
|clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
726-
case _ =>
727-
}
728-
plan match {
729-
case Offset(offsetExpr, _) =>
730-
checkLimitLikeClause("offset", offsetExpr)
731-
failAnalysis(
732-
s"""
733-
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
734-
|clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
735-
case _ =>
736-
}
737-
}
738-
739721
/**
740722
* Validates to make sure the outer references appearing inside the subquery
741723
* are allowed.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
9393
OptimizeWindowFunctions,
9494
CollapseWindow,
9595
CombineFilters,
96-
RewriteOffsets,
96+
EliminateOffsets,
9797
EliminateLimits,
98-
RewriteOffsets,
9998
CombineUnions,
10099
// Constant folding and strength reduction
101100
OptimizeRepartition,
@@ -640,7 +639,7 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
640639
}
641640

642641
/**
643-
* Pushes down [[LocalLimit]] beneath UNION ALL and joins.
642+
* Pushes down [[LocalLimit]] beneath UNION ALL, OFFSET and joins.
644643
*/
645644
object LimitPushDown extends Rule[LogicalPlan] {
646645

@@ -710,6 +709,14 @@ object LimitPushDown extends Rule[LogicalPlan] {
710709
// There is a Project between LocalLimit and Join if they do not have the same output.
711710
case LocalLimit(exp, project @ Project(_, join: Join)) =>
712711
LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, join)))
712+
// Push down limit 1 through Aggregate and turn Aggregate into Project if it is group only.
713+
case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly =>
714+
Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child)))
715+
case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly =>
716+
Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child))))
717+
// Merge the offset and limit values into LocalLimit and push LocalLimit down through Offset.
718+
case LocalLimit(le, Offset(oe, grandChild)) =>
719+
Offset(oe, LocalLimit(Add(le, oe), grandChild))
713720
}
714721
}
715722

@@ -1785,34 +1792,22 @@ object EliminateLimits extends Rule[LogicalPlan] {
17851792
}
17861793

17871794
/**
1788-
* Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]] operators into one,
1789-
* merging the expressions into one single expression.
1790-
*/
1791-
object RewriteOffsets extends Rule[LogicalPlan] {
1792-
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1793-
case GlobalLimit(le, Offset(oe, grandChild)) =>
1794-
GlobalLimitAndOffset(le, oe, grandChild)
1795-
case LocalLimit(le, Offset(oe, grandChild)) =>
1796-
Offset(oe, LocalLimit(Add(le, oe), grandChild))
1797-
}
1798-
}
1799-
1800-
/**
1801-
* Rewrite [[Offset]] as [[GlobalLimitAndOffset]] or [[LocalLimit]],
1802-
* merging the expressions into one single expression. See [[Limit]] for more information
1803-
* about the difference between [[LocalLimit]] and [[GlobalLimit]].
1795+
* This rule optimizes Offset operators by:
1796+
* 1. Eliminate [[Offset]] operators if offset == 0.
1797+
* 2. Replace [[Offset]] operators with an empty [[LocalRelation]]
1798+
* if the max row count of [[Offset]]'s child <= offset.
1799+
* 3. Combines two adjacent [[Offset]] operators into one, merging the
1800+
* expressions into one single expression.
18041801
*/
1805-
object RewriteOffsets extends Rule[LogicalPlan] {
1802+
object EliminateOffsets extends Rule[LogicalPlan] {
18061803
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1807-
case GlobalLimit(le, Offset(oe, grandChild)) =>
1808-
GlobalLimitAndOffset(le, oe, grandChild)
1809-
case localLimit @ LocalLimit(le, Offset(oe, grandChild)) =>
1810-
val offset = oe.eval().asInstanceOf[Int]
1811-
if (offset == 0) {
1812-
localLimit.withNewChildren(Seq(grandChild))
1813-
} else {
1814-
Offset(oe, LocalLimit(Add(le, oe), grandChild))
1815-
}
1804+
case Offset(oe, child) if oe.foldable && oe.eval().asInstanceOf[Int] == 0 =>
1805+
child
1806+
case Offset(oe, child)
1807+
if oe.foldable && child.maxRows.exists(_ <= oe.eval().asInstanceOf[Int]) =>
1808+
LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming)
1809+
case Offset(oe1, Offset(oe2, child)) =>
1810+
Offset(Add(oe1, oe2), child)
18161811
}
18171812
}
18181813

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1245,36 +1245,16 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
12451245
}
12461246

12471247
object LimitAndOffset {
1248-
def unapply(p: GlobalLimitAndOffset): Option[(Expression, Expression, LogicalPlan)] = {
1248+
def unapply(p: GlobalLimit): Option[(Expression, Expression, LogicalPlan)] = {
12491249
p match {
1250-
case GlobalLimitAndOffset(le1, le2, LocalLimit(le3, child)) if le1.eval().asInstanceOf[Int]
1250+
case GlobalLimit(le1, Offset(le2, LocalLimit(le3, child))) if le1.eval().asInstanceOf[Int]
12511251
+ le2.eval().asInstanceOf[Int] == le3.eval().asInstanceOf[Int] =>
12521252
Some((le1, le2, child))
12531253
case _ => None
12541254
}
12551255
}
12561256
}
12571257

1258-
/**
1259-
* A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and
1260-
* emit at most `limitExpr` number in total. For example, if we have LIMIT 10 OFFSET 5, we impose a
1261-
* total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining.
1262-
*/
1263-
case class GlobalLimitAndOffset(
1264-
limitExpr: Expression,
1265-
offsetExpr: Expression,
1266-
child: LogicalPlan) extends OrderPreservingUnaryNode {
1267-
override def output: Seq[Attribute] = child.output
1268-
override def maxRows: Option[Long] = {
1269-
limitExpr match {
1270-
case IntegerLiteral(limit) => Some(limit)
1271-
case _ => None
1272-
}
1273-
}
1274-
override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset =
1275-
copy(child = newChild)
1276-
}
1277-
12781258
/**
12791259
* This is similar with [[Limit]] except:
12801260
*

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -574,18 +574,11 @@ class AnalysisErrorSuite extends AnalysisTest {
574574
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
575575
)
576576

577-
errorTest(
578-
"OFFSET clause is outermost node",
579-
testRelation.offset(Literal(10, IntegerType)),
580-
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
581-
" clause is found to be the outermost node." :: Nil
582-
)
583-
584577
errorTest(
585578
"OFFSET clause in other node",
586579
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
587-
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
588-
" clause found in: Filter." :: Nil
580+
"The OFFSET clause is allowed in the LIMIT clause or be the outermost node," +
581+
" but the OFFSET clause found in: Filter." :: Nil
589582
)
590583

591584
errorTest(
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.Row
21+
import org.apache.spark.sql.catalyst.dsl.expressions._
22+
import org.apache.spark.sql.catalyst.dsl.plans._
23+
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
24+
import org.apache.spark.sql.catalyst.plans.PlanTest
25+
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
26+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
27+
28+
class EliminateOffsetsSuite extends PlanTest {
29+
object Optimize extends RuleExecutor[LogicalPlan] {
30+
val batches =
31+
Batch("Eliminate Offset", FixedPoint(10), EliminateOffsets) :: Nil
32+
}
33+
34+
val testRelation = LocalRelation.fromExternalRows(
35+
Seq("a".attr.int, "b".attr.int, "c".attr.int),
36+
1.to(10).map(_ => Row(1, 2, 3))
37+
)
38+
39+
test("Offsets: eliminate Offset operators if offset == 0") {
40+
val originalQuery =
41+
testRelation
42+
.select($"a")
43+
.offset(0)
44+
45+
val optimized = Optimize.execute(originalQuery.analyze)
46+
val correctAnswer =
47+
testRelation
48+
.select($"a")
49+
.analyze
50+
51+
comparePlans(optimized, correctAnswer)
52+
}
53+
54+
test("Offsets: cannot eliminate Offset operators if offset > 0") {
55+
val originalQuery =
56+
testRelation
57+
.select($"a")
58+
.offset(2)
59+
60+
val optimized = Optimize.execute(originalQuery.analyze)
61+
val correctAnswer =
62+
testRelation
63+
.select($"a")
64+
.offset(2)
65+
.analyze
66+
67+
comparePlans(optimized, correctAnswer)
68+
}
69+
70+
test("Replace Offset operators to empty LocalRelation if child max row <= offset") {
71+
val child = testRelation.select($"a").analyze
72+
val originalQuery = child.offset(10)
73+
74+
val optimized = Optimize.execute(originalQuery.analyze)
75+
val correctAnswer =
76+
LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming).analyze
77+
78+
comparePlans(optimized, correctAnswer)
79+
}
80+
81+
test("Cannot replace Offset operators to empty LocalRelation if child max row > offset") {
82+
val child = testRelation.select($"a").analyze
83+
val originalQuery = child.offset(3)
84+
85+
val optimized = Optimize.execute(originalQuery.analyze)
86+
val correctAnswer = originalQuery.analyze
87+
88+
comparePlans(optimized, correctAnswer)
89+
}
90+
91+
test("Combines Offset operators") {
92+
val child = testRelation.select($"a").analyze
93+
val originalQuery = child.offset(2).offset(3)
94+
95+
val optimized = Optimize.execute(originalQuery.analyze)
96+
val correctAnswer = child.offset(Add(Literal(3), Literal(2))).analyze
97+
98+
comparePlans(optimized, correctAnswer)
99+
}
100+
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,10 @@ class LimitPushdownSuite extends PlanTest {
239239
Limit(5, LocalLimit(5, x).join(y, LeftOuter, joinCondition).select("x.a".attr)).analyze
240240
comparePlans(optimized, correctAnswer)
241241
}
242+
243+
test("Push down limit 1 through Offset") {
244+
comparePlans(
245+
Optimize.execute(testRelation.offset(2).limit(1).analyze),
246+
GlobalLimit(1, Offset(2, LocalLimit(3, testRelation))).analyze)
247+
}
242248
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,16 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
9292
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
9393
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
9494
TakeOrderedAndProjectExec(
95-
limit, order, child.output, planLater(child), Some(offset)) :: Nil
95+
limit, order, child.output, planLater(child), offset) :: Nil
9696
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
9797
Project(projectList, Sort(order, true, child)))
9898
if limit + offset < conf.topKSortFallbackThreshold =>
9999
TakeOrderedAndProjectExec(
100-
limit, order, projectList, planLater(child), Some(offset)) :: Nil
100+
limit, order, projectList, planLater(child), offset) :: Nil
101101
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
102-
CollectLimitExec(limit, planLater(child), Some(offset)) :: Nil
102+
CollectLimitExec(limit, planLater(child), offset) :: Nil
103+
case logical.Offset(IntegerLiteral(offset), child) =>
104+
CollectLimitExec(child = planLater(child), offset = offset) :: Nil
103105
case Tail(IntegerLiteral(limit), child) =>
104106
CollectTailExec(limit, planLater(child)) :: Nil
105107
case other => planLater(other) :: Nil
@@ -115,20 +117,18 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
115117
// In this case we generate a physical top-K sorting operator, passing down
116118
// the limit and offset values to be evaluated inline during the physical
117119
// sorting operation for greater efficiency.
118-
case LimitAndOffset(
119-
IntegerLiteral(limit),
120-
IntegerLiteral(offset),
121-
Sort(order, true, child))
122-
if limit + offset < conf.topKSortFallbackThreshold =>
123-
TakeOrderedAndProjectExec(
124-
limit, order, child.output, planLater(child), Some(offset)) :: Nil
125-
case LimitAndOffset(
126-
IntegerLiteral(limit),
127-
IntegerLiteral(offset),
128-
Project(projectList, Sort(order, true, child)))
120+
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
121+
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
122+
TakeOrderedAndProjectExec(
123+
limit, order, child.output, planLater(child), offset) :: Nil
124+
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
125+
Project(projectList, Sort(order, true, child)))
129126
if limit + offset < conf.topKSortFallbackThreshold =>
130-
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), Some(offset)) :: Nil
131-
case _ => Nil
127+
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), offset) :: Nil
128+
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
129+
GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
130+
case _ =>
131+
Nil
132132
}
133133
}
134134

@@ -790,8 +790,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
790790
execution.LocalLimitExec(limit, planLater(child)) :: Nil
791791
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
792792
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
793-
case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
794-
execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
793+
case logical.Offset(IntegerLiteral(offset), child) =>
794+
GlobalLimitAndOffsetExec(offset = offset, child = planLater(child)) :: Nil
795795
case union: logical.Union =>
796796
execution.UnionExec(union.children.map(planLater)) :: Nil
797797
case g @ logical.Generate(generator, _, outer, _, _, child) =>

0 commit comments

Comments
 (0)