[SPARK-22233] [core] Allow user to filter out empty split in HadoopRDD #19464
Changes from 6 commits
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

-import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
+import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -347,10 +347,10 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

-  test ("allow user to disable the output directory existence checking (old Hadoop API") {
-    val sf = new SparkConf()
-    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
-    sc = new SparkContext(sf)
+  test ("allow user to disable the output directory existence checking (old Hadoop API)") {
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(conf)
     val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
     randomRDD.saveAsTextFile(tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
@@ -380,9 +380,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   }

   test ("allow user to disable the output directory existence checking (new Hadoop API") {
-    val sf = new SparkConf()
-    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
-    sc = new SparkContext(sf)
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(conf)
     val randomRDD = sc.parallelize(
       Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
@@ -510,4 +510,87 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+    sc = new SparkContext(conf)
+
+    def testIgnoreEmptySplits(
+        data: Array[Tuple2[String, String]],
+        actualPartitionNum: Int,
+        expectedPart: String,
+        expectedPartitionNum: Int): Unit = {
+      val output = new File(tempDir, "output")
+      sc.parallelize(data, actualPartitionNum)
+        .saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+      assert(new File(output, expectedPart).exists() === true)
+      val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+      assert(hadoopRDD.partitions.length === expectedPartitionNum)
+      Utils.deleteRecursively(output)
Member:
Maybe:
      try {
        ...
      } finally {
        Utils.deleteRecursively(output)
      }

Contributor (Author):
I think we don't need it here.
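For illustration only, applying that suggestion to the helper above would look roughly like the sketch below; it guarantees the temporary output is removed even when an assertion fails. This is just a sketch of the reviewer's idea, not code from the PR.

    // Sketch of the reviewer's try/finally suggestion applied to the helper above.
    def testIgnoreEmptySplits(
        data: Array[Tuple2[String, String]],
        actualPartitionNum: Int,
        expectedPart: String,
        expectedPartitionNum: Int): Unit = {
      val output = new File(tempDir, "output")
      try {
        sc.parallelize(data, actualPartitionNum)
          .saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
        assert(new File(output, expectedPart).exists() === true)
        val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
        assert(hadoopRDD.partitions.length === expectedPartitionNum)
      } finally {
        // Always remove the temporary output, even if an assertion above fails.
        Utils.deleteRecursively(output)
      }
    }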
+    }
Member:
Could we maybe do something like the below? (not tested)

    def testIgnoreEmptySplits(
        data: Array[Tuple2[String, String]],
        actualPartitionNum: Int,
        expectedName: String,
        expectedPartitionNum: Int): Unit = {
      val output = new File(tempDir, "output")
      sc.parallelize(data, actualPartitionNum)
        .saveAsHadoopFile[TextOutputFormat[String, String]](output.getAbsolutePath)
      assert(new File(output, expectedPart).exists())
      val hadoopRDD = sc.textFile(new File(output, "part-*").getAbsolutePath)
      assert(hadoopRDD.partitions.length === expectedPartitionNum)
    }

    ...

    testIgnoreEmptySplits(
      data = Array.empty[Tuple2[String, String]],
      actualPartitionNum = 1,
      expectedName = "part-00000",
      expectedPartitionNum = 0)

Member:
Actually, the previous tests were okay to me as well.
+
+    // Ensure that if all of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array.empty[Tuple2[String, String]],
+      actualPartitionNum = 1,
+      expectedPart = "part-00000",
+      expectedPartitionNum = 0)
+
+    // Ensure that if no split is empty, we don't lose any splits
+    testIgnoreEmptySplits(
+      data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+      actualPartitionNum = 2,
+      expectedPart = "part-00001",
+      expectedPartitionNum = 2)
+
+    // Ensure that if part of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array(("key1", "a"), ("key2", "a")),
+      actualPartitionNum = 5,
+      expectedPart = "part-00004",
+      expectedPartitionNum = 2)
+  }
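As a quick illustration of what these tests exercise, enabling the flag in a standalone job might look like the sketch below. The app name and output path are made up, and the key used is the spark.files.ignoreEmptySplits name as it stands at this point in the PR (a rename is discussed at the end of this page).

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: write three partitions, two of them empty, then read the output back
    // with spark.files.ignoreEmptySplits enabled so the empty part files are skipped.
    object IgnoreEmptySplitsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("ignore-empty-splits-demo") // illustrative name
          .setMaster("local")
          .set("spark.files.ignoreEmptySplits", "true")
        val sc = new SparkContext(conf)

        val out = "/tmp/ignore-empty-splits-demo" // made-up path
        sc.parallelize(Seq("only line"), 3).saveAsTextFile(out)

        // Only the non-empty part file should contribute a partition.
        val reread = sc.textFile(out + "/part-*")
        println(reread.partitions.length) // expected: 1 rather than 3
        sc.stop()
      }
    }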
+
+  test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+    val conf = new SparkConf()
+    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+    sc = new SparkContext(conf)
+
+    def testIgnoreEmptySplits(
+        data: Array[Tuple2[String, String]],
+        actualPartitionNum: Int,
+        expectedPart: String,
+        expectedPartitionNum: Int): Unit = {
+      val output = new File(tempDir, "output")
+      sc.parallelize(data, actualPartitionNum)
+        .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath)
+      assert(new File(output, expectedPart).exists() === true)
+      val hadoopRDD = sc.newAPIHadoopFile(new File(output, "part-r-*").getPath,
+        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
+        .asInstanceOf[NewHadoopRDD[_, _]]
+      assert(hadoopRDD.partitions.length === expectedPartitionNum)
+      Utils.deleteRecursively(output)
+    }
+
+    // Ensure that if all of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array.empty[Tuple2[String, String]],
+      actualPartitionNum = 1,
+      expectedPart = "part-r-00000",
+      expectedPartitionNum = 0)
+
+    // Ensure that if no split is empty, we don't lose any splits
+    testIgnoreEmptySplits(
+      data = Array(("1", "a"), ("2", "a"), ("3", "b")),
+      actualPartitionNum = 2,
+      expectedPart = "part-r-00001",
+      expectedPartitionNum = 2)
+
+    // Ensure that if part of the splits are empty, we remove the splits correctly
+    testIgnoreEmptySplits(
+      data = Array(("1", "a"), ("2", "b")),
+      actualPartitionNum = 5,
+      expectedPart = "part-r-00004",
+      expectedPartitionNum = 2)
+  }
 }
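Note that the diff shown here covers only the FileSuite tests; the core change of the PR filters out zero-length splits when HadoopRDD and NewHadoopRDD build their partitions. A minimal sketch of that idea is below, using the old mapred API; the helper name filterEmptySplits is purely illustrative and not a method in the PR.

    import org.apache.hadoop.mapred.InputSplit

    // Sketch of the core idea behind IGNORE_EMPTY_SPLITS: when the flag is on,
    // drop any split whose reported length is zero before partitions are created.
    def filterEmptySplits(splits: Array[InputSplit], ignoreEmptySplits: Boolean): Array[InputSplit] = {
      if (ignoreEmptySplits) splits.filter(_.getLength != 0) else splits
    }

The same filtering applies analogously on the new-API side, where splits are org.apache.hadoop.mapreduce.InputSplit instances.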
Review comment: This config should be made internal, and the name should be improved because it's not about Spark files.

Reply: I'll send a follow-up PR to fix this.
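For reference, Spark declares such flags in org.apache.spark.internal.config using ConfigBuilder. A hypothetical sketch of how IGNORE_EMPTY_SPLITS could be defined as an internal boolean config is shown below; the key is the one the tests above use, and the rename discussed here is deliberately not guessed at.

    // Hypothetical sketch, not the PR's actual definition: an internal boolean
    // config declared with Spark's ConfigBuilder, defaulting to false.
    private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
      .internal()
      .doc("If true, methods that create a HadoopRDD or NewHadoopRDD ignore empty input splits.")
      .booleanConf
      .createWithDefault(false)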