From fb10b91502b67b98f2904a06b017a6e56dd6e39f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Dec 2018 12:01:47 +0100 Subject: [PATCH 01/30] Adding DateTimeFormatter --- .../sql/catalyst/util/DateTimeFormatter.scala | 173 ++++++++++++++++++ .../sql/catalyst/util/DateTimeUtils.scala | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 9 + 3 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala new file mode 100644 index 000000000000..d708eec050ea --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.time._ +import java.time.format.DateTimeFormatterBuilder +import java.time.temporal.{ChronoField, TemporalQueries} +import java.util.{Locale, TimeZone} + +import scala.util.Try + +import org.apache.commons.lang3.time.FastDateFormat + +import org.apache.spark.sql.internal.SQLConf + +sealed trait DateTimeFormatter { + def parse(s: String): Long // returns microseconds since epoch + def format(us: Long): String +} + +class Iso8601DateTimeFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends DateTimeFormatter { + val formatter = new DateTimeFormatterBuilder() + .appendPattern(pattern) + .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970) + .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1) + .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) + .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) + .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) + .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) + .toFormatter(locale) + + def toInstant(s: String): Instant = { + val temporalAccessor = formatter.parse(s) + if (temporalAccessor.query(TemporalQueries.offset()) == null) { + val localDateTime = LocalDateTime.from(temporalAccessor) + val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId) + Instant.from(zonedDateTime) + } else { + Instant.from(temporalAccessor) + } + } + + private def instantToMicros(instant: Instant, secMul: Long, nanoDiv: Long): Long = { + val sec = Math.multiplyExact(instant.getEpochSecond, secMul) + val result = Math.addExact(sec, instant.getNano / nanoDiv) + result + } + + def parse(s: String): Long = { + instantToMicros(toInstant(s), DateTimeUtils.MICROS_PER_SECOND, DateTimeUtils.NANOS_PER_MICROS) + } + + def format(us: Long): String = { + val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND) + val mos = Math.floorMod(us, 
DateTimeUtils.MICROS_PER_SECOND) + val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS) + + formatter.withZone(timeZone.toZoneId).format(instant) + } +} + +class LegacyDateTimeFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends DateTimeFormatter { + val format = FastDateFormat.getInstance(pattern, timeZone, locale) + + protected def toMillis(s: String): Long = format.parse(s).getTime + + def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS + + def format(us: Long): String = { + format.format(DateTimeUtils.toJavaTimestamp(us)) + } +} + +class LegacyFallbackDateTimeFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) { + override def toMillis(s: String): Long = { + Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime) + } +} + +object DateTimeFormatter { + def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = { + if (SQLConf.get.legacyTimeParserEnabled) { + new LegacyFallbackDateTimeFormatter(format, timeZone, locale) + } else { + new Iso8601DateTimeFormatter(format, timeZone, locale) + } + } +} + +sealed trait DateFormatter { + def parse(s: String): Int // returns days since epoch + def format(days: Int): String +} + +class Iso8601DateFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends DateFormatter { + + val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale) + + override def parse(s: String): Int = { + val seconds = dateTimeFormatter.toInstant(s).getEpochSecond + (seconds / DateTimeUtils.SECONDS_PER_DAY).toInt + } + + override def format(days: Int): String = { + val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY) + dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant) + } +} + +class LegacyDateFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends DateFormatter { + val format = FastDateFormat.getInstance(pattern, timeZone, locale) + + def parse(s: String): Int = { + val milliseconds = format.parse(s).getTime + DateTimeUtils.millisToDays(milliseconds) + } + + def format(days: Int): String = { + val date = DateTimeUtils.toJavaDate(days) + format.format(date) + } +} + +class LegacyFallbackDateFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) { + override def parse(s: String): Int = { + Try(super.parse(s)).getOrElse { + DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime) + } + } +} + +object DateFormatter { + def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = { + if (SQLConf.get.legacyTimeParserEnabled) { + new LegacyFallbackDateFormatter(format, timeZone, locale) + } else { + new Iso8601DateFormatter(format, timeZone, locale) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 5ae75dc93930..c6dfdbf2505b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -50,7 +50,7 @@ object DateTimeUtils { final val MILLIS_PER_SECOND = 1000L final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY - + final val NANOS_PER_MICROS = 1000L final val 
MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
 
   // number of days in 400 years
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f1c845bc9450..b2b277364f4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1618,6 +1618,13 @@ object SQLConf {
       "a SparkConf entry.")
     .booleanConf
     .createWithDefault(true)
+
+  val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled")
+    .doc("When set to true, java.text.SimpleDateFormat is used for formatting and parsing " +
+      "dates/timestamps in a locale-sensitive manner. When set to false, classes from " +
+      "java.time.* packages are used for the same purpose.")
+    .booleanConf
+    .createWithDefault(false)
 }
 
 /**
@@ -2040,6 +2047,8 @@ class SQLConf extends Serializable with Logging {
 
   def setCommandRejectsSparkConfs: Boolean = getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CONFS)
 
+  def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

From a9b39eccfdc3bd9da8fe52eda24ed240588b282f Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 1 Dec 2018 12:15:31 +0100
Subject: [PATCH 02/30] Support DateTimeFormatter by JacksonParser and JacksonGenerator

---
 .../spark/sql/catalyst/json/JSONOptions.scala | 10 ++----
 .../sql/catalyst/json/JacksonGenerator.scala  | 14 +++++---
 .../sql/catalyst/json/JacksonParser.scala     | 35 +++++--------------
 3 files changed, 20 insertions(+), 39 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index e10b8a327c01..eaff3fa7bec2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -82,13 +81,10 @@ private[sql] class JSONOptions(
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
- val dateFormat: FastDateFormat = - FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale) + val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd") - val timestampFormat: FastDateFormat = - FastDateFormat.getInstance( - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale) + val timestampFormat: String = + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index d02a2be8ddad..bced0180af1b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters -import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.types._ /** @@ -77,6 +77,12 @@ private[sql] class JacksonGenerator( private val lineSeparator: String = options.lineSeparatorInWrite + private val timeFormatter = DateTimeFormatter( + options.timestampFormat, + options.timeZone, + options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) + private def makeWriter(dataType: DataType): ValueWriter = dataType match { case NullType => (row: SpecializedGetters, ordinal: Int) => @@ -116,14 +122,12 @@ private[sql] class JacksonGenerator( case TimestampType => (row: SpecializedGetters, ordinal: Int) => - val timestampString = - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + val timestampString = timeFormatter.format(row.getLong(ordinal)) gen.writeString(timestampString) case DateType => (row: SpecializedGetters, ordinal: Int) => - val dateString = - options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) + val dateString = dateFormatter.format(row.getInt(ordinal)) gen.writeString(dateString) case BinaryType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 2357595906b1..9183000c3675 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -55,6 +55,12 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) + private val timeFormatter = DateTimeFormatter( + options.timestampFormat, + options.timeZone, + options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) + /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. This is a wrapper for the method @@ -218,17 +224,7 @@ class JacksonParser( case TimestampType => (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) { case VALUE_STRING if parser.getTextLength >= 1 => - val stringValue = parser.getText - // This one will lose microseconds parts. 
- // See https://issues.apache.org/jira/browse/SPARK-10681. - Long.box { - Try(options.timestampFormat.parse(stringValue).getTime * 1000L) - .getOrElse { - // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. - DateTimeUtils.stringToTime(stringValue).getTime * 1000L - } - } + timeFormatter.parse(parser.getText) case VALUE_NUMBER_INT => parser.getLongValue * 1000000L @@ -237,22 +233,7 @@ class JacksonParser( case DateType => (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) { case VALUE_STRING if parser.getTextLength >= 1 => - val stringValue = parser.getText - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681.x - Int.box { - Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime)) - .orElse { - // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. - Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime)) - } - .getOrElse { - // In Spark 1.5.0, we store the data as number of days since epoch in string. - // So, we just convert it to Int. - stringValue.toInt - } - } + dateFormatter.parse(parser.getText) } case BinaryType => From ff589f505f7aaa8c94ce304aaf77792505998c16 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Dec 2018 13:31:43 +0100 Subject: [PATCH 03/30] Make test independent from current time zone --- .../datasources/json/JsonSuite.scala | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index ee5176e23e34..1e09558aca92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -57,14 +57,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } val factory = new JsonFactory() - def enforceCorrectType(value: Any, dataType: DataType): Any = { + def enforceCorrectType( + value: Any, + dataType: DataType, + options: Map[String, String] = Map.empty): Any = { val writer = new StringWriter() Utils.tryWithResource(factory.createGenerator(writer)) { generator => generator.writeObject(value) generator.flush() } - val dummyOption = new JSONOptions(Map.empty[String, String], "GMT") + val dummyOption = new JSONOptions(options, "GMT") val dummySchema = StructType(Seq.empty) val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true) @@ -96,19 +99,26 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)), enforceCorrectType(intNumber.toLong, TimestampType)) val strTime = "2014-09-30 12:34:56" - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), - enforceCorrectType(strTime, TimestampType)) + checkTypePromotion( + expected = 1412080496000000L, + enforceCorrectType(strTime, TimestampType, Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"))) val strDate = "2014-10-15" checkTypePromotion( DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) val ISO8601Time1 = "1970-01-01T01:00:01.0Z" - val ISO8601Time2 = "1970-01-01T02:00:01-01:00" checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)), - 
enforceCorrectType(ISO8601Time1, TimestampType)) + enforceCorrectType( + ISO8601Time1, + TimestampType, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SX"))) + val ISO8601Time2 = "1970-01-01T02:00:01-01:00" checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)), - enforceCorrectType(ISO8601Time2, TimestampType)) + enforceCorrectType( + ISO8601Time2, + TimestampType, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX"))) val ISO8601Date = "1970-01-01" checkTypePromotion(DateTimeUtils.millisToDays(32400000), From 4646dededae832185a35a85244baab6507d28f0d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Dec 2018 19:19:31 +0100 Subject: [PATCH 04/30] Fix a test by new fallback --- .../sql/catalyst/util/DateTimeFormatter.scala | 10 +- .../datasources/json/JsonSuite.scala | 176 +++++++++--------- 2 files changed, 97 insertions(+), 89 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index d708eec050ea..30a30bbc7e43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -156,8 +156,14 @@ class LegacyFallbackDateFormatter( timeZone: TimeZone, locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) { override def parse(s: String): Int = { - Try(super.parse(s)).getOrElse { - DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime) + Try(super.parse(s)).orElse { + // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards + // compatibility. + Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime)) + }.getOrElse { + // In Spark 1.5.0, we store the data as number of days since epoch in string. + // So, we just convert it to Int. + s.toInt } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 1e09558aca92..ce6815ee54f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1450,103 +1450,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("backward compatibility") { - // This test we make sure our JSON support can read JSON data generated by previous version - // of Spark generated through toJSON method and JSON data source. - // The data is generated by the following program. - // Here are a few notes: - // - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13) - // in the JSON object. - // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to - // JSON objects generated by those Spark versions (col17). - // - If the type is NullType, we do not write data out. - - // Create the schema. - val struct = - StructType( - StructField("f1", FloatType, true) :: - StructField("f2", ArrayType(BooleanType), true) :: Nil) + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") { + // This test we make sure our JSON support can read JSON data generated by previous version + // of Spark generated through toJSON method and JSON data source. + // The data is generated by the following program. 
+ // Here are a few notes: + // - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13) + // in the JSON object. + // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to + // JSON objects generated by those Spark versions (col17). + // - If the type is NullType, we do not write data out. + + // Create the schema. + val struct = + StructType( + StructField("f1", FloatType, true) :: + StructField("f2", ArrayType(BooleanType), true) :: Nil) - val dataTypes = - Seq( - StringType, BinaryType, NullType, BooleanType, - ByteType, ShortType, IntegerType, LongType, - FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), - DateType, TimestampType, - ArrayType(IntegerType), MapType(StringType, LongType), struct, - new UDT.MyDenseVectorUDT()) - val fields = dataTypes.zipWithIndex.map { case (dataType, index) => - StructField(s"col$index", dataType, nullable = true) - } - val schema = StructType(fields) + val dataTypes = + Seq( + StringType, BinaryType, NullType, BooleanType, + ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), + DateType, TimestampType, + ArrayType(IntegerType), MapType(StringType, LongType), struct, + new UDT.MyDenseVectorUDT()) + val fields = dataTypes.zipWithIndex.map { case (dataType, index) => + StructField(s"col$index", dataType, nullable = true) + } + val schema = StructType(fields) - val constantValues = - Seq( - "a string in binary".getBytes(StandardCharsets.UTF_8), - null, - true, - 1.toByte, - 2.toShort, - 3, - Long.MaxValue, - 0.25.toFloat, - 0.75, - new java.math.BigDecimal(s"1234.23456"), - new java.math.BigDecimal(s"1.23456"), - java.sql.Date.valueOf("2015-01-01"), - java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"), - Seq(2, 3, 4), - Map("a string" -> 2000L), - Row(4.75.toFloat, Seq(false, true)), - new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))) - val data = - Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil + val constantValues = + Seq( + "a string in binary".getBytes(StandardCharsets.UTF_8), + null, + true, + 1.toByte, + 2.toShort, + 3, + Long.MaxValue, + 0.25.toFloat, + 0.75, + new java.math.BigDecimal(s"1234.23456"), + new java.math.BigDecimal(s"1.23456"), + java.sql.Date.valueOf("2015-01-01"), + java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"), + Seq(2, 3, 4), + Map("a string" -> 2000L), + Row(4.75.toFloat, Seq(false, true)), + new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))) + val data = + Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil - // Data generated by previous versions. - // scalastyle:off - val existingJSONData = + // Data generated by previous versions. 
+ // scalastyle:off + val existingJSONData = """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil - // scalastyle:on - - // Generate data for the current version. 
- val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema) - withTempPath { path => - df.write.format("json").mode("overwrite").save(path.getCanonicalPath) + """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil + // scalastyle:on + + // Generate data for the current version. + val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema) + withTempPath { path => + df.write.format("json").mode("overwrite").save(path.getCanonicalPath) - // df.toJSON will convert internal rows to external rows first and then generate - // JSON objects. While, df.write.format("json") will write internal rows directly. - val allJSON = + // df.toJSON will convert internal rows to external rows first and then generate + // JSON objects. While, df.write.format("json") will write internal rows directly. + val allJSON = existingJSONData ++ df.toJSON.collect() ++ sparkContext.textFile(path.getCanonicalPath).collect() - Utils.deleteRecursively(path) - sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath) - - // Read data back with the schema specified. 
- val col0Values = - Seq( - "Spark 1.2.2", - "Spark 1.3.1", - "Spark 1.3.1", - "Spark 1.4.1", - "Spark 1.4.1", - "Spark 1.5.0", - "Spark 1.5.0", - "Spark " + spark.sparkContext.version, - "Spark " + spark.sparkContext.version) - val expectedResult = col0Values.map { v => - Row.fromSeq(Seq(v) ++ constantValues) + Utils.deleteRecursively(path) + sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath) + + // Read data back with the schema specified. + val col0Values = + Seq( + "Spark 1.2.2", + "Spark 1.3.1", + "Spark 1.3.1", + "Spark 1.4.1", + "Spark 1.4.1", + "Spark 1.5.0", + "Spark 1.5.0", + "Spark " + spark.sparkContext.version, + "Spark " + spark.sparkContext.version) + val expectedResult = col0Values.map { v => + Row.fromSeq(Seq(v) ++ constantValues) + } + checkAnswer( + spark.read.format("json").schema(schema).load(path.getCanonicalPath), + expectedResult + ) } - checkAnswer( - spark.read.format("json").schema(schema).load(path.getCanonicalPath), - expectedResult - ) } } From 1c838e0447c363ac4ae4a7a582eac8c4b0414621 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Dec 2018 22:22:54 +0100 Subject: [PATCH 05/30] Set time zone explicitly --- .../sql/sources/HadoopFsRelationTest.scala | 102 +++++++++--------- 1 file changed, 53 insertions(+), 49 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 6bd59fde550d..31391244c652 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.sources import java.io.File +import java.util.TimeZone import scala.util.Random @@ -125,56 +126,59 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } else { Seq(false) } - for (dataType <- supportedDataTypes) { - for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { - val extraMessage = if (isParquetDataSource) { - s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled" - } else { - "" - } - logInfo(s"Testing $dataType data type$extraMessage") - - val extraOptions = Map[String, String]( - "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString - ) - - withTempPath { file => - val path = file.getCanonicalPath - - val dataGenerator = RandomDataGenerator.forType( - dataType = dataType, - nullable = true, - new Random(System.nanoTime()) - ).getOrElse { - fail(s"Failed to create data generator for schema $dataType") + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + TimeZone.setDefault(TimeZone.getTimeZone("UTC")) + for (dataType <- supportedDataTypes) { + for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { + val extraMessage = if (isParquetDataSource) { + s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled" + } else { + "" + } + logInfo(s"Testing $dataType data type$extraMessage") + + val extraOptions = Map[String, String]( + "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString + ) + + withTempPath { file => + val path = file.getCanonicalPath + + val dataGenerator = RandomDataGenerator.forType( + dataType = dataType, + nullable = true, + new Random(System.nanoTime()) + ).getOrElse { + fail(s"Failed to create data generator for schema $dataType") + } + + // Create a DF for the schema with random data. 
The index field is used to sort the + // DataFrame. This is a workaround for SPARK-10591. + val schema = new StructType() + .add("index", IntegerType, nullable = false) + .add("col", dataType, nullable = true) + val rdd = + spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) + val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) + + df.write + .mode("overwrite") + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .save(path) + + val loadedDF = spark + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .schema(df.schema) + .options(extraOptions) + .load(path) + .orderBy("index") + + checkAnswer(loadedDF, df) } - - // Create a DF for the schema with random data. The index field is used to sort the - // DataFrame. This is a workaround for SPARK-10591. - val schema = new StructType() - .add("index", IntegerType, nullable = false) - .add("col", dataType, nullable = true) - val rdd = - spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) - val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) - - df.write - .mode("overwrite") - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .options(extraOptions) - .save(path) - - val loadedDF = spark - .read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .schema(df.schema) - .options(extraOptions) - .load(path) - .orderBy("index") - - checkAnswer(loadedDF, df) } } } From 142f3017a29ecc07ec5a0135295d1a3e4a8e4d60 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Dec 2018 20:51:14 +0100 Subject: [PATCH 06/30] Updating the migration guide --- docs/sql-migration-guide-upgrade.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index e48125a0972b..d14068bc6c7d 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -31,6 +31,8 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.execution.setCommandRejectsSparkConfs` to `false`. + - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. + ## Upgrading From Spark SQL 2.3 to 2.4 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below. 
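Not part of any patch in this series: a minimal, standalone Scala sketch of the `parseDefaulting` behavior that `Iso8601DateTimeFormatter` in PATCH 01 builds on. Fields missing from the pattern are filled with epoch defaults at parse time, which is why a pattern without day or time fields still resolves to a full timestamp. It assumes only JDK `java.time` classes; the object name is illustrative.

import java.time.{Instant, LocalDateTime, ZoneOffset, ZonedDateTime}
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.ChronoField
import java.util.Locale

object ParseDefaultingSketch {
  def main(args: Array[String]): Unit = {
    // Mirror the builder in Iso8601DateTimeFormatter: default the fields
    // that the pattern below does not mention.
    val formatter = new DateTimeFormatterBuilder()
      .appendPattern("yyyy-MM")
      .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
      .toFormatter(Locale.US)
    // "2018-12" carries no day or time, but the defaults complete it.
    val localDateTime = LocalDateTime.from(formatter.parse("2018-12"))
    // Attach an explicit zone, as the formatter does with its configured
    // time zone when the input has no offset (UTC here for determinism).
    val instant = Instant.from(ZonedDateTime.of(localDateTime, ZoneOffset.UTC))
    println(instant) // prints: 2018-12-01T00:00:00Z
  }
}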
From 606da210595a0b1f2c8e3750b695bcc9e0fd676d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Dec 2018 22:25:12 +0100 Subject: [PATCH 07/30] Fix the migration guide by replacing CSV by JSON --- docs/sql-migration-guide-upgrade.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index d14068bc6c7d..b6a9ef4d9e80 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -31,7 +31,7 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.execution.setCommandRejectsSparkConfs` to `false`. - - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. + - Since Spark 3.0, JSON datasource uses java.time API for parsing and generating JSON content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. ## Upgrading From Spark SQL 2.3 to 2.4 From f326042aa1aff540d06c79fd73395204d846f3ea Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Dec 2018 20:53:38 +0100 Subject: [PATCH 08/30] Inlining method's arguments --- .../spark/sql/catalyst/util/DateTimeFormatter.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index 30a30bbc7e43..edd5f9ec12fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -58,15 +58,13 @@ class Iso8601DateTimeFormatter( } } - private def instantToMicros(instant: Instant, secMul: Long, nanoDiv: Long): Long = { - val sec = Math.multiplyExact(instant.getEpochSecond, secMul) - val result = Math.addExact(sec, instant.getNano / nanoDiv) + private def instantToMicros(instant: Instant): Long = { + val sec = Math.multiplyExact(instant.getEpochSecond, DateTimeUtils.MICROS_PER_SECOND) + val result = Math.addExact(sec, instant.getNano / DateTimeUtils.NANOS_PER_MICROS) result } - def parse(s: String): Long = { - instantToMicros(toInstant(s), DateTimeUtils.MICROS_PER_SECOND, DateTimeUtils.NANOS_PER_MICROS) - } + def parse(s: String): Long = instantToMicros(toInstant(s)) def format(us: Long): String = { val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND) From 412022847c25d3edb72267a66e6f439a5dd2879e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 11:24:32 +0100 Subject: [PATCH 09/30] A test for roundtrip timestamp parsing --- .../sql/util/DateTimeFormatterSuite.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala new file mode 100644 index 000000000000..23a5830ff4d2 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import java.util.Locale + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} + +class DateTimeFormatterSuite extends SparkFunSuite { + + test("roundtrip parsing timestamps using timezones") { + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + val timestamp = "2018-12-02T11:22:33.123456" + val formatter = DateTimeFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) + val micros = formatter.parse(timestamp) + val formatted = formatter.format(micros) + assert(timestamp === formatted) + } + } +} From e575162fc6ba3045f4896dcdd024e9febff13738 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 13:24:21 +0100 Subject: [PATCH 10/30] Set time zone to GMT to eliminate of situation when time zone offset in second lost because DateType contains only days since epoch --- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index cebaad5b4ad9..2265082ece1c 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -49,8 +49,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { override def beforeAll() { super.beforeAll() TestHive.setCacheTables(true) - // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) - TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) + // Timezone is fixed to GMT for those timezone sensitive tests (timestamp_*) + TimeZone.setDefault(TimeZone.getTimeZone("GMT")) // Add Locale setting Locale.setDefault(Locale.US) // Set a relatively small column batch size for testing purposes From a35d5bfe9c4e6daa12eba591c2a22683f3dc5458 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 13:39:41 +0100 Subject: [PATCH 11/30] UTC -> GMT --- .../org/apache/spark/sql/sources/HadoopFsRelationTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 31391244c652..59b88995c0d9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -126,8 +126,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } else { Seq(false) } - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { - TimeZone.setDefault(TimeZone.getTimeZone("UTC")) + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") { + TimeZone.setDefault(TimeZone.getTimeZone("GMT")) for (dataType <- supportedDataTypes) { for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { val extraMessage = if (isParquetDataSource) { From 2a2085d4c07fa9b52abf87feeeb755d88339340e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 13:43:24 +0100 Subject: [PATCH 12/30] Using floorDiv to take days from seconds --- .../apache/spark/sql/catalyst/util/DateTimeFormatter.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index edd5f9ec12fa..ad1f4131de2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -123,7 +123,9 @@ class Iso8601DateFormatter( override def parse(s: String): Int = { val seconds = dateTimeFormatter.toInstant(s).getEpochSecond - (seconds / DateTimeUtils.SECONDS_PER_DAY).toInt + val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY) + + days.toInt } override def format(days: Int): String = { From 55f2eacb2164c7392a8a9bc5bebbeb80f9d3aa56 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 13:45:44 +0100 Subject: [PATCH 13/30] Removing unnecessary time zone settings --- .../org/apache/spark/sql/sources/HadoopFsRelationTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 59b88995c0d9..fa7193234705 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -127,7 +127,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes Seq(false) } withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") { - TimeZone.setDefault(TimeZone.getTimeZone("GMT")) for (dataType <- supportedDataTypes) { for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { val extraMessage = if (isParquetDataSource) { From 07fcf4666a96928c8096db7a131e6514013679f0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 5 Dec 2018 12:20:33 +0100 Subject: [PATCH 14/30] Using legacy parser in HiveCompatibilitySuite --- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 
2265082ece1c..709e69ef1cf6 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -41,6 +41,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
   private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
+  private val originalUseLegacyDateTimeParser = TestHive.conf.legacyTimeParserEnabled
 
   def testCases: Seq[(String, File)] = {
     hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@@ -49,8 +50,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   override def beforeAll() {
     super.beforeAll()
     TestHive.setCacheTables(true)
-    // Timezone is fixed to GMT for those timezone sensitive tests (timestamp_*)
-    TimeZone.setDefault(TimeZone.getTimeZone("GMT"))
+    // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
+    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
     // Add Locale setting
     Locale.setDefault(Locale.US)
     // Set a relatively small column batch size for testing purposes
@@ -62,6 +63,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
     // (timestamp_*)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
+    TestHive.setConf(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
     RuleExecutor.resetMetrics()
   }
 
@@ -74,6 +76,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
     TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
+    TestHive.setConf(SQLConf.LEGACY_TIME_PARSER_ENABLED, originalUseLegacyDateTimeParser)
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules
     logWarning(RuleExecutor.dumpTimeSpent())

From 6b6ea8a89128a8f459a846dddc48aa6af09a2c51 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 8 Dec 2018 00:30:14 +0100
Subject: [PATCH 15/30] Enable new parser in HiveCompatibilitySuite

---
 .../apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 709e69ef1cf6..4c4cee8cca95 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -63,7 +63,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
     // (timestamp_*)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
-    TestHive.setConf(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
     RuleExecutor.resetMetrics()
   }
 
From 244654b95ae789de83e853f74feade2a66adf432 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 8 Dec 2018 00:45:54 +0100
Subject: [PATCH 16/30] Remove saving legacy parser settings

---
 .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 4c4cee8cca95..cebaad5b4ad9 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -41,7 +41,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
   private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
-  private val originalUseLegacyDateTimeParser = TestHive.conf.legacyTimeParserEnabled
 
   def testCases: Seq[(String, File)] = {
     hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@@ -75,7 +74,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
     TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
-    TestHive.setConf(SQLConf.LEGACY_TIME_PARSER_ENABLED, originalUseLegacyDateTimeParser)
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules
     logWarning(RuleExecutor.dumpTimeSpent())

From 015fdceb637ba831a357aae150c20ceafdec4a50 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 8 Dec 2018 11:01:14 +0100
Subject: [PATCH 17/30] Updating migration guide

---
 docs/sql-migration-guide-upgrade.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 77f761aa9db4..3d585864eefe 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -33,7 +33,7 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
 
-  - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+  - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match the pattern but it can be parsed by earlier Spark versions due to a fallback to `Timestamp.valueOf`. To parse the same timestamp since Spark 3.0, the pattern should be `yyyy-MM-dd HH:mm:ss.SSS`. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
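Not part of the patch: a standalone Scala sketch of the difference described above, assuming only JDK classes. Strict `java.time` parsing rejects `2018-12-08 10:39:21.123` against `yyyy-MM-dd'T'HH:mm:ss.SSS` because the literal `'T'` is missing, while `java.sql.Timestamp.valueOf`, the final target of the legacy fallback chain, accepts the string. The object name is illustrative, and the try/catch only mimics the shape of the fallback, not Spark's actual classes.

import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.{DateTimeFormatter, DateTimeParseException}

object LegacyFallbackSketch {
  def main(args: Array[String]): Unit = {
    val s = "2018-12-08 10:39:21.123"
    val pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
    // Spark 3.0 behavior: the pattern must match exactly, so this throws.
    try {
      LocalDateTime.parse(s, pattern)
    } catch {
      case e: DateTimeParseException =>
        println(s"java.time rejects it: ${e.getMessage}")
    }
    // Legacy behavior: after SimpleDateFormat fails, the fallback path ends up
    // in Timestamp.valueOf, which parses the string despite the pattern.
    println(s"legacy fallback accepts it: ${Timestamp.valueOf(s)}")
  }
}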
## Upgrading From Spark SQL 2.3 to 2.4 From 96529f5276ab375a5d440ac636054dcf0a6c8a3e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 12 Dec 2018 22:41:11 +0100 Subject: [PATCH 18/30] Making date parser independent from time zones --- .../sql/catalyst/csv/UnivocityGenerator.scala | 2 +- .../sql/catalyst/csv/UnivocityParser.scala | 2 +- .../sql/catalyst/json/JacksonGenerator.scala | 2 +- .../sql/catalyst/json/JacksonParser.scala | 2 +- .../sql/catalyst/util/DateTimeFormatter.scala | 71 +++++++++++-------- .../sql/util/DateTimeFormatterSuite.scala | 47 ++++++------ 6 files changed, 73 insertions(+), 53 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index af09cd6c8449..eea2e9328c8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -45,7 +45,7 @@ class UnivocityGenerator( options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.locale) private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 0f375e036029..8d871b86715e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -78,7 +78,7 @@ class UnivocityParser( options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.locale) // Retrieve the raw record string. 
private def getCurrentInput: UTF8String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index bced0180af1b..30a4572d4cf0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -81,7 +81,7 @@ private[sql] class JacksonGenerator( options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.locale) private def makeWriter(dataType: DataType): ValueWriter = dataType match { case NullType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 9183000c3675..223305a0245b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -59,7 +59,7 @@ class JacksonParser( options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.locale) /** * Create a converter which converts the JSON documents held by the `JsonParser` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index ad1f4131de2f..90a35263fb21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util import java.time._ import java.time.format.DateTimeFormatterBuilder -import java.time.temporal.{ChronoField, TemporalQueries} +import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries} import java.util.{Locale, TimeZone} import scala.util.Try @@ -33,26 +33,37 @@ sealed trait DateTimeFormatter { def format(us: Long): String } +trait FormatterUtils { + def zoneId: ZoneId + def buildFormatter(pattern: String, locale: Locale): java.time.format.DateTimeFormatter = { + new DateTimeFormatterBuilder() + .appendPattern(pattern) + .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970) + .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1) + .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) + .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) + .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) + .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) + .toFormatter(locale) + } + def toInstant(temporalAccessor: TemporalAccessor): java.time.Instant = { + val localDateTime = LocalDateTime.from(temporalAccessor) + val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId) + Instant.from(zonedDateTime) + } +} + class Iso8601DateTimeFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends DateTimeFormatter { - val formatter = new DateTimeFormatterBuilder() - .appendPattern(pattern) - .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970) - .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1) - .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) - .parseDefaulting(ChronoField.HOUR_OF_DAY, 
0) - .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) - .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) - .toFormatter(locale) + locale: Locale) extends DateTimeFormatter with FormatterUtils { + val zoneId = timeZone.toZoneId + val formatter = buildFormatter(pattern, locale) def toInstant(s: String): Instant = { val temporalAccessor = formatter.parse(s) if (temporalAccessor.query(TemporalQueries.offset()) == null) { - val localDateTime = LocalDateTime.from(temporalAccessor) - val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId) - Instant.from(zonedDateTime) + toInstant(temporalAccessor) } else { Instant.from(temporalAccessor) } @@ -116,13 +127,19 @@ sealed trait DateFormatter { class Iso8601DateFormatter( pattern: String, - timeZone: TimeZone, - locale: Locale) extends DateFormatter { + locale: Locale) extends DateFormatter with FormatterUtils { - val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale) + val zoneId = ZoneId.of("GMT") + + val formatter = buildFormatter(pattern, locale) + + def toInstant(s: String): Instant = { + val temporalAccessor = formatter.parse(s) + toInstant(temporalAccessor) + } override def parse(s: String): Int = { - val seconds = dateTimeFormatter.toInstant(s).getEpochSecond + val seconds = toInstant(s).getEpochSecond val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY) days.toInt @@ -130,15 +147,12 @@ class Iso8601DateFormatter( override def format(days: Int): String = { val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY) - dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant) + formatter.withZone(zoneId).format(instant) } } -class LegacyDateFormatter( - pattern: String, - timeZone: TimeZone, - locale: Locale) extends DateFormatter { - val format = FastDateFormat.getInstance(pattern, timeZone, locale) +class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter { + val format = FastDateFormat.getInstance(pattern, locale) def parse(s: String): Int = { val milliseconds = format.parse(s).getTime @@ -153,8 +167,7 @@ class LegacyDateFormatter( class LegacyFallbackDateFormatter( pattern: String, - timeZone: TimeZone, - locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) { + locale: Locale) extends LegacyDateFormatter(pattern, locale) { override def parse(s: String): Int = { Try(super.parse(s)).orElse { // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards @@ -169,11 +182,11 @@ class LegacyFallbackDateFormatter( } object DateFormatter { - def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = { + def apply(format: String, locale: Locale): DateFormatter = { if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyFallbackDateFormatter(format, timeZone, locale) + new LegacyFallbackDateFormatter(format, locale) } else { - new Iso8601DateFormatter(format, timeZone, locale) + new Iso8601DateFormatter(format, locale) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala index 1af11ea043c3..ef026d35ae74 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala @@ -20,22 +20,24 @@ package org.apache.spark.sql.util import java.util.{Locale, TimeZone} import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.SQLHelper 
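A note on why DateFormatter can drop its TimeZone parameter above while TimestampFormatter keeps it: days since the epoch, unlike microseconds, carry no time-of-day component, so once parsing defaults the time fields to midnight in one fixed zone the result is the same everywhere. A minimal standalone sketch of that invariant using only java.time, outside of Spark; the object name is made up for illustration:

import java.time.LocalDate

object DaysSinceEpochDemo {
  def main(args: Array[String]): Unit = {
    // LocalDate carries no zone, and toEpochDay is pure calendar arithmetic,
    // so this prints 17867 whatever the JVM's default time zone is set to.
    val days = LocalDate.parse("2018-12-02").toEpochDay
    assert(days == 17867L)
    println(s"2018-12-02 is $days days since 1970-01-01")
  }
}

This is exactly why, in the test hunk that follows, the per-zone expectedDays map collapses to the single constant 17867.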
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} +import org.apache.spark.sql.internal.SQLConf -class DateTimeFormatterSuite extends SparkFunSuite { - test("parsing dates using time zones") { +class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { + test("parsing dates") { val localDate = "2018-12-02" val expectedDays = Map( "UTC" -> 17867, "PST" -> 17867, - "CET" -> 17866, + "CET" -> 17867, "Africa/Dakar" -> 17867, "America/Los_Angeles" -> 17867, - "Antarctica/Vostok" -> 17866, - "Asia/Hong_Kong" -> 17866, - "Europe/Amsterdam" -> 17866) + "Antarctica/Vostok" -> 17867, + "Asia/Hong_Kong" -> 17867, + "Europe/Amsterdam" -> 17867) DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US) + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) val daysSinceEpoch = formatter.parse(localDate) assert(daysSinceEpoch === expectedDays(timeZone)) } @@ -62,21 +64,14 @@ class DateTimeFormatterSuite extends SparkFunSuite { } } - test("format dates using time zones") { + test("format dates") { val daysSinceEpoch = 17867 - val expectedDate = Map( - "UTC" -> "2018-12-02", - "PST" -> "2018-12-01", - "CET" -> "2018-12-02", - "Africa/Dakar" -> "2018-12-02", - "America/Los_Angeles" -> "2018-12-01", - "Antarctica/Vostok" -> "2018-12-02", - "Asia/Hong_Kong" -> "2018-12-02", - "Europe/Amsterdam" -> "2018-12-02") DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US) - val date = formatter.format(daysSinceEpoch) - assert(date === expectedDate(timeZone)) + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val date = formatter.format(daysSinceEpoch) + assert(date === "2018-12-02") + } } } @@ -110,4 +105,16 @@ class DateTimeFormatterSuite extends SparkFunSuite { assert(timestamp === formatted) } } + + test("roundtrip date parsing") { + val date = "2018-12-12" + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val days = formatter.parse(date) + val formatted = formatter.format(days) + assert(date === formatted) + } + } + } } From 07d60314c2ec082d06b1b65d8d10dbaa6f775171 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 13 Dec 2018 10:54:00 +0100 Subject: [PATCH 19/30] Test refactoring --- .../sql/util/DateTimeFormatterSuite.scala | 23 ++++++------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala index ef026d35ae74..db7227221042 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala @@ -24,22 +24,14 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} import org.apache.spark.sql.internal.SQLConf -class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { +class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { test("parsing dates") { - val localDate = "2018-12-02" - val expectedDays = Map( - "UTC" -> 17867, - "PST" -> 17867, - "CET" 
-> 17867, - "Africa/Dakar" -> 17867, - "America/Los_Angeles" -> 17867, - "Antarctica/Vostok" -> 17867, - "Asia/Hong_Kong" -> 17867, - "Europe/Amsterdam" -> 17867) DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateFormatter("yyyy-MM-dd", Locale.US) - val daysSinceEpoch = formatter.parse(localDate) - assert(daysSinceEpoch === expectedDays(timeZone)) + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val daysSinceEpoch = formatter.parse("2018-12-02") + assert(daysSinceEpoch === 17867) + } } } @@ -65,11 +57,10 @@ class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { } test("format dates") { - val daysSinceEpoch = 17867 DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { val formatter = DateFormatter("yyyy-MM-dd", Locale.US) - val date = formatter.format(daysSinceEpoch) + val date = formatter.format(17867) assert(date === "2018-12-02") } } From d761dee6ec9cc1a42f07c82e8f67754f3b8f1bf7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 13 Dec 2018 10:56:11 +0100 Subject: [PATCH 20/30] protected is added --- .../spark/sql/catalyst/util/DateTimeFormatter.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index 90a35263fb21..80fb87edbbf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -34,8 +34,10 @@ sealed trait DateTimeFormatter { } trait FormatterUtils { - def zoneId: ZoneId - def buildFormatter(pattern: String, locale: Locale): java.time.format.DateTimeFormatter = { + protected def zoneId: ZoneId + protected def buildFormatter( + pattern: String, + locale: Locale): java.time.format.DateTimeFormatter = { new DateTimeFormatterBuilder() .appendPattern(pattern) .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970) @@ -46,7 +48,7 @@ trait FormatterUtils { .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) .toFormatter(locale) } - def toInstant(temporalAccessor: TemporalAccessor): java.time.Instant = { + protected def toInstant(temporalAccessor: TemporalAccessor): java.time.Instant = { val localDateTime = LocalDateTime.from(temporalAccessor) val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId) Instant.from(zonedDateTime) From 24b1e3d35be103789f038216341195e972839a5c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 13 Dec 2018 10:57:22 +0100 Subject: [PATCH 21/30] toInstant -> toInstantWithZoneId --- .../apache/spark/sql/catalyst/util/DateTimeFormatter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index 80fb87edbbf9..ed6036edfeca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -48,7 +48,7 @@ trait FormatterUtils { .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) .toFormatter(locale) } - protected def toInstant(temporalAccessor: TemporalAccessor): java.time.Instant = { + protected def 
toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = { val localDateTime = LocalDateTime.from(temporalAccessor) val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId) Instant.from(zonedDateTime) @@ -65,7 +65,7 @@ class Iso8601DateTimeFormatter( def toInstant(s: String): Instant = { val temporalAccessor = formatter.parse(s) if (temporalAccessor.query(TemporalQueries.offset()) == null) { - toInstant(temporalAccessor) + toInstantWithZoneId(temporalAccessor) } else { Instant.from(temporalAccessor) } @@ -137,7 +137,7 @@ class Iso8601DateFormatter( def toInstant(s: String): Instant = { val temporalAccessor = formatter.parse(s) - toInstant(temporalAccessor) + toInstantWithZoneId(temporalAccessor) } override def parse(s: String): Int = { From 9a11515a1a8a8775c46d6ae3aaeb04396b66ef70 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 13 Dec 2018 13:14:13 +0100 Subject: [PATCH 22/30] Set time zone in the test --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 75d8f4493029..96f7d24b3635 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { generator.flush() } - val dummyOption = new JSONOptions(options, "GMT") + val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone) val dummySchema = StructType(Seq.empty) val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true) @@ -100,8 +100,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { enforceCorrectType(intNumber.toLong, TimestampType)) val strTime = "2014-09-30 12:34:56" checkTypePromotion( - expected = 1412080496000000L, - enforceCorrectType(strTime, TimestampType, Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"))) + expected = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), + enforceCorrectType(strTime, TimestampType, + Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"))) val strDate = "2014-10-15" checkTypePromotion( From 4b01d05e306906f20372f1b3a7c987a3f5ce1c89 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 13 Dec 2018 13:17:07 +0100 Subject: [PATCH 23/30] GMT -> UTC --- .../org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala | 2 +- .../org/apache/spark/sql/sources/HadoopFsRelationTest.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index ed6036edfeca..0d30b69e692d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -131,7 +131,7 @@ class Iso8601DateFormatter( pattern: String, locale: Locale) extends DateFormatter with FormatterUtils { - val zoneId = ZoneId.of("GMT") + val zoneId = ZoneId.of("UTC") val formatter = buildFormatter(pattern, locale) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 37b7f1bc3a5a..e50511d2cdee 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -126,7 +126,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } else { Seq(false) } - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { for (dataType <- supportedDataTypes) { for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { val extraMessage = if (isParquetDataSource) { From 0c7b96b596f19c1cbd0500a8631b90bfa6b02da7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 13 Dec 2018 13:24:28 +0100 Subject: [PATCH 24/30] DateTimeFormatter -> TimestampFormatter --- .../sql/catalyst/csv/CSVInferSchema.scala | 4 ++-- .../sql/catalyst/csv/UnivocityGenerator.scala | 4 ++-- .../sql/catalyst/csv/UnivocityParser.scala | 2 +- .../sql/catalyst/json/JacksonGenerator.scala | 2 +- .../sql/catalyst/json/JacksonParser.scala | 2 +- ...rmatter.scala => TimestampFormatter.scala} | 22 +++++++++---------- .../sql/util/DateTimeFormatterSuite.scala | 8 +++---- 7 files changed, 22 insertions(+), 22 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/{DateTimeFormatter.scala => TimestampFormatter.scala} (91%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 345dc4d41993..b4a4db5df1f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -22,13 +22,13 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils -import org.apache.spark.sql.catalyst.util.DateTimeFormatter +import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @transient - private lazy val timeParser = DateTimeFormatter( + private lazy val timeParser = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index eea2e9328c8e..f057df8fa6fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -22,7 +22,7 @@ import java.io.Writer import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter} +import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.types._ class UnivocityGenerator( @@ -41,7 +41,7 @@ class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray - private val timeFormatter = DateTimeFormatter( + private val timeFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) diff 
--git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 8d871b86715e..4d47c39da52a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -74,7 +74,7 @@ class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val timeFormatter = DateTimeFormatter( + private val timeFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 30a4572d4cf0..216649efd74e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -77,7 +77,7 @@ private[sql] class JacksonGenerator( private val lineSeparator: String = options.lineSeparatorInWrite - private val timeFormatter = DateTimeFormatter( + private val timeFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 223305a0245b..6554111deec8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -55,7 +55,7 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) - private val timeFormatter = DateTimeFormatter( + private val timeFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala similarity index 91% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 0d30b69e692d..2b8d22dde926 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -28,7 +28,7 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.internal.SQLConf -sealed trait DateTimeFormatter { +sealed trait TimestampFormatter { def parse(s: String): Long // returns microseconds since epoch def format(us: Long): String } @@ -55,10 +55,10 @@ trait FormatterUtils { } } -class Iso8601DateTimeFormatter( +class Iso8601TimestampFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends DateTimeFormatter with FormatterUtils { + locale: Locale) extends TimestampFormatter with FormatterUtils { val zoneId = timeZone.toZoneId val formatter = buildFormatter(pattern, locale) @@ -88,10 +88,10 @@ class Iso8601DateTimeFormatter( } } -class LegacyDateTimeFormatter( +class LegacyTimestampFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends DateTimeFormatter { + locale: Locale) extends TimestampFormatter { val 
format = FastDateFormat.getInstance(pattern, timeZone, locale) protected def toMillis(s: String): Long = format.parse(s).getTime @@ -103,21 +103,21 @@ class LegacyDateTimeFormatter( } } -class LegacyFallbackDateTimeFormatter( +class LegacyFallbackTimestampFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) { + locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) { override def toMillis(s: String): Long = { Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime) } } -object DateTimeFormatter { - def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = { +object TimestampFormatter { + def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = { if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyFallbackDateTimeFormatter(format, timeZone, locale) + new LegacyFallbackTimestampFormatter(format, timeZone, locale) } else { - new Iso8601DateTimeFormatter(format, timeZone, locale) + new Iso8601TimestampFormatter(format, timeZone, locale) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala index db7227221042..c1257882ce07 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala @@ -21,7 +21,7 @@ import java.util.{Locale, TimeZone} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.plans.SQLHelper -import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.internal.SQLConf class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { @@ -47,7 +47,7 @@ class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { "Asia/Hong_Kong" -> 1543716672001234L, "Europe/Amsterdam" -> 1543741872001234L) DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateTimeFormatter( + val formatter = TimestampFormatter( "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", TimeZone.getTimeZone(timeZone), Locale.US) @@ -78,7 +78,7 @@ class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234", "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234") DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateTimeFormatter( + val formatter = TimestampFormatter( "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", TimeZone.getTimeZone(timeZone), Locale.US) @@ -90,7 +90,7 @@ class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { test("roundtrip parsing timestamps using timezones") { DateTimeTestUtils.outstandingTimezones.foreach { timeZone => val timestamp = "2018-12-02T11:22:33.123456" - val formatter = DateTimeFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) + val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) val micros = formatter.parse(timestamp) val formatted = formatter.format(micros) assert(timestamp === formatted) From bbaff09d552967ff9ad9b5ea0497135fa6c626aa Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 13 Dec 2018 17:42:51 +0100 Subject: [PATCH 25/30] timeParser -> timestampParser --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 4 ++-- .../apache/spark/sql/catalyst/csv/UnivocityGenerator.scala | 4 
++-- .../org/apache/spark/sql/catalyst/csv/UnivocityParser.scala | 4 ++-- .../org/apache/spark/sql/catalyst/json/JacksonGenerator.scala | 4 ++-- .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index b4a4db5df1f6..35ade136cc60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @transient - private lazy val timeParser = TimestampFormatter( + private lazy val timestampParser = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) @@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private def tryParseTimestamp(field: String): DataType = { // This case infers a custom `dataFormat` is set. - if ((allCatch opt timeParser.parse(field)).isDefined) { + if ((allCatch opt timestampParser.parse(field)).isDefined) { TimestampType } else { tryParseBoolean(field) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index f057df8fa6fe..f012d96138f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -41,7 +41,7 @@ class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray - private val timeFormatter = TimestampFormatter( + private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) @@ -52,7 +52,7 @@ class UnivocityGenerator( (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal)) case TimestampType => - (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal)) + (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal)) case udt: UserDefinedType[_] => makeConverter(udt.sqlType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 4d47c39da52a..ed089120055e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -74,7 +74,7 @@ class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val timeFormatter = TimestampFormatter( + private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) @@ -158,7 +158,7 @@ class UnivocityParser( } case _: TimestampType => (d: String) => - nullSafeDatum(d, name, nullable, options)(timeFormatter.parse) + nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse) case _: DateType => (d: String) => nullSafeDatum(d, name, nullable, options)(dateFormatter.parse) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 216649efd74e..951f5190cd50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -77,7 +77,7 @@ private[sql] class JacksonGenerator( private val lineSeparator: String = options.lineSeparatorInWrite - private val timeFormatter = TimestampFormatter( + private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) @@ -122,7 +122,7 @@ private[sql] class JacksonGenerator( case TimestampType => (row: SpecializedGetters, ordinal: Int) => - val timestampString = timeFormatter.format(row.getLong(ordinal)) + val timestampString = timestampFormatter.format(row.getLong(ordinal)) gen.writeString(timestampString) case DateType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 6554111deec8..4862fa289702 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -55,7 +55,7 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) - private val timeFormatter = TimestampFormatter( + private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) @@ -224,7 +224,7 @@ class JacksonParser( case TimestampType => (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) { case VALUE_STRING if parser.getTextLength >= 1 => - timeFormatter.parse(parser.getText) + timestampFormatter.parse(parser.getText) case VALUE_NUMBER_INT => parser.getLongValue * 1000000L From 8af9df9a6d327e88b323eb7a92c099d07b80b4eb Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 14 Dec 2018 10:27:27 +0100 Subject: [PATCH 26/30] Round trip tests --- .../sql/util/DateTimeFormatterSuite.scala | 93 ++++++++++++++++--- 1 file changed, 78 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala index c1257882ce07..32beb1ee15a2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala @@ -87,24 +87,87 @@ class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { } } - test("roundtrip parsing timestamps using timezones") { - DateTimeTestUtils.outstandingTimezones.foreach { timeZone => - val timestamp = "2018-12-02T11:22:33.123456" - val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) - val micros = formatter.parse(timestamp) - val formatted = formatter.format(micros) - assert(timestamp === formatted) + test("roundtrip micros -> timestamp -> micros using timezones") { + Seq( + -58710115316212000L, + -18926315945345679L, + -9463427405253013L, + -244000001L, + 0L, + 99628200102030L, + 1543749753123456L, + 2177456523456789L, + 11858049903010203L).foreach { micros => + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) + val timestamp = formatter.format(micros) + val parsed = formatter.parse(timestamp) + assert(micros === parsed) + } } } - test("roundtrip date parsing") { - val date = "2018-12-12" - DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { - val formatter = DateFormatter("yyyy-MM-dd", Locale.US) - val days = formatter.parse(date) - val formatted = formatter.format(days) - assert(date === formatted) + test("roundtrip timestamp -> micros -> timestamp using timezones") { + Seq( + "0109-07-20T18:38:03.788000", + "1370-04-01T10:00:54.654321", + "1670-02-11T14:09:54.746987", + "1969-12-31T23:55:55.999999", + "1970-01-01T00:00:00.000000", + "1973-02-27T02:30:00.102030", + "2018-12-02T11:22:33.123456", + "2039-01-01T01:02:03.456789", + "2345-10-07T22:45:03.010203").foreach { timestamp => + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) + val micros = formatter.parse(timestamp) + val formatted = formatter.format(micros) + assert(timestamp === formatted) + } + } + } + + test("roundtrip date -> days -> date") { + Seq( + "0050-01-01", + "0953-02-02", + "1423-03-08", + "1969-12-31", + "1972-08-25", + "1975-09-26", + "2018-12-12", + "2038-01-01", + "5010-11-17").foreach { date => + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val days = formatter.parse(date) + val formatted = formatter.format(days) + assert(date === formatted) + } + } + } + } + + test("roundtrip days -> date -> days") { + Seq( + -701265, + -371419, + -199722, + -1, + 0, + 967, + 2094, + 17877, + 24837, + 1110657).foreach { days => + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val date = formatter.format(days) + val parsed = formatter.parse(date) + assert(days === parsed) + } } } } From 363482e4ab7b12b279112ab9c0437fd25817d5bf Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 14 Dec 2018 10:49:34 +0100 Subject: [PATCH 27/30] Renaming test suite --- ...meFormatterSuite.scala => DateTimestampFormatterSuite.scala} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename sql/catalyst/src/test/scala/org/apache/spark/sql/util/{DateTimeFormatterSuite.scala => DateTimestampFormatterSuite.scala} (98%) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala similarity index 98% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala index 32beb1ee15a2..43e348c7eebf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.internal.SQLConf -class DateTimeFormatterSuite extends SparkFunSuite with SQLHelper { +class DateTimestampFormatterSuite extends SparkFunSuite with SQLHelper { test("parsing dates") { DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key ->
timeZone) { From 07e0bf8556b1bf42558a8e09cccfd42879235981 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 14 Dec 2018 13:06:30 +0100 Subject: [PATCH 28/30] Added withClue --- .../sql/sources/HadoopFsRelationTest.scala | 69 ++++++++++--------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index e50511d2cdee..a983dc1d28c0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -143,40 +143,43 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes withTempPath { file => val path = file.getCanonicalPath - val dataGenerator = RandomDataGenerator.forType( - dataType = dataType, - nullable = true, - new Random(System.nanoTime()) - ).getOrElse { - fail(s"Failed to create data generator for schema $dataType") + val seed = System.nanoTime() + withClue(s"Random data generated with the seed: ${seed}") { + val dataGenerator = RandomDataGenerator.forType( + dataType = dataType, + nullable = true, + new Random(seed) + ).getOrElse { + fail(s"Failed to create data generator for schema $dataType") + } + + // Create a DF for the schema with random data. The index field is used to sort the + // DataFrame. This is a workaround for SPARK-10591. + val schema = new StructType() + .add("index", IntegerType, nullable = false) + .add("col", dataType, nullable = true) + val rdd = + spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) + val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) + + df.write + .mode("overwrite") + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .save(path) + + val loadedDF = spark + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .schema(df.schema) + .options(extraOptions) + .load(path) + .orderBy("index") + + checkAnswer(loadedDF, df) } - - // Create a DF for the schema with random data. The index field is used to sort the - // DataFrame. This is a workaround for SPARK-10591. 
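The point of the withClue wrapper introduced above is reproducibility: a RandomDataGenerator failure is hard to act on unless the seed that produced the data is reported. A standalone sketch of the same pattern, with a made-up check standing in for checkAnswer:

import scala.util.Random

object SeededTestDemo {
  def main(args: Array[String]): Unit = {
    val seed = System.nanoTime()
    try {
      // scala.util.Random is deterministic for a fixed seed, so echoing the
      // seed on failure lets anyone regenerate the exact failing dataset.
      val rng = new Random(seed)
      val data = Seq.fill(10)(rng.nextInt(100))
      require(data.forall(_ >= 0), "example check over the generated data")
    } catch {
      case e: Throwable =>
        throw new AssertionError(s"Random data generated with the seed: $seed", e)
    }
  }
}

ScalaTest's withClue achieves the same effect non-intrusively, prepending the seed message to whatever assertion error escapes the block.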
- val schema = new StructType() - .add("index", IntegerType, nullable = false) - .add("col", dataType, nullable = true) - val rdd = - spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) - val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) - - df.write - .mode("overwrite") - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .options(extraOptions) - .save(path) - - val loadedDF = spark - .read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .schema(df.schema) - .options(extraOptions) - .load(path) - .orderBy("index") - - checkAnswer(loadedDF, df) } } } From c12da1f30ebe766f30e0992f1f884281dfb1153b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 14 Dec 2018 18:38:07 +0100 Subject: [PATCH 29/30] Put test under legacy time parser --- .../org/apache/spark/sql/sources/HadoopFsRelationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index a983dc1d28c0..85c5d030b523 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -126,7 +126,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } else { Seq(false) } - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") { for (dataType <- supportedDataTypes) { for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { val extraMessage = if (isParquetDataSource) { From 60ab5b18f031c97640195a7ff0a94070e928f7f5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 15 Dec 2018 10:54:13 +0100 Subject: [PATCH 30/30] TODO --- .../org/apache/spark/sql/sources/HadoopFsRelationTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 85c5d030b523..f0f62b608785 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -126,6 +126,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } else { Seq(false) } + // TODO: Support new parser too, see SPARK-26374. withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") { for (dataType <- supportedDataTypes) { for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
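Patches 29 and 30 close out the series: HadoopFsRelationTest stays pinned to SQLConf.LEGACY_TIME_PARSER_ENABLED until the java.time-based path is supported there, which the final TODO records as SPARK-26374. The underlying behavioural gap between the two parser families can be reproduced outside Spark; a sketch, assuming commons-lang3 on the classpath and using plain java.time and FastDateFormat rather than Spark's wrapper classes:

import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.{Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat

object ParserPrecisionDemo {
  def main(args: Array[String]): Unit = {
    val input = "2018-12-02T11:22:33.123456"
    val pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"

    // New path: in java.time patterns, S is fraction-of-second, so all six
    // digits survive and the parsed instant keeps microsecond precision.
    val iso = DateTimeFormatter.ofPattern(pattern, Locale.US)
    val parsed = LocalDateTime.parse(input, iso).toInstant(ZoneOffset.UTC)
    println(s"java.time: $parsed") // 2018-12-02T11:22:33.123456Z

    // Legacy path: FastDateFormat follows SimpleDateFormat rules, where S is
    // milliseconds; "123456" is read as 123456 ms and rolls over into minutes.
    val legacy = FastDateFormat.getInstance(pattern, TimeZone.getTimeZone("UTC"), Locale.US)
    println(s"legacy:    ${legacy.parse(input).toInstant}") // 2018-12-02T11:24:36.456Z
  }
}

That mismatch is why data written through one parser family may not read back identically under the other, and why this test remains on the legacy flag for now.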