Align GpuUnionExec with Spark 4.1's partitioner-aware union behavior [databricks] #14164
Conversation
…as children operators Signed-off-by: Niranjan Artal <[email protected]>
This was built on top of #14120, so it can be merged after Spark 4.1.1 support is added.
Greptile Overview

Greptile Summary

This PR implements Spark 4.1's partitioner-aware union behavior (SPARK-52921) for GpuUnionExec and fixes several missing or incorrect outputPartitioning overrides.

Key Changes:
- Spark 4.1 partitioner-aware union
- outputPartitioning fixes
- Testing: integration tests in dpp_test.py

Confidence Score: 4.5/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Spark as Spark 4.1 Planner
    participant GpuUnion as GpuUnionExec
    participant Shim as GpuUnionExecShim
    participant RDD as GpuPartitionerAwareUnionRDD
    participant Children as Child RDDs
    Spark->>GpuUnion: Plan UnionExec
    GpuUnion->>Shim: getOutputPartitioning(children)
    alt All children have compatible partitioning
        Shim-->>GpuUnion: HashPartitioning/SinglePartition
        Note over Shim: SQL_UNION_OUTPUT_PARTITIONING=true
    else Incompatible partitioning
        Shim-->>GpuUnion: UnknownPartitioning(0)
    end
    Spark->>GpuUnion: executeColumnar()
    GpuUnion->>Shim: unionColumnarRdds()
    alt UnknownPartitioning
        Shim->>Children: sc.union(rdds)
        Note over Shim,Children: Concatenate all partitions
        Children-->>GpuUnion: Sequential RDD
    else Has known partitioning
        Shim->>RDD: new GpuPartitionerAwareUnionRDD
        RDD->>Children: Group partitions at index i
        Note over RDD,Children: Partition-aware: rdds[0][i] + rdds[1][i] + ...
        Children-->>GpuUnion: Partitioner-aware RDD
    end
```
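The two union strategies in the diagram can be modeled in a few lines. This is a language-agnostic sketch in Python (function names are illustrative, not the plugin's Scala shim code), treating an RDD as a list of partitions:

```python
def sequential_union(rdds):
    # UnknownPartitioning path: concatenate every child's partitions in
    # order, so the result has sum(len(r) for r in rdds) partitions.
    return [part for rdd in rdds for part in rdd]

def partitioner_aware_union(rdds):
    # Known-partitioning path: all children must have the same number of
    # partitions; output partition i is the concatenation of each child's
    # partition i, which preserves the child partitioner.
    n = len(rdds[0])
    assert all(len(rdd) == n for rdd in rdds), "children must align"
    return [[row for rdd in rdds for row in rdd[i]] for i in range(n)]
```

With two children of two partitions each, the sequential path yields four output partitions, while the partitioner-aware path yields two, keeping co-partitioned rows together.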
4 files reviewed, 1 comment
sql-plugin/src/main/spark411/scala/com/nvidia/spark/rapids/shims/GpuUnionExecShim.scala
```scala
{"spark": "400"}
{"spark": "401"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims
```
Missing {"spark": "400db173"}?
It's better to test Databricks by adding the [databricks] marker in the PR title.
Added the [databricks] marker to the PR title. I will add "400db173" in the PR that adds support for Databricks 17.3; adding it now might be confusing for some.
```scala
      children.map(_.executeColumnar()),
      numOutputRows,
      numOutputBatches)
  }
```
Do we also need to update what we say our output partitioning is?
|
NOTE: release/26.02 has been created from main. Please retarget your PR to release/26.02 if it should be included in the release. |
Signed-off-by: Niranjan Artal <[email protected]>
No files reviewed, no comments
Signed-off-by: Niranjan Artal <[email protected]>
3 files reviewed, no comments
4 files reviewed, no comments
jihoonson
left a comment
Thanks @nartal1. Are all these changes covered by existing tests?
GpuProjectExecLike: Fixed outputPartitioning to remap expressions through aliases (was missing, unlike Spark's PartitioningPreservingUnaryExecNode).
GpuBroadcastHashJoinExecBase: Added missing outputPartitioning override.
GpuCustomShuffleReaderExec: Fixed outputPartitioning for AQE coalesced reads to preserve HashPartitioning (matching Spark's AQEShuffleReadExec).
GpuShuffledHashJoinExec: Added outputPartitioning override matching Spark's HashJoin trait behavior.
GpuShuffledSymmetricHashJoinExec: Added outputPartitioning override for InnerLike and FullOuter join types.
GpuShuffledAsymmetricHashJoinExec: Added outputPartitioning override for LeftOuter and RightOuter join types.
GpuBroadcastNestedLoopJoinExecBase: Added outputPartitioning override matching Spark's BroadcastNestedLoopJoinExec behavior.
```scala
 * This is critical for Spark 4.1+ where UnionExec uses outputPartitioning
 * to decide between partitioner-aware union vs concatenation.
 */
override def outputPartitioning: Partitioning = {
```
Do we not need to handle the case when the project has aliases?
Also wonder if it's a good idea to have GpuProjectExecLike extend PartitioningPreservingUnaryExecNode instead of copying this code.
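The alias-handling question above is about partitioning propagation through projections: a project like `SELECT a AS x` over a child hash-partitioned on `a` should report hash partitioning on `x`, and if a partitioning column is dropped entirely, the partitioning is no longer known. A rough model (Python sketch with hypothetical names, not the plugin's Scala code):

```python
def remap_partitioning(part_cols, alias_map, output_cols):
    """Rewrite hash-partitioning columns through a projection's aliases.

    part_cols:   columns the child's HashPartitioning uses
    alias_map:   {child_column: output_alias} pairs from the project list
    output_cols: columns the project actually emits
    Returns the remapped columns, or None (UnknownPartitioning).
    """
    remapped = [alias_map.get(c, c) for c in part_cols]
    # If any partitioning column no longer appears in the output,
    # the child's partitioning cannot be preserved.
    if any(c not in output_cols for c in remapped):
        return None
    return remapped
```

This is roughly what Spark's PartitioningPreservingUnaryExecNode does with its expression-level alias map; extending that trait instead of copying the logic is the refactor suggested above.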
```scala
  }

  override def outputPartitioning: Partitioning =
    GpuUnionExecShim.getOutputPartitioning(children, output, conf)
```
Hmm, can we take the outputPartitioning of the cpu exec as a parameter of GpuUnionExec instead of duplicating the Spark code?
Thanks @jihoonson for the review. Your review comments on the refactor make sense. I will do it as a follow-on PR for 26.04 if that's okay. Filed an issue for these: #14229
…nionexec_paritionaware
4 files reviewed, 1 comment
...ugin/src/main/spark411/scala/com/nvidia/spark/rapids/shims/GpuPartitionerAwareUnionRDD.scala
4 files reviewed, no comments
build
jihoonson
left a comment
Thank you for filing a follow-up issue. LGTM
Fixes #14083 and contributes to #14135.
Description
This PR aligns GpuUnionExec with Apache Spark 4.1's change to UnionExec behavior introduced in SPARK-52921.
In Spark 4.1, UnionExec was changed to use SQLPartitioningAwareUnionRDD which groups partitions at corresponding indices across child RDDs, rather than concatenating all partitions sequentially.
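Under this change, the partitioner-aware path is only taken when it is safe. A simplified model of the decision (not Spark's actual implementation; the tuple encoding of a partitioning is hypothetical): the union reports a real partitioning only when every child reports the same non-trivial partitioning with the same partition count, and the feature flag is on; otherwise it reports UnknownPartitioning(0) and falls back to sequential concatenation.

```python
def union_output_partitioning(child_partitionings, flag_enabled=True):
    # child_partitionings: list of (kind, num_partitions) tuples,
    # e.g. ("hash(a)", 8) or ("unknown", 0).
    first = child_partitionings[0]
    if (flag_enabled
            and first[0] != "unknown"
            and all(p == first for p in child_partitionings)):
        return first           # partitioner-aware union, partitioning kept
    return ("unknown", 0)      # fall back to sequential concatenation
```

This mirrors why the GPU side must report accurate outputPartitioning on every child exec: an over- or under-reported partitioning flips the planner into the wrong branch.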
Most of the partitioner-aware union code is copied from Apache Spark's codebase.
While fixing this, we audited the outputPartitioning overrides of other execs and updated or added the missing ones.
These are also adapted from Spark's code and modified to fit this repo.
- GpuProjectExecLike: Fixed outputPartitioning to remap expressions through aliases (was missing, unlike Spark's PartitioningPreservingUnaryExecNode).
- GpuBroadcastHashJoinExecBase: Added missing outputPartitioning override.
- GpuCustomShuffleReaderExec: Fixed outputPartitioning for AQE coalesced reads to preserve HashPartitioning (matching Spark's AQEShuffleReadExec).
- GpuShuffledHashJoinExec: Added outputPartitioning override matching Spark's HashJoin trait behavior.
- GpuShuffledSymmetricHashJoinExec: Added outputPartitioning override for InnerLike and FullOuter join types.
- GpuShuffledAsymmetricHashJoinExec: Added outputPartitioning override for LeftOuter and RightOuter join types.
- GpuBroadcastNestedLoopJoinExecBase: Added outputPartitioning override matching Spark's BroadcastNestedLoopJoinExec behavior.

Some of the integration tests were failing on Spark 4.1 with the below error:
Testing
All the integration tests in dpp_test.py pass now with this PR.
Before this PR:
With this PR:
Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)