Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
&& o.children.exists(_.isInstanceOf[Offset]) =>
failAnalysis(
s"""
|The OFFSET clause is allowed in the LIMIT clause or be the outermost node,
|but the OFFSET clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))

case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

case _: Union | _: SetOperation if operator.children.length > 1 =>
Expand Down Expand Up @@ -608,7 +615,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}
}
checkCollectedMetrics(plan)
checkOffsetOperator(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
Expand Down Expand Up @@ -851,30 +857,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
check(plan)
}

/**
* Validate whether the [[Offset]] is valid.
*/
private def checkOffsetOperator(plan: LogicalPlan): Unit = {
plan.foreachUp {
case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
&& o.children.exists(_.isInstanceOf[Offset]) =>
failAnalysis(
s"""
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
|clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
case _ =>
}
plan match {
case Offset(offsetExpr, _) =>
checkLimitLikeClause("offset", offsetExpr)
failAnalysis(
s"""
|The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
|clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
case _ =>
}
}

/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,14 +1871,12 @@ object EliminateLimits extends Rule[LogicalPlan] {
}

/**
* Rewrite [[Offset]] as [[GlobalLimitAndOffset]] or [[LocalLimit]],
* merging the expressions into one single expression. See [[Limit]] for more information
* about the difference between [[LocalLimit]] and [[GlobalLimit]].
* Rewrite [[Offset]] by eliminate [[Offset]] or merge offset value and limit value into
* [[LocalLimit]]. See [[Limit]] for more information about the difference between
* [[LocalLimit]] and [[GlobalLimit]].
*/
object RewriteOffsets extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case GlobalLimit(le, Offset(oe, grandChild)) =>
GlobalLimitAndOffset(le, oe, grandChild)
case localLimit @ LocalLimit(le, Offset(oe, grandChild)) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this logic to LimitPushDown? then we don't need this rule.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

val offset = oe.eval().asInstanceOf[Int]
if (offset == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1304,36 +1304,16 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
}

object LimitAndOffset {
def unapply(p: GlobalLimitAndOffset): Option[(Expression, Expression, LogicalPlan)] = {
def unapply(p: GlobalLimit): Option[(Expression, Expression, LogicalPlan)] = {
p match {
case GlobalLimitAndOffset(le1, le2, LocalLimit(le3, child)) if le1.eval().asInstanceOf[Int]
case GlobalLimit(le1, Offset(le2, LocalLimit(le3, child))) if le1.eval().asInstanceOf[Int]
+ le2.eval().asInstanceOf[Int] == le3.eval().asInstanceOf[Int] =>
Some((le1, le2, child))
case _ => None
}
}
}

/**
* A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and
* emit at most `limitExpr` number in total. For example, if we have LIMIT 10 OFFSET 5, we impose a
* total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining.
*/
case class GlobalLimitAndOffset(
limitExpr: Expression,
offsetExpr: Expression,
child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
limitExpr match {
case IntegerLiteral(limit) => Some(limit)
case _ => None
}
}
override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimitAndOffset =
copy(child = newChild)
}

/**
* This is similar with [[Limit]] except:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,18 +556,11 @@ class AnalysisErrorSuite extends AnalysisTest {
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"OFFSET clause is outermost node",
testRelation.offset(Literal(10, IntegerType)),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause is found to be the outermost node." :: Nil
)

errorTest(
"OFFSET clause in other node",
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause found in: Filter." :: Nil
"The OFFSET clause is allowed in the LIMIT clause or be the outermost node," +
" but the OFFSET clause found in: Filter." :: Nil
)

errorTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,17 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
CollectLimitExec(limit, planLater(child)) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(
limit, order, child.output, planLater(child), Some(offset)) :: Nil
TakeOrderedAndProjectExec(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new indentation is incorrect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm dazzled

limit, order, child.output, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(
limit, order, projectList, planLater(child), Some(offset)) :: Nil
limit, order, projectList, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
CollectLimitExec(limit, planLater(child), Some(offset)) :: Nil
CollectLimitExec(limit, planLater(child), offset) :: Nil
case logical.Offset(IntegerLiteral(offset), child) =>
CollectLimitExec(child = planLater(child), offset = offset) :: Nil
case Tail(IntegerLiteral(limit), child) =>
CollectTailExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
Expand All @@ -116,20 +118,20 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// In this case we generate a physical top-K sorting operator, passing down
// the limit and offset values to be evaluated inline during the physical
// sorting operation for greater efficiency.
case LimitAndOffset(
IntegerLiteral(limit),
IntegerLiteral(offset),
Sort(order, true, child))
if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(
limit, order, child.output, planLater(child), Some(offset)) :: Nil
case LimitAndOffset(
IntegerLiteral(limit),
IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(
limit, order, child.output, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
if limit + offset < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), Some(offset)) :: Nil
case _ => Nil
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
case logical.Offset(IntegerLiteral(offset), child) =>
GlobalLimitAndOffsetExec(offset = offset, child = planLater(child)) :: Nil
case _ =>
Nil
}
}

Expand Down Expand Up @@ -818,8 +820,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
execution.GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
case union: logical.Union =>
execution.UnionExec(union.children.map(planLater)) :: Nil
case g @ logical.Generate(generator, _, outer, _, _, child) =>
Expand Down
Loading