diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f61c673baaa5..6f7438192dfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -138,7 +138,7 @@ object PartitioningUtils {
           "root directory of the table. If there are multiple root directories, " +
           "please load them separately and then union them.")
 
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -322,7 +322,8 @@ object PartitioningUtils {
    * }}}
    */
   def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      timeZone: TimeZone): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
@@ -337,7 +338,7 @@ object PartitioningUtils {
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
-        resolveTypeConflicts(values.map(_.literals(i)))
+        resolveTypeConflicts(values.map(_.literals(i)), timeZone)
       }
 
       // Fills resolved literals back to each partition
@@ -474,7 +475,7 @@ object PartitioningUtils {
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.
    */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
     val desiredType = {
       val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
       // Falls back to string if all values of this column are null or empty string
@@ -482,7 +483,7 @@ object PartitioningUtils {
     }
 
     literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
+      Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index b4f3de996120..7225693e5027 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1022,4 +1022,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
+    val df = Seq(
+      (1, "2015-01-01 00:00:00"),
+      (2, "2014-01-01 00:00:00"),
+      (3, "blah")).toDF("i", "str")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
 }
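
For reference, the regression test above can also be reproduced outside the test harness. Below is a minimal standalone sketch, assuming a local SparkSession; the object name and scratch path are illustrative, not part of the patch:

import org.apache.spark.sql.SparkSession

object Spark22109Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-22109 repro")
      .getOrCreate()
    import spark.implicits._

    // "str" mixes timestamp-formatted and plain strings, so partition
    // discovery infers conflicting types across partition directories.
    val df = Seq(
      (1, "2015-01-01 00:00:00"),
      (2, "2014-01-01 00:00:00"),
      (3, "blah")).toDF("i", "str")

    val path = "/tmp/spark-22109"  // illustrative scratch location
    df.write.mode("overwrite").format("parquet").partitionBy("str").save(path)

    // Without the patch, resolving the timestamp-vs-string conflict evaluates
    // a Cast that carries no time zone, which fails at read time; with the
    // patch, the conflicting partition values are up-cast to string and the
    // data is read back as written.
    spark.read.load(path).show()

    spark.stop()
  }
}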