-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-35742][SQL] Expression.semanticEquals should be symmetrical #32885
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all the subclasses of SubqueryExpression have implemented canonicalized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need canonicalized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default implementation works for AttributeReference
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's already there, in L164
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree with the idea.
|
BTW, could you make CI happy? |
viirya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good if CI can pass.
|
Test build #139711 has finished for PR 32885 at commit
|
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
|
Yeah, +1 if CI passes. |
| // map and/or the sort-based aggregation once it has processed a given number of input rows. | ||
| private val testFallbackStartsAt: Option[(Int, Int)] = { | ||
| sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match { | ||
| Option(sqlContext).map { sc => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a hidden bug. SubqueryExpression will be sent to the executor side and build Projection, and be put in EquivalentExpressions, which needs to call canonicalized.
This means, Spark may serialize and send HashAggregateExec to the executor side, where sqlContext should be null.
It's hidden for a long time because ScalarSubquery didn't implement canonicalized, so the bug is not triggered. However, it also means semanticHash is wrong.
I think it only affects common subquery elimination, and shouldn't be a serious bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the long term, I think we should only send an "expression evaluator" to the executor side. The semantic check should only be done in the driver side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late comment @cloud-fan, but I think I've run into this issue before:
https://github.com/apache/spark/pull/28885/files#diff-9b62cef6bfdeb6c802bb120c7a724a974d5067a69585285bebb64c48603f8d6fR105-R108. The point is that there might be other nodes where canonicalization on executor side can cause issues. SortExec.enableRadixSort is the other one I found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InSubqueryExec already implements canonicalized before this PR, so we need to fix these bugs anyway.
I have an idea to fix this problem in all physical plans:
- remove
SparkPlan.sqlContext, so that we can catch all the callers of it - add
@transient final val session = SparkSession.getActiveSession.orNull, to replace the previoussqlContext - override
confinSparkPlan:if (session != null) session.sessionState.conf else SQLConf.get
@peter-toth what do you think? AFAIK the only reason to access SparkPlan.sqlContext at executor side is to read a conf, and we can do that with SQLConf.get at executor side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm quite busy this week and may not have time to implement this idea recently. @peter-toth feel free to pick up this idea and open a PR if you have time, or I'll do it next or next next week. Thanks in advance!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thanks. I will try to open a PR this week.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opened #32947.
| HashAggregate [c_customer_sk,sum,isEmpty] [sum(CheckOverflow((promote_precision(cast(cast(ss_quantity as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_sales_price as decimal(12,2)))), DecimalType(18,2), true)),sum(CheckOverflow((promote_precision(cast(cast(ss_quantity as decimal(10,0)) as decimal(12,2))) * promote_precision(cast(ss_sales_price as decimal(12,2)))), DecimalType(18,2), true)),sum,isEmpty] | ||
| InputAdapter | ||
| ReusedExchange [c_customer_sk,sum,isEmpty] #8 | ||
| ReusedExchange [c_customer_sk,c_first_name,c_last_name] #19 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After the fix, more exchange reuse happens :)
Seems the problem was caused by ReusedSubquery.
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
|
Test build #139789 has finished for PR 32885 at commit
|
|
LGTM |
|
thanks for the review, merging to master! |
Currently, there are some expressions that overwrite `semanticEquals`, which makes it not symmetrical. Ideally, expressions should overwrite `canonicalized` instead of `semanticEquals`. This PR marks `semanticEquals` as final, and implement `canonicalized` for the few expressions that overwrote `semanticEquals` before. To avoid subtle bugs (I haven't found a real bug yet). no a new test Closes apache#32885 from cloud-fan/attr. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Currently, there are some expressions that overwrite
semanticEquals, which makes it not symmetrical. Ideally, expressions should overwritecanonicalizedinstead ofsemanticEquals.This PR marks
semanticEqualsas final, and implementcanonicalizedfor the few expressions that overwrotesemanticEqualsbefore.Why are the changes needed?
To avoid subtle bugs (I haven't found a real bug yet).
Does this PR introduce any user-facing change?
no
How was this patch tested?
a new test