-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-13957] [SQL] Support Group By Ordinal in SQL #11846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 45 commits
01e4cdf
6835704
9180687
b38a21e
d2b84af
fda8025
ac0dccd
6e0018b
0546772
b37a64f
c2a872c
ab6dbd7
4276356
2dab708
0458770
1debdfa
763706d
4de6ec1
9422a4f
52bdf48
1e95df3
fab24cf
8b2e33b
2ee1876
b9f0090
ade6f7e
9fd63d2
5199d49
404214c
c001dd9
59daa48
41d5f64
472a6e3
0fba10a
95f25a6
a927376
b10d076
79a537a
a1835e5
cbf73b3
c711347
960ead7
b61345b
c08f561
18bab66
dacf2d8
b19b73c
474df88
74a16be
3d9828d
a06c4ce
6d08009
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,6 +85,7 @@ class Analyzer( | |
| ResolveGroupingAnalytics :: | ||
| ResolvePivot :: | ||
| ResolveUpCast :: | ||
| ResolveOrdinalInOrderByAndGroupBy :: | ||
| ResolveSortReferences :: | ||
| ResolveGenerate :: | ||
| ResolveFunctions :: | ||
|
|
@@ -628,24 +629,26 @@ class Analyzer( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * In many dialects of SQL it is valid to sort by attributes that are not present in the SELECT | ||
| * clause. This rule detects such queries and adds the required attributes to the original | ||
| * projection, so that they will be available during sorting. Another projection is added to | ||
| * remove these attributes after sorting. | ||
| * | ||
| * This rule also resolves the position number in sort references. This support is introduced | ||
| * in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting. | ||
| * - When the sort references are not integer but foldable expressions, ignore them. | ||
| * - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too. | ||
| */ | ||
| object ResolveSortReferences extends Rule[LogicalPlan] { | ||
| /** | ||
| * In many dialects of SQL it is valid to use ordinal positions in order/sort by and group by | ||
| * clauses. This rule is to convert ordinal positions to the corresponding expressions in the | ||
| * select list. This support is introduced in Spark 2.0. | ||
| * | ||
| * - When the sort references or group by expressions are not integer but foldable expressions, | ||
| * just ignore them. | ||
| * - When spark.sql.orderByOrdinal/spark.sql.groupByOrdinal is set to false, ignore the position | ||
| * numbers too. | ||
| * | ||
| * Before the release of Spark 2.0, the literals in order/sort by and group by clauses | ||
| * have no effect on the results. | ||
| */ | ||
| object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
| case s: Sort if !s.child.resolved => s | ||
| // Replace the index with the related attribute for ORDER BY | ||
| // Replace the index with the related attribute for ORDER BY, | ||
| // which is a 1-base position of the projection list. | ||
| case s @ Sort(orders, global, child) | ||
| if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => | ||
| if conf.orderByOrdinal && child.resolved && | ||
| orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => | ||
| val newOrders = orders map { | ||
| case s @ SortOrder(IntegerIndex(index), direction) => | ||
| if (index > 0 && index <= child.output.size) { | ||
|
|
@@ -659,6 +662,40 @@ class Analyzer( | |
| } | ||
| Sort(newOrders, global, child) | ||
|
|
||
| // Replace the index with the corresponding expression in aggregateExpressions. The index is | ||
| // a 1-base position of aggregateExpressions, which is output columns (select expression) | ||
| case a @ Aggregate(groups, aggs, child) | ||
| if conf.groupByOrdinal && child.resolved && aggs.forall(_.resolved) && | ||
| groups.exists(IntegerIndex.unapply(_).nonEmpty) => | ||
| val newGroups = groups.map { | ||
| case IntegerIndex(index) if index > 0 && index <= aggs.size => | ||
| aggs(index - 1) match { | ||
| case Alias(c, _) if c.isInstanceOf[AggregateExpression] => | ||
|
||
| throw new UnresolvedException(a, | ||
| s"Group by position: the '$index'th column in the select is an aggregate " + | ||
| s"function: ${c.sql}. Aggregate functions are not allowed in GROUP BY") | ||
| // Group by clause is unable to use the alias defined in aggregateExpressions | ||
|
||
| case Alias(c, _) => c | ||
| case o => o | ||
| } | ||
| case IntegerIndex(index) => | ||
| throw new UnresolvedException(a, | ||
| s"Group by position: '$index' exceeds the size of the select list '${aggs.size}'.") | ||
| case o => o | ||
| } | ||
| Aggregate(newGroups, aggs, child) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * In many dialects of SQL it is valid to sort by attributes that are not present in the SELECT | ||
| * clause. This rule detects such queries and adds the required attributes to the original | ||
| * projection, so that they will be available during sorting. Another projection is added to | ||
| * remove these attributes after sorting. | ||
| */ | ||
| object ResolveSortReferences extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
| case s: Sort if !s.child.resolved => s | ||
|
||
| // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions | ||
| case sa @ Sort(_, _, child: Aggregate) => sa | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ import java.sql.Timestamp | |
| import org.apache.spark.AccumulatorSuite | ||
| import org.apache.spark.sql.catalyst.analysis.UnresolvedException | ||
| import org.apache.spark.sql.catalyst.expressions.SortOrder | ||
| import org.apache.spark.sql.catalyst.plans.logical.Aggregate | ||
| import org.apache.spark.sql.execution.aggregate | ||
| import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin} | ||
| import org.apache.spark.sql.functions._ | ||
|
|
@@ -459,25 +460,96 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { | |
| Seq(Row(1, 3), Row(2, 3), Row(3, 3))) | ||
| } | ||
|
|
||
| test("literal in agg grouping expressions") { | ||
| test("Group By Ordinal - basic") { | ||
| checkAnswer( | ||
| sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"), | ||
| Seq(Row(1, 2), Row(2, 2), Row(3, 2))) | ||
| checkAnswer( | ||
| sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"), | ||
| Seq(Row(1, 2), Row(2, 2), Row(3, 2))) | ||
| sql("SELECT a, sum(b) FROM testData2 GROUP BY 1"), | ||
| sql("SELECT a, sum(b) FROM testData2 GROUP BY a")) | ||
|
|
||
| // duplicate group-by columns | ||
| checkAnswer( | ||
| sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1"), | ||
| sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) | ||
|
|
||
| checkAnswer( | ||
| sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY 1, 2"), | ||
| sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) | ||
| } | ||
|
|
||
| test("Group By Ordinal - non aggregate expressions") { | ||
| checkAnswer( | ||
| sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, 2"), | ||
| sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2")) | ||
|
|
||
| checkAnswer( | ||
| sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2"), | ||
| Seq(Row(1, 3, 1), Row(1, 4, 1), Row(2, 3, 1), Row(2, 4, 1), Row(3, 3, 1), Row(3, 4, 1))) | ||
|
||
| } | ||
|
|
||
| test("Group By Ordinal - non-foldable constant expression") { | ||
| checkAnswer( | ||
| sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b, 1 + 0"), | ||
| sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b")) | ||
|
|
||
| checkAnswer( | ||
| sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1 + 2"), | ||
| sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) | ||
| } | ||
|
|
||
| test("Group By Ordinal - alias") { | ||
| checkAnswer( | ||
| sql("SELECT a, (b + 2) as c, count(2) FROM testData2 GROUP BY a, 2"), | ||
| sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2")) | ||
|
|
||
| checkAnswer( | ||
| sql("SELECT a as b, b as a, sum(b) FROM testData2 GROUP BY 1, 2"), | ||
| sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b")) | ||
| } | ||
|
|
||
| test("Group By Ordinal - constants") { | ||
| checkAnswer( | ||
| sql("SELECT 1, 2, sum(b) FROM testData2 GROUP BY 1, 2"), | ||
| sql("SELECT 1, 2, sum(b) FROM testData2")) | ||
| } | ||
|
|
||
| test("Group By Ordinal - negative cases") { | ||
| intercept[UnresolvedException[Aggregate]] { | ||
| sql("SELECT a, b FROM testData2 GROUP BY -1") | ||
| } | ||
|
|
||
| intercept[UnresolvedException[Aggregate]] { | ||
| sql("SELECT a, b FROM testData2 GROUP BY 3") | ||
| } | ||
|
|
||
| val e = intercept[UnresolvedException[Aggregate]]( | ||
| sql("SELECT SUM(a) FROM testData2 GROUP BY 1")) | ||
| assert(e.getMessage contains | ||
| "Invalid call to Group by position: the '1'th column in the select is an aggregate function") | ||
|
|
||
| var ae = intercept[AnalysisException]( | ||
| sql("SELECT a, rand(0), sum(b) FROM testData2 GROUP BY a, 2")) | ||
| assert(ae.getMessage contains | ||
| "nondeterministic expression rand(0) should not appear in grouping expression") | ||
|
|
||
| ae = intercept[AnalysisException]( | ||
| sql("SELECT * FROM testData2 GROUP BY a, b, 1")) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where do we handle the star case?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The latest merge removed it. : ) Let me add it back. Thanks! BTW, the latest test build will fail.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Found a bug in star expansion. Will fix it first. Thanks!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
After the fix, it can resolve the star in the above query. Thanks! |
||
| assert(ae.getMessage contains | ||
| "Group by position: star is not allowed to use in the select list " + | ||
| "when using ordinals in group by") | ||
| } | ||
|
|
||
| test("Group By Ordinal: spark.sql.groupByOrdinal=false") { | ||
| withSQLConf(SQLConf.GROUP_BY_ORDINAL.key -> "false") { | ||
| // If spark.sql.groupByOrdinal=false, ignore the position number. | ||
| intercept[AnalysisException] { | ||
| sql("SELECT a, sum(b) FROM testData2 GROUP BY 1") | ||
| } | ||
| // '*' is not allowed to use in the select list when users specify ordinals in group by | ||
| checkAnswer( | ||
| sql("SELECT * FROM testData2 GROUP BY a, b, 1"), | ||
| sql("SELECT * FROM testData2 GROUP BY a, b")) | ||
| } | ||
| } | ||
|
|
||
| test("aggregates with nulls") { | ||
| checkAnswer( | ||
| sql("SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a)," + | ||
|
|
@@ -2166,7 +2238,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { | |
| checkAnswer( | ||
| sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"), | ||
| sql("SELECT * FROM testData2 ORDER BY b ASC")) | ||
|
|
||
| checkAnswer( | ||
| sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), | ||
| sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC")) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a
case plan if !plan.childrenResolved => planat the beginning.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, let me do it. Thanks!
Will use
pinstead ofplansinceplancauses a warning by IntelliJ compiler for possible shadowing.