@@ -68,41 +68,46 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOExceptio
6868
6969 @ Override
7070 public List <String > getAllPartitionPaths () throws IOException {
71- FileSystem fs = new Path (datasetBasePath ).getFileSystem (hadoopConf .get ());
71+ Path basePath = new Path (datasetBasePath );
72+ FileSystem fs = basePath .getFileSystem (hadoopConf .get ());
7273 if (assumeDatePartitioning ) {
7374 return FSUtils .getAllPartitionFoldersThreeLevelsDown (fs , datasetBasePath );
7475 }
7576
7677 List <Path > pathsToList = new CopyOnWriteArrayList <>();
77- pathsToList .add (new Path ( datasetBasePath ) );
78+ pathsToList .add (basePath );
7879 List <String > partitionPaths = new CopyOnWriteArrayList <>();
7980
8081 while (!pathsToList .isEmpty ()) {
8182 // TODO: Get the parallelism from HoodieWriteConfig
8283 int listingParallelism = Math .min (DEFAULT_LISTING_PARALLELISM , pathsToList .size ());
8384
8485 // List all directories in parallel
85- List <FileStatus []> dirToFileListing = engineContext .map (pathsToList , path -> {
86+ List <Pair < Path , FileStatus []> > dirToFileListing = engineContext .map (pathsToList , path -> {
8687 FileSystem fileSystem = path .getFileSystem (hadoopConf .get ());
87- return fileSystem .listStatus (path );
88+ return Pair . of ( path , fileSystem .listStatus (path ) );
8889 }, listingParallelism );
8990 pathsToList .clear ();
9091
9192 // if current dictionary contains PartitionMetadata, add it to result
9293 // if current dictionary does not contain PartitionMetadata, add it to queue
93- dirToFileListing .stream ().flatMap (Arrays ::stream ).parallel ()
94- .forEach (fileStatus -> {
95- if (fileStatus .isDirectory ()) {
96- if (HoodiePartitionMetadata .hasPartitionMetadata (fs , fileStatus .getPath ())) {
97- partitionPaths .add (FSUtils .getRelativePartitionPath (new Path (datasetBasePath ), fileStatus .getPath ()));
98- } else if (!fileStatus .getPath ().getName ().equals (HoodieTableMetaClient .METAFOLDER_NAME )) {
99- pathsToList .add (fileStatus .getPath ());
100- }
101- } else if (fileStatus .getPath ().getName ().startsWith (HoodiePartitionMetadata .HOODIE_PARTITION_METAFILE_PREFIX )) {
102- String partitionName = FSUtils .getRelativePartitionPath (new Path (datasetBasePath ), fileStatus .getPath ().getParent ());
103- partitionPaths .add (partitionName );
104- }
105- });
94+ dirToFileListing .forEach (p -> {
95+ Option <FileStatus > partitionMetaFile = Option .fromJavaOptional (Arrays .stream (p .getRight ()).parallel ()
96+ .filter (fileStatus -> fileStatus .getPath ().getName ().startsWith (HoodiePartitionMetadata .HOODIE_PARTITION_METAFILE_PREFIX ))
97+ .findFirst ());
98+
99+ if (partitionMetaFile .isPresent ()) {
100+ // Is a partition.
101+ String partitionName = FSUtils .getRelativePartitionPath (basePath , p .getLeft ());
102+ partitionPaths .add (partitionName );
103+ } else {
104+ // Add sub-dirs to the queue
105+ pathsToList .addAll (Arrays .stream (p .getRight ())
106+ .filter (fileStatus -> fileStatus .isDirectory () && !fileStatus .getPath ().getName ().equals (HoodieTableMetaClient .METAFOLDER_NAME ))
107+ .map (fileStatus -> fileStatus .getPath ())
108+ .collect (Collectors .toList ()));
109+ }
110+ });
106111 }
107112 return partitionPaths ;
108113 }
0 commit comments