@@ -270,6 +270,15 @@ package object config {
.longConf
.createWithDefault(4 * 1024 * 1024)

private [spark] val FILTER_OUT_EMPTY_SPLIT = ConfigBuilder("spark.files.filterOutEmptySplit")
Member:
Nit: no space after private
This doc is much too verbose for a flag. Just say, "If true, methods like that use HadoopRDD and NewHadoopRDD such as SparkContext.textFiles will not create a partition for input splits that are empty."

Member:
nit: how about ignoreEmptySplits to be matched with ignoreCorruptFiles?

.doc("If set to true, HadoopRDD/NewHadoopRDD will not handle the split which its length is 0." +
"Maybe you will read an empty hive table but has many empty files. If set to false, Spark " +
"generates many tasks to handle these empty files. Sometimes, users maybe want to use " +
"SparkContext#textFile to handle a file stored in hadoop, and they don't want to generate " +
"any task when this file is empty, they can set this configuration to true.")
.booleanConf
.createWithDefault(false)

private[spark] val SECRET_REDACTION_PATTERN =
ConfigBuilder("spark.redaction.regex")
.doc("Regex to decide which Spark configuration properties and environment variables in " +
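Putting the review nits in this thread together, a revised flag might look roughly like the sketch below. The ignoreEmptySplits name and the shortened doc text are the reviewers' suggestions (not what this commit contains), and the .internal() call anticipates the "internal conf" comment on docs/configuration.md further down.

// Sketch only: combines the spacing, rename, and doc-wording suggestions from the review.
private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
  .internal()
  .doc("If true, methods that use HadoopRDD and NewHadoopRDD, such as SparkContext.textFile, " +
    "will not create a partition for input splits that are empty.")
  .booleanConf
  .createWithDefault(false)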
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES}
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -196,7 +196,11 @@ class HadoopRDD[K, V](
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val inputSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) {
Member:
You can avoid duplicating inputFormat.getSplits(jobConf, minPartitions)

inputFormat.getSplits(jobConf, minPartitions).filter(_.getLength > 0)
} else {
inputFormat.getSplits(jobConf, minPartitions)
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
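A minimal sketch of the reviewer's suggestion above, calling inputFormat.getSplits only once (allInputSplits is a hypothetical name; this is not the code in the commit):

// Sketch: fetch the splits once, then filter out the empty ones if the flag is set.
val allInputSplits = inputFormat.getSplits(jobConf, minPartitions)
val inputSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) {
  allInputSplits.filter(_.getLength > 0)
} else {
  allInputSplits
}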
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -21,6 +21,7 @@ import java.io.IOException
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import scala.collection.JavaConverters.asScalaBufferConverter
import scala.reflect.ClassTag

import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -34,7 +35,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES}
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -122,7 +123,11 @@ class NewHadoopRDD[K, V](
case _ =>
}
val jobContext = new JobContextImpl(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val rawSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) {
inputFormat.getSplits(jobContext).asScala.filter(_.getLength > 0)
} else {
inputFormat.getSplits(jobContext).asScala
}
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
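The same refactor, sketched for the new Hadoop API path as well (an extrapolation of the HadoopRDD comment above; allRawSplits is a hypothetical name):

// Sketch: one getSplits call for NewHadoopRDD, mirroring the suggestion made on HadoopRDD.
val allRawSplits = inputFormat.getSplits(jobContext).asScala
val rawSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) {
  allRawSplits.filter(_.getLength > 0)
} else {
  allRawSplits
}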
65 changes: 63 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.internal.config.{FILTER_OUT_EMPTY_SPLIT, IGNORE_CORRUPT_FILES}
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -347,7 +347,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
}

test ("allow user to disable the output directory existence checking (old Hadoop API") {
test ("allow user to disable the output directory existence checking (old Hadoop API)") {
val sf = new SparkConf()
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(sf)
@@ -510,4 +510,65 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("allow user to filter out empty split (old Hadoop API)") {
val sf = new SparkConf()
Member:
sf -> conf. You can fix it above too.

sf.setAppName("test").setMaster("local").set(FILTER_OUT_EMPTY_SPLIT, true)
sc = new SparkContext(sf)

// Ensure that if all of the splits are empty, we remove the splits correctly
val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1)
emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output")
Contributor:
don't hardcode the path separator, use new File(tempDir, output).

assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-*")
assert(hadoopRDD.partitions.length === 0)

// Ensure that if no split is empty, we don't lose any splits
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 2)
randomRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output1")
assert(new File(tempDir.getPath + "/output1/part-00001").exists() === true)
val hadoopRDD1 = sc.textFile(tempDir.getPath + "/output1/part-*")
assert(hadoopRDD1.partitions.length === 2)

// Ensure that if part of the splits are empty, we remove the splits correctly
val randomRDD2 = sc.parallelize(
Array(("key1", "a"), ("key2", "a")), 5)
randomRDD2.saveAsHadoopFile[TextOutputFormat[String, String]](
tempDir.getPath + "/output2")
assert(new File(tempDir.getPath + "/output2/part-00004").exists() === true)
val hadoopRDD2 = sc.textFile(tempDir.getPath + "/output2/part-*")
assert(hadoopRDD2.partitions.length === 2)
}
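Applying the two review comments above (conf instead of sf, and File instead of a hardcoded separator), the first block of this test could be reworked roughly as follows; this is a sketch, not the committed code:

// Sketch: reviewer-suggested naming and path handling for the "all splits empty" case.
val conf = new SparkConf()
conf.setAppName("test").setMaster("local").set(FILTER_OUT_EMPTY_SPLIT, true)
sc = new SparkContext(conf)

val emptyRDD = sc.parallelize(Array.empty[(String, String)], 1)
val output = new File(tempDir, "output")
emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
assert(new File(output, "part-00000").exists())
val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
assert(hadoopRDD.partitions.length === 0)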

test("allow user to filter out empty split (new Hadoop API)") {
val sf = new SparkConf()
sf.setAppName("test").setMaster("local").set(FILTER_OUT_EMPTY_SPLIT, true)
sc = new SparkContext(sf)

// Ensure that if all of the splits are empty, we remove the splits correctly
val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1)
emptyRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-r-*")
assert(hadoopRDD.partitions.length === 0)
Contributor:
You should recycle the resources you required in the test case.

Contributor Author:
The resources will be recycled by default in the afterEach function.


// Ensure that if no split is empty, we don't lose any splits
val randomRDD1 = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 2)
randomRDD1.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output1")
assert(new File(tempDir.getPath + "/output1/part-r-00001").exists() === true)
val hadoopRDD1 = sc.textFile(tempDir.getPath + "/output1/part-r-*")
assert(hadoopRDD1.partitions.length === 2)

// Ensure that if part of the splits are empty, we remove the splits correctly
val randomRDD2 = sc.parallelize(
Array(("key1", "a"), ("key2", "a")), 5)
randomRDD2.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output2")
assert(new File(tempDir.getPath + "/output2/part-r-00004").exists() === true)
val hadoopRDD2 = sc.textFile(tempDir.getPath + "/output2/part-r-*")
assert(hadoopRDD2.partitions.length === 2)
}
}
8 changes: 8 additions & 0 deletions docs/configuration.md
@@ -1191,6 +1191,14 @@ Apart from these, the following properties are also available, and may be useful
then the partitions with small files will be faster than partitions with bigger files.
</td>
</tr>
<tr>
<td><code>spark.files.filterOutEmptySplit</code></td>
Member:
I don't think I'd document this. It should be just a safety valve flag

Contributor:
yea we can make this conf an internal conf.

<td>false</td>
<td>If set to true, HadoopRDD/NewHadoopRDD will not create partitions for input splits whose length is 0. For example,
you may read an empty Hive table that still contains many empty files; if set to false, Spark generates a task for
each of these empty files. Users who read a file stored in Hadoop with SparkContext#textFile and do not want any
task to be generated when the file is empty can set this configuration to true.</td>
</tr>
<tr>
<td><code>spark.hadoop.cloneConf</code></td>
<td>false</td>
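For reference, a hypothetical end-to-end usage sketch of the flag as named in this commit (the HDFS path and app name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: with the flag enabled, reading a directory full of empty part files
// yields no partitions, and therefore no tasks, for the empty splits.
val conf = new SparkConf()
  .setAppName("example")
  .setMaster("local")
  .set("spark.files.filterOutEmptySplit", "true")
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs:///warehouse/some_empty_table")
println(lines.partitions.length)  // 0 when every input split is empty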