Skip to content

Commit 324ebc9

Browse files
committed
Refactor rollback inflight instant for clustering/compaction to reuse some code
1 parent fec49dc commit 324ebc9

File tree

7 files changed

+46
-47
lines changed

7 files changed

+46
-47
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,7 +1094,7 @@ protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMe
10941094
return getPendingRollbackInfo(metaClient, commitToRollback, true);
10951095
}
10961096

1097-
protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
1097+
public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
10981098
return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
10991099
}
11001100

@@ -1375,14 +1375,6 @@ protected Option<String> inlineScheduleClustering(Option<Map<String, String>> ex
13751375
return scheduleClustering(extraMetadata);
13761376
}
13771377

1378-
public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
1379-
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
1380-
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
1381-
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
1382-
table.rollback(context, commitTime, inflightInstant, false, false);
1383-
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
1384-
}
1385-
13861378
/**
13871379
* Finalize Write operation.
13881380
*

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package org.apache.hudi.client;
2020

21-
import org.apache.hadoop.fs.FileStatus;
22-
import org.apache.hadoop.fs.Path;
2321
import org.apache.hudi.avro.model.HoodieCompactionOperation;
2422
import org.apache.hudi.avro.model.HoodieCompactionPlan;
2523
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -44,6 +42,9 @@
4442
import org.apache.hudi.exception.HoodieException;
4543
import org.apache.hudi.exception.HoodieIOException;
4644
import org.apache.hudi.table.action.compact.OperationResult;
45+
46+
import org.apache.hadoop.fs.FileStatus;
47+
import org.apache.hadoop.fs.Path;
4748
import org.apache.log4j.LogManager;
4849
import org.apache.log4j.Logger;
4950

@@ -172,7 +173,7 @@ public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId, b
172173
Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName());
173174
if (metaClient.getFs().exists(inflightPath)) {
174175
// revert if in inflight state
175-
metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight);
176+
metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
176177
}
177178
// Overwrite compaction plan with updated info
178179
metaClient.getActiveTimeline().saveToCompactionRequested(

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818

1919
package org.apache.hudi.table;
2020

21-
import org.apache.avro.Schema;
22-
import org.apache.avro.specific.SpecificRecordBase;
23-
import org.apache.hadoop.conf.Configuration;
24-
import org.apache.hadoop.fs.FileSystem;
25-
import org.apache.hadoop.fs.Path;
2621
import org.apache.hudi.avro.HoodieAvroUtils;
2722
import org.apache.hudi.avro.model.HoodieCleanMetadata;
2823
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -65,6 +60,7 @@
6560
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
6661
import org.apache.hudi.common.util.Functions;
6762
import org.apache.hudi.common.util.Option;
63+
import org.apache.hudi.common.util.ValidationUtils;
6864
import org.apache.hudi.common.util.collection.Pair;
6965
import org.apache.hudi.config.HoodieWriteConfig;
7066
import org.apache.hudi.exception.HoodieException;
@@ -82,6 +78,12 @@
8278
import org.apache.hudi.table.marker.WriteMarkersFactory;
8379
import org.apache.hudi.table.storage.HoodieLayoutFactory;
8480
import org.apache.hudi.table.storage.HoodieStorageLayout;
81+
82+
import org.apache.avro.Schema;
83+
import org.apache.avro.specific.SpecificRecordBase;
84+
import org.apache.hadoop.conf.Configuration;
85+
import org.apache.hadoop.fs.FileSystem;
86+
import org.apache.hadoop.fs.Path;
8587
import org.apache.log4j.LogManager;
8688
import org.apache.log4j.Logger;
8789

@@ -545,12 +547,38 @@ public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
545547
*
546548
* @param inflightInstant Inflight Compaction Instant
547549
*/
548-
public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
550+
public void rollbackInflightCompaction(HoodieInstant inflightInstant,
551+
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
552+
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
553+
rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
554+
}
555+
556+
/**
557+
* Rollback inflight clustering instant to requested clustering instant
558+
*
559+
* @param inflightInstant Inflight clustering instant
560+
* @param getPendingRollbackInstantFunc Function to get rollback instant
561+
*/
562+
public void rollbackInflightClustering(HoodieInstant inflightInstant,
563+
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
564+
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
565+
rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
566+
}
567+
568+
/**
569+
* Rollback inflight instant to requested instant
570+
*
571+
* @param inflightInstant Inflight instant
572+
* @param getPendingRollbackInstantFunc Function to get rollback instant
573+
*/
574+
private void rollbackInflightInstant(HoodieInstant inflightInstant,
575+
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
576+
ValidationUtils.checkArgument(inflightInstant.isInflight());
549577
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
550578
-> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
551579
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
552580
rollback(context, commitTime, inflightInstant, false, false);
553-
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
581+
getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
554582
}
555583

556584
/**

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
355355
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
356356
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
357357
if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
358-
rollbackInflightClustering(inflightInstant, table);
358+
table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
359359
table.getMetaClient().reloadActiveTimeline();
360360
}
361361
clusteringTimer = metrics.getClusteringCtx();

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -347,16 +347,14 @@ public Option<byte[]> readIndexPlanAsBytes(HoodieInstant instant) {
347347
}
348348

349349
/**
350-
* Revert compaction State from inflight to requested.
350+
* Revert instant state from inflight to requested.
351351
*
352352
* @param inflightInstant Inflight Instant
353353
* @return requested instant
354354
*/
355-
public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) {
356-
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
357-
ValidationUtils.checkArgument(inflightInstant.isInflight());
355+
public HoodieInstant revertInstantFromInflightToRequested(HoodieInstant inflightInstant) {
358356
HoodieInstant requestedInstant =
359-
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp());
357+
new HoodieInstant(State.REQUESTED, inflightInstant.getAction(), inflightInstant.getTimestamp());
360358
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
361359
// Pass empty data since it is read from the corresponding .aux/.compaction instant file
362360
transitionState(inflightInstant, requestedInstant, Option.empty());
@@ -514,26 +512,6 @@ public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant inflightI
514512
return commitInstant;
515513
}
516514

517-
/**
518-
* Revert replace requested State from inflight to requested.
519-
*
520-
* @param inflightInstant Inflight Instant
521-
* @return requested instant
522-
*/
523-
public HoodieInstant revertReplaceCommitInflightToRequested(HoodieInstant inflightInstant) {
524-
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
525-
ValidationUtils.checkArgument(inflightInstant.isInflight());
526-
HoodieInstant requestedInstant =
527-
new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp());
528-
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
529-
// Pass empty data since it is read from the corresponding .aux/.compaction instant file
530-
transitionState(inflightInstant, requestedInstant, Option.empty());
531-
} else {
532-
deleteInflight(inflightInstant);
533-
}
534-
return requestedInstant;
535-
}
536-
537515
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
538516
transitionState(fromInstant, toInstant, data, false);
539517
}

hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ public void testTimelineInstantOperations() {
336336
timeline = timeline.reload();
337337
assertFalse(timeline.containsInstant(compaction));
338338
assertTrue(timeline.containsInstant(inflight));
339-
compaction = timeline.revertCompactionInflightToRequested(inflight);
339+
compaction = timeline.revertInstantFromInflightToRequested(inflight);
340340
timeline = timeline.reload();
341341
assertTrue(timeline.containsInstant(compaction));
342342
assertFalse(timeline.containsInstant(inflight));

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public static void main(String[] args) throws Exception {
114114
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
115115
if (timeline.containsInstant(inflightInstant)) {
116116
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
117-
writeClient.rollbackInflightClustering(inflightInstant, table);
117+
table.rollbackInflightClustering(inflightInstant, commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
118118
table.getMetaClient().reloadActiveTimeline();
119119
}
120120

0 commit comments

Comments
 (0)