@@ -330,7 +330,9 @@ case class DataSource(
    * is considered as a non-streaming file based data source. Since we know
    * that files already exist, we don't need to check them again.
    */
-  def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
+  def resolveRelation(
+      checkFilesExist: Boolean = true,
+      needPartitionInferring: Boolean = true): BaseRelation = {
     val relation = (providingInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
       case (dataSource: SchemaRelationProvider, Some(schema)) =>
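For orientation, a hedged sketch of a call site using the new parameter (the dataSource value is assumed for illustration, not taken from this PR):

// Sketch: resolve a relation for files that are known to exist, for a table
// that is known to be non-partitioned, so both checks can be skipped.
val relation: BaseRelation = dataSource.resolveRelation(
  checkFilesExist = false,          // files are known to exist already
  needPartitionInferring = false)   // don't infer partitions from paths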
@@ -391,7 +393,7 @@ case class DataSource(
     } else {
       val globbedPaths = checkAndGlobPathIfNecessary(
         checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
-      val index = createInMemoryFileIndex(globbedPaths)
+      val index = createInMemoryFileIndex(globbedPaths, needPartitionInferring)
       val (resultDataSchema, resultPartitionSchema) =
         getOrInferFileFormatSchema(format, () => index)
       (index, resultDataSchema, resultPartitionSchema)
@@ -427,7 +429,6 @@ case class DataSource(
         "in the data schema",
         equality)
     }
-
     relation
   }
 
@@ -551,10 +552,12 @@ case class DataSource(
   }
 
   /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
-  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
+  private def createInMemoryFileIndex(
+      globbedPaths: Seq[Path],
+      needPartitionInferring: Boolean = true): InMemoryFileIndex = {
     val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(
-      sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
+    new InMemoryFileIndex(sparkSession, globbedPaths, options, userSpecifiedSchema,
+      fileStatusCache, needPartitionInferring)
   }
 
   /**
@@ -50,7 +50,8 @@ class InMemoryFileIndex(
     rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
     userSpecifiedSchema: Option[StructType],
-    fileStatusCache: FileStatusCache = NoopCache)
+    fileStatusCache: FileStatusCache = NoopCache,
+    needPartitionInferring: Boolean = true)
   extends PartitioningAwareFileIndex(
     sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {
 
@@ -69,7 +70,11 @@
 
   override def partitionSpec(): PartitionSpec = {
     if (cachedPartitionSpec == null) {
-      cachedPartitionSpec = inferPartitioning()
+      cachedPartitionSpec = if (needPartitionInferring) {
+        inferPartitioning()
+      } else {
+        PartitionSpec.emptySpec
+      }
     }
     logTrace(s"Partition spec: $cachedPartitionSpec")
     cachedPartitionSpec
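A hedged usage sketch of the new constructor flag (the path and tableSchema value are made up for illustration; the parameter names come straight from the diff):

// Hypothetical: index a non-partitioned table's files without interpreting
// partition-like directory names such as dt=20191113.
val index = new InMemoryFileIndex(
  sparkSession,
  rootPathsSpecified = Seq(new Path("/warehouse/some_table")),  // made-up path
  parameters = Map.empty,
  userSpecifiedSchema = Some(tableSchema),  // assumed user-provided schema
  fileStatusCache = NoopCache,
  needPartitionInferring = false)
// With the flag off, partitionSpec() short-circuits to PartitionSpec.emptySpec.
assert(index.partitionSpec() == PartitionSpec.emptySpec)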
@@ -245,7 +245,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             userSpecifiedSchema = Option(updatedTable.dataSchema),
             bucketSpec = None,
             options = options,
-            className = fileType).resolveRelation(),
+            className = fileType).resolveRelation(needPartitionInferring = false),
           table = updatedTable)
 
         catalogProxy.cacheTable(tableIdentifier, created)
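Background on why inference is turned off at this call site: convertToLogicalRelation verifies that the converted relation's output matches the Hive relation's schema, with an assertion roughly of the following shape (paraphrased sketch, not quoted from this PR). A spuriously inferred dt partition column adds an extra output attribute, which is what made the assertion fail in SPARK-29869.

// Paraphrased sketch of the output-compatibility check; `result` is the
// converted LogicalRelation and `relation` the original Hive relation.
assert(result.output.length == relation.output.length &&
  result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })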
@@ -362,29 +362,20 @@ class DataSourceWithHiveMetastoreCatalogSuite
     }
   }
 
-  test("SPARK-29869: HiveMetastoreCatalog#convertToLogicalRelation throws AssertionError") {
+  test("SPARK-29869: fix HiveMetastoreCatalog#convertToLogicalRelation throws AssertionError") {
     withTempPath(dir => {
       val baseDir = s"${dir.getCanonicalFile.toURI.toString}/test"
Review comment (Member): test -> non_partition_table ?

Reply (Author): The location can be set to anywhere; "test" here is fine. "non_partition_table" would push the line over 100 chars :)

-      val partitionDir = s"${dir.getCanonicalFile.toURI.toString}/test/dt=20191113"
-      val file = new Path(partitionDir, "file.parquet")
-      val fs = file.getFileSystem(new Configuration())
-      fs.createNewFile(file)
-      withTable("test") {
-        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
-          SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+      val partitionLikeDir = s"${dir.getCanonicalFile.toURI.toString}/test/dt=20191113"
Review comment (Member): val partitionLikeDir = s"${dir.getCanonicalFile.toURI.toString}/test/dt=20191113" -> val partitionLikeDir = s"$baseDir/dt=20191113"?

+      spark.range(3).selectExpr("id").write.parquet(partitionLikeDir)
+      withTable("non_partition_table") {
+        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
           spark.sql(
             s"""
-               |CREATE EXTERNAL TABLE `test`(key string)
-               |ROW FORMAT SERDE
-               |  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
-               |STORED AS INPUTFORMAT
-               |  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
-               |OUTPUTFORMAT
-               |  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-               |LOCATION '$baseDir'
+               |CREATE TABLE non_partition_table (id bigint)
Review comment (Contributor): Isn't it a malformed table? Does Hive ignore the directories for non-partitioned tables?

Reply (Author): Hive seems to return nothing when querying this table (1.2.1):

hive> select * from xxxxx.xxxx;
OK
Time taken: 25.301 seconds

But there is no assertion error.

Reply (Author, LantaoJin, Nov 14, 2019): Maybe you are right. Actually, the table LOCATION is /path/tablename/dt=yyyymmdd, but its data file paths are /path/tablename/dt=yyyymmdd/dt=yyyymmdd/xxx.parquet. I guess Hive does not recursively look up the data, so it returns an empty result rather than an error. I also found that when recursive lookup is enabled via .option("recursiveFileLookup", true), inferPartitioning is disabled, so dt=yyyymmdd won't be treated as a partitionSpec.

So should I revert the code changes and only keep the detailed assertion message? Or throw an exception instead of the assertion, catch it, and fall back to not using the built-in Parquet reader?
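A hedged sketch of the recursiveFileLookup behavior the author describes (the path is the author's placeholder; assumes a Spark version that supports this read option):

// With recursiveFileLookup, nested files are read but partition inference is
// disabled, so dt=yyyymmdd stays a plain directory, not a `dt` column.
val df = spark.read
  .option("recursiveFileLookup", "true")
  .parquet("/path/tablename/dt=yyyymmdd")
assert(!df.columns.contains("dt"))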

+               |STORED AS PARQUET LOCATION '$baseDir'
             |""".stripMargin)
-        val e = intercept[AssertionError](spark.sql("select * from test")).getMessage
-        assert(e.contains("assertion failed"))
+          assert(spark.sql("select * from non_partition_table").collect() ===
+            Array(Row(0), Row(1), Row(2)))
Review comment (Member): assert(spark.sql("select * from non_partition_table").collect() === Array(Row(0), Row(1), Row(2))) -> checkAnswer(spark.table("non_partition_table"), Row(0, 1, 2))?

Reply (Author): OK. I'd like to wait for more comments and then update everything in one pass.
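For reference, the reviewer's suggested idiom, adjusted so the expectation is one Row per result row (Row(0, 1, 2) would denote a single three-column row); a sketch, not the committed code:

// QueryTest.checkAnswer compares the DataFrame against the expected rows
// regardless of ordering; spark.range(3) yields three single-column rows.
checkAnswer(spark.table("non_partition_table"), Seq(Row(0), Row(1), Row(2)))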

         }
       }
     })