Skip to content

Commit 9c4a7ba

Browse files
gatorsmileMingjie Tang
authored andcommitted
[SPARK-20303][SQL] Rename createTempFunction to registerFunction
### What changes were proposed in this pull request? Session catalog API `createTempFunction` is being used by Hive build-in functions, persistent functions, and temporary functions. Thus, the name is confusing. This PR is to rename it by `registerFunction`. Also we can move construction of `FunctionBuilder` and `ExpressionInfo` into the new `registerFunction`, instead of duplicating the logics everywhere. In the next PRs, the remaining Function-related APIs also need cleanups. ### How was this patch tested? Existing test cases. Author: Xiao Li <gatorsmile@gmail.com> Closes apache#17615 from gatorsmile/cleanupCreateTempFunction.
1 parent 52a9591 commit 9c4a7ba

7 files changed

Lines changed: 53 additions & 63 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,3 @@ class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[Tabl
4444

4545
class FunctionAlreadyExistsException(db: String, func: String)
4646
extends AnalysisException(s"Function '$func' already exists in database '$db'")
47-
48-
class TempFunctionAlreadyExistsException(func: String)
49-
extends AnalysisException(s"Temporary function '$func' already exists")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,7 +1050,7 @@ class SessionCatalog(
10501050
*
10511051
* This performs reflection to decide what type of [[Expression]] to return in the builder.
10521052
*/
1053-
def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
1053+
protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
10541054
// TODO: at least support UDAFs here
10551055
throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
10561056
}
@@ -1064,18 +1064,20 @@ class SessionCatalog(
10641064
}
10651065

10661066
/**
1067-
* Create a temporary function.
1068-
* This assumes no database is specified in `funcDefinition`.
1067+
* Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
10691068
*/
1070-
def createTempFunction(
1071-
name: String,
1072-
info: ExpressionInfo,
1073-
funcDefinition: FunctionBuilder,
1074-
ignoreIfExists: Boolean): Unit = {
1075-
if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
1076-
throw new TempFunctionAlreadyExistsException(name)
1069+
def registerFunction(
1070+
funcDefinition: CatalogFunction,
1071+
ignoreIfExists: Boolean,
1072+
functionBuilder: Option[FunctionBuilder] = None): Unit = {
1073+
val func = funcDefinition.identifier
1074+
if (functionRegistry.functionExists(func.unquotedString) && !ignoreIfExists) {
1075+
throw new AnalysisException(s"Function $func already exists")
10771076
}
1078-
functionRegistry.registerFunction(name, info, funcDefinition)
1077+
val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
1078+
val builder =
1079+
functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className))
1080+
functionRegistry.registerFunction(func.unquotedString, info, builder)
10791081
}
10801082

10811083
/**
@@ -1180,12 +1182,7 @@ class SessionCatalog(
11801182
// catalog. So, it is possible that qualifiedName is not exactly the same as
11811183
// catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
11821184
// At here, we preserve the input from the user.
1183-
val info = new ExpressionInfo(
1184-
catalogFunction.className,
1185-
qualifiedName.database.orNull,
1186-
qualifiedName.funcName)
1187-
val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
1188-
createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
1185+
registerFunction(catalogFunction.copy(identifier = qualifiedName), ignoreIfExists = false)
11891186
// Now, we need to create the Expression.
11901187
functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
11911188
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,10 +1162,10 @@ abstract class SessionCatalogSuite extends PlanTest {
11621162
withBasicCatalog { catalog =>
11631163
val tempFunc1 = (e: Seq[Expression]) => e.head
11641164
val tempFunc2 = (e: Seq[Expression]) => e.last
1165-
val info1 = new ExpressionInfo("tempFunc1", "temp1")
1166-
val info2 = new ExpressionInfo("tempFunc2", "temp2")
1167-
catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
1168-
catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false)
1165+
catalog.registerFunction(
1166+
newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
1167+
catalog.registerFunction(
1168+
newFunc("temp2", None), ignoreIfExists = false, functionBuilder = Some(tempFunc2))
11691169
val arguments = Seq(Literal(1), Literal(2), Literal(3))
11701170
assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1))
11711171
assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3))
@@ -1174,13 +1174,15 @@ abstract class SessionCatalogSuite extends PlanTest {
11741174
catalog.lookupFunction(FunctionIdentifier("temp3"), arguments)
11751175
}
11761176
val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
1177-
val info3 = new ExpressionInfo("tempFunc3", "temp1")
11781177
// Temporary function already exists
1179-
intercept[TempFunctionAlreadyExistsException] {
1180-
catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false)
1181-
}
1178+
val e = intercept[AnalysisException] {
1179+
catalog.registerFunction(
1180+
newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc3))
1181+
}.getMessage
1182+
assert(e.contains("Function temp1 already exists"))
11821183
// Temporary function is overridden
1183-
catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true)
1184+
catalog.registerFunction(
1185+
newFunc("temp1", None), ignoreIfExists = true, functionBuilder = Some(tempFunc3))
11841186
assert(
11851187
catalog.lookupFunction(
11861188
FunctionIdentifier("temp1"), arguments) === Literal(arguments.length))
@@ -1193,8 +1195,8 @@ abstract class SessionCatalogSuite extends PlanTest {
11931195
assert(!catalog.isTemporaryFunction(FunctionIdentifier("temp1")))
11941196

11951197
val tempFunc1 = (e: Seq[Expression]) => e.head
1196-
val info1 = new ExpressionInfo("tempFunc1", "temp1")
1197-
catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
1198+
catalog.registerFunction(
1199+
newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
11981200

11991201
// Returns true when the function is temporary
12001202
assert(catalog.isTemporaryFunction(FunctionIdentifier("temp1")))
@@ -1243,9 +1245,9 @@ abstract class SessionCatalogSuite extends PlanTest {
12431245

12441246
test("drop temp function") {
12451247
withBasicCatalog { catalog =>
1246-
val info = new ExpressionInfo("tempFunc", "func1")
12471248
val tempFunc = (e: Seq[Expression]) => e.head
1248-
catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false)
1249+
catalog.registerFunction(
1250+
newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc))
12491251
val arguments = Seq(Literal(1), Literal(2), Literal(3))
12501252
assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1))
12511253
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
@@ -1284,9 +1286,9 @@ abstract class SessionCatalogSuite extends PlanTest {
12841286

12851287
test("lookup temp function") {
12861288
withBasicCatalog { catalog =>
1287-
val info1 = new ExpressionInfo("tempFunc1", "func1")
12881289
val tempFunc1 = (e: Seq[Expression]) => e.head
1289-
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
1290+
catalog.registerFunction(
1291+
newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
12901292
assert(catalog.lookupFunction(
12911293
FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1))
12921294
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
@@ -1298,14 +1300,14 @@ abstract class SessionCatalogSuite extends PlanTest {
12981300

12991301
test("list functions") {
13001302
withBasicCatalog { catalog =>
1301-
val info1 = new ExpressionInfo("tempFunc1", "func1")
1302-
val info2 = new ExpressionInfo("tempFunc2", "yes_me")
1303+
val funcMeta1 = newFunc("func1", None)
1304+
val funcMeta2 = newFunc("yes_me", None)
13031305
val tempFunc1 = (e: Seq[Expression]) => e.head
13041306
val tempFunc2 = (e: Seq[Expression]) => e.last
13051307
catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
13061308
catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
1307-
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
1308-
catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
1309+
catalog.registerFunction(funcMeta1, ignoreIfExists = false, functionBuilder = Some(tempFunc1))
1310+
catalog.registerFunction(funcMeta2, ignoreIfExists = false, functionBuilder = Some(tempFunc2))
13091311
assert(catalog.listFunctions("db1", "*").map(_._1).toSet ==
13101312
Set(FunctionIdentifier("func1"),
13111313
FunctionIdentifier("yes_me")))

sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ case class CreateFunctionCommand(
5151

5252
override def run(sparkSession: SparkSession): Seq[Row] = {
5353
val catalog = sparkSession.sessionState.catalog
54+
val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
5455
if (isTemp) {
5556
if (databaseName.isDefined) {
5657
throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
@@ -59,17 +60,13 @@ case class CreateFunctionCommand(
5960
// We first load resources and then put the builder in the function registry.
6061
// Please note that it is allowed to overwrite an existing temp function.
6162
catalog.loadFunctionResources(resources)
62-
val info = new ExpressionInfo(className, functionName)
63-
val builder = catalog.makeFunctionBuilder(functionName, className)
64-
catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
63+
catalog.registerFunction(func, ignoreIfExists = false)
6564
} else {
6665
// For a permanent, we will store the metadata into underlying external catalog.
6766
// This function will be loaded into the FunctionRegistry when a query uses it.
6867
// We do not load it into FunctionRegistry right now.
6968
// TODO: should we also parse "IF NOT EXISTS"?
70-
catalog.createFunction(
71-
CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources),
72-
ignoreIfExists = false)
69+
catalog.createFunction(func, ignoreIfExists = false)
7370
}
7471
Seq.empty[Row]
7572
}

sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,10 @@ class CatalogSuite
7575
}
7676

7777
private def createTempFunction(name: String): Unit = {
78-
val info = new ExpressionInfo("className", name)
7978
val tempFunc = (e: Seq[Expression]) => e.head
80-
sessionCatalog.createTempFunction(name, info, tempFunc, ignoreIfExists = false)
79+
val funcMeta = CatalogFunction(FunctionIdentifier(name, None), "className", Nil)
80+
sessionCatalog.registerFunction(
81+
funcMeta, ignoreIfExists = false, functionBuilder = Some(tempFunc))
8182
}
8283

8384
private def dropFunction(name: String, db: Option[String] = None): Unit = {

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ import org.apache.spark.sql.AnalysisException
3131
import org.apache.spark.sql.catalyst.FunctionIdentifier
3232
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
3333
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
34-
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
35-
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
34+
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
35+
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
3636
import org.apache.spark.sql.catalyst.parser.ParserInterface
3737
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
3838
import org.apache.spark.sql.internal.SQLConf
@@ -124,13 +124,6 @@ private[sql] class HiveSessionCatalog(
124124
}
125125

126126
private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
127-
// TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to
128-
// if (super.functionExists(name)) {
129-
// super.lookupFunction(name, children)
130-
// } else {
131-
// // This function is a Hive builtin function.
132-
// ...
133-
// }
134127
val database = name.database.map(formatDatabaseName)
135128
val funcName = name.copy(database = database)
136129
Try(super.lookupFunction(funcName, children)) match {
@@ -164,10 +157,11 @@ private[sql] class HiveSessionCatalog(
164157
}
165158
}
166159
val className = functionInfo.getFunctionClass.getName
167-
val builder = makeFunctionBuilder(functionName, className)
160+
val functionIdentifier =
161+
FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database)
162+
val func = CatalogFunction(functionIdentifier, className, Nil)
168163
// Put this Hive built-in function to our function registry.
169-
val info = new ExpressionInfo(className, functionName)
170-
createTempFunction(functionName, info, builder, ignoreIfExists = false)
164+
registerFunction(func, ignoreIfExists = false)
171165
// Now, we need to create the Expression.
172166
functionRegistry.lookupFunction(functionName, children)
173167
}

sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import scala.concurrent.duration._
2222
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox
2323

2424
import org.apache.spark.sql.Column
25-
import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo, Literal}
25+
import org.apache.spark.sql.catalyst.FunctionIdentifier
26+
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
27+
import org.apache.spark.sql.catalyst.expressions.Literal
2628
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
2729
import org.apache.spark.sql.hive.HiveSessionCatalog
2830
import org.apache.spark.sql.hive.execution.TestingTypedCount
@@ -217,9 +219,9 @@ class ObjectHashAggregateExecBenchmark extends BenchmarkBase with TestHiveSingle
217219

218220
private def registerHiveFunction(functionName: String, clazz: Class[_]): Unit = {
219221
val sessionCatalog = sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
220-
val builder = sessionCatalog.makeFunctionBuilder(functionName, clazz.getName)
221-
val info = new ExpressionInfo(clazz.getName, functionName)
222-
sessionCatalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
222+
val functionIdentifier = FunctionIdentifier(functionName, database = None)
223+
val func = CatalogFunction(functionIdentifier, clazz.getName, resources = Nil)
224+
sessionCatalog.registerFunction(func, ignoreIfExists = false)
223225
}
224226

225227
private def percentile_approx(

0 commit comments

Comments
 (0)