Commit 4c0ff5f

xwu0226 authored and gatorsmile committed
[SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables
## What changes were proposed in this pull request?

Support `ALTER TABLE ADD COLUMNS (...)` syntax for Hive serde and some datasource tables. In this PR, we consider a few aspects:

1. Views are not supported for `ALTER ADD COLUMNS`.
2. Since tables created in SparkSQL with Hive DDL syntax populate table properties with schema information, we need to keep the schema consistent before and after the ALTER operation so that it can be used later.
3. For formats that embed the schema, such as `parquet`, we need to make sure that predicates on the newly added columns can be evaluated properly, or pushed down properly. If a data file does not contain the newly added columns, such predicates should behave as if the column values are NULL.
4. For datasource tables, this feature does not support the following:
   4.1 TEXT format, since only one default column, `value`, is inferred for text format data.
   4.2 ORC format, since the SparkSQL native ORC reader does not support a difference between the user-specified schema and the schema inferred from ORC files.
   4.3 Third-party datasource types that implement RelationProvider, including the built-in JDBC format, since implementations by different vendors may deal with schema in different ways.
   4.4 Other datasource types, such as `parquet`, `json`, `csv`, `hive`, are supported.
5. Column names being added cannot duplicate any existing data column or partition column name. Case sensitivity is taken into account according to the SQL configuration.
6. This feature also works with the in-memory catalog, when Hive support is turned off.

## How was this patch tested?

Add new test cases

Author: Xin Wu <[email protected]>

Closes #16626 from xwu0226/alter_add_columns.
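Aspect 3 above can be illustrated with a toy model. This is only a sketch in plain Scala, not Spark's reader: `NullPaddingSketch`, `StoredRow`, `readRow`, `whereIsNull` and `whereEquals` are hypothetical names, and rows are simplified to integer columns. It shows the intended semantics: a row written before the ALTER is read as if the new column were NULL, so `c2 IS NULL` matches it and `c2 = 2` does not.

```scala
object NullPaddingSketch {
  // Hypothetical stand-in for a stored record: only the columns that existed at write time.
  type StoredRow = Map[String, Int]
  type ReadRow = Seq[Option[Int]]

  // Project a stored row onto the current table schema; missing columns become None (NULL).
  def readRow(schema: Seq[String], row: StoredRow): ReadRow =
    schema.map(name => row.get(name))

  // Keep rows whose column `col` is NULL.
  def whereIsNull(schema: Seq[String], rows: Seq[ReadRow], col: String): Seq[ReadRow] = {
    val i = schema.indexOf(col)
    rows.filter(r => r(i).isEmpty)
  }

  // Keep rows whose column `col` equals `value`; a NULL never matches.
  def whereEquals(schema: Seq[String], rows: Seq[ReadRow], col: String, value: Int): Seq[ReadRow] = {
    val i = schema.indexOf(col)
    rows.filter(r => r(i).contains(value))
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq("c1", "c2") // schema after ADD COLUMNS (c2 int)
    val stored = Seq(Map("c1" -> 1), Map("c1" -> 3, "c2" -> 2))
    val rows = stored.map(readRow(schema, _))
    println(whereIsNull(schema, rows, "c2"))
    println(whereEquals(schema, rows, "c2", 2))
  }
}
```

The same two predicates appear in the `DDLSuite` tests added by this commit, which check them against real `parquet`, `json` and `csv` tables.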
1 parent 63f077f commit 4c0ff5f

8 files changed

Lines changed: 400 additions & 10 deletions

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 1 deletion
@@ -85,6 +85,8 @@ statement
         LIKE source=tableIdentifier locationSpec? #createTableLike
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq)? #analyze
+    | ALTER TABLE tableIdentifier
+        ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
     | ALTER (TABLE | VIEW) from=tableIdentifier
         RENAME TO to=tableIdentifier #renameTable
     | ALTER (TABLE | VIEW) tableIdentifier
@@ -198,7 +200,6 @@ unsupportedHiveNativeCommands
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
-    | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
     | kw1=START kw2=TRANSACTION
     | kw1=COMMIT

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 56 additions & 0 deletions
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.{StructField, StructType}
 
 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
       throw new TableAlreadyExistsException(db = db, table = name.table)
     }
   }
+
+  private def checkDuplication(fields: Seq[StructField]): Unit = {
+    val columnNames = if (conf.caseSensitiveAnalysis) {
+      fields.map(_.name)
+    } else {
+      fields.map(_.name.toLowerCase)
+    }
+    if (columnNames.distinct.length != columnNames.length) {
+      val duplicateColumns = columnNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }
+      throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
+    }
+  }
   // ----------------------------------------------------------------------------
   // Databases
   // ----------------------------------------------------------------------------
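The duplicate check in `checkDuplication` can be tried outside Spark. The following is a minimal sketch under stated assumptions: `DuplicateColumnCheck`, `findDuplicates` and `requireNoDuplicates` are hypothetical names, a plain `Boolean` stands in for `conf.caseSensitiveAnalysis`, and `IllegalArgumentException` stands in for `AnalysisException`.

```scala
object DuplicateColumnCheck {
  // Return the names that occur more than once, lower-casing first when the
  // comparison is case-insensitive (mirrors the normalization in the diff).
  def findDuplicates(names: Seq[String], caseSensitive: Boolean): Seq[String] = {
    val normalized = if (caseSensitive) names else names.map(_.toLowerCase)
    normalized
      .groupBy(identity)
      .collect { case (name, hits) if hits.length > 1 => name }
      .toSeq
      .sorted
  }

  // Fail with the same message shape as the diff; the exception type is a stand-in.
  def requireNoDuplicates(names: Seq[String], caseSensitive: Boolean): Unit = {
    val duplicates = findDuplicates(names, caseSensitive)
    if (duplicates.nonEmpty) {
      throw new IllegalArgumentException(
        s"Found duplicate column(s): ${duplicates.mkString(", ")}")
    }
  }

  def main(args: Array[String]): Unit = {
    println(findDuplicates(Seq("c1", "C1"), caseSensitive = false))
    println(findDuplicates(Seq("c1", "C1"), caseSensitive = true))
  }
}
```

Because the check runs over the whole new schema, it covers clashes with partition columns as well as data columns, which is what aspect 5 of the commit message asks for.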
@@ -295,6 +310,47 @@ class SessionCatalog(
     externalCatalog.alterTable(newTableDefinition)
   }
 
+  /**
+   * Alter the schema of a table identified by the provided table identifier. The new schema
+   * should still contain the existing bucket columns and partition columns used by the table. This
+   * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+   * as the schema itself).
+   *
+   * @param identifier TableIdentifier
+   * @param newSchema Updated schema to be used for the table (must contain existing partition and
+   *                  bucket columns, and partition columns need to be at the end)
+   */
+  def alterTableSchema(
+      identifier: TableIdentifier,
+      newSchema: StructType): Unit = {
+    val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
+    requireDbExists(db)
+    requireTableExists(tableIdentifier)
+    checkDuplication(newSchema)
+
+    val catalogTable = externalCatalog.getTable(db, table)
+    val oldSchema = catalogTable.schema
+
+    // not supporting dropping columns yet
+    val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+    if (nonExistentColumnNames.nonEmpty) {
+      throw new AnalysisException(
+        s"""
+           |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
+           |not present in the new schema. We don't support dropping columns yet.
+         """.stripMargin)
+    }
+
+    // assuming the newSchema has all partition columns at the end as required
+    externalCatalog.alterTableSchema(db, table, newSchema)
+  }
+
+  private def columnNameResolved(schema: StructType, colName: String): Boolean = {
+    schema.fields.map(_.name).exists(conf.resolver(_, colName))
+  }
+
   /**
    * Return whether a table/view with the specified name exists. If no database is specified, check
    * with current database.
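The "no dropped columns" guard in `alterTableSchema` comes down to a resolver-based name lookup. Here is a minimal sketch of that lookup in plain Scala, assuming a resolver is a two-argument name comparison, as the call `conf.resolver(_, colName)` in the diff suggests. `DroppedColumnCheck`, `missingColumns` and the two resolver values are hypothetical names for illustration.

```scala
object DroppedColumnCheck {
  // Assumed shape of a resolver: decides whether two column names refer to the same column.
  type Resolver = (String, String) => Boolean

  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Names from the old schema that no column of the new schema resolves to.
  // A non-empty result corresponds to the "dropping columns" error in the diff.
  def missingColumns(
      oldNames: Seq[String],
      newNames: Seq[String],
      resolver: Resolver): Seq[String] =
    oldNames.filterNot(old => newNames.exists(resolver(_, old)))

  def main(args: Array[String]): Unit = {
    val oldNames = Seq("c1", "c2")
    val newNames = Seq("C1", "c3")
    println(missingColumns(oldNames, newNames, caseInsensitive))
    println(missingColumns(oldNames, newNames, caseSensitive))
  }
}
```

Going through a resolver rather than comparing strings directly means the guard honors the same case-sensitivity setting as the rest of analysis.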

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 29 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types._
 
 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
   protected val utils = new CatalogTestUtils {
@@ -448,6 +449,34 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+  test("alter table add columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      sessionCatalog.alterTableSchema(
+        TableIdentifier("t1", Some("default")),
+        StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
+
+      val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      // construct the expected table schema
+      val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
+        Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+      assert(newTab.schema == expectedTableSchema)
+    }
+  }
+
+  test("alter table drop columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val e = intercept[AnalysisException] {
+        sessionCatalog.alterTableSchema(
+          TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+      }.getMessage
+      assert(e.contains("We don't support dropping columns yet."))
+    }
+  }
+
   test("get table") {
     withBasicCatalog { catalog =>
       assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 16 additions & 0 deletions
@@ -741,6 +741,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx.VIEW != null)
   }
 
+  /**
+   * Create a [[AlterTableAddColumnsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1
+   *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+   * }}}
+   */
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableAddColumnsCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitColTypeList(ctx.columns)
+    )
+  }
+
   /**
    * Create an [[AlterTableSetPropertiesCommand]] command.
    *

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 75 additions & 1 deletion
@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -174,6 +177,77 @@ case class AlterTableRenameCommand(
 
 }
 
+/**
+ * A command that add columns to a table
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+*/
+case class AlterTableAddColumnsCommand(
+    table: TableIdentifier,
+    columns: Seq[StructField]) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val catalogTable = verifyAlterTableAddColumn(catalog, table)
+
+    try {
+      sparkSession.catalog.uncacheTable(table.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+    }
+    catalog.refreshTable(table)
+
+    // make sure any partition columns are at the end of the fields
+    val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+    catalog.alterTableSchema(
+      table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+
+    Seq.empty[Row]
+  }
+
+  /**
+   * ALTER TABLE ADD COLUMNS command does not support temporary view/table,
+   * view, or datasource table with text, orc formats or external provider.
+   * For datasource table, it currently only supports parquet, json, csv.
+   */
+  private def verifyAlterTableAddColumn(
+      catalog: SessionCatalog,
+      table: TableIdentifier): CatalogTable = {
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+    if (catalogTable.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"""
+          |ALTER ADD COLUMNS does not support views.
+          |You must drop and re-create the views for adding the new columns. Views: $table
+         """.stripMargin)
+    }
+
+    if (DDLUtils.isDatasourceTable(catalogTable)) {
+      DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+        // For datasource table, this command can only support the following File format.
+        // TextFileFormat only default to one column "value"
+        // OrcFileFormat can not handle difference between user-specified schema and
+        // inferred schema yet. TODO, once this issue is resolved , we can add Orc back.
+        // Hive type is already considered as hive serde table, so the logic will not
+        // come in here.
+        case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+        case s =>
+          throw new AnalysisException(
+            s"""
+              |ALTER ADD COLUMNS does not support datasource table with type $s.
+              |You must drop and re-create the table for adding the new columns. Tables: $table
+             """.stripMargin)
+      }
+    }
+    catalogTable
+  }
+}
+
+
 /**
  * A command that loads data into a Hive table.
  *
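The `reorderedSchema` line in `AlterTableAddColumnsCommand.run` places new columns after the existing data columns and before the partition columns. Here is a small sketch of that ordering in plain Scala. `SchemaReorderSketch`, `Column` and `addColumns` are hypothetical names, and a simple case class stands in for `StructField`.

```scala
object SchemaReorderSketch {
  // Hypothetical stand-in for a schema field: just a name and a type string.
  final case class Column(name: String, dataType: String)

  // New columns go after the existing data columns and before the partition
  // columns, so partition columns always stay at the end of the schema.
  def addColumns(
      dataColumns: Seq[Column],
      partitionColumns: Seq[Column],
      added: Seq[Column]): Seq[Column] =
    dataColumns ++ added ++ partitionColumns

  def main(args: Array[String]): Unit = {
    // Mirrors the partitioned DDLSuite case: t1 (c1 int, c2 int) PARTITIONED BY (c2),
    // followed by ADD COLUMNS (c3 int).
    val data = Seq(Column("c1", "int"))
    val partitions = Seq(Column("c2", "int"))
    val result = addColumns(data, partitions, Seq(Column("c3", "int")))
    println(result.map(_.name).mkString(", "))
  }
}
```

This ordering is why the partitioned test in `DDLSuite` expects `Row(1, null, 2)`: the added `c3` sits between the data column `c1` and the partition column `c2`.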

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 1 addition & 7 deletions
@@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest {
     assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
   }
 
-  test("alter table: add/replace columns (not allowed)") {
-    assertUnsupported(
-      """
-       |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
-       |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
-       |COMMENT 'test_comment2') CASCADE
-      """.stripMargin)
+  test("alter table: replace columns (not allowed)") {
     assertUnsupported(
       """
        |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 122 additions & 0 deletions
@@ -2185,4 +2185,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
+
+  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+    test(s"alter datasource table add columns - $provider") {
+      withTable("t1") {
+        sql(s"CREATE TABLE t1 (c1 int) USING $provider")
+        sql("INSERT INTO t1 VALUES (1)")
+        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, null))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 is null"),
+          Seq(Row(1, null))
+        )
+
+        sql("INSERT INTO t1 VALUES (3, 2)")
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 = 2"),
+          Seq(Row(3, 2))
+        )
+      }
+    }
+  }
+
+  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+    test(s"alter datasource table add columns - partitioned - $provider") {
+      withTable("t1") {
+        sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
+        sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
+        sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, null, 2))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c3 is null"),
+          Seq(Row(1, null, 2))
+        )
+        sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c3 = 3"),
+          Seq(Row(2, 3, 1))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 = 1"),
+          Seq(Row(2, 3, 1))
+        )
+      }
+    }
+  }
+
+  test("alter datasource table add columns - text format not supported") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING text")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+      }.getMessage
+      assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
+    }
+  }
+
+  test("alter table add columns -- not support temp view") {
+    withTempView("tmp_v") {
+      sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
+      }
+      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+    }
+  }
+
+  test("alter table add columns -- not support view") {
+    withView("v1") {
+      sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
+      }
+      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+    }
+  }
+
+  test("alter table add columns with existing column name") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s)"))
+    }
+  }
+
+  Seq(true, false).foreach { caseSensitive =>
+    test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+        withTable("t1") {
+          sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+          if (!caseSensitive) {
+            val e = intercept[AnalysisException] {
+              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+            }.getMessage
+            assert(e.contains("Found duplicate column(s)"))
+          } else {
+            if (isUsingHiveMetastore) {
+              // hive catalog will still complains that c1 is duplicate column name because hive
+              // identifiers are case insensitive.
+              val e = intercept[AnalysisException] {
+                sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+              }.getMessage
+              assert(e.contains("HiveException"))
+            } else {
+              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+              assert(spark.table("t1").schema
+                .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
+            }
+          }
+        }
+      }
+    }
+  }
 }
