Skip to content

Commit d0825b9

Browse files
committed
[HUDI-4219] Merge Into when update expression "col=s.col+2" on precombine causes an exception
1 parent c86edfc commit d0825b9

2 files changed

Lines changed: 213 additions & 4 deletions

File tree

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,23 +181,31 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
181181
var sourceDF = Dataset.ofRows(sparkSession, mergeInto.sourceTable)
182182
targetKey2SourceExpression.foreach {
183183
case (targetColumn, sourceExpression)
184-
if !isEqualToTarget(targetColumn, sourceExpression) =>
184+
if !isSameColumnName(targetColumn, sourceExpression) =>
185185
sourceDF = sourceDF.withColumn(targetColumn, new Column(sourceExpression))
186186
sourceDFOutput = sourceDFOutput :+ AttributeReference(targetColumn, sourceExpression.dataType)()
187187
case _=>
188188
}
189189
target2SourcePreCombineFiled.foreach {
190190
case (targetPreCombineField, sourceExpression)
191-
if !isEqualToTarget(targetPreCombineField, sourceExpression) =>
191+
if !containsSameColumnNameForPreCombine(targetPreCombineField, sourceExpression) =>
192192
sourceDF = sourceDF.withColumn(targetPreCombineField, new Column(sourceExpression))
193193
sourceDFOutput = sourceDFOutput :+ AttributeReference(targetPreCombineField, sourceExpression.dataType)()
194194
case _=>
195195
}
196196
sourceDF
197197
}
198198

199-
private def isEqualToTarget(targetColumnName: String, sourceExpression: Expression): Boolean = {
200-
val sourceColumnName = sourceDFOutput.map(_.name)
199+
/**
200+
* Check whether the source expression has the same column name as the target column.
201+
*
202+
* Merge condition cases that return true:
203+
* 1) merge into .. on h0.id = s0.id ..
204+
* 2) merge into .. on h0.id = cast(s0.id as int) ..
205+
* "id" is primaryKey field of h0.
206+
*/
207+
private def isSameColumnName(targetColumnName: String, sourceExpression: Expression): Boolean = {
208+
val sourceColumnNames = sourceDFOutput.map(_.name)
201209
val resolver = sparkSession.sessionState.conf.resolver
202210

203211
sourceExpression match {
@@ -213,6 +221,26 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
213221
}
214222
}
215223

224+
/**
 * Check whether the source expression assigned to the preCombine field contains a column
 * with the same name as the target column.
 *
 * Merge expression cases that return true:
 * 1) merge into .. on .. update set ts = s0.ts
 * 2) merge into .. on .. update set ts = cast(s0.ts as int)
 * 3) merge into .. on .. update set ts = s0.ts+1 (expressions like this whose sub node has
 *    the same column name as the target)
 * "ts" is the preCombine field of h0.
 *
 * @param targetColumnName name of the target table's preCombine column
 * @param sourceExpression the expression assigned to that column in the merge clause
 * @return true if any attribute inside the expression tree resolves to a source column
 *         whose name equals the target column name
 */
private def containsSameColumnNameForPreCombine(targetColumnName: String, sourceExpression: Expression): Boolean = {
  val sourceColumnNames = sourceDFOutput.map(_.name)
  val resolver = sparkSession.sessionState.conf.resolver

  // A sub node of the expression (e.g. the "s0.ts" inside "s0.ts + 1") may carry the same
  // column name as the target, so search the whole expression tree, not just the root node.
  sourceExpression.find {
    // Fix: the original body referenced the undeclared `sourceColumnName` and called `.get`
    // on the Option returned by `find`, which throws NoSuchElementException when the
    // attribute is not part of the source output. `Option.contains` performs the same
    // equality check safely (an absent match simply yields false).
    case attr: AttributeReference => sourceColumnNames.find(resolver(_, attr.name)).contains(targetColumnName)
    case _ => false
  }.isDefined
}
243+
216244
/**
217245
* Compare a [[Attribute]] to another, return true if they have the same column name(by resolver) and exprId
218246
*/

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,187 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
427427
}
428428
}
429429

430+
test("Test MergeInto with preCombine field expression") {
  withTempDir { tmp =>
    // Run the scenario against both storage types so the preCombine-expression code path
    // is exercised for copy-on-write and merge-on-read tables alike.
    Seq("cow", "mor").foreach { tableType =>
      val tableName1 = generateTableName
      // Target table keyed by `id`, with `v` as the preCombine field.
      spark.sql(
        s"""
           | create table $tableName1 (
           |  id int,
           |  name string,
           |  price double,
           |  v long,
           |  dt string
           | ) using hudi
           | tblproperties (
           |  type = '$tableType',
           |  primaryKey = 'id',
           |  preCombineField = 'v'
           | )
           | partitioned by(dt)
           | location '${tmp.getCanonicalPath}/$tableName1'
       """.stripMargin)
      // Seed a single row.
      spark.sql(s"""insert into $tableName1 values(1, 'a1', 10, 1000, '2021-03-21')""")

      // Case 1: value expression on the preCombine field where the source column name is
      // identical to the target column name (v = s0.v + 2).
      spark.sql(
        s"""
           | merge into $tableName1 as t0
           | using (
           |  select 1 as id, 'a1' as name, 11 as price, 999 as v, '2021-03-21' as dt
           | ) as s0
           | on t0.id = s0.id
           | when matched then update set id=s0.id, name=s0.name, price=s0.price*2, v=s0.v+2, dt=s0.dt
         """.stripMargin
      )
      // Update succeeds: the new preCombine value 1001 (999 + 2) is bigger than the stored 1000.
      checkAnswer(s"select id,name,price,dt,v from $tableName1")(
        Seq(1, "a1", 22, "2021-03-21", 1001)
      )

      // Case 2: the same update shape, but every source column is named differently (s_v, ...).
      spark.sql(
        s"""
           | merge into $tableName1 as t0
           | using (
           |  select 1 as s_id, 'a1' as s_name, 12 as s_price, 1000 as s_v, '2021-03-21' as dt
           | ) as s0
           | on t0.id = s0.s_id
           | when matched then update set id=s0.s_id, name=s0.s_name, price=s0.s_price*2, v=s0.s_v+2, dt=s0.dt
         """.stripMargin
      )
      // Update succeeds: the new preCombine value 1002 (1000 + 2) is bigger than the stored 1001.
      checkAnswer(s"select id,name,price,dt,v from $tableName1")(
        Seq(1, "a1", 24, "2021-03-21", 1002)
      )
    }
  }
}
489+
490+
test("Test MergeInto with primaryKey expression") {
  withTempDir { tmp =>
    val tableName1 = generateTableName
    // Copy-on-write target keyed by `id`, with `v` as the preCombine field.
    spark.sql(
      s"""
         | create table $tableName1 (
         |  id int,
         |  name string,
         |  price double,
         |  v long,
         |  dt string
         | ) using hudi
         | tblproperties (
         |  type = 'cow',
         |  primaryKey = 'id',
         |  preCombineField = 'v'
         | )
         | partitioned by(dt)
         | location '${tmp.getCanonicalPath}/$tableName1'
       """.stripMargin)
    // Seed three rows (inserted out of order on purpose).
    spark.sql(s"""insert into $tableName1 values(3, 'a3', 30, 3000, '2021-03-21')""")
    spark.sql(s"""insert into $tableName1 values(2, 'a2', 20, 2000, '2021-03-21')""")
    spark.sql(s"""insert into $tableName1 values(1, 'a1', 10, 1000, '2021-03-21')""")

    // Case 1: condition expression on the primaryKey field where the source column shares
    // the target column's name. s0.id = 1, so `t0.id = s0.id + 1` matches row id=2.
    spark.sql(
      s"""
         | merge into $tableName1 t0
         | using (
         |  select 1 as id, 'a1' as name, 15 as price, 1001 as v, '2021-03-21' as dt
         | ) s0
         | on t0.id = s0.id + 1
         | when matched then delete
       """.stripMargin
    )
    // Row id=2 is deleted; ids 1 and 3 remain.
    checkAnswer(s"select id,name,price,v,dt from $tableName1 order by id")(
      Seq(1, "a1", 10, 1000, "2021-03-21"),
      Seq(3, "a3", 30, 3000, "2021-03-21")
    )

    // Case 2: same condition shape with differently named source columns. s0.s_id = 2, so
    // `t0.id = s0.s_id + 1` matches row id=3.
    spark.sql(
      s"""
         | merge into $tableName1 t0
         | using (
         |  select 2 as s_id, 'a1' as s_name, 15 as s_price, 1001 as s_v, '2021-03-21' as dt
         | ) s0
         | on t0.id = s0.s_id + 1
         | when matched then delete
       """.stripMargin
    )
    // Only row id=1 is left.
    checkAnswer(s"select id,name,price,v,dt from $tableName1 order by id")(
      Seq(1, "a1", 10, 1000, "2021-03-21")
    )
  }
}
548+
549+
test("Test MergeInto with combination of delete update insert") {
  withTempDir { tmp =>
    val sourceTable = generateTableName
    val targetTable = generateTableName
    // Plain parquet source table with ids 8..12.
    spark.sql(
      s"""
         | create table $sourceTable (
         |  id int,
         |  name string,
         |  price double,
         |  ts long,
         |  dt string
         | ) using parquet
         | location '${tmp.getCanonicalPath}/$sourceTable'
       """.stripMargin)
    spark.sql(s"insert into $sourceTable values(8, 's8', 80, 2008, '2021-03-21')")
    spark.sql(s"insert into $sourceTable values(9, 's9', 90, 2009, '2021-03-21')")
    spark.sql(s"insert into $sourceTable values(10, 's10', 100, 2010, '2021-03-21')")
    spark.sql(s"insert into $sourceTable values(11, 's11', 110, 2011, '2021-03-21')")
    spark.sql(s"insert into $sourceTable values(12, 's12', 120, 2012, '2021-03-21')")
    // Hudi target table with ids 7..10, keyed by `id` and preCombined on `ts`.
    spark.sql(
      s"""
         |create table $targetTable (
         |  id int,
         |  name string,
         |  price double,
         |  ts long,
         |  dt string
         |) using hudi
         | tblproperties (
         |  primaryKey ='id',
         |  preCombineField = 'ts'
         | )
         | partitioned by(dt)
         | location '${tmp.getCanonicalPath}/$targetTable'
       """.stripMargin)
    spark.sql(s"insert into $targetTable values(7, 'a7', 70, 1007, '2021-03-21')")
    spark.sql(s"insert into $targetTable values(8, 'a8', 80, 1008, '2021-03-21')")
    spark.sql(s"insert into $targetTable values(9, 'a9', 90, 1009, '2021-03-21')")
    spark.sql(s"insert into $targetTable values(10, 'a10', 100, 1010, '2021-03-21')")

    // One merge exercising all three clauses, with expressions (price*2, ts+10000) in the
    // matched-update branch:
    //  - id=10 matches and is deleted
    //  - id=8,9 match (id < 10) and are updated
    //  - id=11,12 do not match (id > 10) and are inserted
    //  - id=7 has no source row and stays untouched
    spark.sql(
      s"""
         | merge into $targetTable as t0
         | using $sourceTable as s0
         | on t0.id = s0.id
         | when matched and id = 10 then delete
         | when matched and id < 10 then update set name='sxx', price=s0.price*2, ts=s0.ts+10000, dt=s0.dt
         | when not matched and id > 10 then insert *
       """.stripMargin)
    checkAnswer(s"select id,name,price,ts,dt from $targetTable order by id")(
      Seq(7, "a7", 70, 1007, "2021-03-21"),
      Seq(8, "sxx", 160, 12008, "2021-03-21"),
      Seq(9, "sxx", 180, 12009, "2021-03-21"),
      Seq(11, "s11", 110, 2011, "2021-03-21"),
      Seq(12, "s12", 120, 2012, "2021-03-21")
    )
  }
}
610+
430611
test("Merge Hudi to Hudi") {
431612
withTempDir { tmp =>
432613
Seq("cow", "mor").foreach { tableType =>

0 commit comments

Comments
 (0)