
Commit 19dc69d

ueshin authored and JoshRosen committed
[SPARK-12976][SQL] Add LazilyGeneratedOrdering and use it for RangePartitioner of Exchange.

Add `LazilyGeneratedOrdering` to support generated ordering for the `RangePartitioner` of `Exchange` instead of `InterpretedOrdering`.

Author: Takuya UESHIN <[email protected]>

Closes #10894 from ueshin/issues/SPARK-12976.
1 parent 00c72d2 · commit 19dc69d

3 files changed: 42 additions & 8 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala

Lines changed: 37 additions & 0 deletions
```diff
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import java.io.ObjectInputStream
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * Inherits some default implementation for Java from `Ordering[Row]`
@@ -138,3 +141,37 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalRow]]
     CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
   }
 }
+
+/**
+ * A lazily generated row ordering comparator.
+ */
+class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+
+  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
+  @transient
+  private[this] var generatedOrdering = GenerateOrdering.generate(ordering)
+
+  def compare(a: InternalRow, b: InternalRow): Int = {
+    generatedOrdering.compare(a, b)
+  }
+
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+    in.defaultReadObject()
+    generatedOrdering = GenerateOrdering.generate(ordering)
+  }
+}
+
+object LazilyGeneratedOrdering {
+
+  /**
+   * Creates a [[LazilyGeneratedOrdering]] for the given schema, in natural ascending order.
+   */
+  def forSchema(schema: StructType): LazilyGeneratedOrdering = {
+    new LazilyGeneratedOrdering(schema.zipWithIndex.map {
+      case (field, ordinal) =>
+        SortOrder(BoundReference(ordinal, field.dataType, nullable = true), Ascending)
+    })
+  }
+}
```
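The core of the new class is its serialization idiom: the generated comparator is held in a `@transient` field, so only the `ordering` spec is written to the stream, and `readObject` regenerates the comparator on the deserializing side. Below is a minimal, self-contained sketch of the same idiom in plain Scala; `RebuiltOnRead`, `spec`, and `build()` are hypothetical stand-ins for `LazilyGeneratedOrdering`, its `SortOrder` list, and `GenerateOrdering.generate`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for LazilyGeneratedOrdering: `spec` is the
// serializable recipe, `comparator` the regenerated, never-serialized product.
class RebuiltOnRead(val spec: Seq[Int]) extends Serializable {

  // Not written to the stream; null immediately after deserialization.
  @transient private[this] var comparator: Ordering[Int] = build()

  // Stand-in for GenerateOrdering.generate(ordering).
  private def build(): Ordering[Int] = Ordering.Int

  def compare(a: Int, b: Int): Int = comparator.compare(a, b)

  // Java serialization hook: rebuild the transient field on the reader side.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    comparator = build()
  }
}

object RoundTrip {
  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(new RebuiltOnRead(Seq(1)))
    oos.close()
    val copy = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[RebuiltOnRead]
    println(copy.compare(1, 2)) // -1: the comparator was rebuilt in readObject
  }
}
```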

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 2 additions & 4 deletions
```diff
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -206,10 +207,7 @@ object Exchange {
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
-        // We need to use an interpreted ordering here because generated orderings cannot be
-        // serialized and this ordering needs to be created on the driver in order to be passed into
-        // Spark core code.
-        implicit val ordering = new InterpretedOrdering(sortingExpressions, outputAttributes)
+        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
       case SinglePartition =>
         new Partitioner {
```
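With the ordering now serializable, `Exchange` simply declares it as an implicit on the driver; `RangePartitioner` captures it through its implicit parameter and ships it to executors with the shuffle, which is exactly the constraint the deleted comment was working around. A minimal sketch, assuming a local Spark 1.6-era setup, with the built-in `Ordering[Int]` standing in for `LazilyGeneratedOrdering`:

```scala
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object RangePartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val pairs = sc.parallelize(Seq(30 -> "c", 10 -> "a", 20 -> "b"))
    // The implicit Ordering[Int] is captured here on the driver and later
    // serialized into tasks, which is why it must survive serialization.
    val part = new RangePartitioner(2, pairs, ascending = true)
    val byPartition = pairs.partitionBy(part).glom().collect()
    byPartition.zipWithIndex.foreach { case (rows, i) =>
      println(s"partition $i: ${rows.map(_._1).sorted.mkString(", ")}")
    }
    sc.stop()
  }
}
```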

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 3 additions & 4 deletions
```diff
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 
 
@@ -88,11 +89,8 @@ case class TakeOrderedAndProject(
 
   override def outputPartitioning: Partitioning = SinglePartition
 
-  // We need to use an interpreted ordering here because generated orderings cannot be serialized
-  // and this ordering needs to be created on the driver in order to be passed into Spark core code.
-  private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
-
   override def executeCollect(): Array[InternalRow] = {
+    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
     if (projectList.isDefined) {
       val proj = UnsafeProjection.create(projectList.get, child.output)
@@ -105,6 +103,7 @@
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val localTopK: RDD[InternalRow] = {
       child.execute().map(_.copy()).mapPartitions { iter =>
         org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
```
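Here the ordering moves from an operator field into a local `val` built at execution time, and `takeOrdered` receives it through its second (implicit) parameter list. A minimal sketch, assuming a local Spark context, with `Ordering.Int` again standing in for `LazilyGeneratedOrdering`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TakeOrderedSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val rows = sc.parallelize(Seq(5, 1, 4, 2, 3))
    // Built per call, like the `val ord` in executeCollect/doExecute; only
    // this serializable value rides along with the task closure.
    val ord: Ordering[Int] = Ordering.Int
    println(rows.takeOrdered(3)(ord).mkString(", ")) // 1, 2, 3
    sc.stop()
  }
}
```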
