
Commit 5787ace

ouyangxiaochen authored and cloud-fan committed
[SPARK-20383][SQL] Supporting Create [temporary] Function with the keyword 'OR REPLACE' and 'IF NOT EXISTS'
## What changes were proposed in this pull request?

Support creating a [temporary] function with the keywords 'OR REPLACE' and 'IF NOT EXISTS'.

## How was this patch tested?

Manual tests and added test cases.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: ouyangxiaochen <[email protected]>

Closes #17681 from ouyangxiaochen/spark-419.
1 parent 873f3ad commit 5787ace
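The new syntax can be exercised end to end; below is a minimal Scala sketch, assuming a running SparkSession named `spark` and reusing the example UDF class and resource paths from this commit's test cases (they are placeholders, not shipped artifacts):

// Permanent function: overwrite the metastore definition if it already exists.
spark.sql(
  """CREATE OR REPLACE FUNCTION hello.world1 AS
    |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1'""".stripMargin)

// Permanent function: become a no-op instead of failing if the function already exists.
spark.sql(
  """CREATE FUNCTION IF NOT EXISTS hello.world2 AS
    |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1'""".stripMargin)

// Temporary function: OR REPLACE is allowed; IF NOT EXISTS is rejected for TEMPORARY.
spark.sql(
  """CREATE OR REPLACE TEMPORARY FUNCTION helloworld3 AS
    |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1'""".stripMargin)

The grammar change and the command-level checks in the diffs below determine which of these statements are accepted.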

File tree

12 files changed (+216 / -19 lines)


sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 1 deletion
@@ -126,7 +126,8 @@ statement
         tableIdentifier ('(' colTypeList ')')? tableProvider
         (OPTIONS tablePropertyList)? #createTempViewUsing
     | ALTER VIEW tableIdentifier AS? query #alterViewQuery
-    | CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
+    | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
+        qualifiedName AS className=STRING
         (USING resource (',' resource)*)? #createFunction
     | DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName #dropFunction
     | EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 9 additions & 0 deletions
@@ -332,6 +332,15 @@ abstract class ExternalCatalog

   protected def doDropFunction(db: String, funcName: String): Unit

+  final def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = {
+    val name = funcDefinition.identifier.funcName
+    postToAll(AlterFunctionPreEvent(db, name))
+    doAlterFunction(db, funcDefinition)
+    postToAll(AlterFunctionEvent(db, name))
+  }
+
+  protected def doAlterFunction(db: String, funcDefinition: CatalogFunction): Unit
+
   final def renameFunction(db: String, oldName: String, newName: String): Unit = {
     postToAll(RenameFunctionPreEvent(db, oldName, newName))
     doRenameFunction(db, oldName, newName)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 6 additions & 0 deletions
@@ -590,6 +590,12 @@ class InMemoryCatalog(
     catalog(db).functions.remove(funcName)
   }

+  override protected def doAlterFunction(db: String, func: CatalogFunction): Unit = synchronized {
+    requireDbExists(db)
+    requireFunctionExists(db, func.identifier.funcName)
+    catalog(db).functions.put(func.identifier.funcName, func)
+  }
+
   override protected def doRenameFunction(
       db: String,
       oldName: String,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 23 additions & 0 deletions
@@ -1055,6 +1055,29 @@ class SessionCatalog(
     }
   }

+  /**
+   * Overwrite a metastore function in the database specified in `funcDefinition`.
+   * If no database is specified, assume the function is in the current database.
+   */
+  def alterFunction(funcDefinition: CatalogFunction): Unit = {
+    val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
+    requireDbExists(db)
+    val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
+    val newFuncDefinition = funcDefinition.copy(identifier = identifier)
+    if (functionExists(identifier)) {
+      if (functionRegistry.functionExists(identifier)) {
+        // If we have loaded this function into the FunctionRegistry, also drop it from there.
+        // A permanent function is loaded into the FunctionRegistry when it is first used,
+        // so its cached entry must be dropped as well.
+        functionRegistry.dropFunction(identifier)
+      }
+      externalCatalog.alterFunction(db, newFuncDefinition)
+    } else {
+      throw new NoSuchFunctionException(db = db, func = identifier.toString)
+    }
+  }
+
   /**
    * Retrieve the metadata of a metastore function.
    *
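One practical consequence of the registry eviction in `alterFunction`: a permanent function that has already been used (and is therefore cached in the `FunctionRegistry`) picks up the replaced definition on its next use. A hedged sketch, with placeholder database, function, and class names and a SparkSession named `spark`:

spark.sql("CREATE FUNCTION udfs.strlen AS 'com.example.StrLenV1'")   // metastore only
spark.sql("SELECT udfs.strlen('spark')")                             // first use caches V1
spark.sql("CREATE OR REPLACE FUNCTION udfs.strlen AS 'com.example.StrLenV2'")
spark.sql("SELECT udfs.strlen('spark')")                             // resolves against V2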

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala

Lines changed: 10 additions & 0 deletions
@@ -139,6 +139,16 @@ case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent
  */
 case class DropFunctionEvent(database: String, name: String) extends FunctionEvent

+/**
+ * Event fired before a function is altered.
+ */
+case class AlterFunctionPreEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired after a function has been altered.
+ */
+case class AlterFunctionEvent(database: String, name: String) extends FunctionEvent
+
 /**
  * Event fired before a function is renamed.
  */

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala

Lines changed: 9 additions & 0 deletions
@@ -176,6 +176,15 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
     }
     checkEvents(RenameFunctionPreEvent("db5", "fn7", "fn4") :: Nil)

+    // ALTER
+    val alteredFunctionDefinition = CatalogFunction(
+      identifier = FunctionIdentifier("fn4", Some("db5")),
+      className = "org.apache.spark.AlterFunction",
+      resources = Seq.empty)
+    catalog.alterFunction("db5", alteredFunctionDefinition)
+    checkEvents(
+      AlterFunctionPreEvent("db5", "fn4") :: AlterFunctionEvent("db5", "fn4") :: Nil)
+
     // DROP
     intercept[AnalysisException] {
       catalog.dropFunction("db5", "fn7")

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 9 additions & 0 deletions
@@ -752,6 +752,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     }
   }

+  test("alter function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1").className == funcClass)
+    val myNewFunc = catalog.getFunction("db2", "func1").copy(className = newFuncClass)
+    catalog.alterFunction("db2", myNewFunc)
+    assert(catalog.getFunction("db2", "func1").className == newFuncClass)
+  }
+
   test("list functions") {
     val catalog = newBasicCatalog()
     catalog.createFunction("db2", newFunc("func2"))

@@ -916,6 +924,7 @@ abstract class CatalogTestUtils {
   lazy val partWithEmptyValue =
     CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
+  lazy val newFuncClass = "org.apache.spark.myNewFunc"

   /**
    * Creates a basic catalog, with the following structure:

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 5 additions & 3 deletions
@@ -687,8 +687,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    *
    * For example:
    * {{{
-   *   CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
-   *    [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
+   *   CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
+   *    AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
    * }}}
    */
   override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {

@@ -709,7 +709,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       functionIdentifier.funcName,
       string(ctx.className),
       resources,
-      ctx.TEMPORARY != null)
+      ctx.TEMPORARY != null,
+      ctx.EXISTS != null,
+      ctx.REPLACE != null)
   }

   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala

Lines changed: 33 additions & 13 deletions
@@ -31,13 +31,13 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
  * The DDL command that creates a function.
  * To create a temporary function, the syntax of using this command in SQL is:
  * {{{
- *    CREATE TEMPORARY FUNCTION functionName
+ *    CREATE [OR REPLACE] TEMPORARY FUNCTION functionName
  *      AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
  * }}}
  *
  * To create a permanent function, the syntax in SQL is:
  * {{{
- *    CREATE FUNCTION [databaseName.]functionName
+ *    CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] [databaseName.]functionName
  *      AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
  * }}}
  */

@@ -46,26 +46,46 @@ case class CreateFunctionCommand(
     functionName: String,
     className: String,
     resources: Seq[FunctionResource],
-    isTemp: Boolean)
+    isTemp: Boolean,
+    ifNotExists: Boolean,
+    replace: Boolean)
   extends RunnableCommand {

+  if (ifNotExists && replace) {
+    throw new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE" +
+      " is not allowed.")
+  }
+
+  // Disallow defining a temporary function with `IF NOT EXISTS`.
+  if (ifNotExists && isTemp) {
+    throw new AnalysisException(
+      "It is not allowed to define a TEMPORARY function with IF NOT EXISTS.")
+  }
+
+  // Temporary function names should not contain a database prefix like "database.function".
+  if (databaseName.isDefined && isTemp) {
+    throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
+      s"is not allowed: '${databaseName.get}'")
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
     if (isTemp) {
-      if (databaseName.isDefined) {
-        throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
-          s"is not allowed: '${databaseName.get}'")
-      }
       // We first load resources and then put the builder in the function registry.
       catalog.loadFunctionResources(resources)
-      catalog.registerFunction(func, overrideIfExists = false)
+      catalog.registerFunction(func, overrideIfExists = replace)
     } else {
-      // For a permanent, we will store the metadata into underlying external catalog.
-      // This function will be loaded into the FunctionRegistry when a query uses it.
-      // We do not load it into FunctionRegistry right now.
-      // TODO: should we also parse "IF NOT EXISTS"?
-      catalog.createFunction(func, ignoreIfExists = false)
+      // Handles `CREATE OR REPLACE FUNCTION ... AS ... USING ...`.
+      if (replace && catalog.functionExists(func.identifier)) {
+        // Alter the existing function in the metastore.
+        catalog.alterFunction(CatalogFunction(func.identifier, className, resources))
+      } else {
+        // For a permanent function, we store the metadata in the underlying external catalog.
+        // The function is loaded into the FunctionRegistry only when a query first uses it;
+        // we do not load it into the FunctionRegistry right now.
+        catalog.createFunction(CatalogFunction(func.identifier, className, resources), ifNotExists)
+      }
     }
     Seq.empty[Row]
   }
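The constructor-time checks above can be summarized by which statements are accepted; a sketch assuming a SparkSession named `spark`, a placeholder database `udfs`, and a placeholder class `com.example.StrLen`:

// Accepted: OR REPLACE on a permanent function routes to catalog.alterFunction when the
// function already exists, and to catalog.createFunction otherwise.
spark.sql("CREATE OR REPLACE FUNCTION udfs.strlen AS 'com.example.StrLen'")

// Accepted: IF NOT EXISTS makes createFunction ignore an existing permanent function.
spark.sql("CREATE FUNCTION IF NOT EXISTS udfs.strlen AS 'com.example.StrLen'")

// Rejected with AnalysisException: IF NOT EXISTS and OR REPLACE cannot be combined.
// spark.sql("CREATE OR REPLACE FUNCTION IF NOT EXISTS udfs.strlen AS 'com.example.StrLen'")

// Rejected with AnalysisException: a TEMPORARY function cannot use IF NOT EXISTS,
// and its name cannot carry a database prefix.
// spark.sql("CREATE TEMPORARY FUNCTION IF NOT EXISTS strlen AS 'com.example.StrLen'")
// spark.sql("CREATE TEMPORARY FUNCTION udfs.strlen AS 'com.example.StrLen'")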

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 50 additions & 2 deletions
@@ -181,26 +181,74 @@ class DDLCommandSuite extends PlanTest {
         |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
         |FILE '/path/to/file'
       """.stripMargin
+    val sql3 =
+      """
+        |CREATE OR REPLACE TEMPORARY FUNCTION helloworld3 as
+        |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
+        |JAR '/path/to/jar2'
+      """.stripMargin
+    val sql4 =
+      """
+        |CREATE OR REPLACE FUNCTION hello.world1 as
+        |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
+        |FILE '/path/to/file'
+      """.stripMargin
+    val sql5 =
+      """
+        |CREATE FUNCTION IF NOT EXISTS hello.world2 as
+        |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
+        |FILE '/path/to/file'
+      """.stripMargin
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val parsed4 = parser.parsePlan(sql4)
+    val parsed5 = parser.parsePlan(sql5)
     val expected1 = CreateFunctionCommand(
       None,
       "helloworld",
       "com.matthewrathbone.example.SimpleUDFExample",
       Seq(
         FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
         FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
-      isTemp = true)
+      isTemp = true, ifNotExists = false, replace = false)
     val expected2 = CreateFunctionCommand(
       Some("hello"),
       "world",
       "com.matthewrathbone.example.SimpleUDFExample",
       Seq(
         FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
         FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
-      isTemp = false)
+      isTemp = false, ifNotExists = false, replace = false)
+    val expected3 = CreateFunctionCommand(
+      None,
+      "helloworld3",
+      "com.matthewrathbone.example.SimpleUDFExample",
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
+        FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
+      isTemp = true, ifNotExists = false, replace = true)
+    val expected4 = CreateFunctionCommand(
+      Some("hello"),
+      "world1",
+      "com.matthewrathbone.example.SimpleUDFExample",
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
+        FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
+      isTemp = false, ifNotExists = false, replace = true)
+    val expected5 = CreateFunctionCommand(
+      Some("hello"),
+      "world2",
+      "com.matthewrathbone.example.SimpleUDFExample",
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
+        FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
+      isTemp = false, ifNotExists = true, replace = false)
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+    comparePlans(parsed4, expected4)
+    comparePlans(parsed5, expected5)
   }

   test("drop function") {
