
Commit bd45932

minihippo, wangzixuan.wzxuan, and gengxiaoyu authored
[HUDI-4301] [HUDI-3384][HUDI-3385] Spark specific file reader/writer. (#5629)

* [HUDI-4301] [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
* add schema finger print
* add benchmark
* a new way to config the merger
* fix

Co-authored-by: wangzixuan.wzxuan <[email protected]>
Co-authored-by: gengxiaoyu <[email protected]>
1 parent d0300c2 commit bd45932

198 files changed: +4357 -1950 lines changed


hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java

Lines changed: 4 additions & 4 deletions
@@ -24,9 +24,9 @@
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -83,7 +83,7 @@ public String showArchivedCommits(
       // read the avro blocks
       while (reader.hasNext()) {
         HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        blk.getRecordIterator(HoodieAvroIndexedRecord::new).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
+        blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
       }
       List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
           .filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -157,8 +157,8 @@ public String showCommits(
       // read the avro blocks
       while (reader.hasNext()) {
         HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
-          recordItr.forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
+        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
+          recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
         }
       }
       List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
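Across these CLI changes, the lambda-based call getRecordIterator(HoodieAvroIndexedRecord::new) is replaced by a typed iterator keyed on HoodieRecordType. A minimal sketch of the resulting read pattern follows; the class and method names are illustrative only, and the import paths are assumed from the surrounding hunks:

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.ClosableIterator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class AvroBlockReadSketch {
  // Drain one Avro data block into plain IndexedRecords; the typed iterator
  // makes getData() return an IndexedRecord without the old cast.
  static List<IndexedRecord> readBlock(HoodieAvroDataBlock blk) throws IOException {
    List<IndexedRecord> readRecords = new ArrayList<>();
    try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
      recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
    }
    return readRecords;
  }
}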

hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java

Lines changed: 3 additions & 3 deletions
@@ -25,9 +25,9 @@
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -125,9 +125,9 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
       // read the avro blocks
       while (reader.hasNext() && copyCount < limit) {
         HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
+        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
           while (recordItr.hasNext()) {
-            IndexedRecord ir = (IndexedRecord) recordItr.next().getData();
+            IndexedRecord ir = recordItr.next().getData();
             // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
             // metadata record from the entry and convert it to json.
             HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()

hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java

Lines changed: 9 additions & 5 deletions
@@ -25,8 +25,10 @@
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -38,6 +40,7 @@
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
@@ -125,7 +128,7 @@ public String showLogFileCommits(
           instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
         }
         if (n instanceof HoodieDataBlock) {
-          try (ClosableIterator<HoodieRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieAvroIndexedRecord::new)) {
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
            recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
          }
        }
@@ -221,11 +224,12 @@ public String showLogFileRecords(
          .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
          .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
          .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
          .build();
      for (HoodieRecord hoodieRecord : scanner) {
-       Option<IndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
+       Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
        if (allRecords.size() < limit) {
-         allRecords.add(record.get());
+         allRecords.add(record.get().getData());
        }
      }
    } else {
@@ -239,10 +243,10 @@
        HoodieLogBlock n = reader.next();
        if (n instanceof HoodieDataBlock) {
          HoodieDataBlock blk = (HoodieDataBlock) n;
-         try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
+         try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
            recordItr.forEachRemaining(record -> {
              if (allRecords.size() < limit) {
-               allRecords.add((IndexedRecord) record.getData());
+               allRecords.add(record.getData());
              }
            });
          }
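The log-record scanner built in this command (and in the corresponding test further down) now supplies a record merger explicitly. A minimal sketch of that one step, assuming the builder is HoodieMergedLogRecordScanner.Builder (the builder type is not visible in the hunks above) and that every other option stays configured exactly as before:

import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.HoodieRecordUtils;

class ScannerMergerSketch {
  // Loads the Avro merger reflectively and hands it to the builder; without
  // this call the scanner has no merger to combine base and log records with.
  static void addAvroMerger(HoodieMergedLogRecordScanner.Builder builder) {
    builder.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
  }
}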

hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala

Lines changed: 2 additions & 2 deletions
@@ -23,14 +23,14 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.avro.HoodieAvroWriteSupport
 import org.apache.hudi.client.SparkTaskContextSupplier
 import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.util.BaseFileUtils
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
+import org.apache.hudi.config.HoodieIndexConfig
 import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable._
 

hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java

Lines changed: 3 additions & 0 deletions
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -38,6 +39,7 @@
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
@@ -222,6 +224,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
         .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
 
     Iterator<HoodieRecord> records = scanner.iterator();

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java

Lines changed: 2 additions & 1 deletion
@@ -32,6 +32,7 @@
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -344,7 +345,7 @@ public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOExcept
       // Read the avro blocks
       while (reader.hasNext()) {
         HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        blk.getRecordIterator(HoodieAvroIndexedRecord::new).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
+        blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
         if (records.size() >= this.config.getCommitArchivalBatchSize()) {
           writeToFile(wrapperSchema, records);
         }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java

Lines changed: 3 additions & 1 deletion
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -49,8 +50,9 @@ public FullRecordBootstrapDataProvider(TypedProperties props, HoodieEngineContex
    * @param tableName Hudi Table Name
    * @param sourceBasePath Source Base Path
    * @param partitionPaths Partition Paths
+   * @param config config
    * @return input records
    */
   public abstract I generateInputRecords(String tableName,
-      String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths);
+      String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config);
 }
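generateInputRecords now also receives the table's HoodieWriteConfig. A sketch of a subclass under the new contract follows; the class name, record type parameter, and method body are hypothetical, and the HoodieFileStatus import path is assumed from Hudi's Avro model package:

import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.List;

class MyBootstrapProvider extends FullRecordBootstrapDataProvider<List<String>> {

  public MyBootstrapProvider(TypedProperties props, HoodieEngineContext context) {
    super(props, context);
  }

  @Override
  public List<String> generateInputRecords(String tableName, String sourceBasePath,
      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config) {
    // The write config is available to the provider here, e.g. for schema or merger settings.
    throw new UnsupportedOperationException("illustrative stub");
  }
}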

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java

Lines changed: 8 additions & 38 deletions
@@ -21,62 +21,32 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.stream.StreamSupport;
+import java.util.Properties;
 
 /**
  * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
 */
 public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {
+
   private final Iterator<HoodieRecord<T>> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-      Option<HoodieAvroFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
-      String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
+      Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
     if (baseFileReader.isPresent()) {
-      Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+      Iterator<HoodieRecord> baseIterator = baseFileReader.get().getRecordIterator(schema);
       while (baseIterator.hasNext()) {
-        GenericRecord record = (GenericRecord) baseIterator.next();
-        HoodieRecord hoodieRecord = transform(
-            record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
-        scanner.processNextRecord(hoodieRecord);
+        scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
+            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionName(), false));
       }
-      return new HoodieFileSliceReader(scanner.iterator());
-    } else {
-      Iterable<HoodieRecord> iterable = () -> scanner.iterator();
-      HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
-      return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
-          .map(e -> {
-            try {
-              GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, payloadConfig.getProps()).get();
-              return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
-            } catch (IOException io) {
-              throw new HoodieIOException("Error while creating reader for file slice with no base file.", io);
-            }
-          }).iterator());
     }
-  }
-
-  private static HoodieRecord transform(GenericRecord record,
-                                        HoodieMergedLogRecordScanner scanner,
-                                        String payloadClass,
-                                        String preCombineField,
-                                        Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
-    return simpleKeyGenFieldsOpt.isPresent()
-        ? SpillableMapUtils.convertToHoodieRecordPayload(record,
-        payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty())
-        : SpillableMapUtils.convertToHoodieRecordPayload(record,
-        payloadClass, preCombineField, scanner.isWithOperationField(), scanner.getPartitionName());
+    return new HoodieFileSliceReader(scanner.iterator());
   }
 
   private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
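The factory method now takes an engine-agnostic HoodieFileReader plus a Properties object instead of an explicit payload class and precombine field. A minimal calling sketch under the new signature; the wrapper class and method name are illustrative, and the assumption is that payload/precombine settings travel inside props:

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieFileReader;

import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;

class FileSliceReaderSketch {
  // Opens a merged iterator over base-file records plus log updates; with
  // Option.empty() for simpleKeyGenFieldsOpt the key fields are not overridden.
  static Iterator<HoodieRecord> open(Option<HoodieFileReader> baseFileReader,
                                     HoodieMergedLogRecordScanner scanner,
                                     Schema schema,
                                     Properties props) throws IOException {
    return HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, schema, props, Option.empty());
  }
}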

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

Lines changed: 0 additions & 12 deletions
@@ -22,7 +22,6 @@
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -120,12 +119,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       + "compaction during each compaction run. By default. Hudi picks the log file "
       + "with most accumulated unmerged data");
 
-  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
-      .key("hoodie.compaction.merge.class")
-      .defaultValue(HoodieAvroRecordMerge.class.getName())
-      .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
-          + "types, such as Spark records or Flink records.");
-
   public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
       .defaultValue("true")
@@ -359,11 +352,6 @@ public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) {
       return this;
     }
 
-    public Builder withMergeClass(String mergeClass) {
-      compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass);
-      return this;
-    }
-
     public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
       compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
       return this;
