diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 6087e3979d67d..c01145f479d99 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -164,12 +164,20 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
     val properties = new Properties()
     properties.putAll(tableConfigs.asJava)
 
-    HoodieTableMetaClient.withPropertyBuilder()
-      .fromProperties(properties)
-      .setTableName(table.identifier.table)
-      .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
-      .setPartitionFields(table.partitionColumnNames.mkString(","))
-      .initTable(hadoopConf, tableLocation)
+    if (hoodieTableExists) {
+      // just persist hoodie.table.create.schema
+      HoodieTableMetaClient.withPropertyBuilder()
+        .fromProperties(properties)
+        .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
+        .initTable(hadoopConf, tableLocation)
+    } else {
+      HoodieTableMetaClient.withPropertyBuilder()
+        .fromProperties(properties)
+        .setTableName(table.identifier.table)
+        .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
+        .setPartitionFields(table.partitionColumnNames.mkString(","))
+        .initTable(hadoopConf, tableLocation)
+    }
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index d84973f90d42f..8c9d902d9dc8d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -61,6 +61,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)
     // check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
     CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
+
     // init hoodie table
     hoodieCatalogTable.initHoodieTable()
 
@@ -129,12 +130,14 @@ object CreateHoodieTableCommand {
     val newTableIdentifier = table.identifier
       .copy(table = tablName, database = Some(newDatabaseName))
 
+    val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name)
     // append pk, preCombineKey, type to the properties of table
     val newTblProperties = hoodieCatalogTable.catalogProperties ++ HoodieOptionConfig.extractSqlOptions(properties)
     val newTable = table.copy(
       identifier = newTableIdentifier,
-      schema = hoodieCatalogTable.tableSchema,
       storage = newStorage,
+      schema = hoodieCatalogTable.tableSchema,
+      partitionColumnNames = partitionColumnNames,
       createVersion = SPARK_VERSION,
       properties = newTblProperties
     )
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 762f23b555030..c8fa32891e0f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -322,30 +322,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
         } else {
           l
         }
-      // Fill schema for Create Table without specify schema info
-      case c @ CreateTable(tableDesc, _, _)
-        if isHoodieTable(tableDesc) =>
-        val tablePath = getTableLocation(c.tableDesc, sparkSession)
-        val tableExistInCatalog = sparkSession.sessionState.catalog.tableExists(tableDesc.identifier)
-        // Only when the table has not exist in catalog, we need to fill the schema info for creating table.
-        if (!tableExistInCatalog && tableExistsInPath(tablePath, sparkSession.sessionState.newHadoopConf())) {
-          val metaClient = HoodieTableMetaClient.builder()
-            .setBasePath(tablePath)
-            .setConf(sparkSession.sessionState.newHadoopConf())
-            .build()
-          val tableSchema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient)
-          if (tableSchema.isDefined && tableDesc.schema.isEmpty) {
-            // Fill the schema with the schema from the table
-            c.copy(tableDesc.copy(schema = tableSchema.get))
-          } else if (tableSchema.isDefined && tableDesc.schema != tableSchema.get) {
-            throw new AnalysisException(s"Specified schema in create table statement is not equal to the table schema." +
-              s"You should not specify the schema for an exist table: ${tableDesc.identifier} ")
-          } else {
-            c
-          }
-        } else {
-          c
-        }
+      case p => p
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 22d20cdd09cc5..3e7adec7d59bb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -100,11 +100,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
     spark.sql(
       s"""
          |create table $tableName using hudi
-         |tblproperties (
-         |  primaryKey = 'id',
-         |  preCombineField = 'ts'
-         |)
-         |partitioned by (dt)
          |location '$tablePath'
          |""".stripMargin)
 
@@ -149,11 +144,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
     spark.sql(
       s"""
         |create table $tableName using hudi
-        |tblproperties (
-        |  primaryKey = 'id',
-        |  preCombineField = 'ts'
-        |)
-        |partitioned by (dt)
        |location '$tablePath'
        |""".stripMargin)
 
@@ -210,7 +200,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
 
     import spark.implicits._
     val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02"))
-        .toDF("id", "name", "ts", "year", "month", "day")
+      .toDF("id", "name", "ts", "year", "month", "day")
 
     df.write.format("hudi")
       .option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -229,11 +219,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
     spark.sql(
       s"""
          |create table $tableName using hudi
-         |tblproperties (
-         |  primaryKey = 'id',
-         |  preCombineField = 'ts'
-         |)
-         |partitioned by (year, month, day)
          |location '$tablePath'
          |""".stripMargin)
 
@@ -278,11 +263,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
     spark.sql(
       s"""
          |create table $tableName using hudi
-         |tblproperties (
-         |  primaryKey = 'id',
-         |  preCombineField = 'ts'
-         |)
-         |partitioned by (year, month, day)
          |location '$tablePath'
          |""".stripMargin)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 7f51c59b0f109..f8a658ae0b0e5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -167,7 +167,7 @@ class TestCreateTable extends TestHoodieSqlBase {
     assertResult(Seq("dt"))(table2.partitionColumnNames)
     assertResult(classOf[HoodieParquetRealtimeInputFormat].getCanonicalName)(table2.storage.inputFormat.get)
 
-    // Test create a external table with an exist table in the path
+    // Test create an external table with an existing table in the path
     val tableName3 = generateTableName
     spark.sql(
       s"""
@@ -285,17 +285,18 @@ class TestCreateTable extends TestHoodieSqlBase {
     val tableName3 = generateTableName
     // CTAS failed with null primaryKey
     assertThrows[Exception] {
-    spark.sql(
-      s"""
-         | create table $tableName3 using hudi
-         | partitioned by (dt)
-         | tblproperties(primaryKey = 'id')
-         | location '${tmp.getCanonicalPath}/$tableName3'
-         | AS
-         | select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
-         |
-       """.stripMargin
-    )}
+      spark.sql(
+        s"""
+           | create table $tableName3 using hudi
+           | partitioned by (dt)
+           | tblproperties(primaryKey = 'id')
+           | location '${tmp.getCanonicalPath}/$tableName3'
+           | AS
+           | select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
+           |
+        """.stripMargin
+      )
+    }
     // Create table with timestamp type partition
     spark.sql(
       s"""
@@ -357,7 +358,7 @@ class TestCreateTable extends TestHoodieSqlBase {
     }
   }
 
-  test("Test Create Table From Exist Hoodie Table") {
+  test("Test Create Table From Existing Hoodie Table") {
     withTempDir { tmp =>
       Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
         val tableName = generateTableName
@@ -377,16 +378,20 @@ class TestCreateTable extends TestHoodieSqlBase {
           .mode(SaveMode.Overwrite)
           .save(tablePath)
 
-        // Create a table over the exist old table.
-        spark.sql(
+        // Create a table over the existing table.
+        // Creating the table should fail when only partition columns are specified without a table schema.
+        checkExceptionContain(
           s"""
              |create table $tableName using hudi
-             |tblproperties (
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             |)
              |partitioned by (dt)
              |location '$tablePath'
+             |""".stripMargin
+        )("It is not allowed to specify partition columns when the table schema is not defined.")
+
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             |location '$tablePath'
              |""".stripMargin)
         checkAnswer(s"select id, name, value, ts, dt from $tableName")(
           Seq(1, "a1", 10, 1000, partitionValue)
@@ -434,7 +439,7 @@ class TestCreateTable extends TestHoodieSqlBase {
     }
   }
 
-  test("Test Create Table From Exist Hoodie Table For Multi-Level Partitioned Table") {
+  test("Test Create Table From Existing Hoodie Table For Multi-Level Partitioned Table") {
     withTempDir { tmp =>
       Seq("2021-08-02", "2021/08/02").foreach { day =>
         val tableName = generateTableName
@@ -454,15 +459,10 @@ class TestCreateTable extends TestHoodieSqlBase {
           .mode(SaveMode.Overwrite)
           .save(tablePath)
 
-        // Create a table over the exist old table.
+        // Create a table over the existing table.
         spark.sql(
           s"""
              |create table $tableName using hudi
-             |tblproperties (
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             |)
-             |partitioned by (day, hh)
             |location '$tablePath'
             |""".stripMargin)
         checkAnswer(s"select id, name, value, ts, day, hh from $tableName")(
@@ -511,7 +511,7 @@ class TestCreateTable extends TestHoodieSqlBase {
     }
   }
 
-  test("Test Create Table From Exist Hoodie Table For None Partitioned Table") {
+  test("Test Create Table From Existing Hoodie Table For Non-Partitioned Table") {
     withTempDir{tmp =>
       // Write a table by spark dataframe.
       val tableName = generateTableName
@@ -529,14 +529,10 @@ class TestCreateTable extends TestHoodieSqlBase {
         .mode(SaveMode.Overwrite)
        .save(tmp.getCanonicalPath)
 
-      // Create a table over the exist old table.
+      // Create a table over the existing table.
       spark.sql(
         s"""
            |create table $tableName using hudi
-           |tblproperties (
-           |  primaryKey = 'id',
-           |  preCombineField = 'ts'
-           |)
           |location '${tmp.getCanonicalPath}'
           |""".stripMargin)
       checkAnswer(s"select id, name, value, ts from $tableName")(
@@ -583,7 +579,7 @@ class TestCreateTable extends TestHoodieSqlBase {
     }
   }
 
-  test("Test Create Table Exists In Catalog") {
+  test("Test Create Table Existing In Catalog") {
     val tableName = generateTableName
     spark.sql(
       s"""
@@ -598,7 +594,7 @@ class TestCreateTable extends TestHoodieSqlBase {
 
     spark.sql(s"alter table $tableName add columns(ts bigint)")
 
-    // Check "create table if not exist" works after schema evolution.
+    // Check "create table if not exists" works after schema evolution.
     spark.sql(
       s"""
         |create table if not exists $tableName (
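
Usage sketch (editor's note, not part of the patch): per the changed tests above, after this
patch a SQL table can be recreated over an existing Hudi path from the location alone, with
name, schema, and partition fields recovered from the table's metadata, while specifying
partition columns without a schema is rejected. The table and path names below are
illustrative, not taken from the patch:

    // Succeeds: schema and partitioning are resolved from the existing Hudi table at the path.
    spark.sql(
      s"""
         |create table recovered_tbl using hudi
         |location '/tmp/hudi/existing_table'
         |""".stripMargin)

    // Fails with "It is not allowed to specify partition columns when the table schema
    // is not defined.", the new check exercised in TestCreateTable.
    spark.sql(
      s"""
         |create table recovered_tbl2 using hudi
         |partitioned by (dt)
         |location '/tmp/hudi/existing_table'
         |""".stripMargin)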