-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-29870][SQL] Unify the logic of multi-units interval string to CalendarInterval #26491
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
49b74c3
803d454
ed3b35f
79f5892
0814fc4
3b1667e
8ad53bc
5aa09ca
f6a9424
da7f9e8
61626ff
e12bb86
89dd64f
50bfd2e
32d5194
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit | |
|
|
||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} | ||
| import org.apache.spark.sql.catalyst.util.DateTimeConstants._ | ||
| import org.apache.spark.sql.types.Decimal | ||
| import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} | ||
|
|
@@ -101,34 +100,6 @@ object IntervalUtils { | |
| Decimal(result, 18, 6) | ||
| } | ||
|
|
||
| /** | ||
| * Converts a string to [[CalendarInterval]] case-insensitively. | ||
| * | ||
| * @throws IllegalArgumentException if the input string is not in a valid interval format. | ||
| */ | ||
| def fromString(str: String): CalendarInterval = { | ||
| if (str == null) throw new IllegalArgumentException("Interval string cannot be null") | ||
| try { | ||
| CatalystSqlParser.parseInterval(str) | ||
| } catch { | ||
| case e: ParseException => | ||
| val ex = new IllegalArgumentException(s"Invalid interval string: $str\n" + e.message) | ||
| ex.setStackTrace(e.getStackTrace) | ||
| throw ex | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A safe version of `fromString`. It returns null for an invalid input string. | ||
| */ | ||
| def safeFromString(str: String): CalendarInterval = { | ||
| try { | ||
| fromString(str) | ||
| } catch { | ||
| case _: IllegalArgumentException => null | ||
| } | ||
| } | ||
|
|
||
| private def toLongWithRange( | ||
| fieldName: IntervalUnit, | ||
| s: String, | ||
|
|
@@ -250,46 +221,6 @@ object IntervalUtils { | |
| } | ||
| } | ||
|
|
||
| def fromUnitStrings(units: Array[IntervalUnit], values: Array[String]): CalendarInterval = { | ||
| assert(units.length == values.length) | ||
| var months: Int = 0 | ||
| var days: Int = 0 | ||
| var microseconds: Long = 0 | ||
| var i = 0 | ||
| while (i < units.length) { | ||
| try { | ||
| units(i) match { | ||
| case YEAR => | ||
| months = Math.addExact(months, Math.multiplyExact(values(i).toInt, 12)) | ||
| case MONTH => | ||
| months = Math.addExact(months, values(i).toInt) | ||
| case WEEK => | ||
| days = Math.addExact(days, Math.multiplyExact(values(i).toInt, 7)) | ||
| case DAY => | ||
| days = Math.addExact(days, values(i).toInt) | ||
| case HOUR => | ||
| val hoursUs = Math.multiplyExact(values(i).toLong, MICROS_PER_HOUR) | ||
| microseconds = Math.addExact(microseconds, hoursUs) | ||
| case MINUTE => | ||
| val minutesUs = Math.multiplyExact(values(i).toLong, MICROS_PER_MINUTE) | ||
| microseconds = Math.addExact(microseconds, minutesUs) | ||
| case SECOND => | ||
| microseconds = Math.addExact(microseconds, parseSecondNano(values(i))) | ||
| case MILLISECOND => | ||
| val millisUs = Math.multiplyExact(values(i).toLong, MICROS_PER_MILLIS) | ||
| microseconds = Math.addExact(microseconds, millisUs) | ||
| case MICROSECOND => | ||
| microseconds = Math.addExact(microseconds, values(i).toLong) | ||
| } | ||
| } catch { | ||
| case e: Exception => | ||
| throw new IllegalArgumentException(s"Error parsing interval string: ${e.getMessage}", e) | ||
| } | ||
| i += 1 | ||
| } | ||
| new CalendarInterval(months, days, microseconds) | ||
| } | ||
|
|
||
| // Parses a string with nanoseconds, truncates the result and returns microseconds | ||
| private def parseNanos(nanosStr: String, isNegative: Boolean): Long = { | ||
| if (nanosStr != null) { | ||
|
|
@@ -305,30 +236,6 @@ object IntervalUtils { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Parses a second_nano string in ss.nnnnnnnnn format into microseconds. | ||
| */ | ||
| private def parseSecondNano(secondNano: String): Long = { | ||
| def parseSeconds(secondsStr: String): Long = { | ||
| toLongWithRange( | ||
| SECOND, | ||
| secondsStr, | ||
| Long.MinValue / MICROS_PER_SECOND, | ||
| Long.MaxValue / MICROS_PER_SECOND) * MICROS_PER_SECOND | ||
| } | ||
|
|
||
| secondNano.split("\\.") match { | ||
| case Array(secondsStr) => parseSeconds(secondsStr) | ||
| case Array("", nanosStr) => parseNanos(nanosStr, false) | ||
| case Array(secondsStr, nanosStr) => | ||
| val seconds = parseSeconds(secondsStr) | ||
| Math.addExact(seconds, parseNanos(nanosStr, seconds < 0)) | ||
| case _ => | ||
| throw new IllegalArgumentException( | ||
| "Interval string does not match second-nano format of ss.nnnnnnnnn") | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Gets interval duration | ||
| * | ||
|
|
@@ -452,20 +359,40 @@ object IntervalUtils { | |
| private final val millisStr = unitToUtf8(MILLISECOND) | ||
| private final val microsStr = unitToUtf8(MICROSECOND) | ||
|
|
||
| /** | ||
| * A safe version of `stringToInterval`. It returns null for an invalid input string. | ||
| */ | ||
| def safeStringToInterval(input: UTF8String): CalendarInterval = { | ||
| try { | ||
| stringToInterval(input) | ||
| } catch { | ||
| case _: IllegalArgumentException => null | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Converts a string to [[CalendarInterval]] case-insensitively. | ||
| * | ||
| * @throws IllegalArgumentException if the input string is not in a valid interval format. | ||
| */ | ||
| def stringToInterval(input: UTF8String): CalendarInterval = { | ||
| import ParseState._ | ||
| var state = PREFIX | ||
| def throwIAE(msg: String, e: Exception = null) = { | ||
| throw new IllegalArgumentException(s"Error parsing interval, $msg", e) | ||
| } | ||
|
|
||
| if (input == null) { | ||
| return null | ||
| throwIAE("interval string cannot be null") | ||
| } | ||
| // scalastyle:off caselocale .toLowerCase | ||
| val s = input.trim.toLowerCase | ||
| // scalastyle:on | ||
| val bytes = s.getBytes | ||
| if (bytes.isEmpty) { | ||
| return null | ||
| throwIAE("interval string cannot be empty") | ||
| } | ||
| var state = PREFIX | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| var i = 0 | ||
| var currentValue: Long = 0 | ||
| var isNegative: Boolean = false | ||
|
|
@@ -482,13 +409,17 @@ object IntervalUtils { | |
| } | ||
| } | ||
|
|
||
| def nextWord: UTF8String = { | ||
| s.substring(i, s.numBytes()).subStringIndex(UTF8String.blankString(1), 1) | ||
|
||
| } | ||
|
|
||
| while (i < bytes.length) { | ||
| val b = bytes(i) | ||
| state match { | ||
| case PREFIX => | ||
| if (s.startsWith(intervalStr)) { | ||
| if (s.numBytes() == intervalStr.numBytes()) { | ||
| return null | ||
| throwIAE("interval string cannot be empty") | ||
| } else { | ||
| i += intervalStr.numBytes() | ||
| } | ||
|
|
@@ -518,10 +449,10 @@ object IntervalUtils { | |
| isNegative = false | ||
| case '.' => | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| isNegative = false | ||
| fractionScale = (NANOS_PER_SECOND / 10).toInt | ||
| i += 1 | ||
| fractionScale = (NANOS_PER_SECOND / 10).toInt | ||
| state = VALUE_FRACTIONAL_PART | ||
| case _ => return null | ||
| case _ => throwIAE( s"unrecognized sign '$nextWord'") | ||
yaooqinn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| case TRIM_BEFORE_VALUE => trimToNextState(b, VALUE) | ||
| case VALUE => | ||
|
|
@@ -530,13 +461,13 @@ object IntervalUtils { | |
| try { | ||
| currentValue = Math.addExact(Math.multiplyExact(10, currentValue), (b - '0')) | ||
| } catch { | ||
| case _: ArithmeticException => return null | ||
| case e: ArithmeticException => throwIAE(e.getMessage, e) | ||
| } | ||
| case ' ' => state = TRIM_BEFORE_UNIT | ||
| case '.' => | ||
| fractionScale = (NANOS_PER_SECOND / 10).toInt | ||
| state = VALUE_FRACTIONAL_PART | ||
| case _ => return null | ||
| case _ => throwIAE(s"invalid value '$nextWord'") | ||
| } | ||
| i += 1 | ||
| case VALUE_FRACTIONAL_PART => | ||
|
|
@@ -547,14 +478,16 @@ object IntervalUtils { | |
| case ' ' => | ||
| fraction /= NANOS_PER_MICROS.toInt | ||
| state = TRIM_BEFORE_UNIT | ||
| case _ => return null | ||
| case _ if '0' <= b && b <= '9' => | ||
| throwIAE(s"invalid value fractional part '$fraction$nextWord' out of range") | ||
|
||
| case _ => throwIAE(s"invalid value '$nextWord' in fractional part") | ||
| } | ||
| i += 1 | ||
| case TRIM_BEFORE_UNIT => trimToNextState(b, UNIT_BEGIN) | ||
| case UNIT_BEGIN => | ||
| // Checks that only seconds can have a fractional part | ||
| if (b != 's' && fractionScale >= 0) { | ||
| return null | ||
| throwIAE(s"'$nextWord' with fractional part is unsupported") | ||
yaooqinn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| if (isNegative) { | ||
| currentValue = -currentValue | ||
|
|
@@ -598,26 +531,26 @@ object IntervalUtils { | |
| } else if (s.matchAt(microsStr, i)) { | ||
| microseconds = Math.addExact(microseconds, currentValue) | ||
| i += microsStr.numBytes() | ||
| } else return null | ||
| case _ => return null | ||
| } else throwIAE(s"invalid unit '$nextWord'") | ||
| case _ => throwIAE(s"invalid unit '$nextWord'") | ||
| } | ||
| } catch { | ||
| case _: ArithmeticException => return null | ||
| case e: ArithmeticException => throwIAE(e.getMessage, e) | ||
| } | ||
| state = UNIT_SUFFIX | ||
| case UNIT_SUFFIX => | ||
| b match { | ||
| case 's' => state = UNIT_END | ||
| case ' ' => state = TRIM_BEFORE_SIGN | ||
| case _ => return null | ||
| case _ => throwIAE(s"invalid unit suffix '$nextWord'") | ||
|
||
| } | ||
| i += 1 | ||
| case UNIT_END => | ||
| b match { | ||
| case ' ' => | ||
| i += 1 | ||
| state = TRIM_BEFORE_SIGN | ||
| case _ => return null | ||
| case _ => throwIAE(s"invalid unit suffix '$nextWord'") | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.