Closed
28 commits
27c495b
Struct Field in groupByExpr with CUBE
AngersZhuuuu May 10, 2020
4c0b04c
Merge branch 'master' into SPARK-31670
AngersZhuuuu Jun 4, 2020
282648d
WIP save
AngersZhuuuu Jun 6, 2020
c4ff823
Update Analyzer.scala
AngersZhuuuu Jun 6, 2020
6d1b60e
fix UT
AngersZhuuuu Jun 7, 2020
e28b084
use view
AngersZhuuuu Jun 7, 2020
1ee0542
Don't use unresolve attribute
AngersZhuuuu Jun 7, 2020
0af3166
Update Analyzer.scala
AngersZhuuuu Jun 7, 2020
5f0562c
Update Analyzer.scala
AngersZhuuuu Aug 30, 2020
7ecc8ad
Update Analyzer.scala
AngersZhuuuu Aug 30, 2020
53fd03a
Update SQLQuerySuite.scala
AngersZhuuuu Aug 31, 2020
cf818cf
Update Analyzer.scala
AngersZhuuuu Aug 31, 2020
cf31ab4
Update Analyzer.scala
AngersZhuuuu Aug 31, 2020
f846539
follow comment and fix end to end test in SQLQUeryTestSuite
AngersZhuuuu Aug 31, 2020
d63613f
follow comment
AngersZhuuuu Aug 31, 2020
ef6c87f
Update Analyzer.scala
AngersZhuuuu Aug 31, 2020
82f3876
Update Analyzer.scala
AngersZhuuuu Aug 31, 2020
d0f89af
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
3ebec5f
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
72dc305
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
d3ffbbd
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
f17dd53
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
891fd1b
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
281096a
follow comment
AngersZhuuuu Sep 1, 2020
51cea07
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
84e65af
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
9411887
Update Analyzer.scala
AngersZhuuuu Sep 2, 2020
e6fb91f
Update Analyzer.scala
AngersZhuuuu Sep 2, 2020
@@ -506,31 +506,55 @@ class Analyzer(
aggregations: Seq[NamedExpression],
groupByAliases: Seq[Alias],
groupingAttrs: Seq[Expression],
gid: Attribute): Seq[NamedExpression] = aggregations.map {
// collect all the found AggregateExpression, so we can check an expression is part of
// any AggregateExpression or not.
val aggsBuffer = ArrayBuffer[Expression]()
// Returns whether the expression belongs to any expressions in `aggsBuffer` or not.
def isPartOfAggregation(e: Expression): Boolean = {
aggsBuffer.exists(a => a.find(_ eq e).isDefined)
gid: Attribute): Seq[NamedExpression] = {
val resolvedGroupByAliases = groupByAliases.map(_.transformDown {
Member

@maropu Jun 3, 2020

We should probably fix this issue not in ResolveGroupingAnalytics but in ResolveReferences, like this:

object ResolveReferences extends Rule[LogicalPlan] {
    ...
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
      case p: LogicalPlan if !p.childrenResolved => p
      ....

      case a @ Aggregate(groupingExprs, aggExprs, _) if both `groupingExprs` and `aggExprs` have the same struct field =>
       val newAgg = resolve expressions so that they have the same exprIds
       newAgg

The root cause seems to be that ResolveReferences assigns different exprIds to each#30.json_string AS json_string (#31 vs #32):

20/06/03 16:22:47 WARN HiveSessionStateBuilder$$anon$1: 
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences ===
!'Aggregate [cube('a, 'get_json_object('each.json_string, $.iType))], ['a, 'coalesce('get_json_object('each.json_string, $.iType), -127) AS iType#29, unresolvedalias('sum('b), None)]
   +- Generate explode(c#4), false, x, [each#30]
      +- SubqueryAlias t
         +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
            +- Range (0, 1, step=1, splits=Some(4))

'Aggregate [cube(a#2, 'get_json_object(each#30.json_string AS json_string#31, $.iType))], [a#2, 'coalesce('get_json_object(each#30.json_string AS json_string#32, $.iType), -127) AS iType#29, unresolvedalias('sum(b#3), None)]
   +- Generate explode(c#4), false, x, [each#30]
      +- SubqueryAlias t
         +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
            +- Range (0, 1, step=1, splits=Some(4))
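
The mismatch in the plan above can be illustrated with a toy model. This is a minimal sketch in plain Scala, not Catalyst code: `Attr`, `GetField`, `Alias`, `Call`, `Lit`, and `stripAliases` are hypothetical stand-ins, and structural equality here only stands in for whatever comparison the analyzer performs. It shows that two resolutions of the same text differ only in the alias id, and compare equal once the alias wrappers are removed.

```scala
object ExprIdSketch {
  // Toy expression tree; these are NOT Spark's Catalyst classes.
  sealed trait Expr
  case class Attr(name: String, exprId: Int) extends Expr
  case class Lit(value: String) extends Expr
  case class GetField(child: Expr, field: String) extends Expr
  case class Alias(child: Expr, name: String, exprId: Int) extends Expr
  case class Call(fn: String, args: List[Expr]) extends Expr

  // Remove every Alias wrapper, keeping the aliased child.
  def stripAliases(e: Expr): Expr = e match {
    case Alias(child, _, _)     => stripAliases(child)
    case GetField(child, field) => GetField(stripAliases(child), field)
    case Call(fn, args)         => Call(fn, args.map(stripAliases))
    case other                  => other // Attr and Lit are leaves
  }

  val each: Expr = Attr("each", 30)

  // The same source text resolved twice, each time with a fresh alias id (#31 vs #32).
  val inGroupBy: Expr = Call(
    "get_json_object",
    List(Alias(GetField(each, "json_string"), "json_string", 31), Lit("$.iType")))
  val inSelect: Expr = Call(
    "get_json_object",
    List(Alias(GetField(each, "json_string"), "json_string", 32), Lit("$.iType")))

  // The raw trees differ only in the alias id; the stripped trees are identical.
  val rawEqual: Boolean = inGroupBy == inSelect
  val strippedEqual: Boolean = stripAliases(inGroupBy) == stripAliases(inSelect)
}
```

In this toy model, either assigning the same id to both aliases or dropping the aliases before comparing makes the two expressions match.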

Member

Based on the analysis above, it seems this is caused by ResolveReferences rather than ResolveGroupingAnalytics? Can we reproduce it without CUBE?

Contributor Author

> Based on the analysis above, it seems this is caused by ResolveReferences rather than ResolveGroupingAnalytics? Can we reproduce it without CUBE?

This happens with CUBE, when the CUBE logical plan is constructed.

Member

The query plan shown above seems to be from before ResolveGroupingAnalytics runs? So is it possible to hit a similar issue without CUBE? @maropu

Contributor Author

> The root cause seems to be that ResolveReferences assigns different exprIds to each#30.json_string AS json_string (#31 vs #32):

Yea, GetStructField is wrapped in an Alias in ResolveReferences, and each occurrence gets a different ExprId.

Contributor Author

> We should probably fix this issue not in ResolveGroupingAnalytics but in ResolveReferences, like this:

object ResolveReferences extends Rule[LogicalPlan] {
    ...
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
      case p: LogicalPlan if !p.childrenResolved => p
      ....

      case a @ Aggregate(groupingExprs, aggExprs, _) if both `groupingExprs` and `aggExprs` have the same struct field =>
       val newAgg = resolve expressions so that they have the same exprIds
       newAgg

The HAVING clause may also have this problem. I hadn't considered enough cases.

Member

> The query plan shown above seems to be from before ResolveGroupingAnalytics runs? So is it possible to hit a similar issue without CUBE? @maropu

Yea, all the cases have the same issue:

scala> spark.range(1).selectExpr("'x' AS a", "1 AS b", "array(named_struct('row_id', 1, 'json_string', 'y')) AS c").createOrReplaceTempView("t")

// ROLLUP
scala> sql("""
     |   select a, coalesce(get_json_object(each.json_string,'$.iType'),'-127') as iType, sum(b)
     |   from t
     |   LATERAL VIEW explode(c) x AS each
     |   group by a, get_json_object(each.json_string,'$.iType')
     |   with rollup
     | """).show()
org.apache.spark.sql.AnalysisException: expression 'x.`each`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [a#17, get_json_object(each#9.json_string AS json_string#10, $.iType)#18, spark_grouping_id#16L], [a#17, coalesce(get_json_object(each#9.json_string, $.iType), -127) AS iType#8, sum(cast(b#3 as bigint)) AS sum(b)#13L]
+- Expand [ArrayBuffer(a#2, b#3, c#4, each#9, a#14, get_json_object(each#9.json_string AS json_string#10, $.iType)#15, 0), ArrayBuffer(a#2, b#3, c#4, each#9, a#14, null, 1), ArrayBuffer(a#2, b#3, c#4, each#9, null, null, 3)], [a#2, b#3, c#4, each#9, a#17, get_json_object(each#9.json_string AS json_string#10, $.iType)#18, spark_grouping_id#16L]
   +- Project [a#2, b#3, c#4, each#9, a#2 AS a#14, get_json_object(each#9.json_string, $.iType) AS get_json_object(each#9.json_string AS json_string#10, $.iType)#15]
      +- Generate explode(c#4), false, x, [each#9]
         +- SubqueryAlias t
            +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
               +- Range (0, 1, step=1, splits=Some(4))

// GROUPING SETS
scala> sql("""
     |   select a, coalesce(get_json_object(each.json_string,'$.iType'),'-127') as iType, sum(b)
     |   from t
     |   LATERAL VIEW explode(c) x AS each
     |   group by grouping sets((a, get_json_object(each.json_string,'$.iType')))
     | """).show()
org.apache.spark.sql.AnalysisException: expression 'x.`each`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [a#28, get_json_object(each#20.json_string AS json_string#21, $.iType)#29, spark_grouping_id#27L], [a#28, coalesce(get_json_object(each#20.json_string, $.iType), -127) AS iType#19, sum(cast(b#3 as bigint)) AS sum(b)#24L]
+- Expand [ArrayBuffer(a#2, b#3, c#4, each#20, a#25, get_json_object(each#20.json_string AS json_string#21, $.iType)#26, 0)], [a#2, b#3, c#4, each#20, a#28, get_json_object(each#20.json_string AS json_string#21, $.iType)#29, spark_grouping_id#27L]
   +- Project [a#2, b#3, c#4, each#20, a#2 AS a#25, get_json_object(each#20.json_string, $.iType) AS get_json_object(each#20.json_string AS json_string#21, $.iType)#26]
      +- Generate explode(c#4), false, x, [each#20]
         +- SubqueryAlias t
            +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
               +- Range (0, 1, step=1, splits=Some(4))
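
For reference, the trailing 0, 1, 3 values in the ROLLUP Expand rows above are the grouping IDs of the three grouping sets. Below is a minimal sketch of how the grouping sets and their IDs can be derived, assuming a column's bit is set when that column is not part of the grouping set, with the first column as the most significant bit. `GroupingSetsSketch` and its methods are hypothetical names, not Spark APIs, and the order in which the CUBE sets are listed is only illustrative.

```scala
object GroupingSetsSketch {
  // ROLLUP(c1, ..., cn) expands to the prefixes (c1..cn), (c1..cn-1), ..., ().
  def rollup[A](cols: Seq[A]): Seq[Seq[A]] = cols.inits.toList

  // CUBE(c1, ..., cn) expands to every subset of the columns, keeping column order.
  def cube[A](cols: Seq[A]): Seq[Seq[A]] = cols match {
    case Seq() => Seq(Seq())
    case head +: tail =>
      val rest = cube(tail)
      rest.map(head +: _) ++ rest
  }

  // One bit per grouping column, first column most significant;
  // a bit is 1 when the column is absent from (nulled out in) the grouping set.
  def groupingId[A](cols: Seq[A], set: Seq[A]): Int =
    cols.foldLeft(0)((acc, c) => (acc << 1) | (if (set.contains(c)) 0 else 1))

  val cols: Seq[String] = List("a", "get_json_object(...)")
  val rollupIds: Seq[Int] = rollup(cols).map(groupingId(cols, _))
  val cubeIds: Seq[Int] = cube(cols).map(groupingId(cols, _))
}
```

Under these assumptions `rollupIds` lines up with the three Expand projections in the ROLLUP plan, and the single GROUPING SETS projection corresponds to ID 0.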

case a @ Alias(Alias(GetStructField(_, _, _), _), _) => a
case e => e.transformDown {
case Alias(gsf: GetStructField, _) => gsf
}
}.asInstanceOf[Alias])
aggregations.map {
// collect all the found AggregateExpression, so we can check an expression is part of
// any AggregateExpression or not.
val aggsBuffer = ArrayBuffer[Expression]()
// Returns whether the expression belongs to any expressions in `aggsBuffer` or not.
def isPartOfAggregation(e: Expression): Boolean = {
aggsBuffer.exists(a => a.find(_ eq e).isDefined)
}

replaceGroupingFunc(_, groupByExprs, gid) match {
case a @ Alias(e: GetStructField, _) =>
val index = resolvedGroupByAliases.indexWhere(alias => alias.child match {
case child: GetStructField =>
child.semanticEquals(e)
case _ => false
})
if (index == -1) {
a
} else {
groupingAttrs(index)
}.asInstanceOf[NamedExpression]
case e => e.transformDown {
case Alias(gsf: GetStructField, _) => gsf
}.transformDown {
// AggregateExpression should be computed on the unmodified value of its argument
// expressions, so we should not replace any references to grouping expression
// inside it.
case e: AggregateExpression =>
aggsBuffer += e
e
case e if isPartOfAggregation(e) => e
case e =>
// Replace expression by expand output attribute.
val index = resolvedGroupByAliases.indexWhere(_.child.semanticEquals(e))
if (index == -1) {
e
} else {
groupingAttrs(index)
}
}.asInstanceOf[NamedExpression]
}
}
replaceGroupingFunc(_, groupByExprs, gid).transformDown {
// AggregateExpression should be computed on the unmodified value of its argument
// expressions, so we should not replace any references to grouping expression
// inside it.
case e: AggregateExpression =>
aggsBuffer += e
e
case e if isPartOfAggregation(e) => e
case e =>
// Replace expression by expand output attribute.
val index = groupByAliases.indexWhere(_.child.semanticEquals(e))
if (index == -1) {
e
} else {
groupingAttrs(index)
}
}.asInstanceOf[NamedExpression]
}

/*
53 changes: 53 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3495,6 +3495,59 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
assert(df4.schema.head.name === "randn(1)")
checkIfSeedExistsInExplain(df2)
}

test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
Member

Could you please make the title more accurate? I don't think we need the words "with CUBE".

Contributor Author

> Could you please make the title more accurate? I don't think we need the words "with CUBE".

Is the current PR title OK?

Member

Yea, looks fine.

withTable("t1") {
Member

nit: t1 -> t

sql(
"""create table t1(
|a string,
|b int,
|c array<struct<row_id:int,json_string:string>>,
|d array<array<string>>,
|e array<map<string, int>>)
|using orc""".stripMargin)
Member

@maropu Jun 3, 2020

Please use a temp view for better test performance. Also, it's better to add some rows to this test table so the answers can be checked, instead of using the current empty table.

checkAnswer(
sql(
"""
|select a, each.json_string, sum(b)
Member

nit: Could you use uppercase for the SQL keywords where possible?

|from t1
|LATERAL VIEW explode(c) x AS each
|group by a, each.json_string
|with cube
|""".stripMargin), Nil)

checkAnswer(
sql(
"""
|select a, get_json_object(each.json_string, '$.i'), sum(b)
|from t1
|LATERAL VIEW explode(c) x AS each
|group by a, get_json_object(each.json_string, '$.i')
|with cube
|""".stripMargin), Nil)

checkAnswer(
sql(
"""
|select a, each.json_string as json_string, sum(b)
|from t1
|LATERAL VIEW explode(c) x AS each
|group by a, each.json_string
|with cube
|""".stripMargin), Nil)

checkAnswer(
sql(
"""
|select a, each.json_string as js, sum(b)
|from t1
|LATERAL VIEW explode(c) x AS each
|group by a, each.json_string
|with cube
Member

Could you check the other analytics grouping, too, e.g., GROUPING SETS and ROLLUP?

Contributor Author

> Could you check the other analytics grouping, too, e.g., GROUPING SETS and ROLLUP?

Hmm, GROUPING SETS still has a problem.

|""".stripMargin), Nil)
Member

Could you add tests having queries with HAVING clauses?

Contributor Author

> Could you add tests having queries with HAVING clauses?

It seems I made a mistake: HAVING won't have the duplicate field, since the HAVING condition won't contain the grouping keys.

}
}
}

case class Foo(bar: Option[String])