Skip to content

Commit ec1b27d

Browse files
rahil-cRahil Chertara
authored andcommitted
[HUDI-4864] Fix AWSDmsAvroPayload#combineAndGetUpdateValue when using MOR snapshot query after delete operations with test (apache#6688)
Co-authored-by: Rahil Chertara <rchertar@amazon.com>
1 parent 2662d9c commit ec1b27d

2 files changed

Lines changed: 27 additions & 3 deletions

File tree

hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
4949
}
5050

5151
public AWSDmsAvroPayload(Option<GenericRecord> record) {
52-
this(record.get(), 0); // natural order
52+
this(record.isPresent() ? record.get() : null, 0); // natural order
5353
}
5454

5555
/**
@@ -87,7 +87,10 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
8787
@Override
8888
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
8989
throws IOException {
90-
IndexedRecord insertValue = super.getInsertValue(schema).get();
91-
return handleDeleteOperation(insertValue);
90+
Option<IndexedRecord> insertValue = super.getInsertValue(schema);
91+
if (!insertValue.isPresent()) {
92+
return Option.empty();
93+
}
94+
return handleDeleteOperation(insertValue.get());
9295
}
9396
}

hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,27 @@ public void testDelete() {
108108

109109
}
110110

111+
@Test
112+
public void testDeleteWithEmptyPayLoad() {
113+
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
114+
Properties properties = new Properties();
115+
116+
GenericRecord oldRecord = new GenericData.Record(avroSchema);
117+
oldRecord.put("field1", 2);
118+
oldRecord.put("Op", "U");
119+
120+
AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.empty());
121+
122+
try {
123+
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties);
124+
// expect nothing to be committed to table
125+
assertFalse(outputPayload.isPresent());
126+
} catch (Exception e) {
127+
e.printStackTrace();
128+
fail("Unexpected exception");
129+
}
130+
}
131+
111132
@Test
112133
public void testPreCombineWithDelete() {
113134
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);

0 commit comments

Comments
 (0)