diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index c4b9aec753e7a..b22c60792b749 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -196,9 +196,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
   }
 
   private def isEqualToTarget(targetColumnName: String, sourceExpression: Expression): Boolean = {
+    val sourceColNameMap = sourceDFOutput.map(attr => (attr.name.toLowerCase, attr.name)).toMap
+
     sourceExpression match {
-      case attr: AttributeReference if attr.name.equalsIgnoreCase(targetColumnName) => true
-      case Cast(attr: AttributeReference, _, _) if attr.name.equalsIgnoreCase(targetColumnName) => true
+      case attr: AttributeReference if sourceColNameMap(attr.name.toLowerCase).equals(targetColumnName) => true
+      case Cast(attr: AttributeReference, _, _) if sourceColNameMap(attr.name.toLowerCase).equals(targetColumnName) => true
       case _=> false
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 92a2c63ee617f..30a2448f0a5e4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -375,4 +375,73 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
     }
   }
 
+  test("Test MergeInto When PrimaryKey And PreCombineField Of Source Table And Target Table Differ In Case Only") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | options (
+           |  primaryKey ='id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as ID, 'a1' as NAME, 10 as PRICE, 1000 as TS, '1' as FLAG
+           | ) s0
+           | on s0.ID = $tableName.id
+           | when matched and FLAG = '1' then update set
+           | id = s0.ID, name = s0.NAME, price = s0.PRICE, ts = s0.TS
+           | when not matched and FLAG = '1' then insert *
+           |""".stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10.0, 1000)
+      )
+
+      // Test the case where the column names in the merge condition and update actions differ in case from the source table's columns
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as ID, 'a1' as NAME, 11 as PRICE, 1001 as TS, '1' as FLAG
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched and FLAG = '1' then update set
+           | id = s0.id, name = s0.NAME, price = s0.PRICE, ts = s0.ts
+           | when not matched and FLAG = '1' then insert *
+           |""".stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 11.0, 1001)
+      )
+
+      // Test the case where the column name inside the cast in the merge condition differs in case from the source table's column
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 2 as ID, 'a2' as NAME, 12 as PRICE, 1002 as TS, '1' as FLAG
+           | ) s0
+           | on cast(s0.id as int) = $tableName.id
+           | when matched and FLAG = '1' then update set
+           | id = s0.id, name = s0.NAME, price = s0.PRICE, ts = s0.ts
+           | when not matched and FLAG = '1' then insert *
+           |""".stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 11.0, 1001),
+        Seq(2, "a2", 12.0, 1002)
+      )
+    }
+  }
+
 }
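Note on the main change above: the old check compared the referenced attribute name to the target column name with `equalsIgnoreCase`, so a reference that matches only when case is ignored already counted as equal. The patched `isEqualToTarget` first resolves the reference through `sourceColNameMap` (lower-cased name -> original source column name) and then compares that original-cased name case-sensitively with the target column name. Below is a minimal, self-contained sketch of that lookup pattern; it assumes nothing about Hudi or Spark internals, `Attr`, `matchesTarget`, and `CaseInsensitiveColumnMatchSketch` are illustrative names only, and `get`/`contains` is used here just to keep the sketch total for attributes outside the source output (the patch itself indexes the map directly, assuming the attribute always comes from `sourceDFOutput`).

object CaseInsensitiveColumnMatchSketch {

  // Stand-in for Spark's AttributeReference: only the name matters for this check.
  final case class Attr(name: String)

  // Source output columns, e.g. the aliases produced by `select 1 as ID, 'a1' as NAME, 1000 as TS ...`.
  private val sourceOutput: Seq[Attr] = Seq(Attr("ID"), Attr("NAME"), Attr("TS"))

  // Same shape as sourceColNameMap in the patch: lower-cased name -> original-cased source name.
  private val sourceColNameMap: Map[String, String] =
    sourceOutput.map(attr => attr.name.toLowerCase -> attr.name).toMap

  // Resolve the referenced name to the source's original casing, then compare it
  // case-sensitively with the target column name.
  def matchesTarget(targetColumnName: String, attr: Attr): Boolean =
    sourceColNameMap.get(attr.name.toLowerCase).contains(targetColumnName)

  def main(args: Array[String]): Unit = {
    println(matchesTarget("ID", Attr("id")))   // true: `s0.id` resolves to the source column ID
    println(matchesTarget("id", Attr("ID")))   // false: the source spells it ID, so it no longer counts as equal to target column `id`
    println(matchesTarget("id", Attr("name"))) // false: a different source column
  }
}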