@@ -0,0 +1,115 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.thriftserver

import java.sql.DatabaseMetaData
import java.util.UUID

import scala.collection.JavaConverters.seqAsJavaListConverter

import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils}
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetFunctionsOperation
import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.util.{Utils => SparkUtils}

/**
 * Spark's own GetFunctionsOperation
 *
 * @param sqlContext SQLContext to use
 * @param parentSession a HiveSession from SessionManager
 * @param catalogName catalog name. null if not applicable
 * @param schemaName database name, null or a concrete database name
 * @param functionName function name pattern
 */
private[hive] class SparkGetFunctionsOperation(
    sqlContext: SQLContext,
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String,
    functionName: String)
  extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) with Logging {

  private var statementId: String = _

  override def close(): Unit = {
    super.close()
    HiveThriftServer2.listener.onOperationClosed(statementId)
  }

  override def runInternal(): Unit = {
    statementId = UUID.randomUUID().toString
    // Do not change cmdStr. It's used for Hive auditing and authorization.
    val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
    val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'"
    logInfo(s"$logMsg with $statementId")
    setState(OperationState.RUNNING)
    // Always use the latest class loader provided by executionHive's state.
    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

    val catalog = sqlContext.sessionState.catalog
    // get databases for schema pattern
    val schemaPattern = convertSchemaPattern(schemaName)
    val matchingDbs = catalog.listDatabases(schemaPattern)
    val functionPattern = CLIServiceUtils.patternToRegex(functionName)

    if (isAuthV2Enabled) {
      // authorize this call on the schema objects
      val privObjs =
        HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava)
      authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr)
    }

    HiveThriftServer2.listener.onStatementStart(
      statementId,
      parentSession.getSessionHandle.getSessionId.toString,
      logMsg,
      statementId,
      parentSession.getUsername)

    try {
Contributor: This looks correct now: retrieve all functions available for all matching schemas.

      matchingDbs.foreach { db =>
        catalog.listFunctions(db, functionPattern).foreach {
          case (funcIdentifier, _) =>
            val info = catalog.lookupFunctionInfo(funcIdentifier)
Comment on lines +91 to +94

Contributor: @dongjoon-hyun @wangyum hmm... it looks like, when this is run with a wildcard schema pattern, all Spark builtin functions from FunctionRegistry are returned for every schema. That makes it return hundreds of thousands of rows for a big catalog with hundreds of schemas.
Should it return builtin functions only once, and in-schema functions only for the UDFs registered in the catalog?

Member Author: Makes sense.

Member (dongjoon-hyun, Oct 29, 2021): @juliuszsompolski and @wangyum. Since this is an Apache Spark 3.0 feature, the suggestion sounds like a breaking API change. Is it safe?
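A hedged sketch of the suggested behavior (illustration only, not what this PR implements), assuming SessionCatalog.listFunctions reports each function's scope ("SYSTEM" for builtins, "USER" for registered UDFs) as the second element of the returned tuples:

    // Sketch of the suggestion: emit builtin ("SYSTEM") functions only once,
    // and per-schema rows only for user-defined functions in that database.
    var builtinsEmitted = false
    matchingDbs.foreach { db =>
      catalog.listFunctions(db, functionPattern).foreach {
        case (funcIdentifier, scope) =>
          val isBuiltin = scope == "SYSTEM"
          if (!isBuiltin || !builtinsEmitted) {
            val info = catalog.lookupFunctionInfo(funcIdentifier)
            rowSet.addRow(Array[AnyRef](
              DEFAULT_HIVE_CATALOG, // FUNCTION_CAT
              if (isBuiltin) null else db, // FUNCTION_SCHEM
              funcIdentifier.funcName, // FUNCTION_NAME
              info.getUsage, // REMARKS
              DatabaseMetaData.functionResultUnknown.asInstanceOf[AnyRef], // FUNCTION_TYPE
              info.getClassName)) // SPECIFIC_NAME
          }
      }
      builtinsEmitted = true
    }

This would avoid the schema x builtin cross product described above while still listing per-database UDFs under their own FUNCTION_SCHEM.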

            val rowData = Array[AnyRef](
              DEFAULT_HIVE_CATALOG, // FUNCTION_CAT
              db, // FUNCTION_SCHEM
              funcIdentifier.funcName, // FUNCTION_NAME
              info.getUsage, // REMARKS
              DatabaseMetaData.functionResultUnknown.asInstanceOf[AnyRef], // FUNCTION_TYPE
Member Author: We do not support FUNCTION_TYPE now. Set it to Unknown:

    // java.sql.DatabaseMetaData

    /**
     * Indicates that it is not known whether the function returns
     * a result or a table.
     * <P>
     * A possible value for column <code>FUNCTION_TYPE</code> in the
     * <code>ResultSet</code> object returned by the method
     * <code>getFunctions</code>.
     * @since 1.6
     */
    int functionResultUnknown   = 0;

    /**
     * Indicates that the function  does not return a table.
     * <P>
     * A possible value for column <code>FUNCTION_TYPE</code> in the
     * <code>ResultSet</code> object returned by the method
     * <code>getFunctions</code>.
     * @since 1.6
     */
    int functionNoTable         = 1;

    /**
     * Indicates that the function  returns a table.
     * <P>
     * A possible value for column <code>FUNCTION_TYPE</code> in the
     * <code>ResultSet</code> object returned by the method
     * <code>getFunctions</code>.
     * @since 1.6
     */
    int functionReturnsTable    = 2;

              info.getClassName) // SPECIFIC_NAME
            rowSet.addRow(rowData);
        }
      }
      setState(OperationState.FINISHED)
    } catch {
      case e: HiveSQLException =>
        setState(OperationState.ERROR)
        HiveThriftServer2.listener.onStatementError(
          statementId, e.getMessage, SparkUtils.exceptionString(e))
        throw e
    }
Contributor: Shouldn't we handle other exceptions too?

Member Author: Maybe it should be the same as SparkExecuteStatementOperation:

    // Actually do need to catch Throwable as some failures don't inherit from Exception and
    // HiveServer will silently swallow them.
    case e: Throwable =>
      val currentState = getStatus().getState()
      logError(s"Error executing query, currentState $currentState, ", e)
      setState(OperationState.ERROR)
      HiveThriftServer2.listener.onStatementError(
        statementId, e.getMessage, SparkUtils.exceptionString(e))
      throw new HiveSQLException(e.toString)

Contributor: Or the same as GetTablesOperation and GetSchemasOperation, for the sake of consistency?

Member Author: +1 for the same as GetTablesOperation and GetSchemasOperation.

    HiveThriftServer2.listener.onStatementFinish(statementId)
Contributor: We also need the onStatementClosed handler, like in the other ops.

Member Author: Done.

  }
}
@@ -119,6 +119,21 @@ private[thriftserver] class SparkSQLOperationManager()
    operation
  }

  override def newGetFunctionsOperation(
      parentSession: HiveSession,
      catalogName: String,
      schemaName: String,
      functionName: String): GetFunctionsOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      " initialized or had already closed.")
    val operation = new SparkGetFunctionsOperation(sqlContext, parentSession,
      catalogName, schemaName, functionName)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created GetFunctionsOperation with session=$parentSession.")
    operation
  }

  def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
    val iterator = confMap.entrySet().iterator()
    while (iterator.hasNext) {
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

import java.sql.ResultSet
import java.sql.{DatabaseMetaData, ResultSet}

class SparkMetadataOperationSuite extends HiveThriftJdbcTest {

@@ -182,4 +182,45 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
      checkResult(metaData.getTableTypes, Seq("TABLE", "VIEW"))
    }
  }

Contributor (bogdanghit, Jul 31, 2019): Can you add some tests for the usage and the function class name? @wangyum

Member Author: I have added some tests:

    assert(rs.getString("REMARKS").startsWith(s"${functionName(i)}("))
    assert(rs.getString("SPECIFIC_NAME").startsWith("org.apache.spark.sql.catalyst"))

Do you think we need to assert more details?

Contributor: It would be nice to run a DESCRIBE FUNCTION statement and then compare the results.
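A hedged sketch of such a comparison test (illustration only, not part of this PR; it assumes DESCRIBE FUNCTION returns its Function/Class/Usage description as rows in a single string column):

    withJdbcStatement() { statement =>
      val metaData = statement.getConnection.getMetaData
      val rs = metaData.getFunctions(null, "default", "upper")
      assert(rs.next())
      val remarks = rs.getString("REMARKS")

      // Cross-check the JDBC REMARKS column against the Usage line of DESCRIBE FUNCTION.
      val described = statement.executeQuery("DESCRIBE FUNCTION upper")
      var usageMatched = false
      while (described.next()) {
        if (described.getString(1) == s"Usage: $remarks") usageMatched = true
      }
      assert(usageMatched)
    }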

test("Spark's own GetFunctionsOperation(SparkGetFunctionsOperation)") {
def checkResult(rs: ResultSet, functionName: Seq[String]): Unit = {
for (i <- functionName.indices) {
assert(rs.next())
assert(rs.getString("FUNCTION_SCHEM") === "default")
assert(rs.getString("FUNCTION_NAME") === functionName(i))
assert(rs.getString("REMARKS").startsWith(s"${functionName(i)}("))
assert(rs.getInt("FUNCTION_TYPE") === DatabaseMetaData.functionResultUnknown)
assert(rs.getString("SPECIFIC_NAME").startsWith("org.apache.spark.sql.catalyst"))
}
// Make sure there are no more elements
assert(!rs.next())
}

withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
// Hive does not have an overlay function, we use overlay to test.
checkResult(metaData.getFunctions(null, null, "overlay"), Seq("overlay"))
checkResult(metaData.getFunctions(null, null, "overla*"), Seq("overlay"))
checkResult(metaData.getFunctions(null, "", "overla*"), Seq("overlay"))
checkResult(metaData.getFunctions(null, null, "does-not-exist*"), Seq.empty)
checkResult(metaData.getFunctions(null, "default", "overlay"), Seq("overlay"))
checkResult(metaData.getFunctions(null, "default", "shift*"),
Seq("shiftleft", "shiftright", "shiftrightunsigned"))
}

withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
val rs = metaData.getFunctions(null, "default", "upPer")
Contributor: Thanks for writing all these tests. Was the capital P here intentional?

Member Author: Yes.

      assert(rs.next())
      assert(rs.getString("FUNCTION_SCHEM") === "default")
      assert(rs.getString("FUNCTION_NAME") === "upper")
      assert(rs.getString("REMARKS") ===
        "upper(str) - Returns `str` with all characters changed to uppercase.")
      assert(rs.getInt("FUNCTION_TYPE") === DatabaseMetaData.functionResultUnknown)
      assert(rs.getString("SPECIFIC_NAME") === "org.apache.spark.sql.catalyst.expressions.Upper")
      // Make sure there are no more elements
      assert(!rs.next())
    }
  }
}
@@ -63,7 +63,7 @@ public class GetFunctionsOperation extends MetadataOperation {
  private final String schemaName;
  private final String functionName;

  private final RowSet rowSet;
  protected final RowSet rowSet;

  public GetFunctionsOperation(HiveSession parentSession,
      String catalogName, String schemaName, String functionName) {
@@ -63,7 +63,7 @@ public class GetFunctionsOperation extends MetadataOperation {
  private final String schemaName;
  private final String functionName;

  private final RowSet rowSet;
  protected final RowSet rowSet;

  public GetFunctionsOperation(HiveSession parentSession,
      String catalogName, String schemaName, String functionName) {