From c59b24fece00cf5882ad899c50029b0ec8a8c1a9 Mon Sep 17 00:00:00 2001
From: Christos Karampeazis-Papadakis
Date: Tue, 12 Jan 2021 12:54:56 +0100
Subject: [PATCH 1/3] YARN-7713. Add parallel copying of directories into FSDownload

---
 .../apache/hadoop/yarn/util/FSDownload.java   | 95 ++++++++++++++++---
 .../hadoop/yarn/util/TestFSDownload.java      | 89 ++++++++++++++++-
 2 files changed, 169 insertions(+), 15 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index e5fb417561179..6067728c6fb14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -25,26 +25,22 @@
 import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -288,6 +284,76 @@ private void verifyAndCopy(Path destination)
     downloadAndUnpack(sCopy, destination);
   }
 
+  private class ThreadCopyTask implements Callable<Void> {
+    private final Path destination;
+    private final Path[] sourcePaths;
+    private final FileSystem sourceFileSystem;
+    private final FileSystem destinationFileSystem;
+
+    ThreadCopyTask(Path destination, Path[] sourcePaths,
+        FileSystem sourceFileSystem,
+        FileSystem destinationFileSystem) {
+      this.destination = destination;
+      this.sourcePaths = sourcePaths;
+      this.sourceFileSystem = sourceFileSystem;
+      this.destinationFileSystem = destinationFileSystem;
+    }
+
+    public Void call() throws IOException, ExecutionException,
+        InterruptedException {
+      FileUtil.copy(sourceFileSystem, sourcePaths,
+          destinationFileSystem, destination, false,
+          true, conf);
+      return null;
+    }
+  }
+
+  /**
+   * Split directory's contents in groups, process each group
+   * in its own Callable task.
+   * @param sourceFileSystem Source filesystem
+   * @param destinationFileSystem Destination filesystem
+   * @param source source path to copy. Typically HDFS
+   * @param destination destination path. Typically local filesystem
+   * @exception YarnException if any error occurs
+   */
+  private void localizeDirectoryInParallel(FileSystem sourceFileSystem,
+      FileSystem destinationFileSystem,
+      Path source, Path destination)
+      throws YarnException {
+    FileStatus[] fileStatuses;
+    try {
+      fileStatuses = sourceFileSystem.listStatus(source);
+    } catch (Exception e) {
+      throw new YarnException("Download and unpack failed", e);
+    }
+    int nThreads = Math.min(conf.getInt(
+        YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
+        YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT),
+        fileStatuses.length);
+    List<Callable<Void>> tasks = new ArrayList<>(nThreads);
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    if (nThreads > fileStatuses.length) {
+      nThreads = fileStatuses.length;
+    }
+    int chunkSize = (int) Math.floor((float) fileStatuses.length / nThreads);
+    for (int i = 0; i < fileStatuses.length; i += chunkSize) {
+      Path[] sourcePaths = Arrays.stream(fileStatuses, i,
+          Math.min(i + chunkSize, fileStatuses.length))
+          .map(FileStatus::getPath)
+          .toArray(Path[]::new);
+      tasks.add(new ThreadCopyTask(destination, sourcePaths,
+          sourceFileSystem, destinationFileSystem));
+    }
+    List<Future<Void>> futures = tasks.stream()
+        .map(executor::submit)
+        .collect(Collectors.toList());
+    while (!futures.isEmpty()) {
+      futures.removeIf(Future::isDone);
+    }
+    executor.shutdown();
+  }
+
   /**
    * Copy source path to destination with localization rules.
    * @param source source path to copy. Typically HDFS
@@ -299,11 +365,12 @@ private void downloadAndUnpack(Path source, Path destination)
     try {
       FileSystem sourceFileSystem = source.getFileSystem(conf);
       FileSystem destinationFileSystem = destination.getFileSystem(conf);
-      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
-        FileUtil.copy(
-            sourceFileSystem, source,
-            destinationFileSystem, destination, false,
-            true, conf);
+      FileStatus sourceFileStatus = sourceFileSystem.getFileStatus(source);
+      if (sourceFileStatus.isDirectory()) {
+        destinationFileSystem.mkdirs(destination,
+            sourceFileStatus.getPermission());
+        localizeDirectoryInParallel(sourceFileSystem, destinationFileSystem,
+          source, destination);
       } else {
         unpack(source, destination, sourceFileSystem, destinationFileSystem);
       }
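A note on the chunking in the hunk above: chunkSize uses floor division, so whenever fileStatuses.length is not a multiple of nThreads the loop creates more tasks than the fixed pool has threads, and an empty directory drives nThreads to zero, which Executors.newFixedThreadPool rejects. The following standalone sketch, with a hypothetical chunkRanges helper that is not part of the patch, models only the arithmetic:

    // Hypothetical sketch of the chunk arithmetic in
    // localizeDirectoryInParallel; only range sizes are modelled,
    // there is no filesystem access here.
    import java.util.ArrayList;
    import java.util.List;

    public class ChunkingSketch {
      static List<int[]> chunkRanges(int n, int nThreads) {
        // Same floor division as the hunk above.
        int chunkSize = (int) Math.floor((float) n / nThreads);
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < n; i += chunkSize) {
          // Each range is [start, end) over the FileStatus array.
          ranges.add(new int[] {i, Math.min(i + chunkSize, n)});
        }
        return ranges;
      }

      public static void main(String[] args) {
        // 10 files on a 4-thread pool: chunkSize = 2, so 5 tasks are
        // created, one more than the pool has threads.
        System.out.println(chunkRanges(10, 4).size()); // prints 5
      }
    }

Rounding up with Math.ceil instead caps the task count at nThreads; that is the approach the second patch in this series takes.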
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index 59b779c071df4..0bda34257ccc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -29,6 +29,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -615,7 +616,8 @@ private void verifyPermsRecursively(FileSystem fs,
   }
 
   @Test (timeout=10000)
-  public void testDirDownload() throws IOException, InterruptedException {
+  public void testDownloadToDirectory()
+      throws IOException, InterruptedException {
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
         TestFSDownload.class.getSimpleName()));
@@ -678,6 +680,91 @@ public void testDirDownload() throws IOException, InterruptedException {
     }
   }
 
+  /**
+   * Download a directory to test parallel directory localization.
+   */
+  @Test (timeout=10000)
+  public void testDirectoryDownload()
+      throws IOException, InterruptedException {
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    Path baseDirectory = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    Path sourceBaseDirectory = new Path(baseDirectory,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
+    files.mkdir(sourceBaseDirectory, null, true);
+    conf.setStrings(TestFSDownload.class.getName(),
+        sourceBaseDirectory.toString());
+
+    Map<LocalResource, LocalResourceVisibility> rsrcVis = new HashMap<>();
+    Random rand = new Random();
+    rand.setSeed(rand.nextLong());
+    Map<LocalResource, Future<Path>> pending = new HashMap<>();
+
+    ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
+    LocalDirAllocator dirs =
+        new LocalDirAllocator(TestFSDownload.class.getName());
+    LocalResourceVisibility vis;
+    LocalResource rsrc;
+    Path newFilePath;
+    for (int i = 0; i < 5; i++) {
+      vis = (i % 2 == 1) ? LocalResourceVisibility.APPLICATION :
+          LocalResourceVisibility.PRIVATE;
+      newFilePath = new Path(sourceBaseDirectory,
+          (char) ('a' + i) + "Dir" + i + ".jar");
+      rsrc = createJar(files, newFilePath, vis);
+      rsrcVis.put(rsrc, vis);
+    }
+    rsrc = recordFactory.newRecordInstance(LocalResource.class);
+    newFilePath = new Path(sourceBaseDirectory, "subDirectory");
+    rsrc.setResource(URL.fromPath(newFilePath));
+    File newDirectory = new File((files.makeQualified(newFilePath)).toUri());
+    Files.createDirectory(newDirectory.toPath());
+    FileStatus sourceFileStatus = files.getFileStatus(newFilePath);
+
+    for (int i = 0; i < 5; i++) {
+      vis = (i % 2 == 1) ? LocalResourceVisibility.PRIVATE :
+          LocalResourceVisibility.APPLICATION;
+      newFilePath = new Path(newDirectory.toString(),
+          "file" + (char) ('A' + i) + ".jar");
+      createJar(files, newFilePath, vis);
+    }
+
+    vis = LocalResourceVisibility.APPLICATION;
+    rsrc = recordFactory.newRecordInstance(LocalResource.class);
+    rsrc.setResource(URL.fromPath(sourceBaseDirectory));
+    rsrc.setSize(sourceFileStatus.getLen());
+    rsrc.setTimestamp(sourceFileStatus.getModificationTime());
+    rsrc.setType(LocalResourceType.ARCHIVE);
+    rsrc.setVisibility(vis);
+    rsrcVis.put(rsrc, vis);
+
+    Path destPath = dirs.getLocalPathForWrite(baseDirectory.toString(), conf);
+    destPath = new Path(destPath,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
+    FSDownload directoryFsd =
+        new FSDownload(files, UserGroupInformation.getCurrentUser(),
+            conf, destPath, rsrc);
+    pending.put(rsrc, exec.submit(directoryFsd));
+
+    exec.shutdown();
+    while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
+    for (Future<Path> path : pending.values()) {
+      Assert.assertTrue(path.isDone());
+    }
+    try {
+      for (Map.Entry<LocalResource, Future<Path>> p : pending.entrySet()) {
+        Path localized = p.getValue().get();
+        FileStatus status = files.getFileStatus(localized);
+
+        assert(status.isDirectory());
+        assert(rsrcVis.containsKey(p.getKey()));
+
+        verifyPermsRecursively(localized.getFileSystem(conf),
+            files, localized, rsrcVis.get(p.getKey()));
+      }
+    } catch (ExecutionException e) {
+      throw new IOException("Failed exec", e);
+    }
+  }
+
   @Test (timeout=10000)
   public void testUniqueDestinationPath() throws Exception {
     FileContext files = FileContext.getLocalFSFileContext(conf);
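One caveat in this first revision: the completion loop, while (!futures.isEmpty()) { futures.removeIf(Future::isDone); }, spins a CPU core until all copies finish and never calls Future.get(), so an IOException thrown inside a ThreadCopyTask is silently discarded. A minimal sketch of a blocking drain, reusing the names from the hunk above purely for illustration and not part of the patch:

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    import org.apache.hadoop.yarn.exceptions.YarnException;

    public class AwaitCopiesSketch {
      static void awaitCopies(List<Future<Void>> futures,
          ExecutorService executor) throws YarnException {
        try {
          for (Future<Void> future : futures) {
            future.get();  // blocks without spinning; rethrows task failures
          }
        } catch (ExecutionException e) {
          throw new YarnException("Download and unpack failed", e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new YarnException("Interrupted while copying", e);
        } finally {
          executor.shutdown();
        }
      }
    }

The two follow-up patches below rework this completion and error handling.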
From 9444f36db04be15a2b90d6574cc8e3789cd81f78 Mon Sep 17 00:00:00 2001
From: Christos Karampeazis-Papadakis
Date: Mon, 18 Jan 2021 12:59:23 +0100
Subject: [PATCH 2/3] YARN-7713. Replace Callable's logic with parallelStream

---
 .../apache/hadoop/yarn/util/FSDownload.java | 82 ++++++++-----------
 1 file changed, 35 insertions(+), 47 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 6067728c6fb14..bfe7ab19758bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -27,6 +27,8 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -284,33 +286,25 @@ private void verifyAndCopy(Path destination)
     downloadAndUnpack(sCopy, destination);
   }
 
-  private class ThreadCopyTask implements Callable<Void> {
-    private final Path destination;
-    private final Path[] sourcePaths;
-    private final FileSystem sourceFileSystem;
-    private final FileSystem destinationFileSystem;
-
-    ThreadCopyTask(Path destination, Path[] sourcePaths,
-        FileSystem sourceFileSystem,
-        FileSystem destinationFileSystem) {
-      this.destination = destination;
-      this.sourcePaths = sourcePaths;
-      this.sourceFileSystem = sourceFileSystem;
-      this.destinationFileSystem = destinationFileSystem;
-    }
-
-    public Void call() throws IOException, ExecutionException,
-        InterruptedException {
-      FileUtil.copy(sourceFileSystem, sourcePaths,
-          destinationFileSystem, destination, false,
-          true, conf);
-      return null;
-    }
+  /**
+   * Partitions {@code FileStatus} array into numOfParts parts.
+   * @param fileStatuses input FileStatus array
+   * @param numOfParts number of partitions to be created
+   * @return {@code Collection} of partitions as {@code List} objects
+   */
+  private Collection<List<FileStatus>> partitionInputList(
+      FileStatus[] fileStatuses,
+      int numOfParts) {
+    int chunkSize = (int) Math.ceil((float) fileStatuses.length / numOfParts);
+    final AtomicInteger counter = new AtomicInteger();
+    return Arrays.stream(fileStatuses)
+        .collect(Collectors.groupingBy(it ->
+            counter.getAndIncrement() / chunkSize))
+        .values();
   }
 
   /**
-   * Split directory's contents in groups, process each group
-   * in its own Callable task.
+   * Split directory's contents in groups and localize them in parallel.
    * @param sourceFileSystem Source filesystem
    * @param destinationFileSystem Destination filesystem
    * @param source source path to copy. Typically HDFS
@@ -327,31 +321,25 @@ private void localizeDirectoryInParallel(FileSystem sourceFileSystem,
     } catch (Exception e) {
       throw new YarnException("Download and unpack failed", e);
     }
-    int nThreads = Math.min(conf.getInt(
+    int nThreads = conf.getInt(
         YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
-        YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT),
-        fileStatuses.length);
-    List<Callable<Void>> tasks = new ArrayList<>(nThreads);
-    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-    if (nThreads > fileStatuses.length) {
-      nThreads = fileStatuses.length;
-    }
-    int chunkSize = (int) Math.floor((float) fileStatuses.length / nThreads);
-    for (int i = 0; i < fileStatuses.length; i += chunkSize) {
-      Path[] sourcePaths = Arrays.stream(fileStatuses, i,
-          Math.min(i + chunkSize, fileStatuses.length))
-          .map(FileStatus::getPath)
-          .toArray(Path[]::new);
-      tasks.add(new ThreadCopyTask(destination, sourcePaths,
-          sourceFileSystem, destinationFileSystem));
-    }
-    List<Future<Void>> futures = tasks.stream()
-        .map(executor::submit)
-        .collect(Collectors.toList());
-    while (!futures.isEmpty()) {
-      futures.removeIf(Future::isDone);
+        YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    partitionInputList(fileStatuses, nThreads).parallelStream()
+        .forEach(part -> {
+          try {
+            Path[] sourcePaths = part.stream()
+                .map(FileStatus::getPath)
+                .toArray(Path[]::new);
+            FileUtil.copy(sourceFileSystem, sourcePaths, destinationFileSystem,
+                destination, false, true, conf);
+          } catch (IOException e) {
+            ioException.set(e);
+          }
+        });
+    if (ioException.get() != null) {
+      throw new YarnException("Download and unpack failed", ioException.get());
     }
-    executor.shutdown();
   }
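Two properties of this revision are easy to miss. First, the groupingBy collector with an AtomicInteger key assigns consecutive elements to buckets of size ceil(fileStatuses.length / numOfParts), so at most numOfParts partitions are produced. Second, parallelStream() executes on the JVM-wide common ForkJoinPool, which is sized by the number of cores, so NM_LOCALIZER_FETCH_THREAD_COUNT now bounds only the partition count rather than the number of concurrent copies; the next patch restores a dedicated pool. A standalone sketch of the partitioning, assuming plain strings in place of FileStatus objects:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;

    public class PartitionSketch {
      // Same groupingBy trick as partitionInputList, generified.
      static <T> Collection<List<T>> partition(T[] items, int numOfParts) {
        int chunkSize = (int) Math.ceil((float) items.length / numOfParts);
        AtomicInteger counter = new AtomicInteger();
        return Arrays.stream(items)
            .collect(Collectors.groupingBy(
                it -> counter.getAndIncrement() / chunkSize))
            .values();
      }

      public static void main(String[] args) {
        // Seven items in three parts: ceil(7 / 3) = 3, so sizes 3, 3, 1.
        System.out.println(
            partition(new String[] {"a", "b", "c", "d", "e", "f", "g"}, 3));
      }
    }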
From f6c40539d11e3e8362c6cfa9420d43b35a43a5ce Mon Sep 17 00:00:00 2001
From: Christos Karampeazis-Papadakis
Date: Fri, 5 Feb 2021 19:33:31 +0100
Subject: [PATCH 3/3] YARN-7713. Fix parallelism

---
 .../apache/hadoop/yarn/util/FSDownload.java | 39 ++++++++++++-------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index bfe7ab19758bb..316e5ed49b6ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -325,18 +325,31 @@ private void localizeDirectoryInParallel(FileSystem sourceFileSystem,
         YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
     AtomicReference<IOException> ioException = new AtomicReference<>();
-    partitionInputList(fileStatuses, nThreads).parallelStream()
-        .forEach(part -> {
-          try {
-            Path[] sourcePaths = part.stream()
-                .map(FileStatus::getPath)
-                .toArray(Path[]::new);
-            FileUtil.copy(sourceFileSystem, sourcePaths, destinationFileSystem,
-                destination, false, true, conf);
-          } catch (IOException e) {
-            ioException.set(e);
-          }
-        });
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    List<Callable<Void>> callableList = new ArrayList<>();
+    for (List<FileStatus> part : partitionInputList(fileStatuses, nThreads)) {
+      callableList.add(() -> {
+        try {
+          Path[] sourcePaths = part.stream()
+              .map(FileStatus::getPath)
+              .toArray(Path[]::new);
+          FileUtil.copy(sourceFileSystem, sourcePaths, destinationFileSystem,
+              destination, false, true, conf);
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new AssertionError(e);
+    } finally {
+      executorService.shutdown();
+    }
+
     if (ioException.get() != null) {
       throw new YarnException("Download and unpack failed", ioException.get());
     }
@@ -358,7 +371,7 @@ private void downloadAndUnpack(Path source, Path destination)
       destinationFileSystem.mkdirs(destination,
           sourceFileStatus.getPermission());
       localizeDirectoryInParallel(sourceFileSystem, destinationFileSystem,
-          source, destination);
+            source, destination);
     } else {
       unpack(source, destination, sourceFileSystem, destinationFileSystem);
     }
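For reference, ExecutorService.invokeAll blocks until every submitted task has completed or the calling thread is interrupted, which is why this final revision needs no explicit Future polling; note that the AtomicReference still retains only the last IOException when several partitions fail. A minimal runnable sketch of the pattern, with illustrative names that are not from the patch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class InvokeAllSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<Void>> work = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
          final int id = i;  // capture a copy for the lambda
          work.add(() -> {
            System.out.println("copying partition " + id);
            return null;
          });
        }
        try {
          pool.invokeAll(work);  // returns only after all tasks complete
        } finally {
          pool.shutdown();
        }
      }
    }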