[SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables #16626
Changes from 32 commits

SessionCatalog.scala:
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.{StructField, StructType}

 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
       throw new TableAlreadyExistsException(db = db, table = name.table)
     }
   }
+
+  private def checkDuplication(fields: Seq[StructField]): Unit = {
+    val columnNames = if (conf.caseSensitiveAnalysis) {
+      fields.map(_.name)
+    } else {
+      fields.map(_.name.toLowerCase)
+    }
+    if (columnNames.distinct.length != columnNames.length) {
+      val duplicateColumns = columnNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }
+      throw new AnalysisException(
+        s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
+    }
+  }
   // ----------------------------------------------------------------------------
   // Databases
   // ----------------------------------------------------------------------------
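The check above lowercases names unless the analysis is case sensitive, then compares distinct counts. A minimal standalone sketch of that behavior (the field names here are invented for illustration):

```scala
// Sketch of the duplicate-column check, outside SessionCatalog.
// Under case-insensitive analysis, "ID" and "id" collide.
val fields = Seq("id", "ID", "name")
val caseSensitive = false
val names = if (caseSensitive) fields else fields.map(_.toLowerCase)
if (names.distinct.length != names.length) {
  val duplicates = names.groupBy(identity).collect { case (x, ys) if ys.length > 1 => x }
  println(s"Found duplicate column(s): ${duplicates.mkString(", ")}")
}
// prints: Found duplicate column(s): id
```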
@@ -295,6 +310,51 @@ class SessionCatalog(
     externalCatalog.alterTable(newTableDefinition)
   }
+
+  /**
+   * Alter the schema of a table identified by the provided table identifier. The new schema
+   * should still contain the existing bucket columns and partition columns used by the table.
+   * This method will also update any Spark SQL-related parameters stored as Hive table
+   * properties (such as the schema itself).
+   *
+   * @param identifier TableIdentifier
+   * @param newSchema Updated schema to be used for the table (must contain existing partition
+   *                  and bucket columns)
+   */
+  def alterTableSchema(
+      identifier: TableIdentifier,
+      newSchema: StructType): Unit = {
+    val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
+    requireDbExists(db)
+    requireTableExists(tableIdentifier)
+    checkDuplication(newSchema)
+
+    val catalogTable = externalCatalog.getTable(db, table)
+    val oldSchema = catalogTable.schema
+
+    // Dropping columns is not supported yet.
+    val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+    if (nonExistentColumnNames.nonEmpty) {
+      throw new AnalysisException(
+        s"""
+           |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
+           |not present in the new schema. We don't support dropping columns yet.
+         """.stripMargin)
+    }
+
+    // Make sure partition columns stay at the end of the schema.
+    val partitionSchema = catalogTable.partitionSchema
+    val reorderedSchema = newSchema
+      .filterNot(f => columnNameResolved(partitionSchema, f.name)) ++ partitionSchema
+
+    externalCatalog.alterTableSchema(db, table, StructType(reorderedSchema))
+  }
+
+  private def columnNameResolved(schema: StructType, colName: String): Boolean = {
+    schema.fields.map(_.name).exists(conf.resolver(_, colName))
+  }

  /**
   * Return whether a table/view with the specified name exists. If no database is specified, check
   * with current database.
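The subtle step in `alterTableSchema` is the reordering: wherever the caller puts them, partition columns are filtered out of the new schema and re-appended at the end. A self-contained sketch under assumed names (`dt` as the partition column, `c3` as the newly added one):

```scala
import org.apache.spark.sql.types._

// The caller supplies the partition column `dt` in the middle of the schema.
val partitionSchema = StructType(Seq(StructField("dt", StringType)))
val newSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("dt", StringType),
  StructField("c3", IntegerType)))

// Stand-in for conf.resolver under case-insensitive analysis.
def resolved(schema: StructType, colName: String): Boolean =
  schema.fields.map(_.name).exists(_.equalsIgnoreCase(colName))

// Data columns first, partition columns last, matching the code above.
val reordered = newSchema.filterNot(f => resolved(partitionSchema, f.name)) ++ partitionSchema
println(StructType(reordered).fieldNames.mkString(", "))  // id, c3, dt
```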
SessionCatalogSuite.scala:
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types._

 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
   protected val utils = new CatalogTestUtils {
@@ -450,6 +451,33 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
+
+  test("alter table add columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      sessionCatalog.alterTableSchema(
+        TableIdentifier("t1", Some("default")), oldTab.schema.add("c3", IntegerType))
+
+      val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      // construct the expected table schema
+      val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
+        Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+      assert(newTab.schema == expectedTableSchema)
+    }
+  }
+
+  test("alter table drop columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val e = intercept[AnalysisException] {
+        sessionCatalog.alterTableSchema(
+          TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+      }.getMessage
+      assert(e.contains("We don't support dropping columns yet."))
+    }
+  }

  test("get table") {
    withBasicCatalog { catalog =>
      assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
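The tests above exercise the catalog API directly; the user-facing path goes through SQL. A hedged end-to-end sketch (table and column names invented, assuming a `SparkSession` named `spark` with Hive support enabled):

```scala
// End-to-end sketch of the command this PR adds.
spark.sql("CREATE TABLE t1 (col1 INT, col2 STRING) PARTITIONED BY (dt STRING)")
spark.sql("ALTER TABLE t1 ADD COLUMNS (c3 INT COMMENT 'newly added column')")
spark.sql("DESC t1").show()
// Expected: c3 appears after col2 and before the partition column dt.
```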
tables.scala:
@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -174,6 +177,74 @@ case class AlterTableRenameCommand(
   }
 }
+
+/**
+ * A command that adds columns to a table.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+case class AlterTableAddColumnsCommand(
+    table: TableIdentifier,
+    columns: Seq[StructField]) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val catalogTable = verifyAlterTableAddColumn(catalog, table)
+
+    try {
+      sparkSession.catalog.uncacheTable(table.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+    }
+    catalog.refreshTable(table)
+    catalog.alterTableSchema(
+      table, catalogTable.schema.copy(fields = catalogTable.schema.fields ++ columns))
+
+    Seq.empty[Row]
+  }
+
+  /**
+   * The ALTER TABLE ADD COLUMNS command does not support temporary views/tables, permanent
+   * views, or datasource tables backed by the text or orc formats or an external provider.
+   * For datasource tables, it currently supports only parquet, json, and csv.
+   */
+  private def verifyAlterTableAddColumn(
+      catalog: SessionCatalog,
+      table: TableIdentifier): CatalogTable = {
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+    if (catalogTable.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"""
+           |ALTER ADD COLUMNS does not support views.
+           |You must drop and re-create the view to add the new columns. View: $table
+         """.stripMargin)
+    }
+
+    if (DDLUtils.isDatasourceTable(catalogTable)) {
+      DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+        // For a datasource table, this command supports only the following file formats:
+        // TextFileFormat defaults to a single column "value", so it is excluded.
+        // OrcFileFormat cannot yet handle the difference between a user-specified schema and
+        // an inferred schema. TODO: once this issue is resolved, we can add Orc back.
+        // Hive-backed tables are classified as Hive serde tables, so they never reach this
+        // code path.
+        case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+        case s =>
+          throw new AnalysisException(
+            s"""
+               |ALTER ADD COLUMNS does not support datasource tables of type $s.
+               |You must drop and re-create the table to add the new columns. Table: $table
+             """.stripMargin)
+      }
+    }
+    catalogTable
+  }
+}
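The provider guard pattern-matches on the instantiated `FileFormat`. A sketch of which built-in providers would pass that match, using the same `DataSource.lookupDataSource` call the command itself uses (the provider short names are assumptions about the built-in aliases):

```scala
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

// Not part of the PR: just probing the guard's behavior for a few providers.
Seq("parquet", "json", "csv", "text").foreach { provider =>
  val supported = DataSource.lookupDataSource(provider).newInstance() match {
    case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat => true
    case _ => false
  }
  println(s"$provider supported: $supported")  // text prints false; orc is likewise rejected
}
```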
 /**
  * A command that loads data into a Hive table.
  *
Review comment: Let us output the column names.