HiveClientImpl.scala
@@ -105,6 +105,7 @@ private[hive] class HiveClientImpl(
     case hive.v2_1 => new Shim_v2_1()
     case hive.v2_2 => new Shim_v2_2()
     case hive.v2_3 => new Shim_v2_3()
+    case hive.v3_0 => new Shim_v3_0()
   }

   // Create an internal session state for this HiveClientImpl.
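For context, the match this arm extends dispatches one shim per supported metastore version. A hedged reconstruction of the full dispatch in HiveClientImpl (only the v3_0 arm is new in this commit):

  private val shim = version match {
    case hive.v12 => new Shim_v0_12()
    case hive.v13 => new Shim_v0_13()
    case hive.v14 => new Shim_v0_14()
    case hive.v1_0 => new Shim_v1_0()
    case hive.v1_1 => new Shim_v1_1()
    case hive.v1_2 => new Shim_v1_2()
    case hive.v2_0 => new Shim_v2_0()
    case hive.v2_1 => new Shim_v2_1()
    case hive.v2_2 => new Shim_v2_2()
    case hive.v2_3 => new Shim_v2_3()
    case hive.v3_0 => new Shim_v3_0()  // added by this commit
  }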
HiveShim.scala
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.serde.serdeConstants
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
@@ -1148,3 +1149,128 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
 private[client] class Shim_v2_2 extends Shim_v2_1
 
 private[client] class Shim_v2_3 extends Shim_v2_1
+
+private[client] class Shim_v3_0 extends Shim_v2_3 {
+  // Spark supports only non-ACID operations
+  protected lazy val isAcidIUDoperation = JBoolean.FALSE
+
+  // Writer ID can be 0 for non-ACID operations
+  protected lazy val writeIdInLoadTableOrPartition: JLong = 0L
+
+  // Statement ID
+  protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0
+
+  protected lazy val listBucketingLevel: JInteger = 0
+
+  private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
+    "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
+
+  private lazy val loadPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "loadPartition",
+      classOf[Path],
+      classOf[Table],
+      classOf[JMap[String, String]],
+      clazzLoadFileType,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      classOf[JLong],
+      JInteger.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadTableMethod =
+    findMethod(
+      classOf[Hive],
+      "loadTable",
+      classOf[Path],
+      classOf[String],
+      clazzLoadFileType,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      classOf[JLong],
+      JInteger.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadDynamicPartitionsMethod =
+    findMethod(
+      classOf[Hive],
+      "loadDynamicPartitions",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      clazzLoadFileType,
+      JInteger.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      JLong.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      classOf[AcidUtils.Operation],
+      JBoolean.TYPE)
+
+  override def loadPartition(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    val session = SparkSession.getActiveSession
+    assert(session.nonEmpty)
+    val database = session.get.sessionState.catalog.getCurrentDatabase
+    val table = hive.getTable(database, tableName)
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get,
+      inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+      isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask,
+      writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean)
+  }
+
+  override def loadTable(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      replace: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean,
+      isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask,
+      writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean)
+  }
+
+  override def loadDynamicPartitions(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      listBucketingEnabled: Boolean): Unit = {
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, loadFileType.get,
+      numDP: JInteger, listBucketingLevel, isAcid, writeIdInLoadTableOrPartition,
+      stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID,
+      replace: JBoolean)
+  }
+}

[Review thread on loadPartitionMethod]
HyukjinKwon (Member): BTW, I tracked and checked all the signature changes.
dongjoon-hyun (Author): Thank you, @HyukjinKwon.
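Because Hive 3.0's loadTable/loadPartition/loadDynamicPartitions signatures and the LoadFileType enum do not exist in the Hive versions Spark compiles against, the shim resolves everything reflectively. A minimal, self-contained sketch of that pattern, with JDK classes (TimeUnit, Thread.sleep) standing in for the Hive types:

  // Hedged sketch of the reflection pattern Shim_v3_0 relies on; the class,
  // enum, and method names below are illustrative stand-ins, not Hive APIs.
  object ShimReflectionSketch {
    def main(args: Array[String]): Unit = {
      // 1. Load a class by name, as done for LoadTableDesc$LoadFileType.
      val enumClazz = getClass.getClassLoader.loadClass("java.util.concurrent.TimeUnit")

      // 2. Select an enum constant by case-insensitive name, as done for
      //    REPLACE_ALL / KEEP_EXISTING.
      val constant = enumClazz.getEnumConstants.find(_.toString.equalsIgnoreCase("seconds"))
      assert(constant.isDefined)

      // 3. Look up a method by name and parameter types, as findMethod does for
      //    Hive.loadPartition/loadTable/loadDynamicPartitions, then invoke it.
      val method = classOf[Thread].getMethod("sleep", java.lang.Long.TYPE)
      method.invoke(null, java.lang.Long.valueOf(1L)) // static call: sleep(1 ms)

      println(constant.get) // prints SECONDS
    }
  }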
IsolatedClientLoader.scala
@@ -99,6 +99,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
       case "2.2" | "2.2.0" => hive.v2_2
       case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" => hive.v2_3
+      case "3.0" | "3.0.0" => hive.v3_0
     }

   private def downloadVersion(

[Review comment — wangyum (Member), Sep 5, 2018]
@dongjoon-hyun Please update sql-programming-guide.md and HiveUtils.scala:

  options are <code>0.12.0</code> through <code>2.3.3</code>.

  s"<code>0.12.0</code> through <code>2.3.3</code>.")
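The follow-up is presumably mechanical: both quoted strings would extend their upper bound to the new version (hedged sketch, not part of this commit):

  options are <code>0.12.0</code> through <code>3.0.0</code>.

  s"<code>0.12.0</code> through <code>3.0.0</code>.")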
package.scala
@@ -79,7 +79,12 @@ package object client {
     exclusions = Seq("org.apache.curator:*",
       "org.pentaho:pentaho-aggdesigner-algorithm"))

-  val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+  case object v3_0 extends HiveVersion("3.0.0",
+    exclusions = Seq("org.apache.curator:*",
+      "org.apache.hadoop:hadoop-aws",
+      "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+  val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0)
 }
 // scalastyle:on

[Review thread on the "org.apache.hadoop:hadoop-aws" exclusion]
Member: What happens if we do not have this?
dongjoon-hyun (Author): Thanks. I'll remove this in this PR.
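Given the reply above, the eventual form of this hunk would presumably drop the extra exclusion (hedged sketch of the anticipated follow-up):

  case object v3_0 extends HiveVersion("3.0.0",
    exclusions = Seq("org.apache.curator:*",
      "org.pentaho:pentaho-aggdesigner-algorithm"))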
SaveAsHiveFile.scala
@@ -115,7 +115,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
     // be removed by Hive when Hive is trying to empty the table directory.
     val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
     val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0)

     // Ensure all the supported versions are considered here.
     assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
HiveClientVersions.scala
@@ -23,5 +23,5 @@ import org.apache.spark.SparkFunSuite
 
 private[client] trait HiveClientVersions {
   protected val versions =
-    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.0")
 }
HiveVersionSuite.scala
@@ -34,7 +34,8 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite {
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
-    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+        version == "3.0") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
VersionsSuite.scala
@@ -112,7 +112,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }

   private val versions =
-    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.0")

   private var client: HiveClient = null

@@ -127,7 +127,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
-    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+        version == "3.0") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
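Taken together, these changes let a Spark application target a Hive 3.0 metastore. A hedged end-to-end sketch (the two configuration keys are real Spark settings; the app itself is illustrative):

  import org.apache.spark.sql.SparkSession

  // Illustrative smoke test: with spark.sql.hive.metastore.version set to "3.0",
  // IsolatedClientLoader.hiveVersion resolves hive.v3_0 and HiveClientImpl picks
  // Shim_v3_0; "maven" asks Spark to download the matching Hive client jars.
  val spark = SparkSession.builder()
    .appName("Hive3MetastoreSmokeTest")
    .config("spark.sql.hive.metastore.version", "3.0")
    .config("spark.sql.hive.metastore.jars", "maven")
    .enableHiveSupport()
    .getOrCreate()

  spark.sql("SHOW DATABASES").show()
  spark.stop()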