Conversation

@HeartSaVioR
Contributor

@HeartSaVioR commented Nov 26, 2019

What changes were proposed in this pull request?

This reverts commit 31c4fab (#23052) to make sure a partition that calls `ManifestFileCommitProtocol.newTaskTempFile` actually creates the file.

This also reverts part of commit 0d3d46d (#26639), since that commit fixes the issue raised from 31c4fab, which we're reverting. The reason for the partial revert is that we found the UT worth keeping to prevent regression, given that it can detect the issue (empty partition -> no actual file). This makes one more change to the UT: it is intentionally moved so that both DSv1 and DSv2 are tested.

Why are the changes needed?

After the changes in SPARK-26081 (commit 31c4fab / #23052), CSV/JSON/TEXT don't create an actual file if the partition is empty. This optimization causes a problem in `ManifestFileCommitProtocol`: the API `newTaskTempFile` is called without actual file creation. Then `fs.getFileStatus` throws `FileNotFoundException` since the file was never created.

SPARK-29999 (commit 0d3d46d / #26639) fixes the problem. But it is too costly to check file existence on each task commit. We should simply restore the behavior before SPARK-26081.
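For context, a minimal sketch of the failing commit path (paraphrased and simplified, not the exact Spark source; here `addedFiles` stands for the list of paths handed out by `newTaskTempFile`):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.TaskAttemptContext

    // Sketch only: newTaskTempFile() merely returns a path and records it;
    // the writer is expected to materialize the file at that path.
    def commitTask(taskContext: TaskAttemptContext, addedFiles: Seq[String]): Unit = {
      val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
      addedFiles.foreach { f =>
        // With SPARK-26081, an empty partition never creates the file at `f`,
        // so this lookup throws FileNotFoundException at task commit time.
        fs.getFileStatus(new Path(f))
      }
    }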

Does this PR introduce any user-facing change?

No

How was this patch tested?

Jenkins build will follow.

@HeartSaVioR
Contributor Author

cc @dongjoon-hyun @gatorsmile @gengliangwang @zsxwing - this is the PR to revert both commits. Once this is merged, I'll raise a separate PR to add the UT. Please take a look. Thanks!

@HeartSaVioR
Contributor Author

I skipped copying the full title of each reverted PR, since we're reverting two issues and including both would make the title very long.

@gengliangwang
Member

Well, I was expecting this PR to come with UTs...
@dongjoon-hyun @gatorsmile @zsxwing WDYT?

@dongjoon-hyun
Member

Yes. I also expected only a revert of SPARK-26081 (including the body part of SPARK-29999).

@HeartSaVioR
Contributor Author

Sorry, I'm not sure I understand the suggestion.

It looks like you're not suggesting a "clean revert of both issues", but rather a "dirty revert of SPARK-26081": revert SPARK-26081 along with the things that depend on it, like the body part of SPARK-29999, right? I originally read the suggestion as the former, since the community has preferred clean reverts in most cases, so I'd like to confirm this again.

And if we want to do the latter (clean revert of SPARK-26081 & partial revert of SPARK-29999), would we just mention in the PR that it reverts SPARK-26081, or keep this as it is?

@HeartSaVioR
Contributor Author

For now I restored the UT and moved it to a place where it is checked against both DSv1 and DSv2, and modified the PR description. I left the PR title as it is, since committers can modify it, so it's easier to handle directly if desired.

@gengliangwang
Member

gengliangwang commented Nov 26, 2019

@HeartSaVioR the file CsvOutputWriter.scala was moved in another PR, so this PR is not a direct revert of the original commit 31c4fab.
I prefer resolving it all in one PR to keep the commit history cleaner. We could also add a new test case to check the behavior that an empty DataFrame outputs exactly one empty file.
The title can explain the detailed changes.

But I am open on this and not strongly suggesting doing so. You can do the revert and add tests as a follow-up.

@SparkQA

SparkQA commented Nov 26, 2019

Test build #114435 has finished for PR 26671 at commit 5bddefe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Nov 26, 2019

Ah, yes, you're right that it cannot be reverted cleanly, so there's physically no clean revert.

Maybe I overthought this; I was thinking about how we deal with the JIRA issues for SPARK-26081/SPARK-29999. If we reopen them (at least SPARK-26081) and leave open the chance to attempt the right fix, it would be ideal to have a "minimized" commit reverting SPARK-26081, so we can track how SPARK-26081 was introduced, reverted, and later re-introduced. If we would rather abandon the original idea of SPARK-26081 and close the issue as won't-fix, any approach would be OK for me.

Btw, would you mind elaborating on the new suggestion for the new UT?

We could also add a new test case to check the behavior that an empty DataFrame outputs exactly one empty file.

I'm not familiar enough with the expectations/requirements of the file sink. I feel the UT from SPARK-29999 can live alongside the reverting commit, since it tests the regression we broke: we're reverting and adding a guard so we don't break it again. Is the new UT the same case, i.e., did SPARK-26081 break that expectation? If not, it sounds like it serves a different purpose.

@gengliangwang
Member

OK, I think the current PR is good :)

@HeartSaVioR
Contributor Author

@gengliangwang Thanks for understanding and bearing with me. :)

@SparkQA

SparkQA commented Nov 26, 2019

Test build #114439 has finished for PR 26671 at commit 4e09411.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Nov 26, 2019

Test build #114449 has finished for PR 26671 at commit 4e09411.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    if (addedFiles.nonEmpty) {
      val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
      val statuses: Seq[SinkFileStatus] =
        addedFiles.flatMap { f =>
Member

So, this if-else looked like an overhead, and it was added to cope with files not being written?

FWIW, there's still an old case where files are not written:

    // The underlying writer is created lazily on the first write(), so an
    // empty partition never instantiates it and no file is produced.
    private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
      recordWriterInstantiated = true
      new OrcOutputFormat().getRecordWriter(
        new Path(path).getFileSystem(context.getConfiguration),
        context.getConfiguration.asInstanceOf[JobConf],
        path,
        Reporter.NULL
      ).asInstanceOf[RecordWriter[NullWritable, Writable]]
    }

    override def write(row: InternalRow): Unit = {
      recordWriter.write(NullWritable.get(), serializer.serialize(row))
    }

    spark.conf.set("spark.sql.orc.impl", "hive")
    spark.range(10).filter(_ => false).write.orc("test.orc")

But I suspect it's a-okay since this behaviour will be superseded by "native" implementation completely in the future.

Contributor Author

Thanks for the pointer. That would also break the streaming sink if there's an empty partition. Ideally it should be fixed for streaming queries, but I feel its scope is beyond this PR.

      }

    }

Member

@HyukjinKwon Nov 26, 2019

I would remove this line in a revert.

Contributor Author

I'm taking this as OK to remove the line; please let me know if you meant removing the line in the "changeset".

Member

I think @HyukjinKwon means removing this line in the "changeset".

Contributor Author

Not a big deal. Will do.

@HyukjinKwon
Member

cc @cloud-fan as well since I talked about empty files here and there (e.g. #18654) with him.

@cloud-fan
Contributor

now we at least write one file for each partition?

@HyukjinKwon
Member

I guess you mean whether we write at least one file for each task? Yes, I think so, except for the old Hive case (#26671 (comment))

@HeartSaVioR
Contributor Author

now we at least write one file for each partition?

Yes, that has been the assumption of ManifestFileCommitProtocol: it only provides the path of the temp file, and there's no interface to notify whether the file was actually created at commit time. So we either assume the file is created, or we need to check its existence (which was SPARK-29999).
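To illustrate the trade-off (a hedged sketch of the two options, assuming a Hadoop `FileSystem` named `fs`; not the exact code from either commit):

    // Option A (behavior restored by this revert): trust that every path
    // returned by newTaskTempFile() was materialized by the writer.
    val statuses = addedFiles.map(f => fs.getFileStatus(new Path(f)))

    // Option B (roughly the SPARK-29999 approach): tolerate missing files,
    // at the cost of an extra file system round trip per file on every
    // task commit.
    val tolerant = addedFiles.flatMap { f =>
      val p = new Path(f)
      if (fs.exists(p)) Some(fs.getFileStatus(p)) else None
    }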

@cloud-fan
Contributor

I'm fine with reverting. One file per task is not that bad, and we won't have regressions.

@gengliangwang
Member

now we at least write one file for each partition?

No, in DSv1, only partition 0 will write an empty file: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L260
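Paraphrasing the referenced logic (a simplified sketch, assuming the surrounding `sparkSession` and `rdd` values from FileFormatWriter; not the exact source):

    // For an empty job, substitute a single dummy partition (index 0) so
    // that exactly one write task runs and produces the empty file that
    // carries metadata for formats like Parquet.
    val rddWithNonEmptyPartitions =
      if (rdd.partitions.isEmpty) {
        sparkSession.sparkContext.parallelize(Seq.empty[InternalRow], 1)
      } else {
        rdd
      }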

@gengliangwang
Member

@HeartSaVioR Please update the PR description with more details.

@HeartSaVioR
Contributor Author

@gengliangwang I added the reason for moving the UT. If you meant something else, would you mind being more specific in review comments?

@gengliangwang
Member

@HeartSaVioR I have updated the PR description from

We found a bug on SPARK-26081 and SPARK-29999 was proposed to fix it, but we decided to revert both as it's too costly to apply SPARK-29999 for SPARK-26081; SPARK-26081 may be resubmitted if there's viable approach for dealing with bug.

to

For Spark file sources, in case of an empty job, we leave the first partition to save meta for file format like parquet.
After the changes in SPARK-26081, CSV/JSON/TEXT won't be able to output an empty file for an empty job. This optimization causes a problem in `ManifestFileCommitProtocol`: the API `newTaskTempFile` is called without actual file creation. Then `fs.getFileStatus` throws FileNotFoundException since the file is not created.

SPARK-29999 fixes the problem. But it is too costly to check file existence on each task commit. We should simply restore the behavior before SPARK-26081.

So that the context is more straightforward to developers.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Nov 26, 2019

Looks like we are mixing up two different problems.

Let me define the problem properly: the root issue of SPARK-29999 was an "empty partition", not an "empty job". SPARK-26081 also optimizes the "empty partition" case. (If an empty job is also affected, that's an unintended side effect.) That's why I have been trying to decouple "empty job" from this PR, and why I said "different purpose" about the request to add a UT verifying the empty-job case.
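To make the distinction concrete, a minimal illustration (hypothetical output paths; the streaming sink hits the same empty-partition shape via writeStream):

    // "Empty job": the Dataset has no rows at all; only the dummy
    // partition 0 runs, to produce the metadata-bearing file.
    spark.range(0).write.csv("/tmp/empty-job")

    // "Empty partition": the job has rows, but repartitioning leaves some
    // partitions with no data. These are the partitions SPARK-26081 stopped
    // writing files for, which is what broke ManifestFileCommitProtocol.
    spark.range(1).repartition(10).write.csv("/tmp/empty-partitions")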

Btw, I see what you meant about "details"; thanks for providing the detailed explanation! Let me update the description so that it talks about "empty partition", not "empty job".

@HeartSaVioR
Contributor Author

Just updated the description.

Member

@HyukjinKwon left a comment

Looks fine to me too

@gengliangwang
Member

I will merge it once the Jenkins test passes.

@SparkQA

SparkQA commented Nov 27, 2019

Test build #114481 has finished for PR 26671 at commit 3fa7236.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member

Thanks, merging to master

@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR deleted the revert-SPARK-26081-SPARK-29999 branch November 27, 2019 03:33
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Dec 6, 2019
Closes apache#26671 from HeartSaVioR/revert-SPARK-26081-SPARK-29999.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>