Skip to content

Commit cab4b6a

Browse files
committed
[HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch
1 parent a1dedf3 commit cab4b6a

2 files changed

Lines changed: 57 additions & 4 deletions

File tree

hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
7070
if (!overwriteField(value, defaultValue)) {
7171
builder.set(field, value);
7272
} else {
73-
builder.set(field, currentRecord.get(field.pos()));
73+
builder.set(field, currentRecord.get(field.name()));
7474
}
7575
});
7676
return Option.of(builder.build());

hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.avro.generic.GenericData;
2424
import org.apache.avro.generic.GenericRecord;
2525
import org.apache.avro.generic.IndexedRecord;
26+
import org.apache.hudi.avro.HoodieAvroUtils;
2627
import org.junit.jupiter.api.BeforeEach;
2728
import org.junit.jupiter.api.Test;
2829

@@ -54,13 +55,15 @@ public void setUp() throws Exception {
5455

5556
@Test
5657
public void testActiveRecords() throws IOException {
58+
Schema writerSchema = HoodieAvroUtils.addMetadataFields(schema);
59+
5760
GenericRecord record1 = new GenericData.Record(schema);
5861
record1.put("id", "1");
5962
record1.put("partition", "partition1");
6063
record1.put("ts", 0L);
6164
record1.put("_hoodie_is_deleted", false);
6265
record1.put("city", "NY0");
63-
record1.put("child", Arrays.asList("A"));
66+
record1.put("child", Collections.singletonList("A"));
6467

6568
GenericRecord record2 = new GenericData.Record(schema);
6669
record2.put("id", "2");
@@ -76,11 +79,38 @@ public void testActiveRecords() throws IOException {
7679
record3.put("ts", 1L);
7780
record3.put("_hoodie_is_deleted", false);
7881
record3.put("city", "NY0");
79-
record3.put("child", Arrays.asList("A"));
80-
82+
record3.put("child", Collections.singletonList("A"));
83+
84+
// same content with record1 plus metadata fields
85+
GenericRecord record4 = createRecordWithMetadataFields(writerSchema, "1", "partition1");
86+
record4.put("id", "1");
87+
record4.put("partition", "partition1");
88+
record4.put("ts", 0L);
89+
record4.put("_hoodie_is_deleted", false);
90+
record4.put("city", "NY0");
91+
record4.put("child", Collections.singletonList("A"));
92+
93+
// same content with record2 plus metadata fields
94+
GenericRecord record5 = createRecordWithMetadataFields(writerSchema, "2", "");
95+
record5.put("id", "2");
96+
record5.put("partition", "");
97+
record5.put("ts", 1L);
98+
record5.put("_hoodie_is_deleted", false);
99+
record5.put("city", "NY");
100+
record5.put("child", Collections.emptyList());
101+
102+
// same content with record3 plus metadata fields
103+
GenericRecord record6 = createRecordWithMetadataFields(writerSchema, "2", "");
104+
record6.put("id", "2");
105+
record6.put("partition", "partition1");
106+
record6.put("ts", 1L);
107+
record6.put("_hoodie_is_deleted", false);
108+
record6.put("city", "NY0");
109+
record6.put("child", Collections.singletonList("A"));
81110

82111
OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1);
83112
OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2);
113+
OverwriteNonDefaultsWithLatestAvroPayload payload5 = new OverwriteNonDefaultsWithLatestAvroPayload(record5, 2);
84114
assertEquals(payload1.preCombine(payload2), payload2);
85115
assertEquals(payload2.preCombine(payload1), payload2);
86116

@@ -94,6 +124,19 @@ public void testActiveRecords() throws IOException {
94124
IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get();
95125
assertEquals(combinedVal2, record3);
96126
assertNotSame(combinedVal2, record3);
127+
128+
// the real case in production is: the current record to be combined includes the metadata fields,
129+
// the payload record could include the metadata fields (for compaction) or not (for normal writer path).
130+
131+
// case1: validate normal writer path
132+
IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema).get();
133+
assertEquals(combinedVal3, record3);
134+
assertNotSame(combinedVal3, record3);
135+
136+
// case2: validate compaction path
137+
IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema).get();
138+
assertEquals(combinedVal4, record6);
139+
assertNotSame(combinedVal4, record6);
97140
}
98141

99142
@Test
@@ -164,4 +207,14 @@ public void testNullColumn() throws IOException {
164207
OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1);
165208
assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3);
166209
}
210+
211+
private static GenericRecord createRecordWithMetadataFields(Schema schema, String recordKey, String partitionPath) {
212+
GenericRecord record = new GenericData.Record(schema);
213+
record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "001");
214+
record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, "123");
215+
record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
216+
record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
217+
record.put(HoodieRecord.FILENAME_METADATA_FIELD, "file1");
218+
return record;
219+
}
167220
}

0 commit comments

Comments
 (0)