From 7e73e59dc33bfa508c306d4ad464f78854e9144b Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 15 Nov 2021 16:38:10 +1300 Subject: [PATCH 01/14] add timestamp ntz read in csv --- .../sql/catalyst/csv/CSVInferSchema.scala | 22 +++++ .../catalyst/util/TimestampFormatter.scala | 28 ++++++ .../execution/datasources/csv/CSVSuite.scala | 91 +++++++++++++++++++ 3 files changed, 141 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 696d25f8ed484..97d55ff5fa8e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @@ -38,6 +39,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { legacyFormat = FAST_DATE_FORMAT, isParsing = true) + private val timestampNTZFormatter = TimestampFormatter( + options.timestampFormatInRead, + options.zoneId, + legacyFormat = FAST_DATE_FORMAT, + isParsing = true, + forTimestampNTZ = true) + private val decimalParser = if (options.locale == Locale.US) { // Special handling the default locale for backward compatibility s: String => new java.math.BigDecimal(s) @@ -109,6 +117,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { case LongType => tryParseLong(field) case _: DecimalType => tryParseDecimal(field) case DoubleType => tryParseDouble(field) + case TimestampNTZType => tryParseTimestampNTZ(field) case TimestampType => tryParseTimestamp(field) case BooleanType => tryParseBoolean(field) case StringType => StringType @@ -160,6 +169,15 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private def tryParseDouble(field: String): DataType = { if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) { DoubleType + } else { + tryParseTimestampNTZ(field) + } + } + + private def tryParseTimestampNTZ(field: String): DataType = { + if ((allCatch opt !timestampNTZFormatter.isTimeZoneSet(field)).getOrElse(false) && + (allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field)).isDefined) { + SQLConf.get.timestampType } else { tryParseTimestamp(field) } @@ -225,6 +243,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } else { Some(DecimalType(range + scale, scale)) } + + case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) => + Some(TimestampType) + case _ => None } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 8a9104ae9eef9..f0bf45ddd06a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -71,6 +71,19 @@ sealed trait TimestampFormatter extends Serializable { s"The method `parseWithoutTimeZone(s: String)` should be implemented in the formatter " + "of timestamp without 
time zone") + /** + * Returns true if the parsed timestamp contains the time zone component, false otherwise. + * Used to determine if the timestamp can be inferred as timestamp without time zone. + * + * @param s - string with timestamp to inspect + * @return whether the timestamp string has the time zone component defined. + */ + @throws(classOf[IllegalStateException]) + def isTimeZoneSet(s: String): Boolean = + throw new IllegalStateException( + s"The method `isTimeZoneSet(s: String)` should be implemented in the formatter " + + "of timestamp without time zone") + def format(us: Long): String def format(ts: Timestamp): String def format(instant: Instant): String @@ -127,6 +140,14 @@ class Iso8601TimestampFormatter( } catch checkParsedDiff(s, legacyFormatter.parse) } + override def isTimeZoneSet(s: String): Boolean = { + try { + val parsed = formatter.parse(s) + val parsedZoneId = parsed.query(TemporalQueries.zone()) + parsedZoneId != null + } catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet) + } + override def format(instant: Instant): String = { try { formatter.withZone(zoneId).format(instant) @@ -191,6 +212,13 @@ class DefaultTimestampFormatter( DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s)) } catch checkParsedDiff(s, legacyFormatter.parse) } + + override def isTimeZoneSet(s: String): Boolean = { + try { + val (_, zoneIdOpt, _) = parseTimestampString(UTF8String.fromString(s)) + zoneIdOpt.isDefined + } catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 466c1bea9d37b..ce4e1f5661785 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1012,6 +1012,97 @@ abstract class CSVSuite } } + test("Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + val exp = spark.sql(""" + select + timestamp_ntz'2020-12-12 12:12:12' as col1, + timestamp_ltz'2020-12-12 12:12:12' as col2 + """) + + exp.write.format("csv").option("header", "true").save(path) + + val res = spark.read + .format("csv") + .schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ") + .option("header", "true") + .load(path) + + checkAnswer(res, exp) + } + } + + test("Timestamp type inference for a column with TIMESTAMP_NTZ values") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + val exp = spark.sql(""" + select + timestamp_ntz'2020-12-12 12:12:12' as col1, + timestamp_ltz'2020-12-12 12:12:12' as col2 + """) + + exp.write.format("csv").option("header", "true").save(path) + + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("header", "true") + .load(path) + + checkAnswer(res, exp) + } + + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) { + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("header", "true") + .load(path) + + val exp = spark.sql(""" + select + timestamp_ltz'2020-12-12 12:12:12' as col1, + timestamp_ltz'2020-12-12 12:12:12' as col2 + """) + + checkAnswer(res, exp) + } + } + } + + test("Timestamp type inference for a column with both 
TIMESTAMP_NTZ and TIMESTAMP_LTZ") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + Seq( + "2020-12-12T12:12:12.000", + "2020-12-12T17:12:12.000Z", + "2020-12-12T17:12:12.000+05:00", + "2020-12-12T12:12:12.000" + ).toDF("col0") + .coalesce(1) + .write.text(path) + + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .load(path) + + val exp = spark.sql(""" + select timestamp'2020-12-12T12:12:12.000' as col0 union all + select timestamp'2020-12-12T17:12:12.000Z' as col0 union all + select timestamp'2020-12-12T17:12:12.000+05:00' as col0 union all + select timestamp'2020-12-12T12:12:12.000' as col0 + """) + + checkAnswer(res, exp) + } + } + test("Write dates correctly with dateFormat option") { val customSchema = new StructType(Array(StructField("date", DateType, true))) withTempDir { dir => From 6bfc1b1d1c542e2e4251e4ef1b8a5e6e07841651 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 15 Nov 2021 16:47:03 +1300 Subject: [PATCH 02/14] update tests --- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index ce4e1f5661785..816c545e20b62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1012,7 +1012,7 @@ abstract class CSVSuite } } - test("Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") { + test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") { withTempDir { dir => val path = s"${dir.getCanonicalPath}/csv" @@ -1034,7 +1034,7 @@ abstract class CSVSuite } } - test("Timestamp type inference for a column with TIMESTAMP_NTZ values") { + test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") { withTempDir { dir => val path = s"${dir.getCanonicalPath}/csv" @@ -1074,7 +1074,7 @@ abstract class CSVSuite } } - test("Timestamp type inference for a column with both TIMESTAMP_NTZ and TIMESTAMP_LTZ") { + test("SPARK-37326: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") { withTempDir { dir => val path = s"${dir.getCanonicalPath}/csv" From ea47b9439028585e1f1a383b170150609924d4c7 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Tue, 16 Nov 2021 15:22:06 +1300 Subject: [PATCH 03/14] update code to handle legacy time parsing --- .../catalyst/util/TimestampFormatter.scala | 8 +- .../execution/datasources/csv/CSVSuite.scala | 103 ++++++++++++------ 2 files changed, 75 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index f0bf45ddd06a9..e9a4225433bd2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -165,7 +165,13 @@ class Iso8601TimestampFormatter( } override def format(localDateTime: LocalDateTime): String = { - localDateTime.format(formatter) + // If the legacy time parser policy is selected, we can only write timestamp with timezone, + // we will use the default time zone for it. 
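+    // The legacy branch below converts LocalDateTime -> ZonedDateTime at the session
+    // zone -> Instant -> microseconds -> java.sql.Timestamp, so the value can be
+    // rendered through the legacy formatting path.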
+ if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { + format(toJavaTimestamp(instantToMicros(localDateTime.atZone(zoneId).toInstant))) + } else { + localDateTime.format(formatter) + } } override def validatePatternString(checkLegacy: Boolean): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 816c545e20b62..210c730249c23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1012,6 +1012,30 @@ abstract class CSVSuite } } + test("SPARK-37326: Write TIMESTAMP_NTZ in legacy time parser policy") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col1").coalesce(1) + + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + exp.write.format("csv") + .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss") + .option("header", "true") + .save(path) + } + + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss") + .option("header", "true") + .load(path) + + checkAnswer(res, exp) + } + } + test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") { withTempDir { dir => val path = s"${dir.getCanonicalPath}/csv" @@ -1039,37 +1063,38 @@ abstract class CSVSuite val path = s"${dir.getCanonicalPath}/csv" val exp = spark.sql(""" - select - timestamp_ntz'2020-12-12 12:12:12' as col1, - timestamp_ltz'2020-12-12 12:12:12' as col2 + select timestamp_ntz'2020-12-12 12:12:12' as col0 union all + select timestamp_ntz'2020-12-12 12:12:12' as col0 """) exp.write.format("csv").option("header", "true").save(path) - withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { - val res = spark.read - .format("csv") - .option("inferSchema", "true") - .option("header", "true") - .load(path) + val timestampTypes = Seq( + SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString, + SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) - checkAnswer(res, exp) - } - - withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) { - val res = spark.read - .format("csv") - .option("inferSchema", "true") - .option("header", "true") - .load(path) - - val exp = spark.sql(""" - select - timestamp_ltz'2020-12-12 12:12:12' as col1, - timestamp_ltz'2020-12-12 12:12:12' as col2 - """) - - checkAnswer(res, exp) + for (timestampType <- timestampTypes) { + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) { + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("header", "true") + .load(path) + + if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString && + spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) != "legacy") { + checkAnswer(res, exp) + } else { + // Timestamps are written as timestamp with timezone in the legacy mode. 
+ checkAnswer( + res, + spark.sql(""" + select timestamp_ltz'2020-12-12 12:12:12' as col0 union all + select timestamp_ltz'2020-12-12 12:12:12' as col0 + """) + ) + } + } } } } @@ -1079,27 +1104,35 @@ abstract class CSVSuite val path = s"${dir.getCanonicalPath}/csv" Seq( + "col0", "2020-12-12T12:12:12.000", "2020-12-12T17:12:12.000Z", "2020-12-12T17:12:12.000+05:00", "2020-12-12T12:12:12.000" - ).toDF("col0") + ).toDF("data") .coalesce(1) .write.text(path) val res = spark.read .format("csv") .option("inferSchema", "true") + .option("header", "true") .load(path) - val exp = spark.sql(""" - select timestamp'2020-12-12T12:12:12.000' as col0 union all - select timestamp'2020-12-12T17:12:12.000Z' as col0 union all - select timestamp'2020-12-12T17:12:12.000+05:00' as col0 union all - select timestamp'2020-12-12T12:12:12.000' as col0 - """) - - checkAnswer(res, exp) + if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") { + // Timestamps without timezone are parsed as strings, so the col0 type would be StringType + // which is similar to reading without schema inference. + val exp = spark.read.format("csv").option("header", "true").load(path) + checkAnswer(res, exp) + } else { + val exp = spark.sql(""" + select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all + select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all + select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all + select timestamp_ltz'2020-12-12T12:12:12.000' as col0 + """) + checkAnswer(res, exp) + } } } From e439ae23c70bac32309748901051e8f2544b4144 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 18 Nov 2021 12:27:10 +1300 Subject: [PATCH 04/14] address comments, update code --- docs/sql-data-sources-csv.md | 12 +++++++++--- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 2 +- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 14 ++++++++++++++ .../sql/catalyst/csv/UnivocityGenerator.scala | 2 +- .../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +- .../sql/catalyst/util/TimestampFormatter.scala | 8 +------- .../sql/execution/datasources/csv/CSVSuite.scala | 4 +--- 7 files changed, 28 insertions(+), 16 deletions(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index 82cfa352a5751..dd4f99e95c59e 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -9,9 +9,9 @@ license: | The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,7 +19,7 @@ license: | limitations under the License. --- -Spark SQL provides `spark.read().csv("file_name")` to read a file or directory of files in CSV format into Spark DataFrame, and `dataframe.write().csv("path")` to write to a CSV file. Function `option()` can be used to customize the behavior of reading or writing, such as controlling behavior of the header, delimiter character, character set, and so on. +Spark SQL provides `spark.read().csv("file_name")` to read a file or directory of files in CSV format into Spark DataFrame, and `dataframe.write().csv("path")` to write to a CSV file. 
Function `option()` can be used to customize the behavior of reading or writing, such as controlling the header, delimiter character, character set, and so on.
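As a quick sketch of the `option()` usage described above (illustrative only: the input path is a placeholder, and `timestampNTZFormat` is the option this patch series introduces):

```scala
// Read a CSV file with schema inference; timestamps without a zone component
// are parsed with the given pattern when TIMESTAMP_NTZ inference applies.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampNTZFormat", "yyyy-MM-dd'T'HH:mm:ss[.SSS]")
  .csv("/path/to/people.csv")
```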
@@ -162,6 +162,12 @@ Data source options of CSV can be set via: Sets the string that indicates a timestamp format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp type. read/write + + timestampNTZFormat + yyyy-MM-dd'T'HH:mm:ss[.SSS] + Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type, note that zone-offset and zone-id components are not supported when writing or reading this data type. + read/write + maxColumns 20480 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 97d55ff5fa8e3..4075b1d30398e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -40,7 +40,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { isParsing = true) private val timestampNTZFormatter = TimestampFormatter( - options.timestampFormatInRead, + options.timestampNTZFormatInRead, options.zoneId, legacyFormat = FAST_DATE_FORMAT, isParsing = true, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 79624b9a608a4..8777c595dc528 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -164,6 +164,20 @@ class CSVOptions( s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) + val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat").orElse { + if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { + Some(s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSS") + } else { + None + } + } + val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", + if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSS" + } else { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]" + }) + val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) val maxColumns = getInt("maxColumns", 20480) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 8a04e4ca56c5d..10cccd57117e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -49,7 +49,7 @@ class UnivocityGenerator( legacyFormat = FAST_DATE_FORMAT, isParsing = false) private val timestampNTZFormatter = TimestampFormatter( - options.timestampFormatInWrite, + options.timestampNTZFormatInWrite, options.zoneId, legacyFormat = FAST_DATE_FORMAT, isParsing = false, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index cd5621bbb7856..2604eedf6bb7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -94,7 
+94,7 @@ class UnivocityParser( legacyFormat = FAST_DATE_FORMAT, isParsing = true) private lazy val timestampNTZFormatter = TimestampFormatter( - options.timestampFormatInRead, + options.timestampNTZFormatInRead, options.zoneId, legacyFormat = FAST_DATE_FORMAT, isParsing = true, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index e9a4225433bd2..f0bf45ddd06a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -165,13 +165,7 @@ class Iso8601TimestampFormatter( } override def format(localDateTime: LocalDateTime): String = { - // If the legacy time parser policy is selected, we can only write timestamp with timezone, - // we will use the default time zone for it. - if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { - format(toJavaTimestamp(instantToMicros(localDateTime.atZone(zoneId).toInstant))) - } else { - localDateTime.format(formatter) - } + localDateTime.format(formatter) } override def validatePatternString(checkLegacy: Boolean): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 210c730249c23..4fa09a7dc3e81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1081,11 +1081,9 @@ abstract class CSVSuite .option("header", "true") .load(path) - if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString && - spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) != "legacy") { + if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { checkAnswer(res, exp) } else { - // Timestamps are written as timestamp with timezone in the legacy mode. checkAnswer( res, spark.sql(""" From 7eca451603eab54da9865f9451c0ca34a41b76ff Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 18 Nov 2021 15:01:50 +1300 Subject: [PATCH 05/14] update tests --- docs/sql-data-sources-csv.md | 2 +- .../execution/datasources/csv/CSVSuite.scala | 62 ++++++++++++++----- 2 files changed, 49 insertions(+), 15 deletions(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index dd4f99e95c59e..1dfe8568f9afb 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -165,7 +165,7 @@ Data source options of CSV can be set via: timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] - Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type, note that zone-offset and zone-id components are not supported when writing or reading this data type. + Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type. 
read/write diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 4fa09a7dc3e81..c37216af1ea59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1012,27 +1012,41 @@ abstract class CSVSuite } } - test("SPARK-37326: Write TIMESTAMP_NTZ in legacy time parser policy") { + test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_NTZ values") { withTempDir { dir => val path = s"${dir.getCanonicalPath}/csv" - val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col1").coalesce(1) + val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0") + exp.write.format("csv").option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss").save(path) - withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { - exp.write.format("csv") - .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss") - .option("header", "true") - .save(path) + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss") + .load(path) + + checkAnswer(res, exp) } + } + } - val res = spark.read - .format("csv") - .option("inferSchema", "true") - .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss") - .option("header", "true") - .load(path) + test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_LTZ values") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" - checkAnswer(res, exp) + val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col0") + exp.write.format("csv").option("timestampFormat", "yyyy-MM-dd HH:mm:ss").save(path) + + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) { + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + .load(path) + + checkAnswer(res, exp) + } } } @@ -1134,6 +1148,26 @@ abstract class CSVSuite } } + test("SPARK-37326: Fail to write TIMESTAMP_NTZ if timestampNTZFormat contains zone offset") { + val patterns = Seq( + "yyyy-MM-dd HH:mm:ss XXX", + "yyyy-MM-dd HH:mm:ss Z", + "yyyy-MM-dd HH:mm:ss z") + + val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0") + for (pattern <- patterns) { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + val err = intercept[SparkException] { + exp.write.format("csv").option("timestampNTZFormat", pattern).save(path) + } + assert( + err.getCause.getMessage.contains("Unsupported field: OffsetSeconds") || + err.getCause.getMessage.contains("Unable to extract value")) + } + } + } + test("Write dates correctly with dateFormat option") { val customSchema = new StructType(Array(StructField("date", DateType, true))) withTempDir { dir => From 1d25ac74738b5bcad5950a421da131b89ed8f273 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 18 Nov 2021 19:06:12 +1300 Subject: [PATCH 06/14] address comments --- .../sql/catalyst/csv/CSVInferSchema.scala | 3 ++ .../spark/sql/catalyst/csv/CSVOptions.scala | 16 ++------ .../sql/catalyst/csv/UnivocityParser.scala | 3 ++ .../catalyst/util/TimestampFormatter.scala | 14 +++---- .../sql/errors/QueryExecutionErrors.scala | 8 +++- .../execution/datasources/csv/CSVSuite.scala | 39 +++++++++++++++---- 6 
files changed, 52 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 4075b1d30398e..1fee1222c6307 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -175,6 +175,9 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } private def tryParseTimestampNTZ(field: String): DataType = { + // We can only parse the value as TimestampNTZType if it does not have zone-offset or + // time-zone component and can be parsed with the timestamp formatter. + // Otherwise, it is likely to be a timestamp with timezone. if ((allCatch opt !timestampNTZFormatter.isTimeZoneSet(field)).getOrElse(false) && (allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field)).isDefined) { SQLConf.get.timestampType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 8777c595dc528..4becf7ccfe1af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -164,19 +164,9 @@ class CSVOptions( s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) - val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat").orElse { - if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { - Some(s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSS") - } else { - None - } - } - val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", - if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { - s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSS" - } else { - s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]" - }) + val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat") + val timestampNTZFormatInWrite: String = + parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 2604eedf6bb7d..c5013292e61ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -204,6 +204,9 @@ class UnivocityParser( case _: TimestampNTZType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => + if (timestampNTZFormatter.isTimeZoneSet(datum)) { + throw QueryExecutionErrors.cannotParseStringAsDataTypeError(name, datum, TimestampNTZType) + } timestampNTZFormatter.parseWithoutTimeZone(datum) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index f0bf45ddd06a9..f359a23acd35e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -141,11 +141,9 @@ class 
Iso8601TimestampFormatter( } override def isTimeZoneSet(s: String): Boolean = { - try { - val parsed = formatter.parse(s) - val parsedZoneId = parsed.query(TemporalQueries.zone()) - parsedZoneId != null - } catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet) + val parsed = formatter.parse(s) + val parsedZoneId = parsed.query(TemporalQueries.zone()) + parsedZoneId != null } override def format(instant: Instant): String = { @@ -214,10 +212,8 @@ class DefaultTimestampFormatter( } override def isTimeZoneSet(s: String): Boolean = { - try { - val (_, zoneIdOpt, _) = parseTimestampString(UTF8String.fromString(s)) - zoneIdOpt.isDefined - } catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet) + val (_, zoneIdOpt, _) = parseTimestampString(UTF8String.fromString(s)) + zoneIdOpt.isDefined } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index d7cd8e13f0c59..d1a3e3011ea9f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1034,6 +1034,13 @@ object QueryExecutionErrors { s"[$token] as target spark data type [$dataType].") } + def cannotParseStringAsDataTypeError(name: String, value: String, dataType: DataType) + : Throwable = { + new RuntimeException( + s"Cannot parse field name ${name}, field value ${value}, " + + s"as target spark data type [$dataType].") + } + def failToParseEmptyStringForDataTypeError(dataType: DataType): Throwable = { new RuntimeException( s"Failed to parse an empty string for data type ${dataType.catalogString}") @@ -1890,4 +1897,3 @@ object QueryExecutionErrors { new UnsupportedOperationException(s"Hive table $tableName with ANSI intervals is not supported") } } - diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index c37216af1ea59..2a1cfeb6113e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1148,6 +1148,33 @@ abstract class CSVSuite } } + test("SPARK-37326: Malformed records when reading TIMESTAMP_LTZ as TIMESTAMP_NTZ") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + Seq( + "2020-12-12T12:12:12.000", + "2020-12-12T17:12:12.000Z", + "2020-12-12T17:12:12.000+05:00", + "2020-12-12T12:12:12.000" + ).toDF("data") + .coalesce(1) + .write.text(path) + + val res = spark.read.format("csv").schema("col0 TIMESTAMP_NTZ").load(path) + + checkAnswer( + res, + Seq( + Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)), + Row(null), + Row(null), + Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)) + ) + ) + } + } + test("SPARK-37326: Fail to write TIMESTAMP_NTZ if timestampNTZFormat contains zone offset") { val patterns = Seq( "yyyy-MM-dd HH:mm:ss XXX", @@ -2645,10 +2672,6 @@ abstract class CSVSuite } test("SPARK-36536: use casting when datetime pattern is not set") { - def isLegacy: Boolean = { - spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) == - SQLConf.LegacyBehaviorPolicy.LEGACY.toString - } withSQLConf( SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true", SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) { @@ -2667,13 +2690,13 @@ abstract class CSVSuite readback, Seq( 
Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)), + LocalDateTime.of(2021, 1, 1, 0, 0, 0)), Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)), + LocalDateTime.of(2021, 1, 1, 0, 0, 0)), Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)), + LocalDateTime.of(2021, 10, 1, 0, 0, 0)), Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"), - if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000)))) + LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000)))) } } } From b718db15d1331ce0da86e06856ebc02e5c007040 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Fri, 19 Nov 2021 12:35:49 +1300 Subject: [PATCH 07/14] update according to comments, remove isTimeZoneSet --- .../sql/catalyst/csv/CSVInferSchema.scala | 3 +- .../sql/catalyst/csv/UnivocityParser.scala | 3 -- .../catalyst/util/TimestampFormatter.scala | 39 +++++++------------ .../sql/errors/QueryExecutionErrors.scala | 4 +- 4 files changed, 16 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 1fee1222c6307..ac5246c4731a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -178,8 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // We can only parse the value as TimestampNTZType if it does not have zone-offset or // time-zone component and can be parsed with the timestamp formatter. // Otherwise, it is likely to be a timestamp with timezone. 
- if ((allCatch opt !timestampNTZFormatter.isTimeZoneSet(field)).getOrElse(false) && - (allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field)).isDefined) { + if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field)).isDefined) { SQLConf.get.timestampType } else { tryParseTimestamp(field) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index c5013292e61ac..2604eedf6bb7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -204,9 +204,6 @@ class UnivocityParser( case _: TimestampNTZType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => - if (timestampNTZFormatter.isTimeZoneSet(datum)) { - throw QueryExecutionErrors.cannotParseStringAsDataTypeError(name, datum, TimestampNTZType) - } timestampNTZFormatter.parseWithoutTimeZone(datum) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index f359a23acd35e..4c291e5790b6f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -31,9 +31,10 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT} import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ -import org.apache.spark.sql.types.Decimal +import org.apache.spark.sql.types.{Decimal, TimestampNTZType} import org.apache.spark.unsafe.types.UTF8String sealed trait TimestampFormatter extends Serializable { @@ -71,19 +72,6 @@ sealed trait TimestampFormatter extends Serializable { s"The method `parseWithoutTimeZone(s: String)` should be implemented in the formatter " + "of timestamp without time zone") - /** - * Returns true if the parsed timestamp contains the time zone component, false otherwise. - * Used to determine if the timestamp can be inferred as timestamp without time zone. - * - * @param s - string with timestamp to inspect - * @return whether the timestamp string has the time zone component defined. 
- */ - @throws(classOf[IllegalStateException]) - def isTimeZoneSet(s: String): Boolean = - throw new IllegalStateException( - s"The method `isTimeZoneSet(s: String)` should be implemented in the formatter " + - "of timestamp without time zone") - def format(us: Long): String def format(ts: Timestamp): String def format(instant: Instant): String @@ -134,18 +122,16 @@ class Iso8601TimestampFormatter( override def parseWithoutTimeZone(s: String): Long = { try { val parsed = formatter.parse(s) + val parsedZoneId = parsed.query(TemporalQueries.zone()) + if (parsedZoneId != null) { + throw QueryExecutionErrors.cannotParseStringAsDataTypeError(pattern, s, TimestampNTZType) + } val localDate = toLocalDate(parsed) val localTime = toLocalTime(parsed) DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(localDate, localTime)) } catch checkParsedDiff(s, legacyFormatter.parse) } - override def isTimeZoneSet(s: String): Boolean = { - val parsed = formatter.parse(s) - val parsedZoneId = parsed.query(TemporalQueries.zone()) - parsedZoneId != null - } - override def format(instant: Instant): String = { try { formatter.withZone(zoneId).format(instant) @@ -207,14 +193,15 @@ class DefaultTimestampFormatter( override def parseWithoutTimeZone(s: String): Long = { try { - DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s)) + val utf8Value = UTF8String.fromString(s) + val (_, zoneIdOpt, _) = DateTimeUtils.parseTimestampString(utf8Value) + if (zoneIdOpt.isDefined) { + throw QueryExecutionErrors.cannotParseStringAsDataTypeError( + TimestampFormatter.defaultPattern(), s, TimestampNTZType) + } + DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(utf8Value) } catch checkParsedDiff(s, legacyFormatter.parse) } - - override def isTimeZoneSet(s: String): Boolean = { - val (_, zoneIdOpt, _) = parseTimestampString(UTF8String.fromString(s)) - zoneIdOpt.isDefined - } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index d1a3e3011ea9f..45c971d1b606b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1034,10 +1034,10 @@ object QueryExecutionErrors { s"[$token] as target spark data type [$dataType].") } - def cannotParseStringAsDataTypeError(name: String, value: String, dataType: DataType) + def cannotParseStringAsDataTypeError(pattern: String, value: String, dataType: DataType) : Throwable = { new RuntimeException( - s"Cannot parse field name ${name}, field value ${value}, " + + s"Cannot parse field value ${value} for pattern ${pattern} " + s"as target spark data type [$dataType].") } From 014cd209753681d054c1f1f20e157a0f48e4a944 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 22 Nov 2021 15:27:52 +1300 Subject: [PATCH 08/14] update code --- .../sql/catalyst/csv/CSVInferSchema.scala | 2 +- .../sql/catalyst/csv/UnivocityParser.scala | 2 +- .../sql/catalyst/util/DateTimeUtils.scala | 32 ++++++++++++++----- .../catalyst/util/TimestampFormatter.scala | 27 ++++++++++------ .../catalyst/util/DateTimeUtilsSuite.scala | 12 +++++++ .../execution/datasources/csv/CSVSuite.scala | 31 ++++++++++-------- 6 files changed, 73 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 
ac5246c4731a7..f30fa8a0b5f95 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -178,7 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // We can only parse the value as TimestampNTZType if it does not have zone-offset or // time-zone component and can be parsed with the timestamp formatter. // Otherwise, it is likely to be a timestamp with timezone. - if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field)).isDefined) { + if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) { SQLConf.get.timestampType } else { tryParseTimestamp(field) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 2604eedf6bb7d..56166950e6783 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -204,7 +204,7 @@ class UnivocityParser( case _: TimestampNTZType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => - timestampNTZFormatter.parseWithoutTimeZone(datum) + timestampNTZFormatter.parseWithoutTimeZone(datum, false) } case _: DateType => (d: String) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 3d9598cd0c23a..80f5a08b4f72c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -442,17 +442,22 @@ object DateTimeUtils { /** * Trims and parses a given UTF8 string to a corresponding [[Long]] value which representing the - * number of microseconds since the epoch. The result is independent of time zones, - * which means that zone ID in the input string will be ignored. + * number of microseconds since the epoch. The result will be independent of time zones. + * + * If the input string contains a component associated with time zone, the method will return + * `None` if `ignoreTimeZone` is set to `false`. If `ignoreTimeZone` is set to `true`, the method + * will simply discard the time zone component. Enable the check to detect situations like parsing + * a timestamp with time zone as TimestampNTZType. + * * The return type is [[Option]] in order to distinguish between 0L and null. Please * refer to `parseTimestampString` for the allowed formats. */ - def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = { + def stringToTimestampWithoutTimeZone(s: UTF8String, ignoreTimeZone: Boolean): Option[Long] = { try { - val (segments, _, justTime) = parseTimestampString(s) - // If the input string can't be parsed as a timestamp, or it contains only the time part of a - // timestamp and we can't determine its date, return None. - if (segments.isEmpty || justTime) { + val (segments, zoneIdOpt, justTime) = parseTimestampString(s) + // If the input string can't be parsed as a timestamp without time zone, or it contains only + // the time part of a timestamp and we can't determine its date, return None. 
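+      // Note: && binds tighter than ||, so the zone check only rejects the input
+      // when ignoreTimeZone is false and a zone component was actually parsed.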
+ if (segments.isEmpty || justTime || !ignoreTimeZone && zoneIdOpt.isDefined) { return None } val nanoseconds = MICROSECONDS.toNanos(segments(6)) @@ -465,8 +470,19 @@ object DateTimeUtils { } } + /** + * Trims and parses a given UTF8 string to a corresponding [[Long]] value which representing the + * number of microseconds since the epoch. The result is independent of time zones. Zone id + * component will be discarded and ignored. + * The return type is [[Option]] in order to distinguish between 0L and null. Please + * refer to `parseTimestampString` for the allowed formats. + */ + def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = { + stringToTimestampWithoutTimeZone(s, true) + } + def stringToTimestampWithoutTimeZoneAnsi(s: UTF8String): Long = { - stringToTimestampWithoutTimeZone(s).getOrElse { + stringToTimestampWithoutTimeZone(s, true).getOrElse { throw QueryExecutionErrors.cannotCastToDateTimeError(s, TimestampNTZType) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 4c291e5790b6f..3d28686a1817c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -56,6 +56,7 @@ sealed trait TimestampFormatter extends Serializable { * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time. * * @param s - string with timestamp to parse + * @param ignoreTimeZone - indicates non-strict parsing, allows to ignore time zone components * @return microseconds since epoch. * @throws ParseException can be thrown by legacy parser * @throws DateTimeParseException can be thrown by new parser @@ -67,10 +68,19 @@ sealed trait TimestampFormatter extends Serializable { @throws(classOf[DateTimeParseException]) @throws(classOf[DateTimeException]) @throws(classOf[IllegalStateException]) - def parseWithoutTimeZone(s: String): Long = + def parseWithoutTimeZone(s: String, ignoreTimeZone: Boolean): Long = throw new IllegalStateException( - s"The method `parseWithoutTimeZone(s: String)` should be implemented in the formatter " + - "of timestamp without time zone") + s"The method `parseWithoutTimeZone(s: String, ignoreTimeZone: Boolean)` should be " + + "implemented in the formatter of timestamp without time zone") + + /** + * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time. + * Zone-id and zone-offset components are ignored. + */ + final def parseWithoutTimeZone(s: String): Long = + // This is implemented to adhere to the original behaviour of `parseWithoutTimeZone` where we + // did not fail if timestamp contained zone-id or zone-offset component and instead ignored it. 
+ parseWithoutTimeZone(s, true) def format(us: Long): String def format(ts: Timestamp): String @@ -119,11 +129,10 @@ class Iso8601TimestampFormatter( } catch checkParsedDiff(s, legacyFormatter.parse) } - override def parseWithoutTimeZone(s: String): Long = { + override def parseWithoutTimeZone(s: String, ignoreTimeZone: Boolean): Long = { try { val parsed = formatter.parse(s) - val parsedZoneId = parsed.query(TemporalQueries.zone()) - if (parsedZoneId != null) { + if (!ignoreTimeZone && parsed.query(TemporalQueries.zone()) != null) { throw QueryExecutionErrors.cannotParseStringAsDataTypeError(pattern, s, TimestampNTZType) } val localDate = toLocalDate(parsed) @@ -191,15 +200,13 @@ class DefaultTimestampFormatter( } catch checkParsedDiff(s, legacyFormatter.parse) } - override def parseWithoutTimeZone(s: String): Long = { + override def parseWithoutTimeZone(s: String, ignoreTimeZone: Boolean): Long = { try { val utf8Value = UTF8String.fromString(s) - val (_, zoneIdOpt, _) = DateTimeUtils.parseTimestampString(utf8Value) - if (zoneIdOpt.isDefined) { + DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, ignoreTimeZone).getOrElse { throw QueryExecutionErrors.cannotParseStringAsDataTypeError( TimestampFormatter.defaultPattern(), s, TimestampNTZType) } - DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(utf8Value) } catch checkParsedDiff(s, legacyFormatter.parse) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 69bb6c141ae8f..831b0fb523843 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -357,6 +357,18 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { checkStringToTimestamp("2021-01-01T12:30:4294967297+4294967297:30", None) } + test("SPARK-37326: stringToTimestampWithoutTimeZone with ignoreTimeZone") { + assert( + stringToTimestampWithoutTimeZone( + UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) == + Some(DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 11, 22, 10, 54, 27)))) + + assert( + stringToTimestampWithoutTimeZone( + UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) == + None) + } + test("SPARK-15379: special invalid date string") { // Test stringToDate assert(toDate("2015-02-29 00:00:00").isEmpty) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 2a1cfeb6113e6..226b9304e5296 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1125,15 +1125,14 @@ abstract class CSVSuite .coalesce(1) .write.text(path) - val res = spark.read - .format("csv") + val res = spark.read.format("csv") .option("inferSchema", "true") .option("header", "true") .load(path) if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") { - // Timestamps without timezone are parsed as strings, so the col0 type would be StringType - // which is similar to reading without schema inference. + // Timestamps without timezone are parsed as strings, so the col0 type would be + // StringType which is similar to reading without schema inference. 
val exp = spark.read.format("csv").option("header", "true").load(path) checkAnswer(res, exp) } else { @@ -1161,17 +1160,23 @@ abstract class CSVSuite .coalesce(1) .write.text(path) - val res = spark.read.format("csv").schema("col0 TIMESTAMP_NTZ").load(path) + for (timestampNTZFormat <- Seq(None, Some("yyyy-MM-dd'T'HH:mm:ss[.SSS]"))) { + val reader = spark.read.format("csv").schema("col0 TIMESTAMP_NTZ") + val res = timestampNTZFormat match { + case Some(format) => reader.option("timestampNTZFormat", format).load(path) + case None => reader.load(path) + } - checkAnswer( - res, - Seq( - Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)), - Row(null), - Row(null), - Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)) + checkAnswer( + res, + Seq( + Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)), + Row(null), + Row(null), + Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)) + ) ) - ) + } } } From 309643bea16446d58c9e86e7e0e60988c33934fb Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 22 Nov 2021 18:04:18 +1300 Subject: [PATCH 09/14] add throws annotations --- .../apache/spark/sql/catalyst/util/TimestampFormatter.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 3d28686a1817c..d443456c64f4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -77,6 +77,10 @@ sealed trait TimestampFormatter extends Serializable { * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time. * Zone-id and zone-offset components are ignored. */ + @throws(classOf[ParseException]) + @throws(classOf[DateTimeParseException]) + @throws(classOf[DateTimeException]) + @throws(classOf[IllegalStateException]) final def parseWithoutTimeZone(s: String): Long = // This is implemented to adhere to the original behaviour of `parseWithoutTimeZone` where we // did not fail if timestamp contained zone-id or zone-offset component and instead ignored it. From f59ba6c4b78308470eae52f0b9da870333cd8782 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 25 Nov 2021 10:43:02 +1300 Subject: [PATCH 10/14] address comments, switch to failOnError --- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 2 +- .../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +- .../spark/sql/catalyst/util/DateTimeUtils.scala | 10 +++++----- .../sql/catalyst/util/TimestampFormatter.scala | 16 ++++++++-------- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 6 +++--- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index f30fa8a0b5f95..b4ec1645ed35b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -178,7 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // We can only parse the value as TimestampNTZType if it does not have zone-offset or // time-zone component and can be parsed with the timestamp formatter. // Otherwise, it is likely to be a timestamp with timezone. 
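     // For example, "2020-12-12T12:12:12.000" has no zone component and can be
     // inferred as TIMESTAMP_NTZ, while "2020-12-12T17:12:12.000+05:00" carries an
     // offset and falls through to tryParseTimestamp.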
- if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) { + if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, true)).isDefined) { SQLConf.get.timestampType } else { tryParseTimestamp(field) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 56166950e6783..eb827aea73f0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -204,7 +204,7 @@ class UnivocityParser( case _: TimestampNTZType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => - timestampNTZFormatter.parseWithoutTimeZone(datum, false) + timestampNTZFormatter.parseWithoutTimeZone(datum, true) } case _: DateType => (d: String) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 80f5a08b4f72c..ebe5153099f43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -445,19 +445,19 @@ object DateTimeUtils { * number of microseconds since the epoch. The result will be independent of time zones. * * If the input string contains a component associated with time zone, the method will return - * `None` if `ignoreTimeZone` is set to `false`. If `ignoreTimeZone` is set to `true`, the method + * `None` if `failOnError` is set to `true`. If `failOnError` is set to `false`, the method * will simply discard the time zone component. Enable the check to detect situations like parsing * a timestamp with time zone as TimestampNTZType. * * The return type is [[Option]] in order to distinguish between 0L and null. Please * refer to `parseTimestampString` for the allowed formats. */ - def stringToTimestampWithoutTimeZone(s: UTF8String, ignoreTimeZone: Boolean): Option[Long] = { + def stringToTimestampWithoutTimeZone(s: UTF8String, failOnError: Boolean): Option[Long] = { try { val (segments, zoneIdOpt, justTime) = parseTimestampString(s) // If the input string can't be parsed as a timestamp without time zone, or it contains only // the time part of a timestamp and we can't determine its date, return None. - if (segments.isEmpty || justTime || !ignoreTimeZone && zoneIdOpt.isDefined) { + if (segments.isEmpty || justTime || failOnError && zoneIdOpt.isDefined) { return None } val nanoseconds = MICROSECONDS.toNanos(segments(6)) @@ -478,11 +478,11 @@ object DateTimeUtils { * refer to `parseTimestampString` for the allowed formats. 
   */
  def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
-    stringToTimestampWithoutTimeZone(s, true)
+    stringToTimestampWithoutTimeZone(s, false)
  }
 
  def stringToTimestampWithoutTimeZoneAnsi(s: UTF8String): Long = {
-    stringToTimestampWithoutTimeZone(s, true).getOrElse {
+    stringToTimestampWithoutTimeZone(s, false).getOrElse {
      throw QueryExecutionErrors.cannotCastToDateTimeError(s, TimestampNTZType)
    }
  }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index d443456c64f4d..21fd0860ef449 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -56,7 +56,7 @@ sealed trait TimestampFormatter extends Serializable {
    * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time.
    *
    * @param s - string with timestamp to parse
-   * @param ignoreTimeZone - indicates non-strict parsing, allows to ignore time zone components
+   * @param failOnError - whether to fail when the input string contains a time zone component
    * @return microseconds since epoch.
    * @throws ParseException can be thrown by legacy parser
    * @throws DateTimeParseException can be thrown by new parser
@@ -68,9 +68,9 @@ sealed trait TimestampFormatter extends Serializable {
   @throws(classOf[DateTimeParseException])
   @throws(classOf[DateTimeException])
   @throws(classOf[IllegalStateException])
-  def parseWithoutTimeZone(s: String, ignoreTimeZone: Boolean): Long =
+  def parseWithoutTimeZone(s: String, failOnError: Boolean): Long =
     throw new IllegalStateException(
-      s"The method `parseWithoutTimeZone(s: String, ignoreTimeZone: Boolean)` should be " +
+      s"The method `parseWithoutTimeZone(s: String, failOnError: Boolean)` should be " +
       "implemented in the formatter of timestamp without time zone")
 
   /**
@@ -84,7 +84,7 @@ sealed trait TimestampFormatter extends Serializable {
   final def parseWithoutTimeZone(s: String): Long =
     // This is implemented to adhere to the original behaviour of `parseWithoutTimeZone` where we
     // did not fail if timestamp contained zone-id or zone-offset component and instead ignored it.
-    parseWithoutTimeZone(s, true)
+    parseWithoutTimeZone(s, false)
 
   def format(us: Long): String
   def format(ts: Timestamp): String
@@ -133,10 +133,10 @@ class Iso8601TimestampFormatter(
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }
 
-  override def parseWithoutTimeZone(s: String, ignoreTimeZone: Boolean): Long = {
+  override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
     try {
       val parsed = formatter.parse(s)
-      if (!ignoreTimeZone && parsed.query(TemporalQueries.zone()) != null) {
+      if (failOnError && parsed.query(TemporalQueries.zone()) != null) {
         throw QueryExecutionErrors.cannotParseStringAsDataTypeError(pattern, s, TimestampNTZType)
       }
       val localDate = toLocalDate(parsed)
@@ -204,10 +204,10 @@ class DefaultTimestampFormatter(
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }
 
-  override def parseWithoutTimeZone(s: String, ignoreTimeZone: Boolean): Long = {
+  override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
     try {
       val utf8Value = UTF8String.fromString(s)
-      DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, ignoreTimeZone).getOrElse {
+      DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, failOnError).getOrElse {
         throw QueryExecutionErrors.cannotParseStringAsDataTypeError(
           TimestampFormatter.defaultPattern(), s, TimestampNTZType)
       }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 831b0fb523843..422a6cdeda2eb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -357,15 +357,15 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
     checkStringToTimestamp("2021-01-01T12:30:4294967297+4294967297:30", None)
   }
 
-  test("SPARK-37326: stringToTimestampWithoutTimeZone with ignoreTimeZone") {
+  test("SPARK-37326: stringToTimestampWithoutTimeZone with failOnError") {
     assert(
       stringToTimestampWithoutTimeZone(
-        UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) ==
+        UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) ==
       Some(DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 11, 22, 10, 54, 27))))
 
     assert(
       stringToTimestampWithoutTimeZone(
-        UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) ==
+        UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) ==
       None)
   }
 
From 662460f8c8a365d51fc634c1d8cc37e61e7aef08 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Thu, 25 Nov 2021 19:12:12 +1300
Subject: [PATCH 11/14] update to use only one option

---
 .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala   | 2 +-
 .../scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 4 +---
 .../apache/spark/sql/catalyst/csv/UnivocityGenerator.scala   | 2 +-
 .../org/apache/spark/sql/catalyst/csv/UnivocityParser.scala  | 2 +-
 4 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index b4ec1645ed35b..31acf263ed9a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -40,7 +40,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true)
 
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormatInRead,
+    options.timestampNTZFormat,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 4becf7ccfe1af..422ad79fc980d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -164,9 +164,7 @@ class CSVOptions(
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
     })
 
-  val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat")
-  val timestampNTZFormatInWrite: String =
-    parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
+  val timestampNTZFormat: Option[String] = parameters.get("timestampNTZFormat")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 10cccd57117e0..a3c58067a0b4a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -49,7 +49,7 @@ class UnivocityGenerator(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormatInWrite,
+    options.timestampNTZFormat,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index eb827aea73f0b..fb4c5e557d83a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -94,7 +94,7 @@ class UnivocityParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
   private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormatInRead,
+    options.timestampNTZFormat,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,

From 043edb627e79a3f836caff608722e90ddc3997d9 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Mon, 29 Nov 2021 11:10:23 +1300
Subject: [PATCH 12/14] minor updates, add test

---
 .../sql/catalyst/csv/CSVInferSchema.scala        |  2 +-
 .../spark/sql/catalyst/csv/CSVOptions.scala      |  4 ++-
 .../sql/catalyst/csv/UnivocityGenerator.scala    |  2 +-
 .../sql/catalyst/csv/UnivocityParser.scala       |  2 +-
 .../apache/spark/sql/CsvFunctionsSuite.scala     | 11 ++++++++
 .../datasources/csv/CSVBenchmark.scala           |  8 +++---
 .../execution/datasources/csv/CSVSuite.scala     | 28 ++++++++++---------
 7 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 31acf263ed9a2..b4ec1645ed35b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -40,7 +40,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true)
 
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormat,
+    options.timestampNTZFormatInRead,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 422ad79fc980d..2a404b14bfd89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -164,7 +164,9 @@ class CSVOptions(
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
     })
 
-  val timestampNTZFormat: Option[String] = parameters.get("timestampNTZFormat")
+  val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat")
+  val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
+    s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index a3c58067a0b4a..10cccd57117e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -49,7 +49,7 @@ class UnivocityGenerator(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormat,
+    options.timestampNTZFormatInWrite,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index fb4c5e557d83a..eb827aea73f0b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -94,7 +94,7 @@ class UnivocityParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
   private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormat,
+    options.timestampNTZFormatInRead,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index c87314386f38b..79c6862fe2faa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -368,4 +368,15 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       .selectExpr("value.a")
     checkAnswer(fromCsvDF, Row(localDT))
   }
+
+  test("SPARK-36490: Handle incorrectly formatted timestamp_ntz values in from_csv") {
+    val fromCsvDF = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
+      .select(
+        from_csv(
+          $"csv",
+          StructType(StructField("a", TimestampNTZType) :: Nil),
+          Map.empty[String, String]) as "value")
+      .selectExpr("value.a")
+    checkAnswer(fromCsvDF, Row(null))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
index 53d287b32f8db..12c5ce0060f32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
@@ -343,11 +343,11 @@ object CSVBenchmark extends SqlBasedBenchmark {
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runBenchmark("Benchmark to measure CSV read/write performance") {
       val numIters = 3
-      quotedValuesBenchmark(rowsNum = 50 * 1000, numIters)
-      multiColumnsBenchmark(rowsNum = 1000 * 1000, numIters)
-      countBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
+      // quotedValuesBenchmark(rowsNum = 50 * 1000, numIters)
+      // multiColumnsBenchmark(rowsNum = 1000 * 1000, numIters)
+      // countBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
       datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
-      filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
+      // filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 226b9304e5296..936293e00b8ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1130,19 +1130,21 @@ abstract class CSVSuite
       .option("header", "true")
       .load(path)
 
-    if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") {
-      // Timestamps without timezone are parsed as strings, so the col0 type would be
-      // StringType which is similar to reading without schema inference.
-      val exp = spark.read.format("csv").option("header", "true").load(path)
-      checkAnswer(res, exp)
-    } else {
-      val exp = spark.sql("""
-        select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
-        select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
-        select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
-        select timestamp_ltz'2020-12-12T12:12:12.000' as col0
-      """)
-      checkAnswer(res, exp)
+    for (policy <- Seq("exception", "corrected", "legacy")) {
+      if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") {
+        // Timestamps without timezone are parsed as strings, so the col0 type would be
+        // StringType which is similar to reading without schema inference.
+        val exp = spark.read.format("csv").option("header", "true").load(path)
+        checkAnswer(res, exp)
+      } else {
+        val exp = spark.sql("""
+          select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
+          select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
+          select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
+          select timestamp_ltz'2020-12-12T12:12:12.000' as col0
+        """)
+        checkAnswer(res, exp)
+      }
     }
   }
 }

From 1edef2d15babe611119c9d3e9790b0251ee2f750 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Tue, 30 Nov 2021 09:54:50 +1300
Subject: [PATCH 13/14] minor updates

---
 .../scala/org/apache/spark/sql/CsvFunctionsSuite.scala | 2 +-
 .../sql/execution/datasources/csv/CSVBenchmark.scala   | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 79c6862fe2faa..2808652f2998d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -369,7 +369,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(fromCsvDF, Row(localDT))
   }
 
-  test("SPARK-36490: Handle incorrectly formatted timestamp_ntz values in from_csv") {
+  test("SPARK-37326: Handle incorrectly formatted timestamp_ntz values in from_csv") {
     val fromCsvDF = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
       .select(
         from_csv(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
index 12c5ce0060f32..53d287b32f8db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
@@ -343,11 +343,11 @@ object CSVBenchmark extends SqlBasedBenchmark {
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runBenchmark("Benchmark to measure CSV read/write performance") {
       val numIters = 3
-      // quotedValuesBenchmark(rowsNum = 50 * 1000, numIters)
-      // multiColumnsBenchmark(rowsNum = 1000 * 1000, numIters)
-      // countBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
+      quotedValuesBenchmark(rowsNum = 50 * 1000, numIters)
+      multiColumnsBenchmark(rowsNum = 1000 * 1000, numIters)
+      countBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
       datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
-      // filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
+      filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
     }
   }
 }

From feb3715285bb8b31d21ef0dd5903310d739210a7 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Wed, 1 Dec 2021 11:43:19 +1300
Subject: [PATCH 14/14] update tests, address comments

---
 .../execution/datasources/csv/CSVSuite.scala | 58 ++++++++++++-------
 1 file changed, 36 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 936293e00b8ed..6b496b714c4c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1017,15 +1017,21 @@ abstract class CSVSuite
       val path = s"${dir.getCanonicalPath}/csv"
       val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
 
-      exp.write.format("csv").option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss").save(path)
+      exp.write
+        .format("csv")
+        .option("header", "true")
+        .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .save(path)
 
       withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
         val res = spark.read
           .format("csv")
           .option("inferSchema", "true")
-          .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss")
+          .option("header", "true")
+          .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
           .load(path)
 
+        assert(res.dtypes === exp.dtypes)
         checkAnswer(res, exp)
       }
     }
@@ -1036,15 +1042,21 @@ abstract class CSVSuite
       val path = s"${dir.getCanonicalPath}/csv"
       val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col0")
 
-      exp.write.format("csv").option("timestampFormat", "yyyy-MM-dd HH:mm:ss").save(path)
+      exp.write
+        .format("csv")
+        .option("header", "true")
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .save(path)
 
       withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
         val res = spark.read
           .format("csv")
          .option("inferSchema", "true")
-          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
+          .option("header", "true")
+          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
          .load(path)
 
+        assert(res.dtypes === exp.dtypes)
         checkAnswer(res, exp)
       }
     }
@@ -1125,25 +1137,27 @@ abstract class CSVSuite
       .coalesce(1)
       .write.text(path)
 
-    val res = spark.read.format("csv")
-      .option("inferSchema", "true")
-      .option("header", "true")
-      .load(path)
-
     for (policy <- Seq("exception", "corrected", "legacy")) {
-      if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") {
-        // Timestamps without timezone are parsed as strings, so the col0 type would be
-        // StringType which is similar to reading without schema inference.
-        val exp = spark.read.format("csv").option("header", "true").load(path)
-        checkAnswer(res, exp)
-      } else {
-        val exp = spark.sql("""
-          select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
-          select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
-          select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
-          select timestamp_ltz'2020-12-12T12:12:12.000' as col0
-        """)
-        checkAnswer(res, exp)
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) {
+        val res = spark.read.format("csv")
+          .option("inferSchema", "true")
+          .option("header", "true")
+          .load(path)
+
+        if (policy == "legacy") {
+          // Timestamps without timezone are parsed as strings, so the col0 type would be
+          // StringType which is similar to reading without schema inference.
+          val exp = spark.read.format("csv").option("header", "true").load(path)
+          checkAnswer(res, exp)
+        } else {
+          val exp = spark.sql("""
+            select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
+            select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
+            select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
+            select timestamp_ltz'2020-12-12T12:12:12.000' as col0
+          """)
+          checkAnswer(res, exp)
+        }
       }
     }
   }
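
Reviewer note, not part of the series: below is a minimal end-to-end sketch of the behaviour these patches add, for anyone who wants to try it outside the test suite. The SparkSession setup and the output path are illustrative assumptions; the reader options (header, inferSchema, timestampNTZFormat), the spark.sql.timestampType conf and the TIMESTAMP_NTZ DDL type are the ones exercised by the tests above.

import org.apache.spark.sql.SparkSession

object TimestampNtzCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("timestamp-ntz-csv-sketch")
      .getOrCreate()

    val path = "/tmp/timestamp-ntz-csv-sketch"  // hypothetical output directory

    // Write a TIMESTAMP_NTZ value; the generator formats it with the
    // timestampNTZFormat write default, yyyy-MM-dd'T'HH:mm:ss[.SSS].
    spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
      .write.format("csv").option("header", "true").save(path)

    // With spark.sql.timestampType = TIMESTAMP_NTZ, schema inference maps a
    // zone-less timestamp string to TimestampNTZType; a string carrying a
    // zone-id or zone-offset still infers as a zoned timestamp.
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    val inferred = spark.read.format("csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(path)
    inferred.printSchema()  // expect col0: timestamp_ntz

    // An explicit TIMESTAMP_NTZ column parses strictly (failOnError = true in
    // UnivocityParser), so a value with a zone offset yields null in
    // permissive mode rather than being silently reinterpreted.
    val explicit = spark.read.format("csv")
      .schema("col0 TIMESTAMP_NTZ")
      .option("header", "true")
      .load(path)
    explicit.show(false)

    spark.stop()
  }
}

Against a build that includes this series, the inferred read should report a timestamp_ntz column; without it, the same input infers as the session's default zoned timestamp type.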