diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 50d977a92da51..a14bad47dfe10 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.task.JobContextImpl
 
 import org.apache.spark.{Partition, SparkContext}
@@ -35,8 +36,12 @@ private[spark] class BinaryFileRDD[T](
   extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
 
   override def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
     val conf = getConf
+    // setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
+    // traversing a large number of directories and files. Parallelize it.
+    conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
+      Runtime.getRuntime.availableProcessors().toString)
+    val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)
diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
index 8e1baae796fc5..9f3d0745c33c9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.{Text, Writable}
 import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.task.JobContextImpl
 
 import org.apache.spark.{Partition, SparkContext}
@@ -38,8 +39,12 @@ private[spark] class WholeTextFileRDD(
   extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) {
 
   override def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
     val conf = getConf
+    // setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
+    // traversing a large number of directories and files. Parallelize it.
+    conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
+      Runtime.getRuntime.availableProcessors().toString)
+    val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)
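
Both hunks apply the same fix: before instantiating the input format, `getPartitions` sets Hadoop's `FileInputFormat.LIST_STATUS_NUM_THREADS` (the `mapreduce.input.fileinputformat.list-status.num-threads` property) to the number of available processors via `setIfUnset`, so `listStatus()` traverses the input paths in parallel while still letting a user-supplied value win. The sketch below is not part of the patch; it is a hypothetical usage example showing how a caller could pin the thread count explicitly before building either RDD. The input path and app name are placeholders.

```scala
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object ParallelListingExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("parallel-listing-example").setMaster("local[*]"))

    // Pin the listing thread count explicitly. Because the patch uses
    // conf.setIfUnset, this user-supplied value takes precedence over the
    // availableProcessors() default applied in getPartitions.
    sc.hadoopConfiguration.set(FileInputFormat.LIST_STATUS_NUM_THREADS, "16")

    // wholeTextFiles (WholeTextFileRDD) and binaryFiles (BinaryFileRDD) both
    // compute partitions via FileInputFormat.listStatus(), which now runs
    // with the configured number of threads.
    val rdd = sc.wholeTextFiles("hdfs:///data/many-small-files") // placeholder path
    println(s"partitions: ${rdd.getNumPartitions}")

    sc.stop()
  }
}
```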