@@ -839,7 +839,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a table, returning either a [[CreateTableCommand]] or a
-   * [[CreateTableAsSelectLogicalPlan]].
+   * [[CreateHiveTableAsSelectLogicalPlan]].
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -936,7 +936,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       comment = comment)
 
     selectQuery match {
-      case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+      case Some(q) =>
+        // Hive does not allow using a CTAS statement to create a partitioned table.
+        if (tableDesc.partitionColumnNames.nonEmpty) {
+          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
+            "create a partitioned table using Hive's file formats. " +
+            "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
+            "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
+            "CTAS statement."
+          throw operationNotAllowed(errorMessage, ctx)
+        }
+
+        val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
+        if (conf.convertCTAS && !hasStorageProperties) {
+          val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+          // At this point, both rowStorage.serdeProperties and fileStorage.serdeProperties
+          // are empty Maps.
+          val optionsWithPath = if (location.isDefined) {
+            Map("path" -> location.get)
+          } else {
+            Map.empty[String, String]
+          }
+          CreateTableUsingAsSelect(
+            tableIdent = tableDesc.identifier,
+            provider = conf.defaultDataSourceName,
+            temporary = false,
+            partitionColumns = tableDesc.partitionColumnNames.toArray,
+            bucketSpec = None,
+            mode = mode,
+            options = optionsWithPath,
+            q
+          )
+        } else {
+          CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+        }
       case None => CreateTableCommand(tableDesc, ifNotExists)
     }
   }
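At the SQL level, the two new branches above might play out as follows; a sketch reusing the `spark` session defined earlier (table names are made up):

```scala
// 1. A partitioned Hive-format CTAS is now rejected at parse time via
//    operationNotAllowed, with the message built above.
try {
  spark.sql("CREATE TABLE bad_ctas PARTITIONED BY (part INT) AS SELECT 1 AS key")
} catch {
  // "A Create Table As Select (CTAS) statement is not allowed to ..."
  case e: Exception => println(e.getMessage)
}

// 2. The workaround the error message recommends: route the partitioned CTAS
//    through the data source syntax instead.
spark.sql(
  "CREATE TABLE good_ctas USING parquet PARTITIONED BY (part) " +
    "AS SELECT 1 AS key, 2 AS part")
```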
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-case class CreateTableAsSelectLogicalPlan(
+case class CreateHiveTableAsSelectLogicalPlan(
     tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean) extends UnaryNode with Command {
@@ -310,6 +310,14 @@ object SQLConf {
     .stringConf
     .createWithDefault("parquet")
 
+  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+    .internal()
+    .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
+      "without specifying any storage property will be converted to a data source table, " +
+      "using the data source set by spark.sql.sources.default.")
+    .booleanConf
+    .createWithDefault(false)
+
   // This is used to control when we will split a schema's JSON string into multiple pieces
   // in order to fit the JSON string in the metastore's table property (by default, the value
   // has a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -632,6 +640,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
 
+  def convertCTAS: Boolean = getConf(CONVERT_CTAS)
+
   def partitionDiscoveryEnabled(): Boolean =
     getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
 
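A sketch of exercising the relocated flag, reusing the same `spark` session (table names illustrative). Per the new parser logic, the conversion only applies when the statement carries no ROW FORMAT or file-format clause:

```scala
spark.conf.set("spark.sql.hive.convertCTAS", "true")

// No storage clause: the parser emits CreateTableUsingAsSelect, so the result
// is a data source table in the default format (spark.sql.sources.default,
// "parquet" unless overridden).
spark.sql("CREATE TABLE converted_ctas AS SELECT 1 AS key, 'a' AS value")

// An explicit STORED AS (or ROW FORMAT) keeps the statement on the Hive CTAS
// path even with the flag enabled.
spark.sql("CREATE TABLE still_hive_ctas STORED AS RCFILE AS SELECT 1 AS key")
```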
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
+import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -446,53 +446,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
 
-      case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
-        val schema = if (table.schema.nonEmpty) {
-          table.schema
-        } else {
-          child.output.map { a =>
-            CatalogColumn(a.name, a.dataType.catalogString, a.nullable)
-          }
-        }
-
-        val desc = table.copy(schema = schema)
-
-        if (sessionState.convertCTAS && table.storage.serde.isEmpty) {
-          // Do the conversion when spark.sql.hive.convertCTAS is true and the query
-          // does not specify any storage format (file format and storage handler).
-          if (table.identifier.database.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-                "when spark.sql.hive.convertCTAS is set to true.")
-          }
-
-          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
-          CreateTableUsingAsSelect(
-            TableIdentifier(desc.identifier.table),
-            sessionState.conf.defaultDataSourceName,
-            temporary = false,
-            Array.empty[String],
-            bucketSpec = None,
-            mode,
-            options = Map.empty[String, String],
-            child
-          )
-        } else {
-          val desc = if (table.storage.serde.isEmpty) {
-            // add default serde
-            table.withNewStorage(
-              serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-          } else {
-            table
-          }
-
-          val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
-
-          execution.CreateTableAsSelectCommand(
-            desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-            child,
-            allowExisting)
-        }
+      case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) =>
+        val desc = if (table.storage.serde.isEmpty) {
+          // add default serde
+          table.withNewStorage(
+            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+        } else {
+          table
+        }
+
+        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
+
+        execution.CreateHiveTableAsSelectCommand(
+          desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
+          child,
+          allowExisting)
     }

Contributor: Should we keep this restriction?

Contributor (Author): Seems we do support having db name, right?
@@ -543,6 +511,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 /**
  * An override of the standard HDFS listing based catalog, that overrides the partition spec with
  * the information from the metastore.
+ *
  * @param tableBasePath The default base path of the Hive metastore table
  * @param partitionSpec The partition specifications from Hive metastore
  */
@@ -138,22 +138,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
-  /**
-   * When true, a table created by a Hive CTAS statement (no USING clause) will be
-   * converted to a data source table, using the data source set by spark.sql.sources.default.
-   * The table in CTAS statement will be converted when it meets any of the following conditions:
-   *   - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
-   *     a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
-   *     is either TextFile or SequenceFile.
-   *   - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
-   *     is specified (no ROW FORMAT SERDE clause).
-   *   - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
-   *     and no SerDe is specified (no ROW FORMAT SERDE clause).
-   */
-  def convertCTAS: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_CTAS)
-  }
-
   /**
    * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool.
    */
@@ -96,12 +96,6 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(false)
 
-  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
-    .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
-      "converted to a data source table, using the data source set by spark.sql.sources.default.")
-    .booleanConf
-    .createWithDefault(false)
-
   val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
     .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
       "the built in support.")
@@ -26,16 +26,17 @@ import org.apache.spark.sql.hive.MetastoreRelation
 
 /**
  * Create table and insert the query result into it.
+ *
  * @param tableDesc the table description, which may contain serde, storage handler, etc.
  * @param query the query whose result will be inserted into the new relation
- * @param allowExisting allow continue working if it's already exists, otherwise
+ * @param ignoreIfExists allow continuing if the table already exists, otherwise
  *                       raise exception
  */
 private[hive]
-case class CreateTableAsSelectCommand(
+case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
-    allowExisting: Boolean)
+    ignoreIfExists: Boolean)
   extends RunnableCommand {
 
   private val tableIdentifier = tableDesc.identifier
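The renamed `ignoreIfExists` flag corresponds to IF NOT EXISTS in the SQL text (the parser chooses SaveMode.Ignore versus SaveMode.ErrorIfExists the same way). A sketch of both behaviors with the earlier `spark` session; the table name is made up:

```scala
spark.sql("CREATE TABLE ctas_target STORED AS RCFILE AS SELECT 1 AS key")

// ignoreIfExists = true: repeating the CTAS with IF NOT EXISTS is a no-op,
// keeping consistent with Hive.
spark.sql("CREATE TABLE IF NOT EXISTS ctas_target STORED AS RCFILE AS SELECT 2 AS key")

// ignoreIfExists = false: without IF NOT EXISTS the command fails with
// AnalysisException(s"$tableIdentifier already exists.").
try {
  spark.sql("CREATE TABLE ctas_target STORED AS RCFILE AS SELECT 3 AS key")
} catch {
  case e: org.apache.spark.sql.AnalysisException => println(e.getMessage)
}
```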
@@ -80,7 +81,7 @@ case class CreateTableAsSelectCommand(
     // add the relation into the catalog, just in case a failure occurs while
     // processing data.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
-      if (allowExisting) {
+      if (ignoreIfExists) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
         throw new AnalysisException(s"$tableIdentifier already exists.")
@@ -36,7 +36,7 @@ class HiveDDLCommandSuite extends PlanTest {
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
       case c: CreateTableCommand => (c.table, c.ifNotExists)
-      case c: CreateTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
+      case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
       case c: CreateViewCommand => (c.tableDesc, c.allowExisting)
     }.head
   }
@@ -58,7 +58,6 @@ class HiveDDLCommandSuite extends PlanTest {
       |ip STRING COMMENT 'IP Address of the User',
       |country STRING COMMENT 'country of origination')
       |COMMENT 'This is the staging page view table'
-      |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
       |STORED AS RCFILE
       |LOCATION '/user/external/page_view'
       |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
@@ -76,16 +75,12 @@
       CatalogColumn("page_url", "string") ::
       CatalogColumn("referrer_url", "string") ::
       CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
-      CatalogColumn("country", "string", comment = Some("country of origination")) ::
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
     assert(desc.comment == Some("This is the staging page view table"))
     // TODO will be SQLText
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns ==
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
     assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
     assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==
@@ -103,7 +98,6 @@ class HiveDDLCommandSuite extends PlanTest {
       |ip STRING COMMENT 'IP Address of the User',
       |country STRING COMMENT 'country of origination')
       |COMMENT 'This is the staging page view table'
-      |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
       |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
       | STORED AS
       |  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
@@ -124,16 +118,12 @@
       CatalogColumn("page_url", "string") ::
       CatalogColumn("referrer_url", "string") ::
       CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
-      CatalogColumn("country", "string", comment = Some("country of origination")) ::
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
     // TODO will be SQLText
     assert(desc.comment == Some("This is the staging page view table"))
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns ==
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
     assert(desc.storage.serdeProperties == Map())
     assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
     assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
@@ -195,6 +185,11 @@
     assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
   }
 
+  test("CTAS statement with a PARTITIONED BY clause is not allowed") {
+    assertUnsupported(s"CREATE TABLE ctas1 PARTITIONED BY (k int)" +
+      " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp")
+  }
+
   test("unsupported operations") {
     intercept[ParseException] {
       parser.parsePlan(
@@ -53,7 +53,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       "== Analyzed Logical Plan ==",
       "== Optimized Logical Plan ==",
       "== Physical Plan ==",
-      "CreateTableAsSelect",
+      "CreateHiveTableAsSelect",
       "InsertIntoHiveTable",
       "Limit",
       "src")
@@ -71,7 +71,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       "== Analyzed Logical Plan ==",
       "== Optimized Logical Plan ==",
       "== Physical Plan ==",
-      "CreateTableAsSelect",
+      "CreateHiveTableAsSelect",
       "InsertIntoHiveTable",
       "Limit",
       "src")
@@ -92,7 +92,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     val shouldContain =
       "== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
       "== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
-      "CreateTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
+      "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
     for (key <- shouldContain) {
       assert(outputs.contains(key), s"$key doesn't exist in result")
     }