docs/sql-migration-guide.md: 2 additions & 0 deletions
@@ -54,6 +54,8 @@ license: |

- Since Spark 3.3, nulls are written as empty strings in CSV data source by default. In Spark 3.2 or earlier, nulls were written as quoted empty strings, `""`. To restore the previous behavior, set `nullValue` to `""`, as in the sketch below.
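A minimal sketch of restoring the pre-3.3 behavior, assuming an active `SparkSession` named `spark`; the DataFrame and output path are illustrative:

```scala
// Hypothetical two-column DataFrame containing a null.
val df = spark.sql("SELECT CAST(NULL AS STRING) AS c1, 'x' AS c2")
df.write
  .option("nullValue", "\"\"") // write nulls as quoted empty strings again
  .csv("/tmp/csv-null-demo")
```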

- Since Spark 3.3, Spark Thrift Server returns the available system function metadata only once rather than once per database, and sets the function schema to `SYSTEM`. In Spark 3.2 or earlier, Spark Thrift Server returned system function metadata for every database, which produced duplicate entries. To restore the behavior before Spark 3.3, set `spark.sql.thriftserver.uniqueSystemFunctions` to `false`; see the JDBC sketch below.
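For example, a JDBC client can toggle the flag per session. This is a sketch: the endpoint is a placeholder, and the Hive JDBC driver is assumed to be on the classpath.

```scala
import java.sql.DriverManager

// Placeholder Thrift Server endpoint; adjust host/port to your deployment.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
val stmt = conn.createStatement()
// Restore the pre-3.3 behavior: system functions repeat per database.
stmt.execute("SET spark.sql.thriftserver.uniqueSystemFunctions=false")

val rs = conn.getMetaData.getFunctions(null, "*", "*")
while (rs.next()) {
  // With the flag on (the default), built-ins appear once under SYSTEM.
  println(s"${rs.getString("FUNCTION_SCHEM")}.${rs.getString("FUNCTION_NAME")}")
}
conn.close()
```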

## Upgrading from Spark SQL 3.1 to 3.2

- Since Spark 3.2, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by `"` or `'` if the path contains whitespace, for example:
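A quick illustration, assuming an active `SparkSession` named `spark` (the jar path is hypothetical):

```scala
// Since 3.2, a path containing whitespace must be quoted to parse.
spark.sql("ADD JAR '/tmp/path with spaces/my-lib.jar'")
```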
@@ -319,6 +319,9 @@ object FunctionRegistry {

val FUNC_ALIAS = TreeNodeTag[String]("functionAliasName")

// Scope labels attached to function metadata: built-in functions are
// reported under SYSTEM, everything else (e.g. persistent UDFs) under USER.
val builtinFunctionScope = "SYSTEM"
val userFunctionScope = "USER"

// Note: Whenever we add a new entry here, make sure we also update ExpressionToSQLSuite
val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
// misc non-aggregate functions
@@ -1699,9 +1699,11 @@ class SessionCatalog(
// The session catalog caches some persistent functions in the FunctionRegistry
// so there can be duplicates.
functions.map {
- case f if FunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
- case f if TableFunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
- case f => (f, "USER")
+ case f if FunctionRegistry.functionSet.contains(f) =>
+ (f, FunctionRegistry.builtinFunctionScope)
+ case f if TableFunctionRegistry.functionSet.contains(f) =>
+ (f, FunctionRegistry.builtinFunctionScope)
+ case f => (f, FunctionRegistry.userFunctionScope)
}.distinct
}
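A self-contained sketch of the tagging-and-dedup step above, with plain strings standing in for `FunctionIdentifier`s (all names are illustrative):

```scala
// "length" stands in for a built-in cached once per database, so the
// listing hands it back twice; .distinct collapses the duplicates.
val builtins = Set("length", "upper")
val listed = Seq("length", "length", "db1.my_udf")

val tagged = listed.map {
  case f if builtins.contains(f) => (f, "SYSTEM") // FunctionRegistry.builtinFunctionScope
  case f => (f, "USER")                           // FunctionRegistry.userFunctionScope
}.distinct
// tagged == Seq(("length", "SYSTEM"), ("db1.my_udf", "USER"))
```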

@@ -1158,6 +1158,14 @@ object SQLConf {
.intConf
.createWithDefault(200)

val THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS =
buildConf("spark.sql.thriftserver.uniqueSystemFunctions")
.doc("When true, Spark Thrift Server will return the available system function metadata " +
"for databases only once, and Spark will set the function schema as 'SYSTEM'.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
@@ -208,8 +208,9 @@ case class ShowFunctionsCommand(
sparkSession.sessionState.catalog
.listFunctions(dbName, pattern.getOrElse("*"))
.collect {
case (f, "USER") if showUserFunctions => f.unquotedString
case (f, "SYSTEM") if showSystemFunctions => f.unquotedString
case (f, FunctionRegistry.userFunctionScope) if showUserFunctions => f.unquotedString
case (f, FunctionRegistry.builtinFunctionScope) if showSystemFunctions =>
f.unquotedString
}
// Hard-code "<>", "!=", "between", "case", and "||"
// for now, as there are no corresponding functions.
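For reference, how the two scopes surface in SQL (illustrative, assuming an active `SparkSession` named `spark`):

```scala
// SHOW FUNCTIONS filters on the USER/SYSTEM scope assigned above.
spark.sql("SHOW SYSTEM FUNCTIONS LIKE 'shift*'").show() // built-ins only
spark.sql("SHOW USER FUNCTIONS").show()                 // persistent UDFs only
spark.sql("SHOW ALL FUNCTIONS").show()                  // both scopes
```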
@@ -29,6 +29,9 @@ import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry}
import org.apache.spark.sql.internal.SQLConf

/**
* Spark's own GetFunctionsOperation
@@ -80,8 +83,21 @@ private[hive] class SparkGetFunctionsOperation(
parentSession.getUsername)

try {
val separateDisplaySystemFunctions =
sqlContext.conf.getConf(SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS)
var matchedBuiltInFunctions = if (separateDisplaySystemFunctions && functionPattern == "*"
&& matchingDbs.nonEmpty) {
FunctionRegistry.functionSet ++ TableFunctionRegistry.functionSet
} else {
Set.empty[FunctionIdentifier]
}
matchingDbs.foreach { db =>
catalog.listFunctions(db, functionPattern).foreach {
case (funcIdentifier, FunctionRegistry.`builtinFunctionScope`)
if separateDisplaySystemFunctions =>
if (!matchedBuiltInFunctions.contains(funcIdentifier)) {
matchedBuiltInFunctions += funcIdentifier
}
case (funcIdentifier, _) =>
val info = catalog.lookupFunctionInfo(funcIdentifier)
val rowData = Array[AnyRef](
@@ -94,6 +110,17 @@
rowSet.addRow(rowData);
}
}
matchedBuiltInFunctions.foreach { functionIdentifier =>
val info = catalog.lookupFunctionInfo(functionIdentifier)
val rowData = Array[AnyRef](
DEFAULT_HIVE_CATALOG, // FUNCTION_CAT
FunctionRegistry.builtinFunctionScope, // FUNCTION_SCHEM
functionIdentifier.funcName, // FUNCTION_NAME
s"Usage: ${info.getUsage}\nExtended Usage:${info.getExtended}", // REMARKS
DatabaseMetaData.functionResultUnknown.asInstanceOf[AnyRef], // FUNCTION_TYPE
info.getClassName) // SPECIFIC_NAME
rowSet.addRow(rowData);
}
setState(OperationState.FINISHED)
} catch onError()
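The dedup strategy in `runInternal`, reduced to a self-contained sketch (all names and inputs are illustrative):

```scala
// USER functions are emitted per database as encountered, while built-in
// (SYSTEM) functions are accumulated and emitted exactly once at the end.
val builtins = Set("length", "upper")
val functionsPerDb = Seq(
  "default" -> Seq("length", "upper", "my_udf"),
  "other"   -> Seq("length", "upper"))

var seenBuiltins = Set.empty[String]
val rows = Seq.newBuilder[(String, String)] // (FUNCTION_SCHEM, FUNCTION_NAME)

functionsPerDb.foreach { case (db, fns) =>
  fns.foreach {
    case f if builtins(f) => seenBuiltins += f // defer: emit once below
    case f                => rows += ((db, f))
  }
}
seenBuiltins.foreach(f => rows += (("SYSTEM", f)))
// rows.result(): ("default", "my_udf") plus each built-in once under "SYSTEM".
```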

@@ -210,11 +210,14 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase {
}

test("Spark's own GetFunctionsOperation(SparkGetFunctionsOperation)") {
- def checkResult(rs: ResultSet, functionNames: Seq[String]): Unit = {
+ def checkResult(
+ rs: ResultSet,
+ functionNames: Seq[String],
+ functionSchema: String = "default"): Unit = {
functionNames.foreach { func =>
val exprInfo = FunctionRegistry.expressions(func)._1
assert(rs.next())
- assert(rs.getString("FUNCTION_SCHEM") === "default")
+ assert(rs.getString("FUNCTION_SCHEM") === functionSchema)
assert(rs.getString("FUNCTION_NAME") === exprInfo.getName)
assert(rs.getString("REMARKS") ===
s"Usage: ${exprInfo.getUsage}\nExtended Usage:${exprInfo.getExtended}")
@@ -226,6 +229,7 @@
}

withJdbcStatement() { statement =>
statement.execute(s"SET ${SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS.key}=false")
val metaData = statement.getConnection.getMetaData
// Hive does not have an overlay function, so we use overlay to test.
checkResult(metaData.getFunctions(null, null, "overlay"), Seq("overlay"))
@@ -236,6 +240,23 @@
checkResult(metaData.getFunctions(null, "default", "shift*"),
Seq("shiftleft", "shiftright", "shiftrightunsigned"))
checkResult(metaData.getFunctions(null, "default", "upPer"), Seq("upper"))

statement.execute(s"SET ${SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS.key}=true")
checkResult(metaData.getFunctions(null, null, "overlay"), Seq("overlay"),
FunctionRegistry.builtinFunctionScope)
checkResult(metaData.getFunctions(null, null, "overla*"), Seq("overlay"),
FunctionRegistry.builtinFunctionScope)
checkResult(metaData.getFunctions(null, "", "overla*"), Seq("overlay"),
FunctionRegistry.builtinFunctionScope)
checkResult(metaData.getFunctions(null, null, "does-not-exist*"), Seq.empty,
FunctionRegistry.builtinFunctionScope)
checkResult(metaData.getFunctions(null, "default", "overlay"), Seq("overlay"),
FunctionRegistry.builtinFunctionScope)
checkResult(metaData.getFunctions(null, "default", "shift*"),
Seq("shiftleft", "shiftright", "shiftrightunsigned"),
FunctionRegistry.builtinFunctionScope)
checkResult(metaData.getFunctions(null, "default", "upPer"), Seq("upper"),
FunctionRegistry.builtinFunctionScope)
}
}

@@ -685,4 +706,36 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase {
}
}
}

test("SPARK-37173: SparkGetFunctionOperation return builtin function only once") {
def checkFunctions(
rs: ResultSet,
functionName: String,
expectedFunctionSchemas: Seq[String],
repeats: Int): Unit = {
var nums = 0
var functionSchemas = Seq.empty[String]
while (rs.next()) {
if (rs.getString("FUNCTION_NAME") == functionName) {
functionSchemas = functionSchemas :+ rs.getString("FUNCTION_SCHEM")
nums += 1
}
}
assert(nums === repeats)
functionSchemas.zip(expectedFunctionSchemas).foreach { case (actual, expected) =>
assert(actual === expected)
}
}

withDatabase("test_spark_37173") { statement =>
statement.execute(s"CREATE DATABASE IF NOT EXISTS test_spark_37173")
statement.execute(s"SET ${SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS.key}=false")
val metaData = statement.getConnection.getMetaData
checkFunctions(metaData.getFunctions(null, "*", "*"),
"length", Seq("default", "test_spark_37173"), 2)
statement.execute(s"SET ${SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS.key}=true")
checkFunctions(metaData.getFunctions(null, "*", "*"),
"length", Seq("SYSTEM"), 1)
}
}
}