[SPARK-34377][SQL] Add new parquet datasource options to control datetime rebasing in read #31489
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
984d9ea
2b6bf15
83eb2d8
1f8d429
bf2b77e
db16d69
1bb9980
018b171
34f88de
f898288
eb77fc6
f33b5a8
ae8288c
ebc7298
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
ParquetPartitionReaderFactory.scala

@@ -52,14 +52,16 @@ import org.apache.spark.util.SerializableConfiguration
  * @param readDataSchema Required schema of Parquet files.
  * @param partitionSchema Schema of partitions.
  * @param filters Filters to be pushed down in the batch scan.
+ * @param parquetOptions The options of Parquet datasource that are set for the read.
  */
 case class ParquetPartitionReaderFactory(
     sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType,
-    filters: Array[Filter]) extends FilePartitionReaderFactory with Logging {
+    filters: Array[Filter],
+    parquetOptions: ParquetOptions) extends FilePartitionReaderFactory with Logging {
Contributor: Does it really work? The 2 fields of …

Contributor: ah nvm, we read the confs and put it in …
     private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields)
     private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled

@@ -74,6 +76,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+  private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

   override def supportColumnarReads(partition: InputPartition): Boolean = {
     sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&

@@ -174,10 +178,10 @@ case class ParquetPartitionReaderFactory(
     }
     val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
       footerFileMetaData.getKeyValueMetaData.get,
-      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+      datetimeRebaseModeInRead)
     val int96RebaseMode = DataSourceUtils.int96RebaseMode(
       footerFileMetaData.getKeyValueMetaData.get,
-      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+      int96RebaseModeInRead)
     val reader = buildReaderFunc(
       split,
       file.partitionValues,
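For context on the user-facing effect (a hedged usage sketch, not part of the diff): with this change the rebase behavior can be set per read through datasource options instead of only through the session-wide SQL confs. The option keys below are assumed to be the string values behind ParquetOptions.DATETIME_REBASE_MODE and ParquetOptions.INT96_REBASE_MODE referenced in the test changes further down.

```scala
// Hedged usage sketch; option keys and accepted values (EXCEPTION, CORRECTED, LEGACY)
// are assumptions based on the constants used in the tests below.
val df = spark.read
  .option("datetimeRebaseMode", "CORRECTED") // DATE, TIMESTAMP_MILLIS, TIMESTAMP_MICROS
  .option("int96RebaseMode", "LEGACY")       // INT96 timestamps
  .parquet("/path/to/parquet/written/by/spark-2.4")
```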
ParquetRebaseDatetimeSuite.scala
@@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException}
 import org.apache.spark.sql.{QueryTest, Row, SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, ParquetOutputTimestampType}
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, EXCEPTION, LEGACY}
-import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.test.SharedSparkSession

 abstract class ParquetRebaseDatetimeSuite
@@ -99,6 +99,27 @@ abstract class ParquetRebaseDatetimeSuite

   protected def failInRead(path: String): Unit

+  private def inReadConfToOptions(
+      conf: String,
+      mode: LegacyBehaviorPolicy.Value): Map[String, String] = conf match {
+    case SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key =>
+      Map(ParquetOptions.INT96_REBASE_MODE -> mode.toString)
+    case _ => Map(ParquetOptions.DATETIME_REBASE_MODE -> mode.toString)
+  }
+
+  private def runInMode(
+      conf: String,
+      modes: Seq[LegacyBehaviorPolicy.Value])(f: Map[String, String] => Unit): Unit = {
+    modes.foreach { mode =>
+      withSQLConf(conf -> mode.toString) { f(Map.empty) }
+    }
+    withSQLConf(conf -> EXCEPTION.toString) {
+      modes.foreach { mode =>
+        f(inReadConfToOptions(conf, mode))
+      }
+    }
+  }
+
   test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
     val N = 8
     // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
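To spell out what the new helper does (an illustrative expansion, not code from the PR): a call such as runInMode(inReadConf, Seq(LEGACY)) { options => body(options) } runs the body once per mode with the SQL conf set, and then again with the conf pinned to EXCEPTION while the equivalent datasource option carries the mode, which checks that the per-read option is honored over the session conf.

```scala
// Illustrative expansion for the datetime case, assuming inReadConf is
// SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key:
withSQLConf(inReadConf -> LEGACY.toString) {
  body(Map.empty) // conf-driven path, no datasource options
}
withSQLConf(inReadConf -> EXCEPTION.toString) {
  body(Map(ParquetOptions.DATETIME_REBASE_MODE -> LEGACY.toString)) // option-driven path
}
```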
@@ -134,9 +155,9 @@ abstract class ParquetRebaseDatetimeSuite
       }
       // For Parquet files written by Spark 3.0, we know the writer info and don't need the
       // config to guide the rebase behavior.
-      withSQLConf(inReadConf -> LEGACY.toString) {
+      runInMode(inReadConf, Seq(LEGACY)) { options =>
         checkAnswer(
-          spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
+          spark.read.format("parquet").options(options).load(path2_4, path3_0, path3_0_rebase),
           (0 until N).flatMap { i =>
             val (dictS, plainS) = rowFunc(i)
             Seq.tabulate(3) { _ =>
@@ -233,12 +254,10 @@ abstract class ParquetRebaseDatetimeSuite
     withAllParquetReaders {
       // The file metadata indicates if it needs rebase or not, so we can always get the
       // correct result regardless of the "rebase mode" config.
-      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-        withSQLConf(inReadConf -> mode.toString) {
-          checkAnswer(
-            spark.read.parquet(path),
-            Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
-        }
+      runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+        checkAnswer(
+          spark.read.options(options).parquet(path),
+          Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
       }

       // Force to not rebase to prove the written datetime values are rebased
@@ -273,12 +292,12 @@ abstract class ParquetRebaseDatetimeSuite
     withAllParquetReaders {
       // The file metadata indicates if it needs rebase or not, so we can always get the
       // correct result regardless of the "rebase mode" config.
-      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
Member (author): Fixed the wrong SQL conf: LEGACY_AVRO_REBASE_MODE_IN_READ
-          checkAnswer(
-            spark.read.parquet(path),
-            Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
-        }
+      runInMode(
+        SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key,
+        Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+        checkAnswer(
+          spark.read.options(options).parquet(path),
+          Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
       }

       // Force to not rebase to prove the written datetime values are rebased and we will get