Commit 8bd34a6
Author: wangzixuan.wzxuan
Commit message: refact HoodieRecordMerger
1 parent: 7c076ec

File tree: 81 files changed, +639 / -634 lines


hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
Lines changed: 3 additions & 3 deletions

@@ -24,7 +24,7 @@
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecordMerge;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -39,6 +39,7 @@
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
@@ -222,8 +223,7 @@ public String showLogFileRecords(
           .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
           .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
           .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-          .withRecordType(HoodieRecordType.AVRO)
-          .withMergeClass(HoodieAvroRecordMerge.class.getName())
+          .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
           .build();
       for (HoodieRecord hoodieRecord : scanner) {
         Option<IndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());

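The HoodieRecordUtils.loadRecordMerger helper the CLI now calls is not included in this diff. A minimal sketch of what a reflection-based loader of that shape could look like, assuming the merger class exposes a public no-arg constructor (the class name and exact signature below are assumptions, not the actual Hudi implementation):

import org.apache.hudi.common.model.HoodieRecordMerger;

// Hypothetical sketch: instantiate a HoodieRecordMerger from its fully
// qualified class name, as HoodieRecordUtils.loadRecordMerger is assumed to do.
public final class RecordMergerLoaderSketch {
  public static HoodieRecordMerger load(String mergerClassName) {
    try {
      // Assumes the merger implementation has a public no-arg constructor.
      return (HoodieRecordMerger) Class.forName(mergerClassName)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Could not load record merger: " + mergerClassName, e);
    }
  }
}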
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
Lines changed: 1 addition & 1 deletion

@@ -225,7 +225,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
         .withRecordType(HoodieRecordType.valueOf(HoodieWriteConfig.RECORD_TYPE.defaultValue()))
-        .withMergeClass(HoodieCompactionConfig.MERGE_CLASS_NAME.defaultValue())
+        .withRecordMerger(HoodieCompactionConfig.MERGE_CLASS_NAME.defaultValue())
         .build();

     Iterator<HoodieRecord> records = scanner.iterator();

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java
Lines changed: 3 additions & 1 deletion

@@ -23,6 +23,7 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;

 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -49,8 +50,9 @@ public FullRecordBootstrapDataProvider(TypedProperties props, HoodieEngineContex
    * @param tableName Hudi Table Name
    * @param sourceBasePath Source Base Path
    * @param partitionPaths Partition Paths
+   * @param config config
    * @return input records
    */
   public abstract I generateInputRecords(String tableName,
-      String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths);
+      String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config);
 }

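Bootstrap providers now receive the write config, which gives them access to the configured record merger. A minimal sketch of a provider implementing the new signature (the concrete input type, the no-op body, and the HoodieFileStatus import path are assumptions for illustration):

import java.util.Collections;
import java.util.List;

import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

// Hypothetical sketch: a provider adapted to the new signature, which can
// consult the write config (e.g. config.getRecordMerger()) while producing records.
public class NoOpBootstrapDataProvider extends FullRecordBootstrapDataProvider<List<String>> {

  public NoOpBootstrapDataProvider(TypedProperties props, HoodieEngineContext context) {
    super(props, context);
  }

  @Override
  public List<String> generateInputRecords(String tableName, String sourceBasePath,
      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config) {
    // Placeholder body; a real provider would read the source files here.
    return Collections.emptyList();
  }
}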
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
Lines changed: 2 additions & 2 deletions

@@ -22,7 +22,7 @@
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.model.HoodieAvroRecordMerge;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -237,7 +237,7 @@ public class HoodieCompactionConfig extends HoodieConfig {

   public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
       .key("hoodie.compaction.merge.class")
-      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .defaultValue(HoodieAvroRecordMerger.class.getName())
       .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
           + "types, such as Spark records or Flink records.");

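Compaction still resolves its merger from this config key. As a usage sketch, overriding the default merger via properties (the custom class name is a placeholder, not a real Hudi class):

import java.util.Properties;

public class CompactionMergerConfigExample {
  public static void main(String[] args) {
    // Point compaction at a custom merger implementation by class name.
    // "com.example.CustomRecordMerger" is a placeholder for illustration.
    Properties props = new Properties();
    props.setProperty("hoodie.compaction.merge.class", "com.example.CustomRecordMerger");
    System.out.println(props);
  }
}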
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
Lines changed: 31 additions & 44 deletions

@@ -34,12 +34,11 @@
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
-import org.apache.hudi.common.model.HoodieAvroRecordMerge;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -48,6 +47,7 @@
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -64,7 +64,6 @@
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
 import org.apache.hudi.table.RandomFileIdPrefixProvider;
@@ -91,7 +90,6 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;

@@ -129,24 +127,17 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");

-  // Detect HoodieMerge type with HoodieRecordType.
-  public static final Function<HoodieConfig, Option<String>> MERGE_CLASS_INFER_FUNCTION = cfg -> {
-    if (cfg.getStringOrDefault(HoodieWriteConfig.RECORD_TYPE).equals(HoodieRecordType.AVRO.name())) {
-      return Option.of(HoodieAvroRecordMerge.class.getName());
-    } else if (cfg.getStringOrDefault(HoodieWriteConfig.RECORD_TYPE).equals(HoodieRecordType.SPARK.name())) {
-      return Option.of("org.apache.hudi.HoodieSparkRecordMerge");
-    } else {
-      return Option.empty();
-    }
-  };
-
-  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+  public static final ConfigProperty<String> MERGER_CLASS_NAME = ConfigProperty
       .key("hoodie.datasource.write.merge.class")
-      .defaultValue(HoodieAvroRecordMerge.class.getName())
-      .withInferFunction(MERGE_CLASS_INFER_FUNCTION)
+      .noDefaultValue()
       .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
           + "types, such as Spark records or Flink records.");

+  public static final ConfigProperty<Boolean> SELF_ADAPTION_MERGER_CLASS_NAME = ConfigProperty
+      .key("hoodie.datasource.write.self.adaption.merge.class")
+      .defaultValue(false)
+      .withDocumentation("Automatically identify merger class. If false use default merger class(HoodieAvroRecordMerger)");
+
   public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.class")
       .noDefaultValue()
@@ -159,13 +150,6 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Easily configure one the built-in key generators, instead of specifying the key generator class."
           + "Currently supports SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, GLOBAL_DELETE");

-  public static final ConfigProperty<String> RECORD_TYPE = ConfigProperty
-      .key("hoodie.datasource.write.record.type")
-      .defaultValue(HoodieRecordType.AVRO.name())
-      .withValidValues(HoodieRecordType.AVRO.name(), HoodieRecordType.SPARK.name())
-      .withDocumentation("The data type used by engine."
-          + "Currently supports AVRO, SPARK");
-
   public static final ConfigProperty<String> ROLLBACK_USING_MARKERS_ENABLE = ConfigProperty
       .key("hoodie.rollback.using.markers")
       .defaultValue("true")
@@ -527,7 +511,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   private HoodieCommonConfig commonConfig;
   private HoodieStorageConfig storageConfig;
   private EngineType engineType;
-  private HoodieRecordType recordType;
+  private HoodieRecordMerger recordMerger;

   /**
    * @deprecated Use {@link #TBL_NAME} and its methods instead
@@ -904,15 +888,15 @@ protected HoodieWriteConfig() {
     super();
     this.engineType = EngineType.SPARK;
     this.clientSpecifiedViewStorageConfig = null;
-    this.recordType = generateRecordType();
+    applyMergerClass();
   }

   protected HoodieWriteConfig(EngineType engineType, Properties props) {
     super(props);
     Properties newProps = new Properties();
     newProps.putAll(props);
     this.engineType = engineType;
-    this.recordType = generateRecordType();
+    applyMergerClass();
     this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
     this.fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().fromProperties(newProps).build();
     this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
@@ -924,14 +908,12 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) {
     this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build();
   }

-  private HoodieRecordType generateRecordType() {
-    HoodieRecordType recordType = HoodieRecord.HoodieRecordType.valueOf(getStringOrDefault(RECORD_TYPE));
-    String basePath = getString(BASE_PATH);
-    boolean metadataTable = HoodieTableMetadata.isMetadataTable(basePath);
-    if (metadataTable) {
-      recordType = HoodieRecordType.AVRO;
+  private void applyMergerClass() {
+    if (getBoolean(SELF_ADAPTION_MERGER_CLASS_NAME)) {
+      this.recordMerger = ConfigUtils.generateRecordMerger(getString(BASE_PATH), engineType, Option.ofNullable(getString(MERGER_CLASS_NAME)));
+    } else {
+      this.recordMerger = ConfigUtils.generateRecordMerger(getString(BASE_PATH), Option.ofNullable(getString(MERGER_CLASS_NAME)));
     }
-    return recordType;
   }

   public static HoodieWriteConfig.Builder newBuilder() {
@@ -945,6 +927,10 @@ public String getBasePath() {
     return getString(BASE_PATH);
   }

+  public HoodieRecordMerger getRecordMerger() {
+    return recordMerger;
+  }
+
   public String getSchema() {
     return getString(AVRO_SCHEMA_STRING);
   }
@@ -953,6 +939,10 @@ public void setSchema(String schemaStr) {
     setValue(AVRO_SCHEMA_STRING, schemaStr);
   }

+  public void setMergerClass(String mergerClass) {
+    setValue(MERGER_CLASS_NAME, mergerClass);
+  }
+
   public String getInternalSchema() {
     return getString(INTERNAL_SCHEMA_STRING);
   }
@@ -1016,10 +1006,6 @@ public String getKeyGeneratorClass() {
     return getString(KEYGENERATOR_CLASS_NAME);
   }

-  public HoodieRecord.HoodieRecordType getRecordType() {
-    return recordType;
-  }
-
   public boolean isConsistentLogicalTimestampEnabled() {
     return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
   }
@@ -1374,10 +1360,6 @@ public String getPayloadClass() {
     return getString(HoodieCompactionConfig.PAYLOAD_CLASS_NAME);
   }

-  public String getMergeClass() {
-    return getString(HoodieCompactionConfig.MERGE_CLASS_NAME);
-  }
-
   public int getTargetPartitionsPerDayBasedCompaction() {
     return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
   }
@@ -2257,6 +2239,11 @@ public Builder withWritePayLoad(String payload) {
       return this;
     }

+    public Builder withMergerClass(String mergerClass) {
+      writeConfig.setValue(MERGER_CLASS_NAME, mergerClass);
+      return this;
+    }
+
     public Builder withKeyGenerator(String keyGeneratorClass) {
       writeConfig.setValue(KEYGENERATOR_CLASS_NAME, keyGeneratorClass);
       return this;

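Taken together, the HoodieWriteConfig changes replace the record-type plumbing with a merger instance resolved at construction time. A usage sketch of the new surface, assuming the builder is otherwise configured as usual (withPath is a standard builder method; the rest is shown in the diff above):

import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.config.HoodieWriteConfig;

public class WriteConfigMergerExample {
  public static void main(String[] args) {
    // Sketch: select a merger explicitly through the builder method
    // added by this commit.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")
        .withMergerClass(HoodieAvroRecordMerger.class.getName())
        .build();

    // The resolved merger now also decides the record type that file
    // readers and writers use, replacing the old RECORD_TYPE config.
    HoodieRecordMerger merger = config.getRecordMerger();
    System.out.println(merger.getRecordType());
  }
}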
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
Lines changed: 1 addition & 1 deletion

@@ -515,7 +515,7 @@ private void writeToBuffer(HoodieRecord<T> record) {
       record.seal();
     }
     // fetch the ordering val first in case the record was deflated.
-    final Comparable<?> orderingVal = record.getOrderingValue();
+    final Comparable<?> orderingVal = record.getOrderingValue(config.getProps());
     Option<HoodieRecord> indexedRecord = prepareRecord(record);
     if (indexedRecord.isPresent()) {
       // Skip the ignored record.

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
Lines changed: 1 addition & 1 deletion

@@ -100,7 +100,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
       partitionMetadata.trySave(getPartitionId());
       createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
       this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable.getHadoopConf(), config.getStorageConfig(),
-          writeSchemaWithMetaFields, this.taskContextSupplier, config.getRecordType());
+          writeSchemaWithMetaFields, this.taskContextSupplier, config.getRecordMerger().getRecordType());
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
     }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
Lines changed: 5 additions & 2 deletions

@@ -27,6 +27,7 @@
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.Source;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
@@ -338,7 +339,9 @@ public void write(HoodieRecord<T> oldRecord) {
       // writing the first record. So make a copy of the record to be merged
       HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
       try {
-        Option<HoodieRecord> combinedRecord = merge.combineAndGetUpdateValue(oldRecord, hoodieRecord, schema, props);
+        oldRecord.setSource(Source.BASE);
+        hoodieRecord.setSource(Source.LOG);
+        Option<HoodieRecord> combinedRecord = recordMerger.merge(oldRecord, hoodieRecord, schema, props);

         if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) {
           // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
@@ -444,7 +447,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {

     long oldNumWrites = 0;
     try {
-      HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.config.getRecordType()).getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+      HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to check for merge data validation", e);

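The HoodieRecordMerger interface itself is not included in this excerpt. From the call sites above, its shape can be inferred roughly as follows (the exact generics, any additional methods, and the throws clause are assumptions):

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;

// Inferred sketch of the merger contract used by HoodieMergeHandle:
// merge the base-file record with the incoming record, and report the
// record type the engine should use when picking file readers/writers.
public interface RecordMergerSketch {
  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer,
      Schema schema, Properties props) throws IOException;

  HoodieRecordType getRecordType();
}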
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
Lines changed: 1 addition & 1 deletion

@@ -62,7 +62,7 @@ protected HoodieBaseFile getLatestDataFile() {
   }

   protected HoodieFileReader createNewFileReader() throws IOException {
-    return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordType()).getFileReader(hoodieTable.getHadoopConf(),
+    return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(),
         new Path(getLatestDataFile().getPath()));
   }
 }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
Lines changed: 4 additions & 5 deletions

@@ -27,9 +27,8 @@
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -61,7 +60,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
    */
   protected final Schema tableSchema;
   protected final Schema tableSchemaWithMetaFields;
-  protected final HoodieMerge merge;
+  protected final HoodieRecordMerger recordMerger;

   /**
    * The write schema. In most case the write schema is the same to the
@@ -106,7 +105,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
-    this.merge = HoodieRecordUtils.loadMerge(config.getMergeClass(), hoodieTable.getConfig().getBasePath());
+    this.recordMerger = config.getRecordMerger();
   }

   /**
@@ -231,6 +230,6 @@ protected long getAttemptId() {

   protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T, I, K, O> hoodieTable,
       HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException {
-    return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable.getHadoopConf(), config.getStorageConfig(), schema, taskContextSupplier, config.getRecordType());
+    return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable.getHadoopConf(), config.getStorageConfig(), schema, taskContextSupplier, config.getRecordMerger().getRecordType());
   }
 }

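Write handles now obtain the merger from the write config instead of loading a merge class themselves, so a custom strategy only needs to implement the merger contract and be named in hoodie.datasource.write.merge.class. A latest-wins sketch against the inferred interface above (illustrative only, not the Hudi-shipped implementation):

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;

// Illustrative latest-wins merger: prefer the incoming (newer) record
// whenever its ordering value is at least as large as the base record's.
public class LatestWinsMergerSketch implements RecordMergerSketch {

  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer,
      Schema schema, Properties props) {
    // getOrderingValue(props) matches the call added in HoodieAppendHandle.
    Comparable oldVal = older.getOrderingValue(props);
    Comparable newVal = newer.getOrderingValue(props);
    // Keep the newer record on ties, mirroring overwrite-with-latest semantics.
    return Option.of(newVal.compareTo(oldVal) >= 0 ? newer : older);
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.AVRO;
  }
}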