Skip to content

Commit 6cd6761

Browse files
author
gengxiaoyu
committed
a new way to config the merger
1 parent 5002e13 commit 6cd6761

File tree

24 files changed

+122
-68
lines changed

24 files changed

+122
-68
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,15 @@ public class HoodieWriteConfig extends HoodieConfig {
132132
public static final ConfigProperty<String> MERGER_IMPLS = ConfigProperty
133133
.key("hoodie.datasource.write.merger.impls")
134134
.defaultValue(HoodieAvroRecordMerger.class.getName())
135-
.withDocumentation("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used "
135+
.withDocumentation("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used. "
136+
+ "These merger impls will filter by hoodie.datasource.write.merger.strategy "
136137
+ "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)");
137138

139+
public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
140+
.key("hoodie.datasource.write.merger.strategy")
141+
.defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
142+
.withDocumentation("Id of merger strategy. Hudi will pick RecordMergers in hoodie.datasource.write.merger.impls which has the same merger strategy id");
143+
138144
public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
139145
.key("hoodie.datasource.write.keygenerator.class")
140146
.noDefaultValue()
@@ -912,7 +918,8 @@ private void applyMergerClass() {
912918
.map(String::trim)
913919
.distinct()
914920
.collect(Collectors.toList());
915-
this.recordMerger = HoodieRecordUtils.generateRecordMerger(getString(BASE_PATH), engineType, mergers);
921+
String mergerStrategy = getString(MERGER_STRATEGY);
922+
this.recordMerger = HoodieRecordUtils.generateRecordMerger(getString(BASE_PATH), engineType, mergers, mergerStrategy);
916923
}
917924

918925
public static HoodieWriteConfig.Builder newBuilder() {
@@ -939,7 +946,7 @@ public void setSchema(String schemaStr) {
939946
}
940947

941948
public void setMergerClass(String mergerStrategy) {
942-
setValue(MERGER_IMPLS, mergerStrategy);
949+
setValue(MERGER_STRATEGY, mergerStrategy);
943950
}
944951

945952
public String getInternalSchema() {
@@ -2258,6 +2265,11 @@ public Builder withMergerImpls(String mergerImpls) {
22582265
return this;
22592266
}
22602267

2268+
public Builder withMergerStrategy(String mergerStrategy) {
2269+
writeConfig.setValue(MERGER_STRATEGY, mergerStrategy);
2270+
return this;
2271+
}
2272+
22612273
public Builder withKeyGenerator(String keyGeneratorClass) {
22622274
writeConfig.setValue(KEYGENERATOR_CLASS_NAME, keyGeneratorClass);
22632275
return this;

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
2525
import org.apache.hudi.common.util.Option;
26+
import org.apache.hudi.common.util.StringUtils;
2627
import org.apache.hudi.common.util.ValidationUtils;
2728
import org.apache.hudi.metadata.HoodieMetadataPayload;
2829

@@ -33,6 +34,11 @@
3334

3435
public class HoodieAvroRecordMerger implements HoodieRecordMerger {
3536

37+
@Override
38+
public String getMergingStrategy() {
39+
return StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
40+
}
41+
3642
@Override
3743
public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
3844
ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO);

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,9 @@ public interface HoodieRecordMerger extends Serializable {
4949
* SPARK, AVRO, FLINK
5050
*/
5151
HoodieRecordType getRecordType();
52+
53+
/**
54+
* The kind of merging strategy this recordMerger belongs to. An UUID represents merging strategy.
55+
*/
56+
String getMergingStrategy();
5257
}

hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.hudi.common.config.HoodieConfig;
2727
import org.apache.hudi.common.config.OrderedProperties;
2828
import org.apache.hudi.common.config.TypedProperties;
29-
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
3029
import org.apache.hudi.common.model.HoodieFileFormat;
3130
import org.apache.hudi.common.model.HoodieRecord;
3231
import org.apache.hudi.common.model.HoodieTableType;
@@ -156,11 +155,10 @@ public class HoodieTableConfig extends HoodieConfig {
156155
.withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
157156
+ " produce a new base file.");
158157

159-
public static final ConfigProperty<String> MERGER_IMPLS = ConfigProperty
160-
.key("hoodie.compaction.merger.impls")
161-
.defaultValue(HoodieAvroRecordMerger.class.getName())
162-
.withDocumentation("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used "
163-
+ "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)");
158+
public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
159+
.key("hoodie.compaction.merger.strategy")
160+
.defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
161+
.withDocumentation("Id of merger strategy. Hudi will pick RecordMergers in hoodie.datasource.write.merger.impls which has the same merger strategy id");
164162

165163
public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty
166164
.key("hoodie.archivelog.folder")
@@ -244,7 +242,7 @@ public class HoodieTableConfig extends HoodieConfig {
244242

245243
private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
246244

247-
public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName, String mergerImpls) {
245+
public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName, String mergerStrategy) {
248246
super();
249247
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
250248
LOG.info("Loading table properties from " + propertyPath);
@@ -256,9 +254,9 @@ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName
256254
setValue(PAYLOAD_CLASS_NAME, payloadClassName);
257255
needStore = true;
258256
}
259-
if (contains(MERGER_IMPLS) && payloadClassName != null
260-
&& !getString(MERGER_IMPLS).equals(mergerImpls)) {
261-
setValue(MERGER_IMPLS, mergerImpls);
257+
if (contains(MERGER_STRATEGY) && payloadClassName != null
258+
&& !getString(MERGER_STRATEGY).equals(mergerStrategy)) {
259+
setValue(MERGER_STRATEGY, mergerStrategy);
262260
needStore = true;
263261
}
264262
if (needStore) {
@@ -428,7 +426,7 @@ public static void create(FileSystem fs, Path metadataFolder, Properties propert
428426
hoodieConfig.setDefaultValue(TYPE);
429427
if (hoodieConfig.getString(TYPE).equals(HoodieTableType.MERGE_ON_READ.name())) {
430428
hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME);
431-
hoodieConfig.setDefaultValue(MERGER_IMPLS);
429+
hoodieConfig.setDefaultValue(MERGER_STRATEGY);
432430
}
433431
hoodieConfig.setDefaultValue(ARCHIVELOG_FOLDER);
434432
if (!hoodieConfig.contains(TIMELINE_LAYOUT_VERSION)) {
@@ -501,11 +499,8 @@ public String getPayloadClass() {
501499
/**
502500
* Read the payload class for HoodieRecords from the table properties.
503501
*/
504-
public String getMergerImpls() {
505-
// There could be tables written with merge strategy from com.uber.hoodie. Need to transparently
506-
// change to org.apache.hudi
507-
return getStringOrDefault(MERGER_IMPLS).replace("com.uber.hoodie",
508-
"org.apache.hudi");
502+
public String getMergerStrategy() {
503+
return getStringOrDefault(MERGER_STRATEGY);
509504
}
510505

511506
public String getPreCombineField() {

hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public class HoodieTableMetaClient implements Serializable {
116116

117117
protected HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
118118
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
119-
String payloadClassName, String mergerImpls, FileSystemRetryConfig fileSystemRetryConfig) {
119+
String payloadClassName, String mergerStrategy, FileSystemRetryConfig fileSystemRetryConfig) {
120120
LOG.info("Loading HoodieTableMetaClient from " + basePath);
121121
this.consistencyGuardConfig = consistencyGuardConfig;
122122
this.fileSystemRetryConfig = fileSystemRetryConfig;
@@ -125,7 +125,7 @@ protected HoodieTableMetaClient(Configuration conf, String basePath, boolean loa
125125
this.metaPath = new SerializablePath(new CachingPath(basePath, METAFOLDER_NAME));
126126
this.fs = getFs();
127127
TableNotFoundException.checkTableValidity(fs, this.basePath.get(), metaPath.get());
128-
this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), payloadClassName, mergerImpls);
128+
this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), payloadClassName, mergerStrategy);
129129
this.tableType = tableConfig.getTableType();
130130
Option<TimelineLayoutVersion> tableConfigVersion = tableConfig.getTimelineLayoutVersion();
131131
if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
@@ -159,7 +159,7 @@ public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient)
159159
.setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig)
160160
.setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion))
161161
.setPayloadClassName(null)
162-
.setMergerImpls(null)
162+
.setMergerStrategy(null)
163163
.setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build();
164164
}
165165

@@ -633,7 +633,7 @@ public void initializeBootstrapDirsIfNotExists() throws IOException {
633633

634634
private static HoodieTableMetaClient newMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
635635
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
636-
String payloadClassName, String mergerImpls, FileSystemRetryConfig fileSystemRetryConfig, Properties props) {
636+
String payloadClassName, String mergerStrategy, FileSystemRetryConfig fileSystemRetryConfig, Properties props) {
637637
HoodieMetastoreConfig metastoreConfig = null == props
638638
? new HoodieMetastoreConfig.Builder().build()
639639
: new HoodieMetastoreConfig.Builder().fromProperties(props).build();
@@ -643,7 +643,7 @@ private static HoodieTableMetaClient newMetaClient(Configuration conf, String ba
643643
conf, consistencyGuardConfig, fileSystemRetryConfig,
644644
props.getProperty(HoodieTableConfig.DATABASE_NAME.key()), props.getProperty(HoodieTableConfig.NAME.key()), metastoreConfig)
645645
: new HoodieTableMetaClient(conf, basePath,
646-
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, mergerImpls, fileSystemRetryConfig);
646+
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, mergerStrategy, fileSystemRetryConfig);
647647
}
648648

649649
public static Builder builder() {
@@ -659,7 +659,7 @@ public static class Builder {
659659
private String basePath;
660660
private boolean loadActiveTimelineOnLoad = false;
661661
private String payloadClassName = null;
662-
private String mergerImpls = null;
662+
private String mergerStrategy = null;
663663
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
664664
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
665665
private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
@@ -685,8 +685,8 @@ public Builder setPayloadClassName(String payloadClassName) {
685685
return this;
686686
}
687687

688-
public Builder setMergerImpls(String mergerImpls) {
689-
this.mergerImpls = mergerImpls;
688+
public Builder setMergerStrategy(String mergerStrategy) {
689+
this.mergerStrategy = mergerStrategy;
690690
return this;
691691
}
692692

@@ -715,7 +715,7 @@ public HoodieTableMetaClient build() {
715715
ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
716716
return newMetaClient(conf, basePath,
717717
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName,
718-
mergerImpls, fileSystemRetryConfig, props);
718+
mergerStrategy, fileSystemRetryConfig, props);
719719
}
720720
}
721721

@@ -732,7 +732,7 @@ public static class PropertyBuilder {
732732
private String recordKeyFields;
733733
private String archiveLogFolder;
734734
private String payloadClassName;
735-
private String mergerImpls;
735+
private String mergerStrategy;
736736
private Integer timelineLayoutVersion;
737737
private String baseFileFormat;
738738
private String preCombineField;
@@ -799,8 +799,8 @@ public PropertyBuilder setPayloadClassName(String payloadClassName) {
799799
return this;
800800
}
801801

802-
public PropertyBuilder setMergerImpls(String mergerImpls) {
803-
this.mergerImpls = mergerImpls;
802+
public PropertyBuilder setMergerStrategy(String mergerStrategy) {
803+
this.mergerStrategy = mergerStrategy;
804804
return this;
805805
}
806806

@@ -910,7 +910,7 @@ public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
910910
.setTableName(metaClient.getTableConfig().getTableName())
911911
.setArchiveLogFolder(metaClient.getArchivePath())
912912
.setPayloadClassName(metaClient.getTableConfig().getPayloadClass())
913-
.setMergerImpls(metaClient.getTableConfig().getMergerImpls());
913+
.setMergerStrategy(metaClient.getTableConfig().getMergerStrategy());
914914
}
915915

916916
public PropertyBuilder fromProperties(Properties properties) {
@@ -940,9 +940,9 @@ public PropertyBuilder fromProperties(Properties properties) {
940940
setPayloadClassName(
941941
hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
942942
}
943-
if (hoodieConfig.contains(HoodieTableConfig.MERGER_IMPLS)) {
944-
setMergerImpls(
945-
hoodieConfig.getString(HoodieTableConfig.MERGER_IMPLS));
943+
if (hoodieConfig.contains(HoodieTableConfig.MERGER_STRATEGY)) {
944+
setMergerStrategy(
945+
hoodieConfig.getString(HoodieTableConfig.MERGER_STRATEGY));
946946
}
947947
if (hoodieConfig.contains(HoodieTableConfig.TIMELINE_LAYOUT_VERSION)) {
948948
setTimelineLayoutVersion(hoodieConfig.getInt(HoodieTableConfig.TIMELINE_LAYOUT_VERSION));
@@ -1021,8 +1021,8 @@ public Properties build() {
10211021
if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != null) {
10221022
tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, payloadClassName);
10231023
}
1024-
if (tableType == HoodieTableType.MERGE_ON_READ && mergerImpls != null) {
1025-
tableConfig.setValue(HoodieTableConfig.MERGER_IMPLS, mergerImpls);
1024+
if (tableType == HoodieTableType.MERGE_ON_READ && mergerStrategy != null) {
1025+
tableConfig.setValue(HoodieTableConfig.MERGER_STRATEGY, mergerStrategy);
10261026
}
10271027

10281028
if (null != tableCreateSchema) {

hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public static HoodieRecordMerger loadRecordMerger(String mergerClass) {
6767
* Instantiate a given class with a record merge.
6868
*/
6969
public static HoodieRecordMerger generateRecordMerger(String basePath, EngineType engineType,
70-
List<String> mergerClassList) {
70+
List<String> mergerClassList, String mergerStrategy) {
7171
if (mergerClassList.isEmpty() || HoodieTableMetadata.isMetadataTable(basePath)) {
7272
return HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
7373
} else {
@@ -81,6 +81,7 @@ public static HoodieRecordMerger generateRecordMerger(String basePath, EngineTyp
8181
}
8282
})
8383
.filter(Objects::nonNull)
84+
.filter(merger -> merger.getMergingStrategy().equals(mergerStrategy))
8485
.filter(merger -> recordTypeCompatibleEngine(merger.getRecordType(), engineType))
8586
.findFirst()
8687
.orElse(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));

hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class StringUtils {
3333

3434
public static final String EMPTY_STRING = "";
3535

36+
public static final String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
37+
3638
/**
3739
* <p>
3840
* Joins the elements of the provided array into a single String containing the provided list of elements.

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hudi.common.model.HoodieCleaningPolicy;
2828
import org.apache.hudi.common.model.HoodieTableType;
2929
import org.apache.hudi.common.model.WriteOperationType;
30+
import org.apache.hudi.common.util.StringUtils;
3031
import org.apache.hudi.config.HoodieIndexConfig;
3132
import org.apache.hudi.config.HoodieWriteConfig;
3233
import org.apache.hudi.exception.HoodieException;
@@ -120,9 +121,16 @@ private FlinkOptions() {
120121
.key("record.merger.impls")
121122
.stringType()
122123
.defaultValue(HoodieAvroRecordMerger.class.getName())
123-
.withDescription("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used "
124+
.withDescription("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used. "
125+
+ "These merger impls will filter by record.merger.strategy. "
124126
+ "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)");
125127

128+
public static final ConfigOption<String> RECORD_MERGER_STRATEGY = ConfigOptions
129+
.key("record.merger.strategy")
130+
.stringType()
131+
.defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
132+
.withDescription("Id of merger strategy. Hudi will pick RecordMergers in record.merger.impls which has the same merger strategy id");
133+
126134
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
127135
.key("partition.default_name")
128136
.stringType()

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818

1919
package org.apache.hudi.sink;
2020

21-
import java.util.Arrays;
2221
import org.apache.hudi.client.WriteStatus;
23-
import org.apache.hudi.common.engine.EngineType;
2422
import org.apache.hudi.common.model.HoodieAvroRecord;
2523
import org.apache.hudi.common.model.HoodieKey;
2624
import org.apache.hudi.common.model.HoodieOperation;
@@ -29,7 +27,6 @@
2927
import org.apache.hudi.common.model.HoodieRecordLocation;
3028
import org.apache.hudi.common.model.HoodieRecordPayload;
3129
import org.apache.hudi.common.model.WriteOperationType;
32-
import org.apache.hudi.common.util.HoodieRecordUtils;
3330
import org.apache.hudi.common.util.ObjectSizeCalculator;
3431
import org.apache.hudi.common.util.ValidationUtils;
3532
import org.apache.hudi.configuration.FlinkOptions;
@@ -204,12 +201,7 @@ private void initWriteFunction() {
204201
}
205202

206203
private void initMergeClass() {
207-
List<String> mergers = Arrays.stream(config.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
208-
.map(String::trim)
209-
.distinct()
210-
.collect(Collectors.toList());
211-
recordMerger = HoodieRecordUtils.generateRecordMerger(writeClient.getConfig().getBasePath(), EngineType.FLINK,
212-
mergers);
204+
recordMerger = writeClient.getConfig().getRecordMerger();
213205
LOG.info("init hoodie merge with class [{}]", recordMerger.getClass().getName());
214206
}
215207

0 commit comments

Comments
 (0)