From 703d23e067e2d02f69bd4ec429106a426c6bf132 Mon Sep 17 00:00:00 2001
From: cenyuhai
Date: Mon, 20 Mar 2017 20:36:22 +0800
Subject: [PATCH 1/4] support hive permanent function

---
 .../spark/sql/hive/HiveSessionCatalog.scala   | 26 +++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index c9be1b9d100b0..22168b5b21bd2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,19 +17,23 @@ package org.apache.spark.sql.hive
 
+import java.io.File
+
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.session.SessionState.ResourceType
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -37,7 +41,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, DoubleType}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 
 private[sql] class HiveSessionCatalog(
@@ -135,6 +139,24 @@ private[sql] class HiveSessionCatalog(
     }
   }
 
+  override def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+    logDebug("loading hive permanent function resources")
+    resources.foreach { resource =>
+      val resourceType = resource.resourceType match {
+        case JarResource =>
+          ResourceType.JAR
+        case FileResource =>
+          ResourceType.FILE
+        case ArchiveResource =>
+          ResourceType.ARCHIVE
+      }
+      val sessionState = SessionState.get()
+      val localPath = sessionState.add_resource(resourceType, resource.uri)
+      ShutdownHookManager.registerShutdownDeleteDir(new File(localPath).getParentFile)
+      functionResourceLoader.loadResource(FunctionResource(resource.resourceType, localPath))
+    }
+  }
+
   override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
     try {
       lookupFunction0(name, children)
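For context before the follow-up fixes: the `loadFunctionResources` override above is the hook through which a permanent function registered in the Hive metastore gets its jars pulled in at first use. The spark-shell sketch below traces that flow end to end; the function name, UDF class, jar URI, and table are illustrative stand-ins, not taken from the patch.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative end-to-end flow (my_lower, com.example.hive.udf.MyLower,
// the jar URI, and the people table are made-up names).
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// CREATE FUNCTION only records the class name and the resource URI in the
// Hive metastore; nothing is downloaded yet.
spark.sql(
  """CREATE FUNCTION my_lower
    |AS 'com.example.hive.udf.MyLower'
    |USING JAR 'hdfs:///tmp/udfs/my-udf.jar'""".stripMargin)

// The first lookup of my_lower triggers loadFunctionResources: the jar is
// localized via SessionState.add_resource, its download directory is
// registered for deletion at shutdown, and the local copy is handed to
// functionResourceLoader so the UDF class can be instantiated.
spark.sql("SELECT my_lower(name) FROM people").show()
```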
From a668bb591564b6dc3db204f25ff2d06127c01668 Mon Sep 17 00:00:00 2001
From: cenyuhai
Date: Tue, 21 Mar 2017 01:01:54 +0800
Subject: [PATCH 2/4] Don't remove file when testing

---
 .../apache/spark/sql/hive/HiveSessionCatalog.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 22168b5b21bd2..268e132b29314 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -150,10 +150,14 @@ private[sql] class HiveSessionCatalog(
         case ArchiveResource =>
           ResourceType.ARCHIVE
       }
-      val sessionState = SessionState.get()
-      val localPath = sessionState.add_resource(resourceType, resource.uri)
-      ShutdownHookManager.registerShutdownDeleteDir(new File(localPath).getParentFile)
-      functionResourceLoader.loadResource(FunctionResource(resource.resourceType, localPath))
+      if (System.getProperties("spark.testing") != "true") {
+        val sessionState = SessionState.get()
+        val localPath = sessionState.add_resource(resourceType, resource.uri)
+        ShutdownHookManager.registerShutdownDeleteDir(new File(localPath).getParentFile)
+        functionResourceLoader.loadResource(FunctionResource(resource.resourceType, localPath))
+      } else {
+        functionResourceLoader.loadResource(resource)
+      }
     }
   }
 

From 9325a7c2a4ce9d0db6c39d7781e517082a2fe4be Mon Sep 17 00:00:00 2001
From: cenyuhai
Date: Tue, 21 Mar 2017 01:27:12 +0800
Subject: [PATCH 3/4] fix compile error

---
 .../scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 268e132b29314..d58879eb05583 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -150,7 +150,7 @@ private[sql] class HiveSessionCatalog(
         case ArchiveResource =>
           ResourceType.ARCHIVE
       }
-      if (System.getProperties("spark.testing") != "true") {
+      if (System.getProperties.get("spark.testing") != "true") {
         val sessionState = SessionState.get()
         val localPath = sessionState.add_resource(resourceType, resource.uri)
         ShutdownHookManager.registerShutdownDeleteDir(new File(localPath).getParentFile)
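A note on the one-line fix in PATCH 3/4: `System.getProperties` returns a `java.util.Properties`, which defines no `apply` method, so the `System.getProperties("spark.testing")` call introduced in PATCH 2/4 does not compile in Scala. The snippet below contrasts the failing form with the one the patch settles on; the `sys.props` variant at the end is an idiomatic alternative the patch does not use.

```scala
// Fails to compile: java.util.Properties defines no apply(String), so Scala
// cannot desugar the function-call syntax.
// System.getProperties("spark.testing")

// What PATCH 3/4 uses: Hashtable.get, returning AnyRef (null when unset),
// compared against the string "true".
val isTesting: Boolean = System.getProperties.get("spark.testing") == "true"

// Idiomatic Scala alternative (not what the patch uses):
val isTestingAlt: Boolean = sys.props.get("spark.testing").contains("true")
```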
From 92c077d4d767aed448ca479914b1a34fc8b3cec8 Mon Sep 17 00:00:00 2001
From: cenyuhai
Date: Tue, 21 Mar 2017 07:49:47 +0800
Subject: [PATCH 4/4] check resource scheme

---
 .../spark/sql/hive/HiveSessionCatalog.scala | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index d58879eb05583..dfb6abaf34507 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -18,16 +18,19 @@ package org.apache.spark.sql.hive
 
 import java.io.File
+import java.net.URI
 
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.ql.session.SessionState.ResourceType
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
+import org.apache.hadoop.util.Shell
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -150,13 +153,20 @@ private[sql] class HiveSessionCatalog(
         case ArchiveResource =>
           ResourceType.ARCHIVE
       }
-      if (System.getProperties.get("spark.testing") != "true") {
+      val uri = if (!Shell.WINDOWS) {
+        new URI(resource.uri)
+      }
+      else {
+        new Path(resource.uri).toUri
+      }
+      val scheme = if (uri.getScheme == null) null else uri.getScheme.toLowerCase
+      if (scheme == null || scheme == "file") {
+        functionResourceLoader.loadResource(resource)
+      } else {
         val sessionState = SessionState.get()
         val localPath = sessionState.add_resource(resourceType, resource.uri)
         ShutdownHookManager.registerShutdownDeleteDir(new File(localPath).getParentFile)
         functionResourceLoader.loadResource(FunctionResource(resource.resourceType, localPath))
-      } else {
-        functionResourceLoader.loadResource(resource)
       }
     }
   }
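To read the final control flow in isolation: PATCH 4/4 replaces the testing guard with a scheme check, so resources that are already local are loaded directly while remote ones are localized through Hive first. Below is that check pulled out as a standalone, spark-shell-friendly sketch; `isLocalResource` and the sample URIs are illustrative, not part of the patch.

```scala
import java.net.URI

import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell

// Illustrative extraction of the scheme check added in PATCH 4/4.
def isLocalResource(resourceUri: String): Boolean = {
  // On Windows a bare path such as C:/udfs/my.jar would make new URI(...)
  // parse "C" as the scheme, so the patch parses through Hadoop's Path there.
  val uri = if (Shell.WINDOWS) new Path(resourceUri).toUri else new URI(resourceUri)
  val scheme = Option(uri.getScheme).map(_.toLowerCase)
  // No scheme or an explicit file: scheme means the resource is already on
  // the local filesystem; anything else (hdfs:, s3:, ...) must be pulled
  // down via SessionState.add_resource before it reaches the classloader.
  scheme.isEmpty || scheme.contains("file")
}

isLocalResource("/tmp/udfs/my-udf.jar")        // true: no scheme
isLocalResource("file:///tmp/udfs/my-udf.jar") // true: file scheme
isLocalResource("hdfs:///tmp/udfs/my-udf.jar") // false: needs localization
```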