
Commit b1b5e1f

gatorsmile authored and uzadude committed
[SPARK-18217][SQL] Disallow creating permanent views based on temporary views or UDFs
### What changes were proposed in this pull request?

Based on the discussion in [SPARK-18209](https://issues.apache.org/jira/browse/SPARK-18209), it does not make sense to create permanent views based on temporary views or temporary UDFs. To disallow such references and raise the appropriate exceptions, this PR needs to detect whether a temporary view or UDF is being used when defining a permanent view. The work splits into two sub-tasks:

**Task 1: detect a temporary view in the query plan of the view definition.** When the Analyzer finds an unresolved temporary view, it replaces it with a `SubqueryAlias` wrapping the corresponding logical plan, which is stored in an in-memory HashMap. After that replacement, it is impossible to tell whether a given `SubqueryAlias` was generated from a temporary view. Therefore, to detect the usage of a temporary view in a view definition, this PR traverses the *unresolved* logical plan and uses the name of each `UnresolvedRelation` to decide whether it refers to a (global) temporary view.

**Task 2: detect a temporary UDF in the query plan of the view definition.** This is less straightforward. First, in the analyzed plan, functions are represented in several different forms; more importantly, some of the classes involved (e.g., `HiveGenericUDF`) are not accessible from `CreateViewCommand`, which is part of `sql/core`. Thus, this PR inspects the unanalyzed plan `child` of `CreateViewCommand`; because the plan has already been successfully analyzed, we can assume all referenced functions are defined/registered. Second, functions in Spark come in four forms: Spark built-in functions, built-in hash functions, permanent UDFs, and temporary UDFs, and there is no direct way to determine whether a function is temporary. This PR therefore introduces a function `isTemporaryFunction` in `SessionCatalog` that contains the detailed logic for making that determination.
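The classification rule described in Task 2 can be sketched independently of Spark. This is a hypothetical, self-contained model: `registered` and `builtin` are plain-`Set` stand-ins for the session's `functionRegistry` and `FunctionRegistry.builtin`, not the real catalyst APIs.

```scala
// Hypothetical sketch of the isTemporaryFunction decision; the parameters
// `registered` and `builtin` stand in for functionRegistry and
// FunctionRegistry.builtin respectively.
case class FunctionIdentifier(funcName: String, database: Option[String] = None)

object TempFunctionCheck {
  // Hive functions special-cased in HiveSessionCatalog (per this commit).
  val hiveFunctions = Set("hash", "histogram_numeric", "percentile")

  def isTemporaryFunction(
      name: FunctionIdentifier,
      registered: Set[String],
      builtin: Set[String]): Boolean = {
    // Temporary = registered without a database qualifier, and neither a
    // Spark built-in nor one of the special-cased Hive functions.
    name.database.isEmpty &&
      registered.contains(name.funcName) &&
      !builtin.contains(name.funcName) &&
      !hiveFunctions.contains(name.funcName.toLowerCase)
  }
}
```

Note how a database-qualified name short-circuits to `false`: a function registered under a database can only be permanent.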
### How was this patch tested?

Added test cases.

Author: gatorsmile <[email protected]>

Closes apache#15764 from gatorsmile/blockTempFromPermViewCreation.
1 parent: d54a7f1 · commit: b1b5e1f

5 files changed: 172 additions & 12 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -923,6 +923,24 @@ class SessionCatalog(
     }
   }
 
+  /**
+   * Returns whether it is a temporary function. If not existed, returns false.
+   */
+  def isTemporaryFunction(name: FunctionIdentifier): Boolean = {
+    // copied from HiveSessionCatalog
+    val hiveFunctions = Seq(
+      "hash",
+      "histogram_numeric",
+      "percentile")
+
+    // A temporary function is a function that has been registered in functionRegistry
+    // without a database name, and is neither a built-in function nor a Hive function
+    name.database.isEmpty &&
+      functionRegistry.functionExists(name.funcName) &&
+      !FunctionRegistry.builtin.functionExists(name.funcName) &&
+      !hiveFunctions.contains(name.funcName.toLowerCase)
+  }
+
   protected def failFunctionLookup(name: String): Nothing = {
     throw new NoSuchFunctionException(db = currentDb, func = name)
   }
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 28 additions & 0 deletions
```diff
@@ -919,6 +919,34 @@ class SessionCatalogSuite extends SparkFunSuite {
       catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(arguments.length))
   }
 
+  test("isTemporaryFunction") {
+    val externalCatalog = newBasicCatalog()
+    val sessionCatalog = new SessionCatalog(externalCatalog)
+
+    // Returns false when the function does not exist
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("temp1")))
+
+    val tempFunc1 = (e: Seq[Expression]) => e.head
+    val info1 = new ExpressionInfo("tempFunc1", "temp1")
+    sessionCatalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
+
+    // Returns true when the function is temporary
+    assert(sessionCatalog.isTemporaryFunction(FunctionIdentifier("temp1")))
+
+    // Returns false when the function is permanent
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("func1", Some("db2"))))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("db2.func1")))
+    sessionCatalog.setCurrentDatabase("db2")
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("func1")))
+
+    // Returns false when the function is built-in or hive
+    assert(FunctionRegistry.builtin.functionExists("sum"))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("sum")))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("histogram_numeric")))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("percentile")))
+  }
+
   test("drop function") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 35 additions & 3 deletions
```diff
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.command
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
-import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+import org.apache.spark.sql.types.MetadataBuilder
 
 
 /**
@@ -131,6 +131,10 @@ case class CreateViewCommand(
         s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
     }
 
+    // When creating a permanent view, not allowed to reference temporary objects.
+    // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
+    verifyTemporaryObjectsNotExists(sparkSession)
+
     val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
       analyzedPlan
     } else {
@@ -172,6 +176,34 @@
       Seq.empty[Row]
   }
 
+  /**
+   * Permanent views are not allowed to reference temp objects, including temp function and views
+   */
+  private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = {
+    if (!isTemporary) {
+      // This func traverses the unresolved plan `child`. Below are the reasons:
+      // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding
+      // logical plan. After replacement, it is impossible to detect whether the SubqueryAlias is
+      // added/generated from a temporary view.
+      // 2) The temp functions are represented by multiple classes. Most are inaccessible from this
+      // package (e.g., HiveGenericUDF).
+      child.collect {
+        // Disallow creating permanent views based on temporary views.
+        case s: UnresolvedRelation
+          if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) =>
+          throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
+            s"referencing a temporary view ${s.tableIdentifier}")
+        case other if !other.resolved => other.expressions.flatMap(_.collect {
+          // Disallow creating permanent views based on temporary UDFs.
+          case e: UnresolvedFunction
+            if sparkSession.sessionState.catalog.isTemporaryFunction(e.name) =>
+            throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
+              s"referencing a temporary function `${e.name}`")
+        })
+      }
+    }
+  }
+
   /**
    * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
    * SQL based on the analyzed plan, and also creates the proper schema for the view.
```
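The collect-style traversal used by `verifyTemporaryObjectsNotExists` can be illustrated with a toy plan tree. The `Plan` hierarchy below is a hypothetical stand-in for Catalyst's `LogicalPlan`, kept only to show the recursive walk that gathers unresolved relations whose names match registered temp views.

```scala
// Toy stand-ins for Catalyst plan nodes; not the real classes.
sealed trait Plan { def children: Seq[Plan] }
case class UnresolvedRelation(table: String) extends Plan {
  def children: Seq[Plan] = Nil
}
case class Project(children: Seq[Plan]) extends Plan

// Collect every relation in the (unresolved) tree that names a temp view,
// mirroring the `child.collect { case s: UnresolvedRelation ... }` pattern;
// the real command throws an AnalysisException on the first match instead.
def tempViewRefs(plan: Plan, tempViews: Set[String]): Seq[String] = {
  val here = plan match {
    case UnresolvedRelation(t) if tempViews.contains(t) => Seq(t)
    case _ => Seq.empty
  }
  here ++ plan.children.flatMap(tempViewRefs(_, tempViews))
}
```

Running the check on the unresolved tree is what makes this work: by the time the plan is resolved, the temp view's name has been replaced by its expanded plan and is no longer recoverable.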

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -232,6 +232,7 @@ private[sql] class HiveSessionCatalog(
   // current_user, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
   // in_file, index, matchpath, ngrams, noop, noopstreaming, noopwithmap,
   // noopwithmapstreaming, parse_url_tuple, reflect2, windowingtablefunction.
+  // Note: don't forget to update SessionCatalog.isTemporaryFunction
   private val hiveFunctions = Seq(
     "histogram_numeric",
     "percentile"
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala

Lines changed: 90 additions & 9 deletions
```diff
@@ -38,21 +38,46 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     spark.sql(s"DROP TABLE IF EXISTS jt")
   }
 
-  test("nested views (interleaved with temporary views)") {
-    withView("jtv1", "jtv2", "jtv3", "temp_jtv1", "temp_jtv2", "temp_jtv3") {
+  test("create a permanent view on a permanent view") {
+    withView("jtv1", "jtv2") {
       sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
       sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6")
       checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
+    }
+  }
 
-      // Checks temporary views
+  test("create a temp view on a permanent view") {
+    withView("jtv1", "temp_jtv1") {
+      sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
+      sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jtv1 WHERE id < 6")
+      checkAnswer(sql("select count(*) FROM temp_jtv1"), Row(2))
+    }
+  }
+
+  test("create a temp view on a temp view") {
+    withView("temp_jtv1", "temp_jtv2") {
       sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
       sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6")
       checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2))
+    }
+  }
+
+  test("create a permanent view on a temp view") {
+    withView("jtv1", "temp_jtv1", "global_temp_jtv1") {
+      sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
+      var e = intercept[AnalysisException] {
+        sql("CREATE VIEW jtv1 AS SELECT * FROM temp_jtv1 WHERE id < 6")
+      }.getMessage
+      assert(e.contains("Not allowed to create a permanent view `jtv1` by " +
+        "referencing a temporary view `temp_jtv1`"))
 
-      // Checks interleaved temporary view and normal view
-      sql("CREATE TEMPORARY VIEW temp_jtv3 AS SELECT * FROM jt WHERE id > 3")
-      sql("CREATE VIEW jtv3 AS SELECT * FROM temp_jtv3 WHERE id < 6")
-      checkAnswer(sql("select count(*) FROM jtv3"), Row(2))
+      val globalTempDB = spark.sharedState.globalTempViewManager.database
+      sql("CREATE GLOBAL TEMP VIEW global_temp_jtv1 AS SELECT * FROM jt WHERE id > 0")
+      e = intercept[AnalysisException] {
+        sql(s"CREATE VIEW jtv1 AS SELECT * FROM $globalTempDB.global_temp_jtv1 WHERE id < 6")
+      }.getMessage
+      assert(e.contains(s"Not allowed to create a permanent view `jtv1` by referencing " +
+        s"a temporary view `global_temp`.`global_temp_jtv1`"))
     }
   }
 
@@ -439,7 +464,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("SPARK-14933 - create view from hive parquet tabale") {
+  test("SPARK-14933 - create view from hive parquet table") {
     withTable("t_part") {
       withView("v_part") {
         spark.sql("create table t_part stored as parquet as select 1 as a, 2 as b")
@@ -451,7 +476,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("SPARK-14933 - create view from hive orc tabale") {
+  test("SPARK-14933 - create view from hive orc table") {
     withTable("t_orc") {
       withView("v_orc") {
         spark.sql("create table t_orc stored as orc as select 1 as a, 2 as b")
@@ -462,4 +487,60 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("create a permanent/temp view using a hive, built-in, and permanent user function") {
+    val permanentFuncName = "myUpper"
+    val permanentFuncClass =
+      classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName
+    val builtInFuncNameInLowerCase = "abs"
+    val builtInFuncNameInMixedCase = "aBs"
+    val hiveFuncName = "histogram_numeric"
+
+    withUserDefinedFunction(permanentFuncName -> false) {
+      sql(s"CREATE FUNCTION $permanentFuncName AS '$permanentFuncClass'")
+      withTable("tab1") {
+        (1 to 10).map(i => (s"$i", i)).toDF("str", "id").write.saveAsTable("tab1")
+        Seq("VIEW", "TEMPORARY VIEW").foreach { viewMode =>
+          withView("view1") {
+            sql(
+              s"""
+                |CREATE $viewMode view1
+                |AS SELECT
+                |$permanentFuncName(str),
+                |$builtInFuncNameInLowerCase(id),
+                |$builtInFuncNameInMixedCase(id) as aBs,
+                |$hiveFuncName(id, 5) over()
+                |FROM tab1
+              """.stripMargin)
+            checkAnswer(sql("select count(*) FROM view1"), Row(10))
+          }
+        }
+      }
+    }
+  }
+
+  test("create a permanent/temp view using a temporary function") {
+    val tempFunctionName = "temp"
+    val functionClass =
+      classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName
+    withUserDefinedFunction(tempFunctionName -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION $tempFunctionName AS '$functionClass'")
+      withView("view1", "tempView1") {
+        withTable("tab1") {
+          (1 to 10).map(i => s"$i").toDF("id").write.saveAsTable("tab1")
+
+          // temporary view
+          sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT $tempFunctionName(id) from tab1")
+          checkAnswer(sql("select count(*) FROM tempView1"), Row(10))
+
+          // permanent view
+          val e = intercept[AnalysisException] {
+            sql(s"CREATE VIEW view1 AS SELECT $tempFunctionName(id) from tab1")
+          }.getMessage
+          assert(e.contains("Not allowed to create a permanent view `view1` by referencing " +
+            s"a temporary function `$tempFunctionName`"))
+        }
+      }
+    }
+  }
 }
```
