@@ -73,10 +73,10 @@ public class VectorizedDeltaBinaryPackedReader extends VectorizedReaderBase {
private ByteBufferInputStream in;

// temporary buffers used by readByte, readShort, readInteger, and readLong
-byte byteVal;
-short shortVal;
-int intVal;
-long longVal;
+private byte byteVal;
+private short shortVal;
+private int intVal;
+private long longVal;

@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
@@ -143,12 +143,12 @@ abstract class ParquetRebaseDatetimeSuite
val df = Seq.tabulate(N)(rowFunc).toDF("dict", "plain")
.select($"dict".cast(catalystType), $"plain".cast(catalystType))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) {
-checkDefaultLegacyRead(oldPath)
+checkDefaultLegacyRead(oldPath)
withSQLConf(inWriteConf -> CORRECTED.toString) {
df.write.mode("overwrite").parquet(path3_x)
df.write.mode("overwrite").parquet(path3_x)
}
withSQLConf(inWriteConf -> LEGACY.toString) {
-df.write.parquet(path3_x_rebase)
+df.write.parquet(path3_x_rebase)
}
}
// For Parquet files written by Spark 3.0, we know the writer info and don't need the
@@ -243,40 +243,41 @@ abstract class ParquetRebaseDatetimeSuite
SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key
)
).foreach { case (outType, tsStr, nonRebased, inWriteConf, inReadConf) =>
-// Ignore the default JVM time zone and use the session time zone instead of it in rebasing.
-DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.JST) {
-withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId) {
-withClue(s"output type $outType") {
-withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
-withTempPath { dir =>
-val path = dir.getAbsolutePath
-withSQLConf(inWriteConf -> LEGACY.toString) {
-Seq.tabulate(N)(_ => tsStr).toDF("tsS")
-.select($"tsS".cast("timestamp").as("ts"))
-.repartition(1)
-.write
-.option("parquet.enable.dictionary", dictionaryEncoding)
-.parquet(path)
-}
+// Ignore the default JVM time zone and use the session time zone instead of
+// it in rebasing.
+DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.JST) {
+withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId) {
+withClue(s"output type $outType") {
+withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
+withTempPath { dir =>
+val path = dir.getAbsolutePath
+withSQLConf(inWriteConf -> LEGACY.toString) {
+Seq.tabulate(N)(_ => tsStr).toDF("tsS")
+.select($"tsS".cast("timestamp").as("ts"))
+.repartition(1)
+.write
+.option("parquet.enable.dictionary", dictionaryEncoding)
+.parquet(path)
+}

-withAllParquetReaders {
-// The file metadata indicates if it needs rebase or not, so we can always get
-// the correct result regardless of the "rebase mode" config.
-runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
-checkAnswer(
-spark.read.options(options).parquet(path).select($"ts".cast("string")),
-Seq.tabulate(N)(_ => Row(tsStr)))
-}
+withAllParquetReaders {
+// The file metadata indicates if it needs rebase or not, so we can always get
+// the correct result regardless of the "rebase mode" config.
+runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+checkAnswer(
+spark.read.options(options).parquet(path).select($"ts".cast("string")),
+Seq.tabulate(N)(_ => Row(tsStr)))
+}

-// Force to not rebase to prove the written datetime values are rebased
-// and we will get wrong result if we don't rebase while reading.
-withSQLConf("spark.test.forceNoRebase" -> "true") {
-checkAnswer(
-spark.read.parquet(path).select($"ts".cast("string")),
-Seq.tabulate(N)(_ => Row(nonRebased)))
+// Force to not rebase to prove the written datetime values are rebased
+// and we will get wrong result if we don't rebase while reading.
+withSQLConf("spark.test.forceNoRebase" -> "true") {
+checkAnswer(
+spark.read.parquet(path).select($"ts".cast("string")),
+Seq.tabulate(N)(_ => Row(nonRebased)))
}
}
}
}
}
}
}
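
Context for reviewers: the write/read interplay this test locks down can be reproduced in a plain Spark 3.1+ session. The sketch below is not part of the PR; the output path and sample timestamp are made up, and it uses the public datetimeRebaseModeInWrite/InRead config keys directly rather than the SQLConf constants and test helpers (withSQLConf, runInMode) the suite parameterizes over. In LEGACY write mode Spark rebases ancient timestamps from the Proleptic Gregorian to the hybrid Julian+Gregorian calendar and records that fact in the Parquet footer, so reads return the original value under any read-side mode.

import org.apache.spark.sql.SparkSession

object RebaseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rebase-sketch").getOrCreate()
    import spark.implicits._

    val path = "/tmp/ts-legacy-rebase" // hypothetical output location

    // Write microsecond timestamps (not INT96) so the datetime rebase configs apply,
    // and rebase on write the way Spark 2.4 would have stored the value.
    spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
    Seq("1001-01-01 01:02:03.123456").toDF("tsS")
      .select($"tsS".cast("timestamp").as("ts"))
      .write.mode("overwrite").parquet(path)

    // The footer metadata written by Spark 3.x says whether the values were rebased,
    // so every read-side mode prints the same original timestamp.
    Seq("LEGACY", "CORRECTED", "EXCEPTION").foreach { mode =>
      spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", mode)
      spark.read.parquet(path).select($"ts".cast("string")).show(false)
    }

    spark.stop()
  }
}

The test's spark.test.forceNoRebase escape hatch goes one step further: it skips the read-side rebase entirely so the raw stored value (nonRebased) becomes visible, proving that the writer really did rebase the datetime values.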