Skip to content

Commit 8aa1d7b

Browse files
dongjoon-hyun authored and cloud-fan committed
[SPARK-23355][SQL] convertMetastore should not ignore table properties
## What changes were proposed in this pull request? Previously, SPARK-22158 fixed for `USING hive` syntax. This PR aims to fix for `STORED AS` syntax. Although the test case covers ORC part, the patch considers both `convertMetastoreOrc` and `convertMetastoreParquet`. ## How was this patch tested? Pass newly added test cases. Author: Dongjoon Hyun <[email protected]> Closes #20522 from dongjoon-hyun/SPARK-22158-2.
1 parent 9ee9fcf commit 8aa1d7b

3 files changed

Lines changed: 97 additions & 8 deletions

File tree

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -186,15 +186,28 @@ case class RelationConversions(
186186
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
187187
}
188188

189+
// Return true for Apache ORC and Hive ORC-related configuration names.
190+
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
191+
private def isOrcProperty(key: String) =
192+
key.startsWith("orc.") || key.contains(".orc.")
193+
194+
private def isParquetProperty(key: String) =
195+
key.startsWith("parquet.") || key.contains(".parquet.")
196+
189197
private def convert(relation: HiveTableRelation): LogicalRelation = {
190198
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
199+
200+
// Consider table and storage properties. For properties existing in both sides, storage
201+
// properties will supersede table properties.
191202
if (serde.contains("parquet")) {
192-
val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
203+
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
204+
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
193205
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
194206
sessionCatalog.metastoreCatalog
195207
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
196208
} else {
197-
val options = relation.tableMeta.storage.properties
209+
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
210+
relation.tableMeta.storage.properties
198211
if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
199212
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
200213
relation,

sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala

Lines changed: 1 addition & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -268,12 +268,7 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
268268
compressionCodecs = compressCodecs,
269269
tableCompressionCodecs = compressCodecs) {
270270
case (tableCodec, sessionCodec, realCodec, tableSize) =>
271-
// For non-partitioned table and when convertMetastore is true, Expect session-level
272-
// take effect, and in other cases expect table-level take effect
273-
// TODO: It should always be table-level taking effect when the bug(SPARK-22926)
274-
// is fixed
275-
val expectCodec =
276-
if (convertMetastore && !isPartitioned) sessionCodec else tableCodec.get
271+
val expectCodec = tableCodec.get
277272
assert(expectCodec == realCodec)
278273
assert(checkTableSize(
279274
format, expectCodec, isPartitioned, convertMetastore, usingCTAS, tableSize))

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 81 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METAS
3838
import org.apache.spark.sql.hive.orc.OrcFileOperator
3939
import org.apache.spark.sql.hive.test.TestHiveSingleton
4040
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
41+
import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
4142
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
4243
import org.apache.spark.sql.test.SQLTestUtils
4344
import org.apache.spark.sql.types._
@@ -2144,6 +2145,86 @@ class HiveDDLSuite
21442145
}
21452146
}
21462147

2148+
private def getReader(path: String): org.apache.orc.Reader = {
2149+
val conf = spark.sessionState.newHadoopConf()
2150+
val files = org.apache.spark.sql.execution.datasources.orc.OrcUtils.listOrcFiles(path, conf)
2151+
assert(files.length == 1)
2152+
val file = files.head
2153+
val fs = file.getFileSystem(conf)
2154+
val readerOptions = org.apache.orc.OrcFile.readerOptions(conf).filesystem(fs)
2155+
org.apache.orc.OrcFile.createReader(file, readerOptions)
2156+
}
2157+
2158+
test("SPARK-23355 convertMetastoreOrc should not ignore table properties - STORED AS") {
2159+
Seq("native", "hive").foreach { orcImpl =>
2160+
withSQLConf(ORC_IMPLEMENTATION.key -> orcImpl, CONVERT_METASTORE_ORC.key -> "true") {
2161+
withTable("t") {
2162+
withTempPath { path =>
2163+
sql(
2164+
s"""
2165+
|CREATE TABLE t(id int) STORED AS ORC
2166+
|TBLPROPERTIES (
2167+
| orc.compress 'ZLIB',
2168+
| orc.compress.size '1001',
2169+
| orc.row.index.stride '2002',
2170+
| hive.exec.orc.default.block.size '3003',
2171+
| hive.exec.orc.compression.strategy 'COMPRESSION')
2172+
|LOCATION '${path.toURI}'
2173+
""".stripMargin)
2174+
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
2175+
assert(DDLUtils.isHiveTable(table))
2176+
assert(table.storage.serde.get.contains("orc"))
2177+
val properties = table.properties
2178+
assert(properties.get("orc.compress") == Some("ZLIB"))
2179+
assert(properties.get("orc.compress.size") == Some("1001"))
2180+
assert(properties.get("orc.row.index.stride") == Some("2002"))
2181+
assert(properties.get("hive.exec.orc.default.block.size") == Some("3003"))
2182+
assert(properties.get("hive.exec.orc.compression.strategy") == Some("COMPRESSION"))
2183+
assert(spark.table("t").collect().isEmpty)
2184+
2185+
sql("INSERT INTO t SELECT 1")
2186+
checkAnswer(spark.table("t"), Row(1))
2187+
val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
2188+
2189+
val reader = getReader(maybeFile.head.getCanonicalPath)
2190+
assert(reader.getCompressionKind.name === "ZLIB")
2191+
assert(reader.getCompressionSize == 1001)
2192+
assert(reader.getRowIndexStride == 2002)
2193+
}
2194+
}
2195+
}
2196+
}
2197+
}
2198+
2199+
test("SPARK-23355 convertMetastoreParquet should not ignore table properties - STORED AS") {
2200+
withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
2201+
withTable("t") {
2202+
withTempPath { path =>
2203+
sql(
2204+
s"""
2205+
|CREATE TABLE t(id int) STORED AS PARQUET
2206+
|TBLPROPERTIES (
2207+
| parquet.compression 'GZIP'
2208+
|)
2209+
|LOCATION '${path.toURI}'
2210+
""".stripMargin)
2211+
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
2212+
assert(DDLUtils.isHiveTable(table))
2213+
assert(table.storage.serde.get.contains("parquet"))
2214+
val properties = table.properties
2215+
assert(properties.get("parquet.compression") == Some("GZIP"))
2216+
assert(spark.table("t").collect().isEmpty)
2217+
2218+
sql("INSERT INTO t SELECT 1")
2219+
checkAnswer(spark.table("t"), Row(1))
2220+
val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
2221+
2222+
assertCompression(maybeFile, "parquet", "GZIP")
2223+
}
2224+
}
2225+
}
2226+
}
2227+
21472228
test("load command for non local invalid path validation") {
21482229
withTable("tbl") {
21492230
sql("CREATE TABLE tbl(i INT, j STRING)")

0 commit comments

Comments (0)