
Commit bb208e9

wulei0302 and wangzixuan.wzxuan committed
[HUDI-3350][HUDI-3351] Support HoodieMerge API and Spark engine-specific HoodieRecord (#5627)
Co-authored-by: wangzixuan.wzxuan <[email protected]>
1 parent 17d179c commit bb208e9
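
This commit routes record merging through a new, stateless HoodieMerge component instead of calling preCombine / combineAndGetUpdateValue on the record itself, so each engine (Spark, Flink) can plug in its own HoodieRecord handling. Judging only from the call sites in the diffs below, the interface presumably looks roughly like this (a hedged sketch, not the committed source; assumed to live in org.apache.hudi.common.model alongside HoodieRecord):

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hudi.common.util.Option;

// Sketch of the HoodieMerge API inferred from merge.preCombine(rec1, rec2) and
// merge.combineAndGetUpdateValue(oldRecord, newRecord, schema, props) below;
// the committed interface may differ.
public interface HoodieMerge extends Serializable {

  // Pick or combine two records that share a key during dedup/pre-combine.
  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);

  // Merge an incoming record into the stored one during upsert/compaction;
  // an empty Option is assumed to mean the record resolves to a delete.
  Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer,
      Schema schema, Properties props) throws IOException;
}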

File tree: 54 files changed, +1322 -186 lines

(Large commits hide some content by default; only a subset of the 54 changed files is shown below.)

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

Lines changed: 12 additions & 0 deletions

@@ -22,6 +22,7 @@
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -113,6 +114,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + "compaction during each compaction run. By default, Hudi picks the log file "
           + "with most accumulated unmerged data");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+      .key("hoodie.compaction.merge.class")
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDocumentation("Merge class that provides a stateless component interface for merging records "
+          + "and supports various HoodieRecord types, such as Spark records or Flink records.");
+
   public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
       .defaultValue("true")
@@ -346,6 +353,11 @@ public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) {
       return this;
     }
 
+    public Builder withMergeClass(String mergeClass) {
+      compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass);
+      return this;
+    }
+
     public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
       compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
       return this;
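
For illustration, the new builder hook composes like any other compaction option. A minimal sketch, assuming the standard HoodieWriteConfig / HoodieCompactionConfig builders and a hypothetical MyEngineSpecificMerge implementation (basePath assumed defined elsewhere):

// Hypothetical: route compaction merging through a custom HoodieMerge.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withMergeClass(MyEngineSpecificMerge.class.getName()) // hypothetical class
        .build())
    .build();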

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 11 additions & 0 deletions

@@ -33,6 +33,7 @@
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -125,6 +126,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL ineffective");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+      .key("hoodie.datasource.write.merge.class")
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDocumentation("Merge class that provides a stateless component interface for merging records "
+          + "and supports various HoodieRecord types, such as Spark records or Flink records.");
+
   public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.class")
       .noDefaultValue()
@@ -1328,6 +1335,10 @@ public String getPayloadClass() {
     return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME);
   }
 
+  public String getMergeClass() {
+    return getString(HoodieCompactionConfig.MERGE_CLASS_NAME);
+  }
+
   public int getTargetPartitionsPerDayBasedCompaction() {
     return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
   }
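
One subtlety visible here: getMergeClass() reads HoodieCompactionConfig.MERGE_CLASS_NAME (hoodie.compaction.merge.class), not the hoodie.datasource.write.merge.class key declared in this same file, so as of this commit the two keys are distinct. A minimal sketch of setting the key the getter actually resolves (standard Properties-based builder assumed; basePath defined elsewhere):

// Illustrative: the key that getMergeClass() resolves as of this commit.
Properties props = new Properties();
props.setProperty("hoodie.compaction.merge.class",
    "org.apache.hudi.common.model.HoodieAvroRecordMerge"); // the default value
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withProperties(props)
    .build();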

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java

Lines changed: 1 addition & 4 deletions

@@ -338,10 +338,7 @@ public void write(HoodieRecord<T> oldRecord) {
       // writing the first record. So make a copy of the record to be merged
       HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
       try {
-        Option<HoodieRecord> combinedRecord =
-            hoodieRecord.combineAndGetUpdateValue(oldRecord,
-                schema,
-                props);
+        Option<HoodieRecord> combinedRecord = merge.combineAndGetUpdateValue(oldRecord, hoodieRecord, schema, props);
 
         if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) {
           // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
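
The merge handle now asks the configured HoodieMerge to combine records instead of the record itself. The default HoodieAvroRecordMerge presumably preserves the old behavior by delegating to the Avro record/payload; a hedged sketch under that assumption (not the committed class):

import java.io.IOException;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

// Hedged sketch: keep the pre-existing Avro semantics behind the new interface.
public class HoodieAvroRecordMerge implements HoodieMerge {

  @Override
  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
    // Assumption: same semantics as HoodieRecordPayload#preCombine, returning
    // whichever record carries the surviving payload.
    HoodieRecordPayload picked = ((HoodieRecordPayload) newer.getData())
        .preCombine((HoodieRecordPayload) older.getData());
    return picked.equals(newer.getData()) ? newer : older;
  }

  @Override
  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer,
      Schema schema, Properties props) throws IOException {
    // Assumption: forwards to the record-level method that this commit removes
    // from the HoodieMergeHandle call site above.
    return newer.combineAndGetUpdateValue(older, schema, props);
  }
}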

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java

Lines changed: 4 additions & 0 deletions

@@ -27,7 +27,9 @@
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -59,6 +61,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I, K, O> {
    */
   protected final Schema tableSchema;
   protected final Schema tableSchemaWithMetaFields;
+  protected final HoodieMerge merge;
 
   /**
    * The write schema. In most case the write schema is the same to the
@@ -103,6 +106,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+    this.merge = HoodieRecordUtils.loadMerge(config.getMergeClass());
   }
 
   /**
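
HoodieRecordUtils.loadMerge is also introduced by this commit; judging from its use here, it presumably instantiates the configured class reflectively, plausibly caching instances since a handle is created per write. A hedged sketch of one reasonable implementation (the committed utility may differ):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.exception.HoodieException;

// Hedged sketch: reflective loading with per-class caching, which is safe if
// HoodieMerge implementations are stateless, as the new config docs require.
public final class HoodieRecordUtils {

  private static final Map<String, HoodieMerge> MERGE_CACHE = new ConcurrentHashMap<>();

  public static HoodieMerge loadMerge(String mergeClassName) {
    return MERGE_CACHE.computeIfAbsent(mergeClassName, name -> {
      try {
        return (HoodieMerge) Class.forName(name).getDeclaredConstructor().newInstance();
      } catch (ReflectiveOperationException e) {
        throw new HoodieException("Unable to instantiate merge class " + name, e);
      }
    });
  }
}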

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java

Lines changed: 5 additions & 2 deletions

@@ -19,7 +19,9 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
@@ -81,9 +83,10 @@ public I combineOnCondition(
    */
   public I deduplicateRecords(
       I records, HoodieTable<T, I, K, O> table, int parallelism) {
-    return deduplicateRecords(records, table.getIndex(), parallelism);
+    HoodieMerge merge = HoodieRecordUtils.loadMerge(table.getConfig().getMergeClass());
+    return deduplicateRecords(records, table.getIndex(), parallelism, merge);
   }
 
   public abstract I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism);
+      I records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge);
 }
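
The public helper now resolves the merge implementation from table config once per call, while engine-specific subclasses implement only the four-argument overload. A hypothetical call site, assuming the helper's usual singleton accessor (newInstance) and an illustrative shuffleParallelism:

// Hypothetical: deduplicate incoming records before an upsert, letting the
// helper load the HoodieMerge configured via hoodie.compaction.merge.class.
HoodieData<HoodieRecord<T>> deduped = HoodieWriteHelper.newInstance()
    .deduplicateRecords(incomingRecords, table, shuffleParallelism);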

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java

Lines changed: 5 additions & 6 deletions

@@ -23,13 +23,13 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
 public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
-
   private HoodieWriteHelper() {
   }
 
@@ -49,7 +49,7 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
 
   @Override
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) {
     boolean isIndexingGlobal = index.isGlobal();
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
@@ -58,10 +58,9 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
       return Pair.of(key, record);
     }).reduceByKey((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
-      HoodieRecord reducedRec = rec2.preCombine(rec1);
-      HoodieKey reducedKey = rec1.getData().equals(reducedRec) ? rec1.getKey() : rec2.getKey();
-
-      return (HoodieRecord<T>) reducedRec.newInstance(reducedKey);
+      HoodieRecord<T> reducedRecord = merge.preCombine(rec1, rec2);
+      HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? rec1.getKey() : rec2.getKey();
+      return reducedRecord.newInstance(reducedKey);
     }, parallelism).map(Pair::getRight);
   }
 
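
To make the reduce step concrete: records sharing a dedup key are combined pairwise by merge.preCombine, and the survivor keeps the HoodieKey of whichever input its data came from, so a user-supplied merge cannot silently change keys or partition paths. A self-contained, Hudi-free sketch of the same pattern (Java 16+ records; a "latest ordering value wins" function stands in for HoodieMerge#preCombine):

import java.util.Collection;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

public class DedupSketch {
  // Minimal stand-in for a Hudi record: key, partition, ordering value, data.
  record Rec(String key, String partition, long orderingVal, String data) {}

  public static void main(String[] args) {
    // "Latest wins" stand-in for HoodieMerge#preCombine.
    BinaryOperator<Rec> preCombine =
        (rec1, rec2) -> rec2.orderingVal() >= rec1.orderingVal() ? rec2 : rec1;

    List<Rec> records = List.of(
        new Rec("id-1", "2022/06/01", 1L, "v1"),
        new Rec("id-1", "2022/06/02", 2L, "v2"), // higher ordering value wins
        new Rec("id-2", "2022/06/01", 5L, "x"));

    // Group by record key and reduce duplicates, like reduceByKey above.
    Collection<Rec> deduped = records.stream()
        .collect(Collectors.toMap(Rec::key, r -> r, preCombine))
        .values();

    // Prints the surviving records: id-1 -> v2, id-2 -> x.
    deduped.forEach(System.out::println);
  }
}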

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java

Lines changed: 6 additions & 7 deletions

@@ -19,6 +19,12 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -44,13 +50,6 @@
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.orc.CompressionKind;

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java

Lines changed: 0 additions & 2 deletions

@@ -39,8 +39,6 @@
 
 import java.util.List;
 
-import static org.apache.hudi.common.data.HoodieList.getList;
-
 public abstract class HoodieFlinkTable<T>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
     implements ExplicitWriteHandleTable<T> {

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java

Lines changed: 1 addition & 1 deletion

(The swap below replaces a stray scala.collection.immutable.List import with the java.util.List that the class's generic bounds are meant to use.)

@@ -31,7 +31,6 @@
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
-import scala.collection.immutable.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
@@ -43,6 +42,7 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
 public class FlinkMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>>,
     List<HoodieKey>, List<WriteStatus>> {

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java

Lines changed: 4 additions & 2 deletions

@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
@@ -86,13 +87,14 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie
 
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) {
     // If index used is global, then records are expected to differ in their partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
         .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
 
     return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
-      @SuppressWarnings("unchecked") final HoodieRecord reducedRec = rec2.preCombine(rec1);
+      @SuppressWarnings("unchecked")
+      final HoodieRecord reducedRec = merge.preCombine(rec1, rec2);
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
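
Taken together, these changes let a table swap merge semantics by class name (via withMergeClass / hoodie.compaction.merge.class) without touching the write path. A minimal custom implementation would presumably mirror the interface sketched at the top of this page; for example (hypothetical class; "newer always wins" logic is purely illustrative):

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

// Hypothetical custom merge: always prefer the incoming record.
public class NewerWinsMerge implements HoodieMerge {

  @Override
  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
    return newer; // later arrival wins outright
  }

  @Override
  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer,
      Schema schema, Properties props) {
    return Option.of(newer); // ignore the stored record entirely
  }
}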
