
Commit ec47911

unify logical plans for CREATE TABLE and CTAS

1 parent ae22628
15 files changed: 381 additions & 369 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 5 additions & 12 deletions
@@ -21,8 +21,7 @@ import java.util.Date
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -112,6 +111,8 @@ case class BucketSpec(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  *
+ * @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
+ *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *                            underlying table but not supported by Spark SQL yet.
  */
@@ -120,6 +121,7 @@ case class CatalogTable(
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: StructType,
+    provider: Option[String] = None,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
@@ -131,16 +133,6 @@ case class CatalogTable(
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty) {
 
-  // Verify that the provided columns are part of the schema
-  private val colNames = schema.map(_.name).toSet
-  private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
-    require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
-      s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
-  }
-  requireSubsetOfSchema(partitionColumnNames, "partition")
-  requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
-  requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
-
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
     c => partitionColumnNames.contains(c.name)
@@ -189,6 +181,7 @@ case class CatalogTable(
       s"Last Access: ${new Date(lastAccessTime).toString}",
       s"Type: ${tableType.name}",
       if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
+      if (provider.isDefined) s"Provider: ${provider.get}" else "",
       if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
     ) ++ bucketStrings ++ Seq(
       viewOriginalText.map("Original View: " + _).getOrElse(""),
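
As an illustration of the new field, here is a minimal sketch of how a CatalogTable might now be constructed for a data source table versus a Hive serde table (table and column names here are hypothetical, not from this commit):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
    import org.apache.spark.sql.types.StructType

    // Data source table: provider names the format, e.g. "parquet" or "json".
    val dataSourceTable = CatalogTable(
      identifier = TableIdentifier("events", Some("db1")),  // hypothetical name
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = new StructType().add("id", "long").add("ts", "string"),
      provider = Some("parquet"))

    // Hive serde table: provider should be "hive" per the new doc comment;
    // views may leave provider as None.
    val hiveSerdeTable = dataSourceTable.copy(provider = Some("hive"))

Note also that the constructor-time requireSubsetOfSchema checks are deleted rather than moved within this file: a CatalogTable may now carry partition or bucket columns that are not (yet) in its schema, which the CTAS paths below rely on.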

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 6 additions & 2 deletions
@@ -552,7 +552,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string")
+      schema = new StructType().add("a", "int").add("b", "string"),
+      provider = Some("hive")
     )
 
     catalog.createTable("db1", table, ignoreIfExists = false)
@@ -571,7 +572,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       storage = CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
         None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string")
+      schema = new StructType().add("a", "int").add("b", "string"),
+      provider = Some("hive")
     )
     catalog.createTable("db1", externalTable, ignoreIfExists = false)
     assert(!exists(db.locationUri, "external_table"))
@@ -589,6 +591,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
         .add("col2", "string")
         .add("a", "int")
         .add("b", "string"),
+      provider = Some("hive"),
       partitionColumnNames = Seq("a", "b")
     )
     catalog.createTable("db1", table, ignoreIfExists = false)
@@ -692,6 +695,7 @@ abstract class CatalogTestUtils {
         .add("col2", "string")
         .add("a", "int")
         .add("b", "string"),
+      provider = Some("hive"),
      partitionColumnNames = Seq("a", "b"),
      bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
  }

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 13 additions & 11 deletions
@@ -23,10 +23,11 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
-        val cmd =
-          CreateTableUsingAsSelect(
-            tableIdent,
-            source,
-            partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-            getBucketSpec,
-            mode,
-            extraOptions.toMap,
-            df.logicalPlan)
+        val tableDesc = CatalogTable(
+          identifier = tableIdent,
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
+          schema = new StructType,
+          provider = Some(source),
+          partitionColumnNames = partitioningColumns.getOrElse(Nil),
+          bucketSpec = getBucketSpec
+        )
+        val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
         df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
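
As a sketch of what the rewritten branch produces at runtime (the DataFrame and names below are hypothetical), a partitioned saveAsTable call now plans a single CreateTable node carrying the table description and the query:

    // Assuming an existing DataFrame df:
    df.write
      .format("json")        // becomes provider = Some("json") in the CatalogTable
      .partitionBy("year")   // becomes partitionColumnNames = Seq("year")
      .mode("ignore")        // becomes the SaveMode carried by CreateTable
      .saveAsTable("logs")   // executes CreateTable(tableDesc, SaveMode.Ignore, Some(df.logicalPlan))

Note that tableDesc is built with an empty schema (new StructType) even when partition columns are present; this is exactly the combination the deleted requireSubsetOfSchema checks in CatalogTable would have rejected.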

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 2 additions & 5 deletions
@@ -24,7 +24,6 @@ import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
-import com.fasterxml.jackson.core.JsonFactory
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -35,18 +34,16 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.expressions.objects.Invoke
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -175,7 +172,7 @@ class Dataset[T] private[sql](
     def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
       case _: Command |
            _: InsertIntoTable |
-           _: CreateTableUsingAsSelect => true
+           _: CreateTable => true
       case _ => false
     }
 

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 39 additions & 56 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -310,7 +310,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan.
+   * Create a [[CreateTable]] logical plan.
    */
   override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -319,12 +319,26 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
+    val schema = Option(ctx.colTypeList()).map(createStructType)
     val partitionColumnNames =
       Option(ctx.partitionColumnNames)
         .map(visitIdentifierList(_).toArray)
         .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
+    val tableDesc = CatalogTable(
+      identifier = table,
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(properties = options),
+      schema = schema.getOrElse(new StructType),
+      provider = Some(provider),
+      partitionColumnNames = partitionColumnNames,
+      bucketSpec = bucketSpec
+    )
+
+    // Determine the storage mode.
+    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
     if (ctx.query != null) {
       // Get the backing query.
       val query = plan(ctx.query)
@@ -333,32 +347,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
       }
 
-      // Determine the storage mode.
-      val mode = if (ifNotExists) {
-        SaveMode.Ignore
-      } else {
-        SaveMode.ErrorIfExists
-      }
-
-      CreateTableUsingAsSelect(
-        table, provider, partitionColumnNames, bucketSpec, mode, options, query)
+      CreateTable(tableDesc, mode, Some(query))
     } else {
-      val struct = Option(ctx.colTypeList()).map(createStructType)
-      if (struct.isEmpty && bucketSpec.nonEmpty) {
-        throw new ParseException(
-          "Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx)
-      }
+      if (temp) {
+        if (ifNotExists) {
+          operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
+        }
 
-      CreateTableUsing(
-        table,
-        struct,
-        provider,
-        temp,
-        options,
-        partitionColumnNames,
-        bucketSpec,
-        ifNotExists,
-        managedIfNoPath = true)
+        logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
+          "CREATE TEMPORARY VIEW ... USING ... instead")
+        CreateTempViewUsing(table, schema, replace = true, provider, options)
+      } else {
+        CreateTable(tableDesc, mode, None)
+      }
     }
   }
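
To illustrate how the rewritten visitCreateTableUsing now routes the three variants (illustrative SQL, assuming a SparkSession named spark):

    // 1. No query: CreateTable(tableDesc, mode, None).
    spark.sql("CREATE TABLE src (id INT) USING parquet")

    // 2. CTAS: CreateTable(tableDesc, mode, Some(query));
    //    IF NOT EXISTS flips the mode from ErrorIfExists to Ignore.
    spark.sql("CREATE TABLE IF NOT EXISTS dst USING parquet AS SELECT id FROM src")

    // 3. Temporary tables become CreateTempViewUsing and log a deprecation
    //    warning pointing at CREATE TEMPORARY VIEW ... USING instead.
    spark.sql("CREATE TEMPORARY TABLE tmp (id INT) USING parquet")

One behavioral consequence: the parser no longer throws "Expected explicit specification of table schema when using CLUSTERED BY clause"; that check was removed together with CreateTableUsing.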

@@ -891,8 +892,7 @@
   }
 
   /**
-   * Create a table, returning either a [[CreateTableCommand]] or a
-   * [[CreateHiveTableAsSelectLogicalPlan]].
+   * Create a table, returning a [[CreateTable]] logical plan.
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -933,23 +933,6 @@
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
 
-    // Ensuring whether no duplicate name is used in table definition
-    val colNames = dataCols.map(_.name)
-    if (colNames.length != colNames.distinct.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }
-      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-        duplicateColumns.mkString("[", ",", "]"), ctx)
-    }
-
-    // For Hive tables, partition columns must not be part of the schema
-    val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
-    if (badPartCols.nonEmpty) {
-      operationNotAllowed(s"Partition columns may not be specified in the schema: " +
-        badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
-    }
-
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
     val schema = StructType(dataCols ++ partitionCols)
@@ -1001,10 +984,13 @@
       tableType = tableType,
       storage = storage,
       schema = schema,
+      provider = Some("hive"),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
       comment = comment)
 
+    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
     selectQuery match {
       case Some(q) =>
         // Just use whatever is projected in the select statement as our schema
@@ -1025,27 +1011,24 @@
 
         val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
         if (conf.convertCTAS && !hasStorageProperties) {
-          val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
           // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
           // are empty Maps.
          val optionsWithPath = if (location.isDefined) {
            Map("path" -> location.get)
          } else {
            Map.empty[String, String]
          }
-          CreateTableUsingAsSelect(
-            tableIdent = tableDesc.identifier,
-            provider = conf.defaultDataSourceName,
-            partitionColumns = tableDesc.partitionColumnNames.toArray,
-            bucketSpec = None,
-            mode = mode,
-            options = optionsWithPath,
-            q
+
+          val newTableDesc = tableDesc.copy(
+            storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+            provider = Some(conf.defaultDataSourceName)
           )
+
+          CreateTable(newTableDesc, mode, Some(q))
        } else {
-          CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+          CreateTable(tableDesc, mode, Some(q))
        }
-      case None => CreateTableCommand(tableDesc, ifNotExists)
+      case None => CreateTable(tableDesc, mode, None)
     }
   }