Commits (52); the diff below shows changes from 1 commit.
53d0744  very basic test for adjusting read parquet data (squito, Jan 27, 2017)
69a3c8c  wip (squito, Jan 27, 2017)
51e24f2  working version for non-vectorized read -- lots of garbage too (squito, Jan 31, 2017)
7e61841  working for vectorized reads -- not sure about all code paths (squito, Jan 31, 2017)
9fbde13  more tests for write path (squito, Feb 1, 2017)
bac9eb0  expand tests; fix some metastore interaction; cleanup a lot of garbage (squito, Feb 1, 2017)
1b05978  more cleanup (squito, Feb 1, 2017)
b622d27  handle bad timezones; include unit test (squito, Feb 1, 2017)
0604403  write support; lots more unit tests (squito, Feb 2, 2017)
f45516d  add tests for alter table (squito, Feb 2, 2017)
d4511a6  utc or gmt; cleanup (squito, Feb 2, 2017)
223ce2c  more cleanup (squito, Feb 2, 2017)
5b49ae0  fix compatibility (squito, Feb 2, 2017)
9ef60a4  Merge branch 'master' into SPARK-12297 (squito, Mar 1, 2017)
0b6883c  Merge branch 'master' into SPARK-12297 (squito, Mar 2, 2017)
69b8142  wip (squito, Mar 3, 2017)
7ca2c86  fix (squito, Mar 3, 2017)
6f982d3  fixes; passes tests now (squito, Mar 6, 2017)
1ad2f83  Merge branch 'master' into SPARK-12297 (squito, Mar 6, 2017)
2c8a228  fix merge (squito, Mar 6, 2017)
f0b89fd  fix (squito, Mar 6, 2017)
db0216f  Merge branch 'master' into SPARK-12297 (squito, Mar 13, 2017)
46fab8d  refactor the test (squito, Mar 13, 2017)
c242fb8  cleanup (squito, Mar 13, 2017)
c87a573  reset timezone property after tests; make tests more granular (squito, Mar 14, 2017)
db7e514  Merge branch 'master' into SPARK-12297 (squito, Mar 14, 2017)
f4dca27  separate tests for reading & writing (squito, Mar 15, 2017)
2891582  Merge branch 'master' into SPARK-12297 (squito, Mar 15, 2017)
d951443  fix merge (squito, Mar 15, 2017)
38e19cd  Merge branch 'master' into SPARK-12297 (squito, Mar 16, 2017)
1e3b768  Merge branch 'master' into SPARK-12297 (squito, Mar 23, 2017)
39f506c  cleanup (squito, Mar 23, 2017)
f33bc91  Merge branch 'master' into SPARK-12297 (squito, Mar 24, 2017)
17565e8  remove config for setting table time zone automatically (squito, Mar 24, 2017)
a96806f  fixup (squito, Mar 24, 2017)
7582b2c  predicate pushdown tests (squito, Apr 4, 2017)
5817064  minor cleanup (squito, Apr 6, 2017)
773704a  Merge branch 'master' into SPARK-12297 (squito, Apr 10, 2017)
be134be  fix merge (squito, Apr 10, 2017)
d15b660  swap conversion logic (squito, Apr 10, 2017)
283b1c7  update tests a bunch; tests session timezones, and tests are easier t… (squito, Apr 11, 2017)
6ccaa92  session timezones (squito, Apr 11, 2017)
71c7e60  cleanup (squito, Apr 11, 2017)
75e8579  Merge branch 'master' into SPARK-12297 (squito, Apr 11, 2017)
e4e88a5  partial review feedback (squito, Apr 19, 2017)
44a8bbb  better param names and docs (squito, Apr 20, 2017)
e31657a  review feedback (squito, Apr 25, 2017)
d4ff9fd  Merge branch 'master' into SPARK-12297 (squito, May 2, 2017)
acc72ea  add check for partitioned tables (squito, May 2, 2017)
b9c03e9  fix typo (squito, May 2, 2017)
fc17a2e  review feedback (squito, May 2, 2017)
2537437  review feedback (squito, May 3, 2017)
@@ -106,7 +106,7 @@ public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader
String sessionTzString =
conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key());
if (sessionTzString == null || sessionTzString.isEmpty()) {
sessionTz = TimeZone.getDefault();
sessionTz = DateTimeUtils.defaultTimeZone();
} else {
sessionTz = TimeZone.getTimeZone(sessionTzString);
}
@@ -267,13 +267,11 @@ private[parquet] class ParquetRowConverter(
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
val sessionTz = TimeZone.getTimeZone(hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))
val sessionTsString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
val sessionTz = Option(sessionTsString).map(TimeZone.getTimeZone(_))
.getOrElse(DateTimeUtils.defaultTimeZone())
val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
val storageTz = if (storageTzString == null) {
sessionTz
} else {
TimeZone.getTimeZone(storageTzString)
}
val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
new ParquetPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
@@ -176,7 +176,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// We don't support hive bucketed tables, only ones we write out.
bucketSpec = None,
fileFormat = fileFormat,
options = options)(sparkSession = sparkSession)
options = options ++ getStorageTzOptions(relation))(sparkSession = sparkSession)
val created = LogicalRelation(fsRelation, updatedTable)
tableRelationCache.put(tableIdentifier, created)
created
@@ -194,15 +194,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
fileFormatClass,
None)
val logicalRelation = cached.getOrElse {
// We add the timezone to the relation options, which automatically gets injected into
// the hadoopConf for the Parquet Converters
val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
val storageTz = relation.tableMeta.properties.getOrElse(storageTzKey, "")
val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
val extraTzOptions = Map(
storageTzKey -> storageTz,
SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz
)
val (dataSchema, updatedTable) = inferIfNeeded(relation, options, fileFormat)
val created =
LogicalRelation(
@@ -212,7 +203,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
userSpecifiedSchema = Option(dataSchema),
// We don't support hive bucketed tables, only ones we write out.
bucketSpec = None,
options = options ++ extraTzOptions,
options = options ++ getStorageTzOptions(relation),
className = fileType).resolveRelation(),
table = updatedTable)

@@ -233,6 +224,17 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
result.copy(output = newOutput)
}

private def getStorageTzOptions(relation: CatalogRelation): Map[String, String] = {
// We add the table timezone to the relation options, which automatically gets injected into the
// hadoopConf for the Parquet Converters
val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
val storageTz = relation.tableMeta.properties.getOrElse(storageTzKey, "")
val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
Review comment (Member): sessionTz isn't used.

Map(
storageTzKey -> storageTz
)
Review comment (Member): Should this return Map.empty if the value isn't included in the table properties?
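A minimal sketch of the helper with both review points applied (the unused sessionTz lookup dropped, and Map.empty returned when the table property is absent). This is only an illustration of the suggestion, assuming the surrounding HiveMetastoreCatalog context; it is not the code in this commit:

    private def getStorageTzOptions(relation: CatalogRelation): Map[String, String] = {
      // Only propagate the table timezone option when the table actually declares one.
      val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
      relation.tableMeta.properties.get(storageTzKey) match {
        case Some(tz) => Map(storageTzKey -> tz)
        case None => Map.empty
      }
    }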

}

private def inferIfNeeded(
relation: CatalogRelation,
options: Map[String, String],
@@ -259,8 +259,8 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
}

val desiredTimestampStrings = Seq(
"2015-12-31 23:50:59.123",
"2015-12-31 22:49:59.123",
"2015-12-31 23:50:59.123",
"2016-01-01 00:39:59.123",
"2016-01-01 01:29:59.123"
)
@@ -286,23 +286,15 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
}

private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = {
val originalTsStrings = Seq(
"2015-12-31 22:49:59.123",
"2015-12-31 23:50:59.123",
"2016-01-01 00:39:59.123",
"2016-01-01 01:29:59.123"
)
val rowRdd = spark.sparkContext.parallelize(originalTsStrings, 1).map(Row(_))
val rowRdd = spark.sparkContext.parallelize(desiredTimestampStrings, 1).map(Row(_))
val schema = StructType(Seq(
StructField("display", StringType, true)
))
val df = spark.createDataFrame(rowRdd, schema)
Review comment (Member): We can use val df = desiredTimestampStrings.toDF("display") after import spark.implicits._.

Reply (Contributor, author): Thanks, appreciate the help simplifying this. I had a feeling it was more complex than it needed to be :)
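Combined with the .as[(String, Timestamp)] change below, the suggested simplification would look roughly like this. It is a sketch that assumes the suite's existing desiredTimestampStrings value and imports (SparkSession, Dataset, Timestamp, expr), not the committed code:

    private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = {
      import spark.implicits._
      // Build the DataFrame directly from the shared display strings; no RDD or explicit schema needed.
      val df = desiredTimestampStrings.toDF("display")
      // The cast interprets each display string in the current *session* timezone.
      df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)]
    }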

// this will get the millis corresponding to the display time given the current *session*
// timezone.
import spark.implicits._
df.withColumn("ts", expr("cast(display as timestamp)")).map { row =>
(row.getAs[String](0), row.getAs[Timestamp](1))
}
df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)]
}

private def testWriteTablesWithTimezone(
@@ -349,15 +341,20 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
// values in the parquet file.
val onDiskLocation = spark.sessionState.catalog
.getTableMetadata(TableIdentifier(s"insert_$baseTable")).location.getPath
val readFromDisk = spark.read.parquet(onDiskLocation).collect()
val storageTzId = explicitTz.getOrElse(sessionTzId)
readFromDisk.foreach { row =>
val displayTime = row.getAs[String](0)
val millis = row.getAs[Timestamp](1).getTime()
val expectedMillis = timestampTimezoneToMillis((displayTime, storageTzId))
assert(expectedMillis === millis, s"Display time '$displayTime' was stored incorrectly " +
s"with sessionTz = ${sessionTzOpt}; Got $millis, expected $expectedMillis " +
s"(delta = ${millis - expectedMillis})")
// we test reading the data back with and without the vectorized reader, to make sure we
// haven't broken reading parquet from non-hive tables, with both readers.
Seq(false, true).foreach { vectorized =>
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
val readFromDisk = spark.read.parquet(onDiskLocation).collect()
val storageTzId = explicitTz.getOrElse(sessionTzId)
readFromDisk.foreach { row =>
val displayTime = row.getAs[String](0)
val millis = row.getAs[Timestamp](1).getTime()
val expectedMillis = timestampTimezoneToMillis((displayTime, storageTzId))
assert(expectedMillis === millis, s"Display time '$displayTime' was stored " +
s"incorrectly with sessionTz = ${sessionTzOpt}; Got $millis, expected " +
s"$expectedMillis (delta = ${millis - expectedMillis})")
}
}
}
}
@@ -401,65 +398,69 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
options = options
)
Seq(false, true).foreach { vectorized =>
withSQLConf((SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString)) {
withClue(s"vectorized = $vectorized;") {
val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
val collectedFromExternal =
spark.sql(s"select display, ts from external_$baseTable").collect()
collectedFromExternal.foreach { row =>
val displayTime = row.getAs[String](0)
val millis = row.getAs[Timestamp](1).getTime()
val expectedMillis = timestampTimezoneToMillis((displayTime, sessionTz))
val delta = millis - expectedMillis
val deltaHours = delta / (1000L * 60 * 60)
assert(millis === expectedMillis, s"Display time '$displayTime' did not have " +
s"correct millis: was $millis, expected $expectedMillis; delta = $delta " +
s"($deltaHours hours)")
}

// Now test that the behavior is still correct even with a filter which could get
// pushed down into parquet. We don't need extra handling for pushed down
// predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet
// does not read statistics from int96 fields, as they are unsigned. See
// scalastyle:off line.size.limit
// https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
// https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
// scalastyle:on line.size.limit
//
// Just to be defensive in case anything ever changes in parquet, this test checks
// the assumption on column stats, and also the end-to-end behavior.

val hadoopConf = sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
val parts = fs.listStatus(new Path(path.getCanonicalPath))
.filter(_.getPath().getName().endsWith(".parquet"))
// grab the meta data from the parquet file. The next section of asserts just make
// sure the test is configured correctly.
assert(parts.size == 1)
val oneFooter = ParquetFileReader.readFooter(hadoopConf, parts.head.getPath)
assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 2)
assert(oneFooter.getFileMetaData.getSchema.getColumns.get(1).getType() ===
PrimitiveTypeName.INT96)
val oneBlockMeta = oneFooter.getBlocks().get(0)
val oneBlockColumnMeta = oneBlockMeta.getColumns().get(1)
val columnStats = oneBlockColumnMeta.getStatistics
// This is the important assert. Column stats are written, but they are ignored
// when the data is read back as mentioned above, b/c int96 is unsigned. This
// assert makes sure this holds even if we change parquet versions (if eg. there
// were ever statistics even on unsigned columns).
assert(columnStats.isEmpty)

// These queries should return the entire dataset, but if the predicates were
// applied to the raw values in parquet, they would incorrectly filter data out.
Seq(
">" -> "2015-12-31 22:00:00",
"<" -> "2016-01-01 02:00:00"
).foreach { case (comparison, value) =>
val query =
s"select ts from external_$baseTable where ts $comparison '$value'"
val countWithFilter = spark.sql(query).count()
assert(countWithFilter === 4, query)
}
withClue(s"vectorized = $vectorized;") {
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
Comment (Contributor, author): I was initially using SQLTestUtils.withSQLConf, but I discovered that it wasn't actually taking effect. I don't know if that is because TestHiveSingleton does something strange, or maybe I'm doing something else weird in this test by creating many new Spark sessions, but this was the only way I could get the conf changes applied consistently.

Since I am creating new sessions, I don't think this has any risk of a failed test not cleaning up and triggering failures in other tests outside of this suite. But it still seems like I might be doing something wrong ...
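If the manual spark.conf.set ever needs to be self-cleaning (for example, if these tests stop creating fresh sessions), one hedged option is a small wrapper along these lines. The helper name and the try/finally structure are assumptions for illustration only and are not part of this PR:

    // Hypothetical helper: set the vectorized-reader conf for the duration of a block,
    // then restore (or unset) whatever value was there before, even if assertions fail.
    def withVectorizedReader[T](spark: SparkSession, enabled: Boolean)(body: => T): T = {
      val key = SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key
      val previous = spark.conf.getOption(key)
      spark.conf.set(key, enabled.toString)
      try {
        body
      } finally {
        previous match {
          case Some(value) => spark.conf.set(key, value)
          case None => spark.conf.unset(key)
        }
      }
    }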

val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
val query = s"select display, cast(ts as string) as ts_as_string, ts " +
s"from external_$baseTable"
val collectedFromExternal = spark.sql(query).collect()
collectedFromExternal.foreach { row =>
val displayTime = row.getAs[String](0)
// the timestamp should still display the same, despite the changes in timezones
assert(displayTime === row.getAs[String](1).toString())
// we'll also check that the millis behind the timestamp has the appropriate
// adjustments.
val millis = row.getAs[Timestamp](2).getTime()
val expectedMillis = timestampTimezoneToMillis((displayTime, sessionTz))
val delta = millis - expectedMillis
val deltaHours = delta / (1000L * 60 * 60)
assert(millis === expectedMillis, s"Display time '$displayTime' did not have " +
s"correct millis: was $millis, expected $expectedMillis; delta = $delta " +
s"($deltaHours hours)")
}

// Now test that the behavior is still correct even with a filter which could get
// pushed down into parquet. We don't need extra handling for pushed down
// predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet
// does not read statistics from int96 fields, as they are unsigned. See
// scalastyle:off line.size.limit
// https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
// https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
// scalastyle:on line.size.limit
//
// Just to be defensive in case anything ever changes in parquet, this test checks
// the assumption on column stats, and also the end-to-end behavior.

val hadoopConf = sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
val parts = fs.listStatus(new Path(path.getCanonicalPath))
.filter(_.getPath().getName().endsWith(".parquet"))
// grab the meta data from the parquet file. The next section of asserts just make
// sure the test is configured correctly.
assert(parts.size == 1)
val oneFooter = ParquetFileReader.readFooter(hadoopConf, parts.head.getPath)
assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 2)
assert(oneFooter.getFileMetaData.getSchema.getColumns.get(1).getType() ===
PrimitiveTypeName.INT96)
val oneBlockMeta = oneFooter.getBlocks().get(0)
val oneBlockColumnMeta = oneBlockMeta.getColumns().get(1)
val columnStats = oneBlockColumnMeta.getStatistics
// This is the important assert. Column stats are written, but they are ignored
// when the data is read back as mentioned above, b/c int96 is unsigned. This
// assert makes sure this holds even if we change parquet versions (if eg. there
// were ever statistics even on unsigned columns).
assert(columnStats.isEmpty)

// These queries should return the entire dataset, but if the predicates were
// applied to the raw values in parquet, they would incorrectly filter data out.
Seq(
">" -> "2015-12-31 22:00:00",
"<" -> "2016-01-01 02:00:00"
).foreach { case (comparison, value) =>
val query =
s"select ts from external_$baseTable where ts $comparison '$value'"
val countWithFilter = spark.sql(query).count()
assert(countWithFilter === 4, query)
}
}
}