-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-31670][SQL] Trim unnecessary Struct field alias in Aggregate/GroupingSets #28490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 26 commits
27c495b
4c0b04c
282648d
c4ff823
6d1b60e
e28b084
1ee0542
0af3166
5f0562c
7ecc8ad
53fd03a
cf818cf
cf31ab4
f846539
d63613f
ef6c87f
82f3876
d0f89af
3ebec5f
72dc305
d3ffbbd
f17dd53
891fd1b
281096a
51cea07
84e65af
9411887
e6fb91f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1325,25 +1325,45 @@ class Analyzer( | |
| * | ||
| * Note: In this routine, the unresolved attributes are resolved from the attributes of | ||
| * the input plan's children. | ||
| * | ||
| * @param e The expression that needs to be resolved. | ||
| * @param q The LogicalPlan whose children are used to resolve expression's attribute. | ||
| * @param trimAlias When true, trim unnecessary aliases of `GetStructField`. Note that | ||
| * we cannot trim the alias of a top-level `GetStructField`, as we should | ||
| * resolve `UnresolvedAttribute` to a named expression. The caller | ||
| * can trim the alias of a top-level `GetStructField` if it's safe to do so. | ||
| * @return resolved Expression. | ||
| */ | ||
| private def resolveExpressionTopDown(e: Expression, q: LogicalPlan): Expression = { | ||
| if (e.resolved) return e | ||
| e match { | ||
| case f: LambdaFunction if !f.bound => f | ||
| case u @ UnresolvedAttribute(nameParts) => | ||
| // Leave unchanged if resolution fails. Hopefully will be resolved next round. | ||
| val result = | ||
| withPosition(u) { | ||
| q.resolveChildren(nameParts, resolver) | ||
| .orElse(resolveLiteralFunction(nameParts, u, q)) | ||
| .getOrElse(u) | ||
| private def resolveExpressionTopDown( | ||
| e: Expression, | ||
| q: LogicalPlan, | ||
| trimAlias: Boolean = false): Expression = { | ||
|
|
||
| def innerResolve(e: Expression, isTopLevel: Boolean): Expression = { | ||
| if (e.resolved) return e | ||
| e match { | ||
| case f: LambdaFunction if !f.bound => f | ||
| case u @ UnresolvedAttribute(nameParts) => | ||
| // Leave unchanged if resolution fails. Hopefully will be resolved next round. | ||
| val resolved = | ||
| withPosition(u) { | ||
| q.resolveChildren(nameParts, resolver) | ||
| .orElse(resolveLiteralFunction(nameParts, u, q)) | ||
| .getOrElse(u) | ||
| } | ||
| val result = resolved match { | ||
| case Alias(s: GetStructField, _) if trimAlias && !isTopLevel => s | ||
| case others => others | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I'm wondering whether there are any cases where we don't want to trim nested (i.e., non-top-level) aliases.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Since we call CleanupAlias later in the Analyzer, this field will be trimmed anyway, but if we don't handle it here, we can't pass
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As we haven't merged this yet, could you also add a comment here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yea
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
See the comment I added just now. cc @cloud-fan |
||
| logDebug(s"Resolving $u to $result") | ||
| result | ||
| case UnresolvedExtractValue(child, fieldExpr) if child.resolved => | ||
| ExtractValue(child, fieldExpr, resolver) | ||
| case _ => e.mapChildren(resolveExpressionTopDown(_, q)) | ||
| logDebug(s"Resolving $u to $result") | ||
| result | ||
| case UnresolvedExtractValue(child, fieldExpr) if child.resolved => | ||
| ExtractValue(child, fieldExpr, resolver) | ||
| case _ => e.mapChildren(innerResolve(_, isTopLevel = false)) | ||
| } | ||
| } | ||
|
|
||
| innerResolve(e, isTopLevel = true) | ||
| } | ||
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { | ||
|
|
@@ -1425,11 +1445,39 @@ class Analyzer( | |
| // rule: ResolveDeserializer. | ||
| case plan if containsDeserializer(plan.expressions) => plan | ||
|
|
||
| // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of | ||
| // `AppendColumns`, because `AppendColumns`'s serializer might produce conflicting | ||
| // attribute names, leading to an ambiguous reference exception. | ||
| case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) => | ||
| a.mapExpressions(resolveExpressionTopDown(_, appendColumns)) | ||
| case a: Aggregate => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need a high-level comment to explain why we trim alias here, e.g.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Done |
||
| val planForResolve = a.child match { | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of | ||
| // `AppendColumns`, because `AppendColumns`'s serializer might produce conflicting | ||
| // attribute names, leading to an ambiguous reference exception. | ||
| case appendColumns: AppendColumns => appendColumns | ||
| case _ => a | ||
| } | ||
|
|
||
| val resolvedGroupingExprs = a.groupingExpressions | ||
| .map(resolveExpressionTopDown(_, planForResolve, trimAlias = true)) | ||
| .map(trimTopLevelGetStructFieldAlias) | ||
|
|
||
| val resolvedAggExprs = a.aggregateExpressions | ||
| .map(resolveExpressionTopDown(_, planForResolve, trimAlias = true)) | ||
| .map(_.asInstanceOf[NamedExpression]) | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| a.copy(resolvedGroupingExprs, resolvedAggExprs, a.child) | ||
|
|
||
| case g: GroupingSets => | ||
| val resolvedSelectedExprs = g.selectedGroupByExprs | ||
| .map(_.map(resolveExpressionTopDown(_, g, trimAlias = true)) | ||
| .map(trimTopLevelGetStructFieldAlias)) | ||
|
|
||
| val resolvedGroupingExprs = g.groupByExprs | ||
| .map(resolveExpressionTopDown(_, g, trimAlias = true)) | ||
| .map(trimTopLevelGetStructFieldAlias) | ||
|
|
||
| val resolvedAggExprs = g.aggregations | ||
| .map(resolveExpressionTopDown(_, g, trimAlias = true)) | ||
| .map(_.asInstanceOf[NamedExpression]) | ||
|
|
||
| g.copy(resolvedSelectedExprs, resolvedGroupingExprs, g.child, resolvedAggExprs) | ||
|
|
||
| case o: OverwriteByExpression if !o.outputResolved => | ||
| // do not resolve expression attributes until the query attributes are resolved against the | ||
|
|
@@ -1525,6 +1573,17 @@ class Analyzer( | |
| AttributeSet(projectList.collect { case a: Alias => a.toAttribute }) | ||
| } | ||
|
|
||
| // Trims the top-level `Alias` over a `GetStructField` in groupByExpressions and | ||
| // selectedGroupByExpressions. Since these expressions are not NamedExpressions originally, | ||
| // it is safe to trim the top-level alias. | ||
AngersZhuuuu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| def trimTopLevelGetStructFieldAlias(e: Expression): Expression = { | ||
| e match { | ||
| // trim Alias over top-level GetStructField | ||
AngersZhuuuu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| case Alias(s: GetStructField, _) => s | ||
| case other => other | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Build a project list for Project/Aggregate and expand the star if possible | ||
| */ | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.