Changes from 1 commit
30 changes: 17 additions & 13 deletions python/pyspark/sql/readwriter.py
@@ -345,11 +345,11 @@ def text(self, paths, wholetext=False, lineSep=None):
@since(2.0)
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None):
ignoreTrailingWhiteSpace=None, nullValue=None, emptyValue=None, nanValue=None,
Member: It should be put last; otherwise, it's going to break existing Python apps when the arguments are given positionally.

Member: We should add the new parameter at the end. +1

Contributor (author): Done!

positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None,
maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None,
mode=None, columnNameOfCorruptRecord=None, multiLine=None,
charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -395,6 +395,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string. Since 2.0.1, this ``nullValue`` param
applies to all supported types including the string type.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, empty string.
:param nanValue: sets the string representation of a non-number value. If None is set, it
uses the default value, ``NaN``.
:param positiveInf: sets the string representation of a positive infinity value. If None
@@ -457,9 +459,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
emptyValue=emptyValue, nanValue=nanValue, positiveInf=positiveInf,
Member: I would put this at the end as well, for readability.

Contributor (author): Done!

negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat,
maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
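
As a usage sketch (not part of this diff): once the option lands, a PySpark read can map quoted empty fields to a chosen marker. The path and data below are hypothetical.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("emptyValue-demo").getOrCreate()

    # Quoted empty fields ("") come back as the string "empty" instead of "".
    df = spark.read.csv("/tmp/cars.csv",  # hypothetical input file
                        header=True, inferSchema=True, emptyValue="empty")
    df.show()
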
@@ -857,9 +859,9 @@ def text(self, path, compression=None, lineSep=None):

@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
charToEscapeQuoteEscaping=None, encoding=None):
header=None, nullValue=None, emptyValue=None, escapeQuotes=None, quoteAll=None,
dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -891,6 +893,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, ``""``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
@@ -916,8 +920,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
dateFormat=dateFormat, timestampFormat=timestampFormat,
nullValue=nullValue, emptyValue=emptyValue, escapeQuotes=escapeQuotes,
quoteAll=quoteAll, dateFormat=dateFormat, timestampFormat=timestampFormat,
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
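
A matching write-side sketch (hypothetical output path, reusing the spark session from the earlier sketch): on write, emptyValue sets the literal emitted for empty string values, which otherwise defaults to a quoted empty field.

    # Empty strings in column "s" are written as N/A instead of the default "" literal.
    df = spark.createDataFrame([(1, ""), (2, "x")], ["id", "s"])
    df.write.csv("/tmp/out", header=True, emptyValue="N/A", mode="overwrite")
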
14 changes: 8 additions & 6 deletions python/pyspark/sql/streaming.py
@@ -560,9 +560,9 @@ def text(self, path, wholetext=False, lineSep=None):
@since(2.0)
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
ignoreTrailingWhiteSpace=None, nullValue=None, emptyValue=None, nanValue=None,
positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None,
maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
@@ -611,6 +611,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string. Since 2.0.1, this ``nullValue`` param
applies to all supported types including the string type.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, empty string.
:param nanValue: sets the string representation of a non-number value. If None is set, it
uses the default value, ``NaN``.
:param positiveInf: sets the string representation of a positive infinity value. If None
@@ -669,9 +671,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
emptyValue=emptyValue, nanValue=nanValue, positiveInf=positiveInf,
negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat,
maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema)
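
The option flows through to structured streaming as well; a minimal sketch with a hypothetical schema and input directory, again assuming an active spark session:

    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    schema = StructType([StructField("id", IntegerType()),
                         StructField("s", StringType())])
    stream_df = spark.readStream.csv("/tmp/incoming", schema=schema,
                                     emptyValue="empty")
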
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -571,6 +571,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* whitespaces from values being read should be skipped.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
* 2.0.1, this applies to all supported types including the string type.</li>
* <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
* value.</li>
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -635,6 +635,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* enclosed in quotes. Default is to only escape values containing a quote character.</li>
* <li>`header` (default `false`): writes the names of columns as the first line.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
* <li>`emptyValue` (default `""`): sets the string representation of an empty value.</li>
* <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv
* files. If it is not set, the UTF-8 charset will be used.</li>
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -117,6 +117,9 @@ class CSVOptions(

val nullValue = parameters.getOrElse("nullValue", "")

val emptyValueInRead = parameters.getOrElse("emptyValue", "")
Member: I would just call it emptyValue, for consistency with the other options here.

Contributor (author): I thought that as well. For the sake of backwards compatibility, and since we already have the ignoreLeadingWhiteSpaceInRead / ignoreLeadingWhiteSpaceFlagInWrite pair, I implemented it this way. What do you say?

Member: I had to name those differently because their default values are different. Ah, yeah, then it makes sense here too. I rushed to read.

val emptyValueInWrite = parameters.getOrElse("emptyValue", "\"\"")

val nanValue = parameters.getOrElse("nanValue", "NaN")

val positiveInf = parameters.getOrElse("positiveInf", "Inf")
@@ -173,7 +176,7 @@ class CSVOptions(
writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
writerSettings.setNullValue(nullValue)
writerSettings.setEmptyValue("\"\"")
writerSettings.setEmptyValue(emptyValueInWrite)
writerSettings.setSkipEmptyLines(true)
writerSettings.setQuoteAllFields(quoteAll)
writerSettings.setQuoteEscapingEnabled(escapeQuotes)
@@ -194,7 +197,7 @@ class CSVOptions(
settings.setInputBufferSize(inputBufferSize)
settings.setMaxColumns(maxColumns)
settings.setNullValue(nullValue)
settings.setEmptyValue("")
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings
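
The asymmetric defaults above (empty string for reads, a quoted "" for writes) preserve the pre-existing round-trip behaviour. A sketch of that default behaviour from the Python side, assuming an active spark session and no other options set:

    # By default an empty string is written as the "" literal; reading that
    # file back with default options preserves it as an empty string.
    df = spark.createDataFrame([("a", "")], ["k", "v"])
    df.write.csv("/tmp/roundtrip", header=True, mode="overwrite")
    spark.read.csv("/tmp/roundtrip", header=True).show()
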
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -327,6 +327,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* whitespaces from values being read should be skipped.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
* 2.0.1, this applies to all supported types including the string type.</li>
* <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
* value.</li>
4 changes: 4 additions & 0 deletions sql/core/src/test/resources/test-data/cars-empty-value.csv
@@ -0,0 +1,4 @@
year,make,model,comment,blank
"2012","Tesla","S","",""
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt,,""
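
A Python-side sketch of how this fixture parses with a custom emptyValue (mirroring the Scala tests below): quoted empty fields ("") become the configured marker, while unquoted empty fields stay null.

    df = spark.read.csv("sql/core/src/test/resources/test-data/cars-empty-value.csv",
                        header=True, emptyValue="empty")
    # Row 1: comment="empty", blank="empty"; row 2: blank=None; row 3: comment=None.
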
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -50,6 +50,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
private val carsAltFile = "test-data/cars-alternative.csv"
private val carsUnbalancedQuotesFile = "test-data/cars-unbalanced-quotes.csv"
private val carsNullFile = "test-data/cars-null.csv"
private val carsEmptyValueFile = "test-data/cars-empty-value.csv"
private val carsBlankColName = "test-data/cars-blank-column-name.csv"
private val emptyFile = "test-data/empty.csv"
private val commentsFile = "test-data/comments.csv"
@@ -668,6 +669,70 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
}

test("empty fields with user defined empty values") {

// year,make,model,comment,blank
val dataSchema = StructType(List(
StructField("year", IntegerType, nullable = true),
StructField("make", StringType, nullable = false),
StructField("model", StringType, nullable = false),
StructField("comment", StringType, nullable = true),
StructField("blank", StringType, nullable = true)))
val cars = spark.read
.format("csv")
.schema(dataSchema)
.option("header", "true")
.option("emptyValue", "empty")
.load(testFile(carsEmptyValueFile))

verifyCars(cars, withHeader = true, checkValues = false)
val results = cars.collect()
assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty"))
assert(results(1).toSeq ===
Array(1997, "Ford", "E350", "Go get one now they are going fast", null))
assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty"))
}

test("save csv with empty fields with user defined empty values") {
withTempDir { dir =>
val csvDir = new File(dir, "csv").getCanonicalPath

// year,make,model,comment,blank
val dataSchema = StructType(List(
StructField("year", IntegerType, nullable = true),
StructField("make", StringType, nullable = false),
StructField("model", StringType, nullable = false),
StructField("comment", StringType, nullable = true),
StructField("blank", StringType, nullable = true)))
val cars = spark.read
.format("csv")
.schema(dataSchema)
.option("header", "true")
.option("nullValue", "NULL")
.load(testFile(carsEmptyValueFile))

cars.coalesce(1).write
.format("csv")
.option("header", "true")
.option("emptyValue", "empty")
.option("nullValue", null)
.save(csvDir)

val carsCopy = spark.read
.format("csv")
.schema(dataSchema)
.option("header", "true")
.load(csvDir)

verifyCars(carsCopy, withHeader = true, checkValues = false)
val results = carsCopy.collect()
assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty"))
assert(results(1).toSeq ===
Array(1997, "Ford", "E350", "Go get one now they are going fast", null))
assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty"))
}
}

test("save csv with compression codec option") {
withTempDir { dir =>
val csvDir = new File(dir, "csv").getCanonicalPath