Skip to content

Commit 5b7d712

Browse files
author
Rahil Chertara
committed
Fix AWSDmsAvroPayload#getInsertValue, combineAndGetUpdateValue to invoke correct API
1 parent 98c3d88 commit 5b7d712

3 files changed

Lines changed: 131 additions & 7 deletions

File tree

hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertValue) t
6969

7070
@Override
7171
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
72-
IndexedRecord insertValue = super.getInsertValue(schema, properties).get();
73-
return handleDeleteOperation(insertValue);
72+
return getInsertValue(schema);
7473
}
7574

7675
@Override
@@ -82,8 +81,7 @@ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
8281
@Override
8382
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties)
8483
throws IOException {
85-
IndexedRecord insertValue = super.getInsertValue(schema, properties).get();
86-
return handleDeleteOperation(insertValue);
84+
return combineAndGetUpdateValue(currentValue, schema);
8785
}
8886

8987
@Override

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.hudi.functional
2020
import org.apache.hadoop.fs.{FileSystem, Path}
2121
import org.apache.hudi.HoodieConversionUtils.toJavaOption
2222
import org.apache.hudi.common.config.HoodieMetadataConfig
23-
import org.apache.hudi.common.model.HoodieRecord
23+
import org.apache.hudi.common.model.{AWSDmsAvroPayload, HoodieRecord}
2424
import org.apache.hudi.common.table.timeline.HoodieInstant
2525
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
2626
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -1024,4 +1024,67 @@ class TestCOWDataSource extends HoodieClientTestBase {
10241024
.saveAsTable("hoodie_test")
10251025
assertEquals(spark.read.format("hudi").load(basePath).count(), 9)
10261026
}
1027+
1028+
@Test
1029+
def testAWSDmsAvroPayload(): Unit = {
1030+
val yesterday_date = 1654520683000L
1031+
val today_date = 1654607083000L
1032+
val tomorrow_date = 1654608083000L
1033+
1034+
val testDF_upgrade: DataFrame = spark.createDataFrame(
1035+
List(
1036+
(null, yesterday_date, 1, "Clark Kent", "Superman"),
1037+
(null, yesterday_date, 2, "Bruce Wayne", "Batman"),
1038+
(null, yesterday_date, 3, "Diana Prince", "Wonder Woman"),
1039+
(null, yesterday_date, 4, "Hal Jordan", "Green Lantern"),
1040+
("I", today_date, 5, "Barry Allen", "The Flash"),
1041+
("I", today_date, 6, "Arthur Curry", "Aquaman"),
1042+
("D", today_date, 2, "Bruce Wayne", "Detective Comics"),
1043+
("U", today_date, 4, "John Stewart", "Green Lantern"),
1044+
("U", tomorrow_date, 4, "Guy Gardner", "Green Lantern")
1045+
)
1046+
).toDF("Op", "ts", "id", "name", "alias")
1047+
1048+
val expectedDF_upgrade = spark.createDataFrame(
1049+
List(
1050+
(yesterday_date, 1, "Clark Kent", "Superman"),
1051+
(yesterday_date, 3, "Diana Prince", "Wonder Woman"),
1052+
(tomorrow_date, 4, "Guy Gardner", "Green Lantern"),
1053+
(today_date, 5, "Barry Allen", "The Flash"),
1054+
(today_date, 6, "Arthur Curry", "Aquaman")
1055+
)
1056+
)
1057+
.toDF("ts", "id", "name", "alias")
1058+
.orderBy("id")
1059+
1060+
val tableName = "dms_table"
1061+
1062+
val hudiOptions = scala.collection.mutable.Map[String, String](
1063+
HoodieWriteConfig.TABLE_NAME -> tableName,
1064+
DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
1065+
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
1066+
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
1067+
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "ts",
1068+
DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[AWSDmsAvroPayload].getName,
1069+
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[CustomKeyGenerator].getName,
1070+
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> ""
1071+
)
1072+
1073+
testDF_upgrade
1074+
.dropDuplicates()
1075+
.write
1076+
.format("org.apache.hudi")
1077+
.options(hudiOptions)
1078+
.mode(SaveMode.Append)
1079+
.save(basePath)
1080+
1081+
val actualDF_upgrade = spark.read.format("org.apache.hudi").load(
1082+
basePath
1083+
)
1084+
.orderBy("id")
1085+
1086+
assertEquals(actualDF_upgrade.select("ts", "id", "name", "alias").toString(),
1087+
expectedDF_upgrade.select("ts", "id", "name", "alias").toString()
1088+
)
1089+
}
10271090
}

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ import org.apache.hadoop.fs.Path
2121
import org.apache.hudi.DataSourceWriteOptions._
2222
import org.apache.hudi.HoodieConversionUtils.toJavaOption
2323
import org.apache.hudi.common.config.HoodieMetadataConfig
24-
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
24+
import org.apache.hudi.common.model.{AWSDmsAvroPayload, DefaultHoodieRecordPayload, HoodieTableType}
2525
import org.apache.hudi.common.table.HoodieTableMetaClient
2626
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
2727
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
2828
import org.apache.hudi.common.util
2929
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
3030
import org.apache.hudi.index.HoodieIndex.IndexType
31-
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
31+
import org.apache.hudi.keygen.{CustomKeyGenerator, NonpartitionedKeyGenerator}
3232
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
3333
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
3434
import org.apache.hudi.util.JFunction
@@ -978,4 +978,67 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
978978
assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0)
979979
assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20)
980980
}
981+
982+
@Test
983+
def testAWSDmsAvroPayload(): Unit = {
984+
val yesterday_date = 1654520683000L
985+
val today_date = 1654607083000L
986+
val tomorrow_date = 1654608083000L
987+
988+
val testDF_upgrade: DataFrame = spark.createDataFrame(
989+
List(
990+
(null, yesterday_date, 1, "Clark Kent", "Superman"),
991+
(null, yesterday_date, 2, "Bruce Wayne", "Batman"),
992+
(null, yesterday_date, 3, "Diana Prince", "Wonder Woman"),
993+
(null, yesterday_date, 4, "Hal Jordan", "Green Lantern"),
994+
("I", today_date, 5, "Barry Allen", "The Flash"),
995+
("I", today_date, 6, "Arthur Curry", "Aquaman"),
996+
("D", today_date, 2, "Bruce Wayne", "Detective Comics"),
997+
("U", today_date, 4, "John Stewart", "Green Lantern"),
998+
("U", tomorrow_date, 4, "Guy Gardner", "Green Lantern")
999+
)
1000+
).toDF("Op", "ts", "id", "name", "alias")
1001+
1002+
val expectedDF_upgrade = spark.createDataFrame(
1003+
List(
1004+
(yesterday_date, 1, "Clark Kent", "Superman"),
1005+
(yesterday_date, 3, "Diana Prince", "Wonder Woman"),
1006+
(tomorrow_date, 4, "Guy Gardner", "Green Lantern"),
1007+
(today_date, 5, "Barry Allen", "The Flash"),
1008+
(today_date, 6, "Arthur Curry", "Aquaman")
1009+
)
1010+
)
1011+
.toDF("ts", "id", "name", "alias")
1012+
.orderBy("id")
1013+
1014+
val tableName = "dms_table"
1015+
1016+
val hudiOptions = scala.collection.mutable.Map[String, String](
1017+
HoodieWriteConfig.TABLE_NAME -> tableName,
1018+
DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
1019+
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
1020+
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
1021+
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "ts",
1022+
DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[AWSDmsAvroPayload].getName,
1023+
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[CustomKeyGenerator].getName,
1024+
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> ""
1025+
)
1026+
1027+
testDF_upgrade
1028+
.dropDuplicates()
1029+
.write
1030+
.format("org.apache.hudi")
1031+
.options(hudiOptions)
1032+
.mode(SaveMode.Append)
1033+
.save(basePath)
1034+
1035+
val actualDF_upgrade = spark.read.format("org.apache.hudi").load(
1036+
basePath
1037+
)
1038+
.orderBy("id")
1039+
1040+
assertEquals(actualDF_upgrade.select("ts", "id", "name", "alias").toString(),
1041+
expectedDF_upgrade.select("ts", "id", "name", "alias").toString()
1042+
)
1043+
}
9811044
}

0 commit comments

Comments
 (0)