Skip to content

Commit c491934

Browse files
MaxGekk and dongjoon-hyun
authored and committed
[SPARK-26007][SQL] DataFrameReader.csv() respects to spark.sql.columnNameOfCorruptRecord
## What changes were proposed in this pull request? Passing current value of SQL config `spark.sql.columnNameOfCorruptRecord` to `CSVOptions` inside of `DataFrameReader`.`csv()`. ## How was this patch tested? Added a test where default value of `spark.sql.columnNameOfCorruptRecord` is changed. Closes #23006 from MaxGekk/csv-corrupt-sql-config. Lead-authored-by: Maxim Gekk <[email protected]> Co-authored-by: Dongjoon Hyun <[email protected]> Co-authored-by: Maxim Gekk <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
1 parent 88c8262 commit c491934

File tree

2 files changed

+24
-1
lines changed
  • sql
    • catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv
    • core/src/test/scala/org/apache/spark/sql/execution/datasources/csv

2 files changed

+24
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.time.FastDateFormat
2525

2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.catalyst.util._
28+
import org.apache.spark.sql.internal.SQLConf
2829

2930
class CSVOptions(
3031
@transient val parameters: CaseInsensitiveMap[String],
@@ -33,11 +34,22 @@ class CSVOptions(
3334
defaultColumnNameOfCorruptRecord: String)
3435
extends Logging with Serializable {
3536

37+
def this(
38+
parameters: Map[String, String],
39+
columnPruning: Boolean,
40+
defaultTimeZoneId: String) = {
41+
this(
42+
CaseInsensitiveMap(parameters),
43+
columnPruning,
44+
defaultTimeZoneId,
45+
SQLConf.get.columnNameOfCorruptRecord)
46+
}
47+
3648
def this(
3749
parameters: Map[String, String],
3850
columnPruning: Boolean,
3951
defaultTimeZoneId: String,
40-
defaultColumnNameOfCorruptRecord: String = "") = {
52+
defaultColumnNameOfCorruptRecord: String) = {
4153
this(
4254
CaseInsensitiveMap(parameters),
4355
columnPruning,

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1848,4 +1848,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
18481848
val schema = new StructType().add("a", StringType).add("b", IntegerType)
18491849
checkAnswer(spark.read.schema(schema).option("delimiter", delimiter).csv(input), Row("abc", 1))
18501850
}
1851+
1852+
test("using spark.sql.columnNameOfCorruptRecord") {
1853+
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
1854+
val csv = "\""
1855+
val df = spark.read
1856+
.schema("a int, _unparsed string")
1857+
.csv(Seq(csv).toDS())
1858+
1859+
checkAnswer(df, Row(null, csv))
1860+
}
1861+
}
18511862
}

0 commit comments

Comments
 (0)