[SPARK-33822][SQL][3.0] Use the CastSupport.cast method in HashJoin #30830
Conversation
cc: @dongjoon-hyun

I'm checking if q5 can pass with AQE enabled in this branch.
dongjoon-hyun left a comment:
+1, LGTM. (Pending CIs).
Thank you, @maropu !
okay, I've checked that q5 passed.

Kubernetes integration test starting

Kubernetes integration test status success
### What changes were proposed in this pull request?

This PR intends to fix a bug that throws an `UnsupportedOperationException` when running [the TPCDS q5](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q5.sql) with AQE enabled ([this option is enabled by default now via SPARK-33679](031c5ef)):

```
java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path.
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:189)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:60)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  ...
```

I've checked the AQE code and found that `EnsureRequirements` wrongly puts a `BroadcastExchange` on top of a `BroadcastQueryStage` in the `reOptimize` phase, as follows:

```
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#2183]
   +- BroadcastQueryStage 2
      +- ReusedExchange [d_date_sk#1086], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1963]
```

The root cause is that the `Cast` expression in the required child distribution does not have a `timeZoneId` field (`timeZoneId=None`), while the `Cast` in `child.outputPartitioning` does. This difference makes the distribution requirement check fail in `EnsureRequirements`:
https://github.com/apache/spark/blob/1e85707738a830d33598ca267a6740b3f06b1861/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L47-L50

The `Cast` that lacks a `timeZoneId` is generated in the `HashJoin` object. To fix this issue, this PR proposes to use the `CastSupport.cast` method there.
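To illustrate why the two casts compare unequal, here is a minimal, self-contained Scala sketch. These are simplified stand-ins for Catalyst's `Cast` and `CastSupport.cast`, not Spark's actual classes; the field names and the example time zone are illustrative only:

```scala
object CastEqualityDemo extends App {
  // Simplified stand-in for Catalyst's Cast: a case class, so equality covers
  // every field, including the optional time-zone id. Not Spark's real class.
  case class Cast(child: String, dataType: String, timeZoneId: Option[String] = None)

  // Simplified stand-in for CastSupport.cast, which fills in the session-local
  // time zone when it builds a Cast ("Asia/Tokyo" is just an example value).
  def cast(child: String, dataType: String, sessionTz: String = "Asia/Tokyo"): Cast =
    Cast(child, dataType, Some(sessionTz))

  val required = Cast("d_date_sk", "bigint") // built as a bare Cast in HashJoin: timeZoneId = None
  val actual   = cast("d_date_sk", "bigint") // built via CastSupport.cast: timeZoneId = Some(...)

  // EnsureRequirements compares the required distribution against the child's
  // output partitioning; case-class equality makes the two casts differ:
  println(required == actual) // false -> a redundant BroadcastExchange is inserted
}
```

With the fix, `HashJoin` builds its rewritten join keys through `CastSupport.cast`, so both sides carry the same `timeZoneId` and the requirement check passes.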
This is a backport PR for #30818.

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually checked that q5 passed with AQE enabled.

Closes #30830 from maropu/SPARK-33822-BRANCH3.0.

Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
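For reference, a minimal sketch of the manual check described above, assuming the TPCDS tables used by q5 are already registered in the session (the object name and relative file path are illustrative; `spark.sql.adaptive.enabled` is the real AQE switch):

```scala
import org.apache.spark.sql.SparkSession

object Q5WithAqe extends App {
  // AQE is on by default since SPARK-33679; set it explicitly for clarity.
  val spark = SparkSession.builder()
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()

  // q5.sql ships with Spark under sql/core/src/test/resources/tpcds/;
  // this assumes the TPCDS tables it reads have already been created.
  val q5 = scala.io.Source.fromFile("sql/core/src/test/resources/tpcds/q5.sql").mkString
  spark.sql(q5).collect() // threw the UnsupportedOperationException above before this fix
}
```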
Test build #132975 has finished for PR 30830 at commit

Refer to this link for build results (access rights to CI server needed):