@@ -33,25 +33,17 @@ class CsvOutputWriter(
context: TaskAttemptContext,
params: CSVOptions) extends OutputWriter with Logging {

private var univocityGenerator: Option[UnivocityGenerator] = None
private val charset = Charset.forName(params.charset)

private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)

private val gen = new UnivocityGenerator(dataSchema, writer, params)

if (params.headerFlag) {
val gen = getGen()
gen.writeHeaders()
}

private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
val charset = Charset.forName(params.charset)
val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
val newGen = new UnivocityGenerator(dataSchema, os, params)
univocityGenerator = Some(newGen)
newGen
}

override def write(row: InternalRow): Unit = {
val gen = getGen()
gen.write(row)
}
override def write(row: InternalRow): Unit = gen.write(row)

override def close(): Unit = univocityGenerator.foreach(_.close())
override def close(): Unit = gen.close()
}
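
Note: the hunk above swaps lazy, write-triggered creation of the CSV generator for eager creation in the constructor. A minimal self-contained sketch of the two styles, using plain java.io types as stand-ins for Spark's CodecStreams and UnivocityGenerator (all names below are illustrative, not the real API):

```scala
import java.io.{BufferedWriter, FileWriter, Writer}

// Toy stand-in for UnivocityGenerator: writes CSV lines to a Writer.
final class ToyCsvGenerator(out: Writer) {
  def writeHeaders(): Unit = out.write("col1,col2\n")
  def write(row: Seq[String]): Unit = out.write(row.mkString(",") + "\n")
  def close(): Unit = out.close()
}

// Lazy style (removed): the generator is created on demand, so a header-less
// task that never writes a row never opens an output file.
final class LazyCsvWriter(path: String, header: Boolean) {
  private var generator: Option[ToyCsvGenerator] = None

  if (header) getGen().writeHeaders() // a header still forces eager creation

  private def getGen(): ToyCsvGenerator = generator.getOrElse {
    val g = new ToyCsvGenerator(new BufferedWriter(new FileWriter(path)))
    generator = Some(g)
    g
  }

  def write(row: Seq[String]): Unit = getGen().write(row)
  def close(): Unit = generator.foreach(_.close())
}

// Eager style (introduced): stream and generator exist from construction, so
// write() and close() are unconditional and a part file always exists.
final class EagerCsvWriter(path: String, header: Boolean) {
  private val generator = new ToyCsvGenerator(new BufferedWriter(new FileWriter(path)))
  if (header) generator.writeHeaders()

  def write(row: Seq[String]): Unit = generator.write(row)
  def close(): Unit = generator.close()
}
```

The ManifestFileCommitProtocol hunks further down drop the exists() checks that tolerated files missing under the lazy scheme.
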
@@ -44,20 +44,18 @@ class JsonOutputWriter(
" which can be read back by Spark only if multiLine is enabled.")
}

private var jacksonGenerator: Option[JacksonGenerator] = None
private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)

override def write(row: InternalRow): Unit = {
val gen = jacksonGenerator.getOrElse {
val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
// create the Generator without separator inserted between 2 records
val newGen = new JacksonGenerator(dataSchema, os, options)
jacksonGenerator = Some(newGen)
newGen
}
// create the Generator without separator inserted between 2 records
private[this] val gen = new JacksonGenerator(dataSchema, writer, options)

override def write(row: InternalRow): Unit = {
gen.write(row)
gen.writeLineEnding()
}

override def close(): Unit = jacksonGenerator.foreach(_.close())
override def close(): Unit = {
gen.close()
writer.close()
}
}
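
For readers unfamiliar with the layering here: the JacksonGenerator sits on top of the output writer, and the new close() tears down both. A rough sketch of the same shape with plain Jackson rather than Spark's wrapper (the file name is made up):

```scala
import java.io.{BufferedWriter, FileWriter}
import com.fasterxml.jackson.core.JsonFactory

object JsonLinesSketch {
  def main(args: Array[String]): Unit = {
    val writer = new BufferedWriter(new FileWriter("part-00000.json"))
    // Disable the root value separator so records are delimited only by the
    // newline written below, echoing the comment in the diff above.
    val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

    Seq("1", "2").foreach { v =>
      gen.writeStartObject()
      gen.writeStringField("value", v)
      gen.writeEndObject()
      gen.writeRaw('\n') // analogous to writeLineEnding()
    }

    gen.close()    // flush and close the generator
    writer.close() // then close the underlying writer, as in close() above
  }
}
```
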
@@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.text

import java.io.OutputStream

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

@@ -32,23 +30,17 @@ class TextOutputWriter(
context: TaskAttemptContext)
extends OutputWriter {

private var outputStream: Option[OutputStream] = None
private val writer = CodecStreams.createOutputStream(context, new Path(path))

override def write(row: InternalRow): Unit = {
val os = outputStream.getOrElse {
val newStream = CodecStreams.createOutputStream(context, new Path(path))
outputStream = Some(newStream)
newStream
}

if (!row.isNullAt(0)) {
val utf8string = row.getUTF8String(0)
utf8string.writeTo(os)
utf8string.writeTo(writer)
}
os.write(lineSeparator)
writer.write(lineSeparator)
}

override def close(): Unit = {
outputStream.foreach(_.close())
writer.close()
}
}
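
The text case is the simplest form of the same eager pattern: per row, write the value's bytes (if non-null) followed by the line separator. A standalone sketch with a plain FileOutputStream instead of CodecStreams (path and separator are illustrative):

```scala
import java.io.{BufferedOutputStream, FileOutputStream}
import java.nio.charset.StandardCharsets

object TextWriterSketch {
  def main(args: Array[String]): Unit = {
    val lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
    val out = new BufferedOutputStream(new FileOutputStream("part-00000.txt"))

    // Mirrors write(row): bytes of the value when present, then the separator;
    // a null value still produces a separator-only line.
    def writeRow(value: Option[String]): Unit = {
      value.foreach(v => out.write(v.getBytes(StandardCharsets.UTF_8)))
      out.write(lineSeparator)
    }

    writeRow(Some("hello"))
    writeRow(None) // the isNullAt(0) branch above
    out.close()
  }
}
```
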
@@ -22,7 +22,7 @@ import java.util.UUID

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.internal.Logging
@@ -89,7 +89,9 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
try {
val fs = path.getFileSystem(jobContext.getConfiguration)
// this is to make sure the file can be seen from driver as well
deleteIfExists(fs, path)
if (fs.exists(path)) {
fs.delete(path, false)
}
} catch {
case e: IOException =>
logWarning(s"Fail to remove temporary file $path, continue removing next.", e)
@@ -137,14 +139,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
if (addedFiles.nonEmpty) {
val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
val statuses: Seq[SinkFileStatus] =
addedFiles.flatMap { f =>
Review comment (Member):

So, this if-else looked like overhead, and it was added to avoid files not being written?

FWIW, there's still an old case when files are not written:

private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
new OrcOutputFormat().getRecordWriter(
new Path(path).getFileSystem(context.getConfiguration),
context.getConfiguration.asInstanceOf[JobConf],
path,
Reporter.NULL
).asInstanceOf[RecordWriter[NullWritable, Writable]]
}
override def write(row: InternalRow): Unit = {
recordWriter.write(NullWritable.get(), serializer.serialize(row))
}

spark.conf.set("spark.sql.orc.impl", "hive")
spark.range(10).filter(_ => false).write.orc("test.orc")

But I suspect it's okay, since this behaviour will be completely superseded by the "native" implementation in the future.

Reply (Contributor Author):

Thanks for the pointer. That would also break the streaming sink when there's an empty partition. Ideally it should be fixed for streaming queries, but I feel that is beyond the scope of this PR.

val path = new Path(f)
if (fs.exists(path)) {
Some(SinkFileStatus(fs.getFileStatus(path)))
} else {
None
}
}
addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f))))
new TaskCommitMessage(statuses)
} else {
new TaskCommitMessage(Seq.empty[SinkFileStatus])
@@ -155,13 +150,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
// best effort cleanup of incomplete files
if (addedFiles.nonEmpty) {
val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
addedFiles.foreach { file => deleteIfExists(fs, new Path(file)) }
}
}

private def deleteIfExists(fs: FileSystem, path: Path, recursive: Boolean = false): Unit = {
if (fs.exists(path)) {
fs.delete(path, recursive)
addedFiles.foreach { file => fs.delete(new Path(file), false) }
}
}
}
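
A small sketch of the two cleanup styles left after removing the deleteIfExists helper, run against a local Hadoop FileSystem (the path is hypothetical). FileSystem.delete on a missing path is generally just a no-op that returns false, so the exists() probe retained in the job-level cleanup is a best-effort guard rather than a correctness requirement.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ManifestCleanupSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val pending = new Path("/tmp/_spark_metadata/.pending-task-file") // hypothetical

    // Style kept in the job-level cleanup above: probe, then non-recursive delete.
    if (fs.exists(pending)) {
      fs.delete(pending, false)
    }

    // Style used in the task-abort cleanup above: delete unconditionally and
    // ignore the boolean result; a missing file just yields false.
    fs.delete(pending, false)
  }
}
```
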
@@ -2073,15 +2073,6 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
}
}

test("do not produce empty files for empty partitions") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.emptyDataset[String].write.csv(path)
val files = new File(path).listFiles()
assert(!files.exists(_.getName.endsWith("csv")))
}
}

test("Do not reuse last good value for bad input field") {
val schema = StructType(
StructField("col1", StringType) ::
@@ -2471,15 +2471,6 @@ class JsonSuite extends QueryTest with SharedSparkSession with TestJsonData {
emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
}

test("do not produce empty files for empty partitions") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.emptyDataset[String].write.json(path)
val files = new File(path).listFiles()
assert(!files.exists(_.getName.endsWith("json")))
}
}

test("return partial result for bad records") {
val schema = "a double, b array<int>, c string, _corrupt_record string"
val badRecords = Seq(
@@ -233,13 +233,4 @@ class TextSuite extends QueryTest with SharedSparkSession {
assert(data(3) == Row("\"doh\""))
assert(data.length == 4)
}

test("do not produce empty files for empty partitions") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.emptyDataset[String].write.text(path)
val files = new File(path).listFiles()
assert(!files.exists(_.getName.endsWith("txt")))
}
}
}
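
The three deleted tests (CSV, JSON, text) all followed the same shape; for reference, a standalone sketch of that pattern using a local session and a temp directory in place of the suites' withTempPath helper. The assertion depended on the lazy writer creation removed above, so it is not expected to hold after this change.

```scala
import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object EmptyPartitionFileCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    val path = new File(Files.createTempDirectory("empty-check").toFile, "out").getCanonicalPath
    spark.emptyDataset[String].write.csv(path)

    // The removed tests asserted this is false (no csv part files for empty
    // partitions); with eagerly created writers an empty part file may appear.
    val files = new File(path).listFiles()
    println(files.exists(_.getName.endsWith("csv")))

    spark.stop()
  }
}
```
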
@@ -525,6 +525,54 @@ abstract class FileStreamSinkSuite extends StreamTest {
}
}
}

test("Handle FileStreamSink metadata correctly for empty partition") {
Seq("parquet", "orc", "text", "json").foreach { format =>
val inputData = MemoryStream[String]
val df = inputData.toDF()

withTempDir { outputDir =>
withTempDir { checkpointDir =>
var query: StreamingQuery = null
try {
// repartition to more than the input to leave empty partitions
query =
df.repartition(10)
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format(format)
.start(outputDir.getCanonicalPath)

inputData.addData("1", "2", "3")
inputData.addData("4", "5")

failAfter(streamingTimeout) {
query.processAllAvailable()
}
} finally {
if (query != null) {
query.stop()
}
}

val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
spark.sessionState.newHadoopConf())
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
outputDir.getCanonicalPath)

val allFiles = sinkLog.allFiles()
// only files from non-empty partition should be logged
assert(allFiles.length < 10)
assert(allFiles.forall(file => fs.exists(new Path(file.path))))

// the query should be able to read all rows correctly with metadata log
val outputDf = spark.read.format(format).load(outputDir.getCanonicalPath)
.selectExpr("CAST(value AS INT)").as[Int]
checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
}
}
}
}
}

object PendingCommitFilesTrackingManifestFileCommitProtocol {
@@ -600,61 +648,11 @@ class FileStreamSinkV1Suite extends FileStreamSinkSuite {
}

class FileStreamSinkV2Suite extends FileStreamSinkSuite {
import testImplicits._

override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")

test("SPARK-29999 Handle FileStreamSink metadata correctly for empty partition") {
Seq("parquet", "orc", "text", "json").foreach { format =>
val inputData = MemoryStream[String]
val df = inputData.toDF()

withTempDir { outputDir =>
withTempDir { checkpointDir =>
var query: StreamingQuery = null
try {
// repartition to more than the input to leave empty partitions
query =
df.repartition(10)
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format(format)
.start(outputDir.getCanonicalPath)

inputData.addData("1", "2", "3")
inputData.addData("4", "5")

failAfter(streamingTimeout) {
query.processAllAvailable()
}
} finally {
if (query != null) {
query.stop()
}
}

val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
spark.sessionState.newHadoopConf())
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
outputDir.getCanonicalPath)

val allFiles = sinkLog.allFiles()
// only files from non-empty partition should be logged
assert(allFiles.length < 10)
assert(allFiles.forall(file => fs.exists(new Path(file.path))))

// the query should be able to read all rows correctly with metadata log
val outputDf = spark.read.format(format).load(outputDir.getCanonicalPath)
.selectExpr("CAST(value AS INT)").as[Int]
checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
}
}
}
}

override def checkQueryExecution(df: DataFrame): Unit = {
// Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred