Skip to content

Commit 1d7ffaa

Browse files
committed
Somewhat hacky fix for descending sorts
1 parent 08701e7 commit 1d7ffaa

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
2727
import org.apache.spark.sql.Row
2828
import org.apache.spark.sql.catalyst.plans.physical._
2929
import org.apache.spark.util.collection.ExternalSorter
30+
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
3031
import org.apache.spark.util.{CompletionIterator, MutablePair}
3132
import org.apache.spark.{HashPartitioner, SparkEnv}
3233

@@ -274,7 +275,18 @@ case class UnsafeExternalSort(
274275
def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
275276
val ordering = newOrdering(sortOrder, child.output)
276277
val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
277-
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
278+
// Hack until we generate separate comparator implementations for ascending vs. descending
279+
// (or choose to codegen them):
280+
val prefixComparator = {
281+
val comp = SortPrefixUtils.getPrefixComparator(boundSortExpression)
282+
if (sortOrder.head.direction == Descending) {
283+
new PrefixComparator {
284+
override def compare(p1: Long, p2: Long): Int = -1 * comp.compare(p1, p2)
285+
}
286+
} else {
287+
comp
288+
}
289+
}
278290
val prefixComputer = {
279291
val prefixComputer = SortPrefixUtils.getPrefixComputer(boundSortExpression)
280292
new UnsafeExternalRowSorter.PrefixComputer {

sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
3838

3939
// Test sorting on different data types
4040
for (
41-
dataType <- DataTypeTestUtils.atomicTypes; // Disable null type for now due to bug in SqlSerializer2 ++ Set(NullType);
41+
dataType <- DataTypeTestUtils.atomicTypes // Disable null type for now due to bug in SqlSerializer2 ++ Set(NullType);
42+
if !dataType.isInstanceOf[DecimalType]; // Since we don't have an unsafe representation for decimals
4243
nullable <- Seq(true, false);
43-
sortOrder <- Seq('a.asc :: Nil);
44+
sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
4445
randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
4546
) {
4647
test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {

0 commit comments

Comments
 (0)