Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
27c495b
Struct Field in groupByExpr with CUBE
AngersZhuuuu May 10, 2020
4c0b04c
Merge branch 'master' into SPARK-31670
AngersZhuuuu Jun 4, 2020
282648d
WIP save
AngersZhuuuu Jun 6, 2020
c4ff823
Update Analyzer.scala
AngersZhuuuu Jun 6, 2020
6d1b60e
fix UT
AngersZhuuuu Jun 7, 2020
e28b084
use view
AngersZhuuuu Jun 7, 2020
1ee0542
Don't use unresolve attribute
AngersZhuuuu Jun 7, 2020
0af3166
Update Analyzer.scala
AngersZhuuuu Jun 7, 2020
5f0562c
Update Analyzer.scala
AngersZhuuuu Aug 30, 2020
7ecc8ad
Update Analyzer.scala
AngersZhuuuu Aug 30, 2020
53fd03a
Update SQLQuerySuite.scala
AngersZhuuuu Aug 31, 2020
cf818cf
Update Analyzer.scala
AngersZhuuuu Aug 31, 2020
cf31ab4
Update Analyzer.scala
AngersZhuuuu Aug 31, 2020
f846539
follow comment and fix end to end test in SQLQUeryTestSuite
AngersZhuuuu Aug 31, 2020
d63613f
follow comment
AngersZhuuuu Aug 31, 2020
ef6c87f
Update Analyzer.scala
AngersZhuuuu Aug 31, 2020
82f3876
Update Analyzer.scala
AngersZhuuuu Aug 31, 2020
d0f89af
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
3ebec5f
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
72dc305
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
d3ffbbd
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
f17dd53
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
891fd1b
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
281096a
follow comment
AngersZhuuuu Sep 1, 2020
51cea07
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
84e65af
Update Analyzer.scala
AngersZhuuuu Sep 1, 2020
9411887
Update Analyzer.scala
AngersZhuuuu Sep 2, 2020
e6fb91f
Update Analyzer.scala
AngersZhuuuu Sep 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,12 @@ class Analyzer(
// collect all the found AggregateExpression, so we can check an expression is part of
// any AggregateExpression or not.
val aggsBuffer = ArrayBuffer[Expression]()

Copy link
Member

@maropu maropu Jun 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: plz revert the unencessary changes. (Unrelated changes might lead to revert/backport failures sometimes...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: plz revert the unencessary changes. (Unrelated changes might lead to revert/backport failures sometimes...)

Sorry..., forgot to check diff.

// Returns whether the expression belongs to any expressions in `aggsBuffer` or not.
def isPartOfAggregation(e: Expression): Boolean = {
aggsBuffer.exists(a => a.find(_ eq e).isDefined)
}

replaceGroupingFunc(_, groupByExprs, gid).transformDown {
// AggregateExpression should be computed on the unmodified value of its argument
// expressions, so we should not replace any references to grouping expression
Expand Down Expand Up @@ -1259,6 +1261,11 @@ class Analyzer(
attr.withExprId(exprId)
}

private def dedupStructField(attr: Alias, structFieldMap: Map[String, Attribute]) = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used now?

Oh, yea

val exprId = structFieldMap.getOrElse(attr.child.sql, attr).exprId
Alias(attr.child, attr.name)(exprId, attr.qualifier, attr.explicitMetadata)
}

/**
* The outer plan may have been de-duplicated and the function below updates the
* outer references to refer to the de-duplicated attributes.
Expand Down Expand Up @@ -1479,11 +1486,70 @@ class Analyzer(
// Skip the having clause here, this will be handled in ResolveAggregateFunctions.
case h: UnresolvedHaving => h

case p: LogicalPlan if needResolveStructField(p) =>
logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}")
val resolved = p.mapExpressions(resolveExpressionTopDown(_, p))
val structFieldMap = new mutable.HashMap[String, Alias]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutable.HashMap -> mutable.Map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutable.HashMap -> mutable.Map

Done

resolved.transformExpressions {
case a @ Alias(struct: GetStructField, _) =>
if (structFieldMap.contains(struct.sql)) {
val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
} else {
structFieldMap.put(struct.sql, a)
a
}
case e => e
}

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you write this handling in an independent patten like this?

      case agg @ (_: Aggregate | _: GroupingSets) =>
        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
        val structFieldMap = mutable.Map[String, Alias]()
        resolved.transformExpressionsDown {
          // Plz describe some comments why we need this handling...
          case a @ Alias(struct: GetStructField, _) =>
            if (structFieldMap.contains(struct.sql)) {
              val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
              Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
            } else {
              structFieldMap += (struct.sql -> a)
              a
            }
          case e => e
        }

      case q: LogicalPlan =>
        logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
        q.mapExpressions(resolveExpressionTopDown(_, q))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

w/ some code cleanup;

      case agg @ (_: Aggregate | _: GroupingSets) =>
        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
        val hasStructField = resolved.expressions.exists {
          _.collectFirst { case gsf: GetStructField => gsf }.isDefined
        }
        if (hasStructField) {
          // Plz describe some comments why we need this handling...
          val structFieldMap = mutable.Map[String, Alias]()
          resolved.transformExpressionsDown {
            case a @ Alias(struct: GetStructField, _) =>
              if (structFieldMap.contains(struct.sql)) {
                val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
                Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
              } else {
                structFieldMap += (struct.sql -> a)
                a
              }
          }
        } else {
          resolved
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

w/ some code cleanup;

Done

q.mapExpressions(resolveExpressionTopDown(_, q))
}

def needResolveStructField(plan: LogicalPlan): Boolean = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

plan match {
case UnresolvedHaving(havingCondition, a: Aggregate)
if containSameStructFields(a.groupingExpressions.flatMap(_.references),
a.aggregateExpressions.flatMap(_.references),
Some(havingCondition.references.toSeq)) => true
case Aggregate(groupingExpressions, aggregateExpressions, _)
if containSameStructFields(groupingExpressions.flatMap(_.references),
aggregateExpressions.flatMap(_.references)) => true
case GroupingSets(selectedGroupByExprs, groupByExprs, _, aggregations)
if containSameStructFields(groupByExprs.flatMap(_.references),
aggregations.flatMap(_.references),
Some(selectedGroupByExprs.flatMap(_.flatMap(_.references)))) => true
case _ => false
}
}

def containSameStructFields(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not enough just to check if both sides (grpExprs and aggExprs) have struct fields here? We need to confirm the identity by using unresolved attributes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not enough just to check if both sides (grpExprs and aggExprs) have struct fields here? We need to confirm the identity by using unresolved attributes?

Yea, hear attribute is still unresolved attributes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not enough just to check if both sides (grpExprs and aggExprs) have struct fields here?

GroupingSets need check selected cols too, so all need to check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its better to avoid comparing unresolved attributes.... could we resolve them then detect the mismatched exprIds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its better to avoid comparing unresolved attributes.... could we resolve them then detect the mismatched exprIds?

How about current? not check, just reoslve.

grpExprs: Seq[Attribute],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: grpExprs -> groupExprs for consistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: grpExprs -> groupExprs for consistency.

Done

aggExprs: Seq[Attribute],
extra: Option[Seq[Attribute]] = None): Boolean = {

def isStructField(attr: Attribute): Boolean = {
attr.isInstanceOf[UnresolvedAttribute] &&
attr.asInstanceOf[UnresolvedAttribute].nameParts.size == 2
}

val grpAttrs = grpExprs.filter(isStructField)
.map(_.asInstanceOf[UnresolvedAttribute].name)
val aggAttrs = aggExprs.filter(isStructField)
.map(_.asInstanceOf[UnresolvedAttribute].name)
val havingAttrs = extra.getOrElse(Seq.empty[Attribute]).filter(isStructField)
.map(_.asInstanceOf[UnresolvedAttribute].name)

if (extra.isDefined) {
grpAttrs.exists(aggAttrs.contains)
} else {
grpAttrs.exists(aggAttrs.contains) ||
grpAttrs.exists(havingAttrs.contains) ||
aggAttrs.exists(havingAttrs.contains)
}
}

def resolveAssignments(
assignments: Seq[Assignment],
mergeInto: MergeIntoTable,
Expand Down
82 changes: 82 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
checkIfSeedExistsInExplain(df2)
}

test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please make the title more correct? I think we don't need the word with CUBE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please make the title more correct? I think we don't need the word with CUBE.

It's ok for current PR title?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, looks fine.

withTable("t") {
sql(
"""CREATE TABLE t(
|a STRING,
|b INT,
|c ARRAY<STRUCT<row_id:INT,json_string:STRING>>,
|d ARRAY<ARRAY<STRING>>,
|e ARRAY<MAP<STRING, INT>>)
|USING ORC""".stripMargin)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See: #28490 (comment)

Test case change to view and with actual data.


checkAnswer(
sql(
"""
|SELECT a, each.json_string, SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) x AS each
|GROUP BY a, each.json_string
|WITH CUBE
|""".stripMargin), Nil)

checkAnswer(
sql(
"""
|SELECT a, get_json_object(each.json_string, '$.i'), SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) X AS each
|GROUP BY a, get_json_object(each.json_string, '$.i')
|WITH CUBE
|""".stripMargin), Nil)

checkAnswer(
sql(
"""
|SELECT a, each.json_string AS json_string, SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) x AS each
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, we must need lateral view to reproduce this issue? I mean, this issue cannot happen without lateral view?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, we must need lateral view to reproduce this issue? I mean, this issue cannot happen without lateral view?

No, I changed.

|GROUP BY a, each.json_string
|WITH CUBE
|""".stripMargin), Nil)

checkAnswer(
sql(
"""
|SELECT a, each.json_string as js, SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) X AS each
|GROUP BY a, each.json_string
|WITH CUBE
|""".stripMargin), Nil)

checkAnswer(
sql(
"""
|SELECT a, each.json_string as js, SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) X AS each
|GROUP BY a, each.json_string
|WITH ROLLUP
|""".stripMargin), Nil)

sql(
"""
|SELECT a, each.json_string, SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) X AS each
|GROUP BY a, each.json_string
|GROUPING sets((a),(a, each.json_string))
|""".stripMargin).explain(true)

checkAnswer(
sql(
"""
|SELECT a, each.json_string, SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) X AS each
|GROUP BY a, each.json_string
|GROUPING sets((a),(a, each.json_string))
|""".stripMargin), Nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add tests having queries with HAVING clauses?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add tests having queries with HAVING clauses?

Seems I make a mistake, having won't have duplicate field since it won't have grouping keys in having condition

}
}

test("SPARK-31761: test byte, short, integer overflow for (Divide) integral type") {
checkAnswer(sql("Select -2147483648 DIV -1"), Seq(Row(Integer.MIN_VALUE.toLong * -1)))
checkAnswer(sql("select CAST(-128 as Byte) DIV CAST (-1 as Byte)"),
Expand Down