-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON #23196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
fb10b91
a9b39ec
ff589f5
4646ded
1c838e0
142f301
606da21
f326042
4120228
6689747
e575162
a35d5bf
2a2085d
55f2eac
57600e2
07fcf46
6b6ea8a
244654b
015fdce
96529f5
07d6031
d761dee
24b1e3d
9a11515
4b01d05
0c7b96b
bbaff09
8af9df9
363482e
07e0bf8
c12da1f
60ab5b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,179 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.util | ||
|
|
||
| import java.time._ | ||
| import java.time.format.DateTimeFormatterBuilder | ||
| import java.time.temporal.{ChronoField, TemporalQueries} | ||
| import java.util.{Locale, TimeZone} | ||
|
|
||
| import scala.util.Try | ||
|
|
||
| import org.apache.commons.lang3.time.FastDateFormat | ||
|
|
||
| import org.apache.spark.sql.internal.SQLConf | ||
|
|
||
| sealed trait DateTimeFormatter { | ||
| def parse(s: String): Long // returns microseconds since epoch | ||
| def format(us: Long): String | ||
| } | ||
|
|
||
| class Iso8601DateTimeFormatter( | ||
| pattern: String, | ||
| timeZone: TimeZone, | ||
| locale: Locale) extends DateTimeFormatter { | ||
| val formatter = new DateTimeFormatterBuilder() | ||
| .appendPattern(pattern) | ||
| .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970) | ||
| .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1) | ||
| .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) | ||
| .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) | ||
| .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) | ||
| .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) | ||
| .toFormatter(locale) | ||
|
|
||
| def toInstant(s: String): Instant = { | ||
| val temporalAccessor = formatter.parse(s) | ||
| if (temporalAccessor.query(TemporalQueries.offset()) == null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry I'm not very familiar with this API. what does this condition mean?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. zone offset is unknown after parsing. For example, if you parse |
||
| val localDateTime = LocalDateTime.from(temporalAccessor) | ||
| val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId) | ||
| Instant.from(zonedDateTime) | ||
| } else { | ||
| Instant.from(temporalAccessor) | ||
| } | ||
| } | ||
|
|
||
| private def instantToMicros(instant: Instant): Long = { | ||
| val sec = Math.multiplyExact(instant.getEpochSecond, DateTimeUtils.MICROS_PER_SECOND) | ||
| val result = Math.addExact(sec, instant.getNano / DateTimeUtils.NANOS_PER_MICROS) | ||
| result | ||
| } | ||
|
|
||
| def parse(s: String): Long = instantToMicros(toInstant(s)) | ||
|
|
||
| def format(us: Long): String = { | ||
| val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND) | ||
| val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND) | ||
| val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS) | ||
|
|
||
| formatter.withZone(timeZone.toZoneId).format(instant) | ||
| } | ||
| } | ||
|
|
||
| class LegacyDateTimeFormatter( | ||
| pattern: String, | ||
| timeZone: TimeZone, | ||
| locale: Locale) extends DateTimeFormatter { | ||
| val format = FastDateFormat.getInstance(pattern, timeZone, locale) | ||
|
|
||
| protected def toMillis(s: String): Long = format.parse(s).getTime | ||
|
|
||
| def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS | ||
|
|
||
| def format(us: Long): String = { | ||
| format.format(DateTimeUtils.toJavaTimestamp(us)) | ||
| } | ||
| } | ||
|
|
||
| class LegacyFallbackDateTimeFormatter( | ||
| pattern: String, | ||
| timeZone: TimeZone, | ||
| locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) { | ||
| override def toMillis(s: String): Long = { | ||
| Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime) | ||
| } | ||
| } | ||
|
|
||
| object DateTimeFormatter { | ||
| def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = { | ||
| if (SQLConf.get.legacyTimeParserEnabled) { | ||
| new LegacyFallbackDateTimeFormatter(format, timeZone, locale) | ||
| } else { | ||
| new Iso8601DateTimeFormatter(format, timeZone, locale) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| sealed trait DateFormatter { | ||
| def parse(s: String): Int // returns days since epoch | ||
| def format(days: Int): String | ||
| } | ||
|
|
||
| class Iso8601DateFormatter( | ||
| pattern: String, | ||
| timeZone: TimeZone, | ||
| locale: Locale) extends DateFormatter { | ||
|
|
||
| val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale) | ||
|
|
||
| override def parse(s: String): Int = { | ||
| val seconds = dateTimeFormatter.toInstant(s).getEpochSecond | ||
| val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY) | ||
|
|
||
| days.toInt | ||
| } | ||
|
|
||
| override def format(days: Int): String = { | ||
| val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY) | ||
| dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant) | ||
| } | ||
| } | ||
|
|
||
| class LegacyDateFormatter( | ||
| pattern: String, | ||
| timeZone: TimeZone, | ||
| locale: Locale) extends DateFormatter { | ||
| val format = FastDateFormat.getInstance(pattern, timeZone, locale) | ||
|
|
||
| def parse(s: String): Int = { | ||
| val milliseconds = format.parse(s).getTime | ||
| DateTimeUtils.millisToDays(milliseconds) | ||
| } | ||
|
|
||
| def format(days: Int): String = { | ||
| val date = DateTimeUtils.toJavaDate(days) | ||
| format.format(date) | ||
| } | ||
| } | ||
|
|
||
| class LegacyFallbackDateFormatter( | ||
| pattern: String, | ||
| timeZone: TimeZone, | ||
| locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) { | ||
| override def parse(s: String): Int = { | ||
| Try(super.parse(s)).orElse { | ||
| // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards | ||
| // compatibility. | ||
| Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime)) | ||
| }.getOrElse { | ||
| // In Spark 1.5.0, we store the data as number of days since epoch in string. | ||
| // So, we just convert it to Int. | ||
| s.toInt | ||
| } | ||
| } | ||
| } | ||
|
|
||
| object DateFormatter { | ||
| def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = { | ||
| if (SQLConf.get.legacyTimeParserEnabled) { | ||
| new LegacyFallbackDateFormatter(format, timeZone, locale) | ||
| } else { | ||
| new Iso8601DateFormatter(format, timeZone, locale) | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.util | ||
|
|
||
| import java.util.Locale | ||
|
|
||
| import org.apache.spark.SparkFunSuite | ||
| import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} | ||
|
|
||
| class DateTimeFormatterSuite extends SparkFunSuite { | ||
|
|
||
| test("roundtrip parsing timestamps using timezones") { | ||
| DateTimeTestUtils.outstandingTimezones.foreach { timeZone => | ||
| val timestamp = "2018-12-02T11:22:33.123456" | ||
| val formatter = DateTimeFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) | ||
| val micros = formatter.parse(timestamp) | ||
| val formatted = formatter.format(micros) | ||
| assert(timestamp === formatted) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The impact is not clearly documented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the behavior changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New implementation and old one have slightly different pattern formats. See https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html and https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html . And two Java API can have different behaviours. Besides of that, new one can parse timestamps with microseconds precision as a consequence of using Java 8 java.time API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile What would I you recommend to improve the text? I can add the links above, so, an user can figure out what is difference in their particular case. Our tests don't show any difference on our default timestamp/date patterns but the user can use something more specific and face to behaviour change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can add an example that shows the diff. IIRC it has a difference about exact match or non-exact match.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added an example when there is a difference, and updated the migration guide.