Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,41 +68,46 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOExceptio

@Override
public List<String> getAllPartitionPaths() throws IOException {
FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
Path basePath = new Path(datasetBasePath);
FileSystem fs = basePath.getFileSystem(hadoopConf.get());
if (assumeDatePartitioning) {
return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
}

List<Path> pathsToList = new CopyOnWriteArrayList<>();
pathsToList.add(new Path(datasetBasePath));
pathsToList.add(basePath);
List<String> partitionPaths = new CopyOnWriteArrayList<>();

while (!pathsToList.isEmpty()) {
// TODO: Get the parallelism from HoodieWriteConfig
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());

// List all directories in parallel
List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> {
List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
return fileSystem.listStatus(path);
return Pair.of(path, fileSystem.listStatus(path));
}, listingParallelism);
pathsToList.clear();

// if current dictionary contains PartitionMetadata, add it to result
// if current dictionary does not contain PartitionMetadata, add it to queue
dirToFileListing.stream().flatMap(Arrays::stream).parallel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsivabalan making this parallelized with engineContext should solve the problem and avoid listing each partition, right, instead of reverting the changes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 . I think if we simply do the processing we need to do within the single engineContext.map(..) call, we should be able to solve the original problem in #4643.

.forEach(fileStatus -> {
if (fileStatus.isDirectory()) {
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) {
partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath()));
} else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
pathsToList.add(fileStatus.getPath());
}
} else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
partitionPaths.add(partitionName);
}
});
dirToFileListing.forEach(p -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we have an utilty for this in FSUtils.

Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
.filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
.findFirst());

if (partitionMetaFile.isPresent()) {
// Is a partition.
String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
partitionPaths.add(partitionName);
} else {
// Add sub-dirs to the queue
pathsToList.addAll(Arrays.stream(p.getRight())
.filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like you don't need to check METAFOLDER_NAME at all. can we filter it out at the beginning for pathsToList?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to incorporate this. I wanted to see if we can do this only for first time where we list the base path. but we have to do it within the while loop and so we might do it unnecessarily for every loop anyways. will go ahead w/ the patch for now for 0.11.1. but lets discuss and I can put up a follow up PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a follow up JIRA?

Copy link
Contributor Author

@nsivabalan nsivabalan Jul 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted this in the latest patch put up #6234 . no follow ups required for now.

.map(fileStatus -> fileStatus.getPath())
.collect(Collectors.toList()));
}
});
}
return partitionPaths;
}
Expand Down