
Commit 5b628f8

HeartSaVioR authored and gengliangwang committed
### What changes were proposed in this pull request?

This reverts commit 31c4fab (#23052) to make sure the partition calling `ManifestFileCommitProtocol.newTaskTempFile` creates an actual file. It also reverts part of commit 0d3d46d (#26639), since that commit fixes the issue raised by 31c4fab and we are reverting back. The reason for the partial revert is that the UT is worth keeping as it is to prevent regression, given that it can detect the issue of an empty partition producing no actual file. This PR makes one more change to the UT: it is intentionally moved so that both DSv1 and DSv2 are tested.

### Why are the changes needed?

After the changes in SPARK-26081 (commit 31c4fab / #23052), CSV/JSON/TEXT don't create an actual file if the partition is empty. This optimization causes a problem in `ManifestFileCommitProtocol`: the API `newTaskTempFile` is called without actual file creation, and `fs.getFileStatus` then throws `FileNotFoundException` because the file was never created.

SPARK-29999 (commit 0d3d46d / #26639) fixes the problem, but it is too costly to check file existence on each task commit. We should simply restore the behavior before SPARK-26081.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Jenkins build will follow.

Closes #26671 from HeartSaVioR/revert-SPARK-26081-SPARK-29999.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
1 parent bdf0c60 commit 5b628f8

8 files changed

Lines changed: 73 additions & 131 deletions
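
The failure chain described in the commit message is worth spelling out: with SPARK-26081 in place, `ManifestFileCommitProtocol.newTaskTempFile` hands out a path that the CSV/JSON/TEXT writer never materializes for an empty partition, and task commit then calls `fs.getFileStatus` on a path that does not exist. A minimal, hypothetical sketch of that failure mode, using plain Hadoop `FileSystem` calls against a local path (the object name and placeholder path are illustrative, not Spark's actual code):

import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object EmptyPartitionFailureSketch {
  def main(args: Array[String]): Unit = {
    // Path handed out by newTaskTempFile for a task; under SPARK-26081 the
    // CSV/JSON/TEXT writer never creates it when the partition is empty.
    val taskTempFile = new Path("/tmp/sink-output/part-00007-attempt-0.txt")
    val fs = taskTempFile.getFileSystem(new Configuration())

    try {
      // What commitTask does after this revert: it assumes every added file exists.
      val status = fs.getFileStatus(taskTempFile)
      println(s"committed ${status.getPath} (${status.getLen} bytes)")
    } catch {
      case e: FileNotFoundException =>
        // The exception reported before this revert, when no actual file was written.
        println(s"commitTask would fail: ${e.getMessage}")
    }
  }
}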


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CsvOutputWriter.scala

Lines changed: 7 additions & 15 deletions
@@ -33,25 +33,17 @@ class CsvOutputWriter(
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
 
-  private var univocityGenerator: Option[UnivocityGenerator] = None
+  private val charset = Charset.forName(params.charset)
+
+  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+
+  private val gen = new UnivocityGenerator(dataSchema, writer, params)
 
   if (params.headerFlag) {
-    val gen = getGen()
     gen.writeHeaders()
   }
 
-  private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
-    val charset = Charset.forName(params.charset)
-    val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-    val newGen = new UnivocityGenerator(dataSchema, os, params)
-    univocityGenerator = Some(newGen)
-    newGen
-  }
-
-  override def write(row: InternalRow): Unit = {
-    val gen = getGen()
-    gen.write(row)
-  }
+  override def write(row: InternalRow): Unit = gen.write(row)
 
-  override def close(): Unit = univocityGenerator.foreach(_.close())
+  override def close(): Unit = gen.close()
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala

Lines changed: 8 additions & 10 deletions
@@ -44,20 +44,18 @@ class JsonOutputWriter(
         " which can be read back by Spark only if multiLine is enabled.")
   }
 
-  private var jacksonGenerator: Option[JacksonGenerator] = None
+  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
 
-  override def write(row: InternalRow): Unit = {
-    val gen = jacksonGenerator.getOrElse {
-      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
-      // create the Generator without separator inserted between 2 records
-      val newGen = new JacksonGenerator(dataSchema, os, options)
-      jacksonGenerator = Some(newGen)
-      newGen
-    }
+  // create the Generator without separator inserted between 2 records
+  private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
 
+  override def write(row: InternalRow): Unit = {
     gen.write(row)
     gen.writeLineEnding()
   }
 
-  override def close(): Unit = jacksonGenerator.foreach(_.close())
+  override def close(): Unit = {
+    gen.close()
+    writer.close()
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala

Lines changed: 4 additions & 12 deletions
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.text
 
-import java.io.OutputStream
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
@@ -32,23 +30,17 @@ class TextOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter {
 
-  private var outputStream: Option[OutputStream] = None
+  private val writer = CodecStreams.createOutputStream(context, new Path(path))
 
   override def write(row: InternalRow): Unit = {
-    val os = outputStream.getOrElse {
-      val newStream = CodecStreams.createOutputStream(context, new Path(path))
-      outputStream = Some(newStream)
-      newStream
-    }
-
     if (!row.isNullAt(0)) {
       val utf8string = row.getUTF8String(0)
-      utf8string.writeTo(os)
+      utf8string.writeTo(writer)
     }
-    os.write(lineSeparator)
+    writer.write(lineSeparator)
   }
 
   override def close(): Unit = {
-    outputStream.foreach(_.close())
+    writer.close()
  }
 }
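
The CSV, JSON, and TEXT writers above all move from lazily opening the output stream on the first write back to opening it eagerly in the constructor, which is what guarantees that even a task whose partition is empty leaves a file behind for the commit protocol to pick up. A self-contained sketch of the two strategies, assuming hypothetical LazyWriter/EagerWriter classes over plain java.io rather than Spark's OutputWriter API:

import java.io.{File, FileOutputStream, OutputStream}

// SPARK-26081 style: the stream is created on the first write, so a writer
// that is opened and closed without writing any row produces no file at all.
class LazyWriter(path: String) {
  private var out: Option[OutputStream] = None
  private def stream: OutputStream = out.getOrElse {
    val s = new FileOutputStream(path)
    out = Some(s)
    s
  }
  def write(line: String): Unit = stream.write((line + "\n").getBytes("UTF-8"))
  def close(): Unit = out.foreach(_.close())
}

// Behavior restored by this commit: the stream is opened in the constructor,
// so even a task that writes zero rows leaves an (empty) file behind.
class EagerWriter(path: String) {
  private val out = new FileOutputStream(path)
  def write(line: String): Unit = out.write((line + "\n").getBytes("UTF-8"))
  def close(): Unit = out.close()
}

object WriterSketch {
  def main(args: Array[String]): Unit = {
    val lazyWriter = new LazyWriter("lazy-part.txt")
    lazyWriter.close()
    val eagerWriter = new EagerWriter("eager-part.txt")
    eagerWriter.close()
    println(s"lazy file exists:  ${new File("lazy-part.txt").exists()}")  // false
    println(s"eager file exists: ${new File("eager-part.txt").exists()}") // true
  }
}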

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala

Lines changed: 6 additions & 17 deletions
@@ -22,7 +22,7 @@ import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.Logging
@@ -89,7 +89,9 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
       try {
         val fs = path.getFileSystem(jobContext.getConfiguration)
         // this is to make sure the file can be seen from driver as well
-        deleteIfExists(fs, path)
+        if (fs.exists(path)) {
+          fs.delete(path, false)
+        }
       } catch {
         case e: IOException =>
           logWarning(s"Fail to remove temporary file $path, continue removing next.", e)
@@ -137,14 +139,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     if (addedFiles.nonEmpty) {
      val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
       val statuses: Seq[SinkFileStatus] =
-        addedFiles.flatMap { f =>
-          val path = new Path(f)
-          if (fs.exists(path)) {
-            Some(SinkFileStatus(fs.getFileStatus(path)))
-          } else {
-            None
-          }
-        }
+        addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f))))
       new TaskCommitMessage(statuses)
     } else {
       new TaskCommitMessage(Seq.empty[SinkFileStatus])
@@ -155,13 +150,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     // best effort cleanup of incomplete files
     if (addedFiles.nonEmpty) {
       val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
-      addedFiles.foreach { file => deleteIfExists(fs, new Path(file)) }
-    }
-  }
-
-  private def deleteIfExists(fs: FileSystem, path: Path, recursive: Boolean = false): Unit = {
-    if (fs.exists(path)) {
-      fs.delete(path, recursive)
+      addedFiles.foreach { file => fs.delete(new Path(file), false) }
     }
   }
 }
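
The commit message calls the SPARK-29999 workaround too costly: in commitTask, every added file got an fs.exists check before fs.getFileStatus, which on a distributed file system typically means an extra round trip per file on every task commit. A rough sketch of that difference using a hypothetical stand-in CountingFs (not a real Hadoop FileSystem):

import java.io.FileNotFoundException

final case class FileStatusStub(path: String, len: Long)

// Stand-in that just counts calls; a real FileSystem would pay a namenode or
// object-store round trip for each of them.
class CountingFs(existing: Set[String]) {
  var existsCalls = 0
  var getStatusCalls = 0
  def exists(path: String): Boolean = { existsCalls += 1; existing.contains(path) }
  def getFileStatus(path: String): FileStatusStub = {
    getStatusCalls += 1
    if (!existing.contains(path)) throw new FileNotFoundException(path)
    FileStatusStub(path, 0L)
  }
}

object CommitCostSketch {
  def main(args: Array[String]): Unit = {
    val addedFiles = (0 until 200).map(i => f"part-$i%05d")

    // SPARK-29999 style: check existence before every status lookup.
    val fs1 = new CountingFs(addedFiles.toSet)
    val statuses1 = addedFiles.flatMap { f => if (fs1.exists(f)) Some(fs1.getFileStatus(f)) else None }
    println(s"with exists check: exists=${fs1.existsCalls}, getFileStatus=${fs1.getStatusCalls}")

    // Behavior restored by this commit: assume the writer created every file.
    val fs2 = new CountingFs(addedFiles.toSet)
    val statuses2 = addedFiles.map(f => fs2.getFileStatus(f))
    println(s"after the revert:  exists=${fs2.existsCalls}, getFileStatus=${fs2.getStatusCalls}")
  }
}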

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 0 additions & 9 deletions
@@ -2073,15 +2073,6 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
     }
   }
 
-  test("do not produce empty files for empty partitions") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      spark.emptyDataset[String].write.csv(path)
-      val files = new File(path).listFiles()
-      assert(!files.exists(_.getName.endsWith("csv")))
-    }
-  }
-
   test("Do not reuse last good value for bad input field") {
     val schema = StructType(
       StructField("col1", StringType) ::

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 0 additions & 9 deletions
@@ -2471,15 +2471,6 @@ class JsonSuite extends QueryTest with SharedSparkSession with TestJsonData {
     emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
   }
 
-  test("do not produce empty files for empty partitions") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      spark.emptyDataset[String].write.json(path)
-      val files = new File(path).listFiles()
-      assert(!files.exists(_.getName.endsWith("json")))
-    }
-  }
-
   test("return partial result for bad records") {
     val schema = "a double, b array<int>, c string, _corrupt_record string"
     val badRecords = Seq(

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala

Lines changed: 0 additions & 9 deletions
@@ -233,13 +233,4 @@ class TextSuite extends QueryTest with SharedSparkSession {
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }
-
-  test("do not produce empty files for empty partitions") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      spark.emptyDataset[String].write.text(path)
-      val files = new File(path).listFiles()
-      assert(!files.exists(_.getName.endsWith("txt")))
-    }
-  }
 }

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 48 additions & 50 deletions
@@ -525,6 +525,54 @@ abstract class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("Handle FileStreamSink metadata correctly for empty partition") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+          var query: StreamingQuery = null
+          try {
+            // repartition to more than the input to leave empty partitions
+            query =
+              df.repartition(10)
+                .writeStream
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .format(format)
+                .start(outputDir.getCanonicalPath)
+
+            inputData.addData("1", "2", "3")
+            inputData.addData("4", "5")
+
+            failAfter(streamingTimeout) {
+              query.processAllAvailable()
+            }
+          } finally {
+            if (query != null) {
+              query.stop()
+            }
+          }
+
+          val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
+            spark.sessionState.newHadoopConf())
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+            outputDir.getCanonicalPath)
+
+          val allFiles = sinkLog.allFiles()
+          // only files from non-empty partition should be logged
+          assert(allFiles.length < 10)
+          assert(allFiles.forall(file => fs.exists(new Path(file.path))))
+
+          // the query should be able to read all rows correctly with metadata log
+          val outputDf = spark.read.format(format).load(outputDir.getCanonicalPath)
+            .selectExpr("CAST(value AS INT)").as[Int]
+          checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
+        }
+      }
+    }
+  }
 }
 
 object PendingCommitFilesTrackingManifestFileCommitProtocol {
@@ -600,61 +648,11 @@ class FileStreamSinkV1Suite extends FileStreamSinkSuite {
 }
 
 class FileStreamSinkV2Suite extends FileStreamSinkSuite {
-  import testImplicits._
-
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "")
 
-  test("SPARK-29999 Handle FileStreamSink metadata correctly for empty partition") {
-    Seq("parquet", "orc", "text", "json").foreach { format =>
-      val inputData = MemoryStream[String]
-      val df = inputData.toDF()
-
-      withTempDir { outputDir =>
-        withTempDir { checkpointDir =>
-          var query: StreamingQuery = null
-          try {
-            // repartition to more than the input to leave empty partitions
-            query =
-              df.repartition(10)
-                .writeStream
-                .option("checkpointLocation", checkpointDir.getCanonicalPath)
-                .format(format)
-                .start(outputDir.getCanonicalPath)
-
-            inputData.addData("1", "2", "3")
-            inputData.addData("4", "5")
-
-            failAfter(streamingTimeout) {
-              query.processAllAvailable()
-            }
-          } finally {
-            if (query != null) {
-              query.stop()
-            }
-          }
-
-          val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
-            spark.sessionState.newHadoopConf())
-          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
-            outputDir.getCanonicalPath)
-
-          val allFiles = sinkLog.allFiles()
-          // only files from non-empty partition should be logged
-          assert(allFiles.length < 10)
-          assert(allFiles.forall(file => fs.exists(new Path(file.path))))
-
-          // the query should be able to read all rows correctly with metadata log
-          val outputDf = spark.read.format(format).load(outputDir.getCanonicalPath)
-            .selectExpr("CAST(value AS INT)").as[Int]
-          checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
-        }
-      }
-    }
-  }
-
   override def checkQueryExecution(df: DataFrame): Unit = {
     // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
     // been inferred
