
Commit 6bcf0d9

dilipbiswal authored and Robert Kruszewski committed
[SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables
## What changes were proposed in this pull request?

Reopens the closed PR apache#15190 (please refer to that PR for the earlier review comments). Makes sure that `hive.default.fileformat` is used when creating the storage format metadata.

Output:

```scala
scala> spark.sql("SET hive.default.fileformat=orc")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default(id INT)")
res2: org.apache.spark.sql.DataFrame = []
```

Before:

```scala
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[ serialization.format,1,]
```

After:

```scala
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[ serialization.format,1,]
```

## How was this patch tested?

Added new tests to HiveDDLCommandSuite and SQLQuerySuite.

Author: Dilip Biswal <[email protected]>

Closes apache#15495 from dilipbiswal/orc2.
1 parent 1d37542 commit 6bcf0d9

3 files changed: 60 additions & 9 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 1 addition & 3 deletions
@@ -1010,9 +1010,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
           .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
         outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
           .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-        // Note: Keep this unspecified because we use the presence of the serde to decide
-        // whether to convert a table created by CTAS to a datasource table.
-        serde = None,
+        serde = defaultHiveSerde.flatMap(_.serde),
         compressed = false,
         properties = Map())
     }
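For context, `defaultHiveSerde` in the hunk above is resolved from the session's `hive.default.fileformat` setting. The snippet below is only an illustrative sketch of that mapping, built from the class names asserted in the tests added by this commit; the names `SerdeInfo` and `defaultSerdeFor` are hypothetical, and Spark's actual lookup (the `HiveSerDe` helper) may differ in detail.

```scala
// Illustrative approximation only (hypothetical names), not Spark's actual implementation.
case class SerdeInfo(
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String])

// Map a hive.default.fileformat value to the storage classes it implies.
def defaultSerdeFor(fileFormat: String): Option[SerdeInfo] = fileFormat.toLowerCase match {
  case "orc" => Some(SerdeInfo(
    inputFormat = Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
    serde = Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")))
  case "parquet" => Some(SerdeInfo(
    inputFormat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
    serde = Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")))
  // Unrecognized formats fall back to the text defaults shown in the diff above
  // (TextInputFormat / HiveIgnoreKeyTextOutputFormat), with no explicit serde.
  case _ => None
}
```

With `serde = defaultHiveSerde.flatMap(_.serde)`, a session that has run `SET hive.default.fileformat=orc` now records `OrcSerde` in the table's storage metadata instead of leaving the serde unspecified.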

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala

Lines changed: 24 additions & 2 deletions
@@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.CreateTable
-import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 
-class HiveDDLCommandSuite extends PlanTest {
+class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingleton {
   val parser = TestHive.sessionState.sqlParser
 
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
@@ -556,4 +558,24 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
   }
 
+  test("Test the default fileformat for Hive-serde tables") {
+    withSQLConf("hive.default.fileformat" -> "orc") {
+      val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
+      assert(exists)
+      assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+      assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+      assert(desc.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+    }
+
+    withSQLConf("hive.default.fileformat" -> "parquet") {
+      val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
+      assert(exists)
+      val input = desc.storage.inputFormat
+      val output = desc.storage.outputFormat
+      val serde = desc.storage.serde
+      assert(input == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+      assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+    }
+  }
 }
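The same behavior can be checked interactively. Below is a minimal spark-shell sketch, assuming a Hive-enabled `SparkSession` bound to `spark` and a writable warehouse, mirroring what the new test asserts:

```scala
// Verify that the serde of a new Hive-serde table follows hive.default.fileformat.
spark.sql("SET hive.default.fileformat=parquet")
spark.sql("CREATE TABLE IF NOT EXISTS fileformat_test (id INT)")

// The "SerDe Library" row should now report ParquetHiveSerDe rather than LazySimpleSerDe.
spark.sql("DESC FORMATTED fileformat_test").collect().foreach(println)

spark.sql("DROP TABLE IF EXISTS fileformat_test")
```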

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 35 additions & 4 deletions
@@ -492,7 +492,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   def checkRelation(
       tableName: String,
-      isDataSourceParquet: Boolean,
+      isDataSourceTable: Boolean,
       format: String,
       userSpecifiedLocation: Option[String] = None): Unit = {
     val relation = EliminateSubqueryAliases(
@@ -501,7 +501,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
       case LogicalRelation(r: HadoopFsRelation, _, _) =>
-        if (!isDataSourceParquet) {
+        if (!isDataSourceTable) {
          fail(
            s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
              s"${HadoopFsRelation.getClass.getCanonicalName}.")
@@ -514,7 +514,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         assert(catalogTable.provider.get === format)
 
       case r: MetastoreRelation =>
-        if (isDataSourceParquet) {
+        if (isDataSourceTable) {
          fail(
            s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
              s"${classOf[MetastoreRelation].getCanonicalName}.")
@@ -524,8 +524,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
            assert(r.catalogTable.storage.locationUri.get === location)
          case None => // OK.
        }
-        // Also make sure that the format is the desired format.
+        // Also make sure that the format and serde are as desired.
        assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format))
+        assert(catalogTable.storage.outputFormat.get.toLowerCase.contains(format))
+        val serde = catalogTable.storage.serde.get
+        format match {
+          case "sequence" | "text" => assert(serde.contains("LazySimpleSerDe"))
+          case "rcfile" => assert(serde.contains("LazyBinaryColumnarSerDe"))
+          case _ => assert(serde.toLowerCase.contains(format))
+        }
     }
 
     // When a user-specified location is defined, the table type needs to be EXTERNAL.
@@ -587,6 +594,30 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("CTAS with default fileformat") {
+    val table = "ctas1"
+    val ctas = s"CREATE TABLE IF NOT EXISTS $table SELECT key k, value FROM src"
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withSQLConf("hive.default.fileformat" -> "textfile") {
+        withTable(table) {
+          sql(ctas)
+          // We should use parquet here as that is the default datasource fileformat. The default
+          // datasource file format is controlled by `spark.sql.sources.default` configuration.
+          // This testcase verifies that setting `hive.default.fileformat` has no impact on
+          // the target table's fileformat in case of CTAS.
+          assert(sessionState.conf.defaultDataSourceName === "parquet")
+          checkRelation(tableName = table, isDataSourceTable = true, format = "parquet")
+        }
+      }
+      withSQLConf("spark.sql.sources.default" -> "orc") {
+        withTable(table) {
+          sql(ctas)
+          checkRelation(tableName = table, isDataSourceTable = true, format = "orc")
+        }
+      }
+    }
+  }
+
   test("CTAS without serde with location") {
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
       withTempDir { dir =>
