@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.getDateTimeParser
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -53,6 +54,9 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)

@transient private lazy val timestampParser =
getDateTimeParser(options.timestampFormat, options.timeZone)

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@@ -213,15 +217,12 @@ class JacksonParser(
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING =>
val stringValue = parser.getText
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Long.box {
Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(stringValue).getTime * 1000L
}
Try(timestampParser.parse(stringValue)).getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(stringValue).getTime * 1000L
}
}

case VALUE_NUMBER_INT =>

@@ -18,15 +18,18 @@
package org.apache.spark.sql.catalyst.util

import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.text.{DateFormat, ParsePosition, SimpleDateFormat}
import java.time.Instant
import java.util.{Calendar, Locale, TimeZone}
import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
import javax.xml.bind.DatatypeConverter

import scala.annotation.tailrec

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.UTF8String

/**
@@ -1164,4 +1167,27 @@ object DateTimeUtils {
threadLocalTimestampFormat.remove()
threadLocalDateFormat.remove()
}

class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
def getMicros(digitsInFraction: Int): SQLTimestamp = {
val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
d / Decimal.POW_10(digitsInFraction)
}
}

class DateTimeParser(format: FastDateFormat, digitsInFraction: Int, cal: MicrosCalendar) {
def parse(s: String): SQLTimestamp = {
cal.clear()
if (!format.parse(s, new ParsePosition(0), cal)) {
throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
}
val micros = cal.getMicros(digitsInFraction)
cal.set(Calendar.MILLISECOND, 0)
cal.getTimeInMillis * MICROS_PER_MILLIS + micros
}
}

def getDateTimeParser(format: FastDateFormat, tz: TimeZone): DateTimeParser = {
new DateTimeParser(format, format.getPattern.count(_ == 'S'), new MicrosCalendar(tz))
}
}
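For context, a minimal sketch of how the new helper is intended to be driven (the pattern and input below are illustrative, not taken from this diff): getDateTimeParser pairs a FastDateFormat with a MicrosCalendar, and parse returns microseconds since the epoch, so fraction digits beyond milliseconds are rescaled rather than dropped.

```scala
import java.util.{Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.catalyst.util.DateTimeUtils.getDateTimeParser

val utc = TimeZone.getTimeZone("UTC")
// Seven 'S' letters: the whole fraction is read into the MILLISECOND field as a
// plain integer, then rescaled by MicrosCalendar.getMicros using POW_10(7).
val format = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX", utc, Locale.US)
val parser = getDateTimeParser(format, utc)

// Microseconds since the epoch (SQLTimestamp is a Long):
// 1234560 * 1000000 / 10000000 == 123456, i.e. 00:00:00.123456 UTC.
val micros = parser.parse("1970-01-01T00:00:00.1234560Z")
```

The digitsInFraction argument is derived from the number of 'S' characters in the pattern, which keeps the scaling consistent with whatever fractional precision the format declares.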

@@ -432,7 +432,7 @@ object Decimal {
/** Maximum number of decimal digits a Long can represent */
val MAX_LONG_DIGITS = 18

private val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)
val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)

private val BIG_DEC_ZERO = BigDecimal(0)


@@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.unsafe.types.UTF8String
@@ -692,4 +694,31 @@ class DateTimeUtilsSuite extends SparkFunSuite {
}
}
}

test("fast parse to micros") {
val timeZone = TimeZoneUTC
def check(pattern: String, input: String, reference: String): Unit = {
val parser = getDateTimeParser(
FastDateFormat.getInstance(pattern, timeZone, Locale.US),
timeZone)
val expected = DateTimeUtils.stringToTimestamp(
UTF8String.fromString(reference), timeZone).get
val actual = parser.parse(input)
assert(actual === expected)
}
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX",
"2019-10-14T09:39:07.3220000Z", "2019-10-14T09:39:07.322Z")
[Contributor comment] is this behavior the same with master branch?

check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
"2019-10-14T09:39:07.322000Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSX",
"2019-10-14T09:39:07.322Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
"2019-10-14T09:39:07.123456Z", "2019-10-14T09:39:07.123456Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
"2019-10-14T09:39:07.000010Z", "2019-10-14T09:39:07.000010Z")
check("yyyy-MM-dd'T'HH:mm:ss.SX",
"2019-10-14T09:39:07.1Z", "2019-10-14T09:39:07.1Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSX",
"2019-10-14T09:39:07.10Z", "2019-10-14T09:39:07.1Z")
[Contributor comment] can we add some negative tests?

[Contributor comment] I'd like to see a test like xxx.123 with format .SS

[Author reply] It just returns an invalid result, xxx+1.23. For example: "2019-10-14T09:39:07.123Z" -> "2019-10-14T09:39:08.23Z". I can add such a test, but I don't know what it aims to validate.

}
}
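To make the reviewer exchange above concrete: with a two-digit fraction pattern such as .SS, FastDateFormat still reads all three digits of an input like .123 into the millisecond field, and MicrosCalendar.getMicros then rescales them as if they were hundredths of a second. A small illustration of that arithmetic (values are hypothetical and simply replay the formula in getMicros):

```scala
// Pattern declares two fraction digits (".SS") but the input carries ".123".
val digitsInFraction = 2
val fieldValue = 123L                      // what FastDateFormat stores for ".123"
val micros = fieldValue * 1000000L / 100L  // fieldValue * MICROS_PER_SECOND / POW_10(2)
// micros == 1230000, i.e. 1.23 extra seconds on top of the whole seconds,
// which is how "...:07.123Z" comes back as "...:08.23Z" in the author's example.
```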

@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.getDateTimeParser
import org.apache.spark.sql.execution.datasources.FailureSafeParser
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -77,6 +78,9 @@ class UnivocityParser(

private val row = new GenericInternalRow(requiredSchema.length)

@transient private lazy val timestampParser =
getDateTimeParser(options.timestampFormat, options.timeZone)

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
@@ -156,10 +160,7 @@ class UnivocityParser(

case _: TimestampType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Try(options.timestampFormat.parse(datum).getTime * 1000L)
.getOrElse {
Try(timestampParser.parse(datum)).getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(datum).getTime * 1000L

@@ -518,4 +518,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("from_json - timestamp in micros") {
val df = Seq("""{"time": "1970-01-01T00:00:00.123456"}""").toDS()
val schema = new StructType().add("time", TimestampType)
val options = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")

checkAnswer(
df.select(from_json($"value", schema, options)),
Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
}
}
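The CSV path goes through the same timestampParser in UnivocityParser, but this diff adds no end-to-end CSV test. A hypothetical counterpart of the JSON test above, assuming the usual SharedSQLContext test harness with its spark session and implicits, might look like:

```scala
// Hypothetical CSV analogue of "from_json - timestamp in micros" (not part of this diff).
import org.apache.spark.sql.types.{StructType, TimestampType}
import spark.implicits._  // for .toDS(); provided by the test harness

val csvDs = Seq("1970-01-01T00:00:00.123456").toDS()
val csvDf = spark.read
  .schema(new StructType().add("time", TimestampType))
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
  .csv(csvDs)
// Expected single row: java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456")
```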

@@ -26,9 +26,9 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

class UnivocityParserSuite extends SparkFunSuite {
private val parser = new UnivocityParser(
StructType(Seq.empty),
new CSVOptions(Map.empty[String, String], false, "GMT"))
private def getParser(options: CSVOptions) = {
new UnivocityParser(StructType(Seq.empty), options)
}

private def assertNull(v: Any) = assert(v == null)

@@ -40,8 +40,10 @@ class UnivocityParserSuite extends SparkFunSuite {
stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
val decimalValue = new BigDecimal(decimalVal.toString)
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
Decimal(decimalValue, decimalType.precision, decimalType.scale))
assert(
getParser(options)
.makeConverter("_1", decimalType, options = options)
.apply(strVal) === Decimal(decimalValue, decimalType.precision, decimalType.scale))
}
}

@@ -53,22 +55,23 @@
types.foreach { t =>
// Tests that a custom nullValue.
val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
val converter =
parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
val converter = getParser(nullValueOptions)
.makeConverter("_1", t, nullable = true, options = nullValueOptions)
assertNull(converter.apply("-"))
assertNull(converter.apply(null))

// Tests that the default nullValue is empty string.
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
val parser = getParser(options)
assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
}

// Not nullable field with nullValue option.
types.foreach { t =>
// Casts a null to not nullable field should throw an exception.
val options = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
val converter =
parser.makeConverter("_1", t, nullable = false, options = options)
val converter = getParser(options)
.makeConverter("_1", t, nullable = false, options = options)
var message = intercept[RuntimeException] {
converter.apply("-")
}.getMessage
@@ -83,22 +86,25 @@
// null.
Seq(true, false).foreach { b =>
val options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT")
val converter =
parser.makeConverter("_1", StringType, nullable = b, options = options)
val converter = getParser(options)
.makeConverter("_1", StringType, nullable = b, options = options)
assert(converter.apply("") == UTF8String.fromString(""))
}
}

test("Throws exception for empty string with non null type") {
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
val exception = intercept[RuntimeException]{
parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
getParser(options)
.makeConverter("_1", IntegerType, nullable = false, options = options)
.apply("")
}
assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
}

test("Types are cast correctly") {
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
val parser = getParser(options)
assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
@@ -111,17 +117,17 @@
new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), false, "GMT")
val customTimestamp = "31/01/2015 00:00"
val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
val castedTimestamp =
parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
.apply(customTimestamp)
val castedTimestamp = getParser(timestampsOptions)
.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
.apply(customTimestamp)
assert(castedTimestamp == expectedTime * 1000L)

val customDate = "31/01/2015"
val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "GMT")
val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
val castedDate =
parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
.apply(customTimestamp)
val castedDate = getParser(dateOptions)
.makeConverter("_1", DateType, nullable = true, options = dateOptions)
.apply(customTimestamp)
assert(castedDate == DateTimeUtils.millisToDays(expectedDate))

val timestamp = "2015-01-01 00:00:00"
@@ -138,7 +144,7 @@
types.foreach { dt =>
input.foreach { v =>
val message = intercept[NumberFormatException] {
parser.makeConverter("_1", dt, options = options).apply(v)
getParser(options).makeConverter("_1", dt, options = options).apply(v)
}.getMessage
assert(message.contains(v))
}
@@ -147,7 +153,7 @@

test("Float NaN values are parsed correctly") {
val options = new CSVOptions(Map("nanValue" -> "nn"), false, "GMT")
val floatVal: Float = parser.makeConverter(
val floatVal: Float = getParser(options).makeConverter(
"_1", FloatType, nullable = true, options = options
).apply("nn").asInstanceOf[Float]

@@ -158,7 +164,7 @@

test("Double NaN values are parsed correctly") {
val options = new CSVOptions(Map("nanValue" -> "-"), false, "GMT")
val doubleVal: Double = parser.makeConverter(
val doubleVal: Double = getParser(options).makeConverter(
"_1", DoubleType, nullable = true, options = options
).apply("-").asInstanceOf[Double]

@@ -167,14 +173,14 @@

test("Float infinite values can be parsed") {
val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
val floatVal1 = parser.makeConverter(
val floatVal1 = getParser(negativeInfOptions).makeConverter(
"_1", FloatType, nullable = true, options = negativeInfOptions
).apply("max").asInstanceOf[Float]

assert(floatVal1 == Float.NegativeInfinity)

val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
val floatVal2 = parser.makeConverter(
val floatVal2 = getParser(positiveInfOptions).makeConverter(
"_1", FloatType, nullable = true, options = positiveInfOptions
).apply("max").asInstanceOf[Float]

@@ -183,18 +189,17 @@

test("Double infinite values can be parsed") {
val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
val doubleVal1 = parser.makeConverter(
val doubleVal1 = getParser(negativeInfOptions).makeConverter(
"_1", DoubleType, nullable = true, options = negativeInfOptions
).apply("max").asInstanceOf[Double]

assert(doubleVal1 == Double.NegativeInfinity)

val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
val doubleVal2 = parser.makeConverter(
val doubleVal2 = getParser(positiveInfOptions).makeConverter(
"_1", DoubleType, nullable = true, options = positiveInfOptions
).apply("max").asInstanceOf[Double]

assert(doubleVal2 == Double.PositiveInfinity)
}

}