diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 94667fbd00c1..b2f94cae2dfa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -143,17 +143,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { // also sort the input partitions according to their partition key order. This ensures // a canonical order from both sides of a bucketed join, for example. val partitionDataTypes = expressions.map(_.dataType) - val partitionOrdering: Ordering[(InternalRow, InputPartition)] = { - RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1) - } - val sortedKeyToPartitions = results.sorted(partitionOrdering) - val groupedPartitions = sortedKeyToPartitions + val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes) + val sortedKeyToPartitions = results.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1)) + val sortedGroupedPartitions = sortedKeyToPartitions .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2)) .groupBy(_._1) .toSeq .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) } + .sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value)) - Some(KeyGroupedPartitionInfo(groupedPartitions, sortedKeyToPartitions.map(_._2))) + Some(KeyGroupedPartitionInfo(sortedGroupedPartitions, sortedKeyToPartitions.map(_._2))) } } }