Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,42 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\t2"))
}
}

// SPARK-27592: a CTAS data source table that is both partitioned and bucketed must
// have its metastore storage descriptor populated with the provider's Hive SerDe,
// while the bucketing spec stays Spark-only (persisted as table properties, so Hive
// sees an unbucketed table but can still read the data).
test("SPARK-27592 set the partitioned bucketed data source table SerDe correctly") {
val provider = "parquet"
withTable("t") {
// Create a partitioned + bucketed data source table via CTAS from the `src` fixture.
spark.sql(
s"""
|CREATE TABLE t
|USING $provider
|PARTITIONED BY (p)
|CLUSTERED BY (key)
|SORTED BY (value)
|INTO 2 BUCKETS
|AS SELECT key, value, cast(key % 3 as string) as p FROM src
""".stripMargin)

val metadata = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))

// The storage descriptor must carry the SerDe/input/output formats mapped from the
// data source provider name ("parquet"), not the generic placeholder SerDe.
val hiveSerDe = HiveSerDe.sourceToSerDe(provider).get
assert(metadata.storage.serde === hiveSerDe.serde)
assert(metadata.storage.inputFormat === hiveSerDe.inputFormat)
assert(metadata.storage.outputFormat === hiveSerDe.outputFormat)

// It's a bucketed table at Spark side
assert(sql("DESC FORMATTED t").collect().containsSlice(
Seq(Row("Num Buckets", "2", ""), Row("Bucket Columns", "[`key`]", ""))
))
checkAnswer(table("t").select("key", "value"), table("src"))

// It's not a bucketed table at Hive side
// NOTE: the tabs/trailing spaces in these expected strings match Hive's fixed-width
// "DESC FORMATTED" output exactly — do not reformat them.
val hiveSide = sparkSession.metadataHive.runSqlHive("DESC FORMATTED t")
assert(hiveSide.contains("Num Buckets: \t-1 \t "))
assert(hiveSide.contains("Bucket Columns: \t[] \t "))
// Spark's bucketing spec surfaces to Hive only as opaque table properties.
assert(hiveSide.contains("\tspark.sql.sources.schema.numBuckets\t2 "))
assert(hiveSide.contains("\tspark.sql.sources.schema.bucketCol.0\tkey "))
// Hive can still read all data files: its row count matches Spark's.
assert(sparkSession.metadataHive.runSqlHive("SELECT count(*) FROM t") ===
Seq(table("src").count().toString))
}
}
}