Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,20 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
val properties = new Properties()
properties.putAll(tableConfigs.asJava)

HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.setTableName(table.identifier.table)
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
.setPartitionFields(table.partitionColumnNames.mkString(","))
.initTable(hadoopConf, tableLocation)
if (hoodieTableExists) {
// just persist hoodie.table.create.schema
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @YannByron

Recently, we found an Avro schema issue which is caused by the wrong record name (detail here: #7284).

May I ask if this line could cause the same problem? If so, we can discuss how to fix it in PR: #7297

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tried, it will rewrite the record name in hoodie.table.create.schema as topLevelRecord in hoodie.properties.

.initTable(hadoopConf, tableLocation)
} else {
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.setTableName(table.identifier.table)
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
.setPartitionFields(table.partitionColumnNames.mkString(","))
.initTable(hadoopConf, tableLocation)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)
// check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)

// init hoodie table
hoodieCatalogTable.initHoodieTable()

Expand Down Expand Up @@ -129,12 +130,14 @@ object CreateHoodieTableCommand {
val newTableIdentifier = table.identifier
.copy(table = tablName, database = Some(newDatabaseName))

val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name)
// append pk, preCombineKey, type to the properties of table
val newTblProperties = hoodieCatalogTable.catalogProperties ++ HoodieOptionConfig.extractSqlOptions(properties)
val newTable = table.copy(
identifier = newTableIdentifier,
schema = hoodieCatalogTable.tableSchema,
storage = newStorage,
schema = hoodieCatalogTable.tableSchema,
partitionColumnNames = partitionColumnNames,
createVersion = SPARK_VERSION,
properties = newTblProperties
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,30 +322,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
} else {
l
}
// Fill schema for Create Table without specify schema info
case c @ CreateTable(tableDesc, _, _)
if isHoodieTable(tableDesc) =>
val tablePath = getTableLocation(c.tableDesc, sparkSession)
val tableExistInCatalog = sparkSession.sessionState.catalog.tableExists(tableDesc.identifier)
// Only when the table has not exist in catalog, we need to fill the schema info for creating table.
if (!tableExistInCatalog && tableExistsInPath(tablePath, sparkSession.sessionState.newHadoopConf())) {
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(tablePath)
.setConf(sparkSession.sessionState.newHadoopConf())
.build()
val tableSchema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient)
if (tableSchema.isDefined && tableDesc.schema.isEmpty) {
// Fill the schema with the schema from the table
c.copy(tableDesc.copy(schema = tableSchema.get))
} else if (tableSchema.isDefined && tableDesc.schema != tableSchema.get) {
throw new AnalysisException(s"Specified schema in create table statement is not equal to the table schema." +
s"You should not specify the schema for an exist table: ${tableDesc.identifier} ")
} else {
c
}
} else {
c
}

case p => p
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
spark.sql(
s"""
|create table $tableName using hudi
|tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|partitioned by (dt)
|location '$tablePath'
|""".stripMargin)

Expand Down Expand Up @@ -149,11 +144,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
spark.sql(
s"""
|create table $tableName using hudi
|tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|partitioned by (dt)
|location '$tablePath'
|""".stripMargin)

Expand Down Expand Up @@ -210,7 +200,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {

import spark.implicits._
val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02"))
.toDF("id", "name", "ts", "year", "month", "day")
.toDF("id", "name", "ts", "year", "month", "day")

df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
Expand All @@ -229,11 +219,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
spark.sql(
s"""
|create table $tableName using hudi
|tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|partitioned by (year, month, day)
|location '$tablePath'
|""".stripMargin)

Expand Down Expand Up @@ -278,11 +263,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
spark.sql(
s"""
|create table $tableName using hudi
|tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|partitioned by (year, month, day)
|location '$tablePath'
|""".stripMargin)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class TestCreateTable extends TestHoodieSqlBase {
assertResult(Seq("dt"))(table2.partitionColumnNames)
assertResult(classOf[HoodieParquetRealtimeInputFormat].getCanonicalName)(table2.storage.inputFormat.get)

// Test create a external table with an exist table in the path
// Test create a external table with an existing table in the path
val tableName3 = generateTableName
spark.sql(
s"""
Expand Down Expand Up @@ -285,17 +285,18 @@ class TestCreateTable extends TestHoodieSqlBase {
val tableName3 = generateTableName
// CTAS failed with null primaryKey
assertThrows[Exception] {
spark.sql(
s"""
| create table $tableName3 using hudi
| partitioned by (dt)
| tblproperties(primaryKey = 'id')
| location '${tmp.getCanonicalPath}/$tableName3'
| AS
| select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
|
""".stripMargin
)}
spark.sql(
s"""
| create table $tableName3 using hudi
| partitioned by (dt)
| tblproperties(primaryKey = 'id')
| location '${tmp.getCanonicalPath}/$tableName3'
| AS
| select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
|
""".stripMargin
)
}
// Create table with timestamp type partition
spark.sql(
s"""
Expand Down Expand Up @@ -357,7 +358,7 @@ class TestCreateTable extends TestHoodieSqlBase {
}
}

test("Test Create Table From Exist Hoodie Table") {
test("Test Create Table From Existing Hoodie Table") {
withTempDir { tmp =>
Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
val tableName = generateTableName
Expand All @@ -377,16 +378,20 @@ class TestCreateTable extends TestHoodieSqlBase {
.mode(SaveMode.Overwrite)
.save(tablePath)

// Create a table over the exist old table.
spark.sql(
// Create a table over the existing table.
// Fail to create table if only specify partition columns, no table schema.
checkExceptionContain(
s"""
|create table $tableName using hudi
|tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|partitioned by (dt)
|location '$tablePath'
|""".stripMargin
) ("It is not allowed to specify partition columns when the table schema is not defined.")

spark.sql(
s"""
|create table $tableName using hudi
|location '$tablePath'
|""".stripMargin)
checkAnswer(s"select id, name, value, ts, dt from $tableName")(
Seq(1, "a1", 10, 1000, partitionValue)
Expand Down Expand Up @@ -434,7 +439,7 @@ class TestCreateTable extends TestHoodieSqlBase {
}
}

test("Test Create Table From Exist Hoodie Table For Multi-Level Partitioned Table") {
test("Test Create Table From Existing Hoodie Table For Multi-Level Partitioned Table") {
withTempDir { tmp =>
Seq("2021-08-02", "2021/08/02").foreach { day =>
val tableName = generateTableName
Expand All @@ -454,15 +459,10 @@ class TestCreateTable extends TestHoodieSqlBase {
.mode(SaveMode.Overwrite)
.save(tablePath)

// Create a table over the exist old table.
// Create a table over the existing table.
spark.sql(
s"""
|create table $tableName using hudi
|tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|partitioned by (day, hh)
|location '$tablePath'
|""".stripMargin)
checkAnswer(s"select id, name, value, ts, day, hh from $tableName")(
Expand Down Expand Up @@ -511,7 +511,7 @@ class TestCreateTable extends TestHoodieSqlBase {
}
}

test("Test Create Table From Exist Hoodie Table For None Partitioned Table") {
test("Test Create Table From Existing Hoodie Table For None Partitioned Table") {
withTempDir{tmp =>
// Write a table by spark dataframe.
val tableName = generateTableName
Expand All @@ -529,14 +529,10 @@ class TestCreateTable extends TestHoodieSqlBase {
.mode(SaveMode.Overwrite)
.save(tmp.getCanonicalPath)

// Create a table over the exist old table.
// Create a table over the existing table.
spark.sql(
s"""
|create table $tableName using hudi
|tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|location '${tmp.getCanonicalPath}'
|""".stripMargin)
checkAnswer(s"select id, name, value, ts from $tableName")(
Expand Down Expand Up @@ -583,7 +579,7 @@ class TestCreateTable extends TestHoodieSqlBase {
}
}

test("Test Create Table Exists In Catalog") {
test("Test Create Table Existing In Catalog") {
val tableName = generateTableName
spark.sql(
s"""
Expand All @@ -598,7 +594,7 @@ class TestCreateTable extends TestHoodieSqlBase {

spark.sql(s"alter table $tableName add columns(ts bigint)")

// Check "create table if not exist" works after schema evolution.
// Check "create table if not exists" works after schema evolution.
spark.sql(
s"""
|create table if not exists $tableName (
Expand Down