Skip to content

Commit 603f445

Browse files
Bill Chambers authored and Andrew Or committed
[SPARK-15264][SPARK-15274][SQL] CSV Reader Error on Blank Column Names
## What changes were proposed in this pull request? When a CSV begins with: - `,,` OR - `"","",` meaning that the first column names are either empty or blank strings and `header` is specified to be `true`, then the column name is replaced with `C` + the index number of that given column. For example, if you were to read in the CSV: ``` "","second column" "hello", "there" ``` Then column names would become `"C0", "second column"`. This behavior aligns with what currently happens when `header` is specified to be `false` in recent versions of Spark. ### Current Behavior in Spark <=1.6 In Spark <=1.6, a CSV with a blank column name becomes a blank string, `""`, meaning that this column cannot be accessed. However the CSV reads in without issue. ### Current Behavior in Spark 2.0 Spark throws a NullPointerError and will not read in the file. #### Reproduction in 2.0 https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/346304/2828750690305044/484361/latest.html ## How was this patch tested? A new test was added to `CSVSuite` to account for this issue. We then have asserts that test for being able to select both the empty column names as well as the regular column names. Author: Bill Chambers <[email protected]> Author: Bill Chambers <[email protected]> Closes #13041 from anabranch/master.
1 parent f14c4ba commit 603f445

4 files changed

Lines changed: 22 additions & 5 deletions

File tree

python/pyspark/sql/readwriter.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -358,7 +358,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
358358
359359
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
360360
>>> df.dtypes
361-
[('C0', 'string'), ('C1', 'string')]
361+
[('_c0', 'string'), ('_c1', 'string')]
362362
"""
363363
if schema is not None:
364364
self.schema(schema)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -61,9 +61,11 @@ class DefaultSource extends FileFormat with DataSourceRegister {
6161
val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine)
6262

6363
val header = if (csvOptions.headerFlag) {
64-
firstRow
64+
firstRow.zipWithIndex.map { case (value, index) =>
65+
if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value
66+
}
6567
} else {
66-
firstRow.zipWithIndex.map { case (value, index) => s"C$index" }
68+
firstRow.zipWithIndex.map { case (value, index) => s"_c$index" }
6769
}
6870

6971
val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,3 @@
1+
"",,make,customer,comment
2+
2012,"Tesla","S","bill","blank"
3+
2013,"Tesla","S","c","something"

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
3838
private val carsAltFile = "cars-alternative.csv"
3939
private val carsUnbalancedQuotesFile = "cars-unbalanced-quotes.csv"
4040
private val carsNullFile = "cars-null.csv"
41+
private val carsBlankColName = "cars-blank-column-name.csv"
4142
private val emptyFile = "empty.csv"
4243
private val commentsFile = "comments.csv"
4344
private val disableCommentsFile = "disable_comments.csv"
@@ -71,14 +72,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
7172
if (withHeader) {
7273
assert(df.schema.fieldNames === Array("year", "make", "model", "comment", "blank"))
7374
} else {
74-
assert(df.schema.fieldNames === Array("C0", "C1", "C2", "C3", "C4"))
75+
assert(df.schema.fieldNames === Array("_c0", "_c1", "_c2", "_c3", "_c4"))
7576
}
7677
}
7778

7879
if (checkValues) {
7980
val yearValues = List("2012", "1997", "2015")
8081
val actualYears = if (!withHeader) "year" :: yearValues else yearValues
81-
val years = if (withHeader) df.select("year").collect() else df.select("C0").collect()
82+
val years = if (withHeader) df.select("year").collect() else df.select("_c0").collect()
8283

8384
years.zipWithIndex.foreach { case (year, index) =>
8485
if (checkTypes) {
@@ -224,6 +225,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
224225
assert(cars.select("year").collect().size === 2)
225226
}
226227

228+
test("test for blank column names on read and select columns") {
229+
val cars = spark.read
230+
.format("csv")
231+
.options(Map("header" -> "true", "inferSchema" -> "true"))
232+
.load(testFile(carsBlankColName))
233+
234+
assert(cars.select("customer").collect().size == 2)
235+
assert(cars.select("_c0").collect().size == 2)
236+
assert(cars.select("_c1").collect().size == 2)
237+
}
238+
227239
test("test for FAILFAST parsing mode") {
228240
val exception = intercept[SparkException]{
229241
spark.read

0 commit comments

Comments (0)