@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
case (l, r) => l.semanticEquals(r)
}
case ClusteredDistribution(requiredClustering, _) =>
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
case c @ ClusteredDistribution(requiredClustering, _) =>
if (SQLConf.get.requireAllClusterKeysForHashPartition) {
@HeartSaVioR (Contributor) commented on Feb 20, 2022:
It seems worth thinking about the trade-off between having a flag on ClusteredDistribution and checking the SQL config here. For the former, we would need to change every spot that initializes ClusteredDistribution, but it also means we stay open to finer-grained control in the future (additional mix-up conditions and configurations per operator). For the latter, it's probably the simplest change, but here we have no idea about the operator, so we are restricted to applying the change globally in the future.

I don't have a strong preference on this, as I'm talking about extensibility that might never happen (or might happen sooner than we think). Just my 2 cents.

@c21 (Contributor, PR author) replied on Feb 20, 2022:
I actually also thought about the pros and cons of these two approaches: 1) change the behavior of HashPartitioning, vs. 2) change the behavior of ClusteredDistribution. I am more inclined toward 1), changing the behavior of HashPartitioning, for the following reasons:

ClusteredDistribution's current definition is pretty clean and flexible, so let's not move backward:

/**
 * Represents data where tuples that share the same values for the `clustering`
 * [[Expression Expressions]] will be co-located in the same partition.
 */
case class ClusteredDistribution

As long as data is partitioned such that tuples/rows having the same values for the clustering expressions land in the same partition, the partitioning can satisfy ClusteredDistribution. It tolerates both the full set of keys and a subset of keys, so it's flexible enough to work for a range of operators: aggregate, window, and join (together with the recently introduced ShuffleSpec for co-partitioning). It does not have any implicit requirement on hash expressions or a hash function (so it avoids the drawback of HashClusteredDistribution). Partitionings other than HashPartitioning can also satisfy ClusteredDistribution (e.g. RangePartitioning and DataSourcePartitioning). Adding a flag such as requiresFullKeysMatch to ClusteredDistribution would make every Partitioning implementation unnecessarily more complicated, as this is currently a problem only for HashPartitioning.
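To make the current, relaxed contract concrete, here is a minimal sketch (not part of the diff). It uses the catalyst classes named above; the constructor signatures and the `satisfies` call are assumed from the Spark 3.2/3.3 code base and may differ slightly across versions, and the attribute names and partition count are illustrative.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()

// Data hashed only on `a` still co-locates all rows that share the same (a, b) pair,
// so under the default behavior it satisfies ClusteredDistribution(a, b).
val partitioning = HashPartitioning(Seq(a), numPartitions = 200)
val distribution = ClusteredDistribution(Seq(a, b))

assert(partitioning.satisfies(distribution)) // a subset of the clustering keys is enough today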

HashPartitioning can flexibly decide by itself when it should satisfy ClusteredDistribution: either with a subset of keys (current behavior) or with the full set of keys (with the config introduced in this PR). This leaves the other Partitionings (RangePartitioning and DataSourcePartitioning) and ClusteredDistribution untouched; it is just a local decision made by HashPartitioning. I think this is more flexible and extensible. In the future, if another Partitioning has a similar requirement, e.g. DataSourcePartitioning, similar logic can be introduced locally inside DataSourcePartitioning.satisfies0() without any intrusive interface change.

For the latter, it's probably the simplest change, but here we have no idea about the operator, so we are restricted to applying the change globally in the future.

It's true, but I think the granularity is tricky to decide, so let's start with the solution that best keeps our interface clean. We can discuss later if a strong requirement shows up. One could argue further that a user wanting finer-grained control may want to target an exact operator in the query (e.g. the query has 3 aggregates and they only want to enable the feature for 1 of them).

Member commented:
I'm also inclined to option 1) for now, and I agree with the points @c21 raised above.

As a Spark developer, I was originally confused when seeing both HashClusteredDistribution and ClusteredDistribution, and had to navigate the code base to reason about their behavioral differences. Combined with the newly introduced config, a developer now has to remember to parse the value of the config and choose HashClusteredDistribution or ClusteredDistribution accordingly, which is extra burden. In addition, it's better to have a separate StatefulOpClusteredDistribution dedicated to SS use cases, as it makes them more distinctive.

Of course, having a separate HashClusteredDistribution opens up more opportunities for it to evolve separately. But I'd suggest only considering that once we have concrete ideas. So far, I don't see anything that can't be done with ClusteredDistribution alone.

Contributor commented:
As a Spark developer, I was originally confused when seeing both HashClusteredDistribution and ClusteredDistribution, and had to navigate the code base to reason about their behavioral differences.

The classdocs of the two classes were very clear about the differences. The confusion may come from the structure: the actual logic checking whether a distribution's requirement is met lives in the partitioning rather than the distribution (I'm not an expert in this part; it may have to be this way), but as long as the implementation matches up with the classdoc, it is pretty clear.

Combined with the newly introduced config, a developer now has to remember to parse the value of the config and choose HashClusteredDistribution or ClusteredDistribution accordingly, which is extra burden.

Well, I'd say it is a bigger burden if we have to expect two different requirements from ClusteredDistribution. Once we understand the difference between HashClusteredDistribution and ClusteredDistribution, it is obvious that we can easily infer the behavior from which class is used.

@HeartSaVioR (Contributor) commented on Feb 21, 2022:
We already made a single exception (spark.sql.requireAllClusterKeysForCoPartition), and we are going to make a broader exception here (spark.sql.requireAllClusterKeysForHashPartition).
(EDIT: shouldn't we want this for data source partitioning as well?)

There have been valid cases supporting these exceptions, and they are not edge cases arising from odd data distributions. That said, this is going to be a valid requirement for a distribution. In other words, there are known cases that ClusteredDistribution alone does not solve nicely. It tries hard to eliminate as many shuffles as possible (assuming shuffle is evil), but it does nothing for the cases where a shuffle actually helps.

So I don't think adding a flag to ClusteredDistribution messes up the interface and structure. I think it is the opposite. We are making exceptions to the requirement of ClusteredDistribution: requiring full keys is not one of the requirements of ClusteredDistribution as of now (even with this PR), right? We haven't documented it, and it now depends entirely on the implementation of HashPartitioning. If someone starts to look into ClusteredDistribution, they will likely miss this case. It is also possible we miss the config when implementing DataSourcePartitioning against ClusteredDistribution. I said "trade-off" because this PR pinpoints the issue and brings a small fix, which may be preferable for some, but my preference is to make it clearer.

If we are skeptical about addressing this in ClusteredDistribution because we don't want its requirements extended further, that is really a good rationale for reviving HashClusteredDistribution, because the requirement fits 100% with what we are doing here. The only difference is that data source partitioning couldn't satisfy that requirement in any way, which may be considered a major downside given the roadmap; yes, it makes data source partitioning a second-class citizen for some cases. If we are against that, please make sure ClusteredDistribution covers, by definition, everything HashClusteredDistribution could cover on its own.

@c21 (Contributor, PR author) replied:
My concern is that requiring full keys is a valid requirement we shouldn't simply drop, at least judging from the inputs I've seen.

Just to make sure we are on the same page: operators such as aggregate and window have been using ClusteredDistribution for years. So the data-skew problem in those operators is a pre-existing problem we are now aiming to fix, not a regression introduced by recent PRs.

We are technically changing ClusteredDistribution, and we need to make that clear; otherwise this also becomes undocumented behavior. Instead of just documenting it, I prefer having the flag explicitly, so that any partitioning knows about the detailed requirement very clearly. You don't need to remember the config when you work on DataSourcePartitioning; ClusteredDistribution will provide it for you.

Hmm, maybe I am thinking too much from my own perspective, but I am still not convinced this is a problem for ClusteredDistribution. That hashing on a subset of keys causes data skew seems to me a problem for HashPartitioning only. Other Partitionings such as RangePartitioning or DataSourcePartitioning can partition data very differently from HashPartitioning, or not use hashing at all. So they may have very different causes of data skew beyond hashing a subset of keys (e.g. a suboptimal sampling algorithm in RangePartitioning leading to bad partition boundaries, or a suboptimal user-defined DataSourcePartitioning). I am worried that introducing a flag such as requiresFullKeysMatch into ClusteredDistribution might be useful only for HashPartitioning, but not for the other Partitioning classes. Once we introduce the flag, it's hard to change or remove it, because developers or users depending on DataSourcePartitioning might break once we change ClusteredDistribution again. So I just want to make sure we are very cautious about it.

@HeartSaVioR (Contributor) commented on Feb 22, 2022:
Just to make sure we are on the same page: operators such as aggregate and window have been using ClusteredDistribution for years. So the data-skew problem in those operators is a pre-existing problem we are now aiming to fix, not a regression introduced by recent PRs.

I agree with this, but the difference is that we dropped HashClusteredDistribution, so we have one less facility to leverage. In the SPIP doc we said we would "unify" the two classes, but what we actually did was remove HashClusteredDistribution without an alternative.

Regarding the problem, there are two perspectives: 1) data skew, and 2) an insufficient number of partitions. 2) applies to any partitioning.

Let's think about the end user's perspective. They run a batch query and expect Spark to finish it as quickly as possible (or be resource-efficient). Spark provides a general config, the default number of shuffle partitions, which defines the general parallelism whenever a shuffle is introduced.

The point is when that config takes effect. The more output partitionings ClusteredDistribution can match, the fewer shuffles kick in, while end users may expect something like "a set of known operators introduces shuffles, which adjust parallelism according to the config I set". For example, for ClusteredDistribution matched against DataSourcePartitioning, the config could hypothetically take effect nowhere: the max parallelism could be tied to the source's parallelism, and nothing short of manually changing the query would help, since there might be no shuffle at all. AQE won't help with the parallelism/skew issue within a stage.

Shuffle may not be something we should try hard to eliminate at all costs. We also need to think about when a shuffle is likely to help. We can't leverage stats in physical execution, so my fallback is a heuristic, like the different view of and resolution to this problem in #35574 (comment).

@HeartSaVioR (Contributor) commented on Feb 22, 2022:
In a totally ideal world, the previous stage could calculate the cardinality of all grouping keys required by the next stage, and once the previous stage finishes, the query executor could decide to split the stage based on the difference between that cardinality and the desired threshold on the number of partitions (or even the number of values bound to each key). This is totally ideal and I don't know whether it is even technically feasible. But if we agree this is the ideal, then we are in agreement that shuffle is not always evil.

@c21 (Contributor, PR author) replied:
Shuffle may not be something we should try hard to eliminate at all costs. We also need to think about when a shuffle is likely to help. We can't leverage stats in physical execution, so my fallback is a heuristic, like the different view of and resolution to this problem in #35574 (comment).

Instead, having a (minimum) threshold on the number of partitions doesn't sound crazy to me. The threshold could be a heuristic, or a config: a number or a ratio relative to the default number of shuffle partitions, or the default number of shuffle partitions itself if we don't want to introduce another config (though that may be too high to use as a minimum).

@HeartSaVioR - First, I agree with you that sometimes a shuffle is good to have, so I guess this PR is aiming for the same goal: add a proper shuffle on the full clustering keys based on a config, right? Just for my understanding, are you proposing above a config that sets a minimum threshold on the number of partitions for all queries that need a shuffle? Can you elaborate on how, and which part of, a query would be rewritten when it violates the config? With cardinality stats from CBO and AQE (we don't collect cardinality stats in AQE for now), we may potentially give some hint in the query plan during logical planning. But this approach still sounds a little too high-level to me without elaborating the details of the algorithm.

@HeartSaVioR (Contributor) commented on Feb 22, 2022:
I guess this PR is aiming for the same goal: add a proper shuffle on the full clustering keys based on a config, right?

Yes, but we still need to argue why it is needed. I wouldn't say it is due to HashPartitioning, given the two perspectives I mentioned. Even if the hash function of HashPartitioning is somewhat inefficient at properly distributing some combination of grouping keys and data, that only contributes to data skew. In any partitioning, the number of partitions is the physical limit on parallelism. It is ClusteredDistribution that prevents two operators from having a shuffle in between, coupling the operators into the same stage with the same partitioning and parallelism.
(Please correct me if AQE can decide to split a single stage into multiple stages, injecting shuffles in between.)

Considering the two perspectives, there are multiple cases of child partitioning that each physical node may need to deal with:

  1. clustering keys are fully covered and the number of partitions is sufficient (ideal)
  2. clustering keys are fully covered but the number of partitions is insufficient
  3. only a subset of the clustering keys is covered and the number of partitions is insufficient
  4. only a subset of the clustering keys is covered but the number of partitions is sufficient

Since we are going with a manual/heuristic way of dealing with this, I would like to find the approach that addresses the most cases with the fewest side effects. That is the main reason I tried to think about alternatives.

Requiring the full clustering keys deals with 3) and 4), where 4) may be skewed (a good case for a shuffle) or not (a shuffle may not be needed), so the benefit of shuffling there is conditional. Requiring a minimum threshold on the number of partitions deals with 2) and 3), which is good in general to ensure minimum parallelism for grouping/joining operators; it misses case 4), but as just mentioned, the benefit of a shuffle in 4) is conditional anyway. In addition, it is no longer bound only to data skew, and hence applies to any partitioning.

Implementation-wise, I imagine it is simple, as we just add another constraint to ClusteredDistribution. If a required number of partitions exists, it is followed strictly; otherwise we compare the partitioning's numPartitions with the threshold defined in ClusteredDistribution. The threshold value could be optional if we doubt there is a good default working for the majority of queries.
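A hedged sketch of what that proposal might look like. Everything here except Expression is hypothetical: ClusteredDistributionWithMin and minNumPartitions do not exist in Spark and are named only to illustrate the reviewer's idea.

import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical variant of ClusteredDistribution carrying an optional minimum parallelism.
case class ClusteredDistributionWithMin(
    clustering: Seq[Expression],
    requiredNumPartitions: Option[Int] = None,
    minNumPartitions: Option[Int] = None)

// The extra check any Partitioning could apply, regardless of how it partitions the data:
// an exact requirement (if present) wins; otherwise enforce the minimum threshold.
def satisfiesParallelism(numPartitions: Int, dist: ClusteredDistributionWithMin): Boolean =
  dist.requiredNumPartitions match {
    case Some(exact) => numPartitions == exact
    case None        => dist.minNumPartitions.forall(numPartitions >= _)
  }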

For sure, these constraints could be used together, to cover as many cases as possible. Requiring the full clustering keys is still needed to deal with 4). If we are not sure whether this applies only to HashPartitioning or to other partitionings as well, and want to defer making it an official constraint of ClusteredDistribution, I'd agree with deferring the decision until we deal with DataSourcePartitioning.

// Checks whether `HashPartitioning` is partitioned on exactly the full clustering keys of
// `ClusteredDistribution`. Opting in to this feature by enabling
// "spark.sql.requireAllClusterKeysForHashPartition" can help avoid potential data
// skew for some jobs.
isPartitionedOnFullKeys(c)
Contributor commented:
If we end up with strict ordering, we could document in the method doc of isPartitionedOnFullKeys that it also requires exact order, and replace the condition of StatefulOpClusteredDistribution with isPartitionedOnFullKeys. I'm wondering whether we care about ordering for the cases we described.

} else {
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
case _ => false
}
}
@@ -271,6 +279,17 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
HashShuffleSpec(this, distribution)

/**
* Checks if [[HashPartitioning]] is partitioned on exactly the same full `clustering` keys of
* [[ClusteredDistribution]].
*/
def isPartitionedOnFullKeys(distribution: ClusteredDistribution): Boolean = {
expressions.length == distribution.clustering.length &&
Contributor commented:
The condition is more restrictive than what we explain in the config (e.g. is the ordering important here?), but I'm fine with it if we are all OK with this, as my proposal is technically the same.

@c21 (Contributor, PR author) replied:
@HeartSaVioR - this is a good point. I am more inclined to the more restrictive condition, as hash(x1, x2) sends rows to different partitions than hash(x2, x1), and one ordering could potentially have data skew while the other does not. Before I update the doc, cc'ing more folks for comments: @cloud-fan and @sunchao.
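A small sketch of that point, assuming catalyst's Murmur3Hash expression: the hash is folded over the children in order, so swapping the key order generally changes the hash value and therefore the partition ID. The literal values, seed 42 (which I believe matches HashPartitioning's default), and partition count are illustrative only; whether this also changes skew in practice is a separate question.

import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Literal, Murmur3Hash}

val numPartitions = 200
// Same row values, two key orders.
val h12 = Murmur3Hash(Seq(Literal(1), Literal("x")), 42).eval(EmptyRow).asInstanceOf[Int]
val h21 = Murmur3Hash(Seq(Literal("x"), Literal(1)), 42).eval(EmptyRow).asInstanceOf[Int]

// Non-negative modulo, mirroring how HashPartitioning maps a hash to a partition ID.
def pmod(h: Int): Int = ((h % numPartitions) + numPartitions) % numPartitions
println(s"hash(x1, x2) -> partition ${pmod(h12)}, hash(x2, x1) -> partition ${pmod(h21)}")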

@sunchao (Member) commented on Feb 20, 2022:
I'm not sure ordering is important here: is it a common case that data skew appears after changing the order of the hash keys? I'd be surprised if murmur3 hashing exhibits that kind of property.

This also makes the optimization harder to kick in (imagine users having to carefully align join or aggregation keys to the same order as the bucket keys of the table). It is also a behavior change for bucketed joins, since currently Spark is more relaxed and will reorder the hash keys w.r.t. the join keys in EnsureRequirements.reorderJoinPredicates.

Contributor commented:
Now I'm also in favor of the more restrictive condition. With it, end users can change the order of keys to tune their query further as a last resort if simply turning the config on isn't performant enough. We expect that changing the order of the hash keys changes the partition IDs, right?

The scenario in which end users turn on this config is a major point. They wouldn't turn it on before ever running the query (the config is marked as internal and is disabled by default). They would turn it on after running the query and seeing Spark behave badly. One could argue that they can add a repartition manually in their code/SQL statement, which makes sense in general, but there are counter-arguments: 1) they may have far more than a few queries, and 2) the queries could be machine/tool-generated.

Member commented:
I'm fine with the stricter condition. It looks harmless and is less likely to bring unexpected cases of data skew. And I think for such a config, the intention is to give users more control, and ordering might be one aspect of that.

expressions.zip(distribution.clustering).forall {
case (l, r) => l.semanticEquals(r)
}
}
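A quick, hedged illustration of how strict this check is, using isPartitionedOnFullKeys from the diff above; the attributes and partition count are illustrative, and the constructor signatures are assumed from the Spark 3.2/3.3 code base.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val dist = ClusteredDistribution(Seq(a, b))

HashPartitioning(Seq(a, b), 200).isPartitionedOnFullKeys(dist) // true: same keys, same order
HashPartitioning(Seq(b, a), 200).isPartitionedOnFullKeys(dist) // false: order differs
HashPartitioning(Seq(a), 200).isPartitionedOnFullKeys(dist)    // false: only a subset of the keys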

/**
* Returns an expression that will produce a valid partition ID(i.e. non-negative and is less
* than numPartitions) based on hashing expressions.
@@ -524,10 +543,7 @@ case class HashShuffleSpec(
// will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
// the join keys.
if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) {
partitioning.expressions.length == distribution.clustering.length &&
partitioning.expressions.zip(distribution.clustering).forall {
case (l, r) => l.semanticEquals(r)
}
partitioning.isPartitionedOnFullKeys(distribution)
@HeartSaVioR (Contributor) commented on Feb 20, 2022:
Although it is beyond the scope of this PR, the same question applies here: would we need to require a strict order of keys? Just curious.

@c21 (Contributor, PR author) replied:
I think we probably need strict order here, as it feels safer to require it, and this was the original behavior when join required HashClusteredDistribution. This essentially restores the join's behavior to what it was with HashClusteredDistribution.

} else {
true
}
@@ -407,6 +407,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION =
Member commented:
We should mention that the ordering is also required, if it is part of the condition.

@c21 (Contributor, PR author) replied:
@viirya - sure, will change it later once we reach consensus in the comments.

buildConf("spark.sql.requireAllClusterKeysForHashPartition")
.internal()
.doc("When true, the planner requires all the clustering keys as the hash partition keys " +
"of the children, to eliminate the shuffle for the operator that needs its children to " +
"be hash partitioned, such as AGGREGATE and WINDOW node. This is to avoid data skews " +
"which can lead to significant performance regression if shuffle is eliminated.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)
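A hedged end-to-end sketch of how the new flag is meant to be used; the config name is from this PR, the DataFrame mirrors the test added below, and the local session plus the exact plan shape (shown by explain) are illustrative assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Opt in: require the hash partitioning to cover all of the grouping keys.
spark.conf.set("spark.sql.requireAllClusterKeysForHashPartition", "true")

val df = Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3)).toDF("key1", "key2", "value")

df.repartition($"key1")        // child is hash-partitioned on only a subset of the grouping keys
  .groupBy($"key1", $"key2")   // with the flag on, this no longer reuses that partitioning
  .agg(sum($"value"))
  .explain()                   // expect an extra Exchange hashpartitioning(key1, key2, ...)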

val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort")
.internal()
.doc("When true, enable use of radix sort when possible. Radix sort is much faster but " +
Expand Down Expand Up @@ -3951,6 +3962,9 @@ class SQLConf extends Serializable with Logging {

def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)

def requireAllClusterKeysForHashPartition: Boolean =
getConf(REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION)

def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)

def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
@@ -287,6 +287,11 @@ abstract class StreamExecution(
// Disable cost-based join optimization as we do not want stateful operations
// to be rearranged
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
// Disable any config affecting the required child distribution of stateful operators.
// Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution for
// details.
sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key,
"false")

updateStatusMessage("Initializing sources")
// force initialization of the logical plan so that the sources can be created
@@ -24,10 +24,12 @@ import scala.util.Random
import org.scalatest.matchers.must.Matchers.the

import org.apache.spark.SparkException
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.{InputAdapter, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -1453,6 +1455,57 @@ class DataFrameAggregateSuite extends QueryTest
val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
checkAnswer(df, Row(2, 3, 1))
}

test("SPARK-38237: require all cluster keys for child required distribution") {
@c21 (Contributor, PR author) commented:
This is the same unit test added in #35552, credited to @HeartSaVioR. I may change the test query to a window query instead of an aggregate, to make it more convincing.

Contributor commented:
Thanks! Please feel free to adjust the test case or use it to derive additional test cases.

def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = {
expressions.flatMap {
case ref: AttributeReference => Some(ref.name)
}
}

def isShuffleExecByRequirement(
plan: ShuffleExchangeExec,
desiredClusterColumns: Seq[String],
desiredNumPartitions: Int): Boolean = plan match {
Contributor commented:
Do we need to check the number of partitions? I think this test should focus on whether the shuffle is added or not.

@c21 (Contributor, PR author) replied:
@cloud-fan - I think we don't need to check the number of partitions here, unless @HeartSaVioR has a strong opinion on this. Removed it for now.

Contributor commented:
The relaxed check is OK with me, as long as the partition keys are asserted.

Just curious, is it because the number of partitions could differ from the config, or just that we don't need to be strict here?

Contributor commented:
Oh well, we don't strictly require the number of partitions. The test came from checking a stateful operator, which does strictly require the number of partitions (with AQE disabled). It seems better to remove the check.

case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS)
if partitionExpressionsColumns(op.expressions) === desiredClusterColumns &&
op.numPartitions === desiredNumPartitions => true

case _ => false
}

val df = Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value")

withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "true") {

val grouped = df
// repartition by sub group keys which satisfies ClusteredDistribution(group keys)
.repartition($"key1")
.groupBy($"key1", $"key2")
.agg(sum($"value"))

checkAnswer(grouped, Seq(Row("a", 1, 1), Row("a", 2, 2), Row("b", 1, 7)))

val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)

val shuffleByRequirement = grouped.queryExecution.executedPlan.flatMap {
case a if a.isInstanceOf[BaseAggregateExec] =>
a.children.head match {
case InputAdapter(s: ShuffleExchangeExec)
if isShuffleExecByRequirement(s, Seq("key1", "key2"), numPartitions) => Some(s)
case s: ShuffleExchangeExec
if isShuffleExecByRequirement(s, Seq("key1", "key2"), numPartitions) => Some(s)
case _ => None
}

case _ => None
}

assert(shuffleByRequirement.nonEmpty, "Can't find desired shuffle node from the query plan")
}
}
}

case class B(c: Option[Double])