diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 39bb3a2a5da27..040347c6c37d2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -717,6 +717,33 @@ public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partit
     }
   }
 
+  /**
+   * Stream all "merged" file-slices before or on an instant time
+   * for a MERGE_ON_READ table with an index that can index log files (which means it writes pure logs first).
+   *
+   * <p>In a streaming read scenario, for better read efficiency, the user can choose to skip the
+   * base files that are produced by compaction. That is to say, we allow users to consume only from
+   * these partitioned log files; the log files keep the record sequence just like a normal message queue.
+   *
+   * <p>NOTE: only local view is supported.
+   *
+   * @param partitionStr   Partition Path
+   * @param maxInstantTime Max Instant Time
+   */
+  public final Stream<FileSlice> getAllLogsMergedFileSliceBeforeOrOn(String partitionStr, String maxInstantTime) {
+    try {
+      readLock.lock();
+      String partition = formatPartitionKey(partitionStr);
+      ensurePartitionLoadedCorrectly(partition);
+      return fetchAllStoredFileGroups(partition)
+          .filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxInstantTime))
+          .map(fileGroup -> fetchAllLogsMergedFileSlice(fileGroup, maxInstantTime))
+          .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public final Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
     try {
@@ -1080,6 +1107,29 @@ private FileSlice fetchMergedFileSlice(HoodieFileGroup fileGroup, FileSlice file
     return fileSlice;
   }
 
+  /**
+   * Returns the file slice with all the file slice log files merged.
+   *
+   * @param fileGroup      File group to which the file slices belong
+   * @param maxInstantTime The max instant time
+   */
+  private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup, String maxInstantTime) {
+    List<FileSlice> fileSlices = fileGroup.getAllFileSlicesBeforeOn(maxInstantTime).collect(Collectors.toList());
+    if (fileSlices.size() == 0) {
+      return Option.empty();
+    }
+    if (fileSlices.size() == 1) {
+      return Option.of(fileSlices.get(0));
+    }
+    final FileSlice latestSlice = fileSlices.get(0);
+    FileSlice merged = new FileSlice(latestSlice.getPartitionPath(), latestSlice.getBaseInstantTime(),
+        latestSlice.getFileId());
+
+    // add log files from the latest slice to the earliest
+    fileSlices.forEach(slice -> slice.getLogFiles().forEach(merged::addLogFile));
+    return Option.of(merged);
+  }
+
   /**
    * Default implementation for fetching latest base-file.
    *
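For illustration, a minimal sketch (not part of the patch) of how a caller could exercise the new view API; the meta client, the timeline used to build the view, the partition name, and the instant time below are assumptions:

    // Sketch only: stream the log-only merged slices of one partition and print their log paths.
    static void printLogOnlyMergedSlices(org.apache.hudi.common.table.HoodieTableMetaClient metaClient) {
      org.apache.hudi.common.table.view.HoodieTableFileSystemView fsView =
          new org.apache.hudi.common.table.view.HoodieTableFileSystemView(
              metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
      // "par1" and the instant time are hypothetical; the returned slices carry log files only,
      // since compaction base files are skipped by design.
      fsView.getAllLogsMergedFileSliceBeforeOrOn("par1", "20230101000000")
          .forEach(slice -> slice.getLogFiles().forEach(log -> System.out.println(log.getPath())));
    }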
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 2dd86d652869f..be558a1525411 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -216,7 +217,7 @@ public Result inputSplits(
         : instants.get(instants.size() - 1).getTimestamp();
 
     List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-        fileStatuses, readPartitions, endInstant, instantRange);
+        fileStatuses, readPartitions, endInstant, instantRange, false);
 
     return Result.instance(inputSplits, endInstant);
   }
@@ -311,7 +312,7 @@ public Result inputSplits(
     final String endInstant = instantToIssue.getTimestamp();
 
     List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-        fileStatuses, readPartitions, endInstant, instantRange);
+        fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);
 
     return Result.instance(inputSplits, endInstant);
   }
@@ -322,12 +323,13 @@ private List<MergeOnReadInputSplit> getInputSplits(
       FileStatus[] fileStatuses,
       Set<String> readPartitions,
       String endInstant,
-      InstantRange instantRange) {
+      InstantRange instantRange,
+      boolean skipBaseFiles) {
     final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
     final AtomicInteger cnt = new AtomicInteger(0);
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     return readPartitions.stream()
-        .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
+        .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, endInstant, skipBaseFiles)
             .map(fileSlice -> {
               Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                   .sorted(HoodieLogFile.getLogFileComparator())
@@ -342,6 +344,15 @@ private List<MergeOnReadInputSplit> getInputSplits(
         .collect(Collectors.toList());
   }
 
+  private static Stream<FileSlice> getFileSlices(
+      HoodieTableFileSystemView fsView,
+      String relPartitionPath,
+      String endInstant,
+      boolean skipBaseFiles) {
+    return skipBaseFiles ? fsView.getAllLogsMergedFileSliceBeforeOrOn(relPartitionPath, endInstant)
+        : fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant);
+  }
+
   private FileIndex getFileIndex() {
     FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
     if (this.requiredPartitions != null) {
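For context, a hedged sketch (also not part of the patch) of how the new skipBaseFiles path is typically reached from the Flink source side, mirroring the builder call in the test below; the table path and row type are placeholders, and the option constants are assumed to be the existing FlinkOptions entries for streaming read and skip-compaction:

    // Sketch only: request log-only (skip-compaction) incremental splits.
    org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);           // assumed: read.streaming.enabled
    conf.setBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true); // assumed: read.streaming.skip_compaction

    IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
        .rowType(rowType)                                                // placeholder: RowType of the table schema
        .conf(conf)
        .path(new org.apache.flink.core.fs.Path("/tmp/hudi/mor_table"))  // hypothetical table path
        .skipCompaction(true)                                            // routes getInputSplits to the log-only view API
        .build();
    // incrementalInputSplits.inputSplits(...) then calls getFileSlices(..., skipBaseFiles = true) above.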
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index b76905ed8af07..2b1e35c8293a9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -26,14 +26,17 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.table.HoodieTableSource;
 import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -49,8 +52,10 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -58,6 +63,7 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /**
  * Test cases for MergeOnReadInputFormat and ParquetInputFormat.
@@ -310,6 +316,70 @@ void testReadWithDeletesCOW() throws Exception {
     assertThat(actual, is(expected));
   }
 
+  @Test
+  void testReadSkipCompaction() throws Exception {
+    beforeEach(HoodieTableType.MERGE_ON_READ);
+
+    org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+
+    // write base first with compaction
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(true);
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
+        .rowType(TestConfigurations.ROW_TYPE)
+        .conf(conf)
+        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+        .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", "par3", "par4")))
+        .skipCompaction(true)
+        .build();
+
+    // default read the latest commit
+    // the compaction base files are skipped
+    IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null);
+    assertFalse(splits1.isEmpty());
+    List<RowData> result1 = readData(inputFormat, splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+    String actual1 = TestData.rowDataToString(result1);
+    String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
+    assertThat(actual1, is(expected1));
+
+    // write another commit using logs and read again
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    // read from the compaction commit
+    String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, false);
+    conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
+
+    IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null);
+    assertFalse(splits2.isEmpty());
+    List<RowData> result2 = readData(inputFormat, splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual2 = TestData.rowDataToString(result2);
+    String expected2 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual2, is(expected2));
+
+    // write another commit using logs with separate partition
+    // so the file group has only logs
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
+    // refresh the input format
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat(true);
+
+    IncrementalInputSplits.Result splits3 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null);
+    assertFalse(splits3.isEmpty());
+    List<RowData> result3 = readData(inputFormat, splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual3 = TestData.rowDataToString(result3);
+    String expected3 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual3, is(expected3));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
@@ -632,10 +702,14 @@ private HoodieTableSource getTableSource(Configuration conf) {
         conf);
   }
 
-  @SuppressWarnings("unchecked, rawtypes")
+  @SuppressWarnings("rawtypes")
   private static List<RowData> readData(InputFormat inputFormat) throws IOException {
     InputSplit[] inputSplits = inputFormat.createInputSplits(1);
+    return readData(inputFormat, inputSplits);
+  }
+
+  @SuppressWarnings("unchecked, rawtypes")
+  private static List<RowData> readData(InputFormat inputFormat, InputSplit[] inputSplits) throws IOException {
     List<RowData> result = new ArrayList<>();
     for (InputSplit inputSplit : inputSplits) {