Skip to content

Commit 8fb1463

Browse files
clockflyyhuai
authored andcommitted
[SPARK-6339][SQL] Supports CREATE TEMPORARY VIEW tableIdentifier AS query
## What changes were proposed in this pull request? This PR support new SQL syntax CREATE TEMPORARY VIEW. Like: ``` CREATE TEMPORARY VIEW viewName AS SELECT * from xx CREATE OR REPLACE TEMPORARY VIEW viewName AS SELECT * from xx CREATE TEMPORARY VIEW viewName (c1 COMMENT 'blabla', c2 COMMENT 'blabla') AS SELECT * FROM xx ``` ## How was this patch tested? Unit tests. Author: Sean Zhong <clockfly@gmail.com> Closes apache#12872 from clockfly/spark-6399.
1 parent fa79d34 commit 8fb1463

5 files changed

Lines changed: 175 additions & 38 deletions

File tree

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ statement
8484
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
8585
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
8686
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
87-
| CREATE (OR REPLACE)? VIEW (IF NOT EXISTS)? tableIdentifier
87+
| CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
8888
identifierCommentList? (COMMENT STRING)?
8989
(PARTITIONED ON identifierList)?
9090
(TBLPROPERTIES tablePropertyList)? AS query #createView

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -935,7 +935,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
935935
*
936936
* For example:
937937
* {{{
938-
* CREATE VIEW [IF NOT EXISTS] [db_name.]view_name
938+
* CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
939939
* [(column_name [COMMENT column_comment], ...) ]
940940
* [COMMENT view_comment]
941941
* [TBLPROPERTIES (property_name = property_value, ...)]
@@ -958,7 +958,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
958958
ctx.query,
959959
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
960960
ctx.EXISTS != null,
961-
ctx.REPLACE != null
961+
ctx.REPLACE != null,
962+
ctx.TEMPORARY != null
962963
)
963964
}
964965
}
@@ -975,7 +976,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
975976
ctx.query,
976977
Map.empty,
977978
allowExist = false,
978-
replace = true)
979+
replace = true,
980+
isTemporary = false)
979981
}
980982

981983
/**
@@ -989,7 +991,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
989991
query: QueryContext,
990992
properties: Map[String, String],
991993
allowExist: Boolean,
992-
replace: Boolean): LogicalPlan = {
994+
replace: Boolean,
995+
isTemporary: Boolean): LogicalPlan = {
993996
val sql = Option(source(query))
994997
val tableDesc = CatalogTable(
995998
identifier = visitTableIdentifier(name),
@@ -1000,7 +1003,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
10001003
viewOriginalText = sql,
10011004
viewText = sql,
10021005
comment = comment)
1003-
CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
1006+
CreateViewCommand(tableDesc, plan(query), allowExist, replace, isTemporary, command(ctx))
10041007
}
10051008

10061009
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
2020
import scala.util.control.NonFatal
2121

2222
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
23-
import org.apache.spark.sql.catalyst.SQLBuilder
23+
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
2424
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
2525
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
2626
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -37,13 +37,18 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
3737
* already exists, throws analysis exception.
3838
* @param replace if true, and if the view already exists, updates it; if false, and if the view
3939
* already exists, throws analysis exception.
40+
* @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
41+
* at the end of current Spark session. Existing permanent relations with the same
42+
* name are not visible to the current session while the temporary view exists,
43+
* unless they are specified with full qualified table name with database prefix.
4044
* @param sql the original sql
4145
*/
4246
case class CreateViewCommand(
4347
tableDesc: CatalogTable,
4448
child: LogicalPlan,
4549
allowExisting: Boolean,
4650
replace: Boolean,
51+
isTemporary: Boolean,
4752
sql: String)
4853
extends RunnableCommand {
4954

@@ -55,12 +60,23 @@ case class CreateViewCommand(
5560
require(tableDesc.tableType == CatalogTableType.VIEW)
5661
require(tableDesc.viewText.isDefined)
5762

58-
private val tableIdentifier = tableDesc.identifier
59-
6063
if (allowExisting && replace) {
6164
throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
6265
}
6366

67+
// Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
68+
if (allowExisting && isTemporary) {
69+
throw new AnalysisException(
70+
"It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
71+
}
72+
73+
// Temporary view names should NOT contain database prefix like "database.table"
74+
if (isTemporary && tableDesc.identifier.database.isDefined) {
75+
val database = tableDesc.identifier.database.get
76+
throw new AnalysisException(
77+
s"It is not allowed to add database prefix ${database} for the TEMPORARY view name.")
78+
}
79+
6480
override def run(sparkSession: SparkSession): Seq[Row] = {
6581
// If the plan cannot be analyzed, throw an exception and don't proceed.
6682
val qe = sparkSession.executePlan(child)
@@ -70,29 +86,59 @@ case class CreateViewCommand(
7086
require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
7187
val sessionState = sparkSession.sessionState
7288

73-
if (sessionState.catalog.tableExists(tableIdentifier)) {
74-
if (allowExisting) {
75-
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
76-
// already exists.
77-
} else if (replace) {
78-
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
79-
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
89+
if (isTemporary) {
90+
createTemporaryView(tableDesc.identifier, sparkSession, analyzedPlan)
91+
} else {
92+
// Adds default database for permanent table if it doesn't exist, so that tableExists()
93+
// only check permanent tables.
94+
val database = tableDesc.identifier.database.getOrElse(
95+
sessionState.catalog.getCurrentDatabase)
96+
val tableIdentifier = tableDesc.identifier.copy(database = Option(database))
97+
98+
if (sessionState.catalog.tableExists(tableIdentifier)) {
99+
if (allowExisting) {
100+
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
101+
// already exists.
102+
} else if (replace) {
103+
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
104+
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
105+
} else {
106+
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
107+
// exists.
108+
throw new AnalysisException(
109+
s"View $tableIdentifier already exists. If you want to update the view definition, " +
110+
"please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
111+
}
80112
} else {
81-
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
82-
// exists.
83-
throw new AnalysisException(s"View $tableIdentifier already exists. " +
84-
"If you want to update the view definition, please use ALTER VIEW AS or " +
85-
"CREATE OR REPLACE VIEW AS")
113+
// Create the view if it doesn't exist.
114+
sessionState.catalog.createTable(
115+
prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
86116
}
87-
} else {
88-
// Create the view if it doesn't exist.
89-
sessionState.catalog.createTable(
90-
prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
91117
}
92-
93118
Seq.empty[Row]
94119
}
95120

121+
private def createTemporaryView(
122+
table: TableIdentifier, sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
123+
124+
val sessionState = sparkSession.sessionState
125+
val catalog = sessionState.catalog
126+
127+
// Projects column names to alias names
128+
val logicalPlan = {
129+
if (tableDesc.schema.isEmpty) {
130+
analyzedPlan
131+
} else {
132+
val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
133+
case (attr, col) => Alias(attr, col.name)()
134+
}
135+
sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
136+
}
137+
}
138+
139+
catalog.createTempTable(table.table, logicalPlan, replace)
140+
}
141+
96142
/**
97143
* Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
98144
* SQL based on the analyzed plan, and also creates the proper schema for the view.

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class HiveDDLCommandSuite extends PlanTest {
3939
parser.parsePlan(sql).collect {
4040
case CreateTable(desc, allowExisting) => (desc, allowExisting)
4141
case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
42-
case CreateViewCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
42+
case CreateViewCommand(desc, _, allowExisting, _, _, _) => (desc, allowExisting)
4343
}.head
4444
}
4545

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,21 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
3737
sqlContext.sql(s"DROP TABLE IF EXISTS jt")
3838
}
3939

40-
test("nested views") {
41-
withView("jtv1", "jtv2") {
42-
sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect()
43-
sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect()
40+
test("nested views (interleaved with temporary views)") {
41+
withView("jtv1", "jtv2", "jtv3", "temp_jtv1", "temp_jtv2", "temp_jtv3") {
42+
sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
43+
sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6")
4444
checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
45+
46+
// Checks temporary views
47+
sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
48+
sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6")
49+
checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2))
50+
51+
// Checks interleaved temporary view and normal view
52+
sql("CREATE TEMPORARY VIEW temp_jtv3 AS SELECT * FROM jt WHERE id > 3")
53+
sql("CREATE VIEW jtv3 AS SELECT * FROM temp_jtv3 WHERE id < 6")
54+
checkAnswer(sql("select count(*) FROM jtv3"), Row(2))
4555
}
4656
}
4757

@@ -57,6 +67,33 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
5767
}
5868
}
5969

70+
test("error handling: fail if the temp view name contains the database prefix") {
71+
// Fully qualified table name like "database.table" is not allowed for temporary view
72+
val e = intercept[AnalysisException] {
73+
sql("CREATE OR REPLACE TEMPORARY VIEW default.myabcdview AS SELECT * FROM jt")
74+
}
75+
assert(e.message.contains("It is not allowed to add database prefix"))
76+
}
77+
78+
test("error handling: disallow IF NOT EXISTS for CREATE TEMPORARY VIEW") {
79+
val e = intercept[AnalysisException] {
80+
sql("CREATE TEMPORARY VIEW IF NOT EXISTS myabcdview AS SELECT * FROM jt")
81+
}
82+
assert(e.message.contains("It is not allowed to define a TEMPORARY view with IF NOT EXISTS"))
83+
}
84+
85+
test("error handling: fail if the temp view sql itself is invalid") {
86+
// A table that does not exist for temporary view
87+
intercept[AnalysisException] {
88+
sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT * FROM table_not_exist1345")
89+
}
90+
91+
// A column that does not exist, for temporary view
92+
intercept[AnalysisException] {
93+
sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT random1234 FROM jt")
94+
}
95+
}
96+
6097
test("correctly parse CREATE VIEW statement") {
6198
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
6299
sql(
@@ -69,18 +106,70 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
69106
}
70107
}
71108

109+
test("correctly parse CREATE TEMPORARY VIEW statement") {
110+
withView("testView") {
111+
sql(
112+
"""CREATE TEMPORARY VIEW
113+
|testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
114+
|TBLPROPERTIES ('a' = 'b')
115+
|AS SELECT * FROM jt
116+
|""".stripMargin)
117+
checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
118+
}
119+
}
120+
121+
test("should NOT allow CREATE TEMPORARY VIEW when TEMPORARY VIEW with same name exists") {
122+
withView("testView") {
123+
sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
124+
125+
val e = intercept[AnalysisException] {
126+
sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
127+
}
128+
129+
assert(e.message.contains("Temporary table") && e.message.contains("already exists"))
130+
}
131+
}
132+
133+
test("should allow CREATE TEMPORARY VIEW when a permanent VIEW with same name exists") {
134+
withView("testView", "default.testView") {
135+
sql("CREATE VIEW testView AS SELECT id FROM jt")
136+
sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
137+
}
138+
}
139+
140+
test("should allow CREATE permanent VIEW when a TEMPORARY VIEW with same name exists") {
141+
withView("testView", "default.testView") {
142+
sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
143+
sql("CREATE VIEW testView AS SELECT id FROM jt")
144+
}
145+
}
146+
72147
test("correctly handle CREATE VIEW IF NOT EXISTS") {
73148
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
74149
withTable("jt2") {
75-
sql("CREATE VIEW testView AS SELECT id FROM jt")
150+
withView("testView") {
151+
sql("CREATE VIEW testView AS SELECT id FROM jt")
76152

77-
val df = (1 until 10).map(i => i -> i).toDF("i", "j")
78-
df.write.format("json").saveAsTable("jt2")
79-
sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
153+
val df = (1 until 10).map(i => i -> i).toDF("i", "j")
154+
df.write.format("json").saveAsTable("jt2")
155+
sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
80156

81-
// make sure our view doesn't change.
157+
// make sure our view doesn't change.
158+
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
159+
}
160+
}
161+
}
162+
}
163+
164+
test(s"correctly handle CREATE OR REPLACE TEMPORARY VIEW") {
165+
withTable("jt2") {
166+
withView("testView") {
167+
sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id FROM jt")
82168
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
83-
sql("DROP VIEW testView")
169+
170+
sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id AS i, id AS j FROM jt")
171+
// make sure the view has been changed.
172+
checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
84173
}
85174
}
86175
}
@@ -215,5 +304,4 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
215304
}
216305
}
217306
}
218-
219307
}

0 commit comments

Comments
 (0)