Skip to content

Commit f797c76

Browse files
author
wangzixuan.wzxuan
committed
fix expansion
1 parent 91406b4 commit f797c76

File tree

7 files changed

+57
-37
lines changed

7 files changed

+57
-37
lines changed

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -176,21 +176,12 @@ public HoodieRecord expansion(Schema schema, Properties prop, Map<String, Object
176176
boolean withOperationField = Boolean.parseBoolean(mapperConfig.get(WITH_OPERATION_FIELD).toString());
177177
boolean populateMetaFields = Boolean.parseBoolean(mapperConfig.getOrDefault(MapperUtils.POPULATE_META_FIELDS, false).toString());
178178
Option<String> partitionName = unsafeCast(mapperConfig.getOrDefault(PARTITION_NAME, Option.empty()));
179-
if (payloadClass == null && preCombineField == null && !keyGen.isPresent()) {
180-
// Support JavaExecutionStrategy
181-
GenericRecord record = (GenericRecord) data;
182-
String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
183-
String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
184-
HoodieKey hoodieKey = new HoodieKey(key, partition);
185-
186-
HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
187-
HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, avroPayload);
188-
return hoodieRecord;
189-
} else if (populateMetaFields) {
179+
if (populateMetaFields) {
190180
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) data,
191181
payloadClass, preCombineField, withOperationField);
192182
// Support HoodieFileSliceReader
193183
} else if (keyGen.isPresent()) {
184+
// TODO in HoodieFileSliceReader may partitionName=option#empty
194185
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) data,
195186
payloadClass, preCombineField, keyGen.get(), withOperationField, partitionName);
196187
} else {
@@ -200,10 +191,10 @@ public HoodieRecord expansion(Schema schema, Properties prop, Map<String, Object
200191
}
201192

202193
@Override
203-
public HoodieRecord transform(Schema schema, Properties prop) {
194+
public HoodieRecord transform(Schema schema, Properties prop, boolean useKeyGen) {
204195
GenericRecord record = (GenericRecord) data;
205196
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
206-
if (!Boolean.parseBoolean(prop.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
197+
if (useKeyGen && !Boolean.parseBoolean(prop.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
207198
try {
208199
Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory");
209200
Method createKeyGenerator = clazz.getMethod("createKeyGenerator", TypedProperties.class);

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public HoodieRecord expansion(Schema schema, Properties prop, Map<String, Object
176176
}
177177

178178
@Override
179-
public HoodieRecord transform(Schema schema, Properties prop) {
179+
public HoodieRecord transform(Schema schema, Properties prop, boolean useKeygen) {
180180
throw new UnsupportedOperationException();
181181
}
182182

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ public void checkState() {
335335
/**
336336
* This method used in ClusteringExecutionStrategy.
337337
*/
338-
public abstract HoodieRecord transform(Schema schema, Properties prop);
338+
public abstract HoodieRecord transform(Schema schema, Properties prop, boolean useKeyGen);
339339

340340
public abstract Option<IndexedRecord> toIndexedRecord(Schema schema, Properties prop) throws IOException;
341341

hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.Properties;
6363
import java.util.Set;
6464
import java.util.concurrent.atomic.AtomicLong;
65+
import java.util.function.Function;
6566
import java.util.stream.Collectors;
6667

6768
import static org.apache.hudi.TypeUtils.unsafeCast;
@@ -90,7 +91,7 @@ public abstract class AbstractHoodieLogRecordReader {
9091
// Latest valid instant time
9192
// Log-Blocks belonging to inflight delta-instants are filtered-out using this high-watermark.
9293
private final String latestInstantTime;
93-
private final HoodieTableMetaClient hoodieTableMetaClient;
94+
protected final HoodieTableMetaClient hoodieTableMetaClient;
9495
// Merge strategy to use when combining records from log
9596
private final String payloadClassFQN;
9697
// preCombine field
@@ -382,20 +383,36 @@ private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
382383
* handle it.
383384
*/
384385
private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
385-
Map<String, Object> mapperConfig = MapperUtils.buildMapperConfig(this.payloadClassFQN, this.preCombineField, this.simpleKeyGenFields, this.withOperationField, this.partitionName);
386-
try (ClosableIterator<HoodieRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt, recordType, mapperConfig)) {
387-
Option<Schema> schemaOption = getMergedSchema(dataBlock);
388-
Schema finalReadSchema = ((MappingIterator) recordIterator).getSchema();
389-
while (recordIterator.hasNext()) {
390-
HoodieRecord<?> currentRecord = recordIterator.next();
391-
HoodieRecord<?> record = schemaOption.isPresent()
392-
? currentRecord.rewriteRecordWithNewSchema(finalReadSchema, new Properties(), schemaOption.get(), new HashMap<>()) : currentRecord;
393-
processNextRecord(record);
394-
totalLogRecords.incrementAndGet();
386+
Map<String, Object> mapperConfig = MapperUtils.buildMapperConfig(this.payloadClassFQN, this.preCombineField, this.simpleKeyGenFields, this.withOperationField,
387+
this.partitionName, getPopulateMetaFields());
388+
389+
Option<Schema> schemaOption = getMergedSchema(dataBlock);
390+
if (schemaOption.isPresent()) {
391+
try (ClosableIterator<HoodieRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt, recordType)) {
392+
Schema finalReadSchema = ((MappingIterator) recordIterator).getSchema();
393+
while (recordIterator.hasNext()) {
394+
HoodieRecord currentRecord = recordIterator.next();
395+
HoodieRecord record = currentRecord.rewriteRecordWithNewSchema(finalReadSchema, new Properties(), schemaOption.get(), new HashMap<>())
396+
.expansion(schemaOption.get(), new Properties(), mapperConfig);
397+
processNextRecord(record);
398+
totalLogRecords.incrementAndGet();
399+
}
400+
}
401+
} else {
402+
try (ClosableIterator<HoodieRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt, recordType, mapperConfig)) {
403+
while (recordIterator.hasNext()) {
404+
HoodieRecord currentRecord = recordIterator.next();
405+
processNextRecord(currentRecord);
406+
totalLogRecords.incrementAndGet();
407+
}
395408
}
396409
}
397410
}
398411

412+
protected boolean getPopulateMetaFields() {
413+
return this.populateMetaFields;
414+
}
415+
399416
/**
400417
* Get final Read Schema for support evolution.
401418
* step1: find the fileSchema for current dataBlock.
@@ -486,14 +503,22 @@ private ClosableIterator<HoodieRecord> getRecordsIterator(HoodieDataBlock dataBl
486503
finalReadSchema = dataBlock.getSchema();
487504
}
488505

489-
return new MappingIterator<>(iter, rec -> {
490-
try {
491-
return rec.expansion(readerSchema, new Properties(), mapperConfig);
492-
} catch (IOException e) {
493-
LOG.error("Error expanse " + rec, e);
494-
throw new HoodieException(e);
495-
}
496-
}, finalReadSchema);
506+
if (mapperConfig == null) {
507+
return new MappingIterator<>(iter, Function.identity(), finalReadSchema);
508+
} else {
509+
return new MappingIterator<>(iter, rec -> {
510+
try {
511+
return rec.expansion(readerSchema, new Properties(), mapperConfig);
512+
} catch (IOException e) {
513+
LOG.error("Error expanse " + rec, e);
514+
throw new HoodieException(e);
515+
}
516+
}, finalReadSchema);
517+
}
518+
}
519+
520+
private ClosableIterator<HoodieRecord> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt, HoodieRecordType type) throws IOException {
521+
return getRecordsIterator(dataBlock, keySpecOpt, type, null);
497522
}
498523

499524
/**

hudi-common/src/main/java/org/apache/hudi/common/util/MapperUtils.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public class MapperUtils {
4848
public static final String WITH_OPERATION_FIELD = "WITH_OPERATION_FIELD";
4949
public static final String PARTITION_NAME = "PARTITION_NAME";
5050
public static final String POPULATE_META_FIELDS = "POPULATE_META_FIELDS";
51-
public static final String RECORD_TYPE = "RECORD_TYPE";
5251

5352
public static Map<String, Object> buildMapperConfig(String payloadClass, String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt, boolean withOperation,
5453
Option<String> partitionName) {

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>
109109
}
110110
}
111111

112+
@Override
113+
protected boolean getPopulateMetaFields() {
114+
return this.hoodieTableMetaClient.getTableConfig().populateMetaFields() && super.getPopulateMetaFields();
115+
}
116+
112117
@Override
113118
protected String getKeyField() {
114119
return HoodieMetadataPayload.KEY_FIELD_NAME;

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkRecord.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,10 @@ public HoodieRecord expansion(Schema schema, Properties prop, Map<String, Object
201201
}
202202

203203
@Override
204-
public HoodieRecord transform(Schema schema, Properties prop) {
204+
public HoodieRecord transform(Schema schema, Properties prop, boolean useKeygen) {
205205
StructType structType = HoodieInternalRowUtils.getCacheSchema(schema);
206206
Option<SparkKeyGeneratorInterface> keyGeneratorOpt = Option.empty();
207-
if (!Boolean.parseBoolean(prop.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
207+
if (useKeygen && !Boolean.parseBoolean(prop.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
208208
try {
209209
Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory");
210210
Method createKeyGenerator = clazz.getMethod("createKeyGenerator", TypedProperties.class);

0 commit comments

Comments (0)