@@ -137,14 +137,16 @@ case class CatalogTable(
unsupportedFeatures: Seq[String] = Seq.empty) {

// Verify that the provided columns are part of the schema
private val colNames = schema.map(_.name).toSet
private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
}
requireSubsetOfSchema(partitionColumnNames, "partition")
requireSubsetOfSchema(sortColumnNames, "sort")
requireSubsetOfSchema(bucketColumnNames, "bucket")
// TODO: this restriction should be checked at the end of Analyzer. When building CatalogTable,
// the initial version might violate it.
// private val colNames = schema.map(_.name).toSet
// private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
// require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
// s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
// }
// requireSubsetOfSchema(partitionColumnNames, "partition")
// requireSubsetOfSchema(sortColumnNames, "sort")
// requireSubsetOfSchema(bucketColumnNames, "bucket")

/** Columns this table is partitioned by. */
def partitionColumns: Seq[CatalogColumn] =
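For context on why the check above is now commented out: the CTAS paths introduced in this patch (see DataFrameWriter and SparkSqlAstBuilder below) build a CatalogTable before the query is analyzed, so the schema is still empty while partition and bucket columns are already filled in. A minimal, hypothetical sketch of such a descriptor; the table and column names are made up:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}

// The schema is unknown until the SELECT is analyzed, but the partition columns are already
// known, so the old constructor-time subset check would have rejected this descriptor.
val ctasDesc = CatalogTable(
  identifier = TableIdentifier("events"),        // hypothetical table name
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = Seq.empty[CatalogColumn],
  partitionColumnNames = Seq("year", "month"))   // hypothetical partition columns
```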
28 changes: 21 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,8 +23,9 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils

/**
@@ -366,14 +367,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(s"Table $tableIdent already exists.")

case _ =>
val bucketSpec = getBucketSpec
val sortColumnNames = bucketSpec.map(_.sortColumnNames).getOrElse(Seq.empty)
val bucketColumnNames = bucketSpec.map(_.bucketColumnNames).getOrElse(Seq.empty)
val numBuckets = bucketSpec.map(_.numBuckets).getOrElse(-1)

val tableDesc = CatalogTable(
identifier = tableIdent,
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = Seq.empty[CatalogColumn],
partitionColumnNames = partitioningColumns.getOrElse(Seq.empty[String]),
sortColumnNames = sortColumnNames,
bucketColumnNames = bucketColumnNames,
numBuckets = numBuckets,
properties = extraOptions.toMap)

val cmd =
CreateTableUsingAsSelect(
tableIdent,
CreateTableAsSelect(
tableDesc,
source,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
getBucketSpec,
mode,
extraOptions.toMap,
df.logicalPlan)
df.sparkSession.sessionState.executePlan(cmd).toRdd
}
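For reference, a hedged usage sketch of a write that reaches the branch above; the table and column names are illustrative and `df` is assumed to be a DataFrame in scope:

```scala
// partitionBy() populates partitionColumnNames, while bucketBy()/sortBy() are flattened
// into bucketColumnNames, sortColumnNames and numBuckets on the CatalogTable built above.
df.write
  .format("parquet")
  .partitionBy("country")
  .bucketBy(8, "user_id")
  .sortBy("event_time")
  .saveAsTable("events")
```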
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{CreateTableAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -175,7 +175,7 @@ class Dataset[T] private[sql](
def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
case _: Command |
_: InsertIntoTable |
_: CreateTableUsingAsSelect => true
_: CreateTableAsSelect => true
case _ => false
}

@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.DataType

@@ -310,7 +310,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/**
* Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan.
* Create a [[CreateTableUsing]] or a [[CreateTableAsSelect]] logical plan.
*/
override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -340,8 +340,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
SaveMode.ErrorIfExists
}

CreateTableUsingAsSelect(
table, provider, partitionColumnNames, bucketSpec, mode, options, query)
val sortColumnNames = bucketSpec.map(_.sortColumnNames).getOrElse(Seq.empty)
val bucketColumnNames = bucketSpec.map(_.bucketColumnNames).getOrElse(Seq.empty)
val numBuckets = bucketSpec.map(_.numBuckets).getOrElse(-1)

val tableDesc = CatalogTable(
identifier = table,
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = Seq.empty[CatalogColumn],
partitionColumnNames = partitionColumnNames,
sortColumnNames = sortColumnNames,
bucketColumnNames = bucketColumnNames,
numBuckets = numBuckets,
properties = options)

CreateTableAsSelect(
tableDesc = tableDesc, provider = provider, mode = mode, child = query)
} else {
val struct = Option(ctx.colTypeList()).map(createStructType)
CreateTableUsing(
@@ -890,8 +905,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/**
* Create a table, returning either a [[CreateTableCommand]] or a
* [[CreateHiveTableAsSelectLogicalPlan]].
* Create a table, returning either a [[CreateTableCommand]] or a [[CreateTableAsSelect]].
*
* This is not used to create datasource tables, which is handled through
* "CREATE TABLE ... USING ...".
@@ -1004,6 +1018,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
properties = properties,
comment = comment)

val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

selectQuery match {
case Some(q) =>
// Just use whatever is projected in the select statement as our schema
@@ -1023,27 +1039,16 @@
}

val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
if (conf.convertCTAS && !hasStorageProperties) {
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
val optionsWithPath = if (location.isDefined) {
Map("path" -> location.get)
} else {
Map.empty[String, String]
}
CreateTableUsingAsSelect(
tableIdent = tableDesc.identifier,
provider = conf.defaultDataSourceName,
partitionColumns = tableDesc.partitionColumnNames.toArray,
bucketSpec = None,
mode = mode,
options = optionsWithPath,
q
)
} else {
CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
}

val provider =
if (conf.convertCTAS && !hasStorageProperties) conf.defaultDataSourceName else "hive"

CreateTableAsSelect(
tableDesc = tableDesc,
provider = provider,
mode = mode,
child = q)

case None => CreateTableCommand(tableDesc, ifNotExists)
}
}
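For context, a hedged illustration of the two CTAS forms that now both lower to a CreateTableAsSelect node; table and column names are made up, and `spark` is assumed to be a SparkSession with Hive support:

```scala
// Data source CTAS: the provider comes from the USING clause.
spark.sql("CREATE TABLE ctas_ds USING parquet AS SELECT id, name FROM src")

// Hive CTAS: the provider is the default data source when conf.convertCTAS is enabled and
// no row format / file format is specified, otherwise "hive".
spark.sql("CREATE TABLE ctas_hive AS SELECT id, name FROM src")
```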
@@ -444,15 +444,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
throw new AnalysisException(
"allowExisting should be set to false when creating a temporary table.")

case c: CreateTableUsingAsSelect =>
case c: CreateTableAsSelect if c.provider != "hive" =>
val cmd =
CreateDataSourceTableAsSelectCommand(
c.tableIdent,
c.tableDesc,
c.provider,
c.partitionColumns,
c.bucketSpec,
c.mode,
c.options,
c.child)
ExecutedCommandExec(cmd) :: Nil

@@ -130,12 +130,9 @@ case class CreateDataSourceTableCommand(
* }}}
*/
case class CreateDataSourceTableAsSelectCommand(
tableIdent: TableIdentifier,
tableDesc: CatalogTable,
provider: String,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
mode: SaveMode,
options: Map[String, String],
query: LogicalPlan)
extends RunnableCommand {

@@ -146,31 +143,41 @@ case class CreateDataSourceTableAsSelectCommand(
// the table name and database name we have for this query. MetaStoreUtils.validateName
// is the method used by Hive to check if a table name or a database name is valid for
// the metastore.
if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
s"metastore. Metastore only accepts table name containing characters, numbers and _.")
if (!CreateDataSourceTableUtils.validateName(tableDesc.identifier.table)) {
throw new AnalysisException(s"Table name ${tableDesc.identifier.table} is not a valid name " +
s"for metastore. Metastore only accepts table name containing characters, numbers and _.")
}
if (tableIdent.database.isDefined &&
!CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
s"for metastore. Metastore only accepts database name containing " +
if (tableDesc.identifier.database.isDefined &&
!CreateDataSourceTableUtils.validateName(tableDesc.identifier.database.get)) {
throw new AnalysisException(s"Database name ${tableDesc.identifier.database.get} is not " +
s"a valid name for metastore. Metastore only accepts database name containing " +
s"characters, numbers and _.")
}

val tableName = tableIdent.unquotedString
val tableName = tableDesc.identifier.unquotedString
val sessionState = sparkSession.sessionState
var createMetastoreTable = false
var isExternal = true
val optionsWithPath =
if (!new CaseInsensitiveMap(options).contains("path")) {
if (tableDesc.storage.locationUri.nonEmpty) {
tableDesc.properties + ("path" -> tableDesc.storage.locationUri.get)
} else if (!new CaseInsensitiveMap(tableDesc.properties).contains("path")) {
isExternal = false
options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
tableDesc.properties +
("path" -> sessionState.catalog.defaultTablePath(tableDesc.identifier))
} else {
options
tableDesc.properties
}

val bucketSpec: Option[BucketSpec] = if (tableDesc.numBuckets > 0) {
Option(BucketSpec(
tableDesc.numBuckets, tableDesc.bucketColumnNames, tableDesc.sortColumnNames))
} else {
None
}

var existingSchema = Option.empty[StructType]
if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
if (sparkSession.sessionState.catalog.tableExists(tableDesc.identifier)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
@@ -187,21 +194,21 @@
val dataSource = DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = Some(query.schema.asNullable),
partitionColumns = partitionColumns,
partitionColumns = tableDesc.partitionColumnNames,
bucketSpec = bucketSpec,
className = provider,
options = optionsWithPath)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).

EliminateSubqueryAliases(
sessionState.catalog.lookupRelation(tableIdent)) match {
sessionState.catalog.lookupRelation(tableDesc.identifier)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
throw new AnalysisException(
s"The file format of the existing table $tableIdent is " +
s"The file format of the existing table ${tableDesc.identifier} is " +
s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
s"format `$provider`")
case _ =>
@@ -238,15 +245,15 @@ case class CreateDataSourceTableAsSelectCommand(
val dataSource = DataSource(
sparkSession,
className = provider,
partitionColumns = partitionColumns,
partitionColumns = tableDesc.partitionColumnNames,
bucketSpec = bucketSpec,
options = optionsWithPath)

val result = try {
dataSource.write(mode, df)
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
logError(s"Failed to write to table ${tableDesc.identifier} in $mode mode", ex)
throw ex
}
if (createMetastoreTable) {
Expand All @@ -255,17 +262,17 @@ case class CreateDataSourceTableAsSelectCommand(
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
tableIdent = tableDesc.identifier,
userSpecifiedSchema = Some(result.schema),
partitionColumns = partitionColumns,
partitionColumns = tableDesc.partitionColumnNames.toArray,
bucketSpec = bucketSpec,
provider = provider,
options = optionsWithPath,
isExternal = isExternal)
}

// Refresh the cache of the table in the catalog.
sessionState.catalog.refreshTable(tableIdent)
sessionState.catalog.refreshTable(tableDesc.identifier)
Seq.empty[Row]
}
}
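The command above rebuilds the bucketing spec that DataFrameWriter and the parser flattened into the CatalogTable. A minimal sketch of that round-trip convention, assuming the BucketSpec class imported elsewhere in this patch:

```scala
import org.apache.spark.sql.execution.datasources.BucketSpec

// An absent spec is stored as numBuckets = -1, so only a positive count is rebuilt.
def flatten(spec: Option[BucketSpec]): (Int, Seq[String], Seq[String]) =
  (spec.map(_.numBuckets).getOrElse(-1),
    spec.map(_.bucketColumnNames).getOrElse(Seq.empty),
    spec.map(_.sortColumnNames).getOrElse(Seq.empty))

def rebuild(numBuckets: Int, bucketCols: Seq[String], sortCols: Seq[String]): Option[BucketSpec] =
  if (numBuckets > 0) Some(BucketSpec(numBuckets, bucketCols, sortCols)) else None
```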
@@ -33,28 +33,11 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, Catal
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

case class CreateHiveTableAsSelectLogicalPlan(
tableDesc: CatalogTable,
child: LogicalPlan,
allowExisting: Boolean) extends UnaryNode with Command {

override def output: Seq[Attribute] = Seq.empty[Attribute]

override lazy val resolved: Boolean =
tableDesc.identifier.database.isDefined &&
tableDesc.schema.nonEmpty &&
tableDesc.storage.serde.isDefined &&
tableDesc.storage.inputFormat.isDefined &&
tableDesc.storage.outputFormat.isDefined &&
childrenResolved
}

/**
* A command to create a table with the same definition of the given existing table.
*
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -53,13 +54,10 @@ case class CreateTableUsing(
* analyzer can analyze the logical plan that will be used to populate the table.
* So, [[PreWriteCheck]] can detect cases that are not allowed.
*/
case class CreateTableUsingAsSelect(
tableIdent: TableIdentifier,
case class CreateTableAsSelect(
tableDesc: CatalogTable,
provider: String,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
mode: SaveMode,
options: Map[String, String],
child: LogicalPlan) extends logical.UnaryNode {
override def output: Seq[Attribute] = Seq.empty[Attribute]
}
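To summarize the migration for call sites, a hedged sketch of how the old CreateTableUsingAsSelect arguments fold into the new node; the names and the path option are illustrative, and `query` is assumed to be a LogicalPlan for the SELECT already in scope:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}

// tableIdent, partitionColumns, bucketSpec and options now live on the CatalogTable;
// only provider, mode and the child query remain as separate fields.
val desc = CatalogTable(
  identifier = TableIdentifier("events"),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = Seq.empty[CatalogColumn],
  partitionColumnNames = Seq("country"),
  properties = Map("path" -> "/tmp/events"))

val ctas = CreateTableAsSelect(desc, provider = "parquet", mode = SaveMode.ErrorIfExists, child = query)
```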