@@ -42,6 +42,7 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -116,9 +117,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-    .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+    .parallelize(partitionsToClean, cleanerParallelism)
+    .mapPartitions(partitionIterator -> {
+      List<String> partitionList = new ArrayList<>();
+      partitionIterator.forEachRemaining(partitionList::add);
+      Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
+      return cleanResult.entrySet().iterator();
+    }, false).collectAsList()
    .stream()
-    .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
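To make the intent of the change above concrete: the old code issued one planner.getDeletePaths call per partition path, while the new code parallelizes the partition list and batches all paths within each Spark partition into a single call. Below is a minimal, self-contained sketch of the same map-to-mapPartitions pattern on a plain JavaRDD; batchLookup is a made-up stand-in for planner.getDeletePaths and is not part of this PR.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MapPartitionsSketch {

  // Hypothetical stand-in for planner.getDeletePaths(List<String>): one call handles a whole batch.
  static Map<String, Integer> batchLookup(List<String> partitionPaths) {
    Map<String, Integer> result = new HashMap<>();
    partitionPaths.forEach(p -> result.put(p, p.length()));
    return result;
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("map-partitions-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      List<String> partitionsToClean = Arrays.asList("2021/01/01", "2021/01/02", "2021/01/03");
      Map<String, Integer> cleaned = jsc.parallelize(partitionsToClean, 2)
          .mapPartitions(partitionIterator -> {
            // Drain the iterator so the expensive lookup runs once per Spark partition,
            // not once per element.
            List<String> batch = new ArrayList<>();
            partitionIterator.forEachRemaining(batch::add);
            // SimpleEntry is Serializable, so the entries can be collected back to the driver.
            List<AbstractMap.SimpleEntry<String, Integer>> entries = batchLookup(batch).entrySet().stream()
                .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
            return entries.iterator();
          })
          .collect()
          .stream()
          .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
      System.out.println(cleaned);
    }
  }
}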

Large diffs are not rendered by default.

@@ -57,7 +57,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
HoodieWriteConfig writeConfig = getConfigBuilder(true)
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
-    .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+    .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
@@ -81,7 +81,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);

-// 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
+// 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
for (int i = 5; i < 9; i++) {
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
client.startCommitWithTime(instantTime);
@@ -107,7 +107,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) {
this.metaClient = metaClient;
refreshTimeline(visibleActiveTimeline);
resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline);
this.bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);
// Load Pending Compaction Operations
resetPendingCompactionOperations(CompactionUtils.getAllPendingCompactionOperations(metaClient).values().stream()
.map(e -> Pair.of(e.getKey(), CompactionOperation.convertFromAvroRecordInstance(e.getValue()))));
@@ -121,7 +121,7 @@

/**
* Refresh commits timeline.
*
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -163,13 +163,13 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
* Build FileGroups from passed in file-status.
*/
protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline,
boolean addPendingCompactionFileSlice) {
return buildFileGroups(convertFileStatusesToBaseFiles(statuses), convertFileStatusesToLogFiles(statuses), timeline,
addPendingCompactionFileSlice);
}

protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> baseFileStream,
Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
Map<Pair<String, String>, List<HoodieBaseFile>> baseFiles =
baseFileStream.collect(Collectors.groupingBy(baseFile -> {
String partitionPathStr = getPartitionPathFor(baseFile);
@@ -227,7 +227,7 @@ private void resetFileGroupsReplaced(HoodieTimeline timeline) {

// get replace instant mapping for each partition, fileId
return replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry -> entry.getValue().stream().map(e ->
new AbstractMap.SimpleEntry<>(new HoodieFileGroupId(entry.getKey(), e), instant)));
} catch (HoodieIOException ex) {

if (ex.getIOException() instanceof FileNotFoundException) {
@@ -417,7 +417,7 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
* With async compaction, it is possible to see partial/complete base-files due to inflight compactions. Ignore those
* base-files.
*
* @param fileSlice File Slice
* @param includeEmptyFileSlice include empty file-slice
*/
protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
@@ -557,8 +557,8 @@ public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr, String instantTime, String fileId) {
return Option.empty();
} else {
return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
    instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
}
} finally {
@@ -593,8 +593,8 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToReturn) {
return fetchAllStoredFileGroups()
.filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
.map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
    && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
.map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get()));
} finally {
readLock.unlock();
@@ -624,9 +624,9 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
return fetchLatestFileSlices(partitionPath)
.filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
.map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
@@ -681,26 +681,26 @@ public final Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionStr) {

@Override
public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime,
boolean includeFileSlicesInPendingCompaction) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
.filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
.map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
if (includeFileSlicesInPendingCompaction) {
return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
.map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
.map(this::addBootstrapBaseFileIfPresent);
} else {
return allFileSliceStream
.map(sliceStream ->
    Option.fromJavaOptional(sliceStream
        .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
        .filter(slice -> !slice.isEmpty())
        .findFirst()))
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
}
} finally {
readLock.unlock();
@@ -792,6 +792,15 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}

@Override
public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
Reviewer comment (Member): Instead of a separate API in the interface, let's extract the logic to a separate method in CleanPlanner.

List<Pair<String, List<HoodieFileGroup>>> allFileGroups = new ArrayList<>();
partitionPaths.forEach(partitionPath -> {
allFileGroups.add(Pair.of(partitionPath, getAllFileGroups(partitionPath).collect(Collectors.toList())));
});
return allFileGroups.stream();
}
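For illustration only, here is a sketch of the extraction the reviewer suggests above: a hypothetical helper inside CleanPlanner that keeps the batching at the call site and reuses the existing single-partition getAllFileGroups(String) instead of widening the view interface. The helper name and the fileSystemView field are assumptions, not code from this PR.

  // Hypothetical helper in CleanPlanner; 'fileSystemView' stands in for however
  // CleanPlanner obtains the table's file-system view.
  private Stream<Pair<String, List<HoodieFileGroup>>> getFileGroupsPerPartition(List<String> partitionPaths) {
    return partitionPaths.stream()
        .map(partitionPath -> Pair.of(partitionPath,
            fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList())));
  }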

private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
try {
readLock.lock();
@@ -899,8 +908,8 @@ public final Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() {
protected abstract boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId);

/**
* Get pending clustering instant time for specified file group. Return None if file group is not in pending
* clustering operation.
*/
protected abstract Option<HoodieInstant> getPendingClusteringInstant(final HoodieFileGroupId fileGroupId);

@@ -1002,7 +1011,7 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingLogCompac
* Add a complete partition view to store.
*
* @param partitionPath Partition Path
* @param fileGroups File Groups for the partition path
*/
abstract void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups);

@@ -1122,7 +1131,7 @@ Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
/**
* Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet.
*
* @param lastSlice Latest File slice for a file-group
* @param penultimateSlice Penultimate file slice for a file-group in commit timeline order
*/
private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) {
@@ -1187,7 +1196,7 @@ private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup,
* Default implementation for fetching latest base-file.
*
* @param partitionPath Partition path
* @param fileId File Id
* @return base File if present
*/
protected Option<HoodieBaseFile> fetchLatestBaseFile(String partitionPath, String fileId) {
@@ -1199,7 +1208,7 @@ protected Option<HoodieBaseFile> fetchLatestBaseFile(String partitionPath, String fileId) {
* Default implementation for fetching file-slice.
*
* @param partitionPath Partition path
* @param fileId File Id
* @return File Slice if present
*/
public Option<FileSlice> fetchLatestFileSlice(String partitionPath, String fileId) {
@@ -204,6 +204,11 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

@Override
public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
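    // Tries the preferred view first and falls back to the secondary view if that call fails.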
return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
@@ -51,9 +51,11 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
@@ -378,6 +380,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
}
}

@Override
public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
for (String partitionPath : partitionPaths) {
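      // Each iteration issues a separate request to the remote timeline server for this partition path.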
Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
}
return fileGroupPerPartitionList.stream();
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
@@ -109,18 +109,18 @@ interface SliceViewWithLatestSlice {
/**
* Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
*
* @param partitionPath Partition path
* @param maxCommitTime Max Instant Time
* @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
*/
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
boolean includeFileSlicesInPendingCompaction);

/**
* Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
* file-slice before and after compaction request instant is merged and returned.
*
* @param partitionPath Partition Path
* @param maxInstantTime Max Instant Time
* @return
*/
@@ -149,10 +149,12 @@ interface SliceView extends SliceViewWithLatestSlice {
*/
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);

Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);
Reviewer comment (Member): We can do away with this API; it doesn't add much value. Instead, we can directly use the getAllFileGroups(String partitionPath) API at the call site.
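For illustration, a minimal sketch of how a caller could consume the batched variant added above; fsView stands in for any initialized TableFileSystemView, and the partition paths are made up.

  // Hypothetical call site, not code from this PR.
  List<String> partitions = Arrays.asList("2021/01/01", "2021/01/02");
  Map<String, List<HoodieFileGroup>> groupsByPartition = fsView.getAllFileGroups(partitions)
      .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  groupsByPartition.forEach((partition, groups) ->
      System.out.println(partition + " -> " + groups.size() + " file groups"));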


/**
* Return Pending Compaction Operations.
*
-* @return Pair<Pair<InstantTime,CompactionOperation>>
+* @return Pair<Pair < InstantTime, CompactionOperation>>
*/
Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
