@@ -230,24 +230,24 @@ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLa
230230 LOG .info ("Cleaning " + partitionPaths + ", retaining latest " + config .getCleanerFileVersionsRetained ()
231231 + " file versions. " );
232232 Map <String , Pair <Boolean , List <CleanFileInfo >>> map = new HashMap <>();
233- List <CleanFileInfo > deletePaths = new ArrayList <>();
234233 // Collect all the datafiles savepointed by all the savepoints
235234 List <String > savepointedFiles = hoodieTable .getSavepointTimestamps ().stream ()
236235 .flatMap (this ::getSavepointedDataFiles )
237236 .collect (Collectors .toList ());
238237
239238 // In this scenario, we assume that once a file group is replaced, it automatically becomes fully eligible for cleaning.
240239 // In other words, the file versions only apply to the active file groups.
241- List <Pair <String , List <HoodieFileGroup >>> fileGroups = fileSystemView .getAllFileGroups (partitionPaths ).collect (Collectors .toList ());
242- for (Pair <String , List <HoodieFileGroup >> pairFileGroup : fileGroups ) {
243-
244- deletePaths .addAll (getReplacedFilesEligibleToClean (savepointedFiles , pairFileGroup .getLeft (), Option .empty ()));
240+ List <Pair <String , List <HoodieFileGroup >>> fileGroupsPerPartition = fileSystemView .getAllFileGroups (partitionPaths ).collect (Collectors .toList ());
241+ for (Pair <String , List <HoodieFileGroup >> partitionFileGroupList : fileGroupsPerPartition ) {
242+ List <CleanFileInfo > deletePaths = new ArrayList <>(getReplacedFilesEligibleToClean (savepointedFiles , partitionFileGroupList .getLeft (), Option .empty ()));
245243 boolean toDeletePartition = false ;
246- for (HoodieFileGroup fileGroup : pairFileGroup .getRight ()) {
244+ for (HoodieFileGroup fileGroup : partitionFileGroupList .getRight ()) {
247245 int keepVersions = config .getCleanerFileVersionsRetained ();
248246 // do not cleanup slice required for pending compaction
249247 Iterator <FileSlice > fileSliceIterator =
250- fileGroup .getAllFileSlices ().filter (fs -> !isFileSliceNeededForPendingCompaction (fs )).iterator ();
248+ fileGroup .getAllFileSlices ()
249+ .filter (fs -> !isFileSliceNeededForPendingCompaction (fs ))
250+ .iterator ();
251251 if (isFileGroupInPendingCompaction (fileGroup )) {
252252 // We have already saved the last version of file-groups for pending compaction Id
253253 keepVersions --;
@@ -270,10 +270,10 @@ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLa
270270 }
271271 }
272272 // if there are no valid file groups for the partition, mark it to be deleted
273- if (fileGroups .isEmpty ()) {
273+ if (partitionFileGroupList . getValue () .isEmpty ()) {
274274 toDeletePartition = true ;
275275 }
276- map .put (pairFileGroup .getLeft (), Pair .of (toDeletePartition , deletePaths ));
276+ map .put (partitionFileGroupList .getLeft (), Pair .of (toDeletePartition , deletePaths ));
277277 }
278278 return map ;
279279 }
@@ -301,7 +301,6 @@ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLa
301301 */
302302 private Map <String , Pair <Boolean , List <CleanFileInfo >>> getFilesToCleanKeepingLatestCommits (List <String > partitionPaths , int commitsRetained , HoodieCleaningPolicy policy ) {
303303 LOG .info ("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. " );
304- List <CleanFileInfo > deletePaths = new ArrayList <>();
305304 Map <String , Pair <Boolean , List <CleanFileInfo >>> cleanFileInfoPerPartitionMap = new HashMap <>();
306305
307306 // Collect all the datafiles savepointed by all the savepoints
@@ -315,12 +314,12 @@ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLa
315314 Option <HoodieInstant > earliestCommitToRetainOption = getEarliestCommitToRetain ();
316315 HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption .get ();
317316 // add active files
318- List <Pair <String , List <HoodieFileGroup >>> fileGroups = fileSystemView .getAllFileGroups (partitionPaths ).collect (Collectors .toList ());
319- for (Pair <String , List <HoodieFileGroup >> pairFileGroup : fileGroups ) {
320-
317+ List <Pair <String , List <HoodieFileGroup >>> fileGroupsPerPartition = fileSystemView .getAllFileGroups (partitionPaths ).collect (Collectors .toList ());
318+ for (Pair <String , List <HoodieFileGroup >> partitionFileGroupList : fileGroupsPerPartition ) {
319+ List < CleanFileInfo > deletePaths = new ArrayList <>( getReplacedFilesEligibleToClean ( savepointedFiles , partitionFileGroupList . getLeft (), earliestCommitToRetainOption ));
321320 // all replaced file groups before earliestCommitToRetain are eligible to clean
322- deletePaths .addAll (getReplacedFilesEligibleToClean (savepointedFiles , pairFileGroup .getLeft (), earliestCommitToRetainOption ));
323- for (HoodieFileGroup fileGroup : pairFileGroup .getRight ()) {
321+ deletePaths .addAll (getReplacedFilesEligibleToClean (savepointedFiles , partitionFileGroupList .getLeft (), earliestCommitToRetainOption ));
322+ for (HoodieFileGroup fileGroup : partitionFileGroupList .getRight ()) {
324323 List <FileSlice > fileSliceList = fileGroup .getAllFileSlices ().collect (Collectors .toList ());
325324
326325 if (fileSliceList .isEmpty ()) {
@@ -391,10 +390,10 @@ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLa
391390 }
392391 }
393392 // if there are no valid file groups for the partition, mark it to be deleted
394- if (fileGroups .isEmpty ()) {
393+ if (partitionFileGroupList . getValue () .isEmpty ()) {
395394 toDeletePartition = true ;
396395 }
397- cleanFileInfoPerPartitionMap .put (pairFileGroup .getLeft (), Pair .of (toDeletePartition , deletePaths ));
396+ cleanFileInfoPerPartitionMap .put (partitionFileGroupList .getLeft (), Pair .of (toDeletePartition , deletePaths ));
398397 }
399398 }
400399 return cleanFileInfoPerPartitionMap ;
0 commit comments