diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
index 9b87e676d9b2..7eef6aea8812 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
@@ -227,6 +227,10 @@ public String property() {
}
}
+ interface ColumnChange extends TableChange {
+ String[] fieldNames();
+ }
+
/**
* A TableChange to add a field.
*
@@ -234,7 +238,7 @@ public String property() {
* If the new field is nested and its parent does not exist or is not a struct, the change must
* result in an {@link IllegalArgumentException}.
*/
- final class AddColumn implements TableChange {
+ final class AddColumn implements ColumnChange {
private final String[] fieldNames;
private final DataType dataType;
private final boolean isNullable;
@@ -247,6 +251,7 @@ private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, St
this.comment = comment;
}
+ @Override
public String[] fieldNames() {
return fieldNames;
}
@@ -272,7 +277,7 @@ public String comment() {
*
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
- final class RenameColumn implements TableChange {
+ final class RenameColumn implements ColumnChange {
private final String[] fieldNames;
private final String newName;
@@ -281,6 +286,7 @@ private RenameColumn(String[] fieldNames, String newName) {
this.newName = newName;
}
+ @Override
public String[] fieldNames() {
return fieldNames;
}
@@ -297,7 +303,7 @@ public String newName() {
*
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
- final class UpdateColumnType implements TableChange {
+ final class UpdateColumnType implements ColumnChange {
private final String[] fieldNames;
private final DataType newDataType;
private final boolean isNullable;
@@ -308,6 +314,7 @@ private UpdateColumnType(String[] fieldNames, DataType newDataType, boolean isNu
this.isNullable = isNullable;
}
+ @Override
public String[] fieldNames() {
return fieldNames;
}
@@ -328,7 +335,7 @@ public boolean isNullable() {
*
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
- final class UpdateColumnComment implements TableChange {
+ final class UpdateColumnComment implements ColumnChange {
private final String[] fieldNames;
private final String newComment;
@@ -337,6 +344,7 @@ private UpdateColumnComment(String[] fieldNames, String newComment) {
this.newComment = newComment;
}
+ @Override
public String[] fieldNames() {
return fieldNames;
}
@@ -351,13 +359,14 @@ public String newComment() {
*
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
- final class DeleteColumn implements TableChange {
+ final class DeleteColumn implements ColumnChange {
private final String[] fieldNames;
private DeleteColumn(String[] fieldNames) {
this.fieldNames = fieldNames;
}
+ @Override
public String[] fieldNames() {
return fieldNames;
}
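
Aside: the new `ColumnChange` marker lets callers handle every column-level change through one accessor instead of matching each concrete class. A minimal sketch in Scala (the helper name is hypothetical, not part of the patch):

```scala
import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.TableChange.ColumnChange

// Hypothetical helper: report which column path each change touches.
def describeChanges(changes: Seq[TableChange]): Seq[String] = changes.map {
  case col: ColumnChange => s"column change on ${col.fieldNames.mkString(".")}"
  case other             => s"table-level change: ${other.getClass.getSimpleName}"
}
```
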
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
index 6de1ef5660e5..7cc80c41a901 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.sources.v2.Table
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
object CatalogV2Util {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
@@ -132,16 +132,45 @@ object CatalogV2Util {
val pos = struct.getFieldIndex(fieldNames.head)
.getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}"))
val field = struct.fields(pos)
- val replacement: Option[StructField] = if (fieldNames.tail.isEmpty) {
- update(field)
- } else {
- field.dataType match {
- case nestedStruct: StructType =>
- val updatedType: StructType = replace(nestedStruct, fieldNames.tail, update)
- Some(StructField(field.name, updatedType, field.nullable, field.metadata))
- case _ =>
- throw new IllegalArgumentException(s"Not a struct: ${fieldNames.head}")
- }
+ val replacement: Option[StructField] = (fieldNames.tail, field.dataType) match {
+ case (Seq(), _) =>
+ update(field)
+
+ case (names, struct: StructType) =>
+ val updatedType: StructType = replace(struct, names, update)
+ Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+
+ case (Seq("key"), map @ MapType(keyType, _, _)) =>
+ val updated = update(StructField("key", keyType, nullable = false))
+ .getOrElse(throw new IllegalArgumentException(s"Cannot delete map key"))
+ Some(field.copy(dataType = map.copy(keyType = updated.dataType)))
+
+ case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, _)) =>
+ Some(field.copy(dataType = map.copy(keyType = replace(keyStruct, names, update))))
+
+ case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
+ val updated = update(StructField("value", mapValueType, nullable = isNullable))
+ .getOrElse(throw new IllegalArgumentException(s"Cannot delete map value"))
+ Some(field.copy(dataType = map.copy(
+ valueType = updated.dataType,
+ valueContainsNull = updated.nullable)))
+
+ case (Seq("value", names @ _*), map @ MapType(_, valueStruct: StructType, _)) =>
+ Some(field.copy(dataType = map.copy(valueType = replace(valueStruct, names, update))))
+
+ case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
+ val updated = update(StructField("element", elementType, nullable = isNullable))
+ .getOrElse(throw new IllegalArgumentException(s"Cannot delete array element"))
+ Some(field.copy(dataType = array.copy(
+ elementType = updated.dataType,
+ containsNull = updated.nullable)))
+
+ case (Seq("element", names @ _*), array @ ArrayType(elementStruct: StructType, _)) =>
+ Some(field.copy(dataType = array.copy(elementType = replace(elementStruct, names, update))))
+
+ case (names, dataType) =>
+ throw new IllegalArgumentException(
+ s"Cannot find field: ${names.head} in ${dataType.simpleString}")
}
val newFields = struct.fields.zipWithIndex.flatMap {
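
Aside: the new cases above address types nested in maps and arrays using the reserved path segments `key`, `value`, and `element`. An illustrative change built with that convention (the column names are made up):

```scala
import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.types.LongType

// points.element.x  -> field x of the struct stored in array column `points`
// pairs.key         -> the key type of map column `pairs`
// pairs.value.z     -> field z of the struct stored as the map value
val widenX = TableChange.updateColumnType(Array("points", "element", "x"), LongType, true)
```
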
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1d0dba262c10..e3b41778f785 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog, TableChange}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.toPrettySQL
@@ -165,6 +166,7 @@ class Analyzer(
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
+ ResolveAlterTable ::
ResolveTables ::
ResolveRelations ::
ResolveReferences ::
@@ -787,6 +789,86 @@ class Analyzer(
}
}
+ /**
+ * Resolve ALTER TABLE statements that use a DSv2 catalog.
+ *
+ * This rule converts unresolved ALTER TABLE statements to v2 when a v2 catalog is responsible
+ * for the table identifier. A v2 catalog is responsible for an identifier when the identifier
+ * has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and
+ * the table identifier does not include a catalog.
+ */
+ object ResolveAlterTable extends Rule[LogicalPlan] {
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case alter @ AlterTableAddColumnsStatement(
+ CatalogObjectIdentifier(Some(v2Catalog), ident), cols) =>
+ val changes = cols.map { col =>
+ TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
+ }
+
+ AlterTable(
+ v2Catalog.asTableCatalog, ident,
+ UnresolvedRelation(alter.tableName),
+ changes)
+
+ case alter @ AlterTableAlterColumnStatement(
+ CatalogObjectIdentifier(Some(v2Catalog), ident), colName, dataType, comment) =>
+ val typeChange = dataType.map { newDataType =>
+ TableChange.updateColumnType(colName.toArray, newDataType, true)
+ }
+
+ val commentChange = comment.map { newComment =>
+ TableChange.updateColumnComment(colName.toArray, newComment)
+ }
+
+ AlterTable(
+ v2Catalog.asTableCatalog, ident,
+ UnresolvedRelation(alter.tableName),
+ typeChange.toSeq ++ commentChange.toSeq)
+
+ case alter @ AlterTableRenameColumnStatement(
+ CatalogObjectIdentifier(Some(v2Catalog), ident), col, newName) =>
+ AlterTable(
+ v2Catalog.asTableCatalog, ident,
+ UnresolvedRelation(alter.tableName),
+ Seq(TableChange.renameColumn(col.toArray, newName)))
+
+ case alter @ AlterTableDropColumnsStatement(
+ CatalogObjectIdentifier(Some(v2Catalog), ident), cols) =>
+ val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
+ AlterTable(
+ v2Catalog.asTableCatalog, ident,
+ UnresolvedRelation(alter.tableName),
+ changes)
+
+ case alter @ AlterTableSetPropertiesStatement(
+ CatalogObjectIdentifier(Some(v2Catalog), ident), props) =>
+ val changes = props.map {
+ case (key, value) =>
+ TableChange.setProperty(key, value)
+ }
+
+ AlterTable(
+ v2Catalog.asTableCatalog, ident,
+ UnresolvedRelation(alter.tableName),
+ changes.toSeq)
+
+ case alter @ AlterTableUnsetPropertiesStatement(
+ CatalogObjectIdentifier(Some(v2Catalog), ident), keys, _) =>
+ AlterTable(
+ v2Catalog.asTableCatalog, ident,
+ UnresolvedRelation(alter.tableName),
+ keys.map(key => TableChange.removeProperty(key)))
+
+ case alter @ AlterTableSetLocationStatement(
+ CatalogObjectIdentifier(Some(v2Catalog), ident), newLoc) =>
+ AlterTable(
+ v2Catalog.asTableCatalog, ident,
+ UnresolvedRelation(alter.tableName),
+ Seq(TableChange.setProperty("location", newLoc)))
+ }
+ }
+
/**
* Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
* a logical plan node's children.
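
Aside: the kinds of statements this rule rewrites, assuming `testcat` is a configured v2 catalog (illustrative only; the first statement mirrors the test suite below, and each becomes an `AlterTable` plan carrying the listed `TableChange`s):

```scala
spark.sql("ALTER TABLE testcat.ns1.table_name ADD COLUMN data string COMMENT 'doc'")
//   -> AlterTable(..., Seq(TableChange.addColumn(Array("data"), StringType, true, "doc")))
spark.sql("ALTER TABLE testcat.ns1.table_name SET TBLPROPERTIES ('test'='34')")
//   -> AlterTable(..., Seq(TableChange.setProperty("test", "34")))
spark.sql("ALTER TABLE testcat.ns1.table_name SET LOCATION 's3://bucket/path'")
//   -> AlterTable(..., Seq(TableChange.setProperty("location", "s3://bucket/path")))
```
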
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 02031e758d83..ae19d02e4475 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -353,6 +354,59 @@ trait CheckAnalysis extends PredicateHelper {
case _ =>
}
+ case alter: AlterTable if alter.childrenResolved =>
+ val table = alter.table
+ def findField(operation: String, fieldName: Array[String]): StructField = {
+ // include collections because structs nested in maps and arrays may be altered
+ val field = table.schema.findNestedField(fieldName, includeCollections = true)
+ if (field.isEmpty) {
+ throw new AnalysisException(
+ s"Cannot $operation missing field in ${table.name} schema: ${fieldName.quoted}")
+ }
+ field.get
+ }
+
+ alter.changes.foreach {
+ case add: AddColumn =>
+ val parent = add.fieldNames.init
+ if (parent.nonEmpty) {
+ findField("add to", parent)
+ }
+ case update: UpdateColumnType =>
+ val field = findField("update", update.fieldNames)
+ val fieldName = update.fieldNames.quoted
+ update.newDataType match {
+ case _: StructType =>
+ throw new AnalysisException(
+ s"Cannot update ${table.name} field $fieldName type: " +
+ s"update a struct by adding, deleting, or updating its fields")
+ case _: MapType =>
+ throw new AnalysisException(
+ s"Cannot update ${table.name} field $fieldName type: " +
+ s"update a map by updating $fieldName.key or $fieldName.value")
+ case _: ArrayType =>
+ throw new AnalysisException(
+ s"Cannot update ${table.name} field $fieldName type: " +
+ s"update the element by updating $fieldName.element")
+ case _: AtomicType =>
+ // update is okay
+ }
+ if (!Cast.canUpCast(field.dataType, update.newDataType)) {
+ throw new AnalysisException(
+ s"Cannot update ${table.name} field $fieldName: " +
+ s"${field.dataType.simpleString} cannot be cast to " +
+ s"${update.newDataType.simpleString}")
+ }
+ case rename: RenameColumn =>
+ findField("rename", rename.fieldNames)
+ case update: UpdateColumnComment =>
+ findField("update", update.fieldNames)
+ case delete: DeleteColumn =>
+ findField("delete", delete.fieldNames)
+ case _ =>
+ // no validation needed for set and remove property
+ }
+
case _ => // Fallbacks to the following checks
}
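
Aside: the type check above delegates to `Cast.canUpCast`, so only lossless widenings are accepted. A quick illustration (not part of the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

Cast.canUpCast(IntegerType, LongType)  // true  -> ALTER COLUMN ... TYPE bigint is accepted
Cast.canUpCast(LongType, IntegerType)  // false -> rejected: "bigint cannot be cast to int"
Cast.canUpCast(FloatType, DoubleType)  // true
```
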
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index b700c336e6ae..9e0e0d528a96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -40,12 +40,15 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
*
* @param multipartIdentifier table name
*/
-case class UnresolvedRelation(multipartIdentifier: Seq[String]) extends LeafNode {
+case class UnresolvedRelation(
+ multipartIdentifier: Seq[String]) extends LeafNode with NamedRelation {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
/** Returns a `.` separated name for this relation. */
def tableName: String = multipartIdentifier.quoted
+ override def name: String = tableName
+
override def output: Seq[Attribute] = Nil
override lazy val resolved = false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 72f098354776..2cb04c9ec70c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
+import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
@@ -507,6 +508,40 @@ case class DropTable(
ident: Identifier,
ifExists: Boolean) extends Command
+/**
+ * Alter a table.
+ */
+case class AlterTable(
+ catalog: TableCatalog,
+ ident: Identifier,
+ table: NamedRelation,
+ changes: Seq[TableChange]) extends Command {
+
+ override def children: Seq[LogicalPlan] = Seq(table)
+
+ override lazy val resolved: Boolean = childrenResolved && {
+ changes.forall {
+ case add: AddColumn =>
+ add.fieldNames match {
+ case Array(_) =>
+ // a top-level field can always be added
+ true
+ case _ =>
+ // the parent field must exist
+ table.schema.findNestedField(add.fieldNames.init, includeCollections = true).isDefined
+ }
+
+ case colChange: ColumnChange =>
+ // the column that will be changed must exist
+ table.schema.findNestedField(colChange.fieldNames, includeCollections = true).isDefined
+
+ case _ =>
+ // property changes require no resolution checks
+ true
+ }
+ }
+}
+
/**
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
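
Aside: only column changes participate in the `resolved` check above; property changes never block resolution, and `AddColumn` only requires its parent path to exist. A sketch with illustrative column names:

```scala
import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.types.LongType

// Property changes need no column lookup; column changes resolve against the child schema.
val propOnly  = TableChange.setProperty("location", "s3://bucket/path") // never blocks resolution
val rename    = TableChange.renameColumn(Array("point", "x"), "px")      // point.x must exist
val addNested = TableChange.addColumn(Array("point", "z"), LongType, true, null)
// for addColumn only the parent path ("point") must exist; "z" itself is the new field
```
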
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index edf8d2c1b31a..236f73ba3832 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -310,20 +310,46 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
/**
* Returns a field in this struct and its child structs.
*
- * This does not support finding fields nested in maps or arrays.
+ * If includeCollections is true, this will return fields that are nested in maps and arrays.
*/
- private[sql] def findNestedField(fieldNames: Seq[String]): Option[StructField] = {
+ private[sql] def findNestedField(
+ fieldNames: Seq[String],
+ includeCollections: Boolean = false): Option[StructField] = {
fieldNames.headOption.flatMap(nameToField.get) match {
case Some(field) =>
- if (fieldNames.tail.isEmpty) {
- Some(field)
- } else {
- field.dataType match {
- case struct: StructType =>
- struct.findNestedField(fieldNames.tail)
- case _ =>
- None
- }
+ (fieldNames.tail, field.dataType, includeCollections) match {
+ case (Seq(), _, _) =>
+ Some(field)
+
+ case (names, struct: StructType, _) =>
+ struct.findNestedField(names, includeCollections)
+
+ case (_, _, false) =>
+ None // types nested in maps and arrays are not used
+
+ case (Seq("key"), MapType(keyType, _, _), true) =>
+ // return the key type as a struct field to include nullability
+ Some(StructField("key", keyType, nullable = false))
+
+ case (Seq("key", names @ _*), MapType(struct: StructType, _, _), true) =>
+ struct.findNestedField(names, includeCollections)
+
+ case (Seq("value"), MapType(_, valueType, isNullable), true) =>
+ // return the value type as a struct field to include nullability
+ Some(StructField("value", valueType, nullable = isNullable))
+
+ case (Seq("value", names @ _*), MapType(_, struct: StructType, _), true) =>
+ struct.findNestedField(names, includeCollections)
+
+ case (Seq("element"), ArrayType(elementType, isNullable), true) =>
+ // return the element type as a struct field to include nullability
+ Some(StructField("element", elementType, nullable = isNullable))
+
+ case (Seq("element", names @ _*), ArrayType(struct: StructType, _), true) =>
+ struct.findNestedField(names, includeCollections)
+
+ case _ =>
+ None
}
case _ =>
None
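
Aside: how the extended lookup behaves with the `key`/`value`/`element` path segments (illustrative schema; `findNestedField` is `private[sql]`, so this only runs from Spark's own sql packages):

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("points", ArrayType(new StructType().add("x", DoubleType).add("y", DoubleType)))
  .add("pairs", MapType(StringType, new StructType().add("z", DoubleType)))

schema.findNestedField(Seq("points", "element", "x"), includeCollections = true)
// Some(StructField("x", DoubleType, true))
schema.findNestedField(Seq("pairs", "key"), includeCollections = true)
// Some(StructField("key", StringType, false))  -- map keys are never nullable
schema.findNestedField(Seq("points", "element", "x"))
// None -- collection paths are ignored when includeCollections is false (the default)
```
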
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
new file mode 100644
index 000000000000..a3fa82b12e93
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.LeafExecNode
+
+/**
+ * Physical plan node for altering a table.
+ */
+case class AlterTableExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ changes: Seq[TableChange]) extends LeafExecNode {
+
+ override def output: Seq[Attribute] = Seq.empty
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ try {
+ catalog.alterTable(ident, changes: _*)
+ } catch {
+ case e: IllegalArgumentException =>
+ throw new SparkException(s"Unsupported table change: ${e.getMessage}", e)
+ }
+
+ sqlContext.sparkContext.parallelize(Seq.empty, 1)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 27d87960edb3..4f8507da3924 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -202,6 +202,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case DropTable(catalog, ident, ifExists) =>
DropTableExec(catalog, ident, ifExists) :: Nil
+ case AlterTable(catalog, ident, _, changes) =>
+ AlterTableExec(catalog, ident, changes) :: Nil
+
case _ => Nil
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index 01752125ac26..70d06243add9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -21,13 +21,14 @@ import scala.collection.JavaConverters._
import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{LongType, StringType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
@@ -366,4 +367,834 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
df_joined)
}
}
+
+ test("AlterTable: table does not exist") {
+ val exc = intercept[AnalysisException] {
+ sql(s"ALTER TABLE testcat.ns1.table_name DROP COLUMN id")
+ }
+
+ assert(exc.getMessage.contains("testcat.ns1.table_name"))
+ assert(exc.getMessage.contains("Table or view not found"))
+ }
+
+ test("AlterTable: change rejected by implementation") {
+ val t = "testcat.ns1.table_name"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id int) USING foo")
+
+ val exc = intercept[SparkException] {
+ sql(s"ALTER TABLE $t DROP COLUMN id")
+ }
+
+ assert(exc.getMessage.contains("Unsupported table change"))
+ assert(exc.getMessage.contains("Cannot drop all fields")) // from the implementation
+
+ val testCatalog = spark.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array("ns1"), "table_name"))
+
+ assert(table.name == "testcat.ns1.table_name")
+ assert(table.schema == new StructType().add("id", IntegerType))
+ }
+ }
+
+ test("AlterTable: add top-level column") {
+ val t = "testcat.ns1.table_name"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id int) USING foo")
+ sql(s"ALTER TABLE $t ADD COLUMN data string")
+
+ val testCatalog = spark.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array("ns1"), "table_name"))
+
+ assert(table.name == "testcat.ns1.table_name")
+ assert(table.schema == new StructType().add("id", IntegerType).add("data", StringType))
+ }
+ }
+
+ test("AlterTable: add column with comment") {
+ val t = "testcat.ns1.table_name"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id int) USING foo")
+ sql(s"ALTER TABLE $t ADD COLUMN data string COMMENT 'doc'")
+
+ val testCatalog = spark.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array("ns1"), "table_name"))
+
+ assert(table.name == "testcat.ns1.table_name")
+ assert(table.schema == StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("data", StringType).withComment("doc"))))
+ }
+ }
+
+ test("AlterTable: add multiple columns") {
+ val t = "testcat.ns1.table_name"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id int) USING foo")
+ sql(s"ALTER TABLE $t ADD COLUMNS data string COMMENT 'doc', ts timestamp")
+
+ val testCatalog = spark.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array("ns1"), "table_name"))
+
+ assert(table.name == "testcat.ns1.table_name")
+ assert(table.schema == StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("data", StringType).withComment("doc"),
+ StructField("ts", TimestampType))))
+ }
+ }
+
+ test("AlterTable: add nested column") {
+ val t = "testcat.ns1.table_name"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id int, point struct