Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
child
case (child, BroadcastDistribution(mode)) =>
BroadcastExchangeExec(mode, child)
case (ShuffleExchangeExec(partitioning: RoundRobinPartitioning, child, _),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use Partitioning instead of RoundRobinPartitioning . Since we already support this SELECT /*+ REPARTITION(5, a) */ * FROM test ORDER BY a.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I will change it

distribution: OrderedDistribution) =>
ShuffleExchangeExec(
distribution.createPartitioning(partitioning.numPartitions), child)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you update like the following, @stczwd ?

-      case (ShuffleExchangeExec(partitioning: RoundRobinPartitioning, child, _),
-          distribution: OrderedDistribution) =>
-        ShuffleExchangeExec(
-          distribution.createPartitioning(partitioning.numPartitions), child)
+      case (ShuffleExchangeExec(partitioning, child, _), distribution) =>
+        ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You means this should work in other Partitioning? Let me run some test for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, i have change my code

case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(defaultNumPreShufflePartitions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {
def computeChiSquareTest(): Double = {
val n = 10000
// Trigger a sort
// Range has range partitioning in its output now. To have a range shuffle, we
// need to run a repartition first.
val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc)
// Range has range partitioning in its output now.
Copy link
Contributor

@cloud-fan cloud-fan Dec 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we remove this comment now? it's not useful as we do add shuffle, the range output partitioning doesn't matter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okey

val data = spark.range(0, n, 1, 10).sort($"id".desc)
.selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

// Compute histogram for the number of records per partition post sort
Expand All @@ -55,12 +54,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {

withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
// The default chi-sq value should be low
assert(computeChiSquareTest() < 100)
assert(computeChiSquareTest() < 10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the physical plan is same as before, what caused this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not same, we had two shuffles before, one was RoundRobinPartitioning, the other was RangePartitioning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see


withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
// If we only sample one point, the range boundaries will be pretty bad and the
// chi-sq value would be very high.
assert(computeChiSquareTest() > 300)
assert(computeChiSquareTest() > 100)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,24 @@ class PlannerSuite extends SharedSparkSession {
}
}

test("SPARK-30036: EnsureRequirements replace Exchange " +
"if child has SortExec and RoundRobinPartitioning") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just saying Remove unnecessary RoundRobinPartitioning in the test title? Also, can you make the PR title obivious, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good for test title, thanks.
But it is not suitable for PR title, there are other situations in this titile.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about Avoid RoundRobinPartitioning that EnsureRequirements Redundantly adds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because HashPartitioning should also be concerned.

val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
val partitioning = RoundRobinPartitioning(5)
assert(!partitioning.satisfies(distribution))

val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
global = true,
child = ShuffleExchangeExec(
partitioning,
DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.find{ -> .find {.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

case e: ShuffleExchangeExec => e.outputPartitioning.isInstanceOf[RoundRobinPartitioning]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- case e: ShuffleExchangeExec => e.outputPartitioning.isInstanceOf[RoundRobinPartitioning]
+ case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good

case _ => false}.isEmpty,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

...find {
  case ...
  case ...
}.isEmpty

"RoundRobinPartitioning should be changed to RangePartitioning")
}

test("EnsureRequirements does not eliminate Exchange with different partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val partitioning = HashPartitioning(Literal(2) :: Nil, 5)
Expand Down