3 changes: 2 additions & 1 deletion dev/deps/spark-deps-hadoop-3.2
@@ -50,7 +50,7 @@ curator-client-2.13.0.jar
curator-framework-2.13.0.jar
curator-recipes-2.13.0.jar
datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-core-4.1.17.jar
datanucleus-rdbms-3.2.9.jar
derby-10.12.1.1.jar
dnsjava-2.1.7.jar
@@ -76,6 +76,7 @@ hadoop-yarn-common-3.2.0.jar
hadoop-yarn-registry-3.2.0.jar
hadoop-yarn-server-common-3.2.0.jar
hadoop-yarn-server-web-proxy-3.2.0.jar
hive-storage-api-2.6.0.jar
hk2-api-2.4.0-b34.jar
hk2-locator-2.4.0-b34.jar
hk2-utils-2.4.0-b34.jar
85 changes: 82 additions & 3 deletions pom.xml
@@ -130,6 +130,12 @@
<hive.version>1.2.1.spark2</hive.version>
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<!--
Hive 2.3 needs hive-common, hive-serde, hive-shims and hive-llap-client, but Hive 1.2 does not.
We use hive.extra.deps.scope to implement this; it should be removed once we drop Hive 1.2 support.
-->
<hive.extra.deps.scope>provided</hive.extra.deps.scope>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.2.0</kafka.version>
<derby.version>10.12.1.1</derby.version>
@@ -1384,7 +1390,7 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
<scope>${hive.deps.scope}</scope>
<scope>${hive.extra.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>${hive.group}</groupId>
@@ -1414,6 +1420,18 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<!-- Begin of Hive 2.3.4 exclusion -->
<!-- jetty-all conflicts with jetty 9.4.12.v20180830 -->
<exclusion>
wangyum (Member, Author) commented on Mar 28, 2019:
Exclude jetty-all; it conflicts with jetty 9.4.12.v20180830:

build/sbt clean package -Phadoop-3.2 -Phive
...
[error] /home/yumwang/opensource/spark/core/src/main/scala/org/apache/spark/SSLOptions.scala:78: value setTrustStorePath is not a member of org.eclipse.jetty.util.ssl.SslContextFactory
[error]         trustStore.foreach(file => sslContextFactory.setTrustStorePath(file.getAbsolutePath))
[error]

<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
</exclusion>
<!-- org.apache.logging.log4j:* conflicts with log4j 1.2.17 -->
<exclusion>
wangyum (Member, Author) commented:
org.apache.logging.log4j:* conflicts with log4j-1.2.17.jar:

build/sbt clean package -Phadoop-3.2 -Phive
...
[error] /home/yumwang/opensource/spark/core/src/main/scala/org/apache/spark/internal/Logging.scala:236: value getLevel is not a member of org.apache.log4j.spi.LoggingEvent
[error]     if (!loggingEvent.getLevel().eq(rootLevel)) {
[error]                       ^
[error] /home/yumwang/opensource/spark/core/src/main/scala/org/apache/spark/internal/Logging.scala:239: value getLogger is not a member of org.apache.log4j.spi.LoggingEvent
[error]     var logger = loggingEvent.getLogger()

<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<!-- End of Hive 2.3.4 exclusion -->
</exclusions>
</dependency>

@@ -1532,6 +1550,27 @@
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
<!-- Begin of Hive 2.3.4 exclusion -->
<!-- Do not need Tez -->
<exclusion>
<groupId>${hive.group}</groupId>
<artifactId>hive-llap-tez</artifactId>
wangyum (Member, Author) commented: Do not need Tez.

Reviewer (Member) commented: Nit: indent.

</exclusion>
<!-- Do not need Calcite, see SPARK-27054 -->
<exclusion>
wangyum (Member, Author) commented:
Exclude calcite-druid and avatica. More details: https://issues.apache.org/jira/browse/SPARK-27054

<groupId>org.apache.calcite</groupId>
<artifactId>calcite-druid</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<!-- org.apache.logging.log4j:* conflicts with log4j 1.2.17 -->
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<!-- End of Hive 2.3.4 exclusion -->
</exclusions>
</dependency>
<dependency>
@@ -1647,7 +1686,7 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-serde</artifactId>
<version>${hive.version}</version>
<scope>${hive.deps.scope}</scope>
<scope>${hive.extra.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>${hive.group}</groupId>
@@ -1697,6 +1736,22 @@
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
<!-- Begin of Hive 2.3.4 exclusion -->
<!-- parquet-hadoop-bundle:1.8.1 conflicts with 1.10.1 -->
<exclusion>
wangyum (Member, Author) commented:
Exclude parquet-hadoop-bundle, otherwise:

build/sbt clean package -Phadoop-3.2 -Phive
...
[error] /home/yumwang/opensource/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:36: value JobSummaryLevel is not a member of object org.apache.parquet.hadoop.ParquetOutputFormat
[error] import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel

Reviewer (Member) commented:
These several exclusions would apply to both Hive 2 and Hive 1 in the build as it is now. That's probably OK; maybe they don't even exist in Hive 1. But some like this one I'm not as sure about?

wangyum (Member, Author) replied:
Yes. org.apache.parquet:parquet-hadoop-bundle doesn't exist in Hive 1; there it is com.twitter:parquet-hadoop-bundle: https://github.com/apache/hive/blob/release-1.2.1/pom.xml#L256-L260

<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</exclusion>
<!-- Do not need Jasper, see HIVE-19799 -->
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<!-- End of Hive 2.3.4 exclusion -->
</exclusions>
</dependency>

@@ -1720,7 +1775,7 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-shims</artifactId>
<version>${hive.version}</version>
<scope>${hive.deps.scope}</scope>
<scope>${hive.extra.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
@@ -1762,6 +1817,13 @@
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
<!-- Begin of Hive 2.3.4 exclusion -->
<!-- Exclude log4j-slf4j-impl, otherwise spark-shell fails at startup with a NoClassDefFoundError -->
<exclusion>
wangyum (Member, Author) commented:
Exclude log4j-slf4j-impl, otherwise:

$ build/sbt clean package -Phadoop-3.2 -Phive
$ export SPARK_PREPEND_CLASSES=true
$ bin/spark-shell
NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/logging/log4j/spi/AbstractLoggerAdapter
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:36)
	at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:217)
	at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:122)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
	at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:73)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:81)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:939)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:948)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.logging.log4j.spi.AbstractLoggerAdapter
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 22 more

<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<!-- End of Hive 2.3.4 exclusion -->
</exclusions>
</dependency>
<dependency>
@@ -2656,7 +2718,24 @@
<hadoop.version>3.2.0</hadoop.version>
<curator.version>2.13.0</curator.version>
<zookeeper.version>3.4.13</zookeeper.version>
<hive.group>org.apache.hive</hive.group>
<hive.classifier>core</hive.classifier>
<hive.version>2.3.4</hive.version>
<hive.version.short>${hive.version}</hive.version.short>
<hive.extra.deps.scope>${hive.deps.scope}</hive.extra.deps.scope>
<hive.parquet.version>${parquet.version}</hive.parquet.version>
<orc.classifier></orc.classifier>
<hive.parquet.group>org.apache.parquet</hive.parquet.group>
<datanucleus-core.version>4.1.17</datanucleus-core.version>
</properties>
<dependencies>
<!-- Both ORC and Parquet need hive-storage-api, but it is excluded by orc-mapreduce -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
<version>2.6.0</version>
Reviewer (Member) commented:
This matches what 2.3.4 needs. Should it be provided, or use hive.deps.scope?

wangyum (Member, Author) replied:
No. Both Hive and ORC need hive-storage-api:

  1. Remove hive-storage-api and save as table:
scala> spark.range(10).write.saveAsTable("test2")
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
  at org.apache.hive.common.util.ReflectionUtil.newInstance(ReflectionUtil.java:85)
  at org.apache.hadoop.hive.ql.exec.Registry.registerGenericUDF(Registry.java:177)
  at org.apache.hadoop.hive.ql.exec.Registry.registerGenericUDF(Registry.java:170)
  at org.apache.hadoop.hive.ql.exec.FunctionRegistry.<clinit>(FunctionRegistry.java:209)
  at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:247)
  at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
  at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:388)
  at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332)
  at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312)
  at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
  at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:258)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:280)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:225)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:270)
  at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:361)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:217)
  at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:217)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:139)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:129)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:40)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$1(HiveSessionStateBuilder.scala:55)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:90)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:90)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:420)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:446)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:441)
  ... 47 elided
Caused by: java.lang.reflect.InvocationTargetException: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/serde2/io/HiveDecimalWritable
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hive.common.util.ReflectionUtil.newInstance(ReflectionUtil.java:83)
  ... 75 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/serde2/io/HiveDecimalWritable
  at org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloorCeilBase.<init>(GenericUDFFloorCeilBase.java:48)
  at org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor.<init>(GenericUDFFloor.java:41)
  ... 80 more
  2. Remove hive-storage-api and write to ORC:
scala> spark.range(10).write.orc("test3")
19/04/01 21:47:40 WARN DAGScheduler: Broadcasting large task binary with size 172.4 KiB
[Stage 0:>                                                          (0 + 4) / 4]19/04/01 21:47:41 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/exec/vector/ColumnVector
	at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.createOrcValue(OrcSerializer.scala:226)
	at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.<init>(OrcSerializer.scala:36)
	at org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.<init>(OrcOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anon$1.newInstance(OrcFileFormat.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:428)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1321)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:431)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.exec.vector.ColumnVector
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 16 more
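
A quick, generic way to check whether a class such as HiveDecimalWritable or ColumnVector is actually on the classpath (and which jar serves it) is a small lookup like the one below. This is a minimal Scala sketch to paste into spark-shell, not part of the build change itself; locateClass is a hypothetical helper and the class names are taken from the stack traces above.

def locateClass(fqcn: String): String = {
  // Turn the fully qualified class name into a resource path and ask the class loader for it.
  val resource = fqcn.replace('.', '/') + ".class"
  val loader = Option(Thread.currentThread().getContextClassLoader)
    .getOrElse(ClassLoader.getSystemClassLoader)
  Option(loader.getResource(resource))
    .map(url => s"$fqcn loaded from $url")
    .getOrElse(s"$fqcn is NOT on the classpath")
}

println(locateClass("org.apache.hadoop.hive.serde2.io.HiveDecimalWritable"))
println(locateClass("org.apache.hadoop.hive.ql.exec.vector.ColumnVector"))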

</dependency>
</dependencies>
</profile>

<profile>
@@ -19,7 +19,7 @@

import java.math.BigDecimal;

import org.apache.orc.storage.ql.exec.vector.*;
import org.apache.hadoop.hive.ql.exec.vector.*;
Reviewer (Member) commented: Here.

Reviewer (Member) commented: Yes... we shouldn't do this.

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
@@ -17,11 +17,11 @@

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
Reviewer (Member) commented: Here.

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.datasources.orc

import java.sql.Date

import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
Reviewer (Member) commented: Here.

import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types.Decimal
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters._

import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}

import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions._
42 changes: 31 additions & 11 deletions sql/hive/pom.xml
@@ -88,12 +88,10 @@
<version>${protobuf.version}</version>
</dependency>
-->
<!--
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-common</artifactId>
</dependency>
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
@@ -103,16 +101,38 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
</dependency>
<!--
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-serde</artifactId>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-shims</artifactId>
</dependency>
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-serde</artifactId>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-shims</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-client</artifactId>
<version>2.3.4</version>
<scope>${hive.extra.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- hive-serde already depends on avro, but this brings in customized config of avro deps from parent -->
<dependency>
<groupId>org.apache.avro</groupId>
73 changes: 49 additions & 24 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.{InputStream, OutputStream}
import java.lang.reflect.Method
import java.rmi.server.UID

import scala.collection.JavaConverters._
@@ -28,15 +29,13 @@ import com.google.common.base.Objects
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
import org.apache.hadoop.io.Writable
import org.apache.hive.com.esotericsoftware.kryo.Kryo
import org.apache.hive.com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.types.Decimal
@@ -146,34 +145,60 @@ private[hive] object HiveShim {
case _ => false
}

@transient
def deserializeObjectByKryo[T: ClassTag](
kryo: Kryo,
in: InputStream,
clazz: Class[_]): T = {
val inp = new Input(in)
val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
inp.close()
t
}
private lazy val serUtilClass =
Utils.classForName("org.apache.hadoop.hive.ql.exec.SerializationUtilities")
private lazy val utilClass = Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities")
private val deserializeMethodName = "deserializeObjectByKryo"
private val serializeMethodName = "serializeObjectByKryo"

@transient
def serializeObjectByKryo(
kryo: Kryo,
plan: Object,
out: OutputStream) {
val output: Output = new Output(out)
kryo.writeObject(output, plan)
output.close()
private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
val method = klass.getDeclaredMethod(name, args: _*)
method.setAccessible(true)
method
}

def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
.asInstanceOf[UDFType]
if (HiveUtils.isSupportedHive2) {
val borrowKryo = serUtilClass.getMethod("borrowKryo")
val kryo = borrowKryo.invoke(serUtilClass)
val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
try {
deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
} finally {
serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
}
} else {
val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
val threadLocalValue = runtimeSerializationKryo.get(utilClass)
val getMethod = threadLocalValue.getClass.getMethod("get")
val kryo = getMethod.invoke(threadLocalValue)
val deserializeObjectByKryo = findMethod(utilClass, deserializeMethodName,
kryo.getClass, classOf[InputStream], classOf[Class[_]])
deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
}
}

def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
if (HiveUtils.isSupportedHive2) {
val borrowKryo = serUtilClass.getMethod("borrowKryo")
val kryo = borrowKryo.invoke(serUtilClass)
val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
try {
serializeObjectByKryo.invoke(null, kryo, function, out)
} finally {
serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
}
} else {
val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
val threadLocalValue = runtimeSerializationKryo.get(utilClass)
val getMethod = threadLocalValue.getClass.getMethod("get")
val kryo = getMethod.invoke(threadLocalValue)
val serializeObjectByKryo = findMethod(utilClass, serializeMethodName,
kryo.getClass, classOf[Object], classOf[OutputStream])
serializeObjectByKryo.invoke(null, kryo, function, out)
}
}

def writeExternal(out: java.io.ObjectOutput) {
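
The new HiveShim code above avoids any compile-time dependency on version-specific Hive classes: the Hive 2.3 path goes through SerializationUtilities.borrowKryo/releaseKryo and the non-public deserializeObjectByKryo/serializeObjectByKryo helpers (hence the findMethod helper that calls setAccessible(true)), the Hive 1.2 path goes through Utilities.runtimeSerializationKryo, and HiveUtils.isSupportedHive2 selects the path at runtime. Below is a minimal, self-contained Scala sketch of the underlying reflection pattern (getDeclaredMethod + setAccessible + invoke); the JDK class and the findStaticMethod helper are illustrative stand-ins, not the actual Hive classes or Spark code.

import java.lang.reflect.Method

object ReflectiveStaticCall {
  // Resolve a class by name and look up a static method, making it callable even if it is private.
  def findStaticMethod(className: String, methodName: String, argTypes: Class[_]*): Method = {
    val klass = Class.forName(className)
    val method = klass.getDeclaredMethod(methodName, argTypes: _*)
    method.setAccessible(true)
    method
  }

  def main(args: Array[String]): Unit = {
    // Illustrative stand-in: call java.lang.Integer.parseInt(String) reflectively.
    // Passing null as the receiver invokes the method as a static method.
    val parseInt = findStaticMethod("java.lang.Integer", "parseInt", classOf[String])
    println(parseInt.invoke(null, "42")) // prints 42
  }
}

The trade-off, as in the shim, is that method names and signatures become strings checked only at runtime, which is why the reflective lookups are confined to one place while callers keep using plain methods such as deserializePlan and serializePlan.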