Skip to content

Commit f7c9ff5

Browse files
hvanhovell authored and rxin committed
[SPARK-17068][SQL] Make view-usage visible during analysis
## What changes were proposed in this pull request?

This PR adds a field to subquery alias in order to make the usage of views in a resolved `LogicalPlan` more visible (and more understandable).

For example, the following view and query:

```sql
create view constants as select 1 as id union all select 1 union all select 42
select * from constants;
```

...now yields the following analyzed plan:

```
Project [id#39]
+- SubqueryAlias c, `default`.`constants`
   +- Project [gen_attr_0#36 AS id#39]
      +- SubqueryAlias gen_subquery_0
         +- Union
            :- Union
            :  :- Project [1 AS gen_attr_0#36]
            :  :  +- OneRowRelation$
            :  +- Project [1 AS gen_attr_1#37]
            :     +- OneRowRelation$
            +- Project [42 AS gen_attr_2#38]
               +- OneRowRelation$
```

## How was this patch tested?

Added tests for the two code paths in `SessionCatalogSuite` (sql/core) and `HiveMetastoreCatalogSuite` (sql/hive)

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #14657 from hvanhovell/SPARK-17068.
1 parent 4a2c375 commit f7c9ff5

20 files changed

Lines changed: 94 additions & 71 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class Analyzer(
138138
case u : UnresolvedRelation =>
139139
val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
140140
.map(_._2).map { relation =>
141-
val withAlias = u.alias.map(SubqueryAlias(_, relation))
141+
val withAlias = u.alias.map(SubqueryAlias(_, relation, None))
142142
withAlias.getOrElse(relation)
143143
}
144144
substituted.getOrElse(u)
@@ -2057,7 +2057,7 @@ class Analyzer(
20572057
*/
20582058
object EliminateSubqueryAliases extends Rule[LogicalPlan] {
20592059
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
2060-
case SubqueryAlias(_, child) => child
2060+
case SubqueryAlias(_, child, _) => child
20612061
}
20622062
}
20632063

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ trait CheckAnalysis extends PredicateHelper {
141141

142142
// Skip projects and subquery aliases added by the Analyzer and the SQLBuilder.
143143
def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
144-
case SubqueryAlias(_, child) => cleanQuery(child)
145-
case Project(_, child) => cleanQuery(child)
144+
case s: SubqueryAlias => cleanQuery(s.child)
145+
case p: Project => cleanQuery(p.child)
146146
case child => child
147147
}
148148

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -411,27 +411,29 @@ class SessionCatalog(
411411
}
412412

413413
/**
414-
* Return a [[LogicalPlan]] that represents the given table.
414+
* Return a [[LogicalPlan]] that represents the given table or view.
415415
*
416-
* If a database is specified in `name`, this will return the table from that database.
417-
* If no database is specified, this will first attempt to return a temporary table with
418-
* the same name, then, if that does not exist, return the table from the current database.
416+
* If a database is specified in `name`, this will return the table/view from that database.
417+
* If no database is specified, this will first attempt to return a temporary table/view with
418+
* the same name, then, if that does not exist, return the table/view from the current database.
419+
*
420+
* If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
421+
* track the name of the view.
419422
*/
420423
def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
421424
synchronized {
422425
val db = formatDatabaseName(name.database.getOrElse(currentDb))
423426
val table = formatTableName(name.table)
424-
val relation =
425-
if (name.database.isDefined || !tempTables.contains(table)) {
426-
val metadata = externalCatalog.getTable(db, table)
427-
SimpleCatalogRelation(db, metadata)
428-
} else {
429-
tempTables(table)
427+
val relationAlias = alias.getOrElse(table)
428+
if (name.database.isDefined || !tempTables.contains(table)) {
429+
val metadata = externalCatalog.getTable(db, table)
430+
val view = Option(metadata.tableType).collect {
431+
case CatalogTableType.VIEW => name
430432
}
431-
val qualifiedTable = SubqueryAlias(table, relation)
432-
// If an alias was specified by the lookup, wrap the plan in a subquery so that
433-
// attributes are properly qualified with this alias.
434-
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
433+
SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)
434+
} else {
435+
SubqueryAlias(relationAlias, tempTables(table), Option(name))
436+
}
435437
}
436438
}
437439

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ package object dsl {
343343
orderSpec: Seq[SortOrder]): LogicalPlan =
344344
Window(windowExpressions, partitionSpec, orderSpec, logicalPlan)
345345

346-
def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)
346+
def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None)
347347

348348
def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan)
349349

@@ -367,7 +367,7 @@ package object dsl {
367367

368368
def as(alias: String): LogicalPlan = logicalPlan match {
369369
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
370-
case plan => SubqueryAlias(alias, plan)
370+
case plan => SubqueryAlias(alias, plan, None)
371371
}
372372

373373
def repartition(num: Integer): LogicalPlan =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ case class ScalarSubquery(
7272
override def dataType: DataType = query.schema.fields.head.dataType
7373
override def foldable: Boolean = false
7474
override def nullable: Boolean = true
75-
override def plan: LogicalPlan = SubqueryAlias(toString, query)
75+
override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
7676
override def withNewPlan(plan: LogicalPlan): ScalarSubquery = copy(query = plan)
7777
override def toString: String = s"scalar-subquery#${exprId.id} $conditionString"
7878
}
@@ -100,7 +100,7 @@ case class PredicateSubquery(
100100
override lazy val resolved = childrenResolved && query.resolved
101101
override lazy val references: AttributeSet = super.references -- query.outputSet
102102
override def nullable: Boolean = nullAware
103-
override def plan: LogicalPlan = SubqueryAlias(toString, query)
103+
override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
104104
override def withNewPlan(plan: LogicalPlan): PredicateSubquery = copy(query = plan)
105105
override def semanticEquals(o: Expression): Boolean = o match {
106106
case p: PredicateSubquery =>
@@ -153,7 +153,7 @@ case class ListQuery(query: LogicalPlan, exprId: ExprId = NamedExpression.newExp
153153
override def dataType: DataType = ArrayType(NullType)
154154
override def nullable: Boolean = false
155155
override def withNewPlan(plan: LogicalPlan): ListQuery = copy(query = plan)
156-
override def plan: LogicalPlan = SubqueryAlias(toString, query)
156+
override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
157157
override def toString: String = s"list#${exprId.id}"
158158
}
159159

@@ -174,6 +174,6 @@ case class Exists(query: LogicalPlan, exprId: ExprId = NamedExpression.newExprId
174174
override def children: Seq[Expression] = Seq.empty
175175
override def nullable: Boolean = false
176176
override def withNewPlan(plan: LogicalPlan): Exists = copy(query = plan)
177-
override def plan: LogicalPlan = SubqueryAlias(toString, query)
177+
override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
178178
override def toString: String = s"exists#${exprId.id}"
179179
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,7 +1862,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
18621862
// and Project operators, followed by an optional Filter, followed by an
18631863
// Aggregate. Traverse the operators recursively.
18641864
def evalPlan(lp : LogicalPlan) : Map[ExprId, Option[Any]] = lp match {
1865-
case SubqueryAlias(_, child) => evalPlan(child)
1865+
case SubqueryAlias(_, child, _) => evalPlan(child)
18661866
case Filter(condition, child) =>
18671867
val bindings = evalPlan(child)
18681868
if (bindings.isEmpty) bindings
@@ -1920,7 +1920,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
19201920
topPart += p
19211921
bottomPart = child
19221922

1923-
case s @ SubqueryAlias(_, child) =>
1923+
case s @ SubqueryAlias(_, child, _) =>
19241924
topPart += s
19251925
bottomPart = child
19261926

@@ -1991,8 +1991,8 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
19911991
topPart.reverse.foreach {
19921992
case Project(projList, _) =>
19931993
subqueryRoot = Project(projList ++ havingInputs, subqueryRoot)
1994-
case s @ SubqueryAlias(alias, _) =>
1995-
subqueryRoot = SubqueryAlias(alias, subqueryRoot)
1994+
case s @ SubqueryAlias(alias, _, None) =>
1995+
subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)
19961996
case op => sys.error(s"Unexpected operator $op in corelated subquery")
19971997
}
19981998

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
107107
* This is only used for Common Table Expressions.
108108
*/
109109
override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) {
110-
SubqueryAlias(ctx.name.getText, plan(ctx.queryNoWith))
110+
SubqueryAlias(ctx.name.getText, plan(ctx.queryNoWith), None)
111111
}
112112

113113
/**
@@ -723,7 +723,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
723723
* Create an alias (SubqueryAlias) for a LogicalPlan.
724724
*/
725725
private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = {
726-
SubqueryAlias(alias.getText, plan)
726+
SubqueryAlias(alias.getText, plan, None)
727727
}
728728

729729
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
1919

2020
import scala.collection.mutable.ArrayBuffer
2121

22+
import org.apache.spark.sql.catalyst.TableIdentifier
2223
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2324
import org.apache.spark.sql.catalyst.expressions._
2425
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -693,7 +694,11 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
693694
}
694695
}
695696

696-
case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
697+
case class SubqueryAlias(
698+
alias: String,
699+
child: LogicalPlan,
700+
view: Option[TableIdentifier])
701+
extends UnaryNode {
697702

698703
override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
699704
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,8 @@ class AnalysisSuite extends AnalysisTest {
339339
val query =
340340
Project(Seq($"x.key", $"y.key"),
341341
Join(
342-
Project(Seq($"x.key"), SubqueryAlias("x", input)),
343-
Project(Seq($"y.key"), SubqueryAlias("y", input)),
342+
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
343+
Project(Seq($"y.key"), SubqueryAlias("y", input, None)),
344344
Inner, None))
345345

346346
assertAnalysisSuccess(query)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,31 +395,38 @@ class SessionCatalogSuite extends SparkFunSuite {
395395
sessionCatalog.setCurrentDatabase("db2")
396396
// If we explicitly specify the database, we'll look up the relation in that database
397397
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
398-
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
398+
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None))
399399
// Otherwise, we'll first look up a temporary table with the same name
400400
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
401-
== SubqueryAlias("tbl1", tempTable1))
401+
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
402402
// Then, if that does not exist, look up the relation in the current database
403403
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
404404
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
405-
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
405+
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None))
406406
}
407407

408408
test("lookup table relation with alias") {
409409
val catalog = new SessionCatalog(newBasicCatalog())
410410
val alias = "monster"
411411
val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
412-
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata))
412+
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None)
413413
val relationWithAlias =
414414
SubqueryAlias(alias,
415-
SubqueryAlias("tbl1",
416-
SimpleCatalogRelation("db2", tableMetadata)))
415+
SimpleCatalogRelation("db2", tableMetadata), None)
417416
assert(catalog.lookupRelation(
418417
TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
419418
assert(catalog.lookupRelation(
420419
TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
421420
}
422421

422+
test("lookup view with view name in alias") {
423+
val catalog = new SessionCatalog(newBasicCatalog())
424+
val tmpView = Range(1, 10, 2, 10)
425+
catalog.createTempView("vw1", tmpView, overrideIfExists = false)
426+
val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
427+
assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
428+
}
429+
423430
test("table exists") {
424431
val catalog = new SessionCatalog(newBasicCatalog())
425432
assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))

0 commit comments

Comments
 (0)