-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-52921][SQL] Specify outputPartitioning for UnionExec for same output partitoning as children operators #51623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| new ReliableCheckpointRDD[T](this, path) | ||
| } | ||
|
|
||
| protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a comment about the assumption, rdds.filter(!_.partitions.isEmpty)? Otherwise, it may cause correctness issues later if we use this blindly.
Otherwise, we had better include the assumption inside this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comment and a check.
| private lazy val childrenRDDs = children.map(_.execute()) | ||
|
|
||
| override def outputPartitioning: Partitioning = { | ||
| val nonEmptyRdds = childrenRDDs.filter(!_.partitions.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto. We can remove this too if isPartitionerAwareUnion has the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because SparkContext.union uses nonEmptyRdds, so I didn't move nonEmptyRdds logic into isPartitionerAwareUnion. I leave to the callers to pass in non empty rdds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it~ Thank you for the explanation.
|
cc @peter-toth |
|
|
||
| // Note that input rdds must be all non-empty, i.e., rdds.filter(_.partitions.isEmpty).isEmpty | ||
| protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = { | ||
| assert(!rdds.exists(_.partitions.isEmpty), "Must not have empty RDDs") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @viirya .
| // child operator will be replaced by Spark in query planning later, in other | ||
| // words, `execute` won't be actually called on them during the execution of | ||
| // this plan. So we can safely return the default partitioning. | ||
| case e if NonFatal(e) => super.outputPartitioning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This handles nodes that don't implement execute method. The reason is described like the comment said.
| protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = { | ||
| assert(!rdds.exists(_.partitions.isEmpty), "Must not have empty RDDs") | ||
| val partitioners = rdds.flatMap(_.partitioner).toSet | ||
| rdds.forall(_.partitioner.isDefined) && partitioners.size == 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we don't need the partitioners set before the forall isDefined check.
peter-toth
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just a minor nit.
|
Hmm, there are a few test failures, I will take a look. |
| "default partitioning.") | ||
| .version("4.1.0") | ||
| .booleanConf | ||
| .createWithDefault(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For safety, added an internal config for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
|
Could you re-trigger |
They are related test failures. I'm investigating them. Thanks. |
| override def resetMetrics(): Unit = { | ||
| // no-op | ||
| // BroadcastExchangeExec after materialized won't be materialized again, so we should not | ||
| // reset the metrics. Otherwise, we will lose the metrics collected in the broadcast job. | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I spent a lot time debugging the remaining test failures. When there is broadcast exchange operator, AQE empty relation propagation rule will produce incorrect query plan around it. It is caused by this reset metrics method.
I think it is valuable to be a separate PR: #51673
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#51673 also has related test case.
|
|
||
| try { | ||
| val nonEmptyRdds = childrenRDDs.filter(!_.partitions.isEmpty) | ||
| if (sparkContext.isPartitionerAwareUnion(nonEmptyRdds)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually this only covers limited cases like reused shuffles they have the same partitioner. I would like to extend this to SQL cases, i.e., the outputPartitioning is same or compatible for Union's children. But I will leave to follow up works.
|
@viirya Should unit tests be added in DataFrameSetOperationsSuite.scala to cover the following scenarios? // Core Functionality
test("union partitioning - different partitioners") {
// Covers: Different partitioner scenarios
}
// Mixed Partitioners
test("union partitioning - mixed partitioners") {
// Covers: Mixed partitioner scenarios
}
// Command Handling
test("union partitioning with commands") {
// Covers: Command plan interactions
}
// Error Handling
test("union partitioning error handling") {
// Covers: Error scenarios and fallback behavior
}
|
Those cases are covered by existing tests. For example, there were test failures on union with commands before. I added the logic to handle command case to pass these tests. |
|
Thanks, @viirya |
They can be found in previous CI failures. For example:
I think most union tests cover different/mixed partitioner case, because most union queries don't have same (rdd) partitioner. |
| } | ||
| } | ||
|
|
||
| private lazy val childrenRDDs = children.map(_.execute()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actaully, does this mean that children executions are triggered to get outputPartitioning of an Union?
E.g. a simple explain to show the physical plan can now trigger execuion of union children?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, so this approach has this drawback. So as I mentioned #51623 (comment), this doesn't cover SQL cases generally. I plan to extend this to deal with outputPartitioning of children, i.e., no need to invoke execute on children.
It will be done in follow up works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually have the next PR ready locally. After this gets merged, I will open a new PR to improve it and get rid of this execute calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO that's a serious drawback. But if we can fix it in a follow-up PR right after this PR then I'm ok with merging. Or just update this PR with you local changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I updated to this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I can check it tomorrow.
What changes were proposed in this pull request?
This patch updates
outputPartitioningforUnionExecoperator for the cases that the output partitionings of its children are the same. So the output partitioning can be known.Why are the changes needed?
Currently the output partitioning of
UnionExecis simply unknown. But if the output partitionings of its children are known to be the same, we can make the union output as the same output partitioning with the children.But different to the RDD-level
PartitionerAwareUnionRDD, which only considers the RDD partitioner, SQL operators'outputPartitioningdoesn't really rely on RDD's partitioner.Thus, this patch introduces
SQLPartitioningAwareUnionRDDwhich is a specified union RDD only for SQLUnionExecif the output partitioning is to be the same as its children. Similar toPartitionerAwareUnionRDD, it groups the partitions of parent RDDs at corresponding index together but it doesn't require that parent RDDs to have same partitioner.Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test.
Was this patch authored or co-authored using generative AI tooling?
No