-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-29012][SQL] Support special timestamp values #25716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
f3c2f4f
dfe541b
d4751af
ad23507
59e30e3
9f7ed14
fa0037d
14ce002
a268e62
e27a450
b17d642
ec1020f
a4fae09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,12 +17,14 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.util | ||
|
|
||
| import java.nio.charset.StandardCharsets | ||
| import java.sql.{Date, Timestamp} | ||
| import java.time._ | ||
| import java.time.temporal.{ChronoField, ChronoUnit, IsoFields} | ||
| import java.util.{Locale, TimeZone} | ||
| import java.util.concurrent.TimeUnit._ | ||
|
|
||
| import scala.util.Try | ||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.sql.types.Decimal | ||
|
|
@@ -218,6 +220,8 @@ object DateTimeUtils { | |
| var i = 0 | ||
| var currentSegmentValue = 0 | ||
| val bytes = s.trim.getBytes | ||
| val specialTimestamp = convertSpecialTimestamp(bytes, timeZoneId) | ||
| if (specialTimestamp.isDefined) return specialTimestamp | ||
| var j = 0 | ||
| var digitsMilli = 0 | ||
| var justTime = false | ||
|
|
@@ -848,4 +852,46 @@ object DateTimeUtils { | |
| val sinceEpoch = BigDecimal(timestamp) / MICROS_PER_SECOND + offset | ||
| new Decimal().set(sinceEpoch, 20, 6) | ||
| } | ||
|
|
||
| def currentTimestamp(): SQLTimestamp = instantToMicros(Instant.now()) | ||
|
|
||
| private def today(zoneId: ZoneId): ZonedDateTime = { | ||
| Instant.now().atZone(zoneId).`with`(LocalTime.MIDNIGHT) | ||
| } | ||
|
|
||
| private val specialValue = """(EPOCH|NOW|TODAY|TOMORROW|YESTERDAY)\p{Blank}*(.*)""".r | ||
|
||
|
|
||
| /** | ||
| * Converts notational shorthands that are converted to ordinary timestamps. | ||
| * @param input - a trimmed string | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about checking if an input is trimmed by
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will add the assert: assert(input.trim.length == input.length) |
||
| * @param zoneId - zone identifier used to get the current date. | ||
| * @return some of microseconds since the epoch if the conversion completed | ||
| * successfully otherwise None. | ||
| */ | ||
| def convertSpecialTimestamp(input: String, zoneId: ZoneId): Option[SQLTimestamp] = { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's different from
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have extracted common code there https://github.com/apache/spark/pull/25716/files#diff-da60f07e1826788aaeb07f295fae4b8aR864-R890 |
||
| def isValidZoneId(z: String): Boolean = { | ||
| z == "" || Try { getZoneId(z) }.isSuccess | ||
| } | ||
|
|
||
| if (input.length < 3 || !input(0).isLetter) return None | ||
| input.toUpperCase(Locale.US) match { | ||
| case specialValue("EPOCH", z) if isValidZoneId(z) => Some(0) | ||
| case specialValue("NOW", "") => Some(currentTimestamp()) | ||
MaxGekk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| case specialValue("TODAY", z) if isValidZoneId(z) => | ||
| Some(instantToMicros(today(zoneId).toInstant)) | ||
| case specialValue("TOMORROW", z) if isValidZoneId(z) => | ||
| Some(instantToMicros(today(zoneId).plusDays(1).toInstant)) | ||
| case specialValue("YESTERDAY", z) if isValidZoneId(z) => | ||
| Some(instantToMicros(today(zoneId).minusDays(1).toInstant)) | ||
| case _ => None | ||
| } | ||
| } | ||
|
|
||
| private def convertSpecialTimestamp(bytes: Array[Byte], zoneId: ZoneId): Option[SQLTimestamp] = { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you use
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because I need
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ur, I see. |
||
| if (bytes.length > 0 && Character.isAlphabetic(bytes(0))) { | ||
| convertSpecialTimestamp(new String(bytes, StandardCharsets.UTF_8), zoneId) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,9 +22,11 @@ import java.time._ | |
| import java.time.format.DateTimeParseException | ||
| import java.time.temporal.ChronoField.MICRO_OF_SECOND | ||
| import java.time.temporal.TemporalQueries | ||
| import java.util.{Locale, TimeZone} | ||
| import java.util.Locale | ||
| import java.util.concurrent.TimeUnit.SECONDS | ||
|
|
||
| import DateTimeUtils.{convertSpecialTimestamp} | ||
|
||
|
|
||
| sealed trait TimestampFormatter extends Serializable { | ||
| /** | ||
| * Parses a timestamp in a string and converts it to microseconds. | ||
|
|
@@ -50,14 +52,17 @@ class Iso8601TimestampFormatter( | |
| protected lazy val formatter = getOrCreateFormatter(pattern, locale) | ||
|
|
||
| override def parse(s: String): Long = { | ||
| val parsed = formatter.parse(s) | ||
| val parsedZoneId = parsed.query(TemporalQueries.zone()) | ||
| val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId | ||
| val zonedDateTime = toZonedDateTime(parsed, timeZoneId) | ||
| val epochSeconds = zonedDateTime.toEpochSecond | ||
| val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND) | ||
| val specialDate = convertSpecialTimestamp(s.trim, zoneId) | ||
| specialDate.getOrElse { | ||
| val parsed = formatter.parse(s) | ||
| val parsedZoneId = parsed.query(TemporalQueries.zone()) | ||
| val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId | ||
| val zonedDateTime = toZonedDateTime(parsed, timeZoneId) | ||
| val epochSeconds = zonedDateTime.toEpochSecond | ||
| val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND) | ||
|
|
||
| Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond) | ||
| Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond) | ||
| } | ||
| } | ||
|
|
||
| override def format(us: Long): String = { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,16 +19,18 @@ package org.apache.spark.sql.catalyst.util | |
|
|
||
| import java.sql.{Date, Timestamp} | ||
| import java.text.SimpleDateFormat | ||
| import java.time.ZoneId | ||
| import java.time.{LocalDateTime, LocalTime, ZoneId} | ||
| import java.util.{Locale, TimeZone} | ||
| import java.util.concurrent.TimeUnit | ||
|
|
||
| import org.scalatest.Matchers | ||
|
|
||
| import org.apache.spark.SparkFunSuite | ||
| import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ | ||
| import org.apache.spark.sql.catalyst.util.DateTimeUtils._ | ||
| import org.apache.spark.unsafe.types.UTF8String | ||
|
|
||
| class DateTimeUtilsSuite extends SparkFunSuite { | ||
| class DateTimeUtilsSuite extends SparkFunSuite with Matchers { | ||
|
|
||
| val TimeZonePST = TimeZone.getTimeZone("PST") | ||
| private def defaultZoneId = ZoneId.systemDefault() | ||
|
|
@@ -142,10 +144,14 @@ class DateTimeUtilsSuite extends SparkFunSuite { | |
| assert(stringToDate(UTF8String.fromString("1999 08")).isEmpty) | ||
| } | ||
|
|
||
| private def toTimestamp(str: String, zoneId: ZoneId): Option[SQLTimestamp] = { | ||
| stringToTimestamp(UTF8String.fromString(str), zoneId) | ||
| } | ||
|
|
||
| test("string to timestamp") { | ||
| for (tz <- ALL_TIMEZONES) { | ||
| def checkStringToTimestamp(str: String, expected: Option[Long]): Unit = { | ||
| assert(stringToTimestamp(UTF8String.fromString(str), tz.toZoneId) === expected) | ||
| assert(toTimestamp(str, tz.toZoneId) === expected) | ||
| } | ||
|
|
||
| checkStringToTimestamp("1969-12-31 16:00:00", Option(date(1969, 12, 31, 16, tz = tz))) | ||
|
|
@@ -271,8 +277,8 @@ class DateTimeUtilsSuite extends SparkFunSuite { | |
| UTF8String.fromString("2015-02-29 00:00:00"), defaultZoneId).isEmpty) | ||
| assert(stringToTimestamp( | ||
| UTF8String.fromString("2015-04-31 00:00:00"), defaultZoneId).isEmpty) | ||
| assert(stringToTimestamp(UTF8String.fromString("2015-02-29"), defaultZoneId).isEmpty) | ||
| assert(stringToTimestamp(UTF8String.fromString("2015-04-31"), defaultZoneId).isEmpty) | ||
| assert(toTimestamp("2015-02-29", defaultZoneId).isEmpty) | ||
| assert(toTimestamp("2015-04-31", defaultZoneId).isEmpty) | ||
| } | ||
|
|
||
| test("hours") { | ||
|
|
@@ -456,8 +462,7 @@ class DateTimeUtilsSuite extends SparkFunSuite { | |
| timezone: TimeZone = DateTimeUtils.defaultTimeZone()): Unit = { | ||
| val truncated = | ||
| DateTimeUtils.truncTimestamp(inputTS, level, timezone) | ||
| val expectedTS = | ||
| DateTimeUtils.stringToTimestamp(UTF8String.fromString(expected), defaultZoneId) | ||
| val expectedTS = toTimestamp(expected, defaultZoneId) | ||
| assert(truncated === expectedTS.get) | ||
| } | ||
|
|
||
|
|
@@ -564,4 +569,20 @@ class DateTimeUtilsSuite extends SparkFunSuite { | |
| assert(DateTimeUtils.toMillis(-9223372036844776001L) === -9223372036844777L) | ||
| assert(DateTimeUtils.toMillis(-157700927876544L) === -157700927877L) | ||
| } | ||
|
|
||
| test("special timestamp values") { | ||
| DateTimeTestUtils.outstandingZoneIds.foreach { zoneId => | ||
| val tolerance = TimeUnit.SECONDS.toMicros(30) | ||
|
|
||
| assert(toTimestamp("Epoch", zoneId).get === 0) | ||
| val now = instantToMicros(LocalDateTime.now(zoneId).atZone(zoneId).toInstant) | ||
| toTimestamp("NOW", zoneId).get should be (now +- tolerance) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you check illegal cases, e.g.,
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have already added the test here https://github.com/apache/spark/pull/25716/files#diff-c5655e947ce2dd3748e4cf95ebc32e8aR580 |
||
| val today = instantToMicros(LocalDateTime.now(zoneId) | ||
| .`with`(LocalTime.MIDNIGHT) | ||
| .atZone(zoneId).toInstant) | ||
| toTimestamp(" Yesterday", zoneId).get should be (today - MICROS_PER_DAY +- tolerance) | ||
| toTimestamp("Today ", zoneId).get should be (today +- tolerance) | ||
| toTimestamp(" tomorrow CET ", zoneId).get should be (today + MICROS_PER_DAY +- tolerance) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid to use
returnhere?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure about bytecode for this though, no overhead to use
return?