Skip to content

Commit f7dabd8

Browse files
Ivan Sadikovcloud-fan
authored andcommitted
[SPARK-37326][SQL][FOLLOW-UP] Update code and tests for TimestampNTZ support in CSV data source
### What changes were proposed in this pull request? This is a follow-up PR to #34596. There were a few comments and suggestions raised after the PR was merged, so I addressed them in this follow-up: - Instead of using `failOnError`, which was confusing as no error was thrown in the method, we use `allowTimeZone` which has an opposite meaning of `failOnError` and far more descriptive. - I updated a few test names to resolve ambiguity. - I changed the tests to use `withTempPath` as was suggested in the original PR. ### Why are the changes needed? Code cleanup and clarifications. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit and integration tests. Closes #34777 from sadikovi/timestamp-ntz-csv-follow-up. Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 0b42cd4 commit f7dabd8

6 files changed

Lines changed: 45 additions & 56 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
178178
// We can only parse the value as TimestampNTZType if it does not have zone-offset or
179179
// time-zone component and can be parsed with the timestamp formatter.
180180
// Otherwise, it is likely to be a timestamp with timezone.
181-
if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, true)).isDefined) {
181+
if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) {
182182
SQLConf.get.timestampType
183183
} else {
184184
tryParseTimestamp(field)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ class UnivocityParser(
204204

205205
case _: TimestampNTZType => (d: String) =>
206206
nullSafeDatum(d, name, nullable, options) { datum =>
207-
timestampNTZFormatter.parseWithoutTimeZone(datum, true)
207+
timestampNTZFormatter.parseWithoutTimeZone(datum, false)
208208
}
209209

210210
case _: DateType => (d: String) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -445,19 +445,19 @@ object DateTimeUtils {
445445
* number of microseconds since the epoch. The result will be independent of time zones.
446446
*
447447
* If the input string contains a component associated with time zone, the method will return
448-
* `None` if `failOnError` is set to `true`. If `failOnError` is set to `false`, the method
448+
* `None` if `allowTimeZone` is set to `false`. If `allowTimeZone` is set to `true`, the method
449449
* will simply discard the time zone component. Enable the check to detect situations like parsing
450450
* a timestamp with time zone as TimestampNTZType.
451451
*
452452
* The return type is [[Option]] in order to distinguish between 0L and null. Please
453453
* refer to `parseTimestampString` for the allowed formats.
454454
*/
455-
def stringToTimestampWithoutTimeZone(s: UTF8String, failOnError: Boolean): Option[Long] = {
455+
def stringToTimestampWithoutTimeZone(s: UTF8String, allowTimeZone: Boolean): Option[Long] = {
456456
try {
457457
val (segments, zoneIdOpt, justTime) = parseTimestampString(s)
458458
// If the input string can't be parsed as a timestamp without time zone, or it contains only
459459
// the time part of a timestamp and we can't determine its date, return None.
460-
if (segments.isEmpty || justTime || failOnError && zoneIdOpt.isDefined) {
460+
if (segments.isEmpty || justTime || !allowTimeZone && zoneIdOpt.isDefined) {
461461
return None
462462
}
463463
val nanoseconds = MICROSECONDS.toNanos(segments(6))
@@ -473,16 +473,16 @@ object DateTimeUtils {
473473
/**
474474
* Trims and parses a given UTF8 string to a corresponding [[Long]] value which representing the
475475
* number of microseconds since the epoch. The result is independent of time zones. Zone id
476-
* component will be discarded and ignored.
476+
* component will be ignored.
477477
* The return type is [[Option]] in order to distinguish between 0L and null. Please
478478
* refer to `parseTimestampString` for the allowed formats.
479479
*/
480480
def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
481-
stringToTimestampWithoutTimeZone(s, false)
481+
stringToTimestampWithoutTimeZone(s, true)
482482
}
483483

484484
def stringToTimestampWithoutTimeZoneAnsi(s: UTF8String): Long = {
485-
stringToTimestampWithoutTimeZone(s, false).getOrElse {
485+
stringToTimestampWithoutTimeZone(s, true).getOrElse {
486486
throw QueryExecutionErrors.cannotCastToDateTimeError(s, TimestampNTZType)
487487
}
488488
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ sealed trait TimestampFormatter extends Serializable {
5656
* Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time.
5757
*
5858
* @param s - string with timestamp to parse
59-
* @param failOnError - indicates strict parsing of timezone
59+
* @param allowTimeZone - indicates strict parsing of timezone
6060
* @return microseconds since epoch.
6161
* @throws ParseException can be thrown by legacy parser
6262
* @throws DateTimeParseException can be thrown by new parser
@@ -68,9 +68,9 @@ sealed trait TimestampFormatter extends Serializable {
6868
@throws(classOf[DateTimeParseException])
6969
@throws(classOf[DateTimeException])
7070
@throws(classOf[IllegalStateException])
71-
def parseWithoutTimeZone(s: String, failOnError: Boolean): Long =
71+
def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long =
7272
throw new IllegalStateException(
73-
s"The method `parseWithoutTimeZone(s: String, failOnError: Boolean)` should be " +
73+
s"The method `parseWithoutTimeZone(s: String, allowTimeZone: Boolean)` should be " +
7474
"implemented in the formatter of timestamp without time zone")
7575

7676
/**
@@ -84,7 +84,7 @@ sealed trait TimestampFormatter extends Serializable {
8484
final def parseWithoutTimeZone(s: String): Long =
8585
// This is implemented to adhere to the original behaviour of `parseWithoutTimeZone` where we
8686
// did not fail if timestamp contained zone-id or zone-offset component and instead ignored it.
87-
parseWithoutTimeZone(s, false)
87+
parseWithoutTimeZone(s, true)
8888

8989
def format(us: Long): String
9090
def format(ts: Timestamp): String
@@ -133,10 +133,10 @@ class Iso8601TimestampFormatter(
133133
} catch checkParsedDiff(s, legacyFormatter.parse)
134134
}
135135

136-
override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
136+
override def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long = {
137137
try {
138138
val parsed = formatter.parse(s)
139-
if (failOnError && parsed.query(TemporalQueries.zone()) != null) {
139+
if (!allowTimeZone && parsed.query(TemporalQueries.zone()) != null) {
140140
throw QueryExecutionErrors.cannotParseStringAsDataTypeError(pattern, s, TimestampNTZType)
141141
}
142142
val localDate = toLocalDate(parsed)
@@ -204,10 +204,10 @@ class DefaultTimestampFormatter(
204204
} catch checkParsedDiff(s, legacyFormatter.parse)
205205
}
206206

207-
override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
207+
override def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long = {
208208
try {
209209
val utf8Value = UTF8String.fromString(s)
210-
DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, failOnError).getOrElse {
210+
DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, allowTimeZone).getOrElse {
211211
throw QueryExecutionErrors.cannotParseStringAsDataTypeError(
212212
TimestampFormatter.defaultPattern(), s, TimestampNTZType)
213213
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,15 +357,15 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
357357
checkStringToTimestamp("2021-01-01T12:30:4294967297+4294967297:30", None)
358358
}
359359

360-
test("SPARK-37326: stringToTimestampWithoutTimeZone with failOnError") {
360+
test("SPARK-37326: stringToTimestampWithoutTimeZone with allowTimeZone") {
361361
assert(
362362
stringToTimestampWithoutTimeZone(
363-
UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) ==
363+
UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) ==
364364
Some(DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 11, 22, 10, 54, 27))))
365365

366366
assert(
367367
stringToTimestampWithoutTimeZone(
368-
UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) ==
368+
UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) ==
369369
None)
370370
}
371371

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,49 +1012,45 @@ abstract class CSVSuite
10121012
}
10131013
}
10141014

1015-
test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_NTZ values") {
1016-
withTempDir { dir =>
1017-
val path = s"${dir.getCanonicalPath}/csv"
1018-
1015+
test("SPARK-37326: Write and infer TIMESTAMP_NTZ values with a non-default pattern") {
1016+
withTempPath { path =>
10191017
val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
10201018
exp.write
10211019
.format("csv")
10221020
.option("header", "true")
10231021
.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
1024-
.save(path)
1022+
.save(path.getAbsolutePath)
10251023

10261024
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
10271025
val res = spark.read
10281026
.format("csv")
10291027
.option("inferSchema", "true")
10301028
.option("header", "true")
10311029
.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
1032-
.load(path)
1030+
.load(path.getAbsolutePath)
10331031

10341032
assert(res.dtypes === exp.dtypes)
10351033
checkAnswer(res, exp)
10361034
}
10371035
}
10381036
}
10391037

1040-
test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_LTZ values") {
1041-
withTempDir { dir =>
1042-
val path = s"${dir.getCanonicalPath}/csv"
1043-
1038+
test("SPARK-37326: Write and infer TIMESTAMP_LTZ values with a non-default pattern") {
1039+
withTempPath { path =>
10441040
val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col0")
10451041
exp.write
10461042
.format("csv")
10471043
.option("header", "true")
10481044
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
1049-
.save(path)
1045+
.save(path.getAbsolutePath)
10501046

10511047
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
10521048
val res = spark.read
10531049
.format("csv")
10541050
.option("inferSchema", "true")
10551051
.option("header", "true")
10561052
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
1057-
.load(path)
1053+
.load(path.getAbsolutePath)
10581054

10591055
assert(res.dtypes === exp.dtypes)
10601056
checkAnswer(res, exp)
@@ -1063,37 +1059,33 @@ abstract class CSVSuite
10631059
}
10641060

10651061
test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") {
1066-
withTempDir { dir =>
1067-
val path = s"${dir.getCanonicalPath}/csv"
1068-
1062+
withTempPath { path =>
10691063
val exp = spark.sql("""
10701064
select
10711065
timestamp_ntz'2020-12-12 12:12:12' as col1,
10721066
timestamp_ltz'2020-12-12 12:12:12' as col2
10731067
""")
10741068

1075-
exp.write.format("csv").option("header", "true").save(path)
1069+
exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)
10761070

10771071
val res = spark.read
10781072
.format("csv")
10791073
.schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ")
10801074
.option("header", "true")
1081-
.load(path)
1075+
.load(path.getAbsolutePath)
10821076

10831077
checkAnswer(res, exp)
10841078
}
10851079
}
10861080

10871081
test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
1088-
withTempDir { dir =>
1089-
val path = s"${dir.getCanonicalPath}/csv"
1090-
1082+
withTempPath { path =>
10911083
val exp = spark.sql("""
10921084
select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
10931085
select timestamp_ntz'2020-12-12 12:12:12' as col0
10941086
""")
10951087

1096-
exp.write.format("csv").option("header", "true").save(path)
1088+
exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)
10971089

10981090
val timestampTypes = Seq(
10991091
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
@@ -1105,7 +1097,7 @@ abstract class CSVSuite
11051097
.format("csv")
11061098
.option("inferSchema", "true")
11071099
.option("header", "true")
1108-
.load(path)
1100+
.load(path.getAbsolutePath)
11091101

11101102
if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
11111103
checkAnswer(res, exp)
@@ -1124,9 +1116,7 @@ abstract class CSVSuite
11241116
}
11251117

11261118
test("SPARK-37326: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") {
1127-
withTempDir { dir =>
1128-
val path = s"${dir.getCanonicalPath}/csv"
1129-
1119+
withTempPath { path =>
11301120
Seq(
11311121
"col0",
11321122
"2020-12-12T12:12:12.000",
@@ -1135,19 +1125,19 @@ abstract class CSVSuite
11351125
"2020-12-12T12:12:12.000"
11361126
).toDF("data")
11371127
.coalesce(1)
1138-
.write.text(path)
1128+
.write.text(path.getAbsolutePath)
11391129

11401130
for (policy <- Seq("exception", "corrected", "legacy")) {
11411131
withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) {
11421132
val res = spark.read.format("csv")
11431133
.option("inferSchema", "true")
11441134
.option("header", "true")
1145-
.load(path)
1135+
.load(path.getAbsolutePath)
11461136

11471137
if (policy == "legacy") {
11481138
// Timestamps without timezone are parsed as strings, so the col0 type would be
11491139
// StringType which is similar to reading without schema inference.
1150-
val exp = spark.read.format("csv").option("header", "true").load(path)
1140+
val exp = spark.read.format("csv").option("header", "true").load(path.getAbsolutePath)
11511141
checkAnswer(res, exp)
11521142
} else {
11531143
val exp = spark.sql("""
@@ -1164,23 +1154,23 @@ abstract class CSVSuite
11641154
}
11651155

11661156
test("SPARK-37326: Malformed records when reading TIMESTAMP_LTZ as TIMESTAMP_NTZ") {
1167-
withTempDir { dir =>
1168-
val path = s"${dir.getCanonicalPath}/csv"
1169-
1157+
withTempPath { path =>
11701158
Seq(
11711159
"2020-12-12T12:12:12.000",
11721160
"2020-12-12T17:12:12.000Z",
11731161
"2020-12-12T17:12:12.000+05:00",
11741162
"2020-12-12T12:12:12.000"
11751163
).toDF("data")
11761164
.coalesce(1)
1177-
.write.text(path)
1165+
.write.text(path.getAbsolutePath)
11781166

11791167
for (timestampNTZFormat <- Seq(None, Some("yyyy-MM-dd'T'HH:mm:ss[.SSS]"))) {
11801168
val reader = spark.read.format("csv").schema("col0 TIMESTAMP_NTZ")
11811169
val res = timestampNTZFormat match {
1182-
case Some(format) => reader.option("timestampNTZFormat", format).load(path)
1183-
case None => reader.load(path)
1170+
case Some(format) =>
1171+
reader.option("timestampNTZFormat", format).load(path.getAbsolutePath)
1172+
case None =>
1173+
reader.load(path.getAbsolutePath)
11841174
}
11851175

11861176
checkAnswer(
@@ -1204,10 +1194,9 @@ abstract class CSVSuite
12041194

12051195
val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
12061196
for (pattern <- patterns) {
1207-
withTempDir { dir =>
1208-
val path = s"${dir.getCanonicalPath}/csv"
1197+
withTempPath { path =>
12091198
val err = intercept[SparkException] {
1210-
exp.write.format("csv").option("timestampNTZFormat", pattern).save(path)
1199+
exp.write.format("csv").option("timestampNTZFormat", pattern).save(path.getAbsolutePath)
12111200
}
12121201
assert(
12131202
err.getCause.getMessage.contains("Unsupported field: OffsetSeconds") ||

0 commit comments

Comments
 (0)