Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
operatorOptimizationBatch) :+
Batch("Join Reorder", Once,
CostBasedJoinReorder) :+
Batch("Remove Redundant Sorts", Once,
RemoveRedundantSorts) :+
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) :+
Batch("Object Expressions Optimization", fixedPoint,
Expand Down Expand Up @@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
}
}

/**
* Removes Sort operations on already sorted data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about Removes Sort operation if the child is already sorted?

*/
object RemoveRedundantSorts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Sort(orders, true, child) if child.outputOrdering.nonEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

child.outputOrdering.nonEmpty looks like unnecessary

&& SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
Copy link
Contributor

@cloud-fan cloud-fan Apr 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we do it after planning as we already have SparkPlan.outputOrdering?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah they are different. This is global ordering,

child
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might not want to do it in this PR, but you could easily remove another simple kind of redundant sort, e.g.:

rel.orderBy('a.desc).orderBy('a.asc)

(and I think that orderBy is not stable, so any two consecutive orderBy operators are redundant).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. Probably we can do this in other PR. May you open a JIRA for this? Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good follow-up

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed SPARK-23973 for this

}

/**
* Removes filters that can be evaluated trivially. This can be done through the following ways:
* 1) by eliding the filter for cases where it will always evaluate to `true`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ abstract class LogicalPlan
* Refreshes (or invalidates) any metadata/data cached in the plan recursively.
*/
def refresh(): Unit = children.foreach(_.refresh())

/**
* Returns the output ordering that this plan generates.
*/
def outputOrdering: Seq[SortOrder] = Nil
}

/**
Expand Down Expand Up @@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {

override final def children: Seq[LogicalPlan] = Seq(left, right)
}

abstract class KeepOrderUnaryNode extends UnaryNode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OrderPreservingUnaryNode? Or perhaps do you think this would be better modeled as a mixin trait?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the suggestion. I'd love to hear also @cloud-fan's and @wzhfy's opinion on this in order to choose all together the best name for it. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OrderPreservingUnaryNode sounds better.

It only makes sense for unary node, so I don't think mixin trait is a good idea.

override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
* This node is inserted at the top of a subquery when it is optimized. This makes sure we can
* recognize a subquery as such, and it allows us to write subquery aware transformations.
*/
case class Subquery(child: LogicalPlan) extends UnaryNode {
case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode {
override def output: Seq[Attribute] = child.output
}

case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like ProjectExec.outputOrdering, we can propagate ordering for aliased attributes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I don't fully understand what you mean. In ProjectExec.outputOrdering we are getting the child.outputOrdering exactly as it is done here.

extends KeepOrderUnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def maxRows: Option[Long] = child.maxRows

Expand Down Expand Up @@ -125,7 +126,7 @@ case class Generate(
}

case class Filter(condition: Expression, child: LogicalPlan)
extends UnaryNode with PredicateHelper {
extends KeepOrderUnaryNode with PredicateHelper {
override def output: Seq[Attribute] = child.output

override def maxRows: Option[Long] = child.maxRows
Expand Down Expand Up @@ -469,6 +470,7 @@ case class Sort(
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = child.maxRows
override def outputOrdering: Seq[SortOrder] = order
}

/** Factory for constructing new `Range` nodes. */
Expand Down Expand Up @@ -522,6 +524,8 @@ case class Range(
override def computeStats(): Statistics = {
Statistics(sizeInBytes = LongType.defaultSize * numElements)
}

override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ordering is the same when step in Range is positive or negative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, thanks! I missed it!

}

case class Aggregate(
Expand Down Expand Up @@ -728,7 +732,7 @@ object Limit {
*
* See [[Limit]] for more information.
*/
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
limitExpr match {
Expand All @@ -744,7 +748,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
*
* See [[Limit]] for more information.
*/
case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode {
override def output: Seq[Attribute] = child.output

override def maxRowsPerPartition: Option[Long] = {
Expand All @@ -764,7 +768,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
case class SubqueryAlias(
alias: String,
child: LogicalPlan)
extends UnaryNode {
extends KeepOrderUnaryNode {

override def doCanonicalize(): LogicalPlan = child.canonicalized

Expand Down Expand Up @@ -867,6 +871,11 @@ case class RepartitionByExpression(

override def maxRows: Option[Long] = child.maxRows
override def shuffle: Boolean = true

override def outputOrdering: Seq[SortOrder] = partitioning match {
case RangePartitioning(ordering, _) => ordering
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RangePartitioning doesn't guarantee ordering inside partition, we can't do this.

case _ => Nil
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL}

class RemoveRedundantSortsSuite extends PlanTest {
override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, ORDER_BY_ORDINAL -> false)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
val analyzer = new Analyzer(catalog, conf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't use ordinal number, we can remove these.


object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Remove Redundant Sorts", Once,
RemoveRedundantSorts) ::
Batch("Collapse Project", Once,
CollapseProject) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

test("remove redundant order by") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use unnecessaryReordered.analyze?

val correctAnswer = analyzer.execute(orderedPlan.select('a))
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("do not remove sort if the order is different") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
val optimized = Optimize.execute(analyzer.execute(reorderedDifferently))
val correctAnswer = analyzer.execute(reorderedDifferently)
comparePlans(optimized, correctAnswer)
}

test("filters don't affect order") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc)
val optimized = Optimize.execute(analyzer.execute(filteredAndReordered))
val correctAnswer = analyzer.execute(orderedPlan.where('a > Literal(10)))
comparePlans(optimized, correctAnswer)
}

test("limits don't affect order") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc)
val optimized = Optimize.execute(analyzer.execute(filteredAndReordered))
val correctAnswer = analyzer.execute(orderedPlan.limit(Literal(10)))
comparePlans(optimized, correctAnswer)
}

test("range is already sorted") {
val inputPlan = Range(1L, 1000L, 1, 10)
val orderedPlan = inputPlan.orderBy('id.desc)
val optimized = Optimize.execute(analyzer.execute(orderedPlan))
val correctAnswer = analyzer.execute(inputPlan)
comparePlans(optimized, correctAnswer)
}

test("sort should not be removed when there is a node which doesn't guarantee any order") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
val optimized = Optimize.execute(analyzer.execute(groupedAndResorted))
val correctAnswer = analyzer.execute(groupedAndResorted)
comparePlans(optimized, correctAnswer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ case class LogicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
outputPartitioning: Partitioning = UnknownPartitioning(0),
outputOrdering: Seq[SortOrder] = Nil,
override val outputOrdering: Seq[SortOrder] = Nil,
override val isStreaming: Boolean = false)(session: SparkSession)
extends LeafNode with MultiInstanceRelation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,6 @@ case class InMemoryRelation(

override protected def otherCopyArgs: Seq[AnyRef] =
Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in SparkPlan

/** Specifies how data is ordered in each partition. */
def outputOrdering: Seq[SortOrder] = Nil

So we can't do this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should carry the logical ordering from the cached logical plan when building the InMemoryRelation

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange,
ShuffleExchangeExec}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a unnecessary change. We don't have length limit for imports

import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -197,6 +198,19 @@ class PlannerSuite extends SharedSQLContext {
assert(planned.child.isInstanceOf[CollectLimitExec])
}

test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
val query = testData.select('key, 'value).sort('key.desc).cache()
assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])
val resorted = query.sort('key.desc)
assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty)
assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
(1 to 100).sorted(Ordering[Int].reverse))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1 to 100).reverse?

// with a different order, the sort is needed
val sortedAsc = query.sort('key)
assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s}.nonEmpty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.nonEmpty -> .size == 1

assert(sortedAsc.select('key).collect().map(_.getInt(0)).toSeq == (1 to 100))
}

test("PartitioningCollection") {
withTempView("normal", "small", "tiny") {
testData.createOrReplaceTempView("normal")
Expand Down