Skip to content

Commit 48e143d

Browse files
mmolimar authored and MaxGekk committed
Adding tests
1 parent 465ed7a commit 48e143d

4 files changed

Lines changed: 66 additions & 4 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,10 @@ abstract class CSVDataSource extends Serializable {
9191
}
9292

9393
row.zipWithIndex.map { case (value, index) =>
94-
if (value == null || value.isEmpty || value == options.nullValue) {
95-
// When there are empty strings or the values set in `nullValue`, put the
96-
// index as the suffix.
94+
if (value == null || value.isEmpty || value == options.nullValue ||
95+
value == options.emptyValueInRead) {
96+
// When there are empty strings or the values set in `nullValue` or in `emptyValue`,
97+
// put the index as the suffix.
9798
s"_c$index"
9899
} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) {
99100
// When there are case-insensitive duplicates, put the index as the suffix.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ private[csv] object CSVInferSchema {
7979
* point checking if it is an Int, as the final type must be Double or higher.
8080
*/
8181
def inferField(typeSoFar: DataType, field: String, options: CSVOptions): DataType = {
82-
if (field == null || field.isEmpty || field == options.nullValue) {
82+
if (field == null || field.isEmpty || field == options.nullValue ||
83+
field == options.emptyValueInRead) {
8384
typeSoFar
8485
} else {
8586
typeSoFar match {

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,20 @@ class CSVInferSchemaSuite extends SparkFunSuite {
105105
assert(CSVInferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1))
106106
}
107107

108+
test("Empty fields are handled properly when an emptyValue is specified") {
109+
var options = new CSVOptions(Map("emptyValue" -> "empty"), false, "GMT")
110+
assert(CSVInferSchema.inferField(NullType, "empty", options) == NullType)
111+
assert(CSVInferSchema.inferField(StringType, "empty", options) == StringType)
112+
assert(CSVInferSchema.inferField(LongType, "empty", options) == LongType)
113+
114+
options = new CSVOptions(Map("emptyValue" -> "\\N"), false, "GMT")
115+
assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType)
116+
assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType)
117+
assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType)
118+
assert(CSVInferSchema.inferField(BooleanType, "\\N", options) == BooleanType)
119+
assert(CSVInferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1))
120+
}
121+
108122
test("Merging Nulltypes should yield Nulltype.") {
109123
val mergedNullTypes = CSVInferSchema.mergeRowTypes(Array(NullType), Array(NullType))
110124
assert(mergedNullTypes.deep == Array(NullType).deep)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,6 +1440,52 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
14401440
}
14411441
}
14421442

1443+
test("SPARK-25241: An empty string should not be coerced to null when emptyValue is passed.") {
1444+
val litNull: String = null
1445+
val df = Seq(
1446+
(1, "John Doe"),
1447+
(2, ""),
1448+
(3, "-"),
1449+
(4, litNull)
1450+
).toDF("id", "name")
1451+
1452+
// Checks for new behavior where a null is not coerced to an empty string when `emptyValue` is
1453+
// set to anything but an empty string literal.
1454+
withTempPath { path =>
1455+
df.write
1456+
.option("emptyValue", "-")
1457+
.csv(path.getAbsolutePath)
1458+
val computed = spark.read
1459+
.option("emptyValue", "-")
1460+
.schema(df.schema)
1461+
.csv(path.getAbsolutePath)
1462+
val expected = Seq(
1463+
(1, "John Doe"),
1464+
(2, "-"),
1465+
(3, "-"),
1466+
(4, "-")
1467+
).toDF("id", "name")
1468+
1469+
checkAnswer(computed, expected)
1470+
}
1471+
// Keeps the old behavior where an empty string is coerced to null when emptyValue is not passed.
1472+
withTempPath { path =>
1473+
df.write
1474+
.csv(path.getAbsolutePath)
1475+
val computed = spark.read
1476+
.schema(df.schema)
1477+
.csv(path.getAbsolutePath)
1478+
val expected = Seq(
1479+
(1, "John Doe"),
1480+
(2, litNull),
1481+
(3, "-"),
1482+
(4, litNull)
1483+
).toDF("id", "name")
1484+
1485+
checkAnswer(computed, expected)
1486+
}
1487+
}
1488+
14431489
test("SPARK-24329: skip lines with comments, and one or multiple whitespaces") {
14441490
val schema = new StructType().add("colA", StringType)
14451491
val ds = spark

0 commit comments

Comments (0)