@@ -173,6 +173,8 @@ public InternalRow copy() {
row.setInt(i, getInt(i));
} else if (dt instanceof TimestampType) {
row.setLong(i, getLong(i));
} else if (dt instanceof CalendarIntervalType) {
row.update(i, getInterval(i));
} else {
throw new RuntimeException("Not implemented. " + dt);
}
@@ -535,8 +535,10 @@ case class DataSource(
* Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
*/
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
if (providingClass != classOf[ParquetFileFormat]) {
Member: I suppose there's no cleaner way to do this than an 'instanceof'-style check? It's done a few other places here, so maybe.

if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
}

providingInstance() match {
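
The net effect of the planForWriting change above is that only non-Parquet sources keep rejecting interval columns at write time. A minimal usage sketch of the new behaviour, assuming an existing SparkSession named spark and placeholder output paths:

val df = spark.sql("select interval 1 days as i")
df.write.mode("overwrite").parquet("/tmp/interval_parquet")  // now allowed
df.write.mode("overwrite").orc("/tmp/interval_orc")          // still rejected:
// org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
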
@@ -371,6 +371,8 @@ class ParquetFileFormat

case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

case _: CalendarIntervalType => true

case _ => false
}
}
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

/**
* A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
@@ -325,6 +325,26 @@ private[parquet] class ParquetRowConverter(
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})

case CalendarIntervalType
if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY =>
new ParquetPrimitiveConverter(updater) {
override def addBinary(value: Binary): Unit = {
assert(
value.length() == 12,
"Intervals are expected to be stored in 12-byte fixed len byte array, " +
s"but got a ${value.length()}-byte array.")

val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
val milliseconds = buf.getInt
var microseconds = milliseconds * DateTimeUtils.MICROS_PER_MILLIS
val days = buf.getInt
val daysInUs = Math.multiplyExact(days, DateTimeUtils.MICROS_PER_DAY)
Contributor: Parquet stores the number of days as a separate field because one logical day interval can be 23, 24, or 25 hours in the case of daylight saving. If we convert a Parquet interval to a Spark interval, it's not a truncation but a loss of information.

Member Author: This can be fixed only if we change the structure of CalendarInterval, but such modifications are almost orthogonal to this PR.

Member: If we don't change CalendarInterval, hm, how can we handle the different structure of a Parquet interval without getting it wrong in some cases?
As in the other PR, another option is to refuse to read/write intervals that are longer than a day, I guess?

Member Author:
  1. According to the SQL standard, hours must be in the range of 0-23.
  2. We already lose the information while converting an interval string to a CalendarInterval value:
spark-sql> select interval 1 day 25 hours;
interval 2 days 1 hours

Member: I don't think that's quite the issue. If a Parquet INTERVAL of 1 day is stored as "1 day", then adding it to a date will always produce the same time the next day. If we don't represent days separately in CalendarInterval, the 1 day is stored as "86400000000 µs" (right?). Adding that will usually, but not always, produce the same time the next day.

Member Author: I don't want to defend the other side :-) but the consequence of storing days separately is that hours are unbounded. In this way, interval 1 day 25 hours and interval 2 days 1 hours are represented differently in Parquet - (0, 1, 90000000) and (0, 2, 3600000). As @cloud-fan wrote above, this can lead to different results when adding those intervals to 2 November 2019: 2019-11-02 + interval 1 day 25 hours = 2019-11-04 00:00:00, but 2019-11-02 + interval 2 days 1 hour = 2019-11-04 01:00:00.

Member: Yeah, it's complicated. Those are actually semantically different intervals, so I don't think it's a problem if they produce different results or are represented differently.

microseconds = Math.addExact(microseconds, daysInUs)
val months = buf.getInt
updater.set(new CalendarInterval(months, microseconds))
}
}

case t =>
throw new RuntimeException(
s"Unable to create Parquet converter for data type ${t.json} " +
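
To make the thread above concrete: with the current two-field CalendarInterval (months, microseconds), interval 1 day 25 hours and interval 2 days 1 hour both flatten to 49 hours, while Parquet's three-field layout keeps days separate, which matters across a DST transition. A minimal java.time sketch of the reviewer's 2019-11-02 example; the choice of America/Los_Angeles is an assumption, any zone that falls back on 2019-11-03 behaves the same:

import java.time.{Duration, LocalDate, ZoneId}

val zone  = ZoneId.of("America/Los_Angeles")          // assumed zone; DST ends on 2019-11-03
val start = LocalDate.parse("2019-11-02").atStartOfDay(zone)

// Days kept separate (Parquet-style): 1 calendar day, then 25 physical hours.
val a = start.plusDays(1).plus(Duration.ofHours(25))  // 2019-11-04T00:00
// 2 calendar days, then 1 physical hour.
val b = start.plusDays(2).plus(Duration.ofHours(1))   // 2019-11-04T01:00
// Flattened to microseconds (current CalendarInterval): both intervals are 49 hours.
val c = start.plus(Duration.ofHours(49))              // 2019-11-04T00:00, same as a
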
@@ -171,7 +171,7 @@ class ParquetToSparkSchemaConverter(
case FIXED_LEN_BYTE_ARRAY =>
originalType match {
case DECIMAL => makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength))
case INTERVAL => typeNotImplemented()
case INTERVAL => CalendarIntervalType
case _ => illegalType()
}

@@ -553,6 +553,11 @@ class SparkToParquetSchemaConverter(
case udt: UserDefinedType[_] =>
convertField(field.copy(dataType = udt.sqlType))

case i: CalendarIntervalType =>
Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(12)
.as(INTERVAL)
.named(field.name)

case _ =>
throw new AnalysisException(s"Unsupported data type ${field.dataType.catalogString}")
}
@@ -73,6 +73,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
// Reusable byte array used to write timestamps as Parquet INT96 values
private val timestampBuffer = new Array[Byte](12)

// Reusable byte array used to write intervals as Parquet FIXED_LEN_BYTE_ARRAY values
private val intervalBuffer = new Array[Byte](12)

// Reusable byte array used to write decimal values
private val decimalBuffer =
new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION))
@@ -207,7 +210,19 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {

case t: UserDefinedType[_] => makeWriter(t.sqlType)

// TODO Adds IntervalType support
case CalendarIntervalType =>
(row: SpecializedGetters, ordinal: Int) =>
val interval = row.getInterval(ordinal)
val microseconds = interval.microseconds % DateTimeUtils.MICROS_PER_DAY
val milliseconds: Int = (microseconds / DateTimeUtils.MICROS_PER_MILLIS).toInt
val days: Int = Math.toIntExact(interval.microseconds / DateTimeUtils.MICROS_PER_DAY)
val buf = ByteBuffer.wrap(intervalBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN)
.putInt(milliseconds)
.putInt(days)
.putInt(interval.months)
recordConsumer.addBinary(Binary.fromReusedByteArray(intervalBuffer))

case _ => sys.error(s"Unsupported data type $dataType.")
}
}
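
For reference, the 12-byte value produced by the writer above and consumed by the ParquetRowConverter change is three little-endian 32-bit integers, in the field order this diff uses (milliseconds, days, months). A minimal round-trip sketch of that layout; the helper names are illustrative, not part of the PR:

import java.nio.{ByteBuffer, ByteOrder}

// Encode the three fields in the order used by the write path above.
def encodeInterval(milliseconds: Int, days: Int, months: Int): Array[Byte] = {
  val bytes = new Array[Byte](12)
  ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    .putInt(milliseconds)
    .putInt(days)
    .putInt(months)
  bytes
}

// Decode back, mirroring the read path in ParquetRowConverter.
def decodeInterval(bytes: Array[Byte]): (Int, Int, Int) = {
  require(bytes.length == 12, s"expected 12 bytes, got ${bytes.length}")
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  (buf.getInt, buf.getInt, buf.getInt)                // (milliseconds, days, months)
}

// "interval 1 day 25 hours" with days kept separate: 90000000 ms, 1 day, 0 months.
val (ms, days, months) = decodeInterval(encodeInterval(90000000, 1, 0))
assert(ms == 90000000 && days == 1 && months == 0)
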
@@ -58,6 +58,8 @@ case class ParquetTable(

case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)

case _: CalendarIntervalType => true

case _ => false
}

@@ -330,13 +330,13 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
}
}

test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
test("SPARK-24204 error handling for unsupported Interval data types - csv, json, orc") {
withTempDir { dir =>
val tempDir = new File(dir, "files").getCanonicalPath
// TODO: test file source V2 after write path is fixed.
Seq(true).foreach { useV1 =>
val useV1List = if (useV1) {
"csv,json,orc,parquet"
"csv,json,orc"
} else {
""
}
@@ -349,15 +349,15 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {

withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
// write path
Seq("csv", "json", "parquet", "orc").foreach { format =>
Seq("csv", "json", "orc").foreach { format =>
val msg = intercept[AnalysisException] {
sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
validateErrorMessage(msg)
}

// read path
Seq("parquet", "csv").foreach { format =>
Seq("csv").foreach { format =>
var msg = intercept[AnalysisException] {
val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
@@ -49,7 +49,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
// with an empty configuration (it is after all not intended to be used in this way?)
@@ -114,12 +114,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
| required fixed_len_byte_array(32) i(DECIMAL(32,0));
| required int64 j(TIMESTAMP_MILLIS);
| required int64 k(TIMESTAMP_MICROS);
| required fixed_len_byte_array(12) l(INTERVAL);
|}
""".stripMargin)

val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
TimestampType, TimestampType)
TimestampType, TimestampType, CalendarIntervalType)

withTempPath { location =>
val path = new Path(location.getCanonicalPath)
@@ -735,7 +736,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

val dataTypes =
Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType, CalendarIntervalType)

val constantValues =
Seq(
@@ -749,7 +750,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
0.75D,
Decimal("1234.23456"),
DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")),
CalendarInterval.fromString("interval 1 month 2 microsecond"))

dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("pcol", dt) :: Nil)
@@ -903,6 +903,20 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSparkSession
testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS")
testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
}

test("interval written and read as Parquet INTERVAL") {
withTempPath { file =>
val df = spark.range(10)
.selectExpr("interval 100 years 1 month 10 second 1 millisecond as i")
df.write.parquet(file.getCanonicalPath)
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
val df2 = spark.read.parquet(file.getCanonicalPath)
checkAnswer(df2, df.collect().toSeq)
}
}
}
}
}

class ParquetV1QuerySuite extends ParquetQuerySuite {
@@ -1010,6 +1010,17 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
writeLegacyParquetFormat = true,
outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)

testSchema(
"Interval written and read as fixed_len_byte_array(12) with INTERVAL",
StructType(Seq(StructField("f1", CalendarIntervalType))),
"""message root {
| optional fixed_len_byte_array(12) f1 (INTERVAL);
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = false,
writeLegacyParquetFormat = true)

private def testSchemaClipping(
testName: String,
parquetSchema: String,