Skip to content

Commit 323f196

Browse files
authored
[HUDI-4776] Fix merge into use unresolved assignment (#6589)
1 parent 454f7d3 commit 323f196

2 files changed

Lines changed: 46 additions & 3 deletions

File tree

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,21 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
282282
// the hoodie's meta field in sql statement, it is a system field, cannot set the value
283283
// by user.
284284
if (HoodieSparkUtils.isSpark3) {
285-
val assignmentFieldNames = assignments.map(_.key).map {
285+
val resolvedAssignments = assignments.map { assign =>
286+
val resolvedKey = assign.key match {
287+
case c if !c.resolved =>
288+
resolveExpressionFrom(target)(c)
289+
case o => o
290+
}
291+
Assignment(resolvedKey, null)
292+
}
293+
val assignmentFieldNames = resolvedAssignments.map(_.key).map {
286294
case attr: AttributeReference =>
287295
attr.name
288296
case _ => ""
289297
}.toArray
290298
val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala
291-
if (metaFields.mkString(",").startsWith(assignmentFieldNames.take(metaFields.length).mkString(","))) {
299+
if (assignmentFieldNames.take(metaFields.length).mkString(",").startsWith(metaFields.mkString(","))) {
292300
true
293301
} else {
294302
false

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
869869
}
870870
}
871871

872-
test("Test MereInto With All Kinds Of DataType") {
872+
test("Test MergeInto With All Kinds Of DataType") {
873873
withTempDir { tmp =>
874874
val dataAndTypes = Seq(
875875
("string", "'a1'"),
@@ -914,4 +914,39 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
914914
}
915915
}
916916
}
917+
918+
test("Test MergeInto with no-full fields source") {
919+
withTempDir { tmp =>
920+
val tableName = generateTableName
921+
spark.sql(
922+
s"""
923+
|create table $tableName (
924+
| id int,
925+
| name string,
926+
| value int,
927+
| ts long
928+
|) using hudi
929+
| location '${tmp.getCanonicalPath}/$tableName'
930+
| tblproperties (
931+
| primaryKey ='id',
932+
| preCombineField = 'ts'
933+
| )
934+
""".stripMargin)
935+
936+
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
937+
938+
spark.sql(
939+
s"""
940+
|merge into $tableName h0
941+
|using (
942+
| select 1 as id, 1001 as ts
943+
| ) s0
944+
| on h0.id = s0.id
945+
| when matched then update set h0.ts = s0.ts
946+
|""".stripMargin)
947+
checkAnswer(s"select id, name, value, ts from $tableName")(
948+
Seq(1, "a1", 10, 1001)
949+
)
950+
}
951+
}
917952
}

0 commit comments

Comments
 (0)