diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index fb703e37af32b..9bad2e3486e7f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -957,8 +957,16 @@ private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieT
     return inflightTimelineExcludeClusteringCommit;
   }
 
-  private Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
-    return getPendingRollbackInfos(metaClient).getOrDefault(commitToRollback, Option.empty());
+  protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
+    return getPendingRollbackInfo(metaClient, commitToRollback, true);
+  }
+
+  protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
+    return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
+  }
+
+  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
+    return getPendingRollbackInfos(metaClient, true);
   }
 
   /**
@@ -966,20 +974,25 @@ private Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMeta
    * @param metaClient instance of {@link HoodieTableMetaClient} to use.
    * @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair.
    */
-  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
+  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
     List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
     Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
     for (HoodieInstant instant : instants) {
       try {
         HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
         String action = rollbackPlan.getInstantToRollback().getAction();
-        if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
-          boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
-              && ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent();
-          if (!isClustering) {
-            String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
-            infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+        if (ignoreCompactionAndClusteringInstants) {
+          if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
+            boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
+                && ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(),
+                rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
+            if (!isClustering) {
+              String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
+              infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+            }
           }
+        } else {
+          infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
         }
       } catch (IOException e) {
         LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
@@ -1211,7 +1224,8 @@ protected Option<String> inlineScheduleClustering(Option<Map<String, String>> ex
   }
 
   protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
-    String commitTime = HoodieActiveTimeline.createNewInstantTime();
+    Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
+    String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
     table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
     table.rollback(context, commitTime, inflightInstant, false, false);
     table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index bb4ae962038fe..e5262ad6bb9f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -28,6 +28,7 @@
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -89,6 +90,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -478,7 +480,7 @@ public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext
                                                      String instantTime,
                                                      HoodieInstant instantToRollback,
                                                      boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers);
- 
+
   /**
    * Rollback the (inflight/committed) record changes with the given commit time.
    *
@@ -519,14 +521,19 @@ public abstract Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext co
                                                     String restoreInstantTime,
                                                     String instantToRestore);
 
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
+    rollbackInflightCompaction(inflightInstant, s -> Option.empty());
+  }
+
   /**
    * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file
    * to the .requested file.
    *
    * @param inflightInstant Inflight Compaction Instant
    */
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
-    String commitTime = HoodieActiveTimeline.createNewInstantTime();
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+    final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
+        -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
     scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
     rollback(context, commitTime, inflightInstant, false, false);
     getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
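
The net effect of the overload above: when the rollback of an inflight compaction is retried, it reuses the instant time recorded by the earlier, unfinished rollback attempt instead of minting a fresh one, so repeated failures cannot pile up duplicate rollback instants. A minimal sketch of the reuse pattern, with simplified, hypothetical names (the real code goes through HoodiePendingRollbackInfo and HoodieActiveTimeline.createNewInstantTime()):

    import java.util.Optional;
    import java.util.function.Function;
    import java.util.function.Supplier;

    class RollbackInstantReuse {
      // If a pending rollback plan already targets this commit, reuse its
      // instant time; otherwise allocate a fresh one.
      static String resolveRollbackTime(String commitToRollback,
          Function<String, Optional<String>> pendingRollbackTimeLookup,
          Supplier<String> newInstantTime) {
        return pendingRollbackTimeLookup.apply(commitToRollback).orElseGet(newInstantTime);
      }
    }
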
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 7e142d89c4b87..ec1ecd6a0c291 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -325,7 +325,7 @@ protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionIns
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant);
+      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
       table.getMetaClient().reloadActiveTimeline();
     }
     compactionTimer = metrics.getCompactionCtx();
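
Note that the caller passes false for ignoreCompactionAndClusteringInstants here: the pending rollback plan being looked up targets a compaction instant, so the default filtering (which skips rollback plans for compactions and clustering) would never find it, and each retry would schedule yet another rollback instant.
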
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 6ab33b422d0c1..6e67bd69bdfd7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -56,6 +56,8 @@
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
+import org.apache.hudi.common.testutils.ClusteringTestUtils;
+import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -1438,9 +1440,9 @@ public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields, bo
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
 
-  @ParameterizedTest
-  @MethodSource("populateMetaFieldsParams")
-  public void testPendingClusteringRollback(boolean populateMetaFields) throws Exception {
+  @Test
+  public void testPendingClusteringRollback() throws Exception {
+    boolean populateMetaFields = true;
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
@@ -1467,6 +1469,33 @@ public void testPendingClusteringRollback(boolean populateMetaFields) throws Exc
     metaClient.reloadActiveTimeline();
     // verify there are no pending clustering instants
     assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
+
+    // delete the rollback.completed instant to mimic a failed rollback of clustering, then trigger the rollback again; the same rollback instant should be reused.
+    HoodieInstant rollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
+    FileCreateUtils.deleteRollbackCommit(metaClient.getBasePath(), rollbackInstant.getTimestamp());
+    metaClient.reloadActiveTimeline();
+
+    // create replace commit requested meta file so that rollback will not throw FileNotFoundException
+    // create file slice with instantTime 001 and build clustering plan including this created 001 file slice.
+    HoodieClusteringPlan clusteringPlan = ClusteringTestUtils.createClusteringPlan(metaClient, pendingClusteringInstant.getTimestamp(), "1");
+    // create requested replace commit
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+        .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+
+    FileCreateUtils.createRequestedReplaceCommit(metaClient.getBasePath(), pendingClusteringInstant.getTimestamp(), Option.of(requestedReplaceMetadata));
+
+    // trigger clustering again. no new rollback instants should be generated.
+    try {
+      client.cluster(pendingClusteringInstant.getTimestamp(), false);
+      // the replace commit metadata generated here is fake, so clustering will fail; the intent of this test is to check for duplicate rollback instants.
+    } catch (Exception e) {
+      //ignore.
+    }
+
+    metaClient.reloadActiveTimeline();
+    // verify that there is no new rollback instant generated
+    HoodieInstant newRollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
+    assertEquals(rollbackInstant.getTimestamp(), newRollbackInstant.getTimestamp());
   }
 
   @ParameterizedTest
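
The added assertions rely on FileCreateUtils.deleteRollbackCommit to simulate a rollback that executed but crashed before writing its completed meta file; on the retried clustering, the pending rollback plan should be found and its instant time reused, which is what the final timestamp equality checks.
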
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 0c3434433aeee..73b1da95648e2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -35,6 +35,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -180,6 +181,61 @@ public void testInlineScheduleCompaction(boolean scheduleInlineCompaction) throw
     }
   }
 
+  @Test
+  public void testRepeatedRollbackOfCompaction() throws Exception {
+    boolean scheduleInlineCompaction = false;
+    HoodieFileFormat fileFormat = HoodieFileFormat.PARQUET;
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      /*
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime, true);
+      assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
+
+      /*
+       * Write 2 (updates)
+       */
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 100);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, true);
+
+      Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+      client.compact(compactionInstant.get());
+
+      // trigger compaction again.
+      client.compact(compactionInstant.get());
+
+      metaClient.reloadActiveTimeline();
+      // fetch the rollback instant triggered by the repeated compaction and delete its completed meta file to mimic a failed rollback
+      HoodieInstant rollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
+      FileCreateUtils.deleteRollbackCommit(metaClient.getBasePath().substring(metaClient.getBasePath().indexOf(":") + 1),
+          rollbackInstant.getTimestamp());
+      metaClient.reloadActiveTimeline();
+      SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+      // trigger compaction again.
+      client1.compact(compactionInstant.get());
+      metaClient.reloadActiveTimeline();
+      // verify that there is no new rollback instant generated
+      HoodieInstant newRollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
+      assertEquals(rollbackInstant.getTimestamp(), newRollbackInstant.getTimestamp());
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception {
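
For reference, the two modes of the new ignoreCompactionAndClusteringInstants flag can be illustrated in isolation. The sketch below is a simplification with hypothetical action strings and a plain value class; the real implementation inspects each HoodieRollbackPlan and treats a replacecommit as clustering only when a clustering plan exists on the timeline:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class PendingRollbackFilterDemo {
      static class PendingPlan {
        final String instantToRollback;
        final String action;
        PendingPlan(String instantToRollback, String action) {
          this.instantToRollback = instantToRollback;
          this.action = action;
        }
      }

      // true  -> skip plans that target compaction/clustering instants (the
      //          pre-existing behavior for regular commits);
      // false -> surface every pending plan, so a retried rollback of a
      //          compaction or clustering instant can reuse its instant time.
      static Map<String, PendingPlan> index(List<PendingPlan> plans, boolean ignoreCompactionAndClustering) {
        Map<String, PendingPlan> infoMap = new HashMap<>();
        for (PendingPlan plan : plans) {
          boolean skip = ignoreCompactionAndClustering
              && ("compaction".equals(plan.action) || "replacecommit".equals(plan.action));
          if (!skip) {
            infoMap.putIfAbsent(plan.instantToRollback, plan);
          }
        }
        return infoMap;
      }
    }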