From 15c69602a857dc8931a91f66e4f41a433266fb90 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 9 Dec 2021 16:20:34 +0800 Subject: [PATCH 1/4] ready to test --- .../FileSystemBackedTableMetadata.java | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index f5e14ba1dd34d..761466afd60ab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -37,6 +37,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; public class FileSystemBackedTableMetadata implements HoodieTableMetadata { @@ -64,43 +65,38 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOExceptio @Override public List getAllPartitionPaths() throws IOException { + FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get()); if (assumeDatePartitioning) { - FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get()); return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath); } - List pathsToList = new LinkedList<>(); + List pathsToList = new CopyOnWriteArrayList<>(); pathsToList.add(new Path(datasetBasePath)); - List partitionPaths = new ArrayList<>(); + List partitionPaths = new CopyOnWriteArrayList<>(); while (!pathsToList.isEmpty()) { // TODO: Get the parallelism from HoodieWriteConfig int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size()); // List all directories in parallel - List> dirToFileListing = engineContext.map(pathsToList, path -> { + List dirToFileListing = engineContext.map(pathsToList, path -> { FileSystem fileSystem = 
path.getFileSystem(hadoopConf.get()); - return Pair.of(path, fileSystem.listStatus(path)); + return fileSystem.listStatus(path); }, listingParallelism); pathsToList.clear(); // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to // the results. - dirToFileListing.forEach(p -> { - Option partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel() - .filter(fs -> fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) - .findFirst()); - - if (partitionMetaFile.isPresent()) { - // Is a partition. - String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), p.getLeft()); - partitionPaths.add(partitionName); - } else { - // Add sub-dirs to the queue - pathsToList.addAll(Arrays.stream(p.getRight()) - .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) - .map(fs -> fs.getPath()) - .collect(Collectors.toList())); + dirToFileListing.stream().flatMap(Arrays::stream).parallel().forEach(fileStatus -> { + if (fileStatus.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + partitionPaths.add(datasetBasePath); + } + if (fileStatus.isDirectory()) { + if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) { + partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())); + } else { + pathsToList.add(fileStatus.getPath()); + } } }); } From 640ae3526424bc96e0da1dc27c4672155c7f4211 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 9 Dec 2021 17:01:43 +0800 Subject: [PATCH 2/4] remove unused import --- .../apache/hudi/metadata/FileSystemBackedTableMetadata.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 761466afd60ab..0ed588838f7bc 
100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -31,10 +30,8 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; From 75c024c02c675c0d643fc8456c2bac3af326a470 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 13 Jan 2022 13:07:00 +0800 Subject: [PATCH 3/4] tuning --- .../FileSystemBackedTableMetadata.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 0ed588838f7bc..084d5a5268628 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -82,20 +83,21 @@ public List getAllPartitionPaths() throws IOException { }, listingParallelism); pathsToList.clear(); - // If the listing reveals a directory, add 
it to queue. If the listing reveals a hoodie partition, add it to - // the results. - dirToFileListing.stream().flatMap(Arrays::stream).parallel().forEach(fileStatus -> { - if (fileStatus.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { - partitionPaths.add(datasetBasePath); - } - if (fileStatus.isDirectory()) { - if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) { - partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())); - } else { - pathsToList.add(fileStatus.getPath()); - } - } - }); + // if current directory contains PartitionMetadata, add it to result + // if current directory does not contain PartitionMetadata, add it to queue + dirToFileListing.stream().flatMap(Arrays::stream).parallel() + .forEach(fileStatus -> { + if (fileStatus.isDirectory()) { + if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) { + partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())); + } else { + pathsToList.add(fileStatus.getPath()); + } + } else if (fileStatus.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent()); + partitionPaths.add(partitionName); + } + }); } return partitionPaths; } From 1239d1a4acaf19661522edeefb74bd7cd0b19adf Mon Sep 17 00:00:00 2001 From: yuezhang Date: Wed, 19 Jan 2022 09:40:14 +0800 Subject: [PATCH 4/4] tuning --- .../org/apache/hudi/metadata/FileSystemBackedTableMetadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index a3f7db33a8f88..d05b95dfdb495 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -90,7 +90,7 @@ public List getAllPartitionPaths() throws IOException { if (fileStatus.isDirectory()) { if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) { partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())); - } else { + } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { pathsToList.add(fileStatus.getPath()); } } else if (fileStatus.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {