
Commit e3914eb
Author: jian.feng
Message: [HUDI-3304] apply patch from Danny
Parent: 94c9b7b

10 files changed: +110 −95 lines

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
(5 additions, 5 deletions)

@@ -43,7 +43,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
     try {
       // De-dupe/merge if needed
       I dedupedRecords =
-          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, table.getConfig().getSchema());
+          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

       Instant lookupBegin = Instant.now();
       I taggedRecords = dedupedRecords;
@@ -69,8 +69,8 @@ protected abstract I tag(
       I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table);

   public I combineOnCondition(
-      boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table, String schema) {
-    return condition ? deduplicateRecords(records, table, parallelism, schema) : records;
+      boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
+    return condition ? deduplicateRecords(records, table, parallelism) : records;
   }

   /**
@@ -81,8 +81,8 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(
-      I records, HoodieTable<T, I, K, O> table, int parallelism, String schema) {
-    return deduplicateRecords(records, table.getIndex(), parallelism, schema);
+      I records, HoodieTable<T, I, K, O> table, int parallelism) {
+    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema());
   }

   public abstract I deduplicateRecords(
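
The shape of this refactor: call sites stop threading the schema string through combineOnCondition, and the helper resolves it from the table's own config only where deduplicateRecords needs it. A minimal sketch of the pattern, with hypothetical names standing in for the Hudi types:

import java.util.List;

class PushLookupDownSketch {
  // Hypothetical stand-in for HoodieTable + HoodieWriteConfig.
  interface Table {
    String schemaStr();
  }

  // Before: every caller fetched and forwarded the schema itself.
  static <R> List<R> combineBefore(boolean dedupe, List<R> records, Table table, String schema) {
    return dedupe ? deduplicate(records, schema) : records;
  }

  // After: the helper derives the schema from the table handle it already receives.
  static <R> List<R> combineAfter(boolean dedupe, List<R> records, Table table) {
    return dedupe ? deduplicate(records, table.schemaStr()) : records;
  }

  private static <R> List<R> deduplicate(List<R> records, String schema) {
    return records; // stand-in for the engine-specific dedup path
  }
}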

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
(4 additions, 8 deletions)

@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;

 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -29,8 +30,6 @@
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;

-import org.apache.avro.Schema;
-
 import java.util.Properties;

 public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
@@ -55,20 +54,17 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec

   @Override
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String avroJsonSchema) {
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
     boolean isIndexingGlobal = index.isGlobal();
-    final Schema[] schema = {null};
+    final SerializableSchema schema = new SerializableSchema(schemaStr);
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
       Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
       return Pair.of(key, record);
     }).reduceByKey((rec1, rec2) -> {
-      if (schema[0] == null) {
-        schema[0] = new Schema.Parser().parse(avroJsonSchema);
-      }
       @SuppressWarnings("unchecked")
-      T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema[0], new Properties());
+      T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties());
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

       return new HoodieAvroRecord<>(reducedKey, reducedData);
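
Context for the fix: Avro's Schema class is not Serializable, so it cannot be captured directly by the closure shipped to Spark executors; the old code dodged that by lazily re-parsing the JSON string inside reduceByKey. Wrapping the schema in SerializableSchema lets it be parsed once on the driver and captured safely. A minimal sketch of how such a wrapper is typically built, assuming the common serialize-as-JSON-string approach (illustrative, not necessarily Hudi's exact implementation):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.avro.Schema;

public class SerializableSchemaSketch implements Serializable {
  private transient Schema schema; // avro Schema itself is not Serializable

  public SerializableSchemaSketch(String schemaStr) {
    this.schema = new Schema.Parser().parse(schemaStr);
  }

  public Schema get() {
    return schema;
  }

  // Ship the schema as its JSON representation...
  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeObject(schema.toString());
  }

  // ...and re-parse it on the deserializing side (e.g. a Spark executor).
  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    this.schema = new Schema.Parser().parse((String) in.readObject());
  }
}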

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
(3 additions, 2 deletions)

@@ -91,12 +91,13 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie

   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String avroJsonSchema) {
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
     // If index used is global, then records are expected to differ in their partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
         .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));

-    final Schema schema = new Schema.Parser().parse(avroJsonSchema);
+    // caution that the avro schema is not serializable
+    final Schema schema = new Schema.Parser().parse(schemaStr);
     return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
       final T data1 = rec1.getData();
       final T data2 = rec2.getData();

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
(1 addition, 1 deletion)

@@ -103,7 +103,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,

     if (performDedupe) {
       dedupedRecords = (List<HoodieRecord<T>>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
-          parallelism, table, config.getSchema());
+          parallelism, table);
     }

     final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
(2 additions, 2 deletions)

@@ -58,7 +58,7 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie

   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String avroJsonSchema) {
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
     boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
       HoodieKey hoodieKey = record.getKey();
@@ -67,7 +67,7 @@ public List<HoodieRecord<T>> deduplicateRecords(
       return Pair.of(key, record);
     }).collect(Collectors.groupingBy(Pair::getLeft));

-    final Schema schema = new Schema.Parser().parse(avroJsonSchema);
+    final Schema schema = new Schema.Parser().parse(schemaStr);
     return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
       T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema, new Properties());

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
(1 addition, 1 deletion)

@@ -114,7 +114,7 @@ public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecor

     if (performDedupe) {
       dedupedRecords = (HoodieData<HoodieRecord<T>>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
-          parallelism, table, config.getSchema());
+          parallelism, table);
     }

     // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
(4 additions, 3 deletions)

@@ -461,26 +461,27 @@ private void testDeduplication(
     HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .combineInput(true, true);
     addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
+    HoodieWriteConfig writeConfig = configBuilder.build();

     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, configBuilder.build().getSchema()).collectAsList();
+    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList();
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);

     // non-Global dedup should be done based on both recordKey and partitionPath
     index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(false);
-    dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, configBuilder.build().getSchema()).collectAsList();
+    dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList();
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);

     // Perform write-action and check
     JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);

-    try (SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build());) {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
       client.startCommitWithTime(newCommitTime);
       List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
       assertNoWriteErrors(statuses);

hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java
(5 additions, 1 deletion)

@@ -35,10 +35,14 @@ public class SerializableSchema implements Serializable {
   public SerializableSchema() {
   }

+  public SerializableSchema(String schema) {
+    this.schema = new Schema.Parser().parse(schema);
+  }
+
   public SerializableSchema(Schema schema) {
     this.schema = newCopy(schema);
   }
-
+
   public SerializableSchema(SerializableSchema serializableSchema) {
     this(serializableSchema.schema);
   }
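
A small usage sketch for the new String-accepting constructor, paired with the existing get() accessor used in the HoodieWriteHelper change above (the schema literal is made up for illustration):

import org.apache.hudi.common.config.SerializableSchema;

public class SerializableSchemaUsage {
  public static void main(String[] args) {
    String schemaStr = "{\"type\":\"record\",\"name\":\"Rec\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";
    // Parses the JSON once up front; the wrapper can then be captured by Spark closures.
    SerializableSchema schema = new SerializableSchema(schemaStr);
    System.out.println(schema.get().getName()); // prints "Rec"
  }
}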

hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
(13 additions, 4 deletions)

@@ -61,20 +61,29 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
     return mergeRecords(schema, insertRecord, currentRecord);
   }

-  protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord insertRecord, GenericRecord currentRecord) {
-    if (isDeleteRecord(insertRecord)) {
+  /**
+   * Merges the given records into one.
+   *
+   * @param schema       The record schema
+   * @param baseRecord   The base record to merge with
+   * @param mergedRecord The record to be merged
+   *
+   * @return the merged record option
+   */
+  protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) {
+    if (isDeleteRecord(baseRecord)) {
       return Option.empty();
     } else {
       final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
       List<Schema.Field> fields = schema.getFields();
       fields.forEach(field -> {
-        Object value = insertRecord.get(field.name());
+        Object value = baseRecord.get(field.name());
         value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value;
         Object defaultValue = field.defaultVal();
         if (!overwriteField(value, defaultValue)) {
           builder.set(field, value);
         } else {
-          builder.set(field, currentRecord.get(field.pos()));
+          builder.set(field, mergedRecord.get(field.pos()));
         }
       });
       return Option.of(builder.build());
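
To make the merge rule concrete, here is a self-contained, simplified version of the field loop above: fields from the base record win unless overwriteField(...) says to fall back to the other record. The schema and values are invented, and a plain null check stands in for the default-value comparison:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class MergeRecordsSketch {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("Rec").fields()
        .optionalString("name")
        .optionalString("price")
        .endRecord();

    GenericRecord base = new GenericData.Record(schema);
    base.put("name", null);          // default-ish value: fall back to the other record
    base.put("price", "price_2");    // non-default value: keep it

    GenericRecord other = new GenericData.Record(schema);
    other.put("name", "name_1");
    other.put("price", "price_1");

    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
    for (Schema.Field field : schema.getFields()) {
      Object value = base.get(field.name());
      // Simplified overwriteField: treat null as "overwrite from the other record".
      builder.set(field, value != null ? value : other.get(field.pos()));
    }
    System.out.println(builder.build()); // {"name": "name_1", "price": "price_2"}
  }
}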

hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
(72 additions, 68 deletions)

@@ -32,20 +32,22 @@
 import java.util.Properties;

 /**
- * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for Partial update Hudi Table.
+ * Payload clazz that is used for partial update Hudi Table.
  *
- * Simplified partial update Logic:
- * 1 preCombine
- * For every record with duplicate record (same record key) in the same batch or in the delta logs that belongs to same File Group
- * Check if one record's ordering value is larger than the other record. If yes,overwrite the exists one for specified fields
- * that doesn't equal to null.
+ * <p>Simplified partial update Logic:
+ * <pre>
+ * 1. #preCombine
+ * For records with the same record key in one batch
+ * or in the delta logs that belongs to same File Group,
+ * Checks whether one record's ordering value is larger than the other record.
+ * If yes, overwrites the existing one for specified fields that doesn't equal to null.
  *
- * 2 combineAndGetUpdateValue
- * For every incoming record with exists record in storage (same record key)
- * Check if incoming record's ordering value is larger than exists record. If yes,overwrite the exists one for specified fields
- * that doesn't equal to null.
- * else overwrite the incoming one with exists record for specified fields that doesn't equal to null
- * get a merged record, write to file.
+ * 2. #combineAndGetUpdateValue
+ * For every incoming record with existing record in storage (same record key)
+ * Checks whether incoming record's ordering value is larger than the existing record.
+ * If yes, overwrites the existing one for specified fields that doesn't equal to null.
+ * else overwrites the incoming one with the existing record for specified fields that doesn't equal to null
+ * and returns a merged record.
  *
  * Illustration with simple data.
  * let's say the order field is 'ts' and schema is :
@@ -66,7 +68,7 @@
  * id ts name price
  * 1 , 2 , null , price_2
  *
- * Result data after preCombine or combineAndGetUpdateValue:
+ * Result data after #preCombine or #combineAndGetUpdateValue:
  * id ts name price
  * 1 , 2 , name_1 , price_2
  *
@@ -81,14 +83,7 @@
  * Result data after preCombine or combineAndGetUpdateValue:
  * id ts name price
  * 1 , 2 , name_1 , price_1
- *
- *
- * <ol>
- * <li>preCombine - Picks the latest delta record for a key, based on an ordering field, then overwrite the older one for specified fields
- * that doesn't equal null.
- * <li>combineAndGetUpdateValue/getInsertValue - overwrite the older record for specified fields
- * that doesn't equal null.
- * </ol>
+ *</pre>
  */
 public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {

@@ -106,16 +101,13 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal
       // use natural order for delete record
       return this;
     }
-    boolean isOldRecordNewer = false;
-    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
-      // pick the payload with greatest ordering value as insert record
-      isOldRecordNewer = true;
-    }
+    // pick the payload with greater ordering value as insert record
+    final boolean isOldRecordNewer = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false;
     try {
-      GenericRecord indexedOldValue = (GenericRecord) oldValue.getInsertValue(schema).get();
-      Option<IndexedRecord> optValue = combineAndGetUpdateValue(indexedOldValue, schema, isOldRecordNewer);
-      if (optValue.isPresent()) {
-        return new PartialUpdateAvroPayload((GenericRecord) optValue.get(),
+      GenericRecord oldRecord = (GenericRecord) oldValue.getInsertValue(schema).get();
+      Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, isOldRecordNewer);
+      if (mergedRecord.isPresent()) {
+        return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
            isOldRecordNewer ? oldValue.orderingVal : this.orderingVal);
       }
     } catch (Exception ex) {
@@ -124,59 +116,71 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal
     return this;
   }

-  private Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, boolean shouldInsertCurrentValue) throws IOException {
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+    return this.mergeOldRecord(currentValue, schema, false);
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
+    return mergeOldRecord(currentValue, schema, isRecordNewer(currentValue, prop));
+  }
+
+  /**
+   * Return true if value equals defaultValue otherwise false.
+   */
+  public Boolean overwriteField(Object value, Object defaultValue) {
+    return value == null;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private Option<IndexedRecord> mergeOldRecord(
+      IndexedRecord oldRecord,
+      Schema schema,
+      boolean isOldRecordNewer) throws IOException {
     Option<IndexedRecord> recordOption = getInsertValue(schema);

     if (!recordOption.isPresent()) {
+      // use natural order for delete record
       return Option.empty();
     }

-    GenericRecord insertRecord;
-    GenericRecord currentRecord;
-    if (shouldInsertCurrentValue) {
-      insertRecord = (GenericRecord) currentValue;
-      currentRecord = (GenericRecord) recordOption.get();
-    } else {
-      insertRecord = (GenericRecord) recordOption.get();
-      currentRecord = (GenericRecord) currentValue;
-    }
-
-    return mergeRecords(schema, insertRecord, currentRecord);
-  }
+    GenericRecord baseRecord = isOldRecordNewer ? (GenericRecord) oldRecord : (GenericRecord) recordOption.get();
+    GenericRecord mergedRecord = isOldRecordNewer ? (GenericRecord) recordOption.get() : (GenericRecord) oldRecord;

-  @Override
-  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
-    return this.combineAndGetUpdateValue(currentValue, schema, false);
+    return mergeRecords(schema, baseRecord, mergedRecord);
   }

-  @Override
-  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
-
+  /**
+   * Returns whether the given record is newer than the record of this payload.
+   *
+   * @param record The record
+   * @param prop   The payload properties
+   *
+   * @return true if the given record is newer
+   */
+  private boolean isRecordNewer(IndexedRecord record, Properties prop) {
     String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
-    boolean isOldRecordNewer = false;
-
     if (!StringUtils.isNullOrEmpty(orderingField)) {
-
       boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty(
           KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
           KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));

-      Comparable oldOrderingVal = (Comparable)HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
-          orderingField,
-          true, consistentLogicalTimestampEnabled);
-      if (oldOrderingVal != null && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal)
-          && oldOrderingVal.compareTo(orderingVal) > 0) {
-        // pick the payload with greatest ordering value as insert record
-        isOldRecordNewer = true;
-      }
+      Comparable oldOrderingVal =
+          (Comparable) HoodieAvroUtils.getNestedFieldVal(
+              (GenericRecord) record,
+              orderingField,
+              true,
+              consistentLogicalTimestampEnabled);
+
+      // pick the payload with greater ordering value as insert record
+      return oldOrderingVal != null
+          && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal)
+          && oldOrderingVal.compareTo(orderingVal) > 0;
     }
-    return combineAndGetUpdateValue(currentValue, schema, isOldRecordNewer);
-  }
-
-  /**
-   * Return true if value equals defaultValue otherwise false.
-   */
-  public Boolean overwriteField(Object value, Object defaultValue) {
-    return value == null;
+    return false;
   }
 }
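
The refactored isRecordNewer() isolates the ordering comparison in one place. A standalone sketch of just that comparison, simplified in that the real method first reads the ordering field name from the payload properties and extracts its value from the Avro record via HoodieAvroUtils:

public class OrderingComparisonSketch {
  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean isOldRecordNewer(Comparable oldOrderingVal, Comparable newOrderingVal) {
    // The same-class guard mirrors ReflectionUtils.isSameClass in the patch:
    // comparing, say, a Long against a String would throw, so bail out instead.
    return oldOrderingVal != null
        && newOrderingVal != null
        && oldOrderingVal.getClass().equals(newOrderingVal.getClass())
        && oldOrderingVal.compareTo(newOrderingVal) > 0;
  }

  public static void main(String[] args) {
    System.out.println(isOldRecordNewer(2L, 1L)); // true: the stored record stays the base
    System.out.println(isOldRecordNewer(1L, 2L)); // false: the incoming record wins
  }
}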
