@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -310,7 +310,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan.
+   * Create a [[CreateTable]] logical plan.
    */
   override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -319,12 +319,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
319319 }
320320 val options = Option (ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map .empty)
321321 val provider = ctx.tableProvider.qualifiedName.getText
322+ val schema = Option (ctx.colTypeList()).map(createStructType)
322323 val partitionColumnNames =
323324 Option (ctx.partitionColumnNames)
324325 .map(visitIdentifierList(_).toArray)
325326 .getOrElse(Array .empty[String ])
326327 val bucketSpec = Option (ctx.bucketSpec()).map(visitBucketSpec)
327328
329+ val tableDesc = CatalogTable (
330+ identifier = table,
331+ // TODO: actually the table type may be EXTERNAL if we have `path` in options. However, the
332+ // physical plan `CreateDataSourceTableCommand` doesn't take table type as parameter, but a
333+ // boolean flag called `managedIfNoPath`. We set the table type to MANAGED here to simulate
334+ // setting the `managedIfNoPath` flag. In the future we should refactor the physical plan and
335+ // make it take `CatalogTable` directly.
336+ tableType = CatalogTableType .MANAGED ,
337+ storage = CatalogStorageFormat .empty.copy(properties = options),
338+ schema = schema.getOrElse(new StructType ),
339+ provider = Some (provider),
340+ partitionColumnNames = partitionColumnNames,
341+ bucketSpec = bucketSpec
342+ )
343+
344+ // Determine the storage mode.
345+ val mode = if (ifNotExists) SaveMode .Ignore else SaveMode .ErrorIfExists
346+
328347 if (ctx.query != null ) {
329348 // Get the backing query.
330349 val query = plan(ctx.query)
@@ -333,32 +352,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
       }
 
-      // Determine the storage mode.
-      val mode = if (ifNotExists) {
-        SaveMode.Ignore
-      } else {
-        SaveMode.ErrorIfExists
-      }
-
-      CreateTableUsingAsSelect(
-        table, provider, partitionColumnNames, bucketSpec, mode, options, query)
+      CreateTable(tableDesc, mode, Some(query))
     } else {
-      val struct = Option(ctx.colTypeList()).map(createStructType)
-      if (struct.isEmpty && bucketSpec.nonEmpty) {
-        throw new ParseException(
-          "Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx)
-      }
+      if (temp) {
+        if (ifNotExists) {
+          operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
+        }
 
-      CreateTableUsing(
-        table,
-        struct,
-        provider,
-        temp,
-        options,
-        partitionColumnNames,
-        bucketSpec,
-        ifNotExists,
-        managedIfNoPath = true)
+        logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
+          "CREATE TEMPORARY VIEW ... USING ... instead")
+        CreateTempViewUsing(table, schema, replace = true, provider, options)
+      } else {
+        CreateTable(tableDesc, mode, None)
+      }
     }
   }
 
@@ -891,8 +897,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a table, returning either a [[CreateTableCommand]] or a
-   * [[CreateHiveTableAsSelectLogicalPlan]].
+   * Create a table, returning a [[CreateTable]] logical plan.
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -933,23 +938,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
 
-    // Ensuring whether no duplicate name is used in table definition
-    val colNames = dataCols.map(_.name)
-    if (colNames.length != colNames.distinct.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }
-      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-        duplicateColumns.mkString("[", ",", "]"), ctx)
-    }
-
-    // For Hive tables, partition columns must not be part of the schema
-    val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
-    if (badPartCols.nonEmpty) {
-      operationNotAllowed(s"Partition columns may not be specified in the schema: " +
-        badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
-    }
-
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
     val schema = StructType(dataCols ++ partitionCols)
@@ -1001,10 +989,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableType = tableType,
       storage = storage,
       schema = schema,
+      provider = Some("hive"),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
       comment = comment)
 
+    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
     selectQuery match {
       case Some(q) =>
         // Just use whatever is projected in the select statement as our schema
@@ -1025,27 +1016,24 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
         val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
         if (conf.convertCTAS && !hasStorageProperties) {
-          val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
           // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
           // are empty Maps.
           val optionsWithPath = if (location.isDefined) {
            Map("path" -> location.get)
           } else {
            Map.empty[String, String]
           }
-          CreateTableUsingAsSelect(
-            tableIdent = tableDesc.identifier,
-            provider = conf.defaultDataSourceName,
-            partitionColumns = tableDesc.partitionColumnNames.toArray,
-            bucketSpec = None,
-            mode = mode,
-            options = optionsWithPath,
-            q
+
+          val newTableDesc = tableDesc.copy(
+            storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+            provider = Some(conf.defaultDataSourceName)
           )
+
+          CreateTable(newTableDesc, mode, Some(q))
         } else {
-          CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+          CreateTable(tableDesc, mode, Some(q))
         }
-      case None => CreateTableCommand(tableDesc, ifNotExists)
+      case None => CreateTable(tableDesc, mode, None)
     }
   }
10511039