@@ -363,11 +363,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         (None, message)

       // our bucketing is un-compatible with hive(different hash function)
-      case _ if table.bucketSpec.nonEmpty =>
+      case Some(serde) if table.bucketSpec.nonEmpty =>
         val message =
           s"Persisting bucketed data source table $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
-        (None, message)
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with " +
+            "Hive bucketed table. But Hive can read this table as a non-bucketed table."
+        (Some(newHiveCompatibleMetastoreTable(serde)), message)

       case Some(serde) =>
         val message =

Member (on the added (Some(newHiveCompatibleMetastoreTable(serde)), message) line):
Should we set bucketSpec = None?

Member Author:
Not necessary:

    table.bucketSpec match {
      case Some(bucketSpec) if DDLUtils.isHiveTable(table) =>
        hiveTable.setNumBuckets(bucketSpec.numBuckets)
        hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
        if (bucketSpec.sortColumnNames.nonEmpty) {
          hiveTable.setSortCols(
            bucketSpec.sortColumnNames
              .map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
              .toList
              .asJava
          )
        }
      case _ =>
    }

Since this is a data source table rather than a Hive table, DDLUtils.isHiveTable(table) is false here, so the case _ arm is taken and the bucket spec is never copied onto the Hive-side table object.
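Stepping back from the thread: the net effect of the first hunk is that, when a Hive SerDe exists for the data source, a bucketed data source table now gets a Hive-compatible metastore entry (real SerDe, input and output formats) instead of none at all, while the bucketing itself stays in Spark-private table properties. A minimal sketch of that shape, assuming simplified signatures (hiveCompatibleCopy below is a hypothetical stand-in, not the actual newHiveCompatibleMetastoreTable, which also handles the table location and storage options):

    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.internal.HiveSerDe

    // Hypothetical sketch: copy the real Hive formats onto the storage
    // descriptor so Hive can read the files. table.bucketSpec is left
    // untouched; it is persisted separately in Spark-private table
    // properties (spark.sql.sources.schema.*), which Hive ignores.
    def hiveCompatibleCopy(table: CatalogTable, serde: HiveSerDe): CatalogTable =
      table.copy(storage = table.storage.copy(
        inputFormat = serde.inputFormat,
        outputFormat = serde.outputFormat,
        serde = serde.serde))

With a copy like this registered, Hive lists the table with its real input/output formats but reports Num Buckets: -1, which is exactly what the new test below verifies.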
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types._

@@ -284,4 +284,40 @@ class DataSourceWithHiveMetastoreCatalogSuite
     }

   }
+
+  test("SPARK-27592 set the bucketed data source table SerDe correctly") {
+    val provider = "parquet"
+    withTable("t") {
+      spark.sql(
+        s"""
+           |CREATE TABLE t
+           |USING $provider
+           |CLUSTERED BY (c1)
+           |SORTED BY (c1)
+           |INTO 2 BUCKETS
+           |AS SELECT 1 AS c1, 2 AS c2
+         """.stripMargin)
+
+      val metadata = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
+
+      val hiveSerDe = HiveSerDe.sourceToSerDe(provider).get
+      assert(metadata.storage.serde === hiveSerDe.serde)
+      assert(metadata.storage.inputFormat === hiveSerDe.inputFormat)
+      assert(metadata.storage.outputFormat === hiveSerDe.outputFormat)
+
+      // It's a bucketed table at Spark side
+      assert(sql("DESC FORMATTED t").collect().containsSlice(
+        Seq(Row("Num Buckets", "2", ""), Row("Bucket Columns", "[`c1`]", ""))
+      ))
+      checkAnswer(table("t"), Row(1, 2))
+
+      // It's not a bucketed table at Hive side
+      val hiveSide = sparkSession.metadataHive.runSqlHive("DESC FORMATTED t")
+      assert(hiveSide.contains("Num Buckets: \t-1 \t "))
+      assert(hiveSide.contains("Bucket Columns: \t[] \t "))
+      assert(hiveSide.contains("\tspark.sql.sources.schema.numBuckets\t2 "))
+      assert(hiveSide.contains("\tspark.sql.sources.schema.bucketCol.0\tc1 "))
+      assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\t2"))
+    }
+  }
 }

Member (on the AS SELECT 1 AS c1, 2 AS c2 line):
Only one row is hard to prove that Hive can read it correctly. Could you improve the tests?

In addition, try to create a partitioned and bucketed table and see whether it is readable by Hive.

You can create a separate test suite for it.
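The extra coverage the reviewer asks for could look roughly like the sketch below. This is hypothetical and not part of the diff: it reuses the suite helpers seen above (withTable, spark, sparkSession.metadataHive) and assumes Spark's CTAS syntax for partitioned, bucketed data source tables; the test name and table name are made up.

    test("partitioned and bucketed data source table is readable by Hive") {
      withTable("pt") {
        // Hypothetical test sketched from the review comment; not in this diff.
        spark.sql(
          """
            |CREATE TABLE pt
            |USING parquet
            |PARTITIONED BY (p)
            |CLUSTERED BY (c1) SORTED BY (c1) INTO 2 BUCKETS
            |AS SELECT id AS c1, id % 3 AS p FROM range(10)
          """.stripMargin)

        // Spark side: ten rows, read through the bucketed table definition.
        assert(spark.table("pt").count() === 10)

        // Hive side: the table looks like a plain partitioned table, and
        // every row written by Spark is still readable.
        val rows = sparkSession.metadataHive.runSqlHive("SELECT c1 FROM pt")
        assert(rows.map(_.toInt).sorted === (0 until 10))
      }
    }

A dedicated suite, as the reviewer suggests, would also let this grow into a matrix over Hive-compatible providers (for example parquet and orc) without bloating DataSourceWithHiveMetastoreCatalogSuite.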