@cloud-fan
Contributor

What changes were proposed in this pull request?

This PR updates the AQE framework to return at least one partition during coalescing.

This PR also updates ShuffleExchangeExec.canChangeNumPartitions to not coalesce for SinglePartition.
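To make the "at least one partition" rule concrete, here is a minimal standalone sketch. It is not Spark's coalescing code: `RangeSpec`, `CoalesceSketch.coalesce` and the greedy grouping strategy are hypothetical stand-ins, and the fallback of a single empty range is an assumption about how such a guard could look.

```scala
// Hypothetical, simplified model of partition coalescing; not Spark's actual code.
// A RangeSpec is a half-open range [start, end) of reducer partition indices.
final case class RangeSpec(start: Int, end: Int)

object CoalesceSketch {
  // Greedily groups contiguous partitions until adding one more would exceed targetSize.
  def coalesce(sizes: Seq[Long], targetSize: Long): Seq[RangeSpec] = {
    val specs = scala.collection.mutable.ArrayBuffer.empty[RangeSpec]
    var start = 0
    var acc = 0L
    for (i <- sizes.indices) {
      // Close the current group if partition i would push it past the target.
      if (i > start && acc + sizes(i) > targetSize) {
        specs += RangeSpec(start, i)
        start = i
        acc = 0L
      }
      acc += sizes(i)
    }
    if (start < sizes.length) specs += RangeSpec(start, sizes.length)
    // Assumed guard: never hand back zero partitions, even for empty input.
    if (specs.isEmpty) Seq(RangeSpec(0, 0)) else specs.toSeq
  }
}
```

In this sketch, sizes of 10, 20, 30 and 40 with a target of 50 produce the ranges [0, 2), [2, 3) and [3, 4), while an empty input still produces one (empty) range rather than none.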

Why are the changes needed?

It's a bit risky to return 0 partitions, as that is sometimes different from empty data. For example, a global aggregate returns one result row even if the input table is empty. If there are 0 partitions, no task will run and no result will be returned. More specifically, the global aggregate requires AllTuples, so we can't coalesce to 0 partitions.

This is not a real bug for now. The global aggregate is planned as partial and final physical agg nodes. The partial agg returns at least one row, so the shuffle still has data. But it's better to fix this issue to avoid potential bugs in the future.

According to #28916, this change also fixes some perf problems.
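The difference between "empty data" and "0 partitions" can be shown with a toy model that does not use Spark at all. `GlobalAggSketch.finalCount` is a hypothetical helper: it assumes one task runs per partition and that a global count emits exactly one row per task.

```scala
// Toy model, not Spark code: each inner Seq holds the partial counts
// that one final-aggregate task would read from the shuffle.
object GlobalAggSketch {
  // One task per partition; a global count emits one row per task,
  // even when that task's input is empty.
  def finalCount(partitions: Seq[Seq[Long]]): Seq[Long] =
    partitions.map(partialCounts => partialCounts.sum)
}
```

Under these assumptions, one empty partition yields a single row containing 0, whereas zero partitions yield no rows at all, which is the case the description warns about.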

Does this PR introduce any user-facing change?

no

How was this patch tested?

updated test.

SparkQA commented Jul 30, 2020

Test build #126806 has finished for PR 29307 at commit 0e8dd18.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// If users specify the num partitions via APIs like `repartition`, we shouldn't change it.
// For `SinglePartition`, it requires exactly one partition and we can't change it either.
override def canChangeNumPartitions: Boolean =
  !isUserSpecifiedNumPartitions && outputPartitioning != SinglePartition
Member

Do we have a test for the SinglePartition case?

Contributor Author

This change is for future-proofing. It doesn't change any current behavior. When there is a global aggregate, there will always be data in the final partition and we can't coalesce to 0 partitions anyway.
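For readers who want to try the rule outside Spark, here is a small standalone model of the check discussed in this thread. The types below are simplified stand-ins that only borrow the names from the snippet; they are not Spark's classes, and `HashPartitioning` and `ShuffleModel` are illustrative assumptions.

```scala
// Simplified stand-ins for illustration only; not Spark's Partitioning hierarchy.
sealed trait Partitioning
case object SinglePartition extends Partitioning
final case class HashPartitioning(numPartitions: Int) extends Partitioning

// Hypothetical model of a shuffle exchange carrying just the two inputs of the rule.
final case class ShuffleModel(
    outputPartitioning: Partitioning,
    isUserSpecifiedNumPartitions: Boolean) {
  // Same boolean rule as in the snippet above: keep the partition count fixed when
  // the user chose it or when exactly one partition is required.
  def canChangeNumPartitions: Boolean =
    !isUserSpecifiedNumPartitions && outputPartitioning != SinglePartition
}
```

In this model only a shuffle that is neither user-specified nor `SinglePartition` is eligible for a change in partition count.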

  def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
    case s: ShuffleQueryStageExec =>
-     s.shuffle.canChangeNumPartitions
+     s.shuffle.canChangeNumPartitions && s.mapStats.isDefined
Member

This skips 0 partitions case, right?

Contributor Author

yea, otherwise we will hit

val splitPoints = if (numMappers == 0) {
  Seq.empty
} else ...

which creates a local reader with 0 partitions.
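The guard can be sketched in isolation as follows. `MapStats` and `StageModel` are hypothetical stand-ins, and the sketch assumes that a stage whose shuffle has no map tasks simply has no map output statistics, so `mapStats` is `None` in that case.

```scala
// Hypothetical stand-ins for illustration; not Spark's ShuffleQueryStageExec.
final case class MapStats(numMappers: Int)
final case class StageModel(canChangeNumPartitions: Boolean, mapStats: Option[MapStats])

object LocalReaderSketch {
  // Only allow the local reader when the partition count may change AND
  // map output statistics exist (assumed to be absent for a 0-partition shuffle).
  def canUseLocalShuffleReader(stage: StageModel): Boolean =
    stage.canChangeNumPartitions && stage.mapStats.isDefined
}
```

With this check, a stage without statistics never reaches the `numMappers == 0` branch quoted above, so no 0-partition local reader is built in the model.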

Contributor Author

We should be able to turn the `if` into an assert, but I'd like to do that only in master to be safe.

Member

@viirya viirya left a comment

This idea looks good. 0 partitions sounds a bit risky, and it's nice to avoid creating such an edge case.

SparkQA commented Jul 31, 2020

Test build #126859 has finished for PR 29307 at commit 2ae792b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

cloud-fan commented Jul 31, 2020

thanks for the review, merging to master!

@cloud-fan cloud-fan closed this in 1c6dff7 Jul 31, 2020
@abellina
Contributor

@cloud-fan @viirya should this change go into 3.0.1 as well?

@cloud-fan
Contributor Author

@abellina yes, I'm working on the backport PR (need to fix some conflicts)

@abellina
Contributor

Thanks @cloud-fan !!

cloud-fan added a commit to cloud-fan/spark that referenced this pull request Jul 31, 2020
This PR updates the AQE framework to return at least one partition during coalescing.

This PR also updates `ShuffleExchangeExec.canChangeNumPartitions` to not coalesce for `SinglePartition`.

It's a bit risky to return 0 partitions, as that is sometimes different from empty data. For example, a global aggregate returns one result row even if the input table is empty. If there are 0 partitions, no task will run and no result will be returned. More specifically, the global aggregate requires `AllTuples`, so we can't coalesce to 0 partitions.

This is not a real bug for now. The global aggregate is planned as partial and final physical agg nodes. The partial agg returns at least one row, so the shuffle still has data. But it's better to fix this issue to avoid potential bugs in the future.

According to apache#28916, this change also fixes some perf problems.

no

updated test.

Closes apache#29307 from cloud-fan/aqe.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan added a commit to cloud-fan/spark that referenced this pull request Aug 3, 2020
cloud-fan added a commit that referenced this pull request Aug 3, 2020
…Partition

This is a partial backport of #29307

Most of the changes are not needed because #28226 is in master only.

This PR only backports the safeguard in `ShuffleExchangeExec.canChangeNumPartitions`

Closes #29321 from cloud-fan/aqe.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>