Skip to content

Commit 03c49ca

Browse files
authored
[HUDI-3998] Fix getCommitsSinceLastCleaning failed when async cleaning (#5478)
- The last completed commit timestamp is used to calculate how many commits have been completed since the last clean. We might need to save this with the clean plan so that the next time we trigger a clean, we can start calculating from that timestamp.
1 parent 1b21792 commit 03c49ca

19 files changed

Lines changed: 145 additions & 71 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean
167167
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
168168
actionInstant.getAction(), actionInstant.getTimestamp())
169169
: null))
170+
.withLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
170171
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
171172
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
172173
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hudi.avro.model.HoodieActionInstant;
2222
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
23+
import org.apache.hudi.avro.model.HoodieCleanMetadata;
2324
import org.apache.hudi.avro.model.HoodieCleanerPlan;
2425
import org.apache.hudi.common.engine.HoodieEngineContext;
2526
import org.apache.hudi.common.model.CleanFileInfo;
@@ -64,11 +65,16 @@ private int getCommitsSinceLastCleaning() {
6465
Option<HoodieInstant> lastCleanInstant = table.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
6566
HoodieTimeline commitTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
6667

67-
String latestCleanTs;
68-
int numCommits = 0;
69-
if (lastCleanInstant.isPresent()) {
70-
latestCleanTs = lastCleanInstant.get().getTimestamp();
71-
numCommits = commitTimeline.findInstantsAfter(latestCleanTs).countInstants();
68+
int numCommits;
69+
if (lastCleanInstant.isPresent() && !table.getActiveTimeline().isEmpty(lastCleanInstant.get())) {
70+
try {
71+
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
72+
.deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(lastCleanInstant.get()).get());
73+
String lastCompletedCommitTimestamp = cleanMetadata.getLastCompletedCommitTimestamp();
74+
numCommits = commitTimeline.findInstantsAfter(lastCompletedCommitTimestamp).countInstants();
75+
} catch (IOException e) {
76+
throw new HoodieIOException("Parsing of last clean instant " + lastCleanInstant.get() + " failed", e);
77+
}
7278
} else {
7379
numCommits = commitTimeline.countInstants();
7480
}
@@ -123,6 +129,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
123129

124130
return new HoodieCleanerPlan(earliestInstant
125131
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
132+
planner.getLastCompletedCommitTimestamp(),
126133
config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
127134
CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete);
128135
} catch (IOException e) {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,17 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
475475
return earliestCommitToRetain;
476476
}
477477

478+
/**
479+
* Returns the last completed commit timestamp before clean, or an empty string if the commit timeline has no instants.
480+
*/
481+
public String getLastCompletedCommitTimestamp() {
482+
if (commitTimeline.lastInstant().isPresent()) {
483+
return commitTimeline.lastInstant().get().getTimestamp();
484+
} else {
485+
return "";
486+
}
487+
}
488+
478489
/**
479490
* Determine if file slice needed to be preserved for pending compaction.
480491
*

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,15 +258,16 @@ private void createReplace(String instantTime, WriteOperationType writeOperation
258258
}
259259

260260
private void createCleanMetadata(String instantTime) throws IOException {
261-
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
262-
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
261+
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""),
262+
"", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
263263
HoodieCleanStat cleanStats = new HoodieCleanStat(
264264
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
265265
HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
266266
Collections.emptyList(),
267267
Collections.emptyList(),
268268
Collections.emptyList(),
269-
instantTime);
269+
instantTime,
270+
"");
270271
HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
271272
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
272273
}

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,6 @@ public void testMultiClean() {
262262
HoodieWriteConfig writeConfig = getConfigBuilder()
263263
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
264264
.withEnableBackupForRemoteFileSystemView(false).build())
265-
266265
.withCleanConfig(HoodieCleanConfig.newBuilder()
267266
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
268267
.allowMultipleCleans(false)
@@ -455,9 +454,10 @@ private void testInsertAndCleanByVersions(
455454
/**
456455
* Test Clean-By-Commits using insert/upsert API.
457456
*/
458-
@Test
459-
public void testInsertAndCleanByCommits() throws Exception {
460-
testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
457+
@ParameterizedTest
458+
@ValueSource(booleans = {true, false})
459+
public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
460+
testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync);
461461
}
462462

463463
/**
@@ -473,7 +473,8 @@ public void testFailedInsertAndCleanByCommits() throws Exception {
473473
*/
474474
@Test
475475
public void testInsertPreppedAndCleanByCommits() throws Exception {
476-
testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, true);
476+
testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
477+
true, false);
477478
}
478479

479480
/**
@@ -483,15 +484,15 @@ public void testInsertPreppedAndCleanByCommits() throws Exception {
483484
public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
484485
testInsertAndCleanByCommits(
485486
(client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
486-
SparkRDDWriteClient::upsertPreppedRecords, true);
487+
SparkRDDWriteClient::upsertPreppedRecords, true, false);
487488
}
488489

489490
/**
490491
* Test Clean-By-Commits using bulk-insert/upsert API.
491492
*/
492493
@Test
493494
public void testBulkInsertAndCleanByCommits() throws Exception {
494-
testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
495+
testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false);
495496
}
496497

497498
/**
@@ -505,12 +506,12 @@ public void testBulkInsertAndCleanByCommits() throws Exception {
505506
*/
506507
private void testInsertAndCleanByCommits(
507508
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
508-
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
509+
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI, boolean isAsync)
509510
throws Exception {
510511
int maxCommits = 3; // keep upto 3 commits from the past
511512
HoodieWriteConfig cfg = getConfigBuilder()
512513
.withCleanConfig(HoodieCleanConfig.newBuilder()
513-
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
514+
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(maxCommits).build())
514515
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
515516
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
516517
.build();
@@ -539,6 +540,10 @@ private void testInsertAndCleanByCommits(
539540
metaClient = HoodieTableMetaClient.reload(metaClient);
540541
HoodieTable table1 = HoodieSparkTable.create(cfg, context, metaClient);
541542
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
543+
HoodieInstant lastInstant = activeTimeline.lastInstant().get();
544+
if (cfg.isAsyncClean()) {
545+
activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
546+
}
542547
// NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
543548
// commit
544549
Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
@@ -560,6 +565,9 @@ private void testInsertAndCleanByCommits(
560565
LOG.debug("Data File - " + value);
561566
commitTimes.add(value.getCommitTime());
562567
});
568+
if (cfg.isAsyncClean()) {
569+
commitTimes.remove(lastInstant.getTimestamp());
570+
}
563571
assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
564572
"Only contain acceptable versions of file should be present");
565573
}
@@ -677,7 +685,7 @@ protected List<HoodieCleanStat> runCleaner(
677685
String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
678686
p.getSuccessDeleteFiles().forEach(p2 -> {
679687
try {
680-
metaClient.getFs().create(new Path(dirPath, p2), true);
688+
metaClient.getFs().create(new Path(dirPath, p2), true).close();
681689
} catch (IOException e) {
682690
throw new HoodieIOException(e.getMessage(), e);
683691
}
@@ -941,7 +949,7 @@ public void testCleanMetadataUpgradeDowngrade() {
941949
// create partition1 clean stat.
942950
HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
943951
partition1, deletePathPatterns1, successDeleteFiles1,
944-
failedDeleteFiles1, instantTime);
952+
failedDeleteFiles1, instantTime, "");
945953

946954
List<String> deletePathPatterns2 = new ArrayList<>();
947955
List<String> successDeleteFiles2 = new ArrayList<>();
@@ -950,7 +958,7 @@ public void testCleanMetadataUpgradeDowngrade() {
950958
// create partition2 empty clean stat.
951959
HoodieCleanStat cleanStat2 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
952960
partition2, deletePathPatterns2, successDeleteFiles2,
953-
failedDeleteFiles2, instantTime);
961+
failedDeleteFiles2, instantTime, "");
954962

955963
// map with absolute file path.
956964
Map<String, Tuple3> oldExpected = new HashMap<>();
@@ -1167,12 +1175,13 @@ public void testCleaningWithZeroPartitionPaths() throws Exception {
11671175
/**
11681176
* Test Keep Latest Commits when there are pending compactions.
11691177
*/
1170-
@Test
1171-
public void testKeepLatestCommitsWithPendingCompactions() throws Exception {
1178+
@ParameterizedTest
1179+
@ValueSource(booleans = {true, false})
1180+
public void testKeepLatestCommitsWithPendingCompactions(boolean isAsync) throws Exception {
11721181
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
11731182
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
11741183
.withCleanConfig(HoodieCleanConfig.newBuilder()
1175-
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
1184+
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build())
11761185
.build();
11771186
// Deletions:
11781187
// . FileId Base Logs Total Retained Commits

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -732,8 +732,8 @@ public HoodieInstant createEmptyCleanMetadata(String instantTime, boolean inflig
732732
}
733733

734734
public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmptyForAll, boolean isEmptyCompleted) throws IOException {
735-
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
736-
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
735+
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", "",
736+
new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
737737
if (inflightOnly) {
738738
HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
739739
} else {
@@ -743,7 +743,8 @@ HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEF
743743
Collections.emptyList(),
744744
Collections.emptyList(),
745745
Collections.emptyList(),
746-
instantTime);
746+
instantTime,
747+
"");
747748
HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
748749
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata, isEmptyForAll, isEmptyCompleted);
749750
}

hudi-common/src/main/avro/HoodieCleanMetadata.avsc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
{"name": "timeTakenInMillis", "type": "long"},
2424
{"name": "totalFilesDeleted", "type": "int"},
2525
{"name": "earliestCommitToRetain", "type": "string"},
26+
{"name": "lastCompletedCommitTimestamp", "type": "string", "default" : ""},
2627
{"name": "partitionMetadata", "type": {
2728
"type" : "map", "values" : "HoodieCleanPartitionMetadata"
2829
}

hudi-common/src/main/avro/HoodieCleanerPlan.avsc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@
4242
}],
4343
"default" : null
4444
},
45+
{
46+
"name": "lastCompletedCommitTimestamp",
47+
"type": "string",
48+
"default" : ""
49+
},
4550
{
4651
"name": "policy",
4752
"type": "string"

hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,22 @@ public class HoodieCleanStat implements Serializable {
4747
private final List<String> failedDeleteBootstrapBaseFiles;
4848
// Earliest commit that was retained in this clean
4949
private final String earliestCommitToRetain;
50+
// Last completed commit timestamp before clean
51+
private final String lastCompletedCommitTimestamp;
5052
// set to true if partition is deleted
5153
private final boolean isPartitionDeleted;
5254

5355
public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
54-
List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain) {
56+
List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain,String lastCompletedCommitTimestamp) {
5557
this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain,
56-
CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(),
58+
lastCompletedCommitTimestamp, CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(),
5759
CollectionUtils.createImmutableList(), false);
5860
}
5961

6062
public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
6163
List<String> successDeleteFiles, List<String> failedDeleteFiles,
62-
String earliestCommitToRetain, List<String> deleteBootstrapBasePathPatterns,
64+
String earliestCommitToRetain,String lastCompletedCommitTimestamp,
65+
List<String> deleteBootstrapBasePathPatterns,
6366
List<String> successDeleteBootstrapBaseFiles,
6467
List<String> failedDeleteBootstrapBaseFiles,
6568
boolean isPartitionDeleted) {
@@ -69,6 +72,7 @@ public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<S
6972
this.successDeleteFiles = successDeleteFiles;
7073
this.failedDeleteFiles = failedDeleteFiles;
7174
this.earliestCommitToRetain = earliestCommitToRetain;
75+
this.lastCompletedCommitTimestamp = lastCompletedCommitTimestamp;
7276
this.deleteBootstrapBasePathPatterns = deleteBootstrapBasePathPatterns;
7377
this.successDeleteBootstrapBaseFiles = successDeleteBootstrapBaseFiles;
7478
this.failedDeleteBootstrapBaseFiles = failedDeleteBootstrapBaseFiles;
@@ -111,11 +115,15 @@ public String getEarliestCommitToRetain() {
111115
return earliestCommitToRetain;
112116
}
113117

118+
public String getLastCompletedCommitTimestamp() {
119+
return lastCompletedCommitTimestamp;
120+
}
121+
114122
public boolean isPartitionDeleted() {
115123
return isPartitionDeleted;
116124
}
117125

118-
public static HoodieCleanStat.Builder newBuilder() {
126+
public static Builder newBuilder() {
119127
return new Builder();
120128
}
121129

@@ -130,6 +138,7 @@ public static class Builder {
130138
private List<String> failedDeleteFiles;
131139
private String partitionPath;
132140
private String earliestCommitToRetain;
141+
private String lastCompletedCommitTimestamp;
133142
private List<String> deleteBootstrapBasePathPatterns;
134143
private List<String> successDeleteBootstrapBaseFiles;
135144
private List<String> failedDeleteBootstrapBaseFiles;
@@ -181,15 +190,20 @@ public Builder withEarliestCommitRetained(Option<HoodieInstant> earliestCommitTo
181190
return this;
182191
}
183192

193+
public Builder withLastCompletedCommitTimestamp(String lastCompletedCommitTimestamp) {
194+
this.lastCompletedCommitTimestamp = lastCompletedCommitTimestamp;
195+
return this;
196+
}
197+
184198
public Builder isPartitionDeleted(boolean isPartitionDeleted) {
185199
this.isPartitionDeleted = isPartitionDeleted;
186200
return this;
187201
}
188202

189203
public HoodieCleanStat build() {
190204
return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles,
191-
earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles,
192-
failedDeleteBootstrapBaseFiles, isPartitionDeleted);
205+
earliestCommitToRetain, lastCompletedCommitTimestamp, deleteBootstrapBasePathPatterns,
206+
successDeleteBootstrapBaseFiles, failedDeleteBootstrapBaseFiles, isPartitionDeleted);
193207
}
194208
}
195209

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public HoodieCleanMetadata downgradeFrom(HoodieCleanMetadata input) {
8383

8484
return HoodieCleanMetadata.newBuilder()
8585
.setEarliestCommitToRetain(input.getEarliestCommitToRetain())
86+
.setLastCompletedCommitTimestamp(input.getLastCompletedCommitTimestamp())
8687
.setStartCleanTime(input.getStartCleanTime())
8788
.setTimeTakenInMillis(input.getTimeTakenInMillis())
8889
.setTotalFilesDeleted(input.getTotalFilesDeleted())

0 commit comments

Comments
 (0)