Closed
Changes from 12 commits
@@ -340,3 +340,28 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
case _ => false
}
}

/**
Contributor

Why do we need to add a new Partitioning?

Contributor Author

@LantaoJin, Jul 17, 2020

We need CoalescedHashPartitioning because it can satisfy ClusteredDistribution: a skew join may match a plan that contains an Aggregation on the non-skew side. UnknownPartitioning cannot satisfy ClusteredDistribution, so it would add an additional shuffle.

Contributor Author

Hi @JkSelf, I will provide another approach that removes CoalescedHashPartitioning and simplifies the code. However, the current implementation with CoalescedHashPartitioning might be more general and cover more cases.

* With AE, multiple partitions in hash partitioned output could be coalesced
* to a single partition. CoalescedHashPartitioning is designed for such case.
*/
case class CoalescedHashPartitioning(
expressions: Seq[Expression],
numPartitions: Int)
extends Expression with Partitioning with Unevaluable {

override def children: Seq[Expression] = expressions
override def nullable: Boolean = false
override def dataType: DataType = IntegerType

override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case ClusteredDistribution(requiredClustering, requiredNumPartitions) =>
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions)
case _ => false
}
}
}
}
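
As an illustration of the thread above, here is a minimal, Spark-free sketch of the check that `satisfies0` performs for a clustered requirement. It is a simplified model, not the PR's code: column names stand in for `Expression`s, plain string equality stands in for `semanticEquals`, and `SatisfiesSketch`, `Clustered` and `CoalescedHash` are hypothetical names used only here.

```scala
object SatisfiesSketch {
  // Simplified stand-in for ClusteredDistribution: clustering keys plus an
  // optional required partition count.
  final case class Clustered(keys: Seq[String], requiredNumPartitions: Option[Int] = None)

  // Simplified stand-in for CoalescedHashPartitioning.
  final case class CoalescedHash(keys: Seq[String], numPartitions: Int) {
    // Satisfied when every partitioning key is one of the required clustering
    // keys and the partition count matches, if one is required.
    def satisfies(required: Clustered): Boolean =
      keys.forall(required.keys.contains) &&
        required.requiredNumPartitions.forall(_ == numPartitions)
  }
}
```

Under this model, data partitioned on `a` satisfies a requirement to cluster on `(a, b)`, because rows that agree on `(a, b)` also agree on `a`. Partitioning on `(a, c)` does not satisfy it.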
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.plans.physical.{CoalescedHashPartitioning, HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -65,6 +65,14 @@ case class CustomShuffleReaderExec private(
case _ =>
throw new IllegalStateException("operating on canonicalization plan")
}
} else if (partitionSpecs.nonEmpty &&
partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) {
child match {
case ShuffleQueryStageExec(_, ShuffleExchangeExec(p: HashPartitioning, _, _)) =>
CoalescedHashPartitioning(p.expressions, partitionSpecs.size)
case _ =>
UnknownPartitioning(partitionSpecs.length)
}
} else {
UnknownPartitioning(partitionSpecs.length)
}
@@ -142,6 +142,14 @@ object OptimizeLocalShuffleReader {
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
case s: ShuffleQueryStageExec =>
s.shuffle.canChangeNumPartitions
// This CustomShuffleReaderExec used in skew side, its numPartitions increased.
Contributor

Does this mean the OptimizeLocalShuffleReader rule is disabled when the OptimizeSkewedJoin rule is enabled?

Contributor Author

@LantaoJin, Jul 17, 2020

Not exactly. With this more general skew join handling, we can match more patterns. For example, we can handle a skew join like https://user-images.githubusercontent.com/1853780/87743215-01e9e780-c81b-11ea-97d9-f274b379912e.png. After OptimizeLocalShuffleReader, the number of partitions of the CustomShuffleReader in the BCJ (changed from SMJ by AE) is not equal to that of the other side. So, to keep it simple, I disable createLocalReader.

case CustomShuffleReaderExec(_, partitionSpecs)
if partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec]) => false
// This CustomShuffleReaderExec used in non-skew side, its numPartitions equals to
// the skew side CustomShuffleReaderExec.
case CustomShuffleReaderExec(_, partitionSpecs) if partitionSpecs.size > 1 &&
partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec]) &&
partitionSpecs.toSet.size != partitionSpecs.size => false
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty
case _ => false
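
The second new case above appears to rely on the non-skew side repeating the same `CoalescedPartitionSpec` once for every split of the matching skewed partition, so a sequence containing duplicates marks a reader produced by skew handling. A minimal sketch of that duplicate test follows. `DuplicateSpecSketch` and `CoalescedSpec` are hypothetical stand-ins, not Spark classes.

```scala
object DuplicateSpecSketch {
  // Simplified stand-in for CoalescedPartitionSpec: a range of reducer indices.
  final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int)

  // True when at least one spec occurs more than once in the sequence.
  // Converting to a Set drops duplicates, so the sizes differ in that case.
  def hasRepeatedSpecs(specs: Seq[CoalescedSpec]): Boolean =
    specs.toSet.size != specs.size
}
```

This works because case classes compare by value, so two specs covering the same reducer range collapse into one `Set` element.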
@@ -23,9 +23,10 @@ import org.apache.commons.io.FileUtils

import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.internal.SQLConf

@@ -130,20 +131,61 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
}
}

private def canSplitLeftSide(joinType: JoinType) = {
joinType == Inner || joinType == Cross || joinType == LeftSemi ||
joinType == LeftAnti || joinType == LeftOuter
private def canSplitLeftSide(joinType: JoinType, plan: SparkPlan) = {
(joinType == Inner || joinType == Cross || joinType == LeftSemi ||
joinType == LeftAnti || joinType == LeftOuter) && canBypass(plan)
}

private def canSplitRightSide(joinType: JoinType) = {
joinType == Inner || joinType == Cross || joinType == RightOuter
private def canSplitRightSide(joinType: JoinType, plan: SparkPlan) = {
(joinType == Inner || joinType == Cross ||
joinType == RightOuter) && canBypass(plan)
}

// Bypass the node which its requiredChildDistribution contains [[UnspecifiedDistribution]]
private def canBypass(plan: SparkPlan) = {
val nodesCanBypass = plan.find {
case p: SparkPlan if p.requiredChildDistribution.exists {
case UnspecifiedDistribution => true
case _ => false
} => false // false means we bypass this node
case _ @ BypassTerminator() => true // terminate traverse
case _ => true // get the node which cannot bypass
}
nodesCanBypass.exists {
case _ @ BypassTerminator() => true
case _ => false
}
}

private object BypassTerminator {
def unapply(plan: SparkPlan): Boolean = plan match {
case _: ShuffleQueryStageExec => true
case _: CustomShuffleReaderExec => true
case _: Exchange => true
case _ => false
}
}
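
To make the traversal in `canBypass` easier to follow, here is a minimal sketch on a simplified tree, assuming a pre-order `find` that returns the first node for which the predicate is true. `BypassSketch`, `Node`, `passThrough` and `terminator` are hypothetical names; real `SparkPlan` nodes carry far more state.

```scala
object BypassSketch {
  // Hypothetical, simplified plan node. `passThrough` models an operator whose
  // requiredChildDistribution contains UnspecifiedDistribution; `terminator`
  // models a shuffle stage, shuffle reader, or exchange.
  final case class Node(
      name: String,
      passThrough: Boolean = false,
      terminator: Boolean = false,
      children: Seq[Node] = Nil) {

    // Pre-order search: return the first node for which `f` is true.
    def find(f: Node => Boolean): Option[Node] =
      if (f(this)) Some(this)
      else children.foldLeft(Option.empty[Node])((found, c) => found.orElse(c.find(f)))
  }

  // Skip pass-through nodes and stop at the first node that is not one.
  // The side can be split only if that node is a terminator.
  def canBypass(plan: Node): Boolean =
    plan.find(n => !n.passThrough).exists(_.terminator)
}
```

In this model, `Sort -> Project -> ShuffleStage` can be bypassed, while `Sort -> Aggregate -> ShuffleStage` cannot, because the search stops at `Aggregate` before it reaches the stage.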

private def getSizeInfo(medianSize: Long, sizes: Seq[Long]): String = {
s"median size: $medianSize, max size: ${sizes.max}, min size: ${sizes.min}, avg size: " +
sizes.sum / sizes.length
}

private def findShuffleStage(plan: SparkPlan): Option[ShuffleStageInfo] = {
plan collectFirst {
case _ @ ShuffleStage(shuffleStageInfo) =>
shuffleStageInfo
}
}

private def replaceSkewedShufleReader(
smj: SparkPlan, newCtm: CustomShuffleReaderExec): SparkPlan = {
smj transformUp {
case _ @ CustomShuffleReaderExec(child, _) if child.sameResult(newCtm.child) =>
newCtm
}
}

/*
* This method aim to optimize the skewed join with the following steps:
* 1. Check whether the shuffle partition is skewed based on the median size
@@ -157,96 +199,106 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
* 3 tasks separately.
*/
def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
case smj @ SortMergeJoinExec(_, _, joinType, _,
s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
case smj @ SortMergeJoinExec(_, _, joinType, _, s1: SortExec, s2: SortExec, _)
if supportedJoinTypes.contains(joinType) =>
assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
val numPartitions = left.partitionsWithSizes.length
// We use the median size of the original shuffle partitions to detect skewed partitions.
val leftMedSize = medianSize(left.mapStats)
val rightMedSize = medianSize(right.mapStats)
logDebug(
s"""
|Optimizing skewed join.
|Left side partitions size info:
|${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
|Right side partitions size info:
|${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
""".stripMargin)
val canSplitLeft = canSplitLeftSide(joinType)
val canSplitRight = canSplitRightSide(joinType)
// We use the actual partition sizes (may be coalesced) to calculate target size, so that
// the final data distribution is even (coalesced partitions + split partitions).
val leftActualSizes = left.partitionsWithSizes.map(_._2)
val rightActualSizes = right.partitionsWithSizes.map(_._2)
val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
var numSkewedLeft = 0
var numSkewedRight = 0
for (partitionIndex <- 0 until numPartitions) {
val leftActualSize = leftActualSizes(partitionIndex)
val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

val rightActualSize = rightActualSizes(partitionIndex)
val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

// A skewed partition should never be coalesced, but skip it here just to be safe.
val leftParts = if (isLeftSkew && !isLeftCoalesced) {
val reducerId = leftPartSpec.startReducerIndex
val skewSpecs = createSkewPartitionSpecs(
left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, leftTargetSize)
if (skewSpecs.isDefined) {
logDebug(s"Left side partition $partitionIndex " +
s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
s"split it into ${skewSpecs.get.length} parts.")
numSkewedLeft += 1
// find the shuffleStage from the plan tree
val leftOpt = findShuffleStage(s1)
val rightOpt = findShuffleStage(s2)
if (leftOpt.isEmpty || rightOpt.isEmpty) {
smj
} else {
val left = leftOpt.get
val right = rightOpt.get
assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
val numPartitions = left.partitionsWithSizes.length
// We use the median size of the original shuffle partitions to detect skewed partitions.
Contributor

This PR is very hard to reason about. We need to clearly define:

  1. what nodes can appear between the shuffle stage and SMJ. As we discussed before, Agg can't appear at the skew side.
  2. how to estimate the size? Since there are nodes in the middle, the stats of the shuffle stage may not be accurate for the final join child. (e.g. Filter in the middle)

Contributor Author

@LantaoJin, Jul 24, 2020

  1. what nodes can appear between the shuffle stage and SMJ. As we discussed before, Agg can't appear at the skew side.

In canSplitLeftSide and canSplitRightSide, I added an allUnspecifiedDistribution(plan) check. Currently we only support nodes with UnspecifiedDistribution.

  2. how to estimate the size? Since there are nodes in the middle, the stats of the shuffle stage may not be accurate for the final join child. (e.g. Filter in the middle)

Filter should be pushed down to the leaf, so I haven't seen this use case. Project may be a common case in the middle? Yes, the input size of the shuffle stage may not be accurate, but the disadvantage is only that more tasks are launched. I think the benefit of handling the skew is more important than that disadvantage.

val leftMedSize = medianSize(left.mapStats)
val rightMedSize = medianSize(right.mapStats)
logDebug(
s"""
|Optimizing skewed join.
|Left side partitions size info:
|${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
|Right side partitions size info:
|${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
""".stripMargin)
val canSplitLeft = canSplitLeftSide(joinType, s1)
val canSplitRight = canSplitRightSide(joinType, s2)
// We use the actual partition sizes (may be coalesced) to calculate target size, so that
// the final data distribution is even (coalesced partitions + split partitions).
val leftActualSizes = left.partitionsWithSizes.map(_._2)
val rightActualSizes = right.partitionsWithSizes.map(_._2)
val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
var numSkewedLeft = 0
var numSkewedRight = 0
for (partitionIndex <- 0 until numPartitions) {
val leftActualSize = leftActualSizes(partitionIndex)
val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

val rightActualSize = rightActualSizes(partitionIndex)
val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

// A skewed partition should never be coalesced, but skip it here just to be safe.
val leftParts = if (isLeftSkew && !isLeftCoalesced) {
val reducerId = leftPartSpec.startReducerIndex
val skewSpecs = createSkewPartitionSpecs(
left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, leftTargetSize)
if (skewSpecs.isDefined) {
logDebug(s"Left side partition $partitionIndex " +
s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
s"split it into ${skewSpecs.get.length} parts.")
numSkewedLeft += 1
}
skewSpecs.getOrElse(Seq(leftPartSpec))
} else {
Seq(leftPartSpec)
}
skewSpecs.getOrElse(Seq(leftPartSpec))
} else {
Seq(leftPartSpec)
}

// A skewed partition should never be coalesced, but skip it here just to be safe.
val rightParts = if (isRightSkew && !isRightCoalesced) {
val reducerId = rightPartSpec.startReducerIndex
val skewSpecs = createSkewPartitionSpecs(
right.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, rightTargetSize)
if (skewSpecs.isDefined) {
logDebug(s"Right side partition $partitionIndex " +
s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
s"split it into ${skewSpecs.get.length} parts.")
numSkewedRight += 1
// A skewed partition should never be coalesced, but skip it here just to be safe.
val rightParts = if (isRightSkew && !isRightCoalesced) {
val reducerId = rightPartSpec.startReducerIndex
val skewSpecs = createSkewPartitionSpecs(
right.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, rightTargetSize)
if (skewSpecs.isDefined) {
logDebug(s"Right side partition $partitionIndex " +
s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
s"split it into ${skewSpecs.get.length} parts.")
numSkewedRight += 1
}
skewSpecs.getOrElse(Seq(rightPartSpec))
} else {
Seq(rightPartSpec)
}
skewSpecs.getOrElse(Seq(rightPartSpec))
} else {
Seq(rightPartSpec)
}

for {
leftSidePartition <- leftParts
rightSidePartition <- rightParts
} {
leftSidePartitions += leftSidePartition
rightSidePartitions += rightSidePartition
for {
leftSidePartition <- leftParts
rightSidePartition <- rightParts
} {
leftSidePartitions += leftSidePartition
rightSidePartitions += rightSidePartition
}
}
}

logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
if (numSkewedLeft > 0 || numSkewedRight > 0) {
val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq)
val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)
smj.copy(
left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
} else {
smj
logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
if (numSkewedLeft > 0 || numSkewedRight > 0) {
val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq)
val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)
val newSmj = replaceSkewedShufleReader(
replaceSkewedShufleReader(smj, newLeft), newRight).asInstanceOf[SortMergeJoinExec]
newSmj.copy(isSkewJoin = true)
} else {
smj
}
}
}
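
The nested `for` over `leftParts` and `rightParts` above pairs every split on one side with every split on the other, which keeps both readers at the same partition count. A minimal sketch of that pairing, detached from Spark, is shown below. `SkewPairingSketch` and `pairSplits` are hypothetical names, and strings stand in for `ShufflePartitionSpec`s in the usage note.

```scala
object SkewPairingSketch {
  // For each original partition index, emit every (left split, right split)
  // combination so that both output sequences have the same length.
  def pairSplits[A](
      leftSplits: Seq[Seq[A]],
      rightSplits: Seq[Seq[A]]): (Seq[A], Seq[A]) = {
    require(leftSplits.length == rightSplits.length, "both sides need the same partition count")
    val pairs = for {
      (leftParts, rightParts) <- leftSplits.zip(rightSplits)
      l <- leftParts
      r <- rightParts
    } yield (l, r)
    pairs.unzip
  }
}
```

For example, if left partition 1 is split into `L1a, L1b, L1c` and right partition 1 is left whole as `R1`, the left reader gets `L1a, L1b, L1c` and the right reader gets `R1, R1, R1`. This repetition on the non-skew side is what produces duplicate specs there.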

@@ -263,13 +315,15 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
val shuffleStages = collectShuffleStages(plan)

if (shuffleStages.length == 2) {
Contributor

Why don't we break this limitation first?

Contributor Author

Because this PR does not address the case with multiple SMJs. We have another PR to lift this limitation:

  1. optimizeSingleStageSkewJoin. This is the case where one table is a bucketed table and the SMJ is a bucketed join with only one side shuffled and skewed.
  2. optimizeThreeShuffleStageSkewJoin. This addresses a three-table SMJ (two SMJs in one stage, neither of which can be changed to BCJ by AQE).

// When multi table join, there will be too many complex combination to consider.
// Currently we only handle 2 table join like following use case.
// SPARK-32201. Skew join supports below pattern, ".." may contain any number of nodes,
// includes such as BroadcastHashJoinExec. So it can handle more than two tables join.
// SMJ
// Sort
// Shuffle
// ..
// Shuffle
// Sort
// Shuffle
// ..
// Shuffle
val optimizePlan = optimizeSkewJoin(plan)
val numShuffles = ensureRequirements.apply(optimizePlan).collect {
case e: ShuffleExchangeExec => e
@@ -316,6 +370,23 @@ private object ShuffleStage {
}
Some(ShuffleStageInfo(s, mapStats, partitions))

case _: LeafExecNode => None

case _ @ UnaryExecNode((_, ShuffleStage(ss: ShuffleStageInfo))) =>
Some(ss)

case b: BinaryExecNode =>
b.left match {
case _ @ ShuffleStage(ss: ShuffleStageInfo) =>
Some(ss)
case _ =>
b.right match {
case _ @ ShuffleStage(ss: ShuffleStageInfo) =>
Some(ss)
case _ => None
}
}

case _ => None
}
}
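
The new `ShuffleStage` cases above search downward for the first shuffle stage: unary nodes are passed through, and binary nodes try the left child before the right. A minimal sketch of that search order on a simplified tree follows. `FindStageSketch` and its node types are hypothetical and only model the shape of the recursion.

```scala
object FindStageSketch {
  // Hypothetical, simplified plan tree.
  sealed trait Plan
  final case class Stage(id: Int) extends Plan       // a shuffle query stage
  final case class Leaf(name: String) extends Plan   // any other leaf, such as a scan
  final case class Unary(name: String, child: Plan) extends Plan
  final case class Binary(name: String, left: Plan, right: Plan) extends Plan

  // Walk down through unary nodes; at a binary node try the left subtree
  // first and fall back to the right one.
  def findStage(plan: Plan): Option[Stage] = plan match {
    case s: Stage => Some(s)
    case _: Leaf => None
    case Unary(_, child) => findStage(child)
    case Binary(_, left, right) => findStage(left).orElse(findStage(right))
  }
}
```

One consequence of this order is that when both children of a binary node contain a stage, only the left one is ever reported.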