@@ -124,7 +124,7 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
* @param maxInstantTime Max Instant Time
* @return
*/
- public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
+ Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);

/**
* Stream all the latest file slices, in the given range.
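Note: the only change above is dropping the `public` modifier, which is implicit on interface members; the method itself is untouched. For orientation, it is reachable through the table's slice view, which the write-path change further down switches to. A minimal sketch, assuming a `hoodieTable` handle, a `partitionPath` and a resolved `instantTime` (all hypothetical names here):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.common.model.FileSlice;

// Latest file slices on or before the given instant; as I read it, the "merged" variant also
// folds a slice that sits under a pending compaction into its previous slice, so its log files are kept.
List<FileSlice> slices = hoodieTable.getSliceView()
    .getLatestMergedFileSlicesBeforeOrOn(partitionPath, instantTime)
    .collect(Collectors.toList());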
@@ -33,28 +33,28 @@ static Runnable noop() {
/**
* A function which has not any parameter.
*/
- public interface Function0<R> extends Serializable {
+ interface Function0<R> extends Serializable {
R apply();
}

/**
* A function which contains only one parameter.
*/
- public interface Function1<T1, R> extends Serializable {
+ interface Function1<T1, R> extends Serializable {
R apply(T1 val1);
}

/**
* A function which contains two parameters.
*/
- public interface Function2<T1, T2, R> extends Serializable {
+ interface Function2<T1, T2, R> extends Serializable {
R apply(T1 val1, T2 val2);
}

/**
* A function which contains three parameters.
*/
- public interface Function3<T1, T2, T3, R> extends Serializable {
+ interface Function3<T1, T2, T3, R> extends Serializable {
R apply(T1 val1, T2 val2, T3 val3);
}
}
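Each of these nested interfaces keeps a single abstract `apply` method, so removing the redundant `public` keyword changes nothing for callers; they can still be bound to lambdas. A tiny illustrative sketch, assuming the enclosing holder class shown in this hunk is named `Functions` (variable names made up):

// One-argument and two-argument serializable function values.
Functions.Function1<String, Integer> length = s -> s.length();
Functions.Function2<Integer, Integer, Integer> sum = (a, b) -> a + b;
int total = length.apply("hudi") + sum.apply(1, 2); // 4 + 3 = 7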
@@ -198,7 +198,7 @@ protected void loadRecords(String partitionPath) throws Exception {
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();

List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
- .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
+ .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.collect(toList());

for (FileSlice fileSlice : fileSlices) {
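As far as I can tell this is the heart of the fix: with a compaction plan pending, the old call (note the trailing `true` flag) can hand back only the newest slice, which has no base file yet, so the index bootstrap would skip the records sitting in the previous slice; the merged variant folds both slices together. A rough sketch of what each returned slice exposes (the print-outs are just for illustration):

import org.apache.hudi.common.model.FileSlice;

// Each slice carries an optional base file plus its attached log files; with the merged view,
// log files that already rolled over onto the pending compaction's slice are still included.
for (FileSlice fileSlice : fileSlices) {
  if (fileSlice.getBaseFile().isPresent()) {
    System.out.println("base file: " + fileSlice.getBaseFile().get().getPath());
  }
  fileSlice.getLogFiles().forEach(logFile ->
      System.out.println("log file : " + logFile.getPath()));
}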
@@ -352,6 +352,10 @@ public void testIndexStateBootstrap() throws Exception {

// reset the config option
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+ validateIndexLoaded();
+ }
+
+ protected void validateIndexLoaded() throws Exception {
preparePipeline(conf)
.consume(TestData.DATA_SET_UPDATE_INSERT)
.checkIndexLoaded(
@@ -418,7 +422,7 @@ private TestHarness preparePipeline() throws Exception {
return TestHarness.instance().preparePipeline(tempFile, conf);
}

- private TestHarness preparePipeline(Configuration conf) throws Exception {
+ protected TestHarness preparePipeline(Configuration conf) throws Exception {
return TestHarness.instance().preparePipeline(tempFile, conf);
}

@@ -20,8 +20,10 @@

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.utils.TestData;

import org.apache.flink.configuration.Configuration;
+ import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
@@ -36,6 +38,27 @@ protected void setUp(Configuration conf) {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}

+ @Test
+ public void testIndexStateBootstrapWithCompactionScheduled() throws Exception {
+ // sets up the delta commits as 1 to generate a new compaction plan.
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ // open the function and ingest data
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED1, 4)
+ .end();
+
+ // reset config options
+ conf.removeConfig(FlinkOptions.COMPACTION_DELTA_COMMITS);
+ // sets up index bootstrap
+ conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+ validateIndexLoaded();
+ }
+
@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.
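The new MOR test exercises exactly that scenario: schedule a compaction plan first, then restart the pipeline with index bootstrap on. The knobs it relies on, pulled out for reference (a sketch only; the table path and the other options the test harness sets are omitted):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

Configuration conf = new Configuration();
// One delta commit per compaction plan, so a plan is pending right after the first commit.
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
// Async compaction stays off in this suite, so the plan is scheduled but not executed.
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
// For the second run, load the existing records into the index state.
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);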
@@ -21,6 +21,7 @@
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
+ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -641,10 +642,11 @@ public static void checkWrittenDataMOR(
File[] dataFiles = partitionDir.listFiles(file ->
file.getName().contains(".log.") && !file.getName().startsWith(".."));
assertNotNull(dataFiles);
- HoodieMergedLogRecordScanner scanner = getScanner(
- fs, baseFile.getPath(), Arrays.stream(dataFiles).map(File::getAbsolutePath)
- .sorted(Comparator.naturalOrder()).collect(Collectors.toList()),
- schema, latestInstant);
+ List<String> logPaths = Arrays.stream(dataFiles)
+ .sorted((f1, f2) -> HoodieLogFile.getLogFileComparator()
+ .compare(new HoodieLogFile(f1.getPath()), new HoodieLogFile(f2.getPath())))
+ .map(File::getAbsolutePath).collect(Collectors.toList());
+ HoodieMergedLogRecordScanner scanner = getScanner(fs, baseFile.getPath(), logPaths, schema, latestInstant);
List<String> readBuffer = scanner.getRecords().values().stream()
.map(hoodieRecord -> {
try {
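The reordering of the log paths is the meaningful part of this last hunk: `Comparator.naturalOrder()` on path strings is lexical, so once a file group has ten or more log files, ".log.10" sorts before ".log.2" and the merged scanner would see the log files out of version order, while `HoodieLogFile.getLogFileComparator()` orders them by the parsed log file fields. A self-contained, JDK-only sketch of the lexical pitfall (file names shortened and hypothetical):

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Lexical order puts "log.10" before "log.2": '1' < '2' at the first differing character.
List<String> lexical = Stream.of("x.log.1", "x.log.10", "x.log.2")
    .sorted(Comparator.naturalOrder())
    .collect(Collectors.toList());
System.out.println(lexical); // [x.log.1, x.log.10, x.log.2]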