Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,39 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
comment = comment)

selectQuery match {
case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
case Some(q) =>
// Hive does not allow using a CTAS statement to create a partitioned table.
if (tableDesc.partitionColumnNames.nonEmpty) {
val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
"create a partitioned table using Hive's file formats. " +
"Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
"OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
"CTAS statement."
throw operationNotAllowed(errorMessage, ctx)
}

val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
if (conf.convertCTAS && !hasStorageProperties) {
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
val options = rowStorage.serdeProperties ++ fileStorage.serdeProperties
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these will always be empty if we've reached here, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea

val optionsWithPath = if (location.isDefined) {
options + ("path" -> location.get)
} else {
options
}
CreateTableUsingAsSelect(
tableIdent = tableDesc.identifier,
provider = conf.defaultDataSourceName,
temporary = false,
partitionColumns = tableDesc.partitionColumnNames.toArray,
bucketSpec = None,
mode = mode,
options = optionsWithPath,
q
)
} else {
CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this one also be renamed to CreateHiveTableAsSelectLogicalPlan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at its implementation, it is not hive-specific. So, seems fine to leave it as is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like HiveMetastoreCatalog is the only user though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. Let me change it

}
case None => CreateTableCommand(tableDesc, ifNotExists)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,14 @@ object SQLConf {
.stringConf
.createWithDefault("parquet")

// Internal boolean setting registered under the key "spark.sql.hive.convertCTAS";
// it defaults to false.
val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
.internal()
.doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
"without specifying any storage property will be converted to a data source table, " +
"using the data source set by spark.sql.sources.default.")
.booleanConf
.createWithDefault(false)

// This is used to control when we will split a schema's JSON string into multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
Expand Down Expand Up @@ -632,6 +640,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

/** Returns the current value of the DEFAULT_DATA_SOURCE_NAME setting. */
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

/** Returns the current value of the CONVERT_CTAS setting. */
def convertCTAS: Boolean = getConf(CONVERT_CTAS)

/** Returns the current value of the SQLConf.PARTITION_DISCOVERY_ENABLED setting. */
def partitionDiscoveryEnabled(): Boolean =
getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,52 +447,20 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
case p: LogicalPlan if p.resolved => p

case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
val schema = if (table.schema.nonEmpty) {
table.schema
val desc = if (table.storage.serde.isEmpty) {
// add default serde
table.withNewStorage(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
child.output.map { a =>
CatalogColumn(a.name, a.dataType.catalogString, a.nullable)
}
table
}

val desc = table.copy(schema = schema)

if (sessionState.convertCTAS && table.storage.serde.isEmpty) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
if (table.identifier.database.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
}

val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
TableIdentifier(desc.identifier.table),
sessionState.conf.defaultDataSourceName,
temporary = false,
Array.empty[String],
bucketSpec = None,
mode,
options = Map.empty[String, String],
child
)
} else {
val desc = if (table.storage.serde.isEmpty) {
// add default serde
table.withNewStorage(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
table
}
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)

val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)

execution.CreateTableAsSelectCommand(
desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
child,
allowExisting)
}
execution.CreateHiveTableAsSelectCommand(
desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
child,
allowExisting)
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep this restriction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we do support having db name, right?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

/**
* When true, a table created by a Hive CTAS statement (no USING clause) will be
* converted to a data source table, using the data source set by spark.sql.sources.default.
* The table in CTAS statement will be converted when it meets any of the following conditions:
* - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
* a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
* is either TextFile or SequenceFile.
* - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
* is specified (no ROW FORMAT SERDE clause).
* - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
* and no SerDe is specified (no ROW FORMAT SERDE clause).
*/
def convertCTAS: Boolean = conf.getConf(HiveUtils.CONVERT_CTAS)

/**
* When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ private[spark] object HiveUtils extends Logging {
.booleanConf
.createWithDefault(false)

// Boolean setting registered under the key "spark.sql.hive.convertCTAS"; it defaults to false.
val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
.doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
"converted to a data source table, using the data source set by spark.sql.sources.default.")
.booleanConf
.createWithDefault(false)

val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
.doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
"the built in support.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ import org.apache.spark.sql.hive.MetastoreRelation

/**
* Create table and insert the query result into it.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be inserted into the new relation
* @param allowExisting allow continue working if it's already exists, otherwise
* @param ignoreIfExists continue without error if the table already exists; otherwise
* raise an exception
*/
private[hive]
case class CreateTableAsSelectCommand(
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
allowExisting: Boolean)
ignoreIfExists: Boolean)
extends RunnableCommand {

private val tableIdentifier = tableDesc.identifier
Expand Down Expand Up @@ -80,7 +81,7 @@ case class CreateTableAsSelectCommand(
// add the relation into catalog, just in case of failure occurs while data
// processing.
if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
if (ignoreIfExists) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
throw new AnalysisException(s"$tableIdentifier already exists.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.serde.serdeConstants

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
Expand Down Expand Up @@ -60,7 +58,6 @@ class HiveDDLCommandSuite extends PlanTest {
|ip STRING COMMENT 'IP Address of the User',
|country STRING COMMENT 'country of origination')
|COMMENT 'This is the staging page view table'
|PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
|STORED AS RCFILE
|LOCATION '/user/external/page_view'
|TBLPROPERTIES ('p1'='v1', 'p2'='v2')
Expand All @@ -78,16 +75,12 @@ class HiveDDLCommandSuite extends PlanTest {
CatalogColumn("page_url", "string") ::
CatalogColumn("referrer_url", "string") ::
CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
CatalogColumn("country", "string", comment = Some("country of origination")) ::
CatalogColumn("dt", "string", comment = Some("date type")) ::
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
assert(desc.comment == Some("This is the staging page view table"))
// TODO will be SQLText
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
assert(desc.partitionColumns ==
CatalogColumn("dt", "string", comment = Some("date type")) ::
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
assert(desc.partitionColumns == Seq.empty[CatalogColumn])
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde ==
Expand All @@ -105,7 +98,6 @@ class HiveDDLCommandSuite extends PlanTest {
|ip STRING COMMENT 'IP Address of the User',
|country STRING COMMENT 'country of origination')
|COMMENT 'This is the staging page view table'
|PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
|ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
| STORED AS
| INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
Expand All @@ -126,16 +118,12 @@ class HiveDDLCommandSuite extends PlanTest {
CatalogColumn("page_url", "string") ::
CatalogColumn("referrer_url", "string") ::
CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
CatalogColumn("country", "string", comment = Some("country of origination")) ::
CatalogColumn("dt", "string", comment = Some("date type")) ::
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
// TODO will be SQLText
assert(desc.comment == Some("This is the staging page view table"))
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
assert(desc.partitionColumns ==
CatalogColumn("dt", "string", comment = Some("date type")) ::
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
assert(desc.partitionColumns == Seq.empty[CatalogColumn])
assert(desc.storage.serdeProperties == Map())
assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
Expand Down Expand Up @@ -197,6 +185,11 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
}

test("CTAS statement with a PARTITIONED BY clause is not allowed") {
  // A Hive-syntax CTAS that also declares a partition column; it is expected to be
  // reported as unsupported.
  val partitionedCtas =
    "CREATE TABLE ctas1 PARTITIONED BY (k int)" +
      " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp"
  assertUnsupported(partitionedCtas)
}

test("unsupported operations") {
intercept[ParseException] {
parser.parsePlan(
Expand Down
Loading