Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.control.Exception.allCatch
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.types._

Expand All @@ -32,7 +33,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private val timestampParser = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)

private val decimalParser = if (options.locale == Locale.US) {
// Special handling of the default locale for backward compatibility
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ class CSVOptions(
// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd")
val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)

val timestampFormat: String =
parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX")
parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.types._

class UnivocityGenerator(
Expand All @@ -44,11 +45,13 @@ class UnivocityGenerator(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)

private def makeConverter(dataType: DataType): ValueConverter = dataType match {
case DateType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -86,11 +87,13 @@ class UnivocityParser(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)

private val csvFilters = new CSVFilters(filters, requiredSchema)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti

override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
val tf = if (formatter.isEmpty) {
TimestampFormatter(format.toString, zoneId)
TimestampFormatter.withStrongLegacy(format.toString, zoneId)
} else {
formatter.get
}
Expand All @@ -645,7 +645,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
defineCodeGen(ctx, ev, (timestamp, format) => {
s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $zid)
s"""UTF8String.fromString($tf$$.MODULE$$.withStrongLegacy($format.toString(), $zid)
.format($timestamp))"""
})
}
Expand Down Expand Up @@ -688,7 +688,7 @@ case class ToUnixTimestamp(
copy(timeZoneId = Option(timeZoneId))

def this(time: Expression) = {
this(time, Literal("uuuu-MM-dd HH:mm:ss"))
this(time, Literal(TimestampFormatter.defaultPattern))
}

override def prettyName: String = "to_unix_timestamp"
Expand Down Expand Up @@ -732,7 +732,7 @@ case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Op
copy(timeZoneId = Option(timeZoneId))

def this(time: Expression) = {
this(time, Literal("uuuu-MM-dd HH:mm:ss"))
this(time, Literal(TimestampFormatter.defaultPattern))
}

def this() = {
Expand All @@ -758,7 +758,7 @@ abstract class ToTimestamp
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
TimestampFormatter(constFormat.toString, zoneId)
TimestampFormatter.withStrongLegacy(constFormat.toString, zoneId)
} catch {
case NonFatal(_) => null
}
Expand Down Expand Up @@ -791,7 +791,7 @@ abstract class ToTimestamp
} else {
val formatString = f.asInstanceOf[UTF8String].toString
try {
TimestampFormatter(formatString, zoneId).parse(
TimestampFormatter.withStrongLegacy(formatString, zoneId).parse(
t.asInstanceOf[UTF8String].toString) / downScaleFactor
} catch {
case NonFatal(_) => null
Expand Down Expand Up @@ -831,12 +831,11 @@ abstract class ToTimestamp
}
case StringType =>
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
val locale = ctx.addReferenceObj("locale", Locale.US)
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
nullSafeCodeGen(ctx, ev, (string, format) => {
s"""
try {
${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $zid, $locale)
${ev.value} = $tf$$.MODULE$$.withStrongLegacy($format.toString(), $zid)
.parse($string.toString()) / $downScaleFactor;
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
Expand Down Expand Up @@ -908,7 +907,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
override def prettyName: String = "from_unixtime"

def this(unix: Expression) = {
this(unix, Literal("uuuu-MM-dd HH:mm:ss"))
this(unix, Literal(TimestampFormatter.defaultPattern))
}

override def dataType: DataType = StringType
Expand All @@ -922,7 +921,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
TimestampFormatter(constFormat.toString, zoneId)
TimestampFormatter.withStrongLegacy(constFormat.toString, zoneId)
} catch {
case NonFatal(_) => null
}
Expand All @@ -948,7 +947,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
null
} else {
try {
UTF8String.fromString(TimestampFormatter(f.toString, zoneId)
UTF8String.fromString(TimestampFormatter.withStrongLegacy(f.toString, zoneId)
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
} catch {
case NonFatal(_) => null
Expand Down Expand Up @@ -980,12 +979,11 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
}
} else {
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
val locale = ctx.addReferenceObj("locale", Locale.US)
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
nullSafeCodeGen(ctx, ev, (seconds, f) => {
s"""
try {
${ev.value} = UTF8String.fromString($tf$$.MODULE$$.apply($f.toString(), $zid, $locale).
${ev.value} = UTF8String.fromString($tf$$.MODULE$$.withStrongLegacy($f.toString(), $zid).
format($seconds * 1000000L));
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ private[sql] class JSONOptions(
val zoneId: ZoneId = DateTimeUtils.getZoneId(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd")
val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)

val timestampFormat: String =
parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX")
parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.types._

/**
Expand Down Expand Up @@ -80,11 +81,13 @@ private[sql] class JacksonGenerator(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)

private def makeWriter(dataType: DataType): ValueWriter = dataType match {
case NullType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
Expand Down Expand Up @@ -58,11 +59,13 @@ class JacksonParser(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
Expand All @@ -40,7 +41,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)

/**
* Infer the type of a collection of json records in three stages:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package org.apache.spark.sql.catalyst.util

import java.text.SimpleDateFormat
import java.time.{LocalDate, ZoneId}
import java.util.Locale
import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

Expand Down Expand Up @@ -51,41 +52,76 @@ class Iso8601DateFormatter(
}
}

class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
@transient
private lazy val format = FastDateFormat.getInstance(pattern, locale)
trait LegacyDateFormatter extends DateFormatter {
def parseToDate(s: String): Date
def formatDate(d: Date): String

override def parse(s: String): Int = {
val milliseconds = format.parse(s).getTime
val milliseconds = parseToDate(s).getTime
DateTimeUtils.millisToDays(milliseconds)
}

override def format(days: Int): String = {
val date = DateTimeUtils.toJavaDate(days)
format.format(date)
formatDate(date)
}
}

/**
 * Legacy date formatter that delegates both parsing and formatting to an
 * Apache Commons Lang `FastDateFormat` instance.
 *
 * @param pattern date pattern passed to `FastDateFormat.getInstance`
 * @param locale  locale passed to `FastDateFormat.getInstance`
 */
class LegacyFastDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter {
  // Marked @transient and lazy: the underlying formatter is not serialized with this
  // object and is created on first use.
  @transient
  private lazy val fdf = FastDateFormat.getInstance(pattern, locale)

  /** Parses `s` into a `java.util.Date` with the configured pattern and locale. */
  override def parseToDate(s: String): Date = fdf.parse(s)

  /** Renders `d` as text with the configured pattern and locale. */
  override def formatDate(d: Date): String = fdf.format(d)
}

/**
 * Legacy date formatter that delegates both parsing and formatting to a
 * `java.text.SimpleDateFormat` instance.
 *
 * NOTE: the JDK documents `SimpleDateFormat` as not synchronized, so a single instance
 * of this class should not be used from several threads concurrently without external
 * synchronization.
 *
 * @param pattern date pattern passed to the `SimpleDateFormat` constructor
 * @param locale  locale passed to the `SimpleDateFormat` constructor
 */
class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter {
  // Marked @transient and lazy: the underlying formatter is not serialized with this
  // object and is created on first use.
  @transient
  private lazy val sdf = new SimpleDateFormat(pattern, locale)

  /** Parses `s` into a `java.util.Date` with the configured pattern and locale. */
  override def parseToDate(s: String): Date = sdf.parse(s)

  /** Renders `d` as text with the configured pattern and locale. */
  override def formatDate(d: Date): String = sdf.format(d)
}

object DateFormatter {
import LegacyDateFormats._

val defaultLocale: Locale = Locale.US

def apply(format: String, zoneId: ZoneId, locale: Locale): DateFormatter = {
def defaultPattern(): String = {
if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd"
}

private def getFormatter(
format: Option[String],
zoneId: ZoneId,
locale: Locale = defaultLocale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = {

val pattern = format.getOrElse(defaultPattern)
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyDateFormatter(format, locale)
legacyFormat match {
case FAST_DATE_FORMAT =>
new LegacyFastDateFormatter(pattern, locale)
case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
new LegacySimpleDateFormatter(pattern, locale)
}
} else {
new Iso8601DateFormatter(format, zoneId, locale)
new Iso8601DateFormatter(pattern, zoneId, locale)
}
}

def apply(
format: String,
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat): DateFormatter = {
getFormatter(Some(format), zoneId, locale, legacyFormat)
}

def apply(format: String, zoneId: ZoneId): DateFormatter = {
apply(format, zoneId, defaultLocale)
getFormatter(Some(format), zoneId)
}

def apply(zoneId: ZoneId): DateFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyDateFormatter("yyyy-MM-dd", defaultLocale)
} else {
new Iso8601DateFormatter("uuuu-MM-dd", zoneId, defaultLocale)
}
getFormatter(None, zoneId)
}
}
Loading