From 42fa83caab5914a089d282e0c5009f7d0db42c46 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sun, 24 Sep 2017 00:05:17 +0900
Subject: [PATCH] [SPARK-22109][SQL] Resolves type conflicts between strings
 and timestamps in partition column

This PR proposes to resolve type conflicts between strings and timestamps
in partition column values. The timezone has to be set explicitly, because
resolving the conflict requires a cast between strings and timestamps and
that cast is timezone-aware.

```scala
val df = Seq(
  (1, "2015-01-01 00:00:00"),
  (2, "2014-01-01 00:00:00"),
  (3, "blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)
spark.read.parquet(path).show()
```

**Before**

```
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:347)
  at scala.None$.get(Option.scala:345)
  at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207)
  at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533)
  at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
```

**After**

```
+---+-------------------+
|  i|                str|
+---+-------------------+
|  2|2014-01-01 00:00:00|
|  1|2015-01-01 00:00:00|
|  3|               blah|
+---+-------------------+
```

Unit tests added in `ParquetPartitionDiscoverySuite`, plus manual tests.

Author: hyukjinkwon

Closes #19331 from HyukjinKwon/SPARK-22109.
---
 .../execution/datasources/PartitioningUtils.scala  | 11 ++++++-----
 .../parquet/ParquetPartitionDiscoverySuite.scala   | 12 ++++++++++++
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f61c673baaa5..6f7438192dfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -138,7 +138,7 @@ object PartitioningUtils {
           "root directory of the table. If there are multiple root directories, " +
             "please load them separately and then union them.")
 
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -322,7 +322,8 @@ object PartitioningUtils {
    * }}}
    */
   def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      timeZone: TimeZone): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
@@ -337,7 +338,7 @@ object PartitioningUtils {
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
-        resolveTypeConflicts(values.map(_.literals(i)))
+        resolveTypeConflicts(values.map(_.literals(i)), timeZone)
       }
 
       // Fills resolved literals back to each partition
@@ -474,7 +475,7 @@ object PartitioningUtils {
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.
    */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
     val desiredType = {
       val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
       // Falls back to string if all values of this column are null or empty string
@@ -482,7 +483,7 @@ object PartitioningUtils {
     }
 
     literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
+      Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index b4f3de996120..7225693e5027 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1022,4 +1022,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
+    val df = Seq(
+      (1, "2015-01-01 00:00:00"),
+      (2, "2014-01-01 00:00:00"),
+      (3, "blah")).toDF("i", "str")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
 }