diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 7f3b437178fd4..bd7ec798ed1a5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -42,6 +42,7 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -116,9 +117,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions(partitionIterator -> {
+            List<String> partitionList = new ArrayList<>();
+            partitionIterator.forEachRemaining(partitionList::add);
+            Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
+            return cleanResult.entrySet().iterator();
+          }, false).collectAsList()
           .stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
       Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index c08bec8a267e1..d09d10e648b60 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -60,6 +60,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -230,10 +231,10 @@ private List<String> getPartitionPathsForFullCleaning() {
    * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
    * single file (i.e run it with versionsRetained = 1)
    */
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
-    LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestVersions(List<String> partitionPaths) {
+    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + config.getCleanerFileVersionsRetained()
         + " file versions. ");
-    List<CleanFileInfo> deletePaths = new ArrayList<>();
+    Map<String, Pair<Boolean, List<CleanFileInfo>>> map = new HashMap<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
         .collect(Collectors.toList());
@@ -241,45 +242,48 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
     // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
     // In other words, the file versions only apply to the active file groups.
-    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
-    boolean toDeletePartition = false;
-    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-    for (HoodieFileGroup fileGroup : fileGroups) {
-      int keepVersions = config.getCleanerFileVersionsRetained();
-      // do not cleanup slice required for pending compaction
-      Iterator<FileSlice> fileSliceIterator =
-          fileGroup.getAllFileSlices()
-              .filter(fs -> !isFileSliceNeededForPendingMajorOrMinorCompaction(fs))
-              .iterator();
-      if (isFileGroupInPendingMajorOrMinorCompaction(fileGroup)) {
-        // We have already saved the last version of file-groups for pending compaction Id
-        keepVersions--;
-      }
+    List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+    for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
+      List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), Option.empty()));
+      boolean toDeletePartition = false;
+      for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
+        int keepVersions = config.getCleanerFileVersionsRetained();
+        // do not cleanup slice required for pending compaction
+        Iterator<FileSlice> fileSliceIterator =
+            fileGroup.getAllFileSlices()
+                .filter(fs -> !isFileSliceNeededForPendingMajorOrMinorCompaction(fs))
+                .iterator();
+        if (isFileGroupInPendingMajorOrMinorCompaction(fileGroup)) {
+          // We have already saved the last version of file-groups for pending compaction Id
+          keepVersions--;
+        }
 
-      while (fileSliceIterator.hasNext() && keepVersions > 0) {
-        // Skip this most recent version
-        fileSliceIterator.next();
-        keepVersions--;
-      }
-      // Delete the remaining files
-      while (fileSliceIterator.hasNext()) {
-        FileSlice nextSlice = fileSliceIterator.next();
-        Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
-        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
-          // do not clean up a savepoint data file
-          continue;
+        while (fileSliceIterator.hasNext() && keepVersions > 0) {
+          // Skip this most recent version
+          fileSliceIterator.next();
+          keepVersions--;
+        }
+        // Delete the remaining files
+        while (fileSliceIterator.hasNext()) {
+          FileSlice nextSlice = fileSliceIterator.next();
+          Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
+          if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
+            // do not clean up a savepoint data file
+            continue;
+          }
+          deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
         }
-        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
+      // if there are no valid file groups for the partition, mark it to be deleted
+      if (partitionFileGroupList.getValue().isEmpty()) {
+        toDeletePartition = true;
+      }
+      map.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
     }
-    // if there are no valid file groups for the partition, mark it to be deleted
-    if (fileGroups.isEmpty()) {
-      toDeletePartition = true;
-    }
-    return Pair.of(toDeletePartition, deletePaths);
+    return map;
   }
 
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
   }
 
@@ -300,9 +304,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
    * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted,
    *
and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted. */ - private Pair> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) { - LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); - List deletePaths = new ArrayList<>(); + private Map>> getFilesToCleanKeepingLatestCommits(List partitionPaths, int commitsRetained, HoodieCleaningPolicy policy) { + LOG.info("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. "); + Map>> cleanFileInfoPerPartitionMap = new HashMap<>(); // Collect all the datafiles savepointed by all the savepoints List savepointedFiles = hoodieTable.getSavepointTimestamps().stream() @@ -314,76 +318,79 @@ private Pair> getFilesToCleanKeepingLatestCommits(S if (commitTimeline.countInstants() > commitsRetained) { Option earliestCommitToRetainOption = getEarliestCommitToRetain(); HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get(); - // all replaced file groups before earliestCommitToRetain are eligible to clean - deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption)); // add active files - List fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); - for (HoodieFileGroup fileGroup : fileGroups) { - List fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); - - if (fileSliceList.isEmpty()) { - continue; - } - - String lastVersion = fileSliceList.get(0).getBaseInstantTime(); - String lastVersionBeforeEarliestCommitToRetain = - getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain); - - // Ensure there are more than 1 version of the file (we only clean old files from updates) - // i.e always spare the last commit. - for (FileSlice aSlice : fileSliceList) { - Option aFile = aSlice.getBaseFile(); - String fileCommitTime = aSlice.getBaseInstantTime(); - if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) { - // do not clean up a savepoint data file + List>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList()); + for (Pair> partitionFileGroupList : fileGroupsPerPartition) { + // all replaced file groups before earliestCommitToRetain are eligible to clean + List deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption)); + for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) { + List fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); + + if (fileSliceList.isEmpty()) { continue; } - if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { - // Dont delete the latest commit and also the last commit before the earliest commit we - // are retaining - // The window of commit retain == max query run time. So a query could be running which - // still - // uses this file. - if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) { - // move on to the next file + String lastVersion = fileSliceList.get(0).getBaseInstantTime(); + String lastVersionBeforeEarliestCommitToRetain = + getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain); + + // Ensure there are more than 1 version of the file (we only clean old files from updates) + // i.e always spare the last commit. 
+ for (FileSlice aSlice : fileSliceList) { + Option aFile = aSlice.getBaseFile(); + String fileCommitTime = aSlice.getBaseInstantTime(); + if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) { + // do not clean up a savepoint data file continue; } - } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { - // This block corresponds to KEEP_LATEST_BY_HOURS policy - // Do not delete the latest commit. - if (fileCommitTime.equals(lastVersion)) { - // move on to the next file - continue; + + if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { + // Dont delete the latest commit and also the last commit before the earliest commit we + // are retaining + // The window of commit retain == max query run time. So a query could be running which + // still + // uses this file. + if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) { + // move on to the next file + continue; + } + } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { + // This block corresponds to KEEP_LATEST_BY_HOURS policy + // Do not delete the latest commit. + if (fileCommitTime.equals(lastVersion)) { + // move on to the next file + continue; + } } - } - // Always keep the last commit - if (!isFileSliceNeededForPendingMajorOrMinorCompaction(aSlice) && HoodieTimeline - .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) { - // this is a commit, that should be cleaned. - aFile.ifPresent(hoodieDataFile -> { - deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false)); - if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) { - deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true)); + // Always keep the last commit + if (!isFileSliceNeededForPendingMajorOrMinorCompaction(aSlice) && HoodieTimeline + .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) { + // this is a commit, that should be cleaned. + aFile.ifPresent(hoodieDataFile -> { + deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false)); + if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) { + deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true)); + } + }); + if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ + || hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) { + // 1. If merge on read, then clean the log files for the commits as well; + // 2. If change log capture is enabled, clean the log files no matter the table type is mor or cow. + deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); } - }); - if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ - || hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) { - // 1. If merge on read, then clean the log files for the commits as well; - // 2. If change log capture is enabled, clean the log files no matter the table type is mor or cow. 
- deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) - .collect(Collectors.toList())); } } } - } - // if there are no valid file groups for the partition, mark it to be deleted - if (fileGroups.isEmpty()) { - toDeletePartition = true; + // if there are no valid file groups for the partition, mark it to be deleted + if (partitionFileGroupList.getValue().isEmpty()) { + toDeletePartition = true; + } + cleanFileInfoPerPartitionMap.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths)); } } - return Pair.of(toDeletePartition, deletePaths); + return cleanFileInfoPerPartitionMap; } /** @@ -391,10 +398,11 @@ private Pair> getFilesToCleanKeepingLatestCommits(S * all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained. * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5. + * * @param partitionPath partition path to check * @return list of files to clean */ - private Pair> getFilesToCleanKeepingLatestHours(String partitionPath) { + private Map>> getFilesToCleanKeepingLatestHours(List partitionPath) { return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS); } @@ -460,21 +468,23 @@ private List getCleanFileInfoForSlice(FileSlice nextSlice) { /** * Returns files to be cleaned for the given partitionPath based on cleaning policy. */ - public Pair> getDeletePaths(String partitionPath) { + public Map>> getDeletePaths(List partitionPaths) { HoodieCleaningPolicy policy = config.getCleanerPolicy(); - Pair> deletePaths; + Map>> deletePaths; if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { - deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath); + deletePaths = getFilesToCleanKeepingLatestCommits(partitionPaths); } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) { - deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath); + deletePaths = getFilesToCleanKeepingLatestVersions(partitionPaths); } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { - deletePaths = getFilesToCleanKeepingLatestHours(partitionPath); + deletePaths = getFilesToCleanKeepingLatestHours(partitionPaths); } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } - LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath); - if (deletePaths.getKey()) { - LOG.info("Partition " + partitionPath + " to be deleted"); + for (String partitionPath : deletePaths.keySet()) { + LOG.info(deletePaths.get(partitionPath).getRight().size() + " patterns used to delete in partition path:" + partitionPath); + if (deletePaths.get(partitionPath).getLeft()) { + LOG.info("Partition " + partitionPath + " to be deleted"); + } } return deletePaths; } @@ -503,8 +513,8 @@ public Option getEarliestCommitToRetain() { * Returns the last completed commit timestamp before clean. 
*/ public String getLastCompletedCommitTimestamp() { - if (commitTimeline.lastInstant().isPresent()) { - return commitTimeline.lastInstant().get().getTimestamp(); + if (commitTimeline.getContiguousCompletedWriteTimeline().lastInstant().isPresent()) { + return commitTimeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp(); } else { return ""; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java index baff4ebac8752..967e313f4ee9b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java @@ -57,7 +57,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce HoodieWriteConfig writeConfig = getConfigBuilder(true) .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build()) .build(); try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) { @@ -81,7 +81,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4); - // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit + // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit for (int i = 5; i < 9; i++) { String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000); client.startCommitWithTime(instantTime); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index a614523ba066e..dc527e94b9eba 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -107,7 +107,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi this.metaClient = metaClient; refreshTimeline(visibleActiveTimeline); resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline); - this.bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient); + this.bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient); // Load Pending Compaction Operations resetPendingCompactionOperations(CompactionUtils.getAllPendingCompactionOperations(metaClient).values().stream() .map(e -> Pair.of(e.getKey(), CompactionOperation.convertFromAvroRecordInstance(e.getValue())))); @@ -121,7 +121,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi /** * Refresh commits timeline. 
- * + * * @param visibleActiveTimeline Visible Active Timeline */ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { @@ -163,13 +163,13 @@ public List addFilesToView(FileStatus[] statuses) { * Build FileGroups from passed in file-status. */ protected List buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline, - boolean addPendingCompactionFileSlice) { + boolean addPendingCompactionFileSlice) { return buildFileGroups(convertFileStatusesToBaseFiles(statuses), convertFileStatusesToLogFiles(statuses), timeline, addPendingCompactionFileSlice); } protected List buildFileGroups(Stream baseFileStream, - Stream logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) { + Stream logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) { Map, List> baseFiles = baseFileStream.collect(Collectors.groupingBy(baseFile -> { String partitionPathStr = getPartitionPathFor(baseFile); @@ -227,7 +227,7 @@ private void resetFileGroupsReplaced(HoodieTimeline timeline) { // get replace instant mapping for each partition, fileId return replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry -> entry.getValue().stream().map(e -> - new AbstractMap.SimpleEntry<>(new HoodieFileGroupId(entry.getKey(), e), instant))); + new AbstractMap.SimpleEntry<>(new HoodieFileGroupId(entry.getKey(), e), instant))); } catch (HoodieIOException ex) { if (ex.getIOException() instanceof FileNotFoundException) { @@ -417,7 +417,7 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) { * With async compaction, it is possible to see partial/complete base-files due to inflight-compactions, Ignore those * base-files. * - * @param fileSlice File Slice + * @param fileSlice File Slice * @param includeEmptyFileSlice include empty file-slice */ protected Stream filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) { @@ -557,8 +557,8 @@ public final Option getBaseFileOn(String partitionStr, String in return Option.empty(); } else { return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles() - .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS, - instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null)) + .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS, + instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null)) .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df)); } } finally { @@ -593,8 +593,8 @@ public final Stream getLatestBaseFilesInRange(List commi return fetchAllStoredFileGroups() .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn)) .map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional( - fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime()) - && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent()) + fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime()) + && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent()) .map(p -> 
addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get())); } finally { readLock.unlock(); @@ -624,9 +624,9 @@ public final Stream getLatestFileSlices(String partitionStr) { String partitionPath = formatPartitionKey(partitionStr); ensurePartitionLoadedCorrectly(partitionPath); return fetchLatestFileSlices(partitionPath) - .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId())) - .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true)) - .map(this::addBootstrapBaseFileIfPresent); + .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId())) + .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true)) + .map(this::addBootstrapBaseFileIfPresent); } finally { readLock.unlock(); } @@ -681,26 +681,26 @@ public final Stream getLatestUnCompactedFileSlices(String partitionSt @Override public final Stream getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime, - boolean includeFileSlicesInPendingCompaction) { + boolean includeFileSlicesInPendingCompaction) { try { readLock.lock(); String partitionPath = formatPartitionKey(partitionStr); ensurePartitionLoadedCorrectly(partitionPath); Stream> allFileSliceStream = fetchAllStoredFileGroups(partitionPath) - .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime)) - .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime)); + .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime)) + .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime)); if (includeFileSlicesInPendingCompaction) { return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false))) - .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get) - .map(this::addBootstrapBaseFileIfPresent); + .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get) + .map(this::addBootstrapBaseFileIfPresent); } else { return allFileSliceStream - .map(sliceStream -> - Option.fromJavaOptional(sliceStream - .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId())) - .filter(slice -> !slice.isEmpty()) - .findFirst())) - .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent); + .map(sliceStream -> + Option.fromJavaOptional(sliceStream + .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId())) + .filter(slice -> !slice.isEmpty()) + .findFirst())) + .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent); } } finally { readLock.unlock(); @@ -792,6 +792,15 @@ public final Stream getAllFileGroups(String partitionStr) { return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg)); } + @Override + public final Stream>> getAllFileGroups(List partitionPaths) { + List>> allFileGroups = new ArrayList<>(); + partitionPaths.forEach(partitionPath -> { + allFileGroups.add(Pair.of(partitionPath, getAllFileGroups(partitionPath).collect(Collectors.toList()))); + }); + return allFileGroups.stream(); + } + private Stream getAllFileGroupsIncludingReplaced(final String partitionStr) { try { readLock.lock(); @@ -899,8 +908,8 @@ public final Stream> getFileGroupsInPendi protected abstract boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId); /** - * Get pending clustering instant time for specified file group. Return None if file group is not in pending - * clustering operation. 
+ * Get pending clustering instant time for specified file group. Return None if file group is not in pending + * clustering operation. */ protected abstract Option getPendingClusteringInstant(final HoodieFileGroupId fileGroupId); @@ -1002,7 +1011,7 @@ protected abstract Option> getPendingLogCompac * Add a complete partition view to store. * * @param partitionPath Partition Path - * @param fileGroups File Groups for the partition path + * @param fileGroups File Groups for the partition path */ abstract void storePartitionView(String partitionPath, List fileGroups); @@ -1122,7 +1131,7 @@ Stream fetchLatestFileSlices(String partitionPath) { /** * Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet. * - * @param lastSlice Latest File slice for a file-group + * @param lastSlice Latest File slice for a file-group * @param penultimateSlice Penultimate file slice for a file-group in commit timeline order */ private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) { @@ -1187,7 +1196,7 @@ private Option fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup, * Default implementation for fetching latest base-file. * * @param partitionPath Partition path - * @param fileId File Id + * @param fileId File Id * @return base File if present */ protected Option fetchLatestBaseFile(String partitionPath, String fileId) { @@ -1199,7 +1208,7 @@ protected Option fetchLatestBaseFile(String partitionPath, Strin * Default implementation for fetching file-slice. * * @param partitionPath Partition path - * @param fileId File Id + * @param fileId File Id * @return File Slice if present */ public Option fetchLatestFileSlice(String partitionPath, String fileId) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index 62edc4daa33e0..956b7420a6a21 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -204,6 +204,11 @@ public Stream getAllFileGroups(String partitionPath) { return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups); } + @Override + public Stream>> getAllFileGroups(List partitionPaths) { + return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups); + } + @Override public Stream getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 759ef70c6d80f..0763226dba44f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -51,9 +51,11 @@ import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -378,6 +380,16 @@ public Stream getAllFileGroups(String 
partitionPath) {
     }
   }
 
+  @Override
+  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+    ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
+      fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
+    }
+    return fileGroupPerPartitionList.stream();
+  }
+
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 18c9a9af99817..486a095684ceb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -109,18 +109,18 @@ interface SliceViewWithLatestSlice {
   /**
    * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
    *
-   * @param partitionPath  Partition path
-   * @param maxCommitTime  Max Instant Time
+   * @param partitionPath Partition path
+   * @param maxCommitTime Max Instant Time
    * @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
    */
   Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
-      boolean includeFileSlicesInPendingCompaction);
+                                                  boolean includeFileSlicesInPendingCompaction);
 
   /**
    * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
    * file-slice before and after compaction request instant is merged and returned.
-   * 
-   * @param partitionPath  Partition Path
+   *
+   * @param partitionPath Partition Path
    * @param maxInstantTime Max Instant Time
    * @return
    */
@@ -149,10 +149,12 @@ interface SliceView extends SliceViewWithLatestSlice {
    */
   Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
 
+  Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);
+
   /**
    * Return Pending Compaction Operations.
    *
-   * @return Pair<Pair<InstantTime,CompactionOperation>>
+   * @return Pair<Pair<InstantTime,CompactionOperation>>
    */
   Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
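Editor's note: the CleanPlanActionExecutor change at the top of this patch batches partition paths before handing them to the planner and then merges the per-batch maps into one result. Below is a minimal, engine-agnostic sketch of that batch-and-merge idea in plain Java; it is not the patch's code. `planDeletesForPartitions` is a hypothetical stand-in for `CleanPlanner#getDeletePaths(List<String>)`, and the explicit batch split stands in for what `HoodieEngineContext#parallelize(...).mapPartitions(...)` does in the actual change.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchedCleanPlanningSketch {

  // Hypothetical stand-in for the per-batch planning call
  // (in the patch: CleanPlanner#getDeletePaths(List<String>)).
  static Map<String, List<String>> planDeletesForPartitions(List<String> partitions) {
    Map<String, List<String>> result = new HashMap<>();
    for (String partition : partitions) {
      result.put(partition, new ArrayList<>()); // files to delete would be computed here
    }
    return result;
  }

  // Splits the partition list into batches (analogous to engine partitions under mapPartitions)
  // and merges the per-batch maps, mirroring the executor's collectAsList + Collectors.toMap step.
  static Map<String, List<String>> planAll(List<String> partitions, int batchSize) {
    Map<String, List<String>> merged = new HashMap<>();
    for (int start = 0; start < partitions.size(); start += batchSize) {
      List<String> batch = partitions.subList(start, Math.min(start + batchSize, partitions.size()));
      merged.putAll(planDeletesForPartitions(batch));
    }
    return merged;
  }

  public static void main(String[] args) {
    System.out.println(planAll(List.of("2021/01/01", "2021/01/02", "2021/01/03"), 2));
  }
}
```

The point of batching in the patch is that each planner invocation now covers a whole batch of partitions rather than a single one, so per-call overhead (e.g. file-system-view lookups) is amortized across the batch.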
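Editor's note: the new `getAllFileGroups(List<String>)` methods added to the file-system views all follow the same shape: for each partition path, call the existing single-partition lookup and pair the path with the collected file groups. A small generic sketch of that per-partition grouping follows, assuming only plain Java; `lookupFileGroups` is a hypothetical stand-in for the view's single-partition `getAllFileGroups(String)`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PerPartitionGroupingSketch {

  // Pairs each partition path with the file groups returned by the single-partition lookup,
  // mirroring the shape of the new multi-partition view method in this patch.
  static <G> Stream<Map.Entry<String, List<G>>> allFileGroups(List<String> partitionPaths,
                                                              Function<String, Stream<G>> lookupFileGroups) {
    return partitionPaths.stream()
        .map(p -> new SimpleImmutableEntry<>(p, lookupFileGroups.apply(p).collect(Collectors.toList())));
  }

  public static void main(String[] args) {
    allFileGroups(List.of("p1", "p2"), p -> Stream.of(p + "-fg1", p + "-fg2"))
        .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```

Returning the partition path alongside its file groups is what lets the cleaner's per-partition loop (and the "delete empty partition" check) run over a batched result without losing track of which partition each group belongs to.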