diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index f0dab8095fc75..ff6b106d93d1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -169,8 +169,7 @@ object DateTimeUtils { */ def fromJulianDay(days: Int, nanos: Long): Long = { // use Long to avoid rounding errors - val micros = (days - JULIAN_DAY_OF_EPOCH).toLong * MICROS_PER_DAY + nanos / NANOS_PER_MICROS - rebaseJulianToGregorianMicros(micros) + (days - JULIAN_DAY_OF_EPOCH).toLong * MICROS_PER_DAY + nanos / NANOS_PER_MICROS } /** @@ -179,7 +178,7 @@ object DateTimeUtils { * Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive). */ def toJulianDay(micros: Long): (Int, Long) = { - val julianUs = rebaseGregorianToJulianMicros(micros) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY + val julianUs = micros + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY val days = julianUs / MICROS_PER_DAY val us = julianUs % MICROS_PER_DAY (days.toInt, MICROSECONDS.toNanos(us)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 319387fe854cf..73681615e0db1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2640,6 +2640,20 @@ object SQLConf { .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE = + buildConf("spark.sql.legacy.parquet.int96RebaseModeInWrite") + .internal() + .doc("When LEGACY, which is the default, Spark will rebase INT96 timestamps from " + + "Proleptic Gregorian calendar to the legacy hybrid (Julian + Gregorian) calendar when " + + "writing Parquet files. When CORRECTED, Spark will not do rebase and write the timestamps" + + " as it is. When EXCEPTION, Spark will fail the writing if it sees ancient timestamps " + + "that are ambiguous between the two calendars.") + .version("3.1.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.LEGACY.toString) + val LEGACY_PARQUET_REBASE_MODE_IN_READ = buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead") .internal() @@ -2655,6 +2669,21 @@ object SQLConf { .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ = + buildConf("spark.sql.legacy.parquet.int96RebaseModeInRead") + .internal() + .doc("When LEGACY, which is the default, Spark will rebase INT96 timestamps from " + + "the legacy hybrid (Julian + Gregorian) calendar to Proleptic Gregorian calendar when " + + "reading Parquet files. When CORRECTED, Spark will not do rebase and read the timestamps " + + "as it is. When EXCEPTION, Spark will fail the reading if it sees ancient timestamps " + + "that are ambiguous between the two calendars. 
This config is only effective if the " + + "writer info (like Spark, Hive) of the Parquet files is unknown.") + .version("3.1.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.LEGACY.toString) + val LEGACY_AVRO_REBASE_MODE_IN_WRITE = buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite") .internal() diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index fe761f672c041..7bbdf44d78c3c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseJulianToGregorianMicros import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { @@ -70,17 +71,17 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { } test("us and julian day") { - val (d, ns) = toJulianDay(0) + val (d, ns) = toJulianDay(RebaseDateTime.rebaseGregorianToJulianMicros(0)) assert(d === JULIAN_DAY_OF_EPOCH) assert(ns === 0) - assert(fromJulianDay(d, ns) == 0L) + assert(rebaseJulianToGregorianMicros(fromJulianDay(d, ns)) == 0L) Seq(Timestamp.valueOf("2015-06-11 10:10:10.100"), Timestamp.valueOf("2015-06-11 20:10:10.100"), Timestamp.valueOf("1900-06-11 20:10:10.100")).foreach { t => - val (d, ns) = toJulianDay(fromJavaTimestamp(t)) + val (d, ns) = toJulianDay(RebaseDateTime.rebaseGregorianToJulianMicros(fromJavaTimestamp(t))) assert(ns > 0) - val t1 = toJavaTimestamp(fromJulianDay(d, ns)) + val t1 = toJavaTimestamp(rebaseJulianToGregorianMicros(fromJulianDay(d, ns))) assert(t.equals(t1)) } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 3e409ab9a50a1..1b8b18d4d8735 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -104,13 +104,15 @@ public class VectorizedColumnReader { private final ZoneId convertTz; private static final ZoneId UTC = ZoneOffset.UTC; private final String datetimeRebaseMode; + private final String int96RebaseMode; public VectorizedColumnReader( ColumnDescriptor descriptor, OriginalType originalType, PageReader pageReader, ZoneId convertTz, - String datetimeRebaseMode) throws IOException { + String datetimeRebaseMode, + String int96RebaseMode) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; this.convertTz = convertTz; @@ -136,6 +138,9 @@ public VectorizedColumnReader( assert "LEGACY".equals(datetimeRebaseMode) || "EXCEPTION".equals(datetimeRebaseMode) || "CORRECTED".equals(datetimeRebaseMode); this.datetimeRebaseMode = datetimeRebaseMode; + assert "LEGACY".equals(int96RebaseMode) || "EXCEPTION".equals(int96RebaseMode) || + 
"CORRECTED".equals(int96RebaseMode); + this.int96RebaseMode = int96RebaseMode; } /** @@ -189,10 +194,13 @@ static int rebaseDays(int julianDays, final boolean failIfRebase) { } } - static long rebaseMicros(long julianMicros, final boolean failIfRebase) { + private static long rebaseTimestamp( + long julianMicros, + final boolean failIfRebase, + final String format) { if (failIfRebase) { if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) { - throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + throw DataSourceUtils.newRebaseExceptionInRead(format); } else { return julianMicros; } @@ -201,6 +209,14 @@ static long rebaseMicros(long julianMicros, final boolean failIfRebase) { } } + static long rebaseMicros(long julianMicros, final boolean failIfRebase) { + return rebaseTimestamp(julianMicros, failIfRebase, "Parquet"); + } + + static long rebaseInt96(long julianMicros, final boolean failIfRebase) { + return rebaseTimestamp(julianMicros, failIfRebase, "Parquet INT96"); + } + /** * Reads `total` values from this columnReader into column. */ @@ -399,20 +415,44 @@ private void decodeDictionaryIds( break; case INT96: if (column.dataType() == DataTypes.TimestampType) { + final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode); if (!shouldConvertTimestamps()) { - for (int i = rowId; i < rowId + num; ++i) { - if (!column.isNullAt(i)) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + if ("CORRECTED".equals(int96RebaseMode)) { + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + } + } + } else { + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v); + long gregorianMicros = rebaseInt96(julianMicros, failIfRebase); + column.putLong(i, gregorianMicros); + } } } } else { - for (int i = rowId; i < rowId + num; ++i) { - if (!column.isNullAt(i)) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v); - long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC); - column.putLong(i, adjTime); + if ("CORRECTED".equals(int96RebaseMode)) { + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + long gregorianMicros = ParquetRowConverter.binaryToSQLTimestamp(v); + long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC); + column.putLong(i, adjTime); + } + } + } else { + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v); + long gregorianMicros = rebaseInt96(julianMicros, failIfRebase); + long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC); + column.putLong(i, adjTime); + } } } } @@ -577,25 +617,53 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) th || DecimalType.isByteArrayDecimalType(column.dataType())) { defColumn.readBinarys(num, column, rowId, maxDefLevel, data); } else if (column.dataType() == DataTypes.TimestampType) { + final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode); if 
(!shouldConvertTimestamps()) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - // Read 12 bytes for INT96 - long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); - column.putLong(rowId + i, rawTime); - } else { - column.putNull(rowId + i); + if ("CORRECTED".equals(int96RebaseMode)) { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + // Read 12 bytes for INT96 + long gregorianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); + column.putLong(rowId + i, gregorianMicros); + } else { + column.putNull(rowId + i); + } + } + } else { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + // Read 12 bytes for INT96 + long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); + long gregorianMicros = rebaseInt96(julianMicros, failIfRebase); + column.putLong(rowId + i, gregorianMicros); + } else { + column.putNull(rowId + i); + } } } } else { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - // Read 12 bytes for INT96 - long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); - long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC); - column.putLong(rowId + i, adjTime); - } else { - column.putNull(rowId + i); + if ("CORRECTED".equals(int96RebaseMode)) { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + // Read 12 bytes for INT96 + long gregorianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); + long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC); + column.putLong(rowId + i, adjTime); + } else { + column.putNull(rowId + i); + } + } + } else { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + // Read 12 bytes for INT96 + long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); + long gregorianMicros = rebaseInt96(julianMicros, failIfRebase); + long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC); + column.putLong(rowId + i, adjTime); + } else { + column.putNull(rowId + i); + } } } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index b40cc154d76fe..9d38a74a2956a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -93,6 +93,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private final String datetimeRebaseMode; + /** + * The mode of rebasing INT96 timestamp from Julian to Proleptic Gregorian calendar. + */ + private final String int96RebaseMode; + /** * columnBatch object that is used for batch decoding. This is created on first use and triggers * batched decoding. 
It is not valid to interleave calls to the batched interface with the row @@ -122,16 +127,21 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private final MemoryMode MEMORY_MODE; public VectorizedParquetRecordReader( - ZoneId convertTz, String datetimeRebaseMode, boolean useOffHeap, int capacity) { + ZoneId convertTz, + String datetimeRebaseMode, + String int96RebaseMode, + boolean useOffHeap, + int capacity) { this.convertTz = convertTz; this.datetimeRebaseMode = datetimeRebaseMode; + this.int96RebaseMode = int96RebaseMode; MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; this.capacity = capacity; } // For test only. public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) { - this(null, "CORRECTED", useOffHeap, capacity); + this(null, "CORRECTED", "LEGACY", useOffHeap, capacity); } /** @@ -320,8 +330,13 @@ private void checkEndOfRowGroup() throws IOException { columnReaders = new VectorizedColumnReader[columns.size()]; for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; - columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(), - pages.getPageReader(columns.get(i)), convertTz, datetimeRebaseMode); + columnReaders[i] = new VectorizedColumnReader( + columns.get(i), + types.get(i).getOriginalType(), + pages.getPageReader(columns.get(i)), + convertTz, + datetimeRebaseMode, + int96RebaseMode); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index b4308a872bb39..f8068a634977b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -26,7 +26,7 @@ import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.SparkUpgradeException -import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} +import org.apache.spark.sql.{SPARK_INT96_NO_REBASE, SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.util.RebaseDateTime @@ -111,13 +111,26 @@ object DataSourceUtils { }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) } - def newRebaseExceptionInRead(format: String): SparkUpgradeException = { - val config = if (format == "Parquet") { - SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key - } else if (format == "Avro") { - SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key + def int96RebaseMode( + lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { + if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { + LegacyBehaviorPolicy.CORRECTED + } else if (lookupFileMeta(SPARK_INT96_NO_REBASE) != null) { + LegacyBehaviorPolicy.CORRECTED + } else if (lookupFileMeta(SPARK_VERSION_METADATA_KEY) != null) { + LegacyBehaviorPolicy.LEGACY } else { - throw new IllegalStateException("unrecognized format " + format) + LegacyBehaviorPolicy.withName(modeByConfig) + } + } + + def newRebaseExceptionInRead(format: String): SparkUpgradeException = { + val config = format match { + case "Parquet INT96" => SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key + case "Parquet" => 
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key + case "Avro" => SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key + case _ => throw new IllegalStateException("unrecognized format " + format) } new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " + s"1900-01-01T00:00:00Z from $format files can be ambiguous, as the files may be written by " + @@ -129,12 +142,11 @@ object DataSourceUtils { } def newRebaseExceptionInWrite(format: String): SparkUpgradeException = { - val config = if (format == "Parquet") { - SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key - } else if (format == "Avro") { - SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key - } else { - throw new IllegalStateException("unrecognized format " + format) + val config = format match { + case "Parquet INT96" => SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key + case "Parquet" => SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key + case "Avro" => SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key + case _ => throw new IllegalStateException("unrecognized format " + format) } new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or timestamps before " + s"1900-01-01T00:00:00Z into $format files can be dangerous, as the files may be read by " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 68f49f9442579..95f19f9dcee64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -303,6 +303,9 @@ class ParquetFileFormat val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( footerFileMetaData.getKeyValueMetaData.get, SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) + val int96RebaseMode = DataSourceUtils.int96RebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = @@ -318,6 +321,7 @@ class ParquetFileFormat val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, datetimeRebaseMode.toString, + int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) val iter = new RecordReaderIterator(vectorizedReader) @@ -336,7 +340,10 @@ class ParquetFileFormat logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns InternalRow val readSupport = new ParquetReadSupport( - convertTz, enableVectorizedReader = false, datetimeRebaseMode) + convertTz, + enableVectorizedReader = false, + datetimeRebaseMode, + int96RebaseMode) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index a30d1c26b3b2d..e74872da0829d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -54,7 +54,8 @@ import org.apache.spark.sql.types._ class ParquetReadSupport( 
val convertTz: Option[ZoneId], enableVectorizedReader: Boolean, - datetimeRebaseMode: LegacyBehaviorPolicy.Value) + datetimeRebaseMode: LegacyBehaviorPolicy.Value, + int96RebaseMode: LegacyBehaviorPolicy.Value) extends ReadSupport[InternalRow] with Logging { private var catalystRequestedSchema: StructType = _ @@ -62,7 +63,11 @@ class ParquetReadSupport( // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only // used in the vectorized reader, where we get the convertTz/rebaseDateTime value directly, // and the values here are ignored. - this(None, enableVectorizedReader = true, datetimeRebaseMode = LegacyBehaviorPolicy.CORRECTED) + this( + None, + enableVectorizedReader = true, + datetimeRebaseMode = LegacyBehaviorPolicy.CORRECTED, + int96RebaseMode = LegacyBehaviorPolicy.LEGACY) } /** @@ -131,7 +136,8 @@ class ParquetReadSupport( ParquetReadSupport.expandUDT(catalystRequestedSchema), new ParquetToSparkSchemaConverter(conf), convertTz, - datetimeRebaseMode) + datetimeRebaseMode, + int96RebaseMode) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index bb528d548b6ef..80763ef019b01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -35,17 +35,26 @@ import org.apache.spark.sql.types.StructType * @param convertTz the optional time zone to convert to int96 data * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian to Proleptic Gregorian * calendar + * @param int96RebaseMode the mode of rebasing INT96 timestamp from Julian to Proleptic Gregorian + * calendar */ private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetToSparkSchemaConverter, convertTz: Option[ZoneId], - datetimeRebaseMode: LegacyBehaviorPolicy.Value) + datetimeRebaseMode: LegacyBehaviorPolicy.Value, + int96RebaseMode: LegacyBehaviorPolicy.Value) extends RecordMaterializer[InternalRow] { private val rootConverter = new ParquetRowConverter( - schemaConverter, parquetSchema, catalystSchema, convertTz, datetimeRebaseMode, NoopUpdater) + schemaConverter, + parquetSchema, + catalystSchema, + convertTz, + datetimeRebaseMode, + int96RebaseMode, + NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index e0008ed16d56d..6ef56af927129 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -123,6 +123,8 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param convertTz the optional time zone to convert to int96 data * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian to Proleptic Gregorian * calendar + * @param int96RebaseMode the mode of rebasing INT96 timestamp from Julian to Proleptic Gregorian + * calendar * @param updater An updater which propagates converted field values to the 
parent container */ private[parquet] class ParquetRowConverter( @@ -131,6 +133,7 @@ private[parquet] class ParquetRowConverter( catalystType: StructType, convertTz: Option[ZoneId], datetimeRebaseMode: LegacyBehaviorPolicy.Value, + int96RebaseMode: LegacyBehaviorPolicy.Value, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -187,6 +190,9 @@ private[parquet] class ParquetRowConverter( private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead( datetimeRebaseMode, "Parquet") + private val int96RebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead( + int96RebaseMode, "Parquet INT96") + // Converters for each field. private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = { // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false @@ -300,9 +306,10 @@ private[parquet] class ParquetRowConverter( new ParquetPrimitiveConverter(updater) { // Converts nanosecond timestamps stored as INT96 override def addBinary(value: Binary): Unit = { - val rawTime = ParquetRowConverter.binaryToSQLTimestamp(value) - val adjTime = convertTz.map(DateTimeUtils.convertTz(rawTime, _, ZoneOffset.UTC)) - .getOrElse(rawTime) + val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value) + val gregorianMicros = int96RebaseFunc(julianMicros) + val adjTime = convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _, ZoneOffset.UTC)) + .getOrElse(gregorianMicros) updater.setLong(adjTime) } } @@ -363,6 +370,7 @@ private[parquet] class ParquetRowConverter( t, convertTz, datetimeRebaseMode, + int96RebaseMode, wrappedUpdater) case t => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 6c333671d59cb..b538c2f2493d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -31,7 +31,7 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.apache.spark.SPARK_VERSION_SHORT import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} +import org.apache.spark.sql.{SPARK_INT96_NO_REBASE, SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -88,6 +88,12 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite( datetimeRebaseMode, "Parquet") + private val int96RebaseMode = LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)) + + private val int96RebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite( + int96RebaseMode, "Parquet INT96") + override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) this.schema = StructType.fromString(schemaString) @@ -115,6 +121,12 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { } else { None } + } ++ { + if (int96RebaseMode == LegacyBehaviorPolicy.LEGACY) { + None + } else { + Some(SPARK_INT96_NO_REBASE -> "") + } } logInfo( @@ -193,7 +205,8 
@@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { outputTimestampType match { case SQLConf.ParquetOutputTimestampType.INT96 => (row: SpecializedGetters, ordinal: Int) => - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) + val micros = int96RebaseFunc(row.getLong(ordinal)) + val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(micros) val buf = ByteBuffer.wrap(timestampBuffer) buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 3b482b0c8ab62..e4d5e9b2d9f6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -119,6 +119,7 @@ case class ParquetPartitionReaderFactory( buildReaderFunc: ( ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate], Option[ZoneId], + LegacyBehaviorPolicy.Value, LegacyBehaviorPolicy.Value) => RecordReader[Void, T]): RecordReader[Void, T] = { val conf = broadcastedConf.value.value @@ -174,8 +175,17 @@ case class ParquetPartitionReaderFactory( val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( footerFileMetaData.getKeyValueMetaData.get, SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) + val int96RebaseMode = DataSourceUtils.int96RebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)) val reader = buildReaderFunc( - split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, datetimeRebaseMode) + split, + file.partitionValues, + hadoopAttemptContext, + pushed, + convertTz, + datetimeRebaseMode, + int96RebaseMode) reader.initialize(split, hadoopAttemptContext) reader } @@ -190,12 +200,16 @@ case class ParquetPartitionReaderFactory( hadoopAttemptContext: TaskAttemptContextImpl, pushed: Option[FilterPredicate], convertTz: Option[ZoneId], - datetimeRebaseMode: LegacyBehaviorPolicy.Value): RecordReader[Void, InternalRow] = { + datetimeRebaseMode: LegacyBehaviorPolicy.Value, + int96RebaseMode: LegacyBehaviorPolicy.Value): RecordReader[Void, InternalRow] = { logDebug(s"Falling back to parquet-mr") val taskContext = Option(TaskContext.get()) // ParquetRecordReader returns InternalRow val readSupport = new ParquetReadSupport( - convertTz, enableVectorizedReader = false, datetimeRebaseMode) + convertTz, + enableVectorizedReader = false, + datetimeRebaseMode, + int96RebaseMode) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -221,11 +235,13 @@ case class ParquetPartitionReaderFactory( hadoopAttemptContext: TaskAttemptContextImpl, pushed: Option[FilterPredicate], convertTz: Option[ZoneId], - datetimeRebaseMode: LegacyBehaviorPolicy.Value): VectorizedParquetRecordReader = { + datetimeRebaseMode: LegacyBehaviorPolicy.Value, + int96RebaseMode: LegacyBehaviorPolicy.Value): VectorizedParquetRecordReader = { val taskContext = Option(TaskContext.get()) val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, 
datetimeRebaseMode.toString, + int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) val iter = new RecordReaderIterator(vectorizedReader) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index c0397010acba3..011be6d69c576 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -60,4 +60,10 @@ package object sql { * values. */ private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime" + + /** + * Parquet file metadata key to indicate that the file with INT96 column type was written + * without rebasing. + */ + private[sql] val SPARK_INT96_NO_REBASE = "org.apache.spark.int96NoRebase" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index ff406f7bc62de..214f36a2df713 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -951,7 +951,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession rowFunc: Int => (String, String), toJavaType: String => T, checkDefaultLegacyRead: String => Unit, - tsOutputType: String = "TIMESTAMP_MICROS"): Unit = { + tsOutputType: String = "TIMESTAMP_MICROS", + inWriteConf: String = SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, + inReadConf: String = SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key): Unit = { withTempPaths(2) { paths => paths.foreach(_.delete()) val path2_4 = getResourceParquetFilePath("test-data/" + fileName) @@ -962,18 +964,20 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) { checkDefaultLegacyRead(path2_4) // By default we should fail to write ancient datetime values. - val e = intercept[SparkException](df.write.parquet(path3_0)) - assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + if (tsOutputType != "INT96") { + val e = intercept[SparkException](df.write.parquet(path3_0)) + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + } + withSQLConf(inWriteConf -> CORRECTED.toString) { df.write.mode("overwrite").parquet(path3_0) } - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { + withSQLConf(inWriteConf -> LEGACY.toString) { df.write.parquet(path3_0_rebase) } } // For Parquet files written by Spark 3.0, we know the writer info and don't need the // config to guide the rebase behavior. - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + withSQLConf(inReadConf -> LEGACY.toString) { checkAnswer( spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), (0 until N).flatMap { i => @@ -1015,15 +1019,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession java.sql.Timestamp.valueOf, checkDefaultRead, tsOutputType = "TIMESTAMP_MILLIS") - // INT96 is a legacy timestamp format and we always rebase the seconds for it. 
+ } + } + Seq( + "2_4_5" -> successInRead _, + "2_4_6" -> successInRead _).foreach { case (version, checkDefaultRead) => + withAllParquetReaders { Seq("plain", "dict").foreach { enc => - checkAnswer(readResourceParquetFile( - s"test-data/before_1582_timestamp_int96_${enc}_v$version.snappy.parquet"), - Seq.tabulate(N) { i => - Row( - java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"), - java.sql.Timestamp.valueOf(s"1001-01-0${i + 1} 01:02:03.123456")) - }) + checkReadMixedFiles( + s"before_1582_timestamp_int96_${enc}_v$version.snappy.parquet", + "timestamp", + (i: Int) => ("1001-01-01 01:02:03.123456", s"1001-01-0${i + 1} 01:02:03.123456"), + java.sql.Timestamp.valueOf, + checkDefaultRead, + tsOutputType = "INT96", + inWriteConf = SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key, + inReadConf = SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key) } } } @@ -1033,15 +1044,31 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val N = 8 Seq(false, true).foreach { dictionaryEncoding => Seq( - ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"), - ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"), - ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456") - ).foreach { case (outType, tsStr, nonRebased) => + ( + "TIMESTAMP_MILLIS", + "1001-01-01 01:02:03.123", + "1001-01-07 01:09:05.123", + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key), + ( + "TIMESTAMP_MICROS", + "1001-01-01 01:02:03.123456", + "1001-01-07 01:09:05.123456", + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key), + ( + "INT96", + "1001-01-01 01:02:03.123456", + "1001-01-07 01:09:05.123456", + SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key, + SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key + ) + ).foreach { case (outType, tsStr, nonRebased, inWriteConf, inReadConf) => withClue(s"output type $outType") { withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) { withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { + withSQLConf(inWriteConf -> LEGACY.toString) { Seq.tabulate(N)(_ => tsStr).toDF("tsS") .select($"tsS".cast("timestamp").as("ts")) .repartition(1) @@ -1054,8 +1081,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession // The file metadata indicates if it needs rebase or not, so we can always get the // correct result regardless of the "rebase mode" config. 
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => - withSQLConf( - SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) { + withSQLConf(inReadConf -> mode.toString) { checkAnswer( spark.read.parquet(path), Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr)))) @@ -1136,6 +1162,30 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } } + + test("SPARK-33160: write the metadata key 'org.apache.spark.int96NoRebase'") { + def saveTs(dir: java.io.File): Unit = { + Seq(Timestamp.valueOf("1000-01-01 01:02:03")).toDF() + .repartition(1) + .write + .parquet(dir.getAbsolutePath) + } + withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { + withTempPath { dir => + saveTs(dir) + assert(getMetaData(dir).get(SPARK_INT96_NO_REBASE).isEmpty) + } + } + withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + withTempPath { dir => + saveTs(dir) + assert(getMetaData(dir)(SPARK_INT96_NO_REBASE) === "") + } + } + withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { + withTempPath { dir => intercept[SparkException] { saveTs(dir) } } + } + } } class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
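
With the change at the top of this diff, DateTimeUtils.fromJulianDay and toJulianDay no longer rebase between calendars internally; callers such as ParquetWriteSupport and the INT96 read paths apply the rebase explicitly. A minimal round-trip sketch in the spirit of the updated DateTimeUtilsSuite test (it uses only what that test itself exercises):

    // The Gregorian<->Julian rebase is now the caller's responsibility, so a faithful
    // round trip wraps the Julian-day conversion with the RebaseDateTime helpers.
    import org.apache.spark.sql.catalyst.util.DateTimeUtils.{fromJulianDay, toJulianDay}
    import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianMicros, rebaseJulianToGregorianMicros}

    val gregorianMicros = 0L  // 1970-01-01T00:00:00Z as microseconds since the epoch
    val (julianDay, nanosOfDay) = toJulianDay(rebaseGregorianToJulianMicros(gregorianMicros))
    val roundTripped = rebaseJulianToGregorianMicros(fromJulianDay(julianDay, nanosOfDay))
    assert(roundTripped == gregorianMicros)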
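
A usage sketch for the new write-side config, assuming an existing SparkSession named spark and arbitrary /tmp paths (neither is part of the patch); the behaviors noted in the comments follow the ParquetWriteSupport and ParquetIOSuite changes above:

    import java.sql.Timestamp
    import spark.implicits._

    // Force the legacy INT96 physical type for timestamps.
    spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")

    val df = Seq(Timestamp.valueOf("1000-01-01 01:02:03")).toDF("ts")

    // LEGACY (the default): rebase to the hybrid Julian calendar on write; the file
    // carries no org.apache.spark.int96NoRebase metadata key.
    spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")
    df.write.mode("overwrite").parquet("/tmp/int96/legacy")

    // CORRECTED: write the microseconds as-is and tag the file with
    // org.apache.spark.int96NoRebase, so a 3.1+ reader skips the rebase for this file
    // regardless of its read-side config.
    spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
    df.write.mode("overwrite").parquet("/tmp/int96/corrected")

    // EXCEPTION: writing ancient timestamps that are ambiguous between the two calendars
    // fails with a SparkUpgradeException (wrapped in a SparkException), as the new
    // SPARK-33160 test demonstrates.
    spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "EXCEPTION")
    // df.write.mode("overwrite").parquet("/tmp/int96/exception")  // would throw for this value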
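
On the read side, DataSourceUtils.int96RebaseMode picks the effective mode per file from the Parquet footer metadata before falling back to spark.sql.legacy.parquet.int96RebaseModeInRead. The sketch below restates that decision outside Spark: fileMetadata and sessionConfMode are hypothetical stand-ins, the "org.apache.spark.version" literal for SPARK_VERSION_METADATA_KEY is an assumption, and the test-only spark.test.forceNoRebase branch is omitted.

    object Int96ReadMode extends Enumeration {
      val LEGACY, CORRECTED, EXCEPTION = Value
    }

    def resolveInt96ReadMode(
        fileMetadata: Map[String, String],
        sessionConfMode: String): Int96ReadMode.Value = {
      if (fileMetadata.contains("org.apache.spark.int96NoRebase")) {
        // SPARK_INT96_NO_REBASE present: written by Spark 3.1+ in CORRECTED/EXCEPTION mode,
        // so the values are already proleptic Gregorian and must never be rebased.
        Int96ReadMode.CORRECTED
      } else if (fileMetadata.contains("org.apache.spark.version")) {
        // Spark writer info present but no int96NoRebase tag: an older Spark (or 3.1 in
        // LEGACY mode) wrote hybrid-calendar microseconds, so always rebase.
        Int96ReadMode.LEGACY
      } else {
        // Unknown writer (e.g. Hive, Impala): defer to the session configuration.
        Int96ReadMode.withName(sessionConfMode)
      }
    }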