diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 9719da7dc168a..f29a789b916b6 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -992,7 +992,7 @@ public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {
 
   public int getWriteMaxConcurrentRequestCount() {
     if (this.writeMaxConcurrentRequestCount < 1) {
-      return 4 * Runtime.getRuntime().availableProcessors();
+      return 4 * getAvailableProcessorCount();
     }
     return this.writeMaxConcurrentRequestCount;
   }
@@ -1013,6 +1013,10 @@ public String getClientProvidedEncryptionKey() {
     return rawConfig.get(accSpecEncKey, null);
   }
 
+  public static int getAvailableProcessorCount() {
+    return Runtime.getRuntime().availableProcessors();
+  }
+
   @VisibleForTesting
   void setReadBufferSize(int bufferSize) {
     this.readBufferSize = bufferSize;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 47866140145e9..20cfdd7eca9a8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -22,6 +22,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
@@ -57,6 +58,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -82,6 +84,7 @@
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
 import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsLocatedFileStatus;
+import org.apache.hadoop.fs.azurebfs.services.ContentSummaryProcessor;
 import org.apache.hadoop.fs.azurebfs.utils.Listener;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
@@ -484,6 +487,33 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
 
   }
 
+  /**
+   * Returns a ContentSummary instance containing the count of directories,
+   * files and total number of bytes under a given path.
+   *
+   * @param path The given path
+   * @return ContentSummary
+   * @throws IOException if an error is encountered during listStatus calls
+   * or if there is any issue with the thread pool used
+   * while processing
+   */
+  @Override
+  public ContentSummary getContentSummary(Path path) throws IOException {
+    try {
+      TracingContext tracingContext = new TracingContext(clientCorrelationId,
+          fileSystemId, FSOperationType.GET_CONTENT_SUMMARY, true,
+          tracingHeaderFormat, listener);
+      return (new ContentSummaryProcessor(abfsStore)).getContentSummary(path,
+          tracingContext);
+    } catch (InterruptedException e) {
+      LOG.debug("Thread interrupted");
+      throw new InterruptedIOException(e.getMessage());
+    } catch (ExecutionException ex) {
+      LOG.debug("GetContentSummary failed with error: {}", ex.getMessage());
+      throw new PathIOException(path.toString(), ex);
+    }
+  }
+
   @Override
   public FileStatus[] listStatus(final Path f) throws IOException {
     LOG.debug(
@@ -1192,9 +1222,8 @@ public RemoteIterator<FileStatus> listStatusIterator(Path path)
     if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) {
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
           fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener);
-      AbfsListStatusRemoteIterator abfsLsItr =
-          new AbfsListStatusRemoteIterator(getFileStatus(path, tracingContext), abfsStore,
-              tracingContext);
+      AbfsListStatusRemoteIterator abfsLsItr = new AbfsListStatusRemoteIterator(
+          path, abfsStore, tracingContext);
       return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
     } else {
       return super.listStatusIterator(path);
@@ -1516,7 +1545,7 @@ Map<String, Long> getInstrumentationMap() {
   }
 
   @VisibleForTesting
-  String getFileSystemId() {
+  public String getFileSystemId() {
     return fileSystemId;
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index f4f895996447c..85c06e50c4e4a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -52,6 +52,8 @@
 import java.util.WeakHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -173,6 +175,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
   private final IdentityTransformerInterface identityTransformer;
   private final AbfsPerfTracker abfsPerfTracker;
   private final AbfsCounters abfsCounters;
+  private final ThreadPoolExecutor contentSummaryExecutorService;
 
   /**
    * The set of directories where we should store files as append blobs.
@@ -256,6 +259,17 @@ public AzureBlobFileSystemStore(
       this.appendBlobDirSet = new HashSet<>(Arrays.asList(
           abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
     }
+    contentSummaryExecutorService = new ThreadPoolExecutor(0,
+        4 * AbfsConfiguration.getAvailableProcessorCount(), 60,
+        TimeUnit.SECONDS, new SynchronousQueue<>());
+    contentSummaryExecutorService.setRejectedExecutionHandler(
+        (runnable, threadPoolExecutor) -> {
+          try {
+            contentSummaryExecutorService.getQueue().put(runnable);
+          } catch (InterruptedException e) {
+            LOG.debug("Could not submit GetContentSummary task to thread pool");
+          }
+        });
     this.blockFactory = abfsStoreBuilder.blockFactory;
     this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
     this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
@@ -1734,6 +1748,10 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) {
     return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
   }
 
+  public ExecutorService getContentSummaryExecutorService() {
+    return contentSummaryExecutorService;
+  }
+
   /**
    * A File status with version info extracted from the etag value returned
    * in a LIST or HEAD request.
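The executor configured above is what bounds getContentSummary parallelism per filesystem instance. With a core size of 0 and a SynchronousQueue there is no internal task buffer, so once 4 * availableProcessors() workers are busy a plain ThreadPoolExecutor would throw RejectedExecutionException; the handler installed here instead parks the submitting thread on the handoff queue until an idle worker takes the task. A minimal, self-contained sketch of that same pattern outside the patch (the class name, pool sizing, and task bodies are illustrative only):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingHandoffPoolDemo {
  public static void main(String[] args) throws InterruptedException {
    int maxThreads = 4 * Runtime.getRuntime().availableProcessors();
    // Core size 0 + SynchronousQueue: threads are created on demand up to
    // maxThreads and die after 60s idle; there is no task queue to grow.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(0, maxThreads,
        60, TimeUnit.SECONDS, new SynchronousQueue<>());
    // The default policy would reject once all workers are busy; instead,
    // block the submitter on the handoff queue until a worker frees up.
    pool.setRejectedExecutionHandler((runnable, executor) -> {
      try {
        executor.getQueue().put(runnable);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // keep the interrupt visible
      }
    });
    for (int i = 0; i < maxThreads * 2; i++) {
      final int id = i;
      pool.execute(() -> System.out.println(
          "task " + id + " on " + Thread.currentThread().getName()));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```

One difference from the patch is deliberate: on interruption the sketch restores the thread's interrupt status rather than only logging, so an interrupted submitter drops the task but does not silently swallow the interrupt.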
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
index 6b6e98c9c7082..0bbe986650856 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
@@ -28,6 +28,7 @@ public enum FSOperationType {
     DELETE("DL"),
     GET_ACL_STATUS("GA"),
     GET_ATTR("GR"),
+    GET_CONTENT_SUMMARY("GC"),
     GET_FILESTATUS("GF"),
     LISTSTATUS("LS"),
     MKDIR("MK"),
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index ce6207bf5f20c..e978f6c342431 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -32,6 +32,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
@@ -45,7 +46,7 @@ public class AbfsListStatusRemoteIterator
   private static final int MAX_QUEUE_SIZE = 10;
   private static final long POLL_WAIT_TIME_IN_MS = 250;
 
-  private final FileStatus fileStatus;
+  private final Path path;
   private final ListingSupport listingSupport;
   private final ArrayBlockingQueue<AbfsListResult> listResultQueue;
   private final TracingContext tracingContext;
@@ -55,9 +56,9 @@ public class AbfsListStatusRemoteIterator
   private String continuation;
   private Iterator<FileStatus> currIterator;
 
-  public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
+  public AbfsListStatusRemoteIterator(final Path path,
       final ListingSupport listingSupport, TracingContext tracingContext) {
-    this.fileStatus = fileStatus;
+    this.path = path;
     this.listingSupport = listingSupport;
     this.tracingContext = tracingContext;
     listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
@@ -144,7 +145,7 @@ private synchronized void addNextBatchIteratorToQueue()
       throws IOException, InterruptedException {
     List<FileStatus> fileStatuses = new ArrayList<>();
     continuation = listingSupport
-        .listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
+        .listStatus(path, null, fileStatuses, FETCH_ALL_FALSE,
            continuation, tracingContext);
     if (!fileStatuses.isEmpty()) {
       listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java
new file mode 100644
index 0000000000000..b4329a99d9d14
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Class to carry out parallelized recursive listing on a given path to
+ * collect directory and file count/size information, as part of the
+ * implementation for the FileSystem method getContentSummary.
+ */
+public class ContentSummaryProcessor {
+  private static final int MAX_THREAD_COUNT = 16;
+  private static final int POLL_TIMEOUT = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(ContentSummaryProcessor.class);
+  private final AtomicLong fileCount = new AtomicLong(0L);
+  private final AtomicLong directoryCount = new AtomicLong(0L);
+  private final AtomicLong totalBytes = new AtomicLong(0L);
+  private final AtomicInteger numTasks = new AtomicInteger(0);
+  private final ListingSupport abfsStore;
+  private final ExecutorService executorService;
+  private final CompletionService<Void> completionService;
+  private final LinkedBlockingQueue<FileStatus> queue = new LinkedBlockingQueue<>();
+
+  /**
+   * Processes a given path for count of subdirectories, files and total number
+   * of bytes.
+   * @param abfsStore Instance of AzureBlobFileSystemStore, used to make
+   * listStatus calls to server
+   */
+  public ContentSummaryProcessor(ListingSupport abfsStore) {
+    this.abfsStore = abfsStore;
+    this.executorService = ((AzureBlobFileSystemStore) abfsStore).getContentSummaryExecutorService();
+    completionService = new ExecutorCompletionService<>(this.executorService);
+  }
+
+  public ContentSummary getContentSummary(Path path,
+      TracingContext tracingContext)
+      throws IOException, ExecutionException, InterruptedException {
+
+    processDirectoryTree(path, tracingContext);
+    while (!queue.isEmpty() || numTasks.get() > 0) {
+      try {
+        completionService.take().get();
+      } finally {
+        numTasks.decrementAndGet();
+        LOG.debug(
+            "FileStatus queue size = {}, number of submitted unfinished tasks"
+                + " = {}, active thread count = {}",
+            queue.size(), numTasks,
+            ((ThreadPoolExecutor) executorService).getActiveCount());
+      }
+    }
+
+    LOG.debug("Processed content summary of subtree under given path");
+    ContentSummary.Builder builder =
+        new ContentSummary.Builder().directoryCount(
+            directoryCount.get()).fileCount(fileCount.get())
+        .length(totalBytes.get()).spaceConsumed(totalBytes.get());
+    return builder.build();
+  }
+
+  /**
+   * Calls listStatus on the given path and populates the fileStatus queue
+   * with subdirectories. Is called by new tasks to process the complete
+   * subtree under a given fileStatus.
+   * @param fileStatus Path to a file or directory
+   * @throws IOException listStatus error
+   * @throws InterruptedException error while inserting into queue
+   */
+  private void processDirectoryTree(Path fileStatus,
+      TracingContext tracingContext) throws IOException, InterruptedException {
+    AbfsListStatusRemoteIterator iterator = new AbfsListStatusRemoteIterator(
+        fileStatus, abfsStore, tracingContext);
+    while (iterator.hasNext()) {
+      FileStatus status = iterator.next();
+      if (status.isDirectory()) {
+        queue.put(status);
+        processDirectory();
+        conditionalSubmitTaskToExecutor(tracingContext);
+      } else {
+        processFile(status);
+      }
+    }
+  }
+
+  private void processDirectory() {
+    directoryCount.incrementAndGet();
+  }
+
+  /**
+   * Increments file count and byte count.
+   * @param fileStatus Provides file size to update byte count
+   */
+  private void processFile(FileStatus fileStatus) {
+    fileCount.incrementAndGet();
+    totalBytes.addAndGet(fileStatus.getLen());
+  }
+
+  /**
+   * Submits a task for processing a subdirectory, based on the current size
+   * of the fileStatus queue and the number of already submitted tasks.
+   */
+  private synchronized void conditionalSubmitTaskToExecutor(TracingContext tracingContext) {
+    if (!queue.isEmpty() && numTasks.get() < MAX_THREAD_COUNT) {
+      numTasks.incrementAndGet();
+      completionService.submit(() -> {
+        FileStatus fileStatus1;
+        while ((fileStatus1 = queue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS))
+            != null) {
+          processDirectoryTree(fileStatus1.getPath(),
+              new TracingContext(tracingContext));
+        }
+        return null;
+      });
+    }
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
index 3f50aec659142..f7b5eec051b2d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
@@ -69,9 +69,8 @@ public void testAbfsIteratorWithHasNext() throws Exception {
     final List<String> fileNames = createFilesUnderDirectory(testDir);
 
     ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
-    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
-        getFileSystem().getFileStatus(testDir), listngSupport,
-        getTestTracingContext(getFileSystem(), true));
+    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
+        listngSupport, getTestTracingContext(getFileSystem(), true));
     Assertions.assertThat(fsItr)
         .describedAs("RemoteIterator should be instance of "
            + "AbfsListStatusRemoteIterator by default")
        .isInstanceOf(AbfsListStatusRemoteIterator.class);
@@ -97,12 +96,10 @@ public void testAbfsIteratorWithoutHasNext() throws Exception {
     setPageSize(10);
     final List<String> fileNames = createFilesUnderDirectory(testDir);
 
-    ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
-    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
-        getFileSystem().getFileStatus(testDir), listngSupport,
-        getTestTracingContext(getFileSystem(), true));
-    Assertions.assertThat(fsItr)
-        .describedAs("RemoteIterator should be instance of "
+    ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore());
+    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
+        listingSupport, getTestTracingContext(getFileSystem(), true));
+    Assertions.assertThat(fsItr).describedAs("RemoteIterator should be instance of "
            + "AbfsListStatusRemoteIterator by default")
        .isInstanceOf(AbfsListStatusRemoteIterator.class);
     int itrCount = 0;
@@ -114,7 +111,7 @@ public void testAbfsIteratorWithoutHasNext() throws Exception {
     LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next);
     verifyIteratorResultCount(itrCount, fileNames);
     int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
-    verify(listngSupport, Mockito.atLeast(minNumberOfInvocations))
+    verify(listingSupport, Mockito.atLeast(minNumberOfInvocations))
        .listStatus(any(Path.class), nullable(String.class),
            anyList(), anyBoolean(),
            nullable(String.class),
@@ -170,7 +167,7 @@ public void testNextWhenNoMoreElementsPresent() throws Exception {
     Path testDir = createTestDirectory();
     setPageSize(10);
     RemoteIterator<FileStatus> fsItr =
-        new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
+        new AbfsListStatusRemoteIterator(testDir,
            getFileSystem().getAbfsStore(),
            getTestTracingContext(getFileSystem(), true));
     fsItr = Mockito.spy(fsItr);
@@ -214,7 +211,7 @@ public void testIOException() throws Exception {
     String exceptionMessage = "test exception";
     ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
     RemoteIterator<FileStatus> fsItr =
-        new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
+        new AbfsListStatusRemoteIterator(testDir,
            lsSupport, getTestTracingContext(getFileSystem(), true));
 
     LambdaTestUtils.intercept(IOException.class, fsItr::next);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestGetContentSummary.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestGetContentSummary.java
new file mode 100644
index 0000000000000..5d6994e951d83
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestGetContentSummary.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_LIST_MAX_RESULTS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestGetContentSummary extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_BUFFER_SIZE = 20;
+  private static final int FILES_PER_DIRECTORY = 2;
+  private static final int MAX_THREADS = 16;
+  private static final int NUM_FILES_FOR_LIST_MAX_TEST =
+      DEFAULT_AZURE_LIST_MAX_RESULTS + 10;
+  private static final int NUM_CONCURRENT_CALLS = 8;
+
+  private final String[] directories = {"/testFolder",
+      "/testFolderII",
+      "/testFolder/testFolder1",
+      "/testFolder/testFolder2",
+      "/testFolder/testFolder3",
+      "/testFolder/testFolder2/testFolder4",
+      "/testFolder/testFolder2/testFolder5",
+      "/testFolder/testFolder3/testFolder6",
+      "/testFolder/testFolder3/testFolder7"};
+
+  private final byte[] b = new byte[TEST_BUFFER_SIZE];
+
+  public ITestGetContentSummary() throws Exception {
+    new Random().nextBytes(b);
+  }
+
+  @Test
+  public void testFilesystemRoot()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    createDirectoryStructure();
+    int fileCount = directories.length * FILES_PER_DIRECTORY;
+    ContentSummary contentSummary = fs.getContentSummary(new Path("/"));
+    verifyContentSummary(contentSummary, directories.length + 2, fileCount,
+        directories.length * TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testFileContentSummary() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path filePath = path("testFile");
+    try (FSDataOutputStream out = fs.create(filePath)) {
+      out.write(b);
+    }
+    ContentSummary contentSummary = fs.getContentSummary(filePath);
+    verifyContentSummary(contentSummary, 0, 1, TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testLeafDir() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path testFolder = path("testFolder");
+    fs.mkdirs(new Path(testFolder + "/testFolder2"));
+    Path leafDir = new Path(testFolder + "/testFolder1/testFolder3");
+    fs.mkdirs(leafDir);
+    ContentSummary contentSummary = fs.getContentSummary(leafDir);
+    verifyContentSummary(contentSummary, 0, 0, 0);
+  }
+
+  @Test
+  public void testIntermediateDirWithFilesOnly()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path testFolder = path("testFolder");
+    Path intermediateDir = new Path(testFolder + "/testFolder1");
+    fs.mkdirs(intermediateDir);
+    populateDirWithFiles(intermediateDir, FILES_PER_DIRECTORY);
+    ContentSummary contentSummary =
+        fs.getContentSummary(intermediateDir);
+    verifyContentSummary(contentSummary, 0, FILES_PER_DIRECTORY,
+        TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testIntermediateDirWithFilesAndSubdirs()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path intermediateDir = new Path(path("/testFolder") + "/testFolder1");
+    fs.mkdirs(intermediateDir);
+    populateDirWithFiles(intermediateDir, FILES_PER_DIRECTORY);
+    fs.mkdirs(new Path(intermediateDir + "/testFolder3"));
+    fs.registerListener(
+        new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
+            fs.getFileSystemId(), FSOperationType.GET_CONTENT_SUMMARY, true,
+            0));
+    ContentSummary contentSummary =
+        fs.getContentSummary(intermediateDir);
+    verifyContentSummary(contentSummary, 1, FILES_PER_DIRECTORY,
+        TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testDirOverListMaxResultsItems()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path pathToListMaxDir = new Path(path("/testFolder") + "/listMaxDir");
+    fs.mkdirs(new Path(pathToListMaxDir + "/testFolder2"));
+    populateDirWithFiles(pathToListMaxDir, NUM_FILES_FOR_LIST_MAX_TEST);
+    verifyContentSummary(
+        fs.getContentSummary(pathToListMaxDir), 1,
+        NUM_FILES_FOR_LIST_MAX_TEST, TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testInvalidPath() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    intercept(IOException.class, () -> fs.getContentSummary(new Path(
+        "/nonExistentPath")));
+  }
+
+  @Test
+  public void testConcurrentGetContentSummaryCalls()
+      throws InterruptedException, ExecutionException, IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    ExecutorService executorService = new ThreadPoolExecutor(1, MAX_THREADS, 5,
+        TimeUnit.SECONDS, new SynchronousQueue<>());
+    CompletionService<ContentSummary> completionService =
+        new ExecutorCompletionService<>(executorService);
+    Path testPath = createDirectoryStructure();
+    for (int i = 0; i < NUM_CONCURRENT_CALLS; i++) {
+      completionService.submit(() -> fs.getContentSummary(new Path(
+          testPath + "/testFolder")));
+    }
+    for (int i = 0; i < NUM_CONCURRENT_CALLS; i++) {
+      ContentSummary contentSummary = completionService.take().get();
+      verifyContentSummary(contentSummary, 7, 8 * FILES_PER_DIRECTORY,
+          8 * TEST_BUFFER_SIZE);
+    }
+    executorService.shutdown();
+  }
+
+  private void verifyContentSummary(ContentSummary contentSummary,
+      long expectedDirectoryCount, long expectedFileCount,
+      long expectedByteCount) {
+    Assertions.assertThat(contentSummary.getDirectoryCount())
+        .describedAs("Incorrect directory count").isEqualTo(expectedDirectoryCount);
+    Assertions.assertThat(contentSummary.getFileCount())
+        .describedAs("Incorrect file count").isEqualTo(expectedFileCount);
+    Assertions.assertThat(contentSummary.getLength())
+        .describedAs("Incorrect length").isEqualTo(expectedByteCount);
+    Assertions.assertThat(contentSummary.getSpaceConsumed())
+        .describedAs("Incorrect value of space consumed").isEqualTo(expectedByteCount);
+  }
+
+  private Path createDirectoryStructure()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path testPath = path("testPath");
+    for (String directory : directories) {
+      Path dirPath = new Path(testPath + directory);
+      fs.mkdirs(dirPath);
+      populateDirWithFiles(dirPath, FILES_PER_DIRECTORY);
+    }
+    return testPath;
+  }
+
+  private void populateDirWithFiles(Path directory, int numFiles)
+      throws ExecutionException, InterruptedException, IOException {
+    final List<Future<Void>> tasks = new ArrayList<>();
+    ExecutorService es = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < numFiles; i++) {
+      final Path fileName = new Path(directory, String.format("test-%02d", i));
+      tasks.add(es.submit(() -> {
+        touch(fileName);
+        return null;
+      }));
+    }
+    for (Future<Void> task : tasks) {
+      task.get();
+    }
+    try (FSDataOutputStream out = getFileSystem().append(
+        new Path(directory + "/test-00"))) {
+      out.write(b);
+    }
+    es.shutdownNow();
+  }
+}
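Taken together, the patch exposes the operation through the standard FileSystem contract, so callers need nothing ABFS-specific. A minimal caller-side sketch, assuming an ABFS account whose credentials are already present in the Configuration (the account, container, and /data path below are placeholders):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetContentSummaryExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // auth settings assumed present
    FileSystem fs = FileSystem.get(
        URI.create("abfs://container@account.dfs.core.windows.net/"), conf);
    // Dispatches to AzureBlobFileSystem#getContentSummary, which walks the
    // subtree in parallel via ContentSummaryProcessor.
    ContentSummary summary = fs.getContentSummary(new Path("/data"));
    System.out.printf("dirs=%d files=%d bytes=%d%n",
        summary.getDirectoryCount(), summary.getFileCount(),
        summary.getLength());
  }
}
```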