@@ -270,11 +270,12 @@ package object config {
.longConf
.createWithDefault(4 * 1024 * 1024)

-private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
-  .doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
-    "SparkContext.textFiles will not create a partition for input splits that are empty.")
-  .booleanConf
-  .createWithDefault(false)
+private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
+  ConfigBuilder("spark.hadoopRDD.ignoreEmptySplits")
+    .internal()
+    .doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.")
+    .booleanConf
+    .createWithDefault(false)

private[spark] val SECRET_REDACTION_PATTERN =
ConfigBuilder("spark.redaction.regex")
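For context, a minimal sketch (not part of this diff) of how a job might enable the renamed flag. Because the entry is marked internal(), it is set here through its string key; the app name, master, and input path are placeholders, written spark-shell style:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical driver: "/tmp/mixed-input" is assumed to contain a mix of
    // empty and non-empty part files.
    val conf = new SparkConf()
      .setAppName("ignore-empty-splits-demo")
      .setMaster("local[2]")
      .set("spark.hadoopRDD.ignoreEmptySplits", "true") // internal config, so set via its string key

    val sc = new SparkContext(conf)
    // With the flag on, HadoopRDD drops zero-length input splits, so the
    // partition count reflects only the splits that actually carry data.
    val lines = sc.textFile("/tmp/mixed-input")
    println(s"partitions = ${lines.getNumPartitions}")
    sc.stop()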
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -134,7 +134,7 @@ class HadoopRDD[K, V](

private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

-private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
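This file's diff only swaps the config key; the code that actually consumes ignoreEmptySplits is not shown here. A sketch, under the assumption that getPartitions filters out zero-length splits via the old-API InputSplit.getLength (NewHadoopRDD would do the analogous thing with the mapreduce-API splits):

    // Sketch only -- not the hunk from this PR.
    override def getPartitions: Array[Partition] = {
      val jobConf = getJobConf()
      SparkHadoopUtil.get.addCredentials(jobConf)
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength != 0) // drop splits that carry no data
      } else {
        allInputSplits
      }
      val array = new Array[Partition](inputSplits.length)
      for (i <- inputSplits.indices) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    }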
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -90,7 +90,7 @@ class NewHadoopRDD[K, V](

private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

-private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)

def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
14 changes: 9 additions & 5 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -510,9 +510,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
test("spark.hadoopRDD.ignoreEmptySplits work correctly (old Hadoop API)") {
val conf = new SparkConf()
conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
.setAppName("test")
.setMaster("local")
.set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
sc = new SparkContext(conf)

def testIgnoreEmptySplits(
@@ -549,9 +551,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
expectedPartitionNum = 2)
}

test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
test("spark.hadoopRDD.ignoreEmptySplits work correctly (new Hadoop API)") {
val conf = new SparkConf()
conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
.setAppName("test")
.setMaster("local")
.set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
sc = new SparkContext(conf)

def testIgnoreEmptySplits(
Contributor: prefer the tuple shorthand for the empty test data, i.e.
testIgnoreEmptySplits(
       data = Array.empty[Tuple2[String, String]],
       actualPartitionNum = 1,
       expectedPartitionNum = 0)

=>

testIgnoreEmptySplits(
       data = Array.empty[(String, String)],
       actualPartitionNum = 1,
       expectedPartitionNum = 0)

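The body of the testIgnoreEmptySplits helper is collapsed in this view. A rough sketch of what such a helper plausibly does for the old Hadoop API, assuming java.io.File and the old-API org.apache.hadoop.mapred.TextOutputFormat are imported and mirroring the parameter names visible at the call sites above:

    // Sketch only -- the helper's real body is collapsed in this diff.
    def testIgnoreEmptySplits(
        data: Array[(String, String)],
        actualPartitionNum: Int,
        expectedPartitionNum: Int): Unit = {
      val output = Utils.createTempDir()
      // Write the records with the requested parallelism so that empty
      // partitions produce empty part files on disk.
      sc.parallelize(data, actualPartitionNum)
        .saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
      // Re-read through the old Hadoop API and check how many partitions survive.
      val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
      assert(hadoopRDD.partitions.length === expectedPartitionNum)
      Utils.deleteRecursively(output)
    }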