Skip to content

Commit cc7cd95

Browse files
author
Marcelo Vanzin
committed
Rework the patch.
- Use the same code to translate between Spark and Hive tables when creating or altering the table. - Fix the test so that it doesn't try to create a new SparkSession, which conflicts with TestHiveSingleton. - Use 2.1's EnvironmentContext to disable auto updating of stats for DS tables.
1 parent aae3abd commit cc7cd95

4 files changed

Lines changed: 101 additions & 69 deletions

File tree

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
238238
}
239239

240240
if (DDLUtils.isDatasourceTable(tableDefinition)) {
241-
createDataSourceTable(
242-
tableDefinition.withNewStorage(locationUri = tableLocation),
243-
ignoreIfExists)
241+
saveDataSourceTable(tableDefinition.withNewStorage(locationUri = tableLocation)) { table =>
242+
saveTableIntoHive(table, ignoreIfExists)
243+
}
244244
} else {
245245
val tableWithDataSourceProps = tableDefinition.copy(
246246
// We can't leave `locationUri` empty and count on Hive metastore to set a default table
@@ -257,7 +257,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
257257
}
258258
}
259259

260-
private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
260+
private def saveDataSourceTable(table: CatalogTable)(saveFn: CatalogTable => Unit): Unit = {
261261
// data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
262262
val provider = table.provider.get
263263

@@ -363,19 +363,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
363363
// specific way.
364364
try {
365365
logInfo(message)
366-
saveTableIntoHive(table, ignoreIfExists)
366+
saveFn(table)
367367
} catch {
368368
case NonFatal(e) =>
369369
val warningMessage =
370370
s"Could not persist ${table.identifier.quotedString} in a Hive " +
371371
"compatible way. Persisting it into Hive metastore in Spark SQL specific format."
372372
logWarning(warningMessage, e)
373-
saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
373+
saveFn(newSparkSQLSpecificMetastoreTable())
374374
}
375375

376376
case (None, message) =>
377377
logWarning(message)
378-
saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
378+
saveFn(newSparkSQLSpecificMetastoreTable())
379379
}
380380
}
381381

@@ -610,30 +610,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
610610

611611
override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
612612
requireTableExists(db, table)
613-
val rawTable = getRawTable(db, table)
614-
val withNewSchema = rawTable.copy(schema = schema)
615-
verifyColumnNames(withNewSchema)
616-
// Add table metadata such as table schema, partition columns, etc. to table properties.
617-
val updatedTable = withNewSchema.copy(
618-
properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
619-
620-
// If it's a data source table, make sure the original schema is left unchanged; the
621-
// actual schema is recorded as a table property.
622-
val tableToStore = if (DDLUtils.isDatasourceTable(updatedTable)) {
623-
updatedTable.copy(schema = rawTable.schema)
613+
val updatedTable = getTable(db, table).copy(schema = schema)
614+
verifyColumnNames(updatedTable)
615+
if (DDLUtils.isDatasourceTable(updatedTable)) {
616+
saveDataSourceTable(updatedTable) { table =>
617+
client.alterTable(table)
618+
}
624619
} else {
625-
updatedTable
626-
}
627-
628-
try {
629-
client.alterTable(tableToStore)
630-
} catch {
631-
case NonFatal(e) =>
632-
val warningMessage =
633-
s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
634-
"compatible way. Updating Hive metastore in Spark SQL specific format."
635-
logWarning(warningMessage, e)
636-
client.alterTable(updatedTable.copy(schema = tableToStore.partitionSchema))
620+
client.alterTable(updatedTable)
637621
}
638622
}
639623

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -416,9 +416,6 @@ private[hive] class HiveClientImpl(
416416

417417
val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map())
418418

419-
val provider = properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER)
420-
.orElse(Some(DDLUtils.HIVE_PROVIDER))
421-
422419
// Hive-generated Statistics are also recorded in ignoredProperties
423420
val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
424421
for (key <- HiveStatisticsProperties; value <- properties.get(key)) {
@@ -472,7 +469,6 @@ private[hive] class HiveClientImpl(
472469
throw new AnalysisException("Hive index table is not supported.")
473470
},
474471
schema = schema,
475-
provider = provider,
476472
partitionColumnNames = partCols.map(_.name),
477473
// If the table is written by Spark, we will put bucketing information in table properties,
478474
// and will always overwrite the bucket spec in hive metastore by the bucketing information
@@ -533,7 +529,7 @@ private[hive] class HiveClientImpl(
533529
table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
534530
// Do not use `table.qualifiedName` here because this may be a rename
535531
val qualifiedTableName = s"${table.database}.$tableName"
536-
shim.alterTable(client, qualifiedTableName, hiveTable)
532+
shim.alterTable(client, qualifiedTableName, hiveTable, table.storage.locationUri.isDefined)
537533
}
538534

539535
override def createPartitions(
@@ -612,8 +608,10 @@ private[hive] class HiveClientImpl(
612608
db: String,
613609
table: String,
614610
newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
615-
val hiveTable = toHiveTable(getTable(db, table), Some(userName))
616-
shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
611+
val sparkTable = getTable(db, table)
612+
val hiveTable = toHiveTable(sparkTable, Some(userName))
613+
shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava,
614+
sparkTable.storage.locationUri.isDefined)
617615
}
618616

619617
/**

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.client
2020
import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
2121
import java.lang.reflect.{InvocationTargetException, Method, Modifier}
2222
import java.net.URI
23-
import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet}
23+
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Locale, Map => JMap, Set => JSet}
2424
import java.util.concurrent.TimeUnit
2525

2626
import scala.collection.JavaConverters._
@@ -86,9 +86,13 @@ private[client] sealed abstract class Shim {
8686

8787
def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
8888

89-
def alterTable(hive: Hive, tableName: String, table: Table): Unit
89+
def alterTable(hive: Hive, tableName: String, table: Table, allowGatherStats: Boolean): Unit
9090

91-
def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit
91+
def alterPartitions(
92+
hive: Hive,
93+
tableName: String,
94+
newParts: JList[Partition],
95+
allowGatherStats: Boolean): Unit
9296

9397
def createPartitions(
9498
hive: Hive,
@@ -397,11 +401,19 @@ private[client] class Shim_v0_12 extends Shim with Logging {
397401
hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
398402
}
399403

400-
override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
404+
override def alterTable(
405+
hive: Hive,
406+
tableName: String,
407+
table: Table,
408+
allowGatherStats: Boolean): Unit = {
401409
alterTableMethod.invoke(hive, tableName, table)
402410
}
403411

404-
override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
412+
override def alterPartitions(
413+
hive: Hive,
414+
tableName: String,
415+
newParts: JList[Partition],
416+
allowGatherStats: Boolean): Unit = {
405417
alterPartitionsMethod.invoke(hive, tableName, newParts)
406418
}
407419

@@ -1008,9 +1020,6 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
10081020

10091021
// true if there is any following stats task
10101022
protected lazy val hasFollowingStatsTask = JBoolean.FALSE
1011-
// TODO: Now, always set environmentContext to null. In the future, we should avoid setting
1012-
// hive-generated stats to -1 when altering tables by using environmentContext. See Hive-12730
1013-
protected lazy val environmentContextInAlterTable = null
10141023

10151024
private lazy val loadPartitionMethod =
10161025
findMethod(
@@ -1102,11 +1111,35 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
11021111
hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID)
11031112
}
11041113

1105-
override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
1106-
alterTableMethod.invoke(hive, tableName, table, environmentContextInAlterTable)
1114+
override def alterTable(
1115+
hive: Hive,
1116+
tableName: String,
1117+
table: Table,
1118+
allowGatherStats: Boolean): Unit = {
1119+
alterTableMethod.invoke(hive, tableName, table, createEnvironmentContext(allowGatherStats))
1120+
}
1121+
1122+
override def alterPartitions(
1123+
hive: Hive,
1124+
tableName: String,
1125+
newParts: JList[Partition],
1126+
allowGatherStats: Boolean): Unit = {
1127+
alterPartitionsMethod.invoke(hive, tableName, newParts,
1128+
createEnvironmentContext(allowGatherStats))
11071129
}
11081130

1109-
override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
1110-
alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
1131+
// TODO: In the future, we should avoid setting hive-generated stats to -1 when altering tables.
1132+
// See HIVE-12730.
1133+
private def createEnvironmentContext(allowGatherStats: Boolean): EnvironmentContext = {
1134+
if (!allowGatherStats) {
1135+
val properties = new JHashMap[String, String]()
1136+
properties.put("DO_NOT_UPDATE_STATS", "true")
1137+
1138+
val ctx = new EnvironmentContext()
1139+
ctx.setProperties(properties)
1140+
ctx
1141+
} else {
1142+
null
1143+
}
11111144
}
11121145
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ import java.net.URI
2222

2323
import scala.language.existentials
2424

25+
import org.apache.hadoop.conf.Configuration
2526
import org.apache.hadoop.fs.Path
2627
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
2728

28-
import org.apache.spark.{SparkException, SparkFunSuite}
29+
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
2930
import org.apache.spark.launcher.SparkLauncher
3031
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession}
3132
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
@@ -39,6 +40,7 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
3940
import org.apache.spark.sql.internal.StaticSQLConf._
4041
import org.apache.spark.sql.test.SQLTestUtils
4142
import org.apache.spark.sql.types._
43+
import org.apache.spark.tags.ExtendedHiveTest
4244
import org.apache.spark.util.Utils
4345

4446
// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
@@ -2004,41 +2006,56 @@ class HiveDDLSuite
20042006
* A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
20052007
* from the built-in ones.
20062008
*/
2007-
class HiveDDLSuite_2_1 extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
2009+
@ExtendedHiveTest
2010+
class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
2011+
with BeforeAndAfterAll {
20082012

2009-
private val spark = {
2013+
// Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
2014+
// use SparkSession here since there's already an active on managed by the TestHive object.
2015+
private var catalog = {
20102016
val warehouse = Utils.createTempDir()
20112017
val metastore = Utils.createTempDir()
20122018
metastore.delete()
2013-
SparkSession.builder()
2014-
.config(SparkLauncher.SPARK_MASTER, "local")
2015-
.config(WAREHOUSE_PATH.key, warehouse.toURI().toString())
2016-
.config(CATALOG_IMPLEMENTATION.key, "hive")
2017-
.config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
2018-
.config(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
2019-
.config("spark.hadoop.javax.jdo.option.ConnectionURL",
2020-
s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
2021-
// These options are needed since the defaults in Hive 2.1 cause exceptions with an
2022-
// empty metastore db.
2023-
.config("spark.hadoop.datanucleus.schema.autoCreateAll", "true")
2024-
.config("spark.hadoop.hive.metastore.schema.verification", "false")
2025-
.getOrCreate()
2019+
val sparkConf = new SparkConf()
2020+
.set(SparkLauncher.SPARK_MASTER, "local")
2021+
.set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
2022+
.set(CATALOG_IMPLEMENTATION.key, "hive")
2023+
.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
2024+
.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
2025+
2026+
val hadoopConf = new Configuration()
2027+
hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
2028+
hadoopConf.set("javax.jdo.option.ConnectionURL",
2029+
s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
2030+
// These options are needed since the defaults in Hive 2.1 cause exceptions with an
2031+
// empty metastore db.
2032+
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
2033+
hadoopConf.set("hive.metastore.schema.verification", "false")
2034+
2035+
new HiveExternalCatalog(sparkConf, hadoopConf)
20262036
}
20272037

20282038
override def afterEach: Unit = {
2039+
catalog.listTables("default").foreach { t =>
2040+
catalog.dropTable("default", t, true, false)
2041+
}
20292042
spark.sessionState.catalog.reset()
20302043
}
20312044

20322045
override def afterAll(): Unit = {
2033-
spark.close()
2046+
catalog = null
20342047
}
20352048

20362049
test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") {
20372050
spark.sql("CREATE TABLE t1 (c1 int) USING json")
2038-
spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
2051+
val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", "t1")
2052+
catalog.createTable(oldTable, true)
2053+
2054+
val newSchema = StructType(oldTable.schema.fields ++ Array(StructField("c2", IntegerType)))
2055+
catalog.alterTableSchema("default", "t1", newSchema)
20392056

2040-
val df = spark.table("t1")
2041-
assert(df.schema.fieldNames === Array("c1", "c2"))
2057+
val updatedTable = catalog.getTable("default", "t1")
2058+
assert(updatedTable.schema.fieldNames === Array("c1", "c2"))
20422059
}
20432060

20442061
}

0 commit comments

Comments
 (0)