From 3ea40a7f8844c7c9b73917a6e9237ad323114158 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 24 Nov 2020 12:22:34 +0300
Subject: [PATCH 1/8] Add a test

---
 .../AlterTablePartitionV2SQLSuite.scala        | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 4cacd5ec2b49e..0f9e1c8db9616 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -243,4 +243,22 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       assert(!partTable.partitionExists(expectedPartition))
     }
   }
+
+  test("handle __HIVE_DEFAULT_PARTITION__") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (part0 string) USING foo PARTITIONED BY (part0)")
+      val partTable = catalog("testpart")
+        .asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      val expectedPartition = InternalRow.fromSeq(Seq[Any](null))
+      assert(!partTable.partitionExists(expectedPartition))
+      val partSpec = "PARTITION (part0 = '__HIVE_DEFAULT_PARTITION__')"
+      sql(s"ALTER TABLE $t ADD $partSpec LOCATION 'loc'")
+      assert(partTable.partitionExists(expectedPartition))
+      spark.sql(s"ALTER TABLE $t DROP $partSpec")
+      assert(!partTable.partitionExists(expectedPartition))
+    }
+  }
 }

From 052c017b29dad9bb638ed13cf5126251e1cdccee Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 24 Nov 2020 12:26:50 +0300
Subject: [PATCH 2/8] Add JIRA to test's title

---
 .../spark/sql/connector/AlterTablePartitionV2SQLSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 0f9e1c8db9616..bb18333ce1e4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -244,7 +244,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }
 
-  test("handle __HIVE_DEFAULT_PARTITION__") {
+  test("SPARK-33529: handle __HIVE_DEFAULT_PARTITION__") {
     val t = "testpart.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (part0 string) USING foo PARTITIONED BY (part0)")
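Note on patches 1-2 (commentary, not part of the series): the test pins down the contract that the legacy Hive sentinel '__HIVE_DEFAULT_PARTITION__' in a partition spec must surface as a null partition value, which is why the expected ident is InternalRow.fromSeq(Seq[Any](null)). The sketch below shows that mapping in isolation; toPartitionValue is a hypothetical helper written only for this note, while ExternalCatalogUtils.DEFAULT_PARTITION_NAME is the real constant holding the sentinel string.

    import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

    // A raw partition-spec value becomes null when it carries the Hive
    // default-partition sentinel; any other string is kept as-is.
    def toPartitionValue(raw: String): Any =
      if (raw == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) null else raw

    assert(toPartitionValue("__HIVE_DEFAULT_PARTITION__") == null)
    assert(toPartitionValue("2020-11-24") == "2020-11-24")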
From 846589f80a5e7e4bd2dce3f0b5eb78ebcf668358 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 24 Nov 2020 10:48:29 +0300
Subject: [PATCH 3/8] Add castPartitionValues()

---
 .../spark/sql/util/PartitioningUtils.scala     | 27 +++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 586aa6c59164f..2f15a6ca4c3b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -18,7 +18,13 @@
 package org.apache.spark.sql.util
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.types.StructType
 
 object PartitioningUtils {
   /**
@@ -44,4 +50,25 @@ object PartitioningUtils {
 
     normalizedPartSpec.toMap
   }
+
+  /**
+   * Given the partition schema, returns a row with that schema holding the partition values.
+   */
+  def castPartitionValues(
+      spec: TablePartitionSpec,
+      partitionSchema: StructType,
+      properties: Map[String, String],
+      defaultTimeZondId: String): InternalRow = {
+    val caseInsensitiveProperties = CaseInsensitiveMap(properties)
+    val timeZoneId = caseInsensitiveProperties.getOrElse(
+      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
+    InternalRow.fromSeq(partitionSchema.map { field =>
+      val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+        null
+      } else {
+        spec(field.name)
+      }
+      Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
+    })
+  }
 }
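Note on patch 3 (commentary, not part of the series): castPartitionValues() does two things per partition column: it maps DEFAULT_PARTITION_NAME to null, and it casts every other raw string to the column's data type, resolving the time zone from the table properties with the given default as a fallback. A minimal self-contained sketch of the casting step; the schema, the values, and the UTC zone are illustrative assumptions for this note only:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.{DateType, IntegerType, StructType}

    val partitionSchema = new StructType().add("id", IntegerType).add("dt", DateType)
    val spec = Map("id" -> "42", "dt" -> "2020-11-24")

    // Each raw string is cast to its column's type; the time zone only
    // affects date/time columns such as `dt`.
    val ident = InternalRow.fromSeq(partitionSchema.map { field =>
      Cast(Literal(spec(field.name)), field.dataType, Option("UTC")).eval()
    })
    // ident now holds 42: Int and the date 2020-11-24 as days since the epoch.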
From a2fc4a3eb9dc366245c21a614d5e68b7a53edb7b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 24 Nov 2020 10:53:51 +0300
Subject: [PATCH 4/8] Re-use castPartitionValues()

---
 .../spark/sql/catalyst/catalog/interface.scala | 16 +++-------------
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index ee7216e93ebb5..62bbd70c46777 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
+import org.apache.spark.sql.util.PartitioningUtils.castPartitionValues
 /**
  * A function defined in the catalog.
  */
@@ -149,18 +149,8 @@ case class CatalogTablePartition(
 
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
-  def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
-    val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
-    val timeZoneId = caseInsensitiveProperties.getOrElse(
-      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
-    InternalRow.fromSeq(partitionSchema.map { field =>
-      val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
-        null
-      } else {
-        spec(field.name)
-      }
-      Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
-    })
+  def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = {
+    castPartitionValues(spec, partitionSchema, storage.properties, defaultTimeZoneId)
   }
 }
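Note on patch 4 (commentary, not part of the series): CatalogTablePartition.toRow() becomes a thin wrapper over the new helper, so the V1 catalog path and the V2 resolver in the next patch share a single implementation of the sentinel-to-null and casting logic. Call sites keep the old shape; a sketch, where partition, table and conf stand in for values a real caller has in scope:

    // Same call shape before and after the refactoring:
    val row = partition.toRow(table.partitionSchema, conf.sessionLocalTimeZone)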
From eadb9a7b236f723affc489fd88d4a7c5822637eb Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 24 Nov 2020 11:23:23 +0300
Subject: [PATCH 5/8] Minor changes

---
 .../analysis/ResolvePartitionSpec.scala        | 43 +++++++++----------
 .../spark/sql/util/PartitioningUtils.scala     |  4 +-
 2 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 6d061fce06919..5cdd48834c674 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
+import org.apache.spark.sql.util.PartitioningUtils.{castPartitionValues, normalizePartitionSpec}
 
 /**
  * Resolve [[UnresolvedPartitionSpec]] to [[ResolvedPartitionSpec]] in partition related commands.
@@ -33,41 +33,40 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case r @ AlterTableAddPartition(
-        ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _) =>
-      r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
+        ResolvedTable(_, _, table: SupportsPartitionManagement), partitionSpec, _) =>
+      r.copy(parts = resolvePartitionSpecs(table, partitionSpec))
 
     case r @ AlterTableDropPartition(
-        ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _, _, _) =>
-      r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
+        ResolvedTable(_, _, table: SupportsPartitionManagement), partitionSpec, _, _, _) =>
+      r.copy(parts = resolvePartitionSpecs(table, partitionSpec))
   }
 
   private def resolvePartitionSpecs(
-      tableName: String,
-      partSpecs: Seq[PartitionSpec],
-      partSchema: StructType): Seq[ResolvedPartitionSpec] =
-    partSpecs.map {
+      table: SupportsPartitionManagement,
+      partitionSpec: Seq[PartitionSpec]): Seq[ResolvedPartitionSpec] =
+    partitionSpec.map {
       case unresolvedPartSpec: UnresolvedPartitionSpec =>
         ResolvedPartitionSpec(
-          convertToPartIdent(tableName, unresolvedPartSpec.spec, partSchema),
+          convertToPartIdent(table, unresolvedPartSpec.spec),
           unresolvedPartSpec.location)
       case resolvedPartitionSpec: ResolvedPartitionSpec =>
         resolvedPartitionSpec
     }
 
   private def convertToPartIdent(
-      tableName: String,
-      partitionSpec: TablePartitionSpec,
-      partSchema: StructType): InternalRow = {
+      table: SupportsPartitionManagement,
+      partitionSpec: TablePartitionSpec): InternalRow = {
+    val partitionSchema = table.partitionSchema()
     val normalizedSpec = normalizePartitionSpec(
       partitionSpec,
-      partSchema.map(_.name),
-      tableName,
+      partitionSchema.map(_.name),
+      table.name,
       conf.resolver)
-    val partValues = partSchema.map { part =>
-      val raw = normalizedSpec.get(part.name).orNull
-      Cast(Literal.create(raw, StringType), part.dataType, Some(conf.sessionLocalTimeZone)).eval()
-    }
-    InternalRow.fromSeq(partValues)
+    castPartitionValues(
+      normalizedSpec,
+      partitionSchema,
+      table.properties().asScala.toMap,
+      conf.sessionLocalTimeZone)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 2f15a6ca4c3b4..44606f6cfb52c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -58,10 +58,10 @@ object PartitioningUtils {
       spec: TablePartitionSpec,
       partitionSchema: StructType,
       properties: Map[String, String],
-      defaultTimeZondId: String): InternalRow = {
+      defaultTimeZoneId: String): InternalRow = {
     val caseInsensitiveProperties = CaseInsensitiveMap(properties)
     val timeZoneId = caseInsensitiveProperties.getOrElse(
-      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
+      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
     InternalRow.fromSeq(partitionSchema.map { field =>
       val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
         null
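Note on patch 5 (commentary, not part of the series): handing resolvePartitionSpecs() the SupportsPartitionManagement table instead of just its name and schema lets convertToPartIdent() pass the table's own properties to castPartitionValues, so a per-table timeZone option can override conf.sessionLocalTimeZone. End to end, a V2 ALTER TABLE now resolves the sentinel before it reaches the catalog; a sketch assuming a session where the suite's in-memory partition catalog is registered as `testpart`:

    // After this rule runs, the ADD PARTITION below carries InternalRow(null)
    // as its partition ident, not the literal sentinel string.
    spark.sql("CREATE TABLE testpart.ns1.ns2.tbl (part0 string) USING foo PARTITIONED BY (part0)")
    spark.sql("ALTER TABLE testpart.ns1.ns2.tbl ADD PARTITION (part0 = '__HIVE_DEFAULT_PARTITION__')")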
From 687cd6c12825088b7b0c8588a1a828571ea0bbb8 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 24 Nov 2020 12:40:22 +0300
Subject: [PATCH 6/8] Remove loc

---
 .../spark/sql/connector/AlterTablePartitionV2SQLSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index bb18333ce1e4d..6e68719056ce7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -255,7 +255,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       val expectedPartition = InternalRow.fromSeq(Seq[Any](null))
       assert(!partTable.partitionExists(expectedPartition))
       val partSpec = "PARTITION (part0 = '__HIVE_DEFAULT_PARTITION__')"
-      sql(s"ALTER TABLE $t ADD $partSpec LOCATION 'loc'")
+      sql(s"ALTER TABLE $t ADD $partSpec")
       assert(partTable.partitionExists(expectedPartition))
       spark.sql(s"ALTER TABLE $t DROP $partSpec")
       assert(!partTable.partitionExists(expectedPartition))

From 4ad95c53ab89b990d495bd82c10a55854086225c Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 24 Nov 2020 12:42:11 +0300
Subject: [PATCH 7/8] Add private[sql] to PartitioningUtils

---
 .../scala/org/apache/spark/sql/util/PartitioningUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 44606f6cfb52c..3f2370146a88f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.types.StructType
 
-object PartitioningUtils {
+private[sql] object PartitioningUtils {
   /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
    * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a

From 26b83a101f39c8d0878c96b6ffd228e040169c8b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 24 Nov 2020 13:54:20 +0300
Subject: [PATCH 8/8] Remove unused imports

---
 .../scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 62bbd70c46777..486af001d713b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, ExprId}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util._