diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index da9fe2d3088b4..72c72ba9bc2e9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -105,6 +105,7 @@ private[hive] class HiveClientImpl(
     case hive.v2_1 => new Shim_v2_1()
     case hive.v2_2 => new Shim_v2_2()
     case hive.v2_3 => new Shim_v2_3()
+    case hive.v3_0 => new Shim_v3_0()
   }
 
   // Create an internal session state for this HiveClientImpl.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 948ba542b5733..894bb9157fabf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.serde.serdeConstants
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
@@ -1148,3 +1149,128 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
 private[client] class Shim_v2_2 extends Shim_v2_1
 
 private[client] class Shim_v2_3 extends Shim_v2_1
+
+private[client] class Shim_v3_0 extends Shim_v2_3 {
+  // Spark supports only non-ACID operations
+  protected lazy val isAcidIUDoperation = JBoolean.FALSE
+
+  // Writer ID can be 0 for non-ACID operations
+  protected lazy val writeIdInLoadTableOrPartition: JLong = 0L
+
+  // Statement ID
+  protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0
+
+  protected lazy val listBucketingLevel: JInteger = 0
+
+  private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
+    "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
+
+  private lazy val loadPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "loadPartition",
+      classOf[Path],
+      classOf[Table],
+      classOf[JMap[String, String]],
+      clazzLoadFileType,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      classOf[JLong],
+      JInteger.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadTableMethod =
+    findMethod(
+      classOf[Hive],
+      "loadTable",
+      classOf[Path],
+      classOf[String],
+      clazzLoadFileType,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      classOf[JLong],
+      JInteger.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadDynamicPartitionsMethod =
+    findMethod(
+      classOf[Hive],
+      "loadDynamicPartitions",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      clazzLoadFileType,
+      JInteger.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      JLong.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      classOf[AcidUtils.Operation],
+      JBoolean.TYPE)
+
+  override def loadPartition(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    val session = SparkSession.getActiveSession
+    assert(session.nonEmpty)
+    val database = session.get.sessionState.catalog.getCurrentDatabase
+    val table = hive.getTable(database, tableName)
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get,
+      inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+      isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask,
+      writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean)
+  }
+
+  override def loadTable(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      replace: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean,
+      isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask,
+      writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean)
+  }
+
+  override def loadDynamicPartitions(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      listBucketingEnabled: Boolean): Unit = {
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, loadFileType.get,
+      numDP: JInteger, listBucketingLevel, isAcid, writeIdInLoadTableOrPartition,
+      stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID,
+      replace: JBoolean)
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 2f34f69b5cf48..664f20c54aa69 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -99,6 +99,7 @@ private[hive] object IsolatedClientLoader extends Logging {
     case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
     case "2.2" | "2.2.0" => hive.v2_2
     case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" => hive.v2_3
+    case "3.0" | "3.0.0" => hive.v3_0
   }
 
   private def downloadVersion(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 25e9886fa6576..0b66622372385 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -79,7 +79,11 @@ package object client {
       exclusions = Seq("org.apache.curator:*",
        "org.pentaho:pentaho-aggdesigner-algorithm"))
 
-    val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+    case object v3_0 extends HiveVersion("3.0.0",
+      exclusions = Seq("org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+    val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0)
   }
   // scalastyle:on
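With the IsolatedClientLoader and package.scala changes above, "3.0" becomes an accepted value for spark.sql.hive.metastore.version. A minimal usage sketch, not part of this patch (the app name is illustrative; both config keys and the "maven" jar-resolution mode are existing Spark settings):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-3.0-metastore") // illustrative name
  // Accepted only once IsolatedClientLoader maps "3.0"/"3.0.0" to hive.v3_0.
  .config("spark.sql.hive.metastore.version", "3.0")
  // "maven" downloads the Hive 3.0.0 client artifacts declared by the new
  // v3_0 case object, minus its curator/pentaho exclusions.
  .config("spark.sql.hive.metastore.jars", "maven")
  .enableHiveSupport()
  .getOrCreate()
```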
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index e0f7375387d24..7e47749fb3f71 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -115,7 +115,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
     // be removed by Hive when Hive is trying to empty the table directory.
     val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
     val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0)
 
     // Ensure all the supported versions are considered here.
     assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
index 30592a3f85428..1f4dc18d1c38a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -23,5 +23,5 @@ import org.apache.spark.SparkFunSuite
 
 private[client] trait HiveClientVersions {
   protected val versions =
-    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.0")
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index e5963d03f6b52..e237e2c4d40d8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -34,7 +34,8 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
-    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+        version == "3.0") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
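The same || version chain recurs in VersionsSuite below. A possible follow-up, sketched here rather than proposed as part of the patch (TestMetastoreConf and schemaCreationVersions are hypothetical names): keep the HIVE-6113/HIVE-12463 settings behind a set-membership check so the next metastore version is a one-token change in the tests.

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical helper, not in this patch: one home for the metastore schema
// settings instead of a growing || chain in each suite.
object TestMetastoreConf {
  // Versions where Hive flipped datanucleus.schema.autoCreateAll to false and
  // hive.metastore.schema.verification to true (HIVE-6113, HIVE-12463).
  private val schemaCreationVersions = Set("2.0", "2.1", "2.2", "2.3", "3.0")

  def configure(version: String, hadoopConf: Configuration): Unit = {
    if (schemaCreationVersions.contains(version)) {
      hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
      hadoopConf.set("hive.metastore.schema.verification", "false")
    }
  }
}
```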
== "2.1" || version == "2.2" || version == "2.3") { + if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" || + version == "3.0") { hadoopConf.set("datanucleus.schema.autoCreateAll", "true") hadoopConf.set("hive.metastore.schema.verification", "false") }