
Conversation

@liutang123
Contributor

@liutang123 liutang123 commented Oct 10, 2017

What changes were proposed in this pull request?

Add a flag, spark.files.ignoreEmptySplits. When true, methods that use HadoopRDD and NewHadoopRDD, such as SparkContext.textFile, will not create a partition for input splits that are empty.
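For illustration, here is how a user might enable the proposed flag (a minimal sketch; the app name, master and input path are made up for the example):

import org.apache.spark.{SparkConf, SparkContext}

// Enable the proposed flag in the SparkConf; with it on, empty input splits
// should no longer produce empty partitions when reading text files.
val conf = new SparkConf()
  .setAppName("ignore-empty-splits-demo")   // hypothetical app name
  .setMaster("local[*]")
  .set("spark.files.ignoreEmptySplits", "true")
val sc = new SparkContext(conf)
val rdd = sc.textFile("/somepath/*")        // hypothetical path; empty files add no partitions
println(rdd.partitions.length)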

@kiszk
Member

kiszk commented Oct 10, 2017

Could you please update the title of this PR appropriately? e.g. [SPARK-22233][core] ...

liutang123 changed the title from "Spark 22233" to "[SPARK-22233] [core]" Oct 10, 2017
liutang123 changed the title from "[SPARK-22233] [core]" to "[SPARK-22233] [core] Allow user to filter out empty split in HadoopRDD" Oct 10, 2017
@liutang123
Contributor Author

liutang123 commented Oct 11, 2017

@kiszk Any other suggestions, and can this PR be merged?

@srowen
Member

srowen commented Oct 11, 2017

Interesting. On the one hand I don't like adding yet another flag that changes behavior, when the user often can't meaningfully decide to set it. There is probably no value in processing an empty partition, sure. Then again it does change behavior slightly, and I wonder if that impacts assumptions that apps rely on somehow.

If there's no reason to expect a downside, we could do this in Spark 3.x, or make the change now but, yes, introduce a flag as a safety valve to go back to the old behavior, leaving the default as true.

But first are there any known impacts to skipping the empty partitions?

val rawSplits = inputFormat.getSplits(jobContext).toArray
var rawSplits = inputFormat.getSplits(jobContext).toArray(Array.empty[InputSplit])
if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
rawSplits = rawSplits.filter(_.getLength>0)
Member

Space around operator.
You should filter before making an array.
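For example, the suggestion could look roughly like this (a sketch only, not the committed patch; the new-API getSplits returns a Java list, so it can be filtered before the array is built):

import scala.collection.JavaConverters._
import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, JobContext}

// Hypothetical helper: filter the list of splits before materializing the array,
// instead of converting first and then mutating a var.
def nonEmptySplits(
    inputFormat: InputFormat[_, _],
    jobContext: JobContext,
    ignoreEmptySplits: Boolean): Array[InputSplit] = {
  val allSplits = inputFormat.getSplits(jobContext).asScala
  (if (ignoreEmptySplits) allSplits.filter(_.getLength > 0) else allSplits).toArray
}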

Contributor Author

Does anyone use empty files to do something?
For example:
sc.textFile("/somepath/*").mapPartitions(....)
Setting this flag to true by default may change the behavior of a user's application.

Contributor

@jiangxb1987 jiangxb1987 left a comment

This looks reasonable, also cc @cloud-fan

This setting is ignored for jobs generated through Spark Streaming's StreamingContext, since
data may need to be rewritten to pre-existing output directories during checkpoint recovery.
Contributor

We should add the config to internal/config.
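A sketch of what that entry could look like at this point in the review (the final name and doc wording were still being discussed):

// In core/src/main/scala/org/apache/spark/internal/config/package.scala
private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
  .doc("If true, methods that use HadoopRDD and NewHadoopRDD, such as SparkContext.textFile, " +
    "will not create a partition for input splits that are empty.")
  .booleanConf
  .createWithDefault(false)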

SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
Contributor

How about:

val inputSplits = if (......) {
    inputFormat.getSplits(jobConf, minPartitions).filter(_.getLength > 0)
} else {
   inputFormat.getSplits(jobConf, minPartitions)
}

We should always try not to use var.

val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
inputSplits = inputSplits.filter(_.getLength>0)
Contributor

nit: extra space around operator.

assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)

val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-00000")
assert(hadoopRDD.partitions.length === 0)
Contributor

You should recycle the resources you acquired in the test case.

Contributor Author

The resources will be recycled by default in the afterEach function.

emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)

val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-00000")
Contributor

We should also add the following test cases (see the sketch after this list):

  1. Ensure that if no split is empty, we don't lose any splits;
  2. Ensure that if part of the splits are empty, we remove the splits correctly.
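A rough, untested sketch of those two cases, in the style of the surrounding FileSuite test (sc, tempDir and the TextOutputFormat import are assumed to come from the existing suite):

// 1. No split is empty: no partition should be dropped.
val nonEmpty = Array(("key1", "a"), ("key2", "b"), ("key3", "c"), ("key4", "d"))
val out1 = new File(tempDir, "output1")
sc.parallelize(nonEmpty, 2).saveAsHadoopFile[TextOutputFormat[String, String]](out1.getPath)
assert(sc.textFile(new File(out1, "part-*").getPath).partitions.length === 2)

// 2. Part of the splits are empty: only the non-empty ones should remain.
val partlyEmpty = Array(("key1", "a"), ("key2", "b"))
val out2 = new File(tempDir, "output2")
sc.parallelize(partlyEmpty, 5).saveAsHadoopFile[TextOutputFormat[String, String]](out2.getPath)
assert(sc.textFile(new File(out2, "part-*").getPath).partitions.length === 2)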

val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
Contributor

I would suggest not using a name that starts with "spark.hadoop"; configurations of this kind are treated as Hadoop configuration and set into the Hadoop Configuration, so it might be better to choose another name.

Member

I'd use spark.files prefix, taken after spark.files.ignoreCorruptFiles, spark.files.maxPartitionBytes and spark.files.openCostInBytes.

@jerryshao
Contributor

IIUC this issue also exists in NewHadoopRDD and FileScanRDD (possibly); we'd better fix those as well.

@HyukjinKwon
Member

I think the optimisation from the SQL-specific conf spark.sql.files.maxPartitionBytes already includes this concept in FileScanRDD, and it looks like it already partially does this when combining input splits. I'd suggest not putting this conf in FileScanRDD for now, if I didn't miss something.

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Oct 12, 2017

Test build #82658 has finished for PR 19464 at commit cf0c350.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

…Split and add it to internal/config; add test case
@SparkQA

SparkQA commented Oct 12, 2017

Test build #82672 has finished for PR 19464 at commit 31a5d30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// Ensure that if all of the splits are empty, we remove the splits correctly
val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1)
emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output")
Contributor

don't hardcode the path separator, use new File(tempDir, output).
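For example, a small sketch of that suggestion (emptyRDD and tempDir come from the test shown above):

// Build the output path with java.io.File so the platform separator is used,
// instead of concatenating "/" by hand.
val output = new File(tempDir, "output")
emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
assert(new File(output, "part-00000").exists())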

@cloud-fan
Contributor

I can't think of any downside, but it's always safe to avoid behavior changes. LGTM

.longConf
.createWithDefault(4 * 1024 * 1024)

private [spark] val FILTER_OUT_EMPTY_SPLIT = ConfigBuilder("spark.files.filterOutEmptySplit")
Member

Nit: no space after private
This doc is much too verbose for a flag. Just say, "If true, methods like that use HadoopRDD and NewHadoopRDD such as SparkContext.textFiles will not create a partition for input splits that are empty."

SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val inputSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) {
Member

You can avoid duplicating inputFormat.getSplits(jobConf, minPartitions)
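One way to do that (a sketch using the names from the diff above, not necessarily the committed code):

// Call getSplits once, then decide whether to filter the result.
val allInputSplits = inputFormat.getSplits(jobConf, minPartitions)
val inputSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) {
  allInputSplits.filter(_.getLength > 0)
} else {
  allInputSplits
}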

}

test("allow user to filter out empty split (old Hadoop API)") {
val sf = new SparkConf()
Member

sf -> conf. You can fix it above too.

then the partitions with small files will be faster than partitions with bigger files.
Member

I don't think I'd document this. It should be just a safety valve flag

Contributor

yea we can make this conf an internal conf.

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM too except for those comments.

.longConf
.createWithDefault(4 * 1024 * 1024)

private [spark] val FILTER_OUT_EMPTY_SPLIT = ConfigBuilder("spark.files.filterOutEmptySplit")
Member

nit: how about ignoreEmptySplits, to match ignoreCorruptFiles?

}

// Ensure that if all of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(Array.empty[Tuple2[String, String]], 1, 0, "part-00000", 0)
Member

I'd call it with named arguments, for example,

testIgnoreEmptySplits(
  Array.empty[Tuple2[String, String]],
  numSlices = 1,
  outputSuffix = 0,
  checkPart = "part-00000",
  expectedPartitionNum = 0)

assert(new File(output, checkPart).exists() === true)
val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
assert(hadoopRDD.partitions.length === expectedPartitionNum)
}
Member

Could we maybe do this something like ... as below? (not tested)

def testIgnoreEmptySplits(
    data: Array[Tuple2[String, String]],
    actualPartitionNum: Int,
    expectedName: String,
    expectedPartitionNum: Int): Unit = {
  val output = new File(tempDir, "output")
  sc.parallelize(data, actualPartitionNum)
    .saveAsHadoopFile[TextOutputFormat[String, String]](output.getAbsolutePath)
  assert(new File(output, expectedName).exists())
  val hadoopRDD = sc.textFile(new File(output, "part-*").getAbsolutePath)
  assert(hadoopRDD.partitions.length === expectedPartitionNum)
}

...

testIgnoreEmptySplits(
  data = Array.empty[Tuple2[String, String]],
  actualPartitionNum = 1,
  expectedName = "part-00000",
  expectedPartitionNum = 0)

Member

Actually, the previous tests looked okay to me as well.

val output = new File(tempDir, "output" + outputSuffix)
dataRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath)
assert(new File(output, checkPart).exists() === true)
val hadoopRDD = sc.textFile(new File(output, "part-r-*").getPath)
Member

I think we should read it with the new Hadoop API to test NewHadoopRDD, I guess?

@SparkQA

SparkQA commented Oct 12, 2017

Test build #82694 has finished for PR 19464 at commit 4dcfd83.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 12, 2017

Test build #82696 has finished for PR 19464 at commit 527b367.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.createWithDefault(4 * 1024 * 1024)

private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
.doc("If true, methods like that use HadoopRDD and NewHadoopRDD such as " +
Contributor

like that -> that

conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
sc = new SparkContext(conf)

def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
Contributor

nit: one argument per line.

conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
sc = new SparkContext(conf)

def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
Contributor

ditto.

@SparkQA

SparkQA commented Oct 13, 2017

Test build #82716 has finished for PR 19464 at commit 25f98d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 13, 2017

Test build #82726 has finished for PR 19464 at commit 534d8fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val output = new File(tempDir, "output")
sc.parallelize(data, actualPartitionNum)
.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
assert(new File(output, expectedPart).exists() === true)
Contributor

I don't think we need the expectedPart parameter, just

for (i <- 0 until actualPartitionNum) {
  assert(new File(output, s"part-0000$i").exists() === true)
}

assert(new File(output, expectedPart).exists() === true)
val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
assert(hadoopRDD.partitions.length === expectedPartitionNum)
Utils.deleteRecursively(output)
Member

Maybe:

try {
  ...
} finally {
  Utils.deleteRecursively(output)
}

Contributor Author

I don't think we need try...finally here, because Utils.deleteRecursively(output) is only there to ensure
the next invocation of testIgnoreEmptySplits succeeds. When the test finishes, whether it passes or not, the tempDir will be deleted in FileSuite.afterEach().

data: Array[Tuple2[String, String]],
actualPartitionNum: Int,
expectedPart: String,
expectedPartitionNum: Int): Unit = {
Member

Indentation..

def testIgnoreEmptySplits(
    data: Array[Tuple2[String, String]],
    ...
    expectedPartitionNum: Int): Unit = {
  val output = new File(tempDir, "output")
  ...

@HyukjinKwon
Member

LGTM

@SparkQA

SparkQA commented Oct 14, 2017

Test build #82752 has finished for PR 19464 at commit a6818b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@HyukjinKwon HyukjinKwon left a comment

Let's fix this nit when we change the code around here next time.

}
val hadoopRDD = sc.newAPIHadoopFile(new File(output, "part-r-*").getPath,
classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
.asInstanceOf[NewHadoopRDD[_, _]]
Member

nit:

val hadoopRDD = sc.newAPIHadoopFile(
  new File(output, "part-r-*").getPath,
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text]).asInstanceOf[NewHadoopRDD[_, _]]

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in 014dc84 Oct 14, 2017
.longConf
.createWithDefault(4 * 1024 * 1024)

private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
Contributor

This config should be made internal, and the name should be improved because it's not about spark files.
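A sketch of what such a follow-up could look like (the constant name and the exact new config name below are illustrative assumptions, not the merged change):

// Hypothetical revision: mark the config internal and move it out of the spark.files namespace.
private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
  ConfigBuilder("spark.hadoopRDD.ignoreEmptySplits")
    .internal()
    .doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty splits.")
    .booleanConf
    .createWithDefault(false)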

Contributor

I'll send a follow-up PR to fix this.

ghost pushed a commit to dbtsai/spark that referenced this pull request Oct 16, 2017
…n HadoopRDD

## What changes were proposed in this pull request?

Update the config `spark.files.ignoreEmptySplits`, rename it and make it internal.

This is followup of apache#19464

## How was this patch tested?

Existing tests.

Author: Xingbo Jiang <[email protected]>

Closes apache#19504 from jiangxb1987/partitionsplit.