CSVInferSchema.scala
@@ -90,7 +90,10 @@ private[csv] object CSVInferSchema {
       // DecimalTypes have different precisions and scales, so we try to find the common type.
       findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
     case DoubleType => tryParseDouble(field, options)
-    case TimestampType => tryParseTimestamp(field, options)
+    case DateType => tryParseDate(field, options)
+    case TimestampType =>
+      findTightestCommonType(typeSoFar, tryParseTimestamp(field, options)).getOrElse(
Member:
Mind elaborating why we should find the wider type here?

Author:
Sorry, your question isn't really clear to me.
We have to try parsing the value as DateType first, because a date can always be parsed both as a date and as a timestamp (the beginning of that day). The current implementation of Spark ignores dates and always parses them as timestamps.
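A standalone sketch of that ambiguity using plain java.time (not part of this diff), with the same dd/MM/yyyy pattern as this PR's test data:

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

// "27/09/2015" parses cleanly as a date...
val date: LocalDate =
  LocalDate.parse("27/09/2015", DateTimeFormatter.ofPattern("dd/MM/yyyy"))

// ...and is also a perfectly good timestamp (the beginning of that day),
// so trying TimestampType before DateType would never infer DateType.
val asTimestamp: LocalDateTime = date.atStartOfDay()

println(date)        // 2015-09-27
println(asTimestamp) // 2015-09-27T00:00
```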

Member:
I mean, it wasn't clear why we need `findTightestCommonType`. I thought `case TimestampType => tryParseTimestamp(field, options)` would work.

+        tryParseBoolean(field, options))
     case BooleanType => tryParseBoolean(field, options)
     case StringType => StringType
     case other: DataType =>
@@ -140,14 +143,23 @@ private[csv] object CSVInferSchema {
   private def tryParseDouble(field: String, options: CSVOptions): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
       DoubleType
     } else {
-      tryParseTimestamp(field, options)
+      tryParseDate(field, options)
     }
   }

+  private def tryParseDate(field: String, options: CSVOptions): DataType = {
+    // This case infers a custom `dateFormat` is set.
+    if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
+      DateType
+    } else {
+      tryParseTimestamp(field, options)
+    }
+  }

   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+    // This case infers a custom `timestampFormat` is set.
+    if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
Member (HyukjinKwon, Apr 29, 2018):
Should we replace it with `timestampFormatter` in the CSV parsing logic too, and document it in the migration guide? (e.g., the date format is now inferred correctly, plus the things you mentioned in #20140 (comment))

Member:
Probably, adding a configuration to control this behaviour would be preferable in this case.
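For what that could look like from the user's side, a hypothetical sketch; the option name `inferDate` below is invented for illustration and is not an existing Spark option:

```scala
// Hypothetical opt-in flag: existing pipelines keep inferring TimestampType
// unless they explicitly ask for DateType inference. "inferDate" is an
// assumed name, not a real Spark option.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("inferDate", "true") // hypothetical switch guarding the new behaviour
  .option("dateFormat", "dd/MM/yyyy")
  .option("timestampFormat", "dd/MM/yyyy HH:mm:ss.SSS")
  .load("test-data/dates-and-timestamps.csv")
```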

       TimestampType
     } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
       // We keep this for backwards compatibility.
@@ -216,6 +228,8 @@ private[csv] object CSVInferSchema {
       } else {
         Some(DecimalType(range + scale, scale))
       }
+    // By design 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes).
+    case (t1: DateType, t2: TimestampType) => Some(TimestampType)
Member:
I think we should handle the opposite case too:

case (t1: TimestampType, t2: DateType) => Some(TimestampType)
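With both orderings in place, merging per-partition results would be order-independent. A sketch of the expected behaviour, assuming it is exercised from within the csv package (CSVInferSchema is `private[csv]`):

```scala
// Whichever partition's inferred type arrives first, the merged column type
// is the wider TimestampType; without the symmetric case the second call
// would return None.
assert(CSVInferSchema.findTightestCommonType(DateType, TimestampType) == Some(TimestampType))
assert(CSVInferSchema.findTightestCommonType(TimestampType, DateType) == Some(TimestampType))
```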


     case _ => None
   }
CSVOptions.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.csv

 import java.nio.charset.StandardCharsets
+import java.time.format.{DateTimeFormatter, ResolverStyle}
 import java.util.{Locale, TimeZone}

 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
@@ -150,6 +151,16 @@ class CSVOptions(

   val isCommentSet = this.comment != '\u0000'

+  def dateFormatter: DateTimeFormatter = {
Member:
Why is it a def?

Author:
DateTimeFormatter has a disadvantage: it does not implement Serializable, in contrast to FastDateFormat. That is why I couldn't make it a val here.
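The Serializable difference is easy to check directly; a minimal sketch, assuming commons-lang3 (which Spark already depends on) is on the classpath:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.time.format.DateTimeFormatter
import org.apache.commons.lang3.time.FastDateFormat

// Attempt plain Java serialization and report the outcome.
def trySerialize(label: String, o: AnyRef): Unit =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
    println(s"$label: serializable")
  } catch {
    case _: NotSerializableException => println(s"$label: NOT serializable")
  }

trySerialize("FastDateFormat", FastDateFormat.getInstance("yyyy-MM-dd"))      // serializable
trySerialize("DateTimeFormatter", DateTimeFormatter.ofPattern("yyyy-MM-dd"))  // NOT serializable
```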

Member:
I think you could do this via a lazy val.
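A sketch of that suggestion as it could look in CSVOptions. Note the `@transient` annotation is an addition beyond the reviewer's wording (a plain lazy val field would still be written during Java serialization), following the usual Spark pattern for caching non-serializable members:

```scala
// @transient keeps the non-serializable formatter out of Java serialization;
// the lazy val rebuilds it on first access after deserialization, so it is
// constructed once per JVM instead of on every call.
@transient lazy val dateFormatter: DateTimeFormatter =
  DateTimeFormatter.ofPattern(dateFormat.getPattern)
    .withLocale(Locale.US)
    .withZone(timeZone.toZoneId)
    .withResolverStyle(ResolverStyle.SMART)
```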

+    DateTimeFormatter.ofPattern(dateFormat.getPattern)
+      .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
+  }

+  def timestampFormatter: DateTimeFormatter = {
+    DateTimeFormatter.ofPattern(timestampFormat.getPattern)
Member:
Mind if I ask you to elaborate on DateTimeFormatter vs. FastDateFormat?

Author:
DateTimeFormatter is the standard date-time library from Java 8. FastDateFormat can't properly parse dates and timestamps.

I can create some test cases to prove it, but I would need a lot of time for that.

Also, FastDateFormat does not conform to ISO 8601: https://en.wikipedia.org/wiki/ISO_8601
The current implementation of CSVInferSchema contains other bugs. For example, the test test("Timestamp field types are inferred correctly via custom date format") in CSVInferSchemaSuite must not pass, because the timestampFormat "yyyy-mm" is the wrong format for year and month; it should be "yyyy-MM".
It would be better to refactor the date types and replace the deprecated types with new ones across the whole project.
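The pattern-letter mixup is easy to demonstrate standalone: in both SimpleDateFormat and DateTimeFormatter patterns, lowercase mm means minute-of-hour while uppercase MM means month-of-year.

```scala
import java.text.SimpleDateFormat
import java.util.Locale

// "yyyy-mm" requests year and MINUTES, so "2015-08" lenient-parses to
// 00:08 on Jan 1, 2015 rather than to August 2015; the parse still
// "succeeds", which is why the existing test passes by accident.
val wrong = new SimpleDateFormat("yyyy-mm", Locale.US)
println(wrong.parse("2015-08")) // Thu Jan 01 00:08:00 <zone> 2015

// "yyyy-MM" is the intended year-month pattern.
val right = new SimpleDateFormat("yyyy-MM", Locale.US)
println(right.parse("2015-08")) // Sat Aug 01 00:00:00 <zone> 2015
```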

+      .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
+  }

   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
     val format = writerSettings.getFormat
test-data/dates-and-timestamps.csv (new file)
@@ -0,0 +1,4 @@
+timestamp,date
+26/08/2015 22:31:46.913,27/09/2015
+27/10/2014 22:33:31.601,26/12/2016
+28/01/2016 22:33:52.888,28/01/2017
CSVInferSchemaSuite.scala
@@ -59,13 +59,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
     assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne)
   }

test("Timestamp field types are inferred correctly via custom data format") {
test("Timestamp field types are inferred correctly via custom date format") {
var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
}

test("Date field types are inferred correctly via custom date and timestamp format") {
val options = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy",
"timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS"), "GMT")
assert(CSVInferSchema.inferField(TimestampType,
"28/01/2017 22:31:46.913", options) == TimestampType)
assert(CSVInferSchema.inferField(DateType, "16/12/2012", options) == DateType)
}

test("Timestamp field types are inferred correctly from other types") {
val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
CSVSuite.scala
@@ -54,6 +54,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val datesAndTimestampsFile = "test-data/dates-and-timestamps.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"

@@ -566,6 +567,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     assert(results.toSeq.map(_.toSeq) === expected)
   }

test("inferring timestamp types and date types via custom formats") {
val options = Map(
"header" -> "true",
"inferSchema" -> "true",
"timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS",
"dateFormat" -> "dd/MM/yyyy")
val results = spark.read
.format("csv")
.options(options)
.load(testFile(datesAndTimestampsFile))
assert(results.schema{0}.dataType===TimestampType)
assert(results.schema{1}.dataType===DateType)
val timestamps = spark.read
.format("csv")
.options(options)
.load(testFile(datesAndTimestampsFile))
.select("timestamp")
.collect()
val timestampFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS", Locale.US)
val timestampExpected =
Seq(Seq(new Timestamp(timestampFormat.parse("26/08/2015 22:31:46.913").getTime)),
Seq(new Timestamp(timestampFormat.parse("27/10/2014 22:33:31.601").getTime)),
Seq(new Timestamp(timestampFormat.parse("28/01/2016 22:33:52.888").getTime)))
assert(timestamps.toSeq.map(_.toSeq) === timestampExpected)
val dates = spark.read
.format("csv")
.options(options)
.load(testFile(datesAndTimestampsFile))
.select("date")
.collect()
val dateFormat = new SimpleDateFormat("dd/MM/yyyy", Locale.US)
val dateExpected =
Seq(Seq(new Date(dateFormat.parse("27/09/2015").getTime)),
Seq(new Date(dateFormat.parse("26/12/2016").getTime)),
Seq(new Date(dateFormat.parse("28/01/2017").getTime)))
assert(dates.toSeq.map(_.toSeq) === dateExpected)
}

test("load date types via custom date format") {
val customSchema = new StructType(Array(StructField("date", DateType, true)))
val options = Map(