Skip to content

Commit cbf9b83

Browse files
parisni and sivabalan authored
[HUDI-4792] Batch clean files to delete (apache#6580)
This patch makes use of a batch call to get the file groups to delete during cleaning, instead of one call per partition. This limits the number of calls to the view and should fix the trouble with the metadata table in the context of a large number of partitions. Fixes issue apache#6373. Co-authored-by: sivabalan <[email protected]>
1 parent 84b05c8 commit cbf9b83

File tree

7 files changed

+176
-123
lines changed

7 files changed

+176
-123
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.log4j.Logger;
4343

4444
import java.io.IOException;
45+
import java.util.ArrayList;
4546
import java.util.List;
4647
import java.util.Map;
4748
import java.util.stream.Collectors;
@@ -116,9 +117,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
116117
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
117118

118119
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
119-
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
120+
.parallelize(partitionsToClean, cleanerParallelism)
121+
.mapPartitions(partitionIterator -> {
122+
List<String> partitionList = new ArrayList<>();
123+
partitionIterator.forEachRemaining(partitionList::add);
124+
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
125+
return cleanResult.entrySet().iterator();
126+
}, false).collectAsList()
120127
.stream()
121-
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
128+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
122129

123130
Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
124131
.collect(Collectors.toMap(Map.Entry::getKey,

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java

Lines changed: 125 additions & 112 deletions
Large diffs are not rendered by default.

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
5757
HoodieWriteConfig writeConfig = getConfigBuilder(true)
5858
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
5959
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
60-
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
60+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
6161
.build();
6262
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
6363
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
@@ -81,7 +81,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
8181
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
8282
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
8383

84-
// 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
84+
// 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
8585
for (int i = 5; i < 9; i++) {
8686
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
8787
client.startCommitWithTime(instantTime);

hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
116116

117117
/**
118118
* Refresh commits timeline.
119-
*
119+
*
120120
* @param visibleActiveTimeline Visible Active Timeline
121121
*/
122122
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -750,6 +750,20 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
750750
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
751751
}
752752

753+
@Override
754+
public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
755+
return getAllFileGroupsIncludingReplaced(partitionPaths)
756+
.map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList())));
757+
}
758+
759+
private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
760+
List<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
761+
for (String partitionStr : partitionStrList) {
762+
fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList())));
763+
}
764+
return fileGroupPerPartitionList.stream();
765+
}
766+
753767
private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
754768
try {
755769
readLock.lock();

hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,11 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
204204
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
205205
}
206206

207+
@Override
208+
public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
209+
return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
210+
}
211+
207212
@Override
208213
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
209214
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);

hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@
5151

5252
import java.io.IOException;
5353
import java.io.Serializable;
54+
import java.util.ArrayList;
5455
import java.util.HashMap;
5556
import java.util.List;
5657
import java.util.Map;
58+
import java.util.stream.Collectors;
5759
import java.util.stream.Stream;
5860

5961
/**
@@ -377,6 +379,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
377379
}
378380
}
379381

382+
@Override
383+
public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
384+
ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
385+
for (String partitionPath : partitionPaths) {
386+
Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
387+
fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
388+
}
389+
return fileGroupPerPartitionList.stream();
390+
}
391+
380392
@Override
381393
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
382394
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);

hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,18 +109,18 @@ interface SliceViewWithLatestSlice {
109109
/**
110110
* Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
111111
*
112-
* @param partitionPath Partition path
113-
* @param maxCommitTime Max Instant Time
112+
* @param partitionPath Partition path
113+
* @param maxCommitTime Max Instant Time
114114
* @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
115115
*/
116116
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
117-
boolean includeFileSlicesInPendingCompaction);
117+
boolean includeFileSlicesInPendingCompaction);
118118

119119
/**
120120
* Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
121121
* file-slice before and after compaction request instant is merged and returned.
122-
*
123-
* @param partitionPath Partition Path
122+
*
123+
* @param partitionPath Partition Path
124124
* @param maxInstantTime Max Instant Time
125125
* @return
126126
*/
@@ -149,10 +149,12 @@ interface SliceView extends SliceViewWithLatestSlice {
149149
*/
150150
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
151151

152+
Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);
153+
152154
/**
153155
* Return Pending Compaction Operations.
154156
*
155-
* @return Pair<Pair<InstantTime,CompactionOperation>>
157+
* @return Pair<Pair < InstantTime, CompactionOperation>>
156158
*/
157159
Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
158160

0 commit comments

Comments
 (0)