
Commit fe3242f

HyukjinKwon authored and uzadude committed
[SPARK-16975][SQL][FOLLOWUP] Do not duplicately check file paths in data sources implementing FileFormat
## What changes were proposed in this pull request?

This PR cleans up the duplicated file-path checks in data sources implementing `FileFormat`, and prevents the ORC data source from attempting to list files twice.

apache#14585 handles a problem with partition column names containing `_`, and the issue itself is resolved correctly. However, the data sources implementing `FileFormat` validate the paths redundantly. Judging from the comment in `CSVFileFormat` (`// TODO: Move filtering.`), we should not have to check this twice. Currently, these paths are already filtered in `PartitioningAwareFileIndex.shouldFilterOut` and `PartitioningAwareFileIndex.isDataPath`, so `FileFormat.inferSchema` will always receive leaf files. For example, running the code below:

``` scala
spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
spark.read.parquet("/tmp/parquet")
```

gives `FileFormat.inferSchema` the paths below, with no directories, just valid data files:

``` bash
/tmp/parquet/_col=0/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
/tmp/parquet/_col=1/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
/tmp/parquet/_col=2/part-r-00000-25de2b50-225a-4bcf-a2bc-9eb9ed407ef6.snappy.parquet
...
```

## How was this patch tested?

Unit test added in `HadoopFsRelationTest` and related existing tests.

Author: hyukjinkwon <[email protected]>

Closes apache#14627 from HyukjinKwon/SPARK-16975.
1 parent 6e45753 commit fe3242f

4 files changed: 21 additions & 15 deletions
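Before the per-file diffs, a bit of context: the condition removed below from `JsonFileFormat` and `ParquetFileFormat` is the same check that `PartitioningAwareFileIndex` already applies while listing files. The following is a minimal Scala sketch paraphrased from that removed condition, not the verbatim body of `PartitioningAwareFileIndex.shouldFilterOut` or `isDataPath`:

``` scala
import org.apache.hadoop.fs.Path

// Paraphrase of the duplicated per-format check removed by this commit:
// a file is a data path unless its name starts with "_" (and is not a
// partition directory such as "_locality_code=0") or starts with ".".
def isDataPath(path: Path): Boolean = {
  val name = path.getName
  !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}

// Examples:
//   isDataPath(new Path("/tmp/parquet/_locality_code=0/part-r-00000.snappy.parquet"))  => true
//   isDataPath(new Path("/tmp/parquet/_SUCCESS"))                                      => false
//   isDataPath(new Path("/tmp/parquet/.hidden"))                                       => false
```

Partition directories like `_locality_code=0` survive the filter because their names contain `=`, which is why the fix in apache#14585 works at the listing layer alone.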

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 2 additions & 3 deletions

``` diff
@@ -55,10 +55,9 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     require(files.nonEmpty, "Cannot infer schema from an empty set of files")
-    val csvOptions = new CSVOptions(options)
 
-    // TODO: Move filtering.
-    val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
+    val csvOptions = new CSVOptions(options)
+    val paths = files.map(_.getPath.toString)
     val lines: Dataset[String] = readText(sparkSession, csvOptions, paths)
     val firstLine: String = findFirstLine(csvOptions, lines)
     val firstRow = new CsvReader(csvOptions).parseLine(firstLine)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 1 addition & 6 deletions

``` diff
@@ -51,13 +51,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val columnNameOfCorruptRecord =
       parsedOptions.columnNameOfCorruptRecord
         .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-    val jsonFiles = files.filterNot { status =>
-      val name = status.getPath.getName
-      (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")
-    }.toArray
-
     val jsonSchema = InferSchema.infer(
-      createBaseRdd(sparkSession, jsonFiles),
+      createBaseRdd(sparkSession, files),
       columnNameOfCorruptRecord,
       parsedOptions)
     checkConstraints(jsonSchema)
```
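As an illustrative check (not part of this patch), the round-trip from the commit message works for JSON as well once the listing layer does all the filtering; the `/tmp/json` path is just a placeholder:

``` scala
// Hypothetical spark-shell snippet; assumes an active `spark` session.
import spark.implicits._

spark.range(3).withColumn("_locality_code", $"id")
  .write.partitionBy("_locality_code").json("/tmp/json")

// inferSchema now receives only leaf data files, so "_locality_code"
// comes back as a partition column instead of being filtered out.
spark.read.json("/tmp/json").printSchema()
```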

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 1 addition & 6 deletions

``` diff
@@ -241,12 +241,7 @@ class ParquetFileFormat
       commonMetadata: Seq[FileStatus])
 
   private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = {
-    // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-    val leaves = allFiles.filter { f =>
-      isSummaryFile(f.getPath) ||
-        !((f.getPath.getName.startsWith("_") && !f.getPath.getName.contains("=")) ||
-          f.getPath.getName.startsWith("."))
-    }.toArray.sortBy(_.getPath.toString)
+    val leaves = allFiles.toArray.sortBy(_.getPath.toString)
 
     FileTypes(
       data = leaves.filterNot(f => isSummaryFile(f.getPath)),
```
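Note that `splitFiles` still needs to separate Parquet summary files from data files, which is why `isSummaryFile` survives while the generic `_`/`.` filter goes away. A rough sketch of what such a name-based check amounts to (an assumption based on Parquet's naming convention, not the verbatim Spark source):

``` scala
import org.apache.hadoop.fs.Path

// Parquet writes two summary files next to the data files:
// "_metadata" (per-row-group footers) and "_common_metadata" (schema only).
// A name-based check is enough to tell them apart from data files.
def isSummaryFile(path: Path): Boolean =
  path.getName == "_metadata" || path.getName == "_common_metadata"
```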

sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala

Lines changed: 17 additions & 0 deletions

``` diff
@@ -877,6 +877,23 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       }
     }
   }
+
+  test("SPARK-16975: Partitioned table with the column having '_' should be read correctly") {
+    withTempDir { dir =>
+      val childDir = new File(dir, dataSourceName).getCanonicalPath
+      val dataDf = spark.range(10).toDF()
+      val df = dataDf.withColumn("_col", $"id")
+      df.write.format(dataSourceName).partitionBy("_col").save(childDir)
+      val reader = spark.read.format(dataSourceName)
+
+      // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema.
+      if (dataSourceName == classOf[SimpleTextSource].getCanonicalName) {
+        reader.option("dataSchema", dataDf.schema.json)
+      }
+      val readBack = reader.load(childDir)
+      checkAnswer(df, readBack)
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when
```
