@@ -196,7 +196,7 @@ statement
| SHOW PARTITIONS multipartIdentifier partitionSpec? #showPartitions
| SHOW identifier? FUNCTIONS
(LIKE? (multipartIdentifier | pattern=STRING))? #showFunctions
| SHOW CREATE TABLE multipartIdentifier #showCreateTable
| SHOW CREATE TABLE multipartIdentifier (AS SPARK)? #showCreateTable
Member

After rethinking it, let us make it more aggressive here. Instead of creating Spark native tables for the existing Hive serde tables, we can try to always show how to create Spark native tables if possible. This will further simplify the migration from Hive to Spark.

For existing Spark users who prefer to keep Hive serde formats, we can introduce a new option AS SERDE, which will keep the behavior of Spark 2.4 and earlier.
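A rough illustration of what this proposal would mean in practice, assuming a Hive serde table named `hive_serde_tbl` (the table name is made up, and the `AS SERDE` syntax is only being proposed here, not implemented in this PR):

```scala
// Proposed default: emit Spark native DDL whenever a mapping exists.
spark.sql("SHOW CREATE TABLE hive_serde_tbl").show(truncate = false)
// e.g. CREATE TABLE `hive_serde_tbl` (...) USING parquet ...

// Proposed fallback keeping the Spark 2.4 behavior: emit Hive serde DDL.
spark.sql("SHOW CREATE TABLE hive_serde_tbl AS SERDE").show(truncate = false)
// e.g. CREATE TABLE `hive_serde_tbl` (...) ROW FORMAT SERDE ... STORED AS ...
```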

Member

+1. The new proposal makes more sense!

Member Author
@viirya viirya Jan 27, 2020

This is a bit confusing, so let me confirm: you mean SHOW CREATE TABLE should behave like AS SPARK by default (so we don't add a new AS SPARK option), and only fall back to the current behavior (showing how to create a Hive serde table) when AS SERDE is given?

Member

Yes!

| SHOW CURRENT NAMESPACE #showCurrentNamespace
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase
@@ -1120,6 +1120,7 @@ ansiNonReserved
| SKEWED
| SORT
| SORTED
| SPARK
| START
| STATISTICS
| STORED
@@ -1388,6 +1389,7 @@ nonReserved
| SOME
| SORT
| SORTED
| SPARK
| START
| STATISTICS
| STORED
@@ -1659,6 +1661,7 @@ SKEWED: 'SKEWED';
SOME: 'SOME';
SORT: 'SORT';
SORTED: 'SORTED';
SPARK: 'SPARK';
START: 'START';
STATISTICS: 'STATISTICS';
STORED: 'STORED';
@@ -2982,7 +2982,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* Creates a [[ShowCreateTableStatement]]
*/
override def visitShowCreateTable(ctx: ShowCreateTableContext): LogicalPlan = withOrigin(ctx) {
ShowCreateTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
ShowCreateTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()), ctx.SPARK != null)
}

/**
@@ -382,7 +382,9 @@ case class LoadDataStatement(
/**
* A SHOW CREATE TABLE statement, as parsed from SQL.
*/
case class ShowCreateTableStatement(tableName: Seq[String]) extends ParsedStatement
case class ShowCreateTableStatement(
tableName: Seq[String],
asSpark: Boolean = false) extends ParsedStatement

/**
* A CACHE TABLE statement, as parsed from SQL
@@ -328,10 +328,14 @@ class ResolveSessionCatalog(
isOverwrite,
partition)

case ShowCreateTableStatement(tableName) =>
case ShowCreateTableStatement(tableName, asSpark) if !asSpark =>
val v1TableName = parseV1Table(tableName, "SHOW CREATE TABLE")
ShowCreateTableCommand(v1TableName.asTableIdentifier)

case ShowCreateTableStatement(tableName, asSpark) if asSpark =>
val v1TableName = parseV1Table(tableName, "SHOW CREATE TABLE AS SPARK")
ShowCreateTableAsSparkCommand(v1TableName.asTableIdentifier)

case CacheTableStatement(tableName, plan, isLazy, options) =>
val v1TableName = parseV1Table(tableName, "CACHE TABLE")
CacheTableCommand(v1TableName.asTableIdentifier, plan, isLazy, options)
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
import org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils

@@ -940,7 +940,95 @@ case class ShowPartitionsCommand(
}
}

case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
/**
* Provides common utilities between `ShowCreateTableCommand` and `ShowCreateTableAsSparkCommand`.
*/
trait ShowCreateTableCommandBase {

protected val table: TableIdentifier

protected def showTableLocation(metadata: CatalogTable, builder: StringBuilder): Unit = {
if (metadata.tableType == EXTERNAL) {
metadata.storage.locationUri.foreach { location =>
builder ++= s"LOCATION '${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'\n"
}
}
}

protected def showTableComment(metadata: CatalogTable, builder: StringBuilder): Unit = {
metadata
.comment
.map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
.foreach(builder.append)
}

protected def showTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
if (metadata.properties.nonEmpty) {
val props = metadata.properties.map { case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}

builder ++= props.mkString("TBLPROPERTIES (\n ", ",\n ", "\n)\n")
}
}

protected def showDataSourceTableDataColumns(
Member

The methods showDataSourceTableDataColumns / showDataSourceTableOptions / showDataSourceTableNonDataColumns / showCreateDataSourceTable are only used in ShowCreateTableCommand. Shall we move them into ShowCreateTableCommand?

Member Author

Oh, yeah, they were actually put there because, for the previous AS SPARK option, they were used in both commands. Forgot to move them. Thanks.

metadata: CatalogTable, builder: StringBuilder): Unit = {
val columns = metadata.schema.fields.map(_.toDDL)
builder ++= columns.mkString("(", ", ", ")\n")
}

protected def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
builder ++= s"USING ${metadata.provider.get}\n"
Member

Is metadata.provider always defined here?

Member Author

This is for a data source table. For such tables, I think provider is always available.

Member
@gengliangwang gengliangwang Jan 31, 2020

Nit: it would be better to add comments or an assertion here to explain that the provider is always defined.

Member Author

Ok. Added comments for that.
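For reference, a minimal sketch of the assertion variant suggested above (illustrative only; it mirrors the surrounding method but is not the committed change):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Called only for data source tables, so `provider` is expected to be defined.
def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
  assert(metadata.provider.isDefined,
    s"Data source table ${metadata.identifier} must define a provider")
  builder ++= s"USING ${metadata.provider.get}\n"
  // ... option rendering as in the method above ...
}
```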


val dataSourceOptions = SQLConf.get.redactOptions(metadata.storage.properties).map {
case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
}

if (dataSourceOptions.nonEmpty) {
builder ++= "OPTIONS (\n"
builder ++= dataSourceOptions.mkString(" ", ",\n ", "\n")
builder ++= ")\n"
}
}

protected def showDataSourceTableNonDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
val partCols = metadata.partitionColumnNames
if (partCols.nonEmpty) {
builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
}

metadata.bucketSpec.foreach { spec =>
if (spec.bucketColumnNames.nonEmpty) {
builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"

if (spec.sortColumnNames.nonEmpty) {
builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
}

builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
}
}
}

protected def showCreateDataSourceTable(metadata: CatalogTable): String = {
val builder = StringBuilder.newBuilder

builder ++= s"CREATE TABLE ${table.quotedString} "
showDataSourceTableDataColumns(metadata, builder)
showDataSourceTableOptions(metadata, builder)
showDataSourceTableNonDataColumns(metadata, builder)
showTableComment(metadata, builder)
showTableLocation(metadata, builder)
showTableProperties(metadata, builder)

builder.toString()
}
}

case class ShowCreateTableCommand(table: TableIdentifier)
extends RunnableCommand with ShowCreateTableCommandBase {
override val output: Seq[Attribute] = Seq(
AttributeReference("createtab_stmt", StringType, nullable = false)()
)
@@ -1041,7 +1129,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
storage.serde.foreach { serde =>
builder ++= s"ROW FORMAT SERDE '$serde'\n"

val serdeProps = metadata.storage.properties.map {
val serdeProps = SQLConf.get.redactOptions(metadata.storage.properties).map {
case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}
@@ -1061,83 +1149,86 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
}
}
}
}

private def showTableLocation(metadata: CatalogTable, builder: StringBuilder): Unit = {
if (metadata.tableType == EXTERNAL) {
metadata.storage.locationUri.foreach { location =>
builder ++= s"LOCATION '${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'\n"
}
}
}
/**
* This command generates Spark DDL for a Hive table.
*
* The syntax of using this command in SQL is:
* {{{
* SHOW CREATE TABLE table_identifier AS SPARK;
* }}}
*/
case class ShowCreateTableAsSparkCommand(table: TableIdentifier)
extends RunnableCommand with ShowCreateTableCommandBase {
override val output: Seq[Attribute] = Seq(
AttributeReference("sparktab_stmt", StringType, nullable = false)()
)

private def showTableComment(metadata: CatalogTable, builder: StringBuilder): Unit = {
metadata
.comment
.map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
.foreach(builder.append)
}
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val tableMetadata = catalog.getTableMetadata(table)

private def showTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
if (metadata.properties.nonEmpty) {
val props = metadata.properties.map { case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
throw new AnalysisException(
s"$table is already a Spark data source table. Use `SHOW CREATE TABLE` instead.")
} else {
if (tableMetadata.unsupportedFeatures.nonEmpty) {
throw new AnalysisException(
"Failed to execute SHOW CREATE TABLE AS SPARK against table " +
s"${tableMetadata.identifier}, which is created by Hive and uses the " +
"following unsupported feature(s)\n" +
tableMetadata.unsupportedFeatures.map(" - " + _).mkString("\n")
)
}

builder ++= props.mkString("TBLPROPERTIES (\n ", ",\n ", "\n)\n")
}
}

private def showCreateDataSourceTable(metadata: CatalogTable): String = {
val builder = StringBuilder.newBuilder

builder ++= s"CREATE TABLE ${table.quotedString} "
showDataSourceTableDataColumns(metadata, builder)
showDataSourceTableOptions(metadata, builder)
showDataSourceTableNonDataColumns(metadata, builder)
showTableComment(metadata, builder)
showTableLocation(metadata, builder)
showTableProperties(metadata, builder)

builder.toString()
}

private def showDataSourceTableDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
val columns = metadata.schema.fields.map(_.toDDL)
builder ++= columns.mkString("(", ", ", ")\n")
}
if (tableMetadata.tableType == VIEW) {
throw new AnalysisException("Hive view isn't supported by SHOW CREATE TABLE AS SPARK")
}

private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
builder ++= s"USING ${metadata.provider.get}\n"
// scalastyle:off caselocale
if (tableMetadata.properties.getOrElse("transactional", "false").toLowerCase.equals("true")) {
Member Author

Should we just move the property to unsupportedFeatures of CatalogTable?

Contributor

IMHO it's a good idea. I am not sure what happens today when we try to select from a Hive transactional table. If we add it to unsupported features, will we then get an error during select?

Member Author

I haven't tried it, but it seems you can read it and just get no results back, as SPARK-15348 and SPARK-16996 track.

Currently I don't see us putting any limits on unsupportedFeatures when reading a table. Anyway, it is still fine to leave it as is. I'm just raising this as a question.
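For context, a rough sketch of the alternative raised above, i.e. recording the transactional flag as an unsupported feature when the Hive metadata is converted, rather than checking the raw property here (all names are illustrative, not existing Spark APIs):

```scala
import scala.collection.mutable.ArrayBuffer

// Derive unsupported-feature entries from raw Hive table properties so a
// transactional table is rejected uniformly wherever unsupportedFeatures is checked.
def transactionalAsUnsupportedFeature(hiveTableProps: Map[String, String]): Seq[String] = {
  val features = ArrayBuffer.empty[String]
  if (hiveTableProps.getOrElse("transactional", "false").equalsIgnoreCase("true")) {
    features += "transactional Hive tables"
  }
  features.toSeq
}

// e.g. tableMetadata.copy(unsupportedFeatures =
//        tableMetadata.unsupportedFeatures ++ transactionalAsUnsupportedFeature(rawHiveProps))
```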

throw new AnalysisException(
"SHOW CRETE TABLE AS SPARK doesn't support transactional Hive table")
}
// scalastyle:on caselocale

val dataSourceOptions = SQLConf.get.redactOptions(metadata.storage.properties).map {
case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
showCreateDataSourceTable(convertTableMetadata(tableMetadata))
}

if (dataSourceOptions.nonEmpty) {
builder ++= "OPTIONS (\n"
builder ++= dataSourceOptions.mkString(" ", ",\n ", "\n")
builder ++= ")\n"
}
Seq(Row(stmt))
}

private def showDataSourceTableNonDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
val partCols = metadata.partitionColumnNames
if (partCols.nonEmpty) {
builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
}

metadata.bucketSpec.foreach { spec =>
if (spec.bucketColumnNames.nonEmpty) {
builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"

if (spec.sortColumnNames.nonEmpty) {
builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
}

builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
private def convertTableMetadata(tableMetadata: CatalogTable): CatalogTable = {
val hiveSerde = HiveSerDe(
serde = tableMetadata.storage.serde,
inputFormat = tableMetadata.storage.inputFormat,
outputFormat = tableMetadata.storage.outputFormat)

// Looking for a Spark data source that maps to the Hive serde.
// TODO: some Hive fileformat + row serde might be mapped to Spark data source, e.g. CSV.
val source = HiveSerDe.serdeToSource(hiveSerde)
if (source.isEmpty) {
val builder = StringBuilder.newBuilder
hiveSerde.serde.foreach { serde =>
builder ++= s" SERDE: $serde"
}
hiveSerde.inputFormat.foreach { format =>
builder ++= s" INPUTFORMAT: $format"
}
hiveSerde.outputFormat.foreach { format =>
builder ++= s" OUTPUTFORMAT: $format"
}
throw new AnalysisException(
"Failed to execute SHOW CREATE TABLE AS SPARK against table " +
s"${tableMetadata.identifier}, which is created by Hive and uses the " +
"following unsupported serde configuration\n" +
builder.toString()
)
} else {
// TODO: should we keep Hive serde properties?
val newStorage = tableMetadata.storage.copy(properties = Map.empty)
tableMetadata.copy(provider = source, storage = newStorage)
}
}
}
@@ -65,6 +65,14 @@ object HiveSerDe {
outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))

// The `HiveSerDe` values in `serdeMap` should be distinct.
val serdeInverseMap: Map[HiveSerDe, String] = serdeMap.flatMap {
case ("sequencefile", _) => None
case ("rcfile", _) => None
case ("textfile", serde) => Some((serde, "text"))
case pair => Some(pair.swap)
}

/**
* Get the Hive SerDe information from the data source abbreviation string or classname.
*
@@ -88,6 +96,14 @@ object HiveSerDe {
serdeMap.get(key)
}

/**
* Get the Spark data source name from the Hive SerDe information.
*
* @param serde Hive SerDe information.
* @return Spark data source name associated with the specified Hive Serde.
*/
def serdeToSource(serde: HiveSerDe): Option[String] = serdeInverseMap.get(serde)
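// Illustrative usage (not part of this change): the Parquet serde registered in
// `serdeMap` maps back to the "parquet" data source name, e.g.
//   HiveSerDe.sourceToSerDe("parquet").flatMap(HiveSerDe.serdeToSource) == Some("parquet")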

def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
// To respect hive-site.xml, it peeks Hadoop configuration from existing Spark session,
// as an easy workaround. See SPARK-27555.
@@ -220,7 +220,7 @@ abstract class ShowCreateTableSuite extends QueryTest with SQLTestUtils {
}
}

private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
protected def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
def normalize(table: CatalogTable): CatalogTable = {
val nondeterministicProps = Set(
"CreateTime",