diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 68bd47046174..aa44e677e577 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -198,7 +198,7 @@ statement
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName                 #describeFunction
     | (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier      #describeDatabase
     | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
-        tableIdentifier partitionSpec? describeColName?                     #describeTable
+        multipartIdentifier partitionSpec? describeColName?                 #describeTable
     | (DESC | DESCRIBE) QUERY? query                                        #describeQuery
     | REFRESH TABLE tableIdentifier                                         #refreshTable
     | REFRESH (STRING | .*?)                                                #refreshResource
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4970727b9646..f8eef0cf3236 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, InsertIntoStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, DescribeTableStatement, InsertIntoStatement}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
@@ -171,6 +171,7 @@ class Analyzer(
     Batch("Resolution", fixedPoint,
       ResolveTableValuedFunctions ::
       ResolveAlterTable ::
+      ResolveDescribeTable ::
       ResolveInsertInto ::
       ResolveTables ::
       ResolveRelations ::
@@ -972,6 +973,21 @@ class Analyzer(
           Seq(TableChange.setProperty("location", newLoc)))
     }
   }
+  /**
+   * Resolve DESCRIBE TABLE statements that use a DSv2 catalog.
+   *
+   * This rule converts unresolved DESCRIBE TABLE statements to v2 when a v2 catalog is responsible
+   * for the table identifier. A v2 catalog is responsible for an identifier when the identifier
+   * has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and
+   * the table identifier does not include a catalog.
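+   *
+   * For example, `DESCRIBE TABLE prod_catalog.db.table` is resolved by the v2 catalog
+   * `prod_catalog`, while a bare `DESCRIBE TABLE db.table` is only converted here when a
+   * default v2 catalog is configured; otherwise it stays on the v1 DescribeTableCommand path.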
+   */
+  object ResolveDescribeTable extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case describe @ DescribeTableStatement(
+          CatalogObjectIdentifier(Some(v2Catalog), ident), _, isExtended) =>
+        DescribeTable(UnresolvedRelation(describe.tableName), isExtended)
+    }
+  }
 
   /**
    * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index e727abd503de..cdcba301cb75 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -2561,4 +2561,36 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       visitMultipartIdentifier(ctx.multipartIdentifier),
       visitLocationSpec(ctx.locationSpec))
   }
+
+  /**
+   * Create a [[DescribeColumnStatement]] or [[DescribeTableStatement]] command.
+   */
+  override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
+    val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
+    if (ctx.describeColName != null) {
+      if (ctx.partitionSpec != null) {
+        throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
+      } else {
+        DescribeColumnStatement(
+          visitMultipartIdentifier(ctx.multipartIdentifier()),
+          ctx.describeColName.nameParts.asScala.map(_.getText),
+          isExtended)
+      }
+    } else {
+      val partitionSpec = if (ctx.partitionSpec != null) {
+        // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
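+        // Entries parsed without a value, e.g. `PARTITION (ds)`, are reported below as an
+        // incomplete specification.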
+        visitPartitionSpec(ctx.partitionSpec).map {
+          case (key, Some(value)) => key -> value
+          case (key, _) =>
+            throw new ParseException(s"PARTITION specification is incomplete: `$key`", ctx)
+        }
+      } else {
+        Map.empty[String, String]
+      }
+      DescribeTableStatement(
+        visitMultipartIdentifier(ctx.multipartIdentifier()),
+        partitionSpec,
+        isExtended)
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/DescribeTableSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/DescribeTableSchema.scala
new file mode 100644
index 000000000000..ff35972b901f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/DescribeTableSchema.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}
+
+private[sql] object DescribeTableSchema {
+  def describeTableAttributes(): Seq[AttributeReference] = Seq(
+    AttributeReference("col_name", StringType, nullable = false,
+      new MetadataBuilder().putString("comment", "name of the column").build())(),
+    AttributeReference("data_type", StringType, nullable = false,
+      new MetadataBuilder().putString("comment", "data type of the column").build())(),
+    AttributeReference("comment", StringType, nullable = true,
+      new MetadataBuilder().putString("comment", "comment of the column").build())())
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 2698ba282f96..6f33944fc1cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.AliasIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
 import org.apache.spark.sql.catalyst.plans._
@@ -541,6 +542,13 @@ object OverwritePartitionsDynamic {
   }
 }
 
+case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
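+  // Output columns mirror the Hive-style DESCRIBE layout: col_name, data_type, comment.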
+  override val output = DescribeTableSchema.describeTableAttributes()
+}
+
 /**
  * Drop a table.
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DescribeColumnStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DescribeColumnStatement.scala
new file mode 100644
index 000000000000..80ee262af672
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DescribeColumnStatement.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.{MetadataBuilder, StringType}
+
+case class DescribeColumnStatement(
+    tableName: Seq[String],
+    colNameParts: Seq[String],
+    isExtended: Boolean) extends ParsedStatement {
+  override def output: Seq[Attribute] = {
+    Seq(
+      AttributeReference("info_name", StringType, nullable = false,
+        new MetadataBuilder().putString("comment", "name of the column info").build())(),
+      AttributeReference("info_value", StringType, nullable = false,
+        new MetadataBuilder().putString("comment", "value of the column info").build())()
+    )
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DescribeTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DescribeTableStatement.scala
new file mode 100644
index 000000000000..bf837441bb50
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DescribeTableStatement.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
+
+case class DescribeTableStatement(
+    tableName: Seq[String],
+    partitionSpec: TablePartitionSpec,
+    isExtended: Boolean) extends ParsedStatement {
+  override def output: Seq[Attribute] = DescribeTableSchema.describeTableAttributes()
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 0635f8e5e87e..d8c3ebc5b966 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransf
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -617,6 +617,47 @@ class DDLParserSuite extends AnalysisTest {
     }
   }
 
+  test("describe table column") {
+    comparePlans(parsePlan("DESCRIBE t col"),
+      DescribeColumnStatement(
+        Seq("t"), Seq("col"), isExtended = false))
+    comparePlans(parsePlan("DESCRIBE t `abc.xyz`"),
+      DescribeColumnStatement(
+        Seq("t"), Seq("abc.xyz"), isExtended = false))
+    comparePlans(parsePlan("DESCRIBE t abc.xyz"),
+      DescribeColumnStatement(
+        Seq("t"), Seq("abc", "xyz"), isExtended = false))
+    comparePlans(parsePlan("DESCRIBE t `a.b`.`x.y`"),
+      DescribeColumnStatement(
+        Seq("t"), Seq("a.b", "x.y"), isExtended = false))
+
+    comparePlans(parsePlan("DESCRIBE TABLE t col"),
+      DescribeColumnStatement(
+        Seq("t"), Seq("col"), isExtended = false))
+    comparePlans(parsePlan("DESCRIBE TABLE EXTENDED t col"),
+      DescribeColumnStatement(
+        Seq("t"), Seq("col"), isExtended = true))
+    comparePlans(parsePlan("DESCRIBE TABLE FORMATTED t col"),
+      DescribeColumnStatement(
+        Seq("t"), Seq("col"), isExtended = true))
+
+    val caught = intercept[AnalysisException](
+      parsePlan("DESCRIBE TABLE t PARTITION (ds='1970-01-01') col"))
+    assert(caught.getMessage.contains(
+      "DESC TABLE COLUMN for a specific partition is not supported"))
+  }
+
+  test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {
+    comparePlans(parsePlan("describe t"),
+      DescribeTableStatement(Seq("t"), Map.empty, isExtended = false))
+    comparePlans(parsePlan("describe table t"),
+      DescribeTableStatement(Seq("t"), Map.empty, isExtended = false))
+    comparePlans(parsePlan("describe table extended t"),
+      DescribeTableStatement(Seq("t"), Map.empty, isExtended = true))
+    comparePlans(parsePlan("describe table formatted t"),
+      DescribeTableStatement(Seq("t"), Map.empty, isExtended = true))
+  }
+
   test("insert table: basic append") {
     Seq(
       "INSERT INTO TABLE testcat.ns1.ns2.tbl SELECT * FROM source",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 12cd8abcad89..c4edadba278f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -327,38 +327,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
   }
 
-  /**
-   * Create a [[DescribeColumnCommand]] or [[DescribeTableCommand]] logical commands.
-   */
-  override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
-    val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
-    if (ctx.describeColName != null) {
-      if (ctx.partitionSpec != null) {
-        throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
-      } else {
-        DescribeColumnCommand(
-          visitTableIdentifier(ctx.tableIdentifier),
-          ctx.describeColName.nameParts.asScala.map(_.getText),
-          isExtended)
-      }
-    } else {
-      val partitionSpec = if (ctx.partitionSpec != null) {
-        // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
-        visitPartitionSpec(ctx.partitionSpec).map {
-          case (key, Some(value)) => key -> value
-          case (key, _) =>
-            throw new ParseException(s"PARTITION specification is incomplete: `$key`", ctx)
-        }
-      } else {
-        Map.empty[String, String]
-      }
-      DescribeTableCommand(
-        visitTableIdentifier(ctx.tableIdentifier),
-        partitionSpec,
-        isExtended)
-    }
-  }
-
   /**
    * Create a [[DescribeQueryCommand]] logical command.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 03aca89bc642..ca42de39db86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
 import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
@@ -499,15 +500,7 @@ case class TruncateTableCommand(
 }
 
 abstract class DescribeCommandBase extends RunnableCommand {
-  override val output: Seq[Attribute] = Seq(
-    // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "name of the column").build())(),
-    AttributeReference("data_type", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "data type of the column").build())(),
-    AttributeReference("comment", StringType, nullable = true,
-      new MetadataBuilder().putString("comment", "comment of the column").build())()
-  )
+  override val output = DescribeTableSchema.describeTableAttributes()
 
   protected def describeSchema(
       schema: StructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index a51678da2d8e..f17b31da5731 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
 import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.TableProvider
@@ -94,6 +94,18 @@ case class DataSourceResolution(
       convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create)
     }
 
+    case DescribeColumnStatement(
+        AsTableIdentifier(tableName), colName, isExtended) =>
+      DescribeColumnCommand(tableName, colName, isExtended)
+
+    case DescribeColumnStatement(
+        CatalogObjectIdentifier(Some(catalog), ident), colName, isExtended) =>
+      throw new AnalysisException("Describing columns is not supported for v2 tables.")
+
+    case DescribeTableStatement(
+        AsTableIdentifier(tableName), partitionSpec, isExtended) =>
+      DescribeTableCommand(tableName, partitionSpec, isExtended)
+
     case ReplaceTableStatement(
         AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
         V1WriteProvider(provider), options, location, comment, orCreate) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 52e289653635..91fc2e068af7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalog.v2.StagingTableCatalog
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -237,6 +237,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
         Nil
       }
 
+    case DescribeTable(r: DataSourceV2Relation, isExtended) =>
+      DescribeTableExec(r.table, isExtended) :: Nil
+
     case DropTable(catalog, ident, ifExists) =>
       DropTableExec(catalog, ident, ifExists) :: Nil
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
new file mode 100644
index 000000000000..5db79c84a395
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRowWithSchema}
+import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+
+case class DescribeTableExec(table: Table, isExtended: Boolean) extends LeafExecNode {
+
+  override val output: Seq[AttributeReference] =
+    DescribeTableSchema.describeTableAttributes()
+
+  private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val rows = new ArrayBuffer[InternalRow]()
+    addSchema(rows)
+
+    if (isExtended) {
+      addPartitioning(rows)
+      addProperties(rows)
+    }
+    sparkContext.parallelize(rows)
+  }
+
+  private def addSchema(rows: ArrayBuffer[InternalRow]): Unit = {
+    rows ++= table.schema.map { column =>
+      toCatalystRow(
+        column.name, column.dataType.simpleString, column.getComment().getOrElse(""))
+    }
+  }
+
+  private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
+    rows += emptyRow()
+    rows += toCatalystRow(" Partitioning", "", "")
+    rows += toCatalystRow("--------------", "", "")
+    if (table.partitioning.isEmpty) {
+      rows += toCatalystRow("Not partitioned", "", "")
+    } else {
+      rows ++= table.partitioning.zipWithIndex.map {
+        case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
+      }
+    }
+  }
+
+  private def addProperties(rows: ArrayBuffer[InternalRow]): Unit = {
+    rows += emptyRow()
+    rows += toCatalystRow(" Table Property", " Value", "")
+    rows += toCatalystRow("----------------", "-------", "")
+    rows ++= table.properties.asScala.toList.sortBy(_._1).map {
+      case (key, value) => toCatalystRow(key, value, "")
+    }
+  }
+
+  private def emptyRow(): InternalRow = toCatalystRow("", "", "")
+
+  private def toCatalystRow(strs: String*): InternalRow = {
+    encoder.toRow(new GenericRowWithSchema(strs.toArray, schema)).copy()
+  }
+}
diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
index 46d9ec30a8a7..f58bdb5446b6 100644
--- a/sql/core/src/test/resources/sql-tests/results/describe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
@@ -539,7 +539,7 @@ EXPLAIN EXTENDED DESC t
 struct<plan:string>
 -- !query 34 output
 == Parsed Logical Plan ==
-DescribeTableCommand `t`, false
+'DescribeTableStatement [t], false
 
 == Analyzed Logical Plan ==
 col_name: string, data_type: string, comment: string
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 92a1bf63ec94..c6fc84894a3f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.sql.{DescribeColumnStatement, DescribeTableStatement}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
@@ -374,7 +375,10 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     // Returns true if the plan is supposed to be sorted.
     def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
-      case _: DescribeCommandBase | _: DescribeColumnCommand => true
+      case _: DescribeCommandBase
+        | _: DescribeColumnCommand
+        | _: DescribeTableStatement
+        | _: DescribeColumnStatement => true
       case PhysicalOperation(_, _, Sort(_, true, _)) => true
       case _ => plan.children.iterator.exists(isSorted)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index df80311c0ce2..b751fb7c5043 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, Un
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Concat, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{DescribeColumnStatement, DescribeTableStatement}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, RefreshResource}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -210,51 +211,12 @@ class SparkSqlParserSuite extends AnalysisTest {
       "no viable alternative at input")
   }
 
-  test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {
-    assertEqual("describe t",
-      DescribeTableCommand(TableIdentifier("t"), Map.empty, isExtended = false))
-    assertEqual("describe table t",
-      DescribeTableCommand(TableIdentifier("t"), Map.empty, isExtended = false))
-    assertEqual("describe table extended t",
-      DescribeTableCommand(TableIdentifier("t"), Map.empty, isExtended = true))
-    assertEqual("describe table formatted t",
-      DescribeTableCommand(TableIdentifier("t"), Map.empty, isExtended = true))
-  }
-
   test("describe query") {
     val query = "SELECT * FROM t"
     assertEqual("DESCRIBE QUERY " + query, DescribeQueryCommand(query, parser.parsePlan(query)))
    assertEqual("DESCRIBE " + query, DescribeQueryCommand(query, parser.parsePlan(query)))
   }
 
-  test("describe table column") {
-    assertEqual("DESCRIBE t col",
-      DescribeColumnCommand(
-        TableIdentifier("t"), Seq("col"), isExtended = false))
-    assertEqual("DESCRIBE t `abc.xyz`",
-      DescribeColumnCommand(
-        TableIdentifier("t"), Seq("abc.xyz"), isExtended = false))
-    assertEqual("DESCRIBE t abc.xyz",
-      DescribeColumnCommand(
-        TableIdentifier("t"), Seq("abc", "xyz"), isExtended = false))
-    assertEqual("DESCRIBE t `a.b`.`x.y`",
-      DescribeColumnCommand(
-        TableIdentifier("t"), Seq("a.b", "x.y"), isExtended = false))
-
-    assertEqual("DESCRIBE TABLE t col",
-      DescribeColumnCommand(
-        TableIdentifier("t"), Seq("col"), isExtended = false))
-    assertEqual("DESCRIBE TABLE EXTENDED t col",
-      DescribeColumnCommand(
-        TableIdentifier("t"), Seq("col"), isExtended = true))
-    assertEqual("DESCRIBE TABLE FORMATTED t col",
-      DescribeColumnCommand(
-        TableIdentifier("t"), Seq("col"), isExtended = true))
-
-    intercept("DESCRIBE TABLE t PARTITION (ds='1970-01-01') col",
-      "DESC TABLE COLUMN for a specific partition is not supported")
-  }
-
   test("analyze table statistics") {
     assertEqual("analyze table t compute statistics",
       AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index a3e029f53cec..d95021077f9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, LongType, MapType, Metadata, StringType, StructField, StructType, TimestampType}
 
 class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
 
@@ -71,6 +71,56 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
   }
 
+  test("DescribeTable using v2 catalog") {
+    spark.sql("CREATE TABLE testcat.table_name (id bigint, data string)" +
+      " USING foo" +
+      " PARTITIONED BY (id)")
+    val descriptionDf = spark.sql("DESCRIBE TABLE testcat.table_name")
+    assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
+      Seq(
+        ("col_name", StringType),
+        ("data_type", StringType),
+        ("comment", StringType)))
+    val description = descriptionDf.collect()
+    assert(description === Seq(
+      Row("id", "bigint", ""),
+      Row("data", "string", "")))
+  }
+
+  test("DescribeTable with v2 catalog when table does not exist.") {
+    intercept[AnalysisException] {
+      spark.sql("DESCRIBE TABLE testcat.table_name")
+    }
+  }
+
+  test("DescribeTable extended using v2 catalog") {
+    spark.sql("CREATE TABLE testcat.table_name (id bigint, data string)" +
+      " USING foo" +
+      " PARTITIONED BY (id)" +
+      " TBLPROPERTIES ('bar'='baz')")
+    val descriptionDf = spark.sql("DESCRIBE TABLE EXTENDED testcat.table_name")
+    assert(descriptionDf.schema.map(field => (field.name, field.dataType))
+      === Seq(
+        ("col_name", StringType),
+        ("data_type", StringType),
+        ("comment", StringType)))
+    assert(descriptionDf.collect()
+      .map(_.toSeq)
+      .map(_.toArray.map(_.toString.trim)) === Array(
+      Array("id", "bigint", ""),
+      Array("data", "string", ""),
+      Array("", "", ""),
+      Array("Partitioning", "", ""),
+      Array("--------------", "", ""),
+      Array("Part 0", "id", ""),
+      Array("", "", ""),
+      Array("Table Property", "Value", ""),
+      Array("----------------", "-------", ""),
+      Array("bar", "baz", ""),
+      Array("provider", "foo", "")))
+
+  }
+
   test("CreateTable: use v2 plan and session catalog when provider is v2") {
     spark.sql(s"CREATE TABLE table_name (id bigint, data string) USING $orc2")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 19a41bee19d9..7c51a29bde90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -71,7 +71,6 @@ class TestInMemoryTableCatalog extends TableCatalog {
       throw new TableAlreadyExistsException(ident)
     }
     TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
-
     val table = new InMemoryTable(s"$name.${ident.quoted}", schema, partitions, properties)
     tables.put(ident, table)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index a4587abbf389..c0158f1947d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.sql.{DescribeColumnStatement, DescribeTableStatement}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.execution.SQLExecution
@@ -376,6 +377,8 @@ abstract class HiveComparisonTest
             (!hiveQuery.logical.isInstanceOf[ShowFunctionsCommand]) &&
             (!hiveQuery.logical.isInstanceOf[DescribeFunctionCommand]) &&
             (!hiveQuery.logical.isInstanceOf[DescribeCommandBase]) &&
+            (!hiveQuery.logical.isInstanceOf[DescribeTableStatement]) &&
+            (!hiveQuery.logical.isInstanceOf[DescribeColumnStatement]) &&
             preparedHive != catalyst) {
 
             val hivePrintOut = s"== HIVE - ${preparedHive.size} row(s) ==" +: preparedHive