diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 48fb95b519d88..e2ff18379aa79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -936,7 +936,47 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { comment = comment) selectQuery match { - case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists) + case Some(q) => + // Hive does not allow to use a CTAS statement to create a partitioned table. + if (tableDesc.partitionColumnNames.nonEmpty) { + val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + + "create a partitioned table using Hive's file formats. " + + "Please use the syntax of \"CREATE TABLE tableName USING dataSource " + + "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " + + "CTAS statement." + throw operationNotAllowed(errorMessage, ctx) + } + + // CTAS statement does not allow the EXTERNAL keyword. + if (external) { + val errorMessage = "CREATE EXTERNAL TABLE ... AS SELECT. " + + "Please remove the EXTERNAL keyword. As long as a user-specified location is " + + "provided, the data of the table will not be deleted when dropping the table." + throw operationNotAllowed(errorMessage, ctx) + } + + val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null) + if (conf.convertCTAS && !hasStorageProperties) { + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + val options = rowStorage.serdeProperties ++ fileStorage.serdeProperties + val optionsWithPath = if (location.isDefined) { + options + ("path" -> location.get) + } else { + options + } + CreateTableUsingAsSelect( + tableIdent = tableDesc.identifier, + provider = conf.defaultDataSourceName, + temporary = false, + partitionColumns = tableDesc.partitionColumnNames.toArray, + bucketSpec = None, + mode = mode, + options = optionsWithPath, + q + ) + } else { + CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists) + } case None => CreateTableCommand(tableDesc, ifNotExists) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d1db0dd800a33..437e093825f6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -310,6 +310,14 @@ object SQLConf { .stringConf .createWithDefault("parquet") + val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS") + .internal() + .doc("When true, a table created by a Hive CTAS statement (no USING clause) " + + "without specifying any storage property will be converted to a data source table, " + + "using the data source set by spark.sql.sources.default.") + .booleanConf + .createWithDefault(false) + // This is used to control the when we will split a schema's JSON string to multiple pieces // in order to fit the JSON string in metastore's table property (by default, the value has // a length restriction of 4000 characters). 
We will split the JSON string of a schema @@ -632,6 +640,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME) + def convertCTAS: Boolean = getConf(CONVERT_CTAS) + def partitionDiscoveryEnabled(): Boolean = getConf(SQLConf.PARTITION_DISCOVERY_ENABLED) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index ff395f39b7052..4cfdac05053c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -447,52 +447,20 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log case p: LogicalPlan if p.resolved => p case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) => - val schema = if (table.schema.nonEmpty) { - table.schema + val desc = if (table.storage.serde.isEmpty) { + // add default serde + table.withNewStorage( + serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) } else { - child.output.map { a => - CatalogColumn(a.name, a.dataType.catalogString, a.nullable) - } + table } - val desc = table.copy(schema = schema) - - if (sessionState.convertCTAS && table.storage.serde.isEmpty) { - // Do the conversion when spark.sql.hive.convertCTAS is true and the query - // does not specify any storage format (file format and storage handler). - if (table.identifier.database.isDefined) { - throw new AnalysisException( - "Cannot specify database name in a CTAS statement " + - "when spark.sql.hive.convertCTAS is set to true.") - } - - val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists - CreateTableUsingAsSelect( - TableIdentifier(desc.identifier.table), - sessionState.conf.defaultDataSourceName, - temporary = false, - Array.empty[String], - bucketSpec = None, - mode, - options = Map.empty[String, String], - child - ) - } else { - val desc = if (table.storage.serde.isEmpty) { - // add default serde - table.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - } else { - table - } + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) - - execution.CreateTableAsSelectCommand( - desc.copy(identifier = TableIdentifier(tblName, Some(dbName))), - child, - allowExisting) - } + execution.CreateHiveTableAsSelectCommand( + desc.copy(identifier = TableIdentifier(tblName, Some(dbName))), + child, + allowExisting) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 081d85acb9084..ca8e5f8223968 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -138,22 +138,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) } - /** - * When true, a table created by a Hive CTAS statement (no USING clause) will be - * converted to a data source table, using the data source set by spark.sql.sources.default. 
- * The table in CTAS statement will be converted when it meets any of the following conditions: - * - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or - * a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml - * is either TextFile or SequenceFile. - * - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe - * is specified (no ROW FORMAT SERDE clause). - * - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format - * and no SerDe is specified (no ROW FORMAT SERDE clause). - */ - def convertCTAS: Boolean = { - conf.getConf(HiveUtils.CONVERT_CTAS) - } - /** * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool." */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 88f4a2d2b20ba..9ed357c587c35 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -96,12 +96,6 @@ private[spark] object HiveUtils extends Logging { .booleanConf .createWithDefault(false) - val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS") - .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " + - "converted to a data source table, using the data source set by spark.sql.sources.default.") - .booleanConf - .createWithDefault(false) - val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc") .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " + "the built in support.") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala similarity index 95% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index cfe614909532b..b8099385a466b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -26,16 +26,17 @@ import org.apache.spark.sql.hive.MetastoreRelation /** * Create table and insert the query result into it. + * * @param tableDesc the Table Describe, which may contains serde, storage handler etc. * @param query the query whose result will be insert into the new relation - * @param allowExisting allow continue working if it's already exists, otherwise + * @param ignoreIfExists allow continue working if it's already exists, otherwise * raise exception */ private[hive] -case class CreateTableAsSelectCommand( +case class CreateHiveTableAsSelectCommand( tableDesc: CatalogTable, query: LogicalPlan, - allowExisting: Boolean) + ignoreIfExists: Boolean) extends RunnableCommand { private val tableIdentifier = tableDesc.identifier @@ -80,7 +81,7 @@ case class CreateTableAsSelectCommand( // add the relation into catalog, just in case of failure occurs while data // processing. 
if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) { - if (allowExisting) { + if (ignoreIfExists) { // table already exists, will do nothing, to keep consistent with Hive } else { throw new AnalysisException(s"$tableIdentifier already exists.") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 96c8fa6b70501..227e1fc891d76 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.hive.serde.serdeConstants - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} @@ -52,7 +50,7 @@ class HiveDDLCommandSuite extends PlanTest { test("Test CTAS #1") { val s1 = - """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view + """CREATE TABLE IF NOT EXISTS mydb.page_view |(viewTime INT, |userid BIGINT, |page_url STRING, @@ -60,7 +58,6 @@ class HiveDDLCommandSuite extends PlanTest { |ip STRING COMMENT 'IP Address of the User', |country STRING COMMENT 'country of origination') |COMMENT 'This is the staging page view table' - |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day') |STORED AS RCFILE |LOCATION '/user/external/page_view' |TBLPROPERTIES ('p1'='v1', 'p2'='v2') @@ -78,16 +75,12 @@ class HiveDDLCommandSuite extends PlanTest { CatalogColumn("page_url", "string") :: CatalogColumn("referrer_url", "string") :: CatalogColumn("ip", "string", comment = Some("IP Address of the User")) :: - CatalogColumn("country", "string", comment = Some("country of origination")) :: - CatalogColumn("dt", "string", comment = Some("date type")) :: - CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) + CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) assert(desc.comment == Some("This is the staging page view table")) // TODO will be SQLText assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) - assert(desc.partitionColumns == - CatalogColumn("dt", "string", comment = Some("date type")) :: - CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) + assert(desc.partitionColumns == Seq.empty[CatalogColumn]) assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) assert(desc.storage.serde == @@ -97,7 +90,7 @@ class HiveDDLCommandSuite extends PlanTest { test("Test CTAS #2") { val s2 = - """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view + """CREATE TABLE IF NOT EXISTS mydb.page_view |(viewTime INT, |userid BIGINT, |page_url STRING, @@ -105,7 +98,6 @@ class HiveDDLCommandSuite extends PlanTest { |ip STRING COMMENT 'IP Address of the User', |country STRING COMMENT 'country of origination') |COMMENT 'This is the staging page view table' - |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day') |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' | STORED AS | INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' @@ -126,16 +118,12 @@ class HiveDDLCommandSuite extends PlanTest { CatalogColumn("page_url", "string") :: CatalogColumn("referrer_url", "string") :: 
CatalogColumn("ip", "string", comment = Some("IP Address of the User")) :: - CatalogColumn("country", "string", comment = Some("country of origination")) :: - CatalogColumn("dt", "string", comment = Some("date type")) :: - CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) + CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) // TODO will be SQLText assert(desc.comment == Some("This is the staging page view table")) assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) - assert(desc.partitionColumns == - CatalogColumn("dt", "string", comment = Some("date type")) :: - CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) + assert(desc.partitionColumns == Seq.empty[CatalogColumn]) assert(desc.storage.serdeProperties == Map()) assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) @@ -197,6 +185,17 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22"))) } + test("CTAS statement with a PARTITIONED BY clause is not allowed") { + assertUnsupported(s"CREATE TABLE ctas1 PARTITIONED BY (k int)" + + " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp") + } + + test("CTAS statement with an EXTERNAL keyword is not allowed") { + assertUnsupported( + s"CREATE EXTERNAL TABLE ctas1 stored as textfile LOCATION 'test'" + + " AS SELECT key FROM (SELECT 1 as key) tmp") + } + test("unsupported operations") { intercept[ParseException] { parser.parsePlan( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 4b51f021bfa0c..a624f07f4c87a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -24,11 +24,14 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation} import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -376,78 +379,138 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ) } - test("CTAS without serde") { - def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = { - val relation = EliminateSubqueryAliases( - sessionState.catalog.lookupRelation(TableIdentifier(tableName))) - relation match { - case LogicalRelation(r: HadoopFsRelation, _, _) => - if (!isDataSourceParquet) { - fail( - s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " + + def checkRelation( + tableName: String, + isDataSourceParquet: Boolean, + format: String, + userSpecifiedLocation: Option[String] = None): Unit = { + val 
relation = EliminateSubqueryAliases( + sessionState.catalog.lookupRelation(TableIdentifier(tableName))) + val catalogTable = + sessionState.catalog.getTableMetadata(TableIdentifier(tableName)) + relation match { + case LogicalRelation(r: HadoopFsRelation, _, _) => + if (!isDataSourceParquet) { + fail( + s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " + s"${HadoopFsRelation.getClass.getCanonicalName}.") - } + } + userSpecifiedLocation match { + case Some(location) => + assert(r.options("path") === location) + case None => // OK. + } + assert( + catalogTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) === format) - case r: MetastoreRelation => - if (isDataSourceParquet) { - fail( - s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " + + case r: MetastoreRelation => + if (isDataSourceParquet) { + fail( + s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " + s"${classOf[MetastoreRelation].getCanonicalName}.") - } - } + } + userSpecifiedLocation match { + case Some(location) => + assert(r.catalogTable.storage.locationUri.get === location) + case None => // OK. + } + // Also make sure that the format is the desired format. + assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format)) } - val originalConf = sessionState.convertCTAS + // When a user-specified location is defined, the table type needs to be EXTERNAL. + val actualTableType = catalogTable.tableType + userSpecifiedLocation match { + case Some(location) => + assert(actualTableType === CatalogTableType.EXTERNAL) + case None => + assert(actualTableType === CatalogTableType.MANAGED) + } + } - setConf(HiveUtils.CONVERT_CTAS, true) + test("CTAS without serde without location") { + val originalConf = sessionState.conf.convertCTAS + setConf(SQLConf.CONVERT_CTAS, true) + + val defaultDataSource = sessionState.conf.defaultDataSourceName try { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value") sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value") - var message = intercept[AnalysisException] { + val message = intercept[AnalysisException] { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value") }.getMessage assert(message.contains("already exists")) - checkRelation("ctas1", true) + checkRelation("ctas1", true, defaultDataSource) sql("DROP TABLE ctas1") // Specifying database name for query can be converted to data source write path // is not allowed right now. 
- message = intercept[AnalysisException] { - sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value") - }.getMessage - assert( - message.contains("Cannot specify database name in a CTAS statement"), - "When spark.sql.hive.convertCTAS is true, we should not allow " + - "database name specified.") + sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", true, defaultDataSource) + sql("DROP TABLE ctas1") sql("CREATE TABLE ctas1 stored as textfile" + " AS SELECT key k, value FROM src ORDER BY k, value") - checkRelation("ctas1", true) + checkRelation("ctas1", false, "text") sql("DROP TABLE ctas1") sql("CREATE TABLE ctas1 stored as sequencefile" + " AS SELECT key k, value FROM src ORDER BY k, value") - checkRelation("ctas1", true) + checkRelation("ctas1", false, "sequence") sql("DROP TABLE ctas1") sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k, value") - checkRelation("ctas1", false) + checkRelation("ctas1", false, "rcfile") sql("DROP TABLE ctas1") sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value") - checkRelation("ctas1", false) + checkRelation("ctas1", false, "orc") sql("DROP TABLE ctas1") sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value") - checkRelation("ctas1", false) + checkRelation("ctas1", false, "parquet") sql("DROP TABLE ctas1") } finally { - setConf(HiveUtils.CONVERT_CTAS, originalConf) + setConf(SQLConf.CONVERT_CTAS, originalConf) sql("DROP TABLE IF EXISTS ctas1") } } + test("CTAS without serde with location") { + withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") { + withTempDir { dir => + val defaultDataSource = sessionState.conf.defaultDataSourceName + + val tempLocation = dir.getCanonicalPath + sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c1'" + + " AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c1")) + sql("DROP TABLE ctas1") + + sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c2'" + + " AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c2")) + sql("DROP TABLE ctas1") + + sql(s"CREATE TABLE ctas1 stored as textfile LOCATION 'file:$tempLocation/c3'" + + " AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", false, "text", Some(s"file:$tempLocation/c3")) + sql("DROP TABLE ctas1") + + sql(s"CREATE TABLE ctas1 stored as sequenceFile LOCATION 'file:$tempLocation/c4'" + + " AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", false, "sequence", Some(s"file:$tempLocation/c4")) + sql("DROP TABLE ctas1") + + sql(s"CREATE TABLE ctas1 stored as rcfile LOCATION 'file:$tempLocation/c5'" + + " AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", false, "rcfile", Some(s"file:$tempLocation/c5")) + sql("DROP TABLE ctas1") + } + } + } + test("CTAS with serde") { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect() sql( @@ -785,8 +848,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // generates an invalid query plan. 
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""")) read.json(rdd).createOrReplaceTempView("data") - val originalConf = sessionState.convertCTAS - setConf(HiveUtils.CONVERT_CTAS, false) + val originalConf = sessionState.conf.convertCTAS + setConf(SQLConf.CONVERT_CTAS, false) try { sql("CREATE TABLE explodeTest (key bigInt)") @@ -805,7 +868,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("DROP TABLE explodeTest") dropTempTable("data") } finally { - setConf(HiveUtils.CONVERT_CTAS, originalConf) + setConf(SQLConf.CONVERT_CTAS, originalConf) } }
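
For reference, a minimal sketch of how the patched behavior looks from the user side. It assumes a Hive-enabled SparkSession named `spark` and a pre-existing table `src(key INT, value STRING)`; both names are illustrative and not part of this patch.

// Sketch only: assumes a Hive metastore is available and `src` already exists.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ctas-conversion-sketch")
  .enableHiveSupport()
  .getOrCreate()

// With spark.sql.hive.convertCTAS (now defined in SQLConf) set to true, a CTAS that
// specifies no file format or SerDe is written with the default data source
// (spark.sql.sources.default, "parquet" by default) instead of Hive's SerDe path.
spark.conf.set("spark.sql.hive.convertCTAS", "true")
spark.sql("CREATE TABLE ctas_converted AS SELECT key, value FROM src")

// A CTAS that does name a Hive storage format keeps the Hive write path
// (CreateHiveTableAsSelectCommand), even when the flag is on.
spark.sql("CREATE TABLE ctas_hive STORED AS RCFILE AS SELECT key, value FROM src")

// Both of the following are now rejected at parse time by SparkSqlAstBuilder:
// spark.sql("CREATE TABLE bad1 PARTITIONED BY (k INT) AS SELECT 1 AS k")         // partitioned Hive-format CTAS
// spark.sql("CREATE EXTERNAL TABLE bad2 LOCATION '/tmp/bad2' AS SELECT 1 AS k")  // EXTERNAL keyword in CTAS

Note that a plain CTAS with a LOCATION clause is still allowed: per the new tests, the location is passed through as the data source "path" option and the resulting table is marked EXTERNAL, so dropping it does not delete the data.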