Skip to content

Commit ab52e9f

Browse files
committed
[HUDI-3678] Fix record rewrite of create handle when 'preserveMetadata' is true
1 parent 26e5d2e commit ab52e9f

5 files changed

Lines changed: 26 additions & 40 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java

Lines changed: 10 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
5959
protected long recordsDeleted = 0;
6060
private Map<String, HoodieRecord<T>> recordMap;
6161
private boolean useWriterSchema = false;
62-
private boolean preserveHoodieMetadata = false;
62+
private final boolean preserveMetadata;
6363

6464
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
6565
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
@@ -69,9 +69,9 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
6969

7070
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
7171
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
72-
boolean preserveHoodieMetadata) {
72+
boolean preserveMetadata) {
7373
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
74-
taskContextSupplier, preserveHoodieMetadata);
74+
taskContextSupplier, preserveMetadata);
7575
}
7676

7777
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -82,10 +82,10 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
8282

8383
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
8484
String partitionPath, String fileId, Option<Schema> overriddenSchema,
85-
TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) {
85+
TaskContextSupplier taskContextSupplier, boolean preserveMetadata) {
8686
super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
8787
taskContextSupplier);
88-
this.preserveHoodieMetadata = preserveHoodieMetadata;
88+
this.preserveMetadata = preserveMetadata;
8989
writeStatus.setFileId(fileId);
9090
writeStatus.setPartitionPath(partitionPath);
9191
writeStatus.setStat(new HoodieWriteStat());
@@ -111,7 +111,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
111111
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
112112
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
113113
TaskContextSupplier taskContextSupplier) {
114-
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier);
114+
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction());
115115
this.recordMap = recordMap;
116116
this.useWriterSchema = true;
117117
}
@@ -137,13 +137,11 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
137137
return;
138138
}
139139
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
140-
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
141-
if (preserveHoodieMetadata) {
142-
// do not preserve FILENAME_METADATA_FIELD
143-
recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD), path.getName());
144-
fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
140+
if (preserveMetadata) {
141+
fileWriter.writeAvro(record.getRecordKey(),
142+
rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
145143
} else {
146-
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
144+
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record);
147145
}
148146
// update the new location of record, so we know where to find it next
149147
record.unseal();

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java

Lines changed: 8 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -61,8 +61,6 @@
6161
import java.util.Map;
6262
import java.util.Set;
6363

64-
import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS;
65-
6664
@SuppressWarnings("Duplicates")
6765
/**
6866
* Handle to merge incoming records to those in storage.
@@ -264,7 +262,7 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
264262
isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
265263
}
266264
}
267-
return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord);
265+
return writeRecord(hoodieRecord, indexedRecord, isDelete);
268266
}
269267

270268
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -274,16 +272,16 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
274272
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
275273
return;
276274
}
277-
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) {
275+
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
278276
insertRecordsWritten++;
279277
}
280278
}
281279

282280
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
283-
return writeRecord(hoodieRecord, indexedRecord, false, null);
281+
return writeRecord(hoodieRecord, indexedRecord, false);
284282
}
285283

286-
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete, GenericRecord oldRecord) {
284+
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
287285
Option recordMetadata = hoodieRecord.getData().getMetadata();
288286
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
289287
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -294,13 +292,11 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
294292
try {
295293
if (indexedRecord.isPresent() && !isDelete) {
296294
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
297-
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get(), preserveMetadata, oldRecord);
298-
if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only incase of compaction.
299-
// do not preserve FILENAME_METADATA_FIELD
300-
recordWithMetadataInSchema.put(FILENAME_METADATA_FIELD_POS, newFilePath.getName());
301-
fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
295+
if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only in case of compaction.
296+
fileWriter.writeAvro(hoodieRecord.getRecordKey(),
297+
rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
302298
} else {
303-
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
299+
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) indexedRecord.get()), hoodieRecord);
304300
}
305301
recordsWritten++;
306302
} else {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -227,8 +227,8 @@ protected GenericRecord rewriteRecord(GenericRecord record) {
227227
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
228228
}
229229

230-
protected GenericRecord rewriteRecord(GenericRecord record, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
231-
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields, copyOverMetaFields, fallbackRecord);
230+
protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
231+
return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
232232
}
233233

234234
public abstract List<WriteStatus> close();

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hudi.common.testutils.RawTripTestPayload;
3030
import org.apache.hudi.common.util.BaseFileUtils;
3131
import org.apache.hudi.common.util.Option;
32+
import org.apache.hudi.config.HoodieCompactionConfig;
3233
import org.apache.hudi.config.HoodieWriteConfig;
3334
import org.apache.hudi.exception.HoodieUpsertException;
3435
import org.apache.hudi.io.HoodieCreateHandle;
@@ -77,6 +78,7 @@ public void tearDown() throws IOException {
7778
private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
7879
// Create a bunch of records with an old version of schema
7980
final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.avsc");
81+
config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false");
8082
final HoodieSparkTable table = HoodieSparkTable.create(config, context);
8183
final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
8284
List<HoodieRecord> insertRecords = new ArrayList<>();

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 4 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -382,23 +382,13 @@ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSch
382382
return newRecord;
383383
}
384384

385-
public static GenericRecord rewriteRecord(GenericRecord genericRecord, Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
385+
public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
386386
GenericRecord newRecord = new GenericData.Record(newSchema);
387-
boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase;
388387
for (Schema.Field f : newSchema.getFields()) {
389-
if (!(isSpecificRecord && isMetadataField(f.name()))) {
390-
copyOldValueOrSetDefault(genericRecord, newRecord, f);
391-
}
392-
if (isMetadataField(f.name()) && copyOverMetaFields) {
393-
// if meta field exists in primary generic record, copy over.
394-
if (genericRecord.getSchema().getField(f.name()) != null) {
395-
copyOldValueOrSetDefault(genericRecord, newRecord, f);
396-
} else if (fallbackRecord != null && fallbackRecord.getSchema().getField(f.name()) != null) {
397-
// if not, try to copy from the fallback record.
398-
copyOldValueOrSetDefault(fallbackRecord, newRecord, f);
399-
}
400-
}
388+
copyOldValueOrSetDefault(genericRecord, newRecord, f);
401389
}
390+
// do not preserve FILENAME_METADATA_FIELD
391+
newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
402392
if (!GenericData.get().validate(newSchema, newRecord)) {
403393
throw new SchemaCompatibilityException(
404394
"Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);

0 commit comments

Comments
 (0)