Changes from 4 commits

JacksonParser.scala

@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.fastParseToMicros
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils

@@ -216,7 +217,7 @@ class JacksonParser(
         // This one will lose microseconds parts.
         // See https://issues.apache.org/jira/browse/SPARK-10681.
         Long.box {
-          Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
+          Try(fastParseToMicros(options.timestampFormat, stringValue, options.timeZone))
            .getOrElse {
              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
              // compatibility.
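
For context on the change above (not part of the PR): java.util.Date carries only millisecond precision, so the old getTime * 1000L path always produced zeros in the microsecond digits. A minimal sketch of the old behavior, assuming UTC:

import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS", Locale.US)
fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
val millis = fmt.parse("2019-10-14T09:39:07.322").getTime
println(millis * 1000L) // 1571045947322000: the last three digits are always 000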

DateTimeUtils.scala

@@ -20,13 +20,15 @@ package org.apache.spark.sql.catalyst.util
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 import java.time.Instant
-import java.util.{Calendar, Locale, TimeZone}
+import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.{Function => JFunction}
 import javax.xml.bind.DatatypeConverter

 import scala.annotation.tailrec

+import org.apache.commons.lang3.time.FastDateFormat
+
 import org.apache.spark.unsafe.types.UTF8String

 /**

@@ -1164,4 +1166,30 @@ object DateTimeUtils {
     threadLocalTimestampFormat.remove()
     threadLocalDateFormat.remove()
   }
+
+  class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
+    def getMicros(): SQLTimestamp = {

Contributor:
Can we have some comments to explain the behavior? Seems to me it's

  1. for .1, it's 1000 microseconds
  2. for .1234, it's 1234 microseconds
  3. for .1234567, it's 123456 microseconds


Contributor:
Do we have a simple rule? The rule for intervals is pretty simple: add 0s at the end until the second fraction has 9 digits, then parse the 9 digits as nanoseconds.
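
A sketch of that padding rule, for comparison (illustrative only, not the actual interval parser):

// pad the fraction digits to 9 and read them as nanoseconds
def fractionToNanos(fraction: String): Long =
  fraction.padTo(9, '0').toLong

println(fractionToNanos("1"))    // 100000000 (0.1 s)
println(fractionToNanos("1234")) // 123400000 (0.1234 s)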

Member Author:
> Do we have a simple rule?

I haven't found a simpler approach so far. The difference between interval and timestamp is that the latter may have a time zone at the end, or anything else after the fraction. We cannot tell users not to use a pattern like mm:ss.SSSSSSXXX yyyy/MM/dd.


Contributor:
So what we get here is the milliseconds value FastDateFormat extracts from the string. I believe FastDateFormat can handle the part after seconds: e.g. for 12:12:12.1234Z, the milliseconds part should be 1234.
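
A small sketch of that raw-field behavior, assuming UTC (the RawMillisCalendar subclass is hypothetical, exposing the protected fields array the same way MicrosCalendar below does):

import java.text.ParsePosition
import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}
import org.apache.commons.lang3.time.FastDateFormat

// read MILLISECOND as parsed, before Calendar.get() normalizes
// 1234 ms into 1 s + 234 ms
class RawMillisCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
  def rawMillis: Int = fields(Calendar.MILLISECOND)
}

val utc = TimeZone.getTimeZone("UTC")
val fmt = FastDateFormat.getInstance("HH:mm:ss.SSSSXXX", utc, Locale.US)
val cal = new RawMillisCalendar(utc)
cal.clear()
fmt.parse("12:12:12.1234Z", new ParsePosition(0), cal)
println(cal.rawMillis)                 // 1234: the raw digits as an int
println(cal.get(Calendar.MILLISECOND)) // 234: after normalization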

Member Author:
> Can we have some comments to explain the behavior? Seems to me it's
> for .1, it's 1000 microseconds

This is 100 * 1000 microseconds, but SimpleDateFormat and FastDateFormat have weird behavior. The example below is on 2.4, without my changes:

scala> val df = Seq("""{"a":"2019-10-14T09:39:07.1Z"}""").toDF
df: org.apache.spark.sql.DataFrame = [value: string]

scala> val res = df.select(from_json('value, schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SXXX")))
res: org.apache.spark.sql.DataFrame = [jsontostructs(value): struct<a: timestamp>]

scala> res.show(false)
+-------------------------+
|jsontostructs(value)     |
+-------------------------+
|[2019-10-14 12:39:07.001]|
+-------------------------+
scala> val res = df.select(from_json('value, schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")))
res: org.apache.spark.sql.DataFrame = [jsontostructs(value): struct<a: timestamp>]

scala> res.show(false)
+-------------------------+
|jsontostructs(value)     |
+-------------------------+
|[2019-10-14 12:39:07.001]|
+-------------------------+

So .1 cannot be parsed correctly, only .100 can:

scala> val df = Seq("""{"a":"2019-10-14T09:39:07.100Z"}""").toDF
df: org.apache.spark.sql.DataFrame = [value: string]

scala> val res = df.select(from_json('value, schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")))
res: org.apache.spark.sql.DataFrame = [jsontostructs(value): struct<a: timestamp>]

scala> res.show(false)
+-----------------------+
|jsontostructs(value)   |
+-----------------------+
|[2019-10-14 12:39:07.1]|
+-----------------------+

Member Author (@MaxGekk, Nov 13, 2019):
From what I see in the source code of SimpleDateFormat, it just parses the fraction part as an int: .001 and .01 are the same and equal to 1.
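
A quick illustration of that behavior, assuming UTC (not part of the PR):

import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

val fmt = new SimpleDateFormat("HH:mm:ss.SSS", Locale.US)
fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
// both fractions are read as the plain integer 1, i.e. one millisecond
println(fmt.parse("00:00:00.001").getTime) // 1
println(fmt.parse("00:00:00.01").getTime)  // 1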

Member Author (@MaxGekk, Nov 13, 2019):
Currently, the following check fails:

    check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
      "2019-10-14T09:39:07.000010Z", "2019-10-14T09:39:07.000010Z")
Expected :1571045947000010
Actual   :1571045947010000

because .000010 is parsed to 10 inside of SimpleDateFormat.

Member Author:
The commit 86ac2b2 should fix that

+      var fraction = fields(Calendar.MILLISECOND)
+      if (fraction < MICROS_PER_MILLIS) {
+        fraction *= MICROS_PER_MILLIS.toInt
+      } else if (fraction >= MICROS_PER_SECOND) {
+        do {
+          fraction /= 10
+        } while (fraction >= MICROS_PER_SECOND)
+      }
+      fraction
+    }
+  }
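
Restating the rule above as a standalone sketch, with the DateTimeUtils constants inlined (MICROS_PER_MILLIS = 1000, MICROS_PER_SECOND = 1000000); it reproduces the three cases from the review comment:

def normalizeFraction(raw: Int): Int = {
  var fraction = raw
  if (fraction < 1000) {
    fraction *= 1000               // raw value below 1000: treat as milliseconds
  } else if (fraction >= 1000000) {
    while (fraction >= 1000000) {
      fraction /= 10               // more than 6 digits: truncate to micros
    }
  }
  fraction
}

println(normalizeFraction(1))       // .1       -> 1000 micros
println(normalizeFraction(1234))    // .1234    -> 1234 micros
println(normalizeFraction(1234567)) // .1234567 -> 123456 micros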

+  def fastParseToMicros(parser: FastDateFormat, s: String, tz: TimeZone): SQLTimestamp = {
+    val pos = new java.text.ParsePosition(0)
+    val cal = new MicrosCalendar(tz)
+    cal.clear()
+    if (!parser.parse(s, pos, cal)) {
+      throw new IllegalArgumentException(s)
+    }
+    val micros = cal.getMicros()
+    cal.set(Calendar.MILLISECOND, 0)
+    cal.getTimeInMillis * MICROS_PER_MILLIS + micros
+  }
 }

DateTimeUtilsSuite.scala

@@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Locale, TimeZone}

+import org.apache.commons.lang3.time.FastDateFormat
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.unsafe.types.UTF8String

@@ -692,4 +694,23 @@ class DateTimeUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("fast parse to micros") {
+    val timeZone = TimeZoneUTC
+    def check(pattern: String, input: String, reference: String): Unit = {
+      val parser = FastDateFormat.getInstance(pattern, timeZone, Locale.US)
+      val expected = DateTimeUtils.stringToTimestamp(
+        UTF8String.fromString(reference), timeZone).get
+      val actual = fastParseToMicros(parser, input, timeZone)
+      assert(actual === expected)
+    }
+    check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX",
+      "2019-10-14T09:39:07.3220000Z", "2019-10-14T09:39:07.322Z")

Contributor:
Is this behavior the same as on the master branch?

check("yyyy-MM-dd'T'HH:mm:ss.SSSXXX",
"2019-10-14T09:39:07.322Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
"2019-10-14T09:39:07.123456Z", "2019-10-14T09:39:07.123456Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSXXX",
"2019-10-14T09:39:07.123Z", "2019-10-14T09:39:07.123Z")
}
}

JsonFunctionsSuite.scala

@@ -518,4 +518,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       jsonDF.select(to_json(from_json($"a", schema))),
       Seq(Row(json)))
   }
+
+  test("from_json - timestamp in micros") {
+    val df = Seq("""{"time": "1970-01-01T00:00:00.123456"}""").toDS()
+    val schema = new StructType().add("time", TimestampType)
+    val options = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
+
+    checkAnswer(
+      df.select(from_json($"value", schema, options)),
+      Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
+  }
 }