diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index aeb546b0ca5c6..9351ccf178075 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -18,7 +18,6 @@
 package org.apache.hudi.metadata;
 
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
@@ -33,6 +32,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -108,22 +108,28 @@ protected void commit(String instantTime, Map
     List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords);
     try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
-      if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+      if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
         writeClient.startCommitWithTime(instantTime);
         metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
       } else {
-        // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-        // are upserts to metadata table and so only a new delta commit will be created.
-        // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-        // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-        HoodieInstant alreadyCompletedInstant =
-            metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
-        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
-        metadataMetaClient.reloadActiveTimeline();
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
+        if (alreadyCompletedInstant.isPresent()) {
+          // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
+          // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+          // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+          // are upserts to metadata table and so only a new delta commit will be created.
+          // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+          // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+          // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+          HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
+          metadataMetaClient.reloadActiveTimeline();
+        }
+        // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
+        // instant with the same instant time. This happens for data table clean action which
+        // reuses the same instant time without rollback first. It is a no-op here as the
+        // clean plan is the same, so we don't need to delete the requested and inflight instant
+        // files in the active timeline.
       }
       List<WriteStatus> statuses = preppedRecordList.size() > 0
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 80b94edf7ecd6..d0173f984a2f0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -136,21 +136,29 @@ protected void commit(String instantTime, Map
-        HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
-        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
-        metadataMetaClient.reloadActiveTimeline();
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
+        if (alreadyCompletedInstant.isPresent()) {
+          // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
+          // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+          // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+          // are upserts to metadata table and so only a new delta commit will be created.
+          // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+          // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+          // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+          HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
+          metadataMetaClient.reloadActiveTimeline();
+        }
+        // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
+        // instant with the same instant time. This happens for data table clean action which
+        // reuses the same instant time without rollback first. It is a no-op here as the
+        // clean plan is the same, so we don't need to delete the requested and inflight instant
+        // files in the active timeline.
       }
+
       List<WriteStatus> statuses = writeClient.upsertPreppedRecords(preppedRecordRDD, instantTime).collect();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 56cfe959bbabf..dd8a83b219151 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -75,6 +75,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.action.clean.CleanPlanner;
@@ -627,19 +628,24 @@ private void testFailedInsertAndCleanByCommits(
    * @param config HoodieWriteConfig
    */
   protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws IOException {
-    return runCleaner(config, false, 1, false);
+    return runCleaner(config, false, false, 1, false);
   }
 
   protected List<HoodieCleanStat> runCleanerWithInstantFormat(HoodieWriteConfig config, boolean needInstantInHudiFormat) throws IOException {
-    return runCleaner(config, false, 1, needInstantInHudiFormat);
+    return runCleaner(config, false, false, 1, needInstantInHudiFormat);
   }
 
   protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
-    return runCleaner(config, false, firstCommitSequence, needInstantInHudiFormat);
+    return runCleaner(config, false, false, firstCommitSequence, needInstantInHudiFormat);
   }
 
   protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException {
-    return runCleaner(config, simulateRetryFailure, 1, false);
+    return runCleaner(config, simulateRetryFailure, false, 1, false);
+  }
+
+  protected List<HoodieCleanStat> runCleaner(
+      HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure) throws IOException {
+    return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 1, false);
   }
 
   /**
@@ -647,7 +653,9 @@ protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean sim
    *
    * @param config HoodieWriteConfig
    */
-  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure, Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
+  protected List<HoodieCleanStat> runCleaner(
+      HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure,
+      Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
     SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
     String cleanInstantTs = needInstantInHudiFormat ?
        makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d");
     HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
@@ -670,6 +678,17 @@ protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean sim
         });
       });
       metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
+
+      if (config.isMetadataTableEnabled() && simulateMetadataFailure) {
+        // Simulate the failure of corresponding instant in the metadata table
+        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
+            .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
+            .setConf(metaClient.getHadoopConf())
+            .build();
+        HoodieInstant deltaCommit = new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, cleanInstantTs);
+        metadataMetaClient.reloadActiveTimeline().revertToInflight(deltaCommit);
+      }
+
       // retry clean operation again
       writeClient.clean();
       final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant);
@@ -1215,12 +1234,80 @@ public void testCleanPreviousCorruptedCleanFiles() throws IOException {
     assertEquals(0, cleanStats.size(), "Must not clean any files");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRerunFailedClean(boolean simulateMetadataFailure) throws Exception {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withAssumeDatePartitioning(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
+        .build();
+
+    HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+
+    // make 1 commit, with 1 file per partition
+    String file1P0C0 = UUID.randomUUID().toString();
+    String file1P1C0 = UUID.randomUUID().toString();
+    testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+    HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
+        Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+          {
+            put(p0, CollectionUtils.createImmutableList(file1P0C0));
+            put(p1, CollectionUtils.createImmutableList(file1P1C0));
+          }
+        })
+    );
+    metadataWriter.update(commitMetadata, "00000000000001", false);
+    metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // make next replacecommit, with 1 clustering operation. logically delete p0.
+    // No change to p1
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata =
+        generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, file2P0C1);
+    testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
+
+    // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
+    String file3P1C2 = partitionAndFileId003.get(p1);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, file1P1C0, file3P1C2);
+    testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
+
+    // make next replacecommit, with 1 clustering operation. Replace data in p0 again
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
+    String file4P0C3 = partitionAndFileId004.get(p0);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, file2P0C1, file4P0C3);
+    testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
+
+    // run cleaner with failures
+    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, true, simulateMetadataFailure, 5, true);
+    assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    //file1P1C0 still stays because its not replaced until 3 and its the only version available
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+  }
+
   /**
    * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
    *
-   * @param insertFn Insert API to be tested
+   * @param insertFn     Insert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used.
   *                     If true, a wrapper function will be used during
-   *                     record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
+   *        record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
   * @throws Exception in case of errors
   */
  private void testInsertAndCleanFailedWritesByVersions(
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index f44d67e83398b..60367f8d017d7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -42,6 +42,7 @@
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.TestCleaner;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -65,9 +66,9 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertNull;
 
 /**
  * Tests covering different clean plan policies/strategies.
@@ -93,11 +94,12 @@ public void testInvalidCleaningTriggerStrategy() {
   private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
     return Stream.of(
-        Arguments.of(false, false, false),
-        Arguments.of(true, false, false),
-        Arguments.of(false, true, false),
-        Arguments.of(false, false, true)
-    );
+        Arguments.of(false, false, false, false),
+        Arguments.of(true, false, false, false),
+        Arguments.of(true, true, false, false),
+        Arguments.of(false, false, true, false),
+        Arguments.of(false, false, false, true)
+        );
   }
 
   /**
@@ -105,17 +107,19 @@ private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
    */
   @ParameterizedTest
   @MethodSource("argumentsForTestKeepLatestCommits")
-  public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
+  public void testKeepLatestCommits(
+      boolean simulateFailureRetry, boolean simulateMetadataFailure,
+      boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-      .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-      .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-        .withIncrementalCleaningMode(enableIncrementalClean)
-        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
-        .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
-        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-        .retainCommits(2)
-        .withMaxCommitsBeforeCleaning(2).build())
-      .build();
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withIncrementalCleaningMode(enableIncrementalClean)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(2)
+            .withMaxCommitsBeforeCleaning(2).build())
+        .build();
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     String p0 = "2020/01/01";
@@ -130,20 +134,21 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
     testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
     HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
-      Collections.unmodifiableMap(new HashMap<String, List<String>>() {
-        {
-          put(p0, CollectionUtils.createImmutableList(file1P0C0));
-          put(p1, CollectionUtils.createImmutableList(file1P1C0));
-        }
-      })
-    );
+        Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+          {
+            put(p0, CollectionUtils.createImmutableList(file1P0C0));
+            put(p1, CollectionUtils.createImmutableList(file1P1C0));
+          }
+        })
+    );
     metaClient.getActiveTimeline().saveAsComplete(
-      new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
-      Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry, 2, true);
+    List<HoodieCleanStat> hoodieCleanStatsOne =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2, true);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
     assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
     assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
@@ -160,9 +165,10 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
       }
     });
     metaClient.getActiveTimeline().saveAsComplete(
-      new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"),
-      Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry, 4, true);
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    List<HoodieCleanStat> hoodieCleanStatsTwo =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4, true);
     assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
     assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
     assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
@@ -171,40 +177,42 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
     // make next commit, with 2 updates to existing files, and 1 insert
     String file3P0C2 = testTable.addInflightCommit("00000000000005")
-      .withBaseFilesInPartition(p0, file1P0C0)
-      .withBaseFilesInPartition(p0, file2P0C1)
-      .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+        .withBaseFilesInPartition(p0, file1P0C0)
+        .withBaseFilesInPartition(p0, file2P0C1)
+        .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
     commitMetadata = generateCommitMetadata("00000000000003",
-      CollectionUtils.createImmutableMap(
-        p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
+        CollectionUtils.createImmutableMap(
+            p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
     metaClient.getActiveTimeline().saveAsComplete(
-      new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000005"),
-      Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000005"),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry, 6, true);
+    List<HoodieCleanStat> hoodieCleanStatsThree =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6, true);
     assertEquals(0, hoodieCleanStatsThree.size(),
-      "Must not clean any file. We have to keep 1 version before the latest commit time to keep");
+        "Must not clean any file. We have to keep 1 version before the latest commit time to keep");
     assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
     // make next commit, with 2 updates to existing files, and 1 insert
     String file4P0C3 = testTable.addInflightCommit("00000000000007")
-      .withBaseFilesInPartition(p0, file1P0C0)
-      .withBaseFilesInPartition(p0, file2P0C1)
-      .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+        .withBaseFilesInPartition(p0, file1P0C0)
+        .withBaseFilesInPartition(p0, file2P0C1)
+        .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
     commitMetadata = generateCommitMetadata("00000000000004",
-      CollectionUtils.createImmutableMap(
-        p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
+        CollectionUtils.createImmutableMap(
+            p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
     metaClient.getActiveTimeline().saveAsComplete(
-      new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000007"),
-      Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000007"),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry, 8, true);
+    List<HoodieCleanStat> hoodieCleanStatsFour =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8, true);
     // enableBootstrapSourceClean would delete the bootstrap base file as the same time
     HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, p0);
     assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size()
-      + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
-      : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file");
+        + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ?
+            0
+        : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file");
     assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
     assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
     assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
@@ -220,19 +228,20 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
     metaClient = HoodieTableMetaClient.reload(metaClient);
     String file5P0C4 = testTable.addInflightCommit("00000000000009")
-      .withBaseFilesInPartition(p0, file1P0C0)
-      .withBaseFilesInPartition(p0, file2P0C1)
-      .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+        .withBaseFilesInPartition(p0, file1P0C0)
+        .withBaseFilesInPartition(p0, file2P0C1)
+        .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
     commitMetadata = generateCommitMetadata("00000000000009", CollectionUtils.createImmutableMap(
-      p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file5P0C4)));
+        p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file5P0C4)));
     metaClient.getActiveTimeline().saveAsComplete(
-      new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000009"),
-      Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000009"),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry, 10, true);
+    List<HoodieCleanStat> hoodieCleanStatsFive =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 10, true);
     assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files since at least 2 commits are needed from last clean operation before "
-      + "clean can be scheduled again");
+        + "clean can be scheduled again");
     assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
     assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
     assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
@@ -243,13 +252,14 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
     // No cleaning on partially written file, with no commit.
testTable.forCommit("00000000000011").withBaseFilesInPartition(p0, file3P0C2); commitMetadata = generateCommitMetadata("00000000000011", CollectionUtils.createImmutableMap(p0, - CollectionUtils.createImmutableList(file3P0C2))); + CollectionUtils.createImmutableList(file3P0C2))); metaClient.getActiveTimeline().createNewInstant( - new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011")); + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011")); metaClient.getActiveTimeline().transitionRequestedToInflight( - new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011"), - Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - List hoodieCleanStatsFive2 = runCleaner(config, simulateFailureRetry, 12, true); + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011"), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + List hoodieCleanStatsFive2 = + runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 12, true); HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0); assertNull(cleanStat, "Must not clean any files"); assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2)); @@ -454,15 +464,17 @@ public void testKeepLatestCommitsMOR() throws Exception { */ @ParameterizedTest @MethodSource("argumentsForTestKeepLatestCommits") - public void testKeepXHoursWithCleaning(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception { + public void testKeepXHoursWithCleaning( + boolean simulateFailureRetry, boolean simulateMetadataFailure, + boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withIncrementalCleaningMode(enableIncrementalClean) - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) - .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2).build()) - .build(); + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withIncrementalCleaningMode(enableIncrementalClean) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2).build()) + .build(); HoodieTestTable testTable = HoodieTestTable.of(metaClient); String p0 = "2020/01/01"; @@ -488,12 +500,13 @@ public void testKeepXHoursWithCleaning(boolean simulateFailureRetry, boolean ena }) ); metaClient.getActiveTimeline().saveAsComplete( - new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, firstCommitTs), - Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, firstCommitTs), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); metaClient = HoodieTableMetaClient.reload(metaClient); - List hoodieCleanStatsOne = runCleaner(config, 
-        simulateFailureRetry);
+    List<HoodieCleanStat> hoodieCleanStatsOne =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
     assertTrue(testTable.baseFileExists(p0, firstCommitTs, file1P0C0));
     assertTrue(testTable.baseFileExists(p1, firstCommitTs, file1P1C0));
@@ -512,9 +525,10 @@ public void testKeepXHoursWithCleaning(boolean simulateFailureRetry, boolean ena
       }
     });
     metaClient.getActiveTimeline().saveAsComplete(
-      new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, secondCommitTs),
-      Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, secondCommitTs),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    List<HoodieCleanStat> hoodieCleanStatsTwo =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure);
     assertEquals(2, hoodieCleanStatsTwo.size(), "Should clean one file each from both the partitions");
     assertTrue(testTable.baseFileExists(p0, secondCommitTs, file2P0C1));
     assertTrue(testTable.baseFileExists(p1, secondCommitTs, file2P1C1));