diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 7330286734a08..c32e2cabb1012 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -124,7 +124,7 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
    * @param maxInstantTime Max Instant Time
    * @return
    */
-  public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
+  Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
 
   /**
    * Stream all the latest file slices, in the given range.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java
index 0b82f091402a0..728ac717e4cd5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java
@@ -33,28 +33,28 @@ static Runnable noop() {
   /**
    * A function which has not any parameter.
    */
-  public interface Function0<R> extends Serializable {
+  interface Function0<R> extends Serializable {
 
     R apply();
   }
 
   /**
    * A function which contains only one parameter.
    */
-  public interface Function1<T1, R> extends Serializable {
+  interface Function1<T1, R> extends Serializable {
 
     R apply(T1 val1);
   }
 
   /**
    * A function which contains two parameters.
    */
-  public interface Function2<T1, T2, R> extends Serializable {
+  interface Function2<T1, T2, R> extends Serializable {
 
     R apply(T1 val1, T2 val2);
   }
 
   /**
    * A function which contains three parameters.
    */
-  public interface Function3<T1, T2, T3, R> extends Serializable {
+  interface Function3<T1, T2, T3, R> extends Serializable {
 
     R apply(T1 val1, T2 val2, T3 val3);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 1fc8d393be6a9..5e317a8b041d7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -198,7 +198,7 @@ protected void loadRecords(String partitionPath) throws Exception {
     Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
 
     List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
-        .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
+        .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
         .collect(toList());
 
     for (FileSlice fileSlice : fileSlices) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 4771a7a3455b0..403d0272b4e18 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -352,6 +352,10 @@ public void testIndexStateBootstrap() throws Exception {
     // reset the config option
     conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+    validateIndexLoaded();
+  }
+
+  protected void validateIndexLoaded() throws Exception {
     preparePipeline(conf)
         .consume(TestData.DATA_SET_UPDATE_INSERT)
         .checkIndexLoaded(
@@ -418,7 +422,7 @@ private TestHarness preparePipeline() throws Exception {
     return
TestHarness.instance().preparePipeline(tempFile, conf);
   }
 
-  private TestHarness preparePipeline(Configuration conf) throws Exception {
+  protected TestHarness preparePipeline(Configuration conf) throws Exception {
     return TestHarness.instance().preparePipeline(tempFile, conf);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index a35a0ac8d0b88..f2c0500f9555c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -20,8 +20,10 @@
 
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,6 +38,27 @@ protected void setUp(Configuration conf) {
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
   }
 
+  @Test
+  public void testIndexStateBootstrapWithCompactionScheduled() throws Exception {
+    // sets up the delta commits as 1 to generate a new compaction plan.
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    // open the function and ingest data
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_INSERT)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED1, 4)
+        .end();
+
+    // reset config options
+    conf.removeConfig(FlinkOptions.COMPACTION_DELTA_COMMITS);
+    // sets up index bootstrap
+    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+    validateIndexLoaded();
+  }
+
   @Override
   public void testInsertClustering() {
     // insert clustering is only valid for cow table.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 61f1657c2c6ed..c31c2bbadae25 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -641,10 +642,11 @@ public static void checkWrittenDataMOR(
       File[] dataFiles = partitionDir.listFiles(file -> file.getName().contains(".log.") && !file.getName().startsWith(".."));
       assertNotNull(dataFiles);
-      HoodieMergedLogRecordScanner scanner = getScanner(
-          fs, baseFile.getPath(), Arrays.stream(dataFiles).map(File::getAbsolutePath)
-              .sorted(Comparator.naturalOrder()).collect(Collectors.toList()),
-          schema, latestInstant);
+      List<String> logPaths = Arrays.stream(dataFiles)
+          .sorted((f1, f2) -> HoodieLogFile.getLogFileComparator()
+              .compare(new HoodieLogFile(f1.getPath()), new HoodieLogFile(f2.getPath())))
+          .map(File::getAbsolutePath).collect(Collectors.toList());
+      HoodieMergedLogRecordScanner scanner = getScanner(fs, baseFile.getPath(), logPaths, schema, latestInstant);
       List<String> readBuffer = scanner.getRecords().values().stream()
           .map(hoodieRecord -> {
             try {