Commit 4875755

add minSplits for WholeTextFiles
1 parent ca11919 commit 4875755

File tree

3 files changed, +30 -5 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 4 deletions

@@ -456,12 +456,13 @@ class SparkContext(config: SparkConf) extends Logging {
    *
    * @note Small files are preferred, as each file will be loaded fully in memory.
    */
-  def wholeTextFiles(path: String): RDD[(String, String)] = {
+  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
     newAPIHadoopFile(
       path,
       classOf[WholeTextFileInputFormat],
       classOf[String],
-      classOf[String])
+      classOf[String],
+      minSplits = minSplits)
   }

   /**
@@ -584,11 +585,12 @@ class SparkContext(config: SparkConf) extends Logging {
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration,
+      minSplits: Int = 1): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, minSplits)
   }

   /**
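With this change, callers can hint at a minimum number of partitions when reading a directory of small files as whole records. A minimal usage sketch, assuming a local Spark setup; the object name, input path, and the value 10 are illustrative, not taken from this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Usage sketch only: the path and the minSplits value below are illustrative.
object WholeTextFilesMinSplitsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wholeTextFiles-minSplits").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Each record is (fileName, fileContent). The new minSplits argument asks the
    // WholeTextFileInputFormat to aim for at least that many splits/partitions.
    val files = sc.wholeTextFiles("/data/many-small-files", minSplits = 10)
    println(s"partitions: ${files.partitions.length}")

    sc.stop()
  }
}

The @note in the diff above still applies: each file is loaded fully into memory by a single task, so minSplits only controls how the files are grouped into partitions.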

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 14 additions & 0 deletions

@@ -17,7 +17,10 @@

 package org.apache.spark.input

+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
@@ -44,4 +47,15 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
       context,
       classOf[WholeTextFileRecordReader])
   }
+
+  /**
+   * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minSplits: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+    super.setMaxSplitSize(totalLen / (if (minSplits == 0) 1 else minSplits))
+  }
 }
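The split sizing added here is plain arithmetic: the combined length of all non-directory input files is divided by minSplits (with a guard against dividing by zero) and handed to CombineFileInputFormat as the maximum split size, so roughly minSplits combined splits come out. A standalone sketch of that calculation; the object name and file lengths are invented for illustration:

// Standalone sketch of the maxSplitSize arithmetic performed by setMaxSplitSize above;
// the file lengths are made up.
object MaxSplitSizeSketch {
  def maxSplitSize(fileLengths: Seq[Long], minSplits: Int): Long = {
    val totalLen = fileLengths.sum                      // directories would contribute 0L
    totalLen / (if (minSplits == 0) 1 else minSplits)   // guard against division by zero
  }

  def main(args: Array[String]): Unit = {
    // Three files totalling 6 MiB, asking for at least 3 splits: 2 MiB per split.
    val lengths = Seq(1L << 20, 2L << 20, 3L << 20)
    println(maxSplitSize(lengths, minSplits = 3)) // 2097152
  }
}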

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 10 additions & 1 deletion

@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce._

 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.input.WholeTextFileInputFormat

 private[spark]
 class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
@@ -56,7 +57,8 @@ class NewHadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient conf: Configuration,
+    minSplits: Int = 1)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -74,10 +76,17 @@ class NewHadoopRDD[K, V](

   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(conf)
     }
+
     val jobContext = newJobContext(conf, jobId)
+
+    if (inputFormat.isInstanceOf[WholeTextFileInputFormat]) {
+      inputFormat.asInstanceOf[WholeTextFileInputFormat].setMaxSplitSize(jobContext, minSplits)
+    }
+
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
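The wiring in getPartitions is a runtime type check: the minSplits hint is forwarded only when the instantiated input format is a WholeTextFileInputFormat; every other InputFormat ignores it. A self-contained sketch of the same dispatch pattern; the trait and classes below are stand-ins, not Spark types:

// Stand-in types illustrating the runtime dispatch used in NewHadoopRDD.getPartitions;
// none of these names exist in Spark.
trait SplitHintAware {
  def setMaxSplitSize(minSplits: Int): Unit
}

class PlainFormat
class HintAwareFormat extends PlainFormat with SplitHintAware {
  def setMaxSplitSize(minSplits: Int): Unit =
    println(s"deriving max split size from minSplits = $minSplits")
}

object DispatchSketch {
  def applyHint(format: PlainFormat, minSplits: Int): Unit =
    if (format.isInstanceOf[SplitHintAware]) {
      format.asInstanceOf[SplitHintAware].setMaxSplitSize(minSplits) // hint-aware formats react
    }                                                                // all others silently ignore it

  def main(args: Array[String]): Unit = {
    applyHint(new HintAwareFormat, minSplits = 4) // prints the derived hint
    applyHint(new PlainFormat, minSplits = 4)     // no effect
  }
}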
