Skip to content

Commit 0fa1066

Browse files
jiangxb1987cloud-fan
authored andcommitted
[SPARK-22233][CORE][FOLLOW-UP] Allow user to filter out empty split in HadoopRDD
## What changes were proposed in this pull request? Update the config `spark.files.ignoreEmptySplits`, rename it and make it internal. This is followup of #19464 ## How was this patch tested? Exsiting tests. Author: Xingbo Jiang <xingbo.jiang@databricks.com> Closes #19504 from jiangxb1987/partitionsplit.
1 parent 0ae9649 commit 0fa1066

4 files changed

Lines changed: 19 additions & 14 deletions

File tree

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,12 @@ package object config {
270270
.longConf
271271
.createWithDefault(4 * 1024 * 1024)
272272

273-
private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
274-
.doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
275-
"SparkContext.textFiles will not create a partition for input splits that are empty.")
276-
.booleanConf
277-
.createWithDefault(false)
273+
private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
274+
ConfigBuilder("spark.hadoopRDD.ignoreEmptySplits")
275+
.internal()
276+
.doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.")
277+
.booleanConf
278+
.createWithDefault(false)
278279

279280
private[spark] val SECRET_REDACTION_PATTERN =
280281
ConfigBuilder("spark.redaction.regex")

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
3535
import org.apache.spark.broadcast.Broadcast
3636
import org.apache.spark.deploy.SparkHadoopUtil
3737
import org.apache.spark.internal.Logging
38-
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
38+
import org.apache.spark.internal.config._
3939
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
4040
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
4141
import org.apache.spark.storage.StorageLevel
@@ -134,7 +134,7 @@ class HadoopRDD[K, V](
134134

135135
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
136136

137-
private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
137+
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
138138

139139
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
140140
protected def getJobConf(): JobConf = {

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.spark._
3535
import org.apache.spark.annotation.DeveloperApi
3636
import org.apache.spark.deploy.SparkHadoopUtil
3737
import org.apache.spark.internal.Logging
38-
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
38+
import org.apache.spark.internal.config._
3939
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
4040
import org.apache.spark.storage.StorageLevel
4141
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -90,7 +90,7 @@ class NewHadoopRDD[K, V](
9090

9191
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
9292

93-
private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
93+
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
9494

9595
def getConf: Configuration = {
9696
val conf: Configuration = confBroadcast.value.value

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
3131
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
3232
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
3333

34-
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
34+
import org.apache.spark.internal.config._
3535
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
3636
import org.apache.spark.storage.StorageLevel
3737
import org.apache.spark.util.Utils
@@ -510,9 +510,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
510510
}
511511
}
512512

513-
test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
513+
test("spark.hadoopRDD.ignoreEmptySplits work correctly (old Hadoop API)") {
514514
val conf = new SparkConf()
515-
conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
515+
.setAppName("test")
516+
.setMaster("local")
517+
.set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
516518
sc = new SparkContext(conf)
517519

518520
def testIgnoreEmptySplits(
@@ -549,9 +551,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
549551
expectedPartitionNum = 2)
550552
}
551553

552-
test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
554+
test("spark.hadoopRDD.ignoreEmptySplits work correctly (new Hadoop API)") {
553555
val conf = new SparkConf()
554-
conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
556+
.setAppName("test")
557+
.setMaster("local")
558+
.set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
555559
sc = new SparkContext(conf)
556560

557561
def testIgnoreEmptySplits(

0 commit comments

Comments
 (0)