Skip to content

Commit 765dd2e

Browse files
authored
[HUDI-4221] Optimizing getAllPartitionPaths (#6234)
- Leveraging Spark parallelism for directory processing
1 parent ce4330d commit 765dd2e

1 file changed

Lines changed: 29 additions & 21 deletions

File tree

hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@
2929
import org.apache.hudi.common.table.HoodieTableMetaClient;
3030
import org.apache.hudi.common.util.Option;
3131
import org.apache.hudi.common.util.collection.Pair;
32+
import org.apache.hudi.exception.HoodieMetadataException;
3233

3334
import org.apache.hadoop.fs.FileStatus;
3435
import org.apache.hadoop.fs.FileSystem;
3536
import org.apache.hadoop.fs.Path;
36-
import org.apache.hudi.exception.HoodieMetadataException;
3737

3838
import java.io.IOException;
3939
import java.util.Arrays;
@@ -83,31 +83,39 @@ public List<String> getAllPartitionPaths() throws IOException {
8383
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
8484

8585
// List all directories in parallel
86-
List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
86+
List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
8787
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
88-
return Pair.of(path, fileSystem.listStatus(path));
88+
return Arrays.stream(fileSystem.listStatus(path));
8989
}, listingParallelism);
9090
pathsToList.clear();
9191

9292
// if current directory contains PartitionMetadata, add it to result
93-
// if current directory does not contain PartitionMetadata, add it to queue
94-
dirToFileListing.forEach(p -> {
95-
Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
96-
.filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
97-
.findFirst());
98-
99-
if (partitionMetaFile.isPresent()) {
100-
// Is a partition.
101-
String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
102-
partitionPaths.add(partitionName);
103-
} else {
104-
// Add sub-dirs to the queue
105-
pathsToList.addAll(Arrays.stream(p.getRight())
106-
.filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
107-
.map(fileStatus -> fileStatus.getPath())
108-
.collect(Collectors.toList()));
109-
}
110-
});
93+
// if current directory does not contain PartitionMetadata, add it to queue to be processed.
94+
int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
95+
if (!dirToFileListing.isEmpty()) {
96+
// result below holds a list of pair. first entry in the pair optionally holds the deduced list of partitions.
97+
// and second entry holds optionally a directory path to be processed further.
98+
List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
99+
FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
100+
if (fileStatus.isDirectory()) {
101+
if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
102+
return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
103+
} else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
104+
return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
105+
}
106+
} else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
107+
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
108+
return Pair.of(Option.of(partitionName), Option.empty());
109+
}
110+
return Pair.of(Option.empty(), Option.empty());
111+
}, fileListingParallelism);
112+
113+
partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get())
114+
.collect(Collectors.toList()));
115+
116+
pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get())
117+
.collect(Collectors.toList()));
118+
}
111119
}
112120
return partitionPaths;
113121
}

0 commit comments

Comments
 (0)