@@ -190,7 +190,7 @@ statement
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase
| (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
-    tableIdentifier partitionSpec? describeColName? #describeTable
+    multipartIdentifier partitionSpec? describeColName? #describeTable
| (DESC | DESCRIBE) QUERY? query #describeQuery
| REFRESH TABLE tableIdentifier #refreshTable
| REFRESH (STRING | .*?) #refreshResource
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -2449,4 +2449,36 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
      visitMultipartIdentifier(ctx.multipartIdentifier),
      visitLocationSpec(ctx.locationSpec))
  }

  /**
   * Create a [[DescribeColumnStatement]] or a [[DescribeTableStatement]] command.
   */
  override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
    val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
    if (ctx.describeColName != null) {
      if (ctx.partitionSpec != null) {
        throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
      } else {
        DescribeColumnStatement(
          visitMultipartIdentifier(ctx.multipartIdentifier()),
          ctx.describeColName.nameParts.asScala.map(_.getText),
          isExtended)
      }
    } else {
      val partitionSpec = if (ctx.partitionSpec != null) {
        // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
        visitPartitionSpec(ctx.partitionSpec).map {
          case (key, Some(value)) => key -> value
          case (key, _) =>
            throw new ParseException(s"PARTITION specification is incomplete: `$key`", ctx)
        }
      } else {
        Map.empty[String, String]
      }
      DescribeTableStatement(
        visitMultipartIdentifier(ctx.multipartIdentifier()),
        partitionSpec,
        isExtended)
    }
  }
}
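
For illustration, a hedged sketch of what this new rule produces when parsed through the standard CatalystSqlParser entry point (the table and column names here are made up):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Multi-part names now parse, since the rule uses multipartIdentifier:
CatalystSqlParser.parsePlan("DESCRIBE TABLE EXTENDED testcat.ns.tbl")
// => DescribeTableStatement(Seq("testcat", "ns", "tbl"), Map.empty, isExtended = true)

CatalystSqlParser.parsePlan("DESC t col_a")
// => DescribeColumnStatement(Seq("t"), Seq("col_a"), isExtended = false)

// A partition spec without a value is rejected at parse time:
CatalystSqlParser.parsePlan("DESC t PARTITION (p)")
// => ParseException: PARTITION specification is incomplete: `p`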
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

private[sql] object DescribeTableSchemas {
Contributor:
Singular, DescribeTableSchema?
  val DESCRIBE_TABLE_ATTRIBUTES = Seq(
cloud-fan (Contributor), Jul 12, 2019:
We shouldn't define attributes in an object. AttributeReference will be assigned a unique ID when created, and in general we should create new attributes when creating a new logical plan.

For example, if you do df1 = sql("desc table t1"); df2 = sql("desc table "), then df1.join(df2) would fail.

Contributor:
Can you join the results of DESCRIBE?

Contributor:

scala> val df1 = sql("desc t1")
df1: org.apache.spark.sql.DataFrame = [col_name: string, data_type: string ... 1 more field]

scala> val df2 = sql("desc t2")
df2: org.apache.spark.sql.DataFrame = [col_name: string, data_type: string ... 1 more field]

scala> df1.crossJoin(df2).show
+--------+---------+-------+--------+---------+-------+
|col_name|data_type|comment|col_name|data_type|comment|
+--------+---------+-------+--------+---------+-------+
|       i|      int|   null|       j|      int|   null|
+--------+---------+-------+--------+---------+-------+

This is not a common use case but we don't have to break it.

Contributor (Author):
Changed this from a value to a method, so it will generate new identifiers every time while still being shared amongst multiple contexts.

AttributeReference("col_name", StringType, nullable = false,
new MetadataBuilder().putString("comment", "name of the column").build())(),
AttributeReference("data_type", StringType, nullable = false,
new MetadataBuilder().putString("comment", "data type of the column").build())(),
AttributeReference("comment", StringType, nullable = true,
new MetadataBuilder().putString("comment", "comment of the column").build())()
)

  val DESCRIBE_TABLE_SCHEMA = StructType(
    DESCRIBE_TABLE_ATTRIBUTES.map(attr => StructField(attr.name, attr.dataType, attr.nullable)))
Contributor:
nit: StructType.fromAttributes(DESCRIBE_TABLE_ATTRIBUTES)

}
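
Pulling the review suggestions together (singular name, a def so attributes get fresh expression IDs, and the fromAttributes nit), a hedged sketch of where this could land — the names here are illustrative, not necessarily the merged code:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructType}

private[sql] object DescribeTableSchema {
  // A def mints new AttributeReferences (each with a unique expression ID)
  // on every call, so plans built from separate DESCRIBE statements can be
  // joined without attribute-ID conflicts.
  def describeTableAttributes(): Seq[AttributeReference] = Seq(
    AttributeReference("col_name", StringType, nullable = false,
      new MetadataBuilder().putString("comment", "name of the column").build())(),
    AttributeReference("data_type", StringType, nullable = false,
      new MetadataBuilder().putString("comment", "data type of the column").build())(),
    AttributeReference("comment", StringType, nullable = true,
      new MetadataBuilder().putString("comment", "comment of the column").build())()
  )

  // fromAttributes derives the same (name, dataType, nullable) fields as the
  // manual StructField mapping above.
  def describeTableSchema(): StructType = StructType.fromAttributes(describeTableAttributes())
}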
@@ -22,9 +22,11 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.sql.DescribeTableStatement
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.types._
@@ -499,6 +501,14 @@ object OverwritePartitionsDynamic {
}
}

case class DescribeTable(
    catalog: TableCatalog,
    ident: Identifier,
    isExtended: Boolean) extends Command {
  override lazy val output = DescribeTableSchemas.DESCRIBE_TABLE_ATTRIBUTES
Contributor:
We don't need lazy val here, as it's not a heavy computation.

  override lazy val schema = DescribeTableSchemas.DESCRIBE_TABLE_SCHEMA
Contributor:
By default, schema is StructType.fromAttributes(output), so we don't need to override it.

}
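
Applying both notes above, the plan can shrink to this (a sketch, assuming no other members are needed):

case class DescribeTable(
    catalog: TableCatalog,
    ident: Identifier,
    isExtended: Boolean) extends Command {
  // A plain val suffices (the Seq is cheap to build), and Command already
  // derives schema as StructType.fromAttributes(output), so no override needed.
  override val output: Seq[Attribute] = DescribeTableSchemas.DESCRIBE_TABLE_ATTRIBUTES
}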

/**
* Drop a table.
*/
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.{MetadataBuilder, StringType}

case class DescribeColumnStatement(
    tableName: Seq[String],
    colNameParts: Seq[String],
    isExtended: Boolean) extends ParsedStatement {
  override def output: Seq[Attribute] = {
    Seq(
      AttributeReference("info_name", StringType, nullable = false,
        new MetadataBuilder().putString("comment", "name of the column info").build())(),
      AttributeReference("info_value", StringType, nullable = false,
        new MetadataBuilder().putString("comment", "value of the column info").build())()
    )
  }
}
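
For context, a hedged illustration of the two-column shape this output declares, in the style of the REPL snippet earlier (the actual rows come from whichever command the statement resolves to):

scala> sql("DESC t i").show
+---------+----------+
|info_name|info_value|
+---------+----------+
| col_name|         i|
|data_type|       int|
|  comment|      NULL|
+---------+----------+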
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.DescribeTableSchemas

case class DescribeTableStatement(
    tableName: Seq[String],
    partitionSpec: TablePartitionSpec,
    isExtended: Boolean) extends ParsedStatement {
  override def output: Seq[Attribute] = DescribeTableSchemas.DESCRIBE_TABLE_ATTRIBUTES
}
@@ -327,38 +327,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
}

  /**
   * Create a [[DescribeColumnCommand]] or [[DescribeTableCommand]] logical commands.
   */
  override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
    val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
    if (ctx.describeColName != null) {
      if (ctx.partitionSpec != null) {
        throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
      } else {
        DescribeColumnCommand(
          visitTableIdentifier(ctx.tableIdentifier),
Contributor:
Now that these rules create DescribeColumnStatement and DescribeTableStatement, they should be moved into Catalyst. There isn't anything specific to the implementation any more.

Contributor (Author):
Do you mean that DescribeColumnCommand and DescribeTableCommand should be moved to Catalyst? The V1 commands depend on a bunch of stuff that's in core, such as SparkSession and DDLUtils.

          ctx.describeColName.nameParts.asScala.map(_.getText),
          isExtended)
      }
    } else {
      val partitionSpec = if (ctx.partitionSpec != null) {
        // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
        visitPartitionSpec(ctx.partitionSpec).map {
          case (key, Some(value)) => key -> value
          case (key, _) =>
            throw new ParseException(s"PARTITION specification is incomplete: `$key`", ctx)
        }
      } else {
        Map.empty[String, String]
      }
      DescribeTableCommand(
        visitTableIdentifier(ctx.tableIdentifier),
        partitionSpec,
        isExtended)
    }
  }

  /**
   * Create a [[DescribeQueryCommand]] logical command.
   */
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.DescribeTableSchemas
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
@@ -499,15 +500,7 @@ case class TruncateTableCommand(
}

abstract class DescribeCommandBase extends RunnableCommand {
-  override val output: Seq[Attribute] = Seq(
-    // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "name of the column").build())(),
-    AttributeReference("data_type", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "data type of the column").build())(),
-    AttributeReference("comment", StringType, nullable = true,
-      new MetadataBuilder().putString("comment", "comment of the column").build())()
-  )
+  override val output = DescribeTableSchemas.DESCRIBE_TABLE_ATTRIBUTES

  protected def describeSchema(
      schema: StructType,
@@ -27,10 +27,10 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DescribeTable, DropTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
@@ -85,6 +85,25 @@ case class DataSourceResolution(
        .asTableCatalog
      convertCTAS(catalog, identifier, create)

    case DescribeColumnStatement(
        AsTableIdentifier(tableName), colName, isExtended) =>
      DescribeColumnCommand(tableName, colName, isExtended)

    case DescribeColumnStatement(
        CatalogObjectIdentifier(Some(catalog), ident), colName, isExtended) =>
      throw new AnalysisException("Describing columns is not supported for v2 tables.")
Contributor:
Should this be supported eventually, or is it redundant if DESCRIBE TABLE is available?

Contributor (Author):
I think we need to support it eventually, if only to keep parity with V1 tables.


    case DescribeTableStatement(
        AsTableIdentifier(tableName), partitionSpec, isExtended) =>
      DescribeTableCommand(tableName, partitionSpec, isExtended)

    case DescribeTableStatement(
        CatalogObjectIdentifier(Some(catalog), ident), partitionSpec, isExtended) =>
      if (partitionSpec.nonEmpty) {
        throw new AnalysisException("DESCRIBE TABLE... PARTITION is not supported for v2 tables.")
      }
      DescribeTable(catalog.asTableCatalog, ident, isExtended)

    case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) =>
      DropTable(catalog.asTableCatalog, ident, ifExists)

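For reference, a hedged summary of how these resolution cases dispatch (the catalog and table names below are illustrative, not from the PR):

// Assuming "testcat" resolves to a v2 catalog and "db.tbl" to the session catalog:
//   DESC db.tbl                          => DescribeTableCommand  (existing v1 command)
//   DESC db.tbl col_a                    => DescribeColumnCommand (existing v1 command)
//   DESC testcat.db.tbl                  => DescribeTable         (new v2 plan)
//   DESC testcat.db.tbl PARTITION (p=1)  => AnalysisException: not supported for v2 tables
//   DESC testcat.db.tbl col_a            => AnalysisException: not supported for v2 tables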
@@ -23,8 +23,9 @@ import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources
@@ -199,6 +200,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
        Nil
      }

    case DescribeTable(catalog, ident, isExtended) =>
      DescribeTableExec(catalog, ident, isExtended) :: Nil
Contributor:
How about

case DescribeTable(catalog, ident, r: DataSourceV2Relation, isExtended) =>
  DescribeTableExec(r.table, ident, isExtended) :: Nil

Then we don't need to look up the table again in DescribeTableExec.


    case DropTable(catalog, ident, ifExists) =>
      DropTableExec(catalog, ident, ifExists) :: Nil
