
Commit 78787b1

Remove footerCache in FilteringParquetRowInputFormat.
1 parent: dff6fba

File tree

3 files changed: +16 -43 lines changed


sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 8 additions & 41 deletions
@@ -374,8 +374,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
 private[parquet] class FilteringParquetRowInputFormat
   extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
 
-  private var footers: JList[Footer] = _
-
   private var fileStatuses = Map.empty[Path, FileStatus]
 
   override def createRecordReader(
@@ -396,46 +394,15 @@ private[parquet] class FilteringParquetRowInputFormat
     }
   }
 
-  override def getFooters(jobContext: JobContext): JList[Footer] = {
-    import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache
-
-    if (footers eq null) {
-      val conf = ContextUtil.getConfiguration(jobContext)
-      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-      val statuses = listStatus(jobContext)
-      fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      if (statuses.isEmpty) {
-        footers = Collections.emptyList[Footer]
-      } else if (!cacheMetadata) {
-        // Read the footers from HDFS
-        footers = getFooters(conf, statuses)
-      } else {
-        // Read only the footers that are not in the footerCache
-        val foundFooters = footerCache.getAllPresent(statuses)
-        val toFetch = new ArrayList[FileStatus]
-        for (s <- statuses) {
-          if (!foundFooters.containsKey(s)) {
-            toFetch.add(s)
-          }
-        }
-        val newFooters = new mutable.HashMap[FileStatus, Footer]
-        if (toFetch.size > 0) {
-          val startFetch = System.currentTimeMillis
-          val fetched = getFooters(conf, toFetch)
-          logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
-          for ((status, i) <- toFetch.zipWithIndex) {
-            newFooters(status) = fetched.get(i)
-          }
-          footerCache.putAll(newFooters)
-        }
-        footers = new ArrayList[Footer](statuses.size)
-        for (status <- statuses) {
-          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
-        }
-      }
-    }
+  // This is only a temporary solution since we need to use fileStatuses in
+  // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
+  // two methods.
+  override def getSplits(jobContext: JobContext): JList[InputSplit] = {
+    // First set fileStatuses.
+    val statuses = listStatus(jobContext)
+    fileStatuses = statuses.map(file => file.getPath -> file).toMap
 
-    footers
+    super.getSplits(jobContext)
   }
 
   // TODO Remove this method and related code once PARQUET-16 is fixed
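The pattern in the added code is a small hook: `getSplits` first runs `listStatus`, records the discovered `FileStatus` objects for later reuse, and then delegates the actual split computation to the parent class. Below is a minimal standalone sketch of that hook, not part of the patch; the class name `StatusCapturingInputFormat` is made up, and it assumes parquet-hadoop, hadoop-mapreduce, and Spark SQL are on the classpath.

import java.util.{List => JList}

import scala.collection.JavaConversions._

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext}

import org.apache.spark.sql.Row

// Hypothetical subclass illustrating the hook: record the file statuses that
// listStatus discovers, then let the parent compute the splits as usual.
class StatusCapturingInputFormat extends parquet.hadoop.ParquetInputFormat[Row] {

  // Populated as a side effect of getSplits, for reuse by later split helpers.
  private var fileStatuses = Map.empty[Path, FileStatus]

  override def getSplits(jobContext: JobContext): JList[InputSplit] = {
    val statuses = listStatus(jobContext)
    fileStatuses = statuses.map(file => file.getPath -> file).toMap
    super.getSplits(jobContext)
  }
}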

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 7 additions & 1 deletion
@@ -200,7 +200,7 @@ private[sql] case class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    private var footers: Map[FileStatus, Footer] = _
+    var footers: Map[FileStatus, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -400,6 +400,7 @@ private[sql] case class ParquetRelation2(
     } else {
       metadataCache.dataStatuses.toSeq
     }
+    val selectedFooters = selectedFiles.map(metadataCache.footers)
 
     // FileInputFormat cannot handle empty lists.
     if (selectedFiles.nonEmpty) {
@@ -447,11 +448,16 @@ private[sql] case class ParquetRelation2(
        @transient
        val cachedStatus = selectedFiles
 
+       @transient
+       val cachedFooters = selectedFooters
+
        // Overridden so we can inject our own cached files statuses.
        override def getPartitions: Array[SparkPartition] = {
          val inputFormat = if (cacheMetadata) {
            new FilteringParquetRowInputFormat {
              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
+
+             override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
            }
          } else {
            new FilteringParquetRowInputFormat
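With `FilteringParquetRowInputFormat` no longer caching footers itself, the relation looks up each selected file's footer in `metadataCache.footers` and injects both the cached statuses and the cached footers into an anonymous subclass, so split planning never re-lists files or re-reads footers from the filesystem. The sketch below restates that wiring as a hypothetical helper, not code from the patch; it assumes it lives in the `org.apache.spark.sql.parquet` package (the input format class is package-private) and that the caches are already populated on the driver.

package org.apache.spark.sql.parquet

import java.util.{List => JList}

import scala.collection.JavaConversions._

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.JobContext
import parquet.hadoop.Footer

object CachedFooterExample {
  // Hypothetical helper: build an input format that serves driver-side caches
  // instead of listing files or re-reading footers during split computation.
  def cachedInputFormat(
      selectedFiles: Seq[FileStatus],
      footers: Map[FileStatus, Footer]): FilteringParquetRowInputFormat = {
    // Look up each selected file's footer, as the new `selectedFooters` value does.
    val selectedFooters = selectedFiles.map(footers)
    new FilteringParquetRowInputFormat {
      override def listStatus(jobContext: JobContext): JList[FileStatus] = selectedFiles
      override def getFooters(jobContext: JobContext): JList[Footer] = selectedFooters
    }
  }
}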

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 1 addition & 1 deletion
@@ -411,7 +411,7 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
       """)
   }
 
-  test("SPARK-6016 make sure to use the latest footers are used") {
+  test("SPARK-6016 make sure to use the latest footers") {
     sql("drop table if exists spark_6016_fix")
 
     // Create a DataFrame with two partitions. So, the created table will have two parquet files.
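For context, the test's setup comment describes building a two-partition DataFrame so the saved table is backed by two Parquet part-files, then overwriting it so stale footers would be detectable. The snippet below is illustrative only, not the actual test body; it uses Spark 1.3-era APIs and assumes a HiveContext named `sqlContext` with `import sqlContext.implicits._` in scope, and the data values are made up.

import org.apache.spark.sql.SaveMode

sqlContext.sql("drop table if exists spark_6016_fix")

// Two RDD partitions => the saved table is backed by two Parquet part-files.
val df = sqlContext.sparkContext.parallelize(1 to 10, 2).map(Tuple1(_)).toDF("a")
df.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)

// Overwriting again should make readers pick up the new files' footers
// rather than stale cached ones (the behavior SPARK-6016 guards against).
df.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)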
