Commit e3dabbc

[HUDI-4298] Mor table reading for base and log files lost sequence of events (#6286)
* [HUDI-4298] Mor table reading for base and log files lost sequence of events

Signed-off-by: HunterXHunter <1356469429@qq.com>
1 parent 8b35013 commit e3dabbc

3 files changed

Lines changed: 47 additions & 9 deletions
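
What changed, in short: the Flink MOR reader merged each base-file row with its log record through the two-argument combineAndGetUpdateValue, so an ordering-aware payload such as EventTimeAvroPayload never saw the ordering (precombine) field and the log record always replaced the base row, regardless of event time. This commit threads the payload properties built from the Flink options into that call. Below is a minimal sketch of the decision such a payload can then make; it is an illustration under assumptions, not Hudi's implementation, and the property key shown is assumed to match what HoodiePayloadConfig emits.

import java.util.Map;
import java.util.Optional;
import java.util.Properties;

// Illustration only, not Hudi's EventTimeAvroPayload: an ordering-aware merge keeps
// whichever record carries the larger ordering (event-time) value.
final class OrderingAwareMergeSketch {
  static Optional<Map<String, Object>> merge(Map<String, Object> baseRecord,
                                             Map<String, Object> logRecord,
                                             Properties payloadProps) {
    // The field name comes from the payload properties; the key is assumed to match
    // what HoodiePayloadConfig emits for the ordering field.
    String orderingField = payloadProps.getProperty("hoodie.payload.ordering.field", "ts");
    long baseTs = ((Number) baseRecord.get(orderingField)).longValue();
    long logTs = ((Number) logRecord.get(orderingField)).longValue();
    // Without these properties the payload cannot resolve the field, and the log
    // record replaces the base row unconditionally; that is the bug this commit fixes.
    return Optional.of(logTs >= baseTs ? logRecord : baseRecord);
  }
}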


hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java

Lines changed: 8 additions & 3 deletions
@@ -38,6 +38,7 @@
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.util.StringToRowDataConverter;

 import org.apache.avro.Schema;
@@ -63,6 +64,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.IntStream;

@@ -634,10 +636,12 @@ static class MergeIterator implements RecordIterator {

   private final Set<String> keyToSkip = new HashSet<>();

+  private final Properties payloadProps;
+
   private RowData currentRecord;

   MergeIterator(
-      Configuration finkConf,
+      Configuration flinkConf,
       org.apache.hadoop.conf.Configuration hadoopConf,
       MergeOnReadInputSplit split,
       RowType tableRowType,
@@ -650,7 +654,8 @@ static class MergeIterator implements RecordIterator {
       ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
     this.tableSchema = tableSchema;
     this.reader = reader;
-    this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf);
+    this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf);
+    this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
     this.logKeysIterator = scanner.getRecords().keySet().iterator();
     this.requiredSchema = requiredSchema;
     this.requiredPos = requiredPos;
@@ -751,7 +756,7 @@ private Option<IndexedRecord> mergeRowWithLog(
       String curKey) throws IOException {
     final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
     GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
-    return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+    return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps);
   }
 }
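
The Properties handed to combineAndGetUpdateValue come from StreamerUtil.getPayloadConfig(flinkConf), shown in the next file. A rough sketch of what they are expected to contain for a table whose precombine field is "ts"; the keys are assumptions about the property names HoodiePayloadConfig writes, included only to make the data flow concrete.

import java.util.Properties;

// Assumed contents of the payload Properties for a table with precombine field "ts"
// and EventTimeAvroPayload; the keys are guesses at HoodiePayloadConfig's property
// names, not taken from this diff.
final class PayloadPropsSketch {
  static Properties expectedPayloadProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.compaction.payload.class",
        "org.apache.hudi.common.model.EventTimeAvroPayload");
    props.setProperty("hoodie.payload.ordering.field", "ts");
    props.setProperty("hoodie.payload.event.time.field", "ts");
    return props;
  }
}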

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 13 additions & 6 deletions
@@ -228,12 +228,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
             .withClientNumRetries(30)
             .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
             .build())
-        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
-            .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
-            .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
-            .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
-            .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
-            .build())
+        .withPayloadConfig(getPayloadConfig(conf))
         .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
         .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
         .withAutoCommit(false)
@@ -251,6 +246,18 @@ public static HoodieWriteConfig getHoodieClientConfig(
     return writeConfig;
   }

+  /**
+   * Returns the payload config with given configuration.
+   */
+  public static HoodiePayloadConfig getPayloadConfig(Configuration conf) {
+    return HoodiePayloadConfig.newBuilder()
+        .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+        .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+        .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+        .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+        .build();
+  }
+
   /**
    * Converts the give {@link Configuration} to {@link TypedProperties}.
    * The default values are also set up.
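
A usage sketch of the extracted helper from a hypothetical caller ("ts" is an assumed event-time column): the same two Flink options now drive both the write-path HoodieWriteConfig and the read-path merge in MergeOnReadInputFormat.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

import java.util.Properties;

// Hypothetical caller: configure the payload class and precombine field, then let
// the new helper translate them into payload properties for the MOR merge.
final class PayloadConfigUsageSketch {
  static Properties buildPayloadProps() {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
    conf.setString(FlinkOptions.PRECOMBINE_FIELD, "ts"); // assumed event-time column
    HoodiePayloadConfig payloadConfig = StreamerUtil.getPayloadConfig(conf);
    return payloadConfig.getProps();
  }
}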

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java

Lines changed: 26 additions & 0 deletions
@@ -20,6 +20,7 @@

 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -473,6 +474,31 @@ void testReadIncrementally(HoodieTableType tableType) throws Exception {
     TestData.assertRowDataEquals(actual6, Collections.emptyList());
   }

+  @Test
+  void testMergeOnReadDisorderUpdateAfterCompaction() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write base file first with compaction.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_DISORDER_INSERT, conf);
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    final String baseResult = TestData.rowDataToString(readData(inputFormat));
+    String expected = "[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
+    assertThat(baseResult, is(expected));
+
+    // write another commit using logs and read again.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    final String baseMergeLogFileResult = TestData.rowDataToString(readData(inputFormat));
+    assertThat(baseMergeLogFileResult, is(expected));
+  }
+
   @Test
   void testReadArchivedCommitsIncrementally() throws Exception {
     Map<String, String> options = new HashMap<>();
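
The rows behind DATA_SET_DISORDER_INSERT and DATA_SET_SINGLE_INSERT are not part of this diff, so the following is a hypothetical replay of the scenario the test protects, with key and timestamps assumed to match the expected string: out-of-order inserts leave the ts=4 version in the compacted base file, and a later log-file record with an older event time must not displace it.

import java.util.HashMap;
import java.util.Map;

// Hypothetical replay of the test scenario; the real rows live in
// TestData.DATA_SET_DISORDER_INSERT and TestData.DATA_SET_SINGLE_INSERT.
public final class DisorderScenarioSketch {
  public static void main(String[] args) {
    Map<String, Long> table = new HashMap<>();   // key -> retained event time
    long[] disorderedInserts = {1L, 4L, 2L, 3L}; // assumed arrival order for id1
    for (long ts : disorderedInserts) {
      table.merge("id1", ts, Math::max);         // event time decides, not arrival order
    }
    // Compaction bakes ts=4 into the base file; a later log commit carries an older event time.
    table.merge("id1", 1L, Math::max);           // must not win over the base record
    System.out.println(table.get("id1"));        // 4 -> 1970-01-01T00:00:00.004 in the test
  }
}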
