[SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join #35419
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
| } |
| } |
| |
| /** |
| * Represents the requirement of distribution on the stateful operator. |
| * |
| * Each partition in stateful operator initializes state store(s), which are independent with state |
| * store(s) in other partitions. Since it is not possible to repartition the data in state store, |
| * Spark should make sure the physical partitioning of the stateful operator is unchanged across |
| * Spark versions. Violation of this requirement may bring silent correctness issue. |
| * |
| * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the |
| * stateful operator, only [[HashPartitioning]] can satisfy this distribution. |
| */ |
| case class StatefulOpClusteredDistribution( |
Member
I like the new name :) thanks for making it more specific. Do we also need to update
Contributor
Author
Each input must follow the required distribution provided by the stateful operator in order to respect the state partitioning requirement. State partitioning is a first-class requirement, so even if both sides of the streaming join are co-partitioned, Spark must perform a shuffle if they don't match the state partitioning. (If that was the previous behavior, we broke something at some point.)
Member
Actually, I think this PR will skip shuffle if both sides of a streaming join are co-partitioned. In
In the step 2) we'd only consider
I'm not quite sure about this. Shouldn't we retain the behavior before #32875? Quoting the comment from @cloud-fan:
If we respected co-partitioning and avoided the shuffle before #32875 but start shuffling after this PR, I think an issue similar to the one described in the comment can happen?
Contributor
Author
For stream-stream join, once each input satisfies its own required "hash" distribution, the inputs are co-partitioned. Stream-stream join must guarantee this.
Contributor
Author
The problem came up because ClusteredDistribution has a much more relaxed requirement; what we really need to require for "any" stateful operator, including stream-stream join, is that, for all children, a tuple with a given grouping key must be bound to a deterministic partition "ID", which only HashClusteredDistribution could guarantee.
Member
I think one behavior difference between this PR and the state before #32875 is that, previously, we'd also check
Contributor
Author
HashClusteredDistribution also has a requirement on the number of partitions, so step 1) should fulfill it.
Contributor
Yes,
Member
I see, it should be good then!
| expressions: Seq[Expression], |
| requiredNumPartitions: Option[Int] = None) extends Distribution { |
| require( |
| expressions != Nil, |
| "The expressions for hash of a StatefulOpClusteredDistribution should not be Nil. " + |
| "An AllTuples should be used to represent a distribution that only has " + |
| "a single partition.") |
| |
| override def createPartitioning(numPartitions: Int): Partitioning = { |
| assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions, |
| s"This StatefulOpClusteredDistribution requires ${requiredNumPartitions.get} " + |
| s"partitions, but the actual number of partitions is $numPartitions.") |
| HashPartitioning(expressions, numPartitions) |
| } |
| } |
| |
| /** |
| * Represents data where tuples have been ordered according to the `ordering` |
| * [[Expression Expressions]]. Its requirement is defined as the following: |
|
|
@@ -200,6 +228,11 @@ case object SinglePartition extends Partitioning {
| * Represents a partitioning where rows are split up across partitions based on the hash |
| * of `expressions`. All rows where `expressions` evaluate to the same values are guaranteed to be |
| * in the same partition. |
| * |
| * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires |
| * stateful operators to retain the same physical partitioning during the lifetime of the query |
| * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged |
| * across Spark versions. Violation of this requirement may bring silent correctness issue. |
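To make the concern in the doc comments and the review thread concrete, here is a minimal plain-Scala sketch (not Spark code). It assumes a toy `partitionId` helper built on `scala.util.hashing.MurmurHash3` as a stand-in for Spark's hash partitioning; the object name, the helper names, and the choice of 5 partitions are all hypothetical. The point it illustrates: partitioning on `b` alone still keeps rows with equal `(a, b)` together, so a "clustered by `(a, b)`" requirement is met, yet a row's partition ID can differ from the one computed over the full key, which is what the state store layout depends on.

```scala
import scala.util.hashing.MurmurHash3

object StatePartitioningSketch {
  // Non-negative modulo, so a negative hash still maps into [0, n).
  def pmod(x: Int, n: Int): Int = ((x % n) + n) % n

  // Hypothetical stand-in for hash partitioning: hash the key columns, then pmod.
  // This is NOT Spark's Murmur3Hash; it only models "deterministic hash of the keys".
  def partitionId(keys: Seq[Int], numPartitions: Int): Int =
    pmod(MurmurHash3.seqHash(keys), numPartitions)

  def main(args: Array[String]): Unit = {
    val numPartitions = 5
    // Rows shaped like the test in this PR: a = value, b = value * 2.
    val rows = (1 to 1000).map(v => (v, v * 2))

    // Partition ID over the full key (a, b): what the state partitioning assumes.
    val byFullKey = rows.map { case (a, b) => partitionId(Seq(a, b), numPartitions) }

    // Partition ID over b only: equal (a, b) rows are still co-located,
    // so clustering on (a, b) holds, but the IDs are computed differently.
    val bySubset = rows.map { case (_, b) => partitionId(Seq(b), numPartitions) }

    val moved = byFullKey.zip(bySubset).count { case (full, sub) => full != sub }
    println(s"$moved of ${rows.size} rows get a different partition ID")
  }
}
```

Any row counted as "moved" would be looked up in a state store partition that never saw its key, which is the silent correctness issue the doc comment warns about.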
| test("streaming join should require HashClusteredDistribution from children") { | |
| val input1 = MemoryStream[Int] | |
| val input2 = MemoryStream[Int] | |
| val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b) | |
| val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b) | |
| val joined = df1.join(df2, Seq("a", "b")).select('a) | |
| testStream(joined)( | |
| AddData(input1, 1.to(1000): _*), | |
| AddData(input2, 1.to(1000): _*), | |
| CheckAnswer(1.to(1000): _*), | |
| Execute { query => | |
| // Verify the query plan | |
| def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = { | |
| expressions.flatMap { | |
| case ref: AttributeReference => Some(ref.name) | |
| } | |
| } | |
| val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) | |
| assert(query.lastExecution.executedPlan.collect { | |
| case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, | |
| ShuffleExchangeExec(opA: HashPartitioning, _, _), | |
| ShuffleExchangeExec(opB: HashPartitioning, _, _)) | |
| if partitionExpressionsColumns(opA.expressions) === Seq("a", "b") | |
| && partitionExpressionsColumns(opB.expressions) === Seq("a", "b") | |
| && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j | |
| }.size == 1) | |
| }) | |
| } |
If we want to be exhaustive, I can add combinations of repartitions that would not have triggered a shuffle with hash partitioning on the joining keys if stream-stream join used ClusteredDistribution. Even so, that may not be exhaustive enough to be future-proof.
Instead, if we are pretty sure StatefulOpClusteredDistribution works as expected, we can simply check the required child distribution of the physical plan of stream-stream join, and additionally check that the output partitioning of each child is HashPartitioning with the joining keys (this effectively verifies StatefulOpClusteredDistribution as well).
oh actually I was referring to the assumption:
HashPartitioning.partitionIdExpression has to be exactly Pmod(new Murmur3Hash(expressions), Literal(numPartitions)).
It would be just to add some logic to check opA/opB.partitionIdExpression for the opA/opB at Line 598/599. I can also do it later if it's not clear to you.
We checked HashPartitioning and its partition expressions here; what remains is partitionIdExpression, which is an implementation detail of HashPartitioning.
That said, it would be nice if we have a separate test against HashPartitioning if we don't have one. Could you please check and craft one if we don't have it?
Sure I can add one later this week.
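The check discussed in this thread could look roughly like the following. This is a sketch, not the test that was eventually added. It assumes the Catalyst API of the Spark version this PR targets, where `HashPartitioning.partitionIdExpression`, `Pmod`, `Murmur3Hash`, and `Literal` are available, and it needs spark-catalyst on the classpath. The object and helper names are hypothetical.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.IntegerType

object PartitionIdExpressionCheck {
  // Returns true when the partitioning computes its partition ID as
  // pmod(murmur3(expressions), numPartitions), the form stateful operators rely on.
  def usesMurmur3Pmod(p: HashPartitioning): Boolean = {
    val expected: Expression =
      Pmod(new Murmur3Hash(p.expressions), Literal(p.numPartitions))
    p.partitionIdExpression == expected
  }

  def main(args: Array[String]): Unit = {
    // Two integer key columns, mirroring the join keys "a" and "b" in the test above.
    val keys = Seq(
      AttributeReference("a", IntegerType)(),
      AttributeReference("b", IntegerType)())
    assert(usesMurmur3Pmod(HashPartitioning(keys, 5)))
  }
}
```

Comparing against a hand-written expected expression, instead of re-deriving it from `HashPartitioning` itself, is what makes such a check fail if the implementation ever changes.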
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec( | |
| val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length) | ||
|
|
||
| override def requiredChildDistribution: Seq[Distribution] = | ||
| - ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) :: | ||
| - ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil | ||
| + StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) :: | ||
| + StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil | ||
|
||
|
|
||
| override def output: Seq[Attribute] = joinType match { | ||
| case _: InnerLike => left.output ++ right.output | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow | |
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection | ||
| import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark | ||
| - import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} | ||
| + import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning, StatefulOpClusteredDistribution} | ||
| import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ | ||
| import org.apache.spark.sql.errors.QueryExecutionErrors | ||
| import org.apache.spark.sql.execution._ | ||
|
|
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec( | |
| if (keyExpressions.isEmpty) { | ||
| AllTuples :: Nil | ||
| } else { | ||
| - ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
| + StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
|
||
| } | ||
| } | ||
|
|
||
|
|
@@ -496,7 +496,7 @@ case class StateStoreSaveExec( | |
| if (keyExpressions.isEmpty) { | ||
| AllTuples :: Nil | ||
| } else { | ||
| - ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
| + StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -573,7 +573,8 @@ case class SessionWindowStateStoreRestoreExec( | |
| } | ||
|
|
||
| override def requiredChildDistribution: Seq[Distribution] = { | ||
| - ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
| + StatefulOpClusteredDistribution(keyWithoutSessionExpressions, | ||
| + stateInfo.map(_.numPartitions)) :: Nil | ||
| } | ||
|
|
||
| override def requiredChildOrdering: Seq[Seq[SortOrder]] = { | ||
|
|
@@ -684,7 +685,7 @@ case class SessionWindowStateStoreSaveExec( | |
| override def outputPartitioning: Partitioning = child.outputPartitioning | ||
|
|
||
| override def requiredChildDistribution: Seq[Distribution] = { | ||
| - ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
| + StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
| } | ||
|
|
||
| override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = { | ||
|
|
@@ -742,7 +743,7 @@ case class StreamingDeduplicateExec( | |
|
|
||
| /** Distribute by grouping attributes */ | ||
| override def requiredChildDistribution: Seq[Distribution] = | ||
| - ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
| + StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil | ||
|
|
||
| override protected def doExecute(): RDD[InternalRow] = { | ||
| metrics // force lazy init at driver | ||
|
|
||
nit: do we need to put "structured streaming" before "stateful operator"?
I think "stateful" already implies the streaming context, but it's no big deal if we repeat it here.